GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
native/pipeline.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_NATIVE_PIPELINE_H
22 #define GRPPI_NATIVE_PIPELINE_H
23 
25 #include "../common/pack_traits.h"
26 #include "../common/callable_traits.h"
27 
28 #include <thread>
29 #include <experimental/optional>
30 
31 namespace grppi{
32 
33 template <typename InQueue, typename OutQueue, int Index,
34  typename ... MoreTransformers,
35  internal::requires_index_last<Index,MoreTransformers...> = 0>
36 void composed_pipeline(InQueue & input_queue,
38  OutQueue & output_queue, std::vector<std::thread> & tasks)
39 {
40  composed_pipeline(pipe.exectype, input_queue,
41  std::get<Index>(pipe.stages), output_queue, tasks);
42 }
43 
44 
// Launches stage `Index` of a composed pipeline and then recurses to launch
// the stages that follow it. This overload is enabled through
// internal::requires_index_not_last, i.e. when `Index` is not the last stage.
//
// The stage reads from `input_queue` and writes into an intermediate queue
// (`tmp_queue`) whose element type is an optional wrapping the stage result;
// the recursive call for `Index+1` then reads from that intermediate queue
// and eventually writes into `output_queue`. Threads are appended to `tasks`.
//
// NOTE(review): this listing omits original line 49, which presumably
// declares the `pipeline_obj` parameter (likely a const reference to
// pipeline_info<parallel_execution_native, MoreTransformers...>, as in the
// sibling overload) -- restore it from the upstream header before compiling.
45 template <typename InQueue, typename OutQueue, int Index,
46  typename ... MoreTransformers,
47  internal::requires_index_not_last<Index,MoreTransformers...> = 0>
48 void composed_pipeline(InQueue & input_queue,
50  OutQueue & output_queue, std::vector<std::thread> & tasks)
51 {
52  using namespace std;
53  using namespace experimental;
54 
// Derive the element type produced by this stage from the stage's callable
// type and the value type wrapped by the input queue's elements.
55  using stage_type =
56  typename tuple_element<Index,decltype(pipeline_obj.stages)>::type;
57  using gen_value_type = typename InQueue::value_type;
58  using input_value_type = typename gen_value_type::value_type;
59  using result_value_type =
60  typename result_of<stage_type(input_value_type)>::type;
61  using result_type = optional<result_value_type>;
62 
63  parallel_execution_native & ex = pipeline_obj.exectype;
// NOTE(review): `static` keeps the queue alive after this function returns
// (the launched thread still uses it), but it also means one queue per
// template instantiation is shared by every call -- looks unsafe if the same
// composed pipeline is executed more than once or concurrently; confirm.
64  static auto tmp_queue = ex.make_queue<result_type>();
65 
// Launch this stage, writing into the intermediate queue...
66  composed_pipeline(pipeline_obj.exectype, input_queue,
67  get<Index>(pipeline_obj.stages), tmp_queue, tasks);
// ...then launch the remaining stages, reading from the intermediate queue.
68  composed_pipeline<mpmc_queue<result_type>,
69  OutQueue, Index+1, MoreTransformers ...>(
70  tmp_queue,pipeline_obj,output_queue,tasks);
71 }
72 
73 template <typename InQueue, typename Transformer, typename OutQueue>
74 void composed_pipeline(parallel_execution_native & ex, InQueue & input_queue,
75  Transformer && transform_op, OutQueue & output_queue,
76  std::vector<std::thread> & tasks)
77 {
78  using namespace std;
79  tasks.emplace_back([&]() {
80  auto manager = ex.thread_manager();
81  auto item = input_queue.pop();
82  for (;;) {
83  using output_type = typename OutQueue::value_type;
84  if (!item) {
85  output_queue.push(output_type{});
86  break;
87  }
88  else {
89  output_queue.push(transform_op(*item));
90  }
91  item = input_queue.pop();
92  }
93  });
94 }
95 
96 //Last stage
97 template <typename InQueue, typename Consumer>
98 void pipeline_impl(parallel_execution_native & ex, InQueue& input_queue,
99  Consumer && consume)
100 {
101  using namespace std;
102  using gen_value_type = typename InQueue::value_type;
103 
104  auto manager = ex.thread_manager();
105 
106  vector<gen_value_type> elements;
107  long current = 0;
108  if (ex.is_ordered()){
109  auto item = input_queue.pop();
110  while (item.first) {
111  if(current == item.second){
112  consume(*item.first);
113  current ++;
114  }
115  else {
116  elements.push_back(item);
117  }
118  // TODO: Probably find_if() + erase
119  for (auto it=elements.begin(); it!=elements.end(); it++) {
120  if(it->second == current) {
121  consume(*it->first);
122  elements.erase(it);
123  current++;
124  break;
125  }
126  }
127  item = input_queue.pop( );
128  }
129  while (elements.size()>0) {
130  // TODO: Probably find_if() + erase
131  for (auto it = elements.begin(); it != elements.end(); it++) {
132  if(it->second == current) {
133  consume(*it->first);
134  elements.erase(it);
135  current++;
136  break;
137  }
138  }
139  }
140  }
141  else {
142  auto item = input_queue.pop( );
143  while (item.first) {
144  consume(*item.first);
145  item = input_queue.pop();
146  }
147  }
148 }
149 
150 //Item reduce stage
151 template <typename Combiner, typename Identity, typename InQueue, typename ...MoreTransformers>
152 void pipeline_impl(parallel_execution_native & ex, InQueue & input_queue,
153  reduction_info<parallel_execution_native, Combiner, Identity> & reduction_obj, MoreTransformers ...more_transform_ops)
154 {
155  using reduction_type =
157 
158  pipeline_impl(ex, input_queue, std::forward<reduction_type&&>(reduction_obj), std::forward<MoreTransformers...>(more_transform_ops...));
159 }
160 
161 template <typename Combiner, typename Identity, typename InQueue, typename ...MoreTransformers>
162 void pipeline_impl(parallel_execution_native & ex, InQueue & input_queue,
163  reduction_info<parallel_execution_native, Combiner, Identity> && reduction_obj, MoreTransformers ...more_transform_ops)
164 {
165  using reduction_type =
167  pipeline_impl_ordered(ex, input_queue, std::forward<reduction_type>(reduction_obj), std::forward<MoreTransformers...>(more_transform_ops...));
168 }
169 
170 template <typename Combiner, typename Identity, typename InQueue, typename ...MoreTransformers>
171 void pipeline_impl_ordered(parallel_execution_native & ex, InQueue & input_queue,
172  reduction_info<parallel_execution_native,Combiner,Identity> && reduction_obj, MoreTransformers ...more_transform_ops)
173 {
174  using namespace std;
175  using namespace std::experimental;
176  vector<thread> tasks;
177  using gen_value_type = typename InQueue::value_type;
178  using input_value_type = typename gen_value_type::first_type::value_type;
179 
180  using result_value_type = typename result_of<Combiner(input_value_type, input_value_type)>::type;
181  using result_type = pair<optional<result_value_type>, long>;
182 
183  auto output_queue = ex.make_queue<result_type>();
184 
185  thread windower_task([&](){
186  vector<input_value_type> values;
187  long out_order=0;
188  auto item {input_queue.pop()};
189  for(;;){
190  while (item.first && values.size() != reduction_obj.window_size) {
191  values.push_back(*item.first);
192  item = input_queue.pop();
193  }
194  if (values.size() > 0) {
195  auto reduced_value = reduce(reduction_obj.exectype, values.begin(), values.end(), reduction_obj.identity,
196  std::forward<Combiner>(reduction_obj.combine_op));
197  output_queue.push({{reduced_value}, out_order});
198  out_order++;
199  if (item.first) {
200  if (reduction_obj.offset <= reduction_obj.window_size) {
201  values.erase(values.begin(), values.begin() + reduction_obj.offset);
202  }
203  else {
204  values.erase(values.begin(), values.end());
205  auto diff = reduction_obj.offset - reduction_obj.window_size;
206  while (diff > 0 && item.first) {
207  item = input_queue.pop();
208  diff--;
209  }
210  }
211  }
212  }
213  if (!item.first) break;
214  }
215  output_queue.push({{},-1});
216  });
217 
218  pipeline_impl(ex, output_queue, forward<MoreTransformers>(more_transform_ops) ... );
219  windower_task.join();
220 }
221 
222 //Filtering stage
// Filter stage entry point: re-dispatches to another pipeline_impl overload,
// passing `filter_obj` through std::forward<filter_type> and forwarding the
// remaining stages unchanged.
//
// NOTE(review): this listing omits original lines 225 and 228, which
// presumably declare the `filter_obj` parameter (likely an lvalue reference
// to a filter_info object) and the `filter_type` alias -- restore both from
// the upstream header before compiling.
223 template <typename Transformer, typename InQueue, typename... MoreTransformers>
224 void pipeline_impl(parallel_execution_native & ex, InQueue & input_queue,
226  MoreTransformers && ... more_transform_ops)
227 {
229 
230  pipeline_impl(ex,input_queue, std::forward<filter_type>(filter_obj),
231  std::forward<MoreTransformers>(more_transform_ops)... );
232 }
233 
// Order-preserving filter stage.
//
// Worker threads (as many as filter_obj.exectype.concurrency_degree()) pop
// items from `input_queue`, evaluate `filter_obj.task` on each value and push
// into `tmp_queue` either the item itself (accepted) or an empty value that
// keeps the item's sequence number (rejected). A separate ordering thread
// reads `tmp_queue`, restores the original sequence, drops the rejected
// placeholders and renumbers the accepted items consecutively from 0 into
// `output_queue`, on which the remaining stages run in the calling thread.
//
// NOTE(review): this listing omits original line 236, which presumably
// declares the `filter_obj` parameter (likely an rvalue reference to a
// filter_info object) -- restore it from the upstream header.
234 template <typename Transformer, typename InQueue, typename... MoreTransformers>
235 void pipeline_impl_ordered(parallel_execution_native & ex, InQueue& input_queue,
237  MoreTransformers && ... more_transform_ops )
238 {
239  using namespace std;
240  vector<thread> tasks;
241 
242  using gen_value_type = typename InQueue::value_type;
243  using input_value_type = typename gen_value_type::first_type;
244  auto tmp_queue = ex.make_queue<gen_value_type>();
245 
// Counts the workers that have seen the end of the input stream.
246  atomic<int> done_threads{0};
247  for (int th=0; th<filter_obj.exectype.concurrency_degree(); th++) {
248  tasks.emplace_back([&]() {
249  auto manager = filter_obj.exectype.thread_manager();
250 
251  auto item{input_queue.pop()};
252  while (item.first) {
253  if (filter_obj.task(*item.first)) {
254  tmp_queue.push(item);
255  }
256  else {
// Rejected: forward an empty value with the same sequence number so the
// ordering thread can still account for this position.
257  tmp_queue.push(make_pair(input_value_type{},item.second) );
258  }
259  item = input_queue.pop();
260  }
// NOTE(review): the increment and the comparison below are two separate
// atomic operations, so more than one worker may observe the final count
// and push the end marker; `if (++done_threads == ...)` would avoid that.
261  done_threads++;
262  if (done_threads==filter_obj.exectype.concurrency_degree()) {
// Worker observing the final count: signal end of stream (empty value, -1).
263  tmp_queue.push(make_pair(input_value_type{}, -1));
264  }
265  else {
// Put the end-of-stream item back so that another worker can pop it too.
266  input_queue.push(item);
267  }
268  });
269  }
270 
271  auto output_queue = ex.make_queue<gen_value_type>();
272  auto ordering_thread = thread{[&](){
273  auto manager = ex.thread_manager();
// Items that arrived ahead of their turn.
274  vector<gen_value_type> elements;
// `current`: next input sequence number expected;
// `order`: next output sequence number assigned to an accepted item.
275  int current = 0;
276  long order = 0;
277  auto item{tmp_queue.pop()};
278  for (;;) {
// An empty value with sequence number -1 is the end marker.
279  if(!item.first && item.second == -1) break;
280  if (item.second == current) {
281  if (item.first) {
282  output_queue.push(make_pair(item.first,order));
283  order++;
284  }
285  current++;
286  }
287  else {
288  elements.push_back(item);
289  }
290  // TODO: Probably find_if() + erase
// Release at most one buffered item per received item; the loop stops right
// after the erase, so the invalidated iterator is never used again.
291  for (auto it=elements.begin(); it<elements.end(); it++) {
292  if (it->second == current) {
293  if (it->first) {
294  output_queue.push(make_pair(it->first,order));
295  order++;
296  }
297  elements.erase(it);
298  current++;
299  break;
300  }
301  }
302  item = tmp_queue.pop();
303  }
// Drain the items still buffered after the end marker was received.
// NOTE(review): if no buffered item carries `current`, this loop never
// makes progress and spins forever -- confirm the numbering is gap-free.
304  while (elements.size()>0) {
305  // TODO: Probably find_if() + erase
306  for (auto it=elements.begin(); it<elements.end(); it++) {
307  if (it->second == current) {
308  if(it->first) {
309  output_queue.push(make_pair(it->first,order));
310  order++;
311  }
312  elements.erase(it);
313  current++;
314  break;
315  }
316  }
317  }
// `item` still holds the end marker here; pass it on to the next stage.
318  output_queue.push(item);
319  }};
320 
321  pipeline_impl(ex, output_queue, forward<MoreTransformers>(more_transform_ops) ... );
322  ordering_thread.join();
323  for (auto && t : tasks) { t.join(); }
324 }
325 
326 template <typename Transformer, typename InQueue, typename ... MoreTransformers>
327 void pipeline_impl_unordered(parallel_execution_native & ex, InQueue & input_queue,
329  MoreTransformers && ... more_transform_ops)
330 {
331  using namespace std;
332  vector<thread> tasks;
333 
334  using gen_value_type = typename InQueue::value_type;
335  using input_value_type = typename gen_value_type::first_type;
336  auto output_queue = ex.make_queue<gen_value_type>();
337 
338  atomic<int> done_threads{0};
339 
340  for (int th=0; th<filter_obj.exectype.concurrency_degree(); th++) {
341  tasks.emplace_back([&]() {
342  auto manager = filter_obj.exectype.thread_manager();
343 
344  auto item{input_queue.pop()};
345  while (item.first) {
346  if (filter_obj.task(*item.first)) {
347  output_queue.push(item);
348  }
349  item = input_queue.pop();
350  }
351  done_threads++;
352  if (done_threads==filter_obj.exectype.concurrency_degree()) {
353  output_queue.push( make_pair(input_value_type{}, -1) );
354  }
355  else {
356  input_queue.push(item);
357  }
358  });
359  }
360 
361  pipeline_impl(ex, output_queue,
362  forward<MoreTransformers>(more_transform_ops) ... );
363 
364  for (auto && t : tasks) { t.join(); }
365 }
366 
// Filter stage dispatcher: runs the order-preserving implementation when
// ex.is_ordered() is true and the unordered implementation otherwise,
// forwarding the filter object and the remaining stages.
//
// NOTE(review): this listing omits original lines 369 and 372, which
// presumably declare the `filter_obj` parameter (likely an rvalue reference
// to a filter_info object) and the `filter_type` alias -- restore both from
// the upstream header before compiling.
367 template <typename Transformer, typename InQueue, typename ... MoreTransformers>
368 void pipeline_impl(parallel_execution_native & ex, InQueue& input_queue,
370  MoreTransformers && ... more_transform_ops)
371 {
373  if(ex.is_ordered()) {
374  pipeline_impl_ordered(ex, input_queue,
375  std::forward<filter_type>(filter_obj),
376  std::forward<MoreTransformers>(more_transform_ops)...);
377  }
378  else{
379  pipeline_impl_unordered(ex, input_queue,
380  std::forward<filter_type>(filter_obj),
381  std::forward<MoreTransformers>(more_transform_ops)...);
382  }
383 }
384 
// Farm stage entry point: re-dispatches to another pipeline_impl overload,
// passing `farm_obj` through std::forward<farm_type> and forwarding the
// remaining stages unchanged.
//
// NOTE(review): this listing omits original lines 387 and 390, which
// presumably declare the `farm_obj` parameter (likely an lvalue reference to
// a farm_info object) and the `farm_type` alias -- restore both from the
// upstream header before compiling.
385 template <typename Transformer, typename InQueue, typename... MoreTransformers>
386 void pipeline_impl(parallel_execution_native & ex, InQueue & input_queue,
388  MoreTransformers && ... more_transform_ops)
389 {
391  pipeline_impl(ex, input_queue, std::forward<farm_type>(farm_obj),
392  std::forward< MoreTransformers>(more_transform_ops) ... );
393 }
394 
395 
396 //Farm stage
// Farm stage of a native pipeline.
//
// Starts as many worker threads as farm_obj.exectype.concurrency_degree().
// Every worker pops items from `input_queue`, applies `farm_obj.task` to the
// value and pushes the result, paired with the sequence number of the input
// item, into a new queue. The remaining stages run in the calling thread on
// that queue, and all workers are joined before returning.
//
// NOTE(review): this listing omits original line 399, which presumably
// declares the `farm_obj` parameter (likely an rvalue reference to a
// farm_info object) -- restore it from the upstream header before compiling.
397 template <typename Transformer, typename InQueue, typename... MoreTransformers>
398 void pipeline_impl(parallel_execution_native & p, InQueue & input_queue,
400  MoreTransformers && ... more_transform_ops)
401 {
402  using namespace std;
403 
// Output elements are (optional result of the task, sequence number) pairs.
404  using input_item_type = typename InQueue::value_type;
405  using input_item_value_type =
406  typename input_item_type::first_type::value_type;
407  using transform_result_type =
408  typename result_of<Transformer(input_item_value_type)>::type;
409  using output_item_value_type =
410  experimental::optional<transform_result_type>;
411  using output_item_type =
412  pair<output_item_value_type,long>;
413  auto output_queue = p.make_queue<output_item_type>();
414 
// Counts the workers that have seen the end of the input stream.
415  atomic<int> done_threads{0};
416  vector<thread> tasks;
417  for(int th = 0; th<farm_obj.exectype.concurrency_degree(); ++th){
418  tasks.emplace_back([&]() {
419  auto manager = farm_obj.exectype.thread_manager();
420 
// NOTE(review): `order` is never read in this lambda.
421  long order = 0;
422  auto item{input_queue.pop()};
423  while (item.first) {
424  auto out = output_item_value_type{farm_obj.task(*item.first)};
425  output_queue.push(make_pair(out,item.second)) ;
426  item = input_queue.pop( );
427  }
// Put the end-of-stream item back so that another worker can pop it too.
428  input_queue.push(item);
// NOTE(review): the increment and the comparison below are two separate
// atomic operations, so more than one worker may observe the final count
// and push the end marker; `if (++done_threads == ...)` would avoid that.
429  done_threads++;
430  if (done_threads == farm_obj.exectype.concurrency_degree()) {
// Worker observing the final count: signal end of stream (empty value, -1).
431  output_queue.push(make_pair(output_item_value_type{}, -1));
432  }
433  });
434  }
435  pipeline_impl(p, output_queue,
436  forward<MoreTransformers>(more_transform_ops)... );
437 
438  for (auto && t : tasks) { t.join(); }
439 }
440 
441 //Intermediate pipeline_impl
442 template <typename Transformer, typename InQueue, typename... MoreTransformers>
443 void pipeline_impl(parallel_execution_native & ex, InQueue & input_queue,
444  Transformer && transform_op,
445  MoreTransformers && ... more_transform_ops)
446 {
447  using namespace std;
448 
449  using input_item_type = typename InQueue::value_type;
450  using input_item_value_type = typename input_item_type::first_type::value_type;
451  using transform_result_type =
452  typename result_of<Transformer(input_item_value_type)>::type;
453  using output_item_value_type =
454  experimental::optional<transform_result_type>;
455  using output_item_type =
456  pair<output_item_value_type,long>;
457 
458  auto output_queue = ex.make_queue<output_item_type>();
459 
460  thread task(
461  [&]() {
462  auto manager = ex.thread_manager();
463 
464  long order = 0;
465  auto item{input_queue.pop()};
466  while(item.first) {
467  auto out = output_item_value_type{transform_op(*item.first)};
468  output_queue.push(make_pair(out, item.second));
469  item = input_queue.pop( ) ;
470  }
471  output_queue.push(make_pair(output_item_value_type{},-1));
472  }
473  );
474 
475  pipeline_impl(ex, output_queue,
476  forward<MoreTransformers>(more_transform_ops)...);
477  task.join();
478 }
479 
// Invoke Pipeline pattern on a data stream with native parallel execution.
//
// A generator thread repeatedly calls `generate_op` and pushes every
// generated item, paired with an increasing sequence number starting at 0,
// into the first queue. The remaining stages run in the calling thread on
// that queue, and the generator thread is joined before returning.
//
// NOTE(review): this listing omits original line 499, which presumably
// closes the template parameter list (likely with a constraint on
// `Generator`) -- restore it from the upstream header before compiling.
498 template <typename Generator, typename ... Transformers,
500 void pipeline(parallel_execution_native & ex, Generator && generate_op,
501  Transformers && ... transform_ops)
502 {
503  using namespace std;
504 
// Queue elements are (value returned by the generator, sequence number).
505  using result_type = typename result_of<Generator()>::type;
506  using output_type = pair<result_type,long>;
507  auto first_queue = ex.make_queue<output_type>();
508 
509  thread generator_task(
510  [&]() {
511  auto manager = ex.thread_manager();
512 
513  long order = 0;
514  for (;;) {
515  auto item{generate_op()};
// The item is pushed before it is tested, so the empty item that ends the
// stream is delivered downstream as well, carrying a regular sequence number.
516  first_queue.push(make_pair(item, order));
517  order++;
518  if (!item) break;
519  }
520  }
521  );
522 
523  pipeline_impl(ex, first_queue, forward<Transformers>(transform_ops)...);
524  generator_task.join();
525 }
526 
532 }
533 
534 #endif
Definition: callable_traits.h:24
void pipeline_impl_ordered(parallel_execution_native &ex, InQueue &input_queue, reduction_info< parallel_execution_native, Combiner, Identity > &&reduction_obj, MoreTransformers...more_transform_ops)
Definition: native/pipeline.h:171
Definition: patterns.h:61
Definition: patterns.h:38
Definition: patterns.h:29
typename std::enable_if_t<!internal::has_arguments< F >(), int > requires_no_arguments
Definition: callable_traits.h:86
STL namespace.
void pipeline_impl(parallel_execution_native &ex, InQueue &input_queue, Consumer &&consume)
Definition: native/pipeline.h:98
E & exectype
Definition: patterns.h:31
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:136
std::enable_if_t<(Index< sizeof...(T)-1), int > requires_index_not_last
Definition: pack_traits.h:34
void pipeline(parallel_execution_native &ex, Generator &&generate_op, Transformers &&...transform_ops)
Invoke Pipeline pattern on a data stream with native parallel execution.
Definition: native/pipeline.h:500
std::tuple< Stage, Stages... > stages
Definition: patterns.h:32
native_thread_manager thread_manager()
Get a manager object for registration/deregistration in the thread index table for current thread...
Definition: parallel_execution_native.h:199
void pipeline_impl_unordered(parallel_execution_native &ex, InQueue &input_queue, filter_info< parallel_execution_native, Transformer > &&filter_obj, MoreTransformers &&...more_transform_ops)
Definition: native/pipeline.h:327
Definition: patterns.h:51
void composed_pipeline(InQueue &input_queue, const pipeline_info< parallel_execution_native, MoreTransformers... > &pipe, OutQueue &output_queue, std::vector< std::thread > &tasks)
Definition: native/pipeline.h:36
std::enable_if_t<(Index==sizeof...(T)-1), int > requires_index_last
Definition: pack_traits.h:30
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_native.h:225
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_native.h:193
auto reduce(parallel_execution_native &ex, InputIt first, InputIt last, Identity identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with parallel native execution...
Definition: native/reduce.h:51