GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
native/divideconquer.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_NATIVE_DIVIDECONQUER_H
22 #define GRPPI_NATIVE_DIVIDECONQUER_H
23 
25 
26 #include <thread>
27 #include <atomic>
28 
29 namespace grppi {
30 
31 template <typename Input, typename Divider, typename Solver, typename Combiner>
32 typename std::result_of<Solver(Input)>::type
34  Input & input,
35  Divider && divide_op, Solver && solve_op,
36  Combiner && combine_op,
37  std::atomic<int> & num_threads)
38 {
39  // Sequential execution fo internal implementation
40  using Output = typename std::result_of<Solver(Input)>::type;
42  Output out;
43  if(num_threads.load()>0){
44  auto subproblems = divide_op(input);
45 
46  if(subproblems.size()>1){
47  std::vector<Output> partials(subproblems.size()-1);
48 // num_threads -= subproblems.size();
49  int division = 0;
50  std::vector<std::thread> tasks;
51  auto i = subproblems.begin();
52  for(i = subproblems.begin()+1; i != subproblems.end() && num_threads.load()>0 ; i++, division++){
53  //THREAD
54  tasks.emplace_back([&](auto i, int division) {
55  auto manager = p.thread_manager();
56 
57  partials[division] = internal_divide_conquer(p, *i,
58  std::forward<Divider>(divide_op),
59  std::forward<Solver>(solve_op),
60  std::forward<Combiner>(combine_op),
61  num_threads);
62 
63  }, i, division);
64 
65  num_threads--;
66  //END TRHEAD
67  }
68 
69  for(i; i != subproblems.end(); i++){
70  partials[division] = divide_conquer(seq,*i,
71  std::forward<Divider>(divide_op),
72  std::forward<Solver>(solve_op),
73  std::forward<Combiner>(combine_op));
74  }
75  //Main thread works on the first subproblem.
76 
77  out = internal_divide_conquer(p, *subproblems.begin(),
78  std::forward<Divider>(divide_op),
79  std::forward<Solver>(solve_op),
80  std::forward<Combiner>(combine_op), num_threads);
81  //JOIN
82  for(int i=0; i< tasks.size(); i++){
83  tasks[i].join();
84  }
85 
86  for(int i = 0; i<partials.size();i++){ // MarcoA - this is moved to the user code
87  out =combine_op(out,partials[i]);
88  }
89  }else{
90  out = solve_op(input);
91  }
92  }else{
93  return divide_conquer(seq, input,
94  std::forward<Divider>(divide_op),
95  std::forward<Solver>(solve_op),
96  std::forward<Combiner>(combine_op));
97  }
98  return out;
99 }
100 
122 template <typename Input, typename Divider, typename Solver, typename Combiner>
123 typename std::result_of<Solver(Input)>::type
125  Input & problem,
126  Divider && divide_op, Solver && solve_op,
127  Combiner && combine_op)
128 {
129 
130  // Sequential execution fo internal implementation
132  std::atomic<int> num_threads{ex.concurrency_degree()-1};
133 
134  if (num_threads.load()>0) {
135  return divide_conquer(seq, problem,
136  std::forward<Divider>(divide_op),
137  std::forward<Solver>(solve_op),
138  std::forward<Combiner>(combine_op));
139  }
140 
141  auto subproblems = divide_op(problem);
142 
143  if(subproblems.size()>1) {
144  using Output = typename std::result_of<Solver(Input)>::type;
145  std::vector<Output> partials(subproblems.size()-1);
146  int division = 0;
147  std::vector<std::thread> tasks;
148  auto i = subproblems.begin();
149  for(i = subproblems.begin()+1; i != subproblems.end() && num_threads.load()>0; i++, division++) {
150  //THREAD
151  tasks.emplace_back([&](auto i, int division) {
152  auto manager = ex.thread_manager();
153  partials[division] = internal_divide_conquer(ex, *i,
154  std::forward<Divider>(divide_op),
155  std::forward<Solver>(solve_op),
156  std::forward<Combiner>(combine_op),
157  num_threads);
158  }, i, division);
159 
160  num_threads--;
161  //END TRHEAD
162  }
163 
164  for(i; i != subproblems.end(); i++) {
165  partials[division] = divide_conquer(seq, *i,
166  std::forward<Divider>(divide_op),
167  std::forward<Solver>(solve_op),
168  std::forward<Combiner>(combine_op));
169  }
170 
171  //Main thread works on the first subproblem.
172  Output out = internal_divide_conquer(ex, *subproblems.begin(),
173  std::forward<Divider>(divide_op),
174  std::forward<Solver>(solve_op),
175  std::forward<Combiner>(combine_op),
176  num_threads);
177 
178 
179  //JOIN
180  for (auto && t : tasks) { t.join(); }
181 
182  for (auto && p : partials) { out = combine_op(out,p); }
183 
184  return out;
185  }
186  else {
187  return solve_op(problem);
188  }
189 }
190 
196 }
197 
198 #endif
Definition: callable_traits.h:24
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_native.h:178
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:136
std::result_of< Solver(Input)>::type divide_conquer(parallel_execution_native &ex, Input &problem, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op)
Invoke Divide/conquer pattern with native parallel execution.
Definition: native/divideconquer.h:124
native_thread_manager thread_manager()
Get a manager object for registration/deregistration in the thread index table for current thread...
Definition: parallel_execution_native.h:199
Sequential execution policy.
Definition: sequential_execution.h:31
std::result_of< Solver(Input)>::type internal_divide_conquer(parallel_execution_native &p, Input &input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op, std::atomic< int > &num_threads)
Definition: native/divideconquer.h:33