GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
omp/pipeline.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_OMP_PIPELINE_H
22 #define GRPPI_OMP_PIPELINE_H
23 
24 #ifdef GRPPI_OMP
25 
26 #include "parallel_execution_omp.h"
27 
28 #include <experimental/optional>
29 
30 namespace grppi {
31 
32 //Last stage
33 template <typename InQueue, typename Consumer>
34 void pipeline_impl(parallel_execution_omp & ex, InQueue & input_queue,
35  Consumer && consume_op)
36 {
37  using namespace std;
38  using gen_value_type = typename InQueue::value_type;
39 
40  if (ex.is_ordered()){
41  vector<gen_value_type> elements;
42  long current = 0;
43  auto item = input_queue.pop( );
44  while (item.first) {
45  if (current == item.second) {
46  consume_op(*item.first);
47  current ++;
48  }
49  else {
50  elements.push_back(item);
51  }
52  for (auto it=elements.begin(); it!=elements.end(); it++) {
53  if (it->second == current) {
54  consume_op(*it->first);
55  elements.erase(it);
56  current++;
57  break;
58  }
59  }
60  item = input_queue.pop( );
61  }
62  while(elements.size()>0){
63  for(auto it = elements.begin(); it != elements.end(); it++){
64  if(it->second == current) {
65  consume_op(*it->first);
66  elements.erase(it);
67  current++;
68  break;
69  }
70  }
71  }
72  }
73  else {
74  auto item = input_queue.pop();
75  while (item.first) {
76  consume_op(*item.first);
77  item = input_queue.pop();
78  }
79  }
80  //End task
81 }
82 
83 
84 //Reduction composition
85 template <typename Combiner, typename Identity, typename InQueue, typename ...MoreTransformers>
86 void pipeline_impl(parallel_execution_omp & ex, InQueue & input_queue,
87  reduction_info<parallel_execution_omp, Combiner, Identity> & reduction_obj, MoreTransformers ...more_transform_ops)
88 {
89  using reduction_type =
91 
92  pipeline_impl(ex, input_queue, std::forward<reduction_type&&>(reduction_obj), std::forward<MoreTransformers...>(more_transform_ops...));
93 }
94 
95 template <typename Combiner, typename Identity, typename InQueue, typename ...MoreTransformers>
96 void pipeline_impl(parallel_execution_omp & ex, InQueue & input_queue,
97  reduction_info<parallel_execution_omp, Combiner, Identity> && reduction_obj, MoreTransformers ...more_transform_ops)
98 {
99  using reduction_type =
101  pipeline_impl_ordered(ex, input_queue, std::forward<reduction_type>(reduction_obj), std::forward<MoreTransformers...>(more_transform_ops...));
102 }
103 
104 template <typename Combiner, typename Identity, typename InQueue, typename ...MoreTransformers>
105 void pipeline_impl_ordered(parallel_execution_omp & ex, InQueue & input_queue,
106  reduction_info<parallel_execution_omp,Combiner,Identity> && reduction_obj, MoreTransformers ...more_transform_ops)
107 {
108  using namespace std;
109  using namespace std::experimental;
110  using gen_value_type = typename InQueue::value_type;
111  using input_value_type = typename gen_value_type::first_type::value_type;
112 
113  using result_value_type = typename result_of<Combiner(input_value_type, input_value_type)>::type;
114  using result_type = pair<optional<result_value_type>, long>;
115 
116  auto output_queue = ex.make_queue<result_type>();
117 
118  #pragma omp task shared(output_queue, input_queue, reduction_obj)
119  {
120  vector<input_value_type> values;
121  long out_order=0;
122  auto item {input_queue.pop()};
123  for(;;){
124  while (item.first && values.size() != reduction_obj.window_size) {
125  values.push_back(*item.first);
126  item = input_queue.pop();
127  }
128  if (values.size() > 0) {
129  auto reduced_value = reduce(reduction_obj.exectype, values.begin(), values.end(), reduction_obj.identity,
130  std::forward<Combiner>(reduction_obj.combine_op));
131  output_queue.push({{reduced_value}, out_order});
132  out_order++;
133  if (item.first) {
134  if (reduction_obj.offset <= reduction_obj.window_size) {
135  values.erase(values.begin(), values.begin() + reduction_obj.offset);
136  }
137  else {
138  values.erase(values.begin(), values.end());
139  auto diff = reduction_obj.offset - reduction_obj.window_size;
140  while (diff > 0 && item.first) {
141  item = input_queue.pop();
142  diff--;
143  }
144  }
145  }
146  }
147  if (!item.first) break;
148  }
149  output_queue.push({{},-1});
150  }
151 
152  pipeline_impl(ex, output_queue, forward<MoreTransformers>(more_transform_ops) ... );
153  #pragma omp taskwait
154 }
155 
156 
157 
158 
// Filter stage adapter: re-dispatches to another pipeline_impl overload with
// `filter_obj` forwarded as `filter_type`, followed by the remaining stages.
// NOTE(review): the listing omits original lines 161 and 164; presumably they
// declare the `filter_obj` parameter and the `filter_type` alias — confirm
// against the upstream header.
159 template <typename Transformer, typename InQueue, typename... MoreTransformers>
160 void pipeline_impl(parallel_execution_omp & ex, InQueue & input_queue,
162  MoreTransformers && ... more_transform_ops)
163 {
165 
166  pipeline_impl(ex,input_queue, std::forward<filter_type>(filter_obj),
167  std::forward<MoreTransformers>(more_transform_ops)...) ;
168 }
169 
// Ordered filter stage.
// NOTE(review): the listing omits original lines 172 and 174; presumably they
// carry the function name (pipeline_impl_ordered) and the `filter_obj`
// parameter — confirm against the upstream header.
//
// Spawns `filter_obj.exectype.concurrency_degree()` worker tasks that test
// each input item with `filter_obj.task`, plus one task that puts the results
// back in input order and renumbers the surviving items. The remaining stages
// then run on the renumbered queue, and the function waits for its tasks.
170 template <typename Transformer, typename InQueue,
171  typename... MoreTransformers>
173  InQueue & input_queue,
175  MoreTransformers && ... more_transform_ops)
176 {
177  using namespace std;
178  using gen_value_type = typename InQueue::value_type;
179  using input_value_type = typename gen_value_type::first_type;
    // Intermediate queue: filtered results, still carrying their input order.
180  auto tmp_queue = ex.make_queue<gen_value_type>();
181 
    // Counts worker tasks that have seen the end of the input.
182  atomic<int> done_threads{0};
183  for(int th = 0; th<filter_obj.exectype.concurrency_degree(); th++) {
184  #pragma omp task shared(tmp_queue,filter_obj,input_queue,done_threads)
185  {
186  auto item{input_queue.pop()};
187  while (item.first) {
188  if(filter_obj.task(*item.first)) {
189  tmp_queue.push(item);
190  }
191  else {
    // Rejected item: push an empty value under the same order number so the
    // reordering task below can still advance past it.
192  tmp_queue.push(make_pair(input_value_type{} ,item.second));
193  }
194  item = input_queue.pop();
195  }
196  done_threads++;
    // The worker that observes the full count pushes the end marker (empty
    // value, order -1); any other worker puts the terminating item back on
    // the input queue for the remaining workers.
197  if (done_threads==filter_obj.exectype.concurrency_degree()) {
198  tmp_queue.push (make_pair(input_value_type{}, -1));
199  }
200  else {
201  input_queue.push(item);
202  }
203  }
204  }
205 
206  auto output_queue = ex.make_queue<gen_value_type>();
207  #pragma omp task shared (output_queue,tmp_queue)
208  {
    // elements: results that arrived ahead of their turn.
    // current:  input order number expected next.
    // order:    consecutive output number given to each surviving item.
209  vector<gen_value_type> elements;
210  int current = 0;
211  long order = 0;
212  auto item = tmp_queue.pop();
213  for (;;) {
    // End marker: empty value with order -1.
214  if (!item.first && item.second == -1) break;
215  if (item.second == current) {
216  if (item.first) {
217  output_queue.push(make_pair(item.first, order++));
218  }
219  current++;
220  }
221  else {
222  elements.push_back(item);
223  }
    // Release at most one buffered result per received item.
224  for (auto it=elements.begin(); it<elements.end(); it++) {
225  if ((*it).second==current) {
226  if((*it).first){
227  output_queue.push(make_pair((*it).first,order++));
228  }
229  elements.erase(it);
230  current++;
231  break;
232  }
233  }
234  item = tmp_queue.pop();
235  }
    // Input finished: flush the buffered results in order.
236  while (elements.size()>0) {
237  for (auto it=elements.begin(); it<elements.end(); it++) {
238  if ((*it).second == current) {
239  if((*it).first) {
240  output_queue.push(make_pair((*it).first,order++));
241  }
242  elements.erase(it);
243  current++;
244  break;
245  }
246  }
247  }
    // `item` is the end marker here; pass it on to the next stage.
248  output_queue.push(item);
249  }
250  pipeline_impl(ex, output_queue,
251  forward<MoreTransformers>(more_transform_ops)...);
252  #pragma omp taskwait
253 }
254 
// Unordered filter stage: worker tasks pass on, unchanged, every item for
// which `farm_obj.task` returns true and drop the others. The remaining
// stages run on the output queue, and the function waits for its tasks.
// NOTE(review): the listing omits original line 257; presumably it declares
// the `farm_obj` parameter — confirm against the upstream header.
255 template <typename Transformer, typename InQueue,typename... MoreTransformers>
256 void pipeline_impl_unordered(parallel_execution_omp & ex, InQueue & input_queue,
258  MoreTransformers && ... more_transform_ops)
259 {
260  using gen_value_type = typename InQueue::value_type;
261  using input_value_type = typename gen_value_type::first_type;
262  auto output_queue = ex.make_queue<gen_value_type>();
263 
    // Counts worker tasks that have seen the end of the input.
264  std::atomic<int> done_threads{0};
265  for (int th=0; th<farm_obj.exectype.concurrency_degree(); th++) {
266  #pragma omp task shared(output_queue,farm_obj,input_queue,done_threads)
267  {
268  auto item = input_queue.pop( ) ;
269  while (item.first) {
270  if (farm_obj.task(*item.first)) {
271  output_queue.push(item);
272  }
273  item = input_queue.pop();
274  }
275  done_threads++;
    // The worker that observes the full count pushes the end marker (empty
    // value, order -1); any other worker puts the terminating item back on
    // the input queue for the remaining workers.
    // NOTE(review): make_pair is unqualified and this function has no
    // using-directive for std — confirm how the name is resolved.
276  if (done_threads==farm_obj.exectype.concurrency_degree()) {
277  output_queue.push(make_pair(input_value_type{}, -1));
278  }
279  else {
280  input_queue.push(item);
281  }
282  }
283  }
284  pipeline_impl(ex, output_queue,
285  std::forward<MoreTransformers>(more_transform_ops)...);
286  #pragma omp taskwait
287 }
288 
// Filter stage dispatcher: selects pipeline_impl_ordered when
// `ex.is_ordered()` is true and pipeline_impl_unordered otherwise. In both
// cases `filter_obj` is forwarded as `filter_type` along with the remaining
// stages.
// NOTE(review): the listing omits original lines 291 and 294; presumably they
// declare the `filter_obj` parameter and the `filter_type` alias — confirm
// against the upstream header.
289 template <typename Transformer, typename InQueue,typename... MoreTransformers>
290 void pipeline_impl(parallel_execution_omp & ex, InQueue & input_queue,
292  MoreTransformers && ... more_transform_ops)
293 {
295 
296  if (ex.is_ordered()) {
297  pipeline_impl_ordered(ex, input_queue,
298  std::forward<filter_type>(filter_obj),
299  std::forward<MoreTransformers>(more_transform_ops)...);
300  }
301  else {
302  pipeline_impl_unordered(ex, input_queue,
303  std::forward<filter_type>(filter_obj),
304  std::forward<MoreTransformers>(more_transform_ops)...);
305  }
306 }
307 
308 
// Farm stage adapter: re-dispatches to another pipeline_impl overload with
// `farm_obj` forwarded as `farm_type`, followed by the remaining stages.
// NOTE(review): the listing omits original lines 311 and 314; presumably they
// declare the `farm_obj` parameter and the `farm_type` alias — confirm
// against the upstream header.
309 template <typename Transformer, typename InQueue,typename... MoreTransformers>
310 void pipeline_impl(parallel_execution_omp & ex, InQueue & input_queue,
312  MoreTransformers && ... more_transform_ops)
313 {
315  pipeline_impl(ex, input_queue, std::forward<farm_type>(farm_obj),
316  std::forward<MoreTransformers>(more_transform_ops)...) ;
317 }
318 
// Farm stage: `farm_obj.exectype.concurrency_degree()` worker tasks apply
// `farm_obj.task` to each input value and push the result, under the item's
// original order number, to a new queue. The remaining stages (`sgs`) run on
// that queue, and the function waits for its tasks before returning.
// NOTE(review): the listing omits original line 321; presumably it declares
// the `farm_obj` parameter — confirm against the upstream header.
319 template <typename Transformer, typename InQueue,typename... MoreTransformers>
320 void pipeline_impl(parallel_execution_omp & ex, InQueue & input_queue,
322  MoreTransformers && ... sgs )
323 {
324  using namespace std;
325  using gen_value_type = typename InQueue::value_type;
326  using input_value_type = typename gen_value_type::first_type::value_type;
327  using result_type = typename result_of<Transformer(input_value_type)>::type;
328  using output_value_type = experimental::optional<result_type>;
329  using output_type = pair<output_value_type,long>;
330 
331  auto output_queue = ex.make_queue<output_type>();
    // Counts worker tasks that have seen the end of the input.
332  atomic<int> done_threads{0};
333  for (int th=0; th<farm_obj.exectype.concurrency_degree(); th++) {
334  #pragma omp task shared(done_threads,output_queue,farm_obj,input_queue)
335  {
336  auto item = input_queue.pop();
337  while (item.first) {
338  auto out = output_value_type{farm_obj.task(*item.first)};
339  output_queue.push(make_pair(out,item.second));
340  item = input_queue.pop();
341  }
    // Put the terminating item back so the other workers also see it.
342  input_queue.push(item);
343  done_threads++;
    // The worker that observes the full count pushes the end marker
    // (empty value, order -1) for the next stage.
344  if (done_threads==farm_obj.exectype.concurrency_degree()) {
345  output_queue.push(make_pair(output_value_type{}, -1));
346  }
347  }
348  }
349  pipeline_impl(ex, output_queue, forward<MoreTransformers>(sgs) ... );
350  #pragma omp taskwait
351 }
352 
353 //Intermediate stages
354 template <typename Transformer, typename InQueue,typename ... MoreTransformers>
355 void pipeline_impl(parallel_execution_omp & ex, InQueue & input_queue,
356  Transformer && transform_op,
357  MoreTransformers && ... more_transform_ops)
358 {
359  using namespace std;
360  using gen_value_type = typename InQueue::value_type;
361  using input_value_type = typename gen_value_type::first_type::value_type;
362  using result_type = typename result_of<Transformer(input_value_type)>::type;
363  using output_value_type = experimental::optional<result_type>;
364  using output_type = pair<output_value_type,long>;
365  auto output_queue = ex.make_queue<output_type>();
366 
367  //Start task
368  #pragma omp task shared(transform_op, input_queue, output_queue)
369  {
370  auto item = input_queue.pop();
371  while (item.first) {
372  auto out = output_value_type{transform_op(*item.first)};
373  output_queue.push(make_pair(out, item.second));
374  item = input_queue.pop() ;
375  }
376  output_queue.push(make_pair(output_value_type{}, -1));
377  }
378  //End task
379 
380  pipeline_impl(ex, output_queue,
381  forward<MoreTransformers>(more_transform_ops)...);
382 }
383 
// Entry point of the pipeline pattern for the OpenMP execution policy.
// Inside a parallel region, a single thread spawns a generator task and then
// runs the chain of stages (pipeline_impl) on the generator's output queue.
// NOTE(review): the listing omits original line 403; presumably it is the
// last template parameter (a constraint on Generator) — confirm against the
// upstream header.
402 template <typename Generator, typename ... Transformers,
404 void pipeline(parallel_execution_omp & ex, Generator && generate_op,
405  Transformers && ... transform_ops)
406 {
407  using namespace std;
408 
409  using result_type = typename result_of<Generator()>::type;
410  auto output_queue = ex.make_queue<pair<result_type,long>>();
411 
412  #pragma omp parallel
413  {
414  #pragma omp single nowait
415  {
416  #pragma omp task shared(generate_op,output_queue)
417  {
    // Number the generated items consecutively starting at 0.
418  long order = 0;
419  for (;;) {
420  auto item = generate_op();
    // The item is pushed before it is tested, so the first item that
    // evaluates to false is also queued and ends the stream downstream.
421  output_queue.push(make_pair(item,order++)) ;
422  if (!item) break;
423  }
424  }
425  pipeline_impl(ex, output_queue,
426  forward<Transformers>(transform_ops)...);
427  #pragma omp taskwait
428  }
429  }
430 }
431 
437 }
438 
439 #endif
440 
441 #endif
Definition: callable_traits.h:24
void pipeline_impl_ordered(parallel_execution_native &ex, InQueue &input_queue, reduction_info< parallel_execution_native, Combiner, Identity > &&reduction_obj, MoreTransformers...more_transform_ops)
Definition: native/pipeline.h:171
Definition: patterns.h:61
Definition: patterns.h:38
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_omp.h:102
typename std::enable_if_t<!internal::has_arguments< F >(), int > requires_no_arguments
Definition: callable_traits.h:86
STL namespace.
void pipeline_impl(parallel_execution_native &ex, InQueue &input_queue, Consumer &&consume)
Definition: native/pipeline.h:98
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T.
Definition: parallel_execution_omp.h:119
void pipeline(parallel_execution_native &ex, Generator &&generate_op, Transformers &&...transform_ops)
Invoke Pipeline pattern on a data stream with native parallel execution.
Definition: native/pipeline.h:500
OpenMP parallel execution policy.
Definition: parallel_execution_omp.h:40
void pipeline_impl_unordered(parallel_execution_native &ex, InQueue &input_queue, filter_info< parallel_execution_native, Transformer > &&filter_obj, MoreTransformers &&...more_transform_ops)
Definition: native/pipeline.h:327
Definition: patterns.h:51
auto reduce(parallel_execution_native &ex, InputIt first, InputIt last, Identity identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with parallel native execution...
Definition: native/reduce.h:51