GrPPI  0.3.1
Generic and Reusable Parallel Pattern Interface
parallel_execution_tbb.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_TBB_PARALLEL_EXECUTION_TBB_H
22 #define GRPPI_TBB_PARALLEL_EXECUTION_TBB_H
23 
24 #ifdef GRPPI_TBB
25 
26 #include "../common/mpmc_queue.h"
27 #include "../common/iterator.h"
28 #include "../common/patterns.h"
29 #include "../common/farm_pattern.h"
30 #include "../common/execution_traits.h"
31 
32 #include <type_traits>
33 #include <tuple>
34 
35 #include <tbb/tbb.h>
36 
37 namespace grppi {
38 
45 public:
46 
// Tail of the default constructor: delegates to the (int, bool) constructor
// with default_concurrency_degree.
// NOTE(review): the declarator (listing line 55) is absent from this extract.
56  parallel_execution_tbb{default_concurrency_degree}
57  {}
58 
// Constructs the policy with an explicit concurrency degree.
// `order` initialises the ordering flag and defaults to true.
67  parallel_execution_tbb(int concurrency_degree, bool order = true) noexcept :
68  concurrency_degree_{concurrency_degree},
69  ordering_{order}
70  {}
71 
// Stores `degree` as the concurrency degree used by subsequent pattern calls.
75  void set_concurrency_degree(int degree) noexcept { concurrency_degree_ = degree; }
76 
// Returns the currently configured concurrency degree.
80  int concurrency_degree() const noexcept { return concurrency_degree_; }
81 
// Sets the ordering flag to true.
85  void enable_ordering() noexcept { ordering_=true; }
86 
// Sets the ordering flag to false.
90  void disable_ordering() noexcept { ordering_=false; }
91 
// Returns the current value of the ordering flag.
95  bool is_ordered() const noexcept { return ordering_; }
96 
// Stores the queue size, queue mode and token count.
// queue_size_ and queue_mode_ are read by make_queue(); num_tokens_ is
// returned by tokens().
100  void set_queue_attributes(int size, queue_mode mode, int tokens) noexcept {
101  queue_size_ = size;
102  queue_mode_ = mode;
103  num_tokens_ = tokens;
104  }
105 
// make_queue<T>(): brace-initialises the returned queue from the stored
// queue_size_ and queue_mode_.
// NOTE(review): the declarator (listing line 112) is absent from this extract.
111  template <typename T>
113  return {queue_size_, queue_mode_};
114  }
115 
// Returns the stored token count; pipeline() passes it as the first argument
// of tbb::parallel_pipeline.
119  int tokens() const noexcept { return num_tokens_; }
120 
// Map pattern (declaration; defined after the class).
//   firsts        - tuple holding the first iterator of each input sequence
//   first_out     - start of the output sequence
//   sequence_size - number of elements to process
//   transform_op  - operation applied to the elements at each index
135  template <typename ... InputIterators, typename OutputIterator,
136  typename Transformer>
137  void map(std::tuple<InputIterators...> firsts,
138  OutputIterator first_out,
139  std::size_t sequence_size, Transformer transform_op) const;
140 
// Reduce pattern (declaration; defined after the class).
//   first         - start of the input sequence
//   sequence_size - number of elements to reduce
//   identity      - initial value handed to the reduction
//   combine_op    - binary combination operation
154  template <typename InputIterator, typename Identity, typename Combiner>
155  auto reduce(InputIterator first, std::size_t sequence_size,
156  Identity && identity, Combiner && combine_op) const;
157 
// Map/reduce pattern (declaration; defined after the class).
//   firsts        - tuple holding the first iterator of each input sequence
//   sequence_size - number of elements to process
//   identity      - identity value for the reduction
//   transform_op  - mapping operation
//   combine_op    - binary combination operation
172  template <typename ... InputIterators, typename Identity,
173  typename Transformer, typename Combiner>
174  auto map_reduce(std::tuple<InputIterators...> firsts,
175  std::size_t sequence_size,
176  Identity && identity,
177  Transformer && transform_op, Combiner && combine_op) const;
178 
// Stencil pattern (declaration; defined after the class).
//   firsts        - tuple holding the first iterator of each input sequence
//   first_out     - start of the output sequence
//   sequence_size - number of elements to process
//   transform_op  - stencil transformation
//   neighbour_op  - neighbourhood operation
193  template <typename ... InputIterators, typename OutputIterator,
194  typename StencilTransformer, typename Neighbourhood>
195  void stencil(std::tuple<InputIterators...> firsts, OutputIterator first_out,
196  std::size_t sequence_size,
197  StencilTransformer && transform_op,
198  Neighbourhood && neighbour_op) const;
199 
// Divide-and-conquer pattern without a predicate (declaration).
// Deprecated: the attribute message directs callers to the overload that
// takes a predicate argument.
212  template <typename Input, typename Divider, typename Solver, typename Combiner>
213  [[deprecated("Use new interface with predicate argument")]]
214  auto divide_conquer(Input && input,
215  Divider && divide_op,
216  Solver && solve_op,
217  Combiner && combine_op) const;
218 
219 
// Divide-and-conquer pattern with a predicate (declaration).
//   input        - problem to solve
//   divide_op    - splits a problem into subproblems
//   predicate_op - when it returns true for a problem, solve_op is applied directly
//   solve_op     - solves a problem directly
//   combine_op   - combines partial solutions
234  template <typename Input, typename Divider,typename Predicate, typename Solver, typename Combiner>
235  auto divide_conquer(Input && input,
236  Divider && divide_op,
237  Predicate && predicate_op,
238  Solver && solve_op,
239  Combiner && combine_op) const;
240 
// Pipeline pattern (declaration): generate_op produces the items and
// transform_op... are the subsequent stages.
248  template <typename Generator, typename ... Transformers>
249  void pipeline(Generator && generate_op,
250  Transformers && ... transform_op) const;
251 
// Pipeline overload that reads from and writes to mpmc_queues.
// It is expressed through the generator/transformers pipeline above:
//   - the generator pops from input_queue,
//   - transform_op is the middle stage,
//   - the sink pushes (value, order) pairs to output_queue.
262  template <typename InputType, typename Transformer, typename OutputType>
263  void pipeline(mpmc_queue<InputType> & input_queue, Transformer && transform_op,
264  mpmc_queue<OutputType> & output_queue) const
265  {
// Sequence number attached to every pair pushed to output_queue.
266  ::std::atomic<long> order {0};
267  pipeline(
268  [&](){
269  auto item = input_queue.pop();
// An item whose first member is empty is pushed back onto input_queue
// (presumably so other consumers of the queue also see the end marker).
270  if(!item.first) input_queue.push(item);
271  return item.first;
272  },
273  std::forward<Transformer>(transform_op),
274  [&](auto & item ){
275  output_queue.push(make_pair(typename OutputType::first_type{item}, order.load()));
276  order++;
277  }
278  );
// After the pipeline finishes, push a pair with a default-constructed first
// member (presumably the end-of-stream marker for the consumer).
279  output_queue.push(make_pair(typename OutputType::first_type{}, order.load()));
280  //sequential_execution seq{};
281  //seq.pipeline(input_queue, std::forward<Transformer>(transform_op), output_queue);
282  }
283 
284 
285 private:
286 
// Recursive divide-and-conquer worker without a predicate (declaration).
// num_threads is a shared counter of how many more tasks may be spawned.
287  template <typename Input, typename Divider, typename Solver, typename Combiner>
288  auto divide_conquer(Input && input,
289  Divider && divide_op,
290  Solver && solve_op,
291  Combiner && combine_op,
292  std::atomic<int> & num_threads) const;
293 
// Recursive divide-and-conquer worker with a predicate (declaration).
// num_threads is a shared counter of how many more tasks may be spawned.
294  template <typename Input, typename Divider, typename Predicate, typename Solver, typename Combiner>
295  auto divide_conquer(Input && input,
296  Divider && divide_op,
297  Predicate && predicate_op,
298  Solver && solve_op,
299  Combiner && combine_op,
300  std::atomic<int> & num_threads) const;
301 
// Builds the last pipeline stage from a single transformer (declaration).
// NOTE(review): listing line 303 (end of the template parameter list) is
// absent from this extract.
302  template <typename Input, typename Transformer,
304  auto make_filter(Transformer && transform_op) const;
305 
// Builds a stage from a plain (non-pattern) transformer and chains it with the
// stages built from other_transform_ops (declaration).
306  template <typename Input, typename Transformer, typename ... OtherTransformers,
307  requires_no_pattern<Transformer> = 0>
308  auto make_filter(Transformer && transform_op,
309  OtherTransformers && ... other_transform_ops) const;
310 
// Lvalue farm as the last stage: casts farm_obj to an rvalue and forwards to
// the Farm&& overload.
// NOTE(review): listing line 313 (end of the template parameter list) is
// absent from this extract.
311  template <typename Input, typename FarmTransformer,
312  template <typename> class Farm,
314  auto make_filter(Farm<FarmTransformer> & farm_obj) const
315  {
316  return this->template make_filter<Input>(std::move(farm_obj));
317  }
318 
// Builds the last pipeline stage from a farm pattern (declaration).
319  template <typename Input, typename FarmTransformer,
320  template <typename> class Farm,
321  requires_farm<Farm<FarmTransformer>> = 0>
322  auto make_filter(Farm<FarmTransformer> && farm_obj) const;
323 
// Execution-context stage: the context's own execution policy is not used
// here; only its transformer() is extracted and passed on to make_filter
// together with the remaining stages.
// NOTE(review): listing line 327 (end of the template parameter list) is
// absent from this extract.
324  template <typename Input, typename Execution, typename Transformer,
325  template <typename, typename> class Context,
326  typename ... OtherTransformers,
328  auto make_filter(Context<Execution,Transformer> && context_op,
329  OtherTransformers &&... other_ops) const
330  {
331  return this->template make_filter<Input>(std::forward<Transformer>(context_op.transformer()),
332  std::forward<OtherTransformers>(other_ops)...);
333  }
334 
// Lvalue execution-context stage: casts context_op to an rvalue and forwards
// to the Context&& overload.
335  template <typename Input, typename Execution, typename Transformer,
336  template <typename, typename> class Context,
337  typename ... OtherTransformers,
338  requires_context<Context<Execution,Transformer>> = 0>
339  auto make_filter(Context<Execution,Transformer> & context_op,
340  OtherTransformers &&... other_ops) const
341  {
342  return this->template make_filter<Input>(std::move(context_op),
343  std::forward<OtherTransformers>(other_ops)...);
344  }
345 
346 
// Lvalue farm followed by more stages: casts filter_obj to an rvalue and
// forwards to the Farm&& overload.
347  template <typename Input, typename FarmTransformer,
348  template <typename> class Farm,
349  typename ... OtherTransformers,
350  requires_farm<Farm<FarmTransformer>> = 0>
351  auto make_filter(Farm<FarmTransformer> & filter_obj,
352  OtherTransformers && ... other_transform_ops) const
353  {
354  return this->template make_filter<Input>(std::move(filter_obj),
355  std::forward<OtherTransformers>(other_transform_ops)...);
356  }
357 
// Builds a farm stage and chains it with the stages built from
// other_transform_ops (declaration).
358  template <typename Input, typename FarmTransformer,
359  template <typename> class Farm,
360  typename ... OtherTransformers,
361  requires_farm<Farm<FarmTransformer>> = 0>
362  auto make_filter(Farm<FarmTransformer> && filter_obj,
363  OtherTransformers && ... other_transform_ops) const;
364 
// Lvalue filter pattern as the last stage: casts filter_obj to an rvalue and
// forwards to the Filter&& overload.
// NOTE(review): listing line 367 (end of the template parameter list) is
// absent from this extract.
365  template <typename Input, typename Predicate,
366  template <typename> class Filter,
368  auto make_filter(Filter<Predicate> & filter_obj) const
369  {
370  return this->template make_filter<Input>(std::move(filter_obj));
371  }
372 
// Builds the last pipeline stage from a filter pattern (declaration).
373  template <typename Input, typename Predicate,
374  template <typename> class Filter,
375  requires_filter<Filter<Predicate>> = 0>
376  auto make_filter(Filter<Predicate> && filter_obj) const;
377 
// Lvalue filter pattern followed by more stages: casts filter_obj to an rvalue
// and forwards to the Filter&& overload.
378  template <typename Input, typename Predicate,
379  template <typename> class Filter,
380  typename ... OtherTransformers,
381  requires_filter<Filter<Predicate>> = 0>
382  auto make_filter(Filter<Predicate> & filter_obj,
383  OtherTransformers && ... other_transform_ops) const
384  {
385  return this->template make_filter<Input>(std::move(filter_obj),
386  std::forward<OtherTransformers>(other_transform_ops)...);
387  }
388 
// Builds a filter-pattern stage and chains it with the stages built from
// other_transform_ops (declaration).
389  template <typename Input, typename Predicate,
390  template <typename> class Filter,
391  typename ... OtherTransformers,
392  requires_filter<Filter<Predicate>> = 0>
393  auto make_filter(Filter<Predicate> && filter_obj,
394  OtherTransformers && ... other_transform_ops) const;
395 
// Lvalue reduce pattern followed by more stages: casts reduce_obj to an rvalue
// and forwards to the Reduce&& overload.
// NOTE(review): listing line 399 (end of the template parameter list) is
// absent from this extract.
396  template <typename Input, typename Combiner, typename Identity,
397  template <typename C, typename I> class Reduce,
398  typename ... OtherTransformers,
400  auto make_filter(Reduce<Combiner,Identity> & reduce_obj,
401  OtherTransformers && ... other_transform_ops) const
402  {
403  return this->template make_filter<Input>(std::move(reduce_obj),
404  std::forward<OtherTransformers>(other_transform_ops)...);
405  };
406 
// Builds a reduce-pattern stage and chains it with the stages built from
// other_transform_ops (declaration).
407  template <typename Input, typename Combiner, typename Identity,
408  template <typename C, typename I> class Reduce,
409  typename ... OtherTransformers,
410  requires_reduce<Reduce<Combiner,Identity>> = 0>
411  auto make_filter(Reduce<Combiner,Identity> && reduce_obj,
412  OtherTransformers && ... other_transform_ops) const;
413 
// Lvalue iteration pattern (plain transformer) followed by more stages: casts
// iteration_obj to an rvalue and forwards to the Iteration&& overload.
// NOTE(review): listing line 417 (a template parameter line) is absent from
// this extract.
414  template <typename Input, typename Transformer, typename Predicate,
415  template <typename T, typename P> class Iteration,
416  typename ... OtherTransformers,
418  requires_no_pattern<Transformer> =0>
419  auto make_filter(Iteration<Transformer,Predicate> & iteration_obj,
420  OtherTransformers && ... other_transform_ops) const
421  {
422  return this->template make_filter<Input>(std::move(iteration_obj),
423  std::forward<OtherTransformers>(other_transform_ops)...);
424  }
425 
// Builds an iteration-pattern stage whose transformer is a plain callable and
// chains it with the stages built from other_transform_ops (declaration).
426  template <typename Input, typename Transformer, typename Predicate,
427  template <typename T, typename P> class Iteration,
428  typename ... OtherTransformers,
429  requires_iteration<Iteration<Transformer,Predicate>> =0,
430  requires_no_pattern<Transformer> =0>
431  auto make_filter(Iteration<Transformer,Predicate> && iteration_obj,
432  OtherTransformers && ... other_transform_ops) const;
433 
// Second Iteration&& overload (declaration).
// NOTE(review): listing line 438, the constraint that distinguishes it from
// the overload above, is absent from this extract. The matching definition
// later in the file only contains a "Not implemented" static_assert on
// is_pipeline<Transformer>.
434  template <typename Input, typename Transformer, typename Predicate,
435  template <typename T, typename P> class Iteration,
436  typename ... OtherTransformers,
437  requires_iteration<Iteration<Transformer,Predicate>> =0,
439  auto make_filter(Iteration<Transformer,Predicate> && iteration_obj,
440  OtherTransformers && ... other_transform_ops) const;
441 
// Lvalue nested pipeline followed by more stages: casts pipeline_obj to an
// rvalue and forwards to the Pipeline&& overload.
442  template <typename Input, typename ... Transformers,
443  template <typename...> class Pipeline,
444  typename ... OtherTransformers,
445  requires_pipeline<Pipeline<Transformers...>> = 0>
446  auto make_filter(Pipeline<Transformers...> & pipeline_obj,
447  OtherTransformers && ... other_transform_ops) const
448  {
449  return this->template make_filter<Input>(std::move(pipeline_obj),
450  std::forward<OtherTransformers>(other_transform_ops)...);
451  }
452 
// Builds the stages of a nested pipeline followed by other_transform_ops
// (declaration).
453  template <typename Input, typename ... Transformers,
454  template <typename...> class Pipeline,
455  typename ... OtherTransformers,
456  requires_pipeline<Pipeline<Transformers...>> = 0>
457  auto make_filter(Pipeline<Transformers...> && pipeline_obj,
458  OtherTransformers && ... other_transform_ops) const;
459 
// Helper (declaration): expands a tuple of transformers into a make_filter
// call, using the index sequence to address each tuple element.
460  template <typename Input, typename ... Transformers,
461  std::size_t ... I>
462  auto make_filter_nested(std::tuple<Transformers...> && transform_ops,
463  std::index_sequence<I...>) const;
464 
465 private:
466 
// Concurrency degree: used as the number of chunks in map_reduce/stencil and
// as the task budget in divide_conquer.
467  constexpr static int default_concurrency_degree = 4;
468  int concurrency_degree_ = default_concurrency_degree;
469 
// Ordering flag, exposed through enable_ordering/disable_ordering/is_ordered.
470  bool ordering_ = true;
471 
// Size handed to queues built by make_queue().
472  constexpr static int default_queue_size = 100;
473  int queue_size_ = default_queue_size;
474 
// Token count returned by tokens() and passed to tbb::parallel_pipeline.
475  constexpr static int default_num_tokens_ = 100;
476  int num_tokens_ = default_num_tokens_;
477 
// Mode handed to queues built by make_queue().
478  queue_mode queue_mode_ = queue_mode::blocking;
479 };
480 
// Compile-time check: true only when E is exactly parallel_execution_tbb
// (std::is_same, so cv-qualified or reference types yield false).
485 template <typename E>
486 constexpr bool is_parallel_execution_tbb() {
487  return std::is_same<E, parallel_execution_tbb>::value;
488 }
489 
// Trait specialisation: the TBB policy is available in this build (this
// branch is only compiled when GRPPI_TBB is defined).
494 template <>
495 constexpr bool is_supported<parallel_execution_tbb>() { return true; }
496 
// Trait specialisation: the TBB policy implements the map pattern.
501 template <>
502 constexpr bool supports_map<parallel_execution_tbb>() { return true; }
503 
// Trait specialisation: the TBB policy implements the reduce pattern.
508 template <>
509 constexpr bool supports_reduce<parallel_execution_tbb>() { return true; }
510 
// Trait specialisation: the TBB policy implements the map/reduce pattern.
515 template <>
516 constexpr bool supports_map_reduce<parallel_execution_tbb>() { return true; }
517 
// Trait specialisation: the TBB policy implements the stencil pattern.
522 template <>
523 constexpr bool supports_stencil<parallel_execution_tbb>() { return true; }
524 
// Trait specialisation: the TBB policy implements the divide/conquer pattern.
529 template <>
530 constexpr bool supports_divide_conquer<parallel_execution_tbb>() { return true; }
531 
// Trait specialisation: the TBB policy implements the pipeline pattern.
536 template <>
537 constexpr bool supports_pipeline<parallel_execution_tbb>() { return true; }
538 
// Definition of parallel_execution_tbb::map.
// NOTE(review): the declarator (listing line 541) is absent from this extract.
// Runs tbb::parallel_for over the index range [0, sequence_size). For each
// index, the result of applying transform_op to the elements of `firsts` at
// that index is written to first_out[index]. first_out is indexed directly,
// so it must support operator[].
539 template <typename ... InputIterators, typename OutputIterator,
540  typename Transformer>
542  std::tuple<InputIterators...> firsts,
543  OutputIterator first_out,
544  std::size_t sequence_size, Transformer transform_op) const
545 {
546  tbb::parallel_for(
547  std::size_t{0}, sequence_size,
548  [&] (std::size_t index){
549  first_out[index] = apply_iterators_indexed(transform_op, firsts, index);
550  }
551  );
552 
553 }
554 
// Definition of parallel_execution_tbb::reduce.
// NOTE(review): the declarator (listing line 556) is absent from this extract.
// Calls tbb::parallel_reduce over the range [first, first + sequence_size)
// with `identity` as the initial value. Each sub-range is folded with the
// sequential policy, starting from the running value; combine_op also joins
// the partial results.
555 template <typename InputIterator, typename Identity, typename Combiner>
557  InputIterator first,
558  std::size_t sequence_size,
559  Identity && identity,
560  Combiner && combine_op) const
561 {
562  constexpr sequential_execution seq;
563  return tbb::parallel_reduce(
564  tbb::blocked_range<InputIterator>(first, std::next(first,sequence_size)),
565  identity,
// combine_op and seq are captured by copy in the range body.
566  [combine_op,seq](const auto & range, auto value) {
567  return seq.reduce(range.begin(), range.size(), value, combine_op);
568  },
569  combine_op);
570 }
571 
// Definition of parallel_execution_tbb::map_reduce.
// NOTE(review): the declarator (listing line 574) is absent from this extract.
// The input is split into concurrency_degree_ chunks of
// sequence_size / concurrency_degree_ elements. The first
// concurrency_degree_ - 1 chunks run as tbb::task_group tasks; the last chunk,
// which also takes the division remainder, runs on the calling thread. The
// per-chunk results are then reduced sequentially.
// NOTE(review): `identity` is never read in this body. Each partial result
// starts from a value-initialised result_type, and the final reduce is seeded
// with partial_results[0]. Confirm that ignoring the caller's identity is
// intended.
572 template <typename ... InputIterators, typename Identity,
573  typename Transformer, typename Combiner>
575  std::tuple<InputIterators...> firsts,
576  std::size_t sequence_size,
577  Identity && identity,
578  Transformer && transform_op, Combiner && combine_op) const
579 {
580  constexpr sequential_execution seq;
581  tbb::task_group g;
582 
583  using result_type = std::decay_t<Identity>;
584  std::vector<result_type> partial_results(concurrency_degree_);
585 
// Sequential map/reduce of one chunk; the result lands in slot i.
586  auto process_chunk = [&](auto fins, std::size_t sz, std::size_t i) {
587  partial_results[i] = seq.map_reduce(fins, sz,
588  std::forward<result_type>(partial_results[i]),
589  std::forward<Transformer>(transform_op),
590  std::forward<Combiner>(combine_op));
591  };
592 
593  const auto chunk_size = sequence_size/concurrency_degree_;
594 
595  for(int i=0; i<concurrency_degree_-1;++i) {
596  const auto delta = chunk_size * i;
597  const auto chunk_firsts = iterators_next(firsts,delta);
// NOTE(review): chunk_last is computed and captured below but never used.
598  const auto chunk_last = std::next(std::get<0>(chunk_firsts), chunk_size);
599 
600  g.run([&, chunk_firsts, chunk_last, i]() {
601  process_chunk(chunk_firsts, chunk_size, i);
602  });
603  }
604 
// Last chunk: everything from delta to the end, processed on this thread.
605  const auto delta = chunk_size * (concurrency_degree_ - 1);
606  const auto chunk_firsts = iterators_next(firsts,delta);
607  process_chunk(chunk_firsts, sequence_size - delta, concurrency_degree_-1);
608 
609  g.wait();
610 
// Combine: slot 0 is the starting value, slots 1.. are folded into it.
611  return seq.reduce(std::next(partial_results.begin()),
612  partial_results.size()-1, std::forward<result_type>(partial_results[0]),
613  std::forward<Combiner>(combine_op));
614 }
615 
// Definition of parallel_execution_tbb::stencil.
// NOTE(review): the declarator (listing line 618) is absent from this extract.
// The input is split into concurrency_degree_ chunks of
// sequence_size / concurrency_degree_ elements. The first
// concurrency_degree_ - 1 chunks run as tbb::task_group tasks; the last chunk
// (through the end of the sequence) runs on the calling thread. Each chunk is
// handled by the sequential policy's stencil, writing at first_out + delta.
616 template <typename ... InputIterators, typename OutputIterator,
617  typename StencilTransformer, typename Neighbourhood>
619  std::tuple<InputIterators...> firsts, OutputIterator first_out,
620  std::size_t sequence_size,
621  StencilTransformer && transform_op,
622  Neighbourhood && neighbour_op) const
623 {
624  constexpr sequential_execution seq{};
625  const auto chunk_size = sequence_size / concurrency_degree_;
// Sequential stencil over one chunk; delta is the chunk's offset in the output.
626  auto process_chunk = [&](auto f, std::size_t sz, std::size_t delta) {
627  seq.stencil(f, std::next(first_out,delta), sz,
628  std::forward<StencilTransformer>(transform_op),
629  std::forward<Neighbourhood>(neighbour_op));
630  };
631 
632  tbb::task_group g;
633  for (int i=0; i<concurrency_degree_-1; ++i) {
// [=] copies i, chunk_size, firsts and the process_chunk closure into the task.
// The closure itself still refers to first_out and the operations by reference.
634  g.run([=](){
635  const auto delta = chunk_size * i;
636  const auto chunk_firsts = iterators_next(firsts,delta);
637  process_chunk(chunk_firsts, chunk_size, delta);
638  });
639  }
640 
// Last chunk: its size is measured up to the end of the first input sequence,
// so it absorbs the division remainder.
641  const auto delta = chunk_size * (concurrency_degree_ - 1);
642  const auto chunk_firsts = iterators_next(firsts,delta);
643  const auto chunk_last = std::next(std::get<0>(firsts), sequence_size);
644  process_chunk(chunk_firsts,
645  std::distance(std::get<0>(chunk_firsts), chunk_last), delta);
646 
647  g.wait();
648 }
649 
// Definition of the public divide_conquer overload without a predicate.
// NOTE(review): the declarator (listing line 651) is absent from this extract.
// Seeds the shared task budget with concurrency_degree_ - 1 and delegates to
// the private recursive overload.
650 template <typename Input, typename Divider, typename Solver, typename Combiner>
652  Input && input,
653  Divider && divide_op,
654  Solver && solve_op,
655  Combiner && combine_op) const
656 {
657  std::atomic<int> num_threads{concurrency_degree_-1};
658  return divide_conquer(std::forward<Input>(input),
659  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
660  std::forward<Combiner>(combine_op), num_threads);
661 }
662 
// Definition of the public divide_conquer overload with a predicate.
// NOTE(review): the declarator (listing line 664) is absent from this extract.
// Seeds the shared task budget with concurrency_degree_ - 1 and delegates to
// the private recursive overload.
663 template <typename Input, typename Divider, typename Predicate, typename Solver, typename Combiner>
665  Input && input,
666  Divider && divide_op,
667  Predicate && predicate_op,
668  Solver && solve_op,
669  Combiner && combine_op) const
670 {
671  std::atomic<int> num_threads{concurrency_degree_-1};
672  return divide_conquer(std::forward<Input>(input),
673  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
674  std::forward<Solver>(solve_op),
675  std::forward<Combiner>(combine_op), num_threads);
676 }
677 
678 
// Definition of parallel_execution_tbb::pipeline (generator + transformers).
// NOTE(review): the declarator (listing line 680) is absent from this extract.
// Builds a serial in-order generator filter, builds the remaining stages with
// make_filter, and runs both through tbb::parallel_pipeline with tokens() as
// its first argument.
679 template <typename Generator, typename ... Transformers>
681  Generator && generate_op,
682  Transformers && ... transform_ops) const
683 {
684  using namespace std;
685  using namespace experimental;
686 
// generate_op() must return an optional-like type exposing value_type; items
// travel between stages wrapped in optional<value_type>.
687  using result_type = decay_t<typename result_of<Generator()>::type>;
688  using output_value_type = typename result_type::value_type;
689  using output_type = optional<output_value_type>;
690 
691  auto generator = tbb::make_filter<void, output_type>(
692  tbb::filter::serial_in_order,
693  [&](tbb::flow_control & fc) -> output_type {
694  auto item = generate_op();
695  if (item) {
696  return *item;
697  }
698  else {
// An empty item ends the stream: tell TBB to stop calling the generator.
699  fc.stop();
700  return {};
701  }
702  }
703  );
704 
705  auto rest =
706  this->template make_filter<output_value_type>(forward<Transformers>(transform_ops)...);
707 
708 
// NOTE(review): `context` is declared but not passed to parallel_pipeline.
709  tbb::task_group_context context;
710  tbb::parallel_pipeline(tokens(),
711  generator
712  &
713  rest);
714 }
715 
716 // PRIVATE MEMBERS
717 
// Recursive divide-and-conquer worker (no predicate).
// NOTE(review): the declarator (listing line 719) is absent from this extract.
//   input       - problem to solve
//   divide_op   - splits a problem; a result of size <= 1 means "solve directly"
//   solve_op    - solves a problem directly
//   combine_op  - combines two partial solutions
//   num_threads - shared budget of tasks that may still be spawned
// Returns the combined solution of all subproblems.
718 template <typename Input, typename Divider, typename Solver, typename Combiner>
720  Input && input,
721  Divider && divide_op,
722  Solver && solve_op,
723  Combiner && combine_op,
724  std::atomic<int> & num_threads) const
725 {
726  constexpr sequential_execution seq;
727 
// No task budget left: solve this whole subtree with the sequential policy.
728  if (num_threads.load()<=0) {
729  return seq.divide_conquer(std::forward<Input>(input),
730  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
731  std::forward<Combiner>(combine_op));
732  }
733 
734  auto subproblems = divide_op(std::forward<Input>(input));
735 
736  if (subproblems.size()<=1) { return solve_op(std::forward<Input>(input)); }
737 
// One slot per subproblem other than the first; the first is solved below
// on the calling thread and seeds the final reduction.
738  using subresult_type = std::decay_t<typename std::result_of<Solver(Input)>::type>;
739  std::vector<subresult_type> partials(subproblems.size()-1);
740  int division = 0;
741 
// Spawn one task per subproblem while budget remains; each task owns slot div.
742  tbb::task_group g;
743  auto i = subproblems.begin()+1;
744  while (i!=subproblems.end() && num_threads.load()>0) {
745  g.run([&,this,it=i++,div=division++]() {
746  partials[div] = this->divide_conquer(std::forward<Input>(*it),
747  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
748  std::forward<Combiner>(combine_op), num_threads);
749  });
750  num_threads--;
751  }
752 
753  // Subproblems left after the budget ran out are solved sequentially here.
// Each must go to its own slot, so advance division per subproblem; otherwise
// later results overwrite earlier ones and trailing slots stay default-constructed.
754  while (i != subproblems.end()){
755  partials[division++] = seq.divide_conquer(std::forward<Input>(*i++),
756  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
757  std::forward<Combiner>(combine_op));
758  }
759 
// The calling thread works on the first subproblem while the tasks run.
760  auto out = divide_conquer(std::forward<Input>(*subproblems.begin()),
761  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
762  std::forward<Combiner>(combine_op), num_threads);
763 
// Every slot of partials must be written before it is read below.
764  g.wait();
765 
766  return seq.reduce(partials.begin(), partials.size(),
767  std::forward<subresult_type>(out), std::forward<Combiner>(combine_op));
768 }
769 
// Recursive divide-and-conquer worker (with predicate).
// NOTE(review): the declarator (listing line 771) is absent from this extract.
//   input        - problem to solve
//   divide_op    - splits a problem into subproblems
//   predicate_op - when true for a problem, solve_op is applied directly
//   solve_op     - solves a problem directly
//   combine_op   - combines two partial solutions
//   num_threads  - shared budget of tasks that may still be spawned
// Returns the combined solution of all subproblems.
770 template <typename Input, typename Divider, typename Predicate, typename Solver, typename Combiner>
772  Input && input,
773  Divider && divide_op,
774  Predicate && predicate_op,
775  Solver && solve_op,
776  Combiner && combine_op,
777  std::atomic<int> & num_threads) const
778 {
779  constexpr sequential_execution seq;
780 
// No task budget left: solve this whole subtree with the sequential policy.
781  if (num_threads.load()<=0) {
782  return seq.divide_conquer(std::forward<Input>(input),
783  std::forward<Divider>(divide_op),std::forward<Predicate>(predicate_op),
784  std::forward<Solver>(solve_op),
785  std::forward<Combiner>(combine_op));
786  }
787 
788 
789  if (predicate_op(input)) { return solve_op(std::forward<Input>(input)); }
790  auto subproblems = divide_op(std::forward<Input>(input));
791 
// One slot per subproblem other than the first; the first is solved below
// on the calling thread and seeds the final reduction.
792  using subresult_type = std::decay_t<typename std::result_of<Solver(Input)>::type>;
793  std::vector<subresult_type> partials(subproblems.size()-1);
794  int division = 0;
795 
// Spawn one task per subproblem while budget remains; each task owns slot div.
796  tbb::task_group g;
797  auto i = subproblems.begin()+1;
798  while (i!=subproblems.end() && num_threads.load()>0) {
799  g.run([&,this,it=i++,div=division++]() {
800  partials[div] = this->divide_conquer(std::forward<Input>(*it),
801  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
802  std::forward<Solver>(solve_op),
803  std::forward<Combiner>(combine_op), num_threads);
804  });
805  num_threads--;
806  }
807 
808  // Subproblems left after the budget ran out are solved sequentially here.
// Each must go to its own slot, so advance division per subproblem; otherwise
// later results overwrite earlier ones and trailing slots stay default-constructed.
809  while (i != subproblems.end()){
810  partials[division++] = seq.divide_conquer(std::forward<Input>(*i++),
811  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
812  std::forward<Solver>(solve_op),
813  std::forward<Combiner>(combine_op));
814  }
815 
// The calling thread works on the first subproblem while the tasks run.
816  auto out = divide_conquer(std::forward<Input>(*subproblems.begin()),
817  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op), std::forward<Solver>(solve_op),
818  std::forward<Combiner>(combine_op), num_threads);
819 
// Every slot of partials must be written before it is read below.
820  g.wait();
821 
822  return seq.reduce(partials.begin(), partials.size(),
823  std::forward<subresult_type>(out), std::forward<Combiner>(combine_op));
824 }
825 
// Last stage built from a plain transformer: a serial in-order filter that
// consumes optional<Input> and produces nothing. transform_op is captured by
// copy and invoked only for non-empty items.
// NOTE(review): listing line 827 (end of the template parameter list) is
// absent from this extract.
826 template <typename Input, typename Transformer,
828 auto parallel_execution_tbb::make_filter(
829  Transformer && transform_op) const
830 {
831  using namespace std;
832  using namespace experimental;
833 
834  using input_value_type = Input;
835  using gen_value_type = optional<input_value_type>;
836 
837  return tbb::make_filter<gen_value_type, void>(
838  tbb::filter::serial_in_order,
839  [=](gen_value_type item) {
840  if (item) transform_op(*item);
841  });
842 }
843 
// Intermediate stage built from a plain transformer, chained with the stages
// built from other_transform_ops. The stage is a serial in-order filter from
// optional<Input> to optional<result of transform_op>; empty items pass
// through as empty.
// NOTE(review): listing line 845 (end of the template parameter list) is
// absent from this extract.
844 template <typename Input, typename Transformer, typename ... OtherTransformers,
846 auto parallel_execution_tbb::make_filter(
847  Transformer && transform_op,
848  OtherTransformers && ... other_transform_ops) const
849 {
850  using namespace std;
851  using namespace experimental;
852 
853  using input_value_type = Input;
854  static_assert(!is_void<input_value_type>::value,
855  "Transformer must take non-void argument");
856  using gen_value_type = optional<input_value_type>;
857  using output_value_type =
858  decay_t<typename result_of<Transformer(input_value_type)>::type>;
859  static_assert(!is_void<output_value_type>::value,
860  "Transformer must return a non-void result");
861  using output_type = optional<output_value_type>;
862 
863 
864  return
865  tbb::make_filter<gen_value_type, output_type>(
866  tbb::filter::serial_in_order,
867  [=](gen_value_type item) -> output_type {
868  if (item) return transform_op(*item);
869  else return {};
870  })
871  &
// The following stages take this stage's output value type as their Input.
872  this->template make_filter<output_value_type>(
873  std::forward<OtherTransformers>(other_transform_ops)...);
874 }
875 
// Last stage built from a farm: a parallel filter that consumes
// optional<Input> and produces nothing. farm_obj is captured by copy and
// invoked only for non-empty items.
// NOTE(review): listing line 878 (end of the template parameter list) is
// absent from this extract.
876 template <typename Input, typename FarmTransformer,
877  template <typename> class Farm,
879 auto parallel_execution_tbb::make_filter(
880  Farm<FarmTransformer> && farm_obj) const
881 {
882  using namespace std;
883  using namespace experimental;
884 
885  using input_value_type = Input;
886  using gen_value_type = optional<input_value_type>;
887 
888  return tbb::make_filter<gen_value_type, void>(
889  tbb::filter::parallel,
890  [=](gen_value_type item) {
891  if (item) farm_obj(*item);
892  });
893 }
894 
// Intermediate farm stage chained with the stages built from
// other_transform_ops. The stage is a parallel filter from optional<Input> to
// optional<result of the farm's transformer>; empty items pass through as empty.
// NOTE(review): listing line 898 (end of the template parameter list) is
// absent from this extract.
// NOTE(review): this lambda captures farm_obj by reference ([&]), while the
// last-stage farm overload captures by copy ([=]). The referenced object must
// therefore outlive the pipeline run; confirm this is intended.
895 template <typename Input, typename FarmTransformer,
896  template <typename> class Farm,
897  typename ... OtherTransformers,
899 auto parallel_execution_tbb::make_filter(
900  Farm<FarmTransformer> && farm_obj,
901  OtherTransformers && ... other_transform_ops) const
902 {
903  using namespace std;
904  using namespace experimental;
905 
906  using input_value_type = Input;
907  static_assert(!is_void<input_value_type>::value,
908  "Farm must take non-void argument");
909  using gen_value_type = optional<input_value_type>;
910  using output_value_type = decay_t<typename result_of<FarmTransformer(input_value_type)>::type>;
911  static_assert(!is_void<output_value_type>::value,
912  "Farm must return a non-void result");
913  using output_type = optional<output_value_type>;
914 
915  return tbb::make_filter<gen_value_type, output_type>(
916  tbb::filter::parallel,
917  [&](gen_value_type item) -> output_type {
918  if (item) return farm_obj(*item);
919  else return {};
920  })
921  &
922  this->template make_filter<output_value_type>(
923  std::forward<OtherTransformers>(other_transform_ops)...);
924 }
925 
// Last stage built from a filter pattern: a parallel filter that consumes
// optional<Input> and produces nothing. filter_obj is captured by copy and
// invoked only for non-empty items; its result is discarded.
// The parameter is named filter_obj to match both the declaration in the class
// and the use in the lambda. It was previously spelled farm_obj, which left
// the name used in the body undeclared.
// NOTE(review): listing line 928 (end of the template parameter list) is
// absent from this extract.
926 template <typename Input, typename Predicate,
927  template <typename> class Filter,
929 auto parallel_execution_tbb::make_filter(
930  Filter<Predicate> && filter_obj) const
931 {
932  using namespace std;
933  using namespace experimental;
934 
935  using input_value_type = Input;
936  using gen_value_type = optional<input_value_type>;
937 
938  return tbb::make_filter<gen_value_type, void>(
939  tbb::filter::parallel,
940  [=](gen_value_type item) {
941  if (item) filter_obj(*item);
942  });
943 }
944 
// Intermediate filter-pattern stage chained with the stages built from
// other_transform_ops. The stage is a parallel filter from optional<Input> to
// optional<Input>: an item is passed on only if it is non-empty and
// filter_obj accepts it; otherwise an empty optional is emitted. The item
// type is unchanged, so the following stages also take Input.
// NOTE(review): listing line 948 (end of the template parameter list) is
// absent from this extract.
945 template <typename Input, typename Predicate,
946  template <typename> class Filter,
947  typename ... OtherTransformers,
949 auto parallel_execution_tbb::make_filter(
950  Filter<Predicate> && filter_obj,
951  OtherTransformers && ... other_transform_ops) const
952 {
953  using namespace std;
954  using namespace experimental;
955 
956  using input_value_type = Input;
957  static_assert(!is_void<input_value_type>::value,
958  "Filter must take non-void argument");
959  using gen_value_type = optional<input_value_type>;
960 
961  return tbb::make_filter<gen_value_type, gen_value_type>(
962  tbb::filter::parallel,
963  [&](gen_value_type item) -> gen_value_type {
964  if (item && filter_obj(*item)) return item;
965  else return {};
966  })
967  &
968  this->template make_filter<input_value_type>(
969  std::forward<OtherTransformers>(other_transform_ops)...);
970 }
971 
// Intermediate reduce-pattern stage chained with the stages built from
// other_transform_ops. The stage is a serial filter: each non-empty item is
// added to reduce_obj. When reduce_obj reports that a reduction is needed,
// the window is reduced with the sequential policy and the result is emitted;
// otherwise an empty optional is emitted.
// NOTE(review): listing line 975 (end of the template parameter list) is
// absent from this extract.
972 template <typename Input, typename Combiner, typename Identity,
973  template <typename C, typename I> class Reduce,
974  typename ... OtherTransformers,
976 auto parallel_execution_tbb::make_filter(
977  Reduce<Combiner,Identity> && reduce_obj,
978  OtherTransformers && ... other_transform_ops) const
979 {
980  using namespace std;
981  using namespace experimental;
982 
983  using input_value_type = Input;
984  using gen_value_type = optional<input_value_type>;
985 
// NOTE(review): `order` and the init-captures `it` and `rem` below are never
// used in the lambda body.
986  std::atomic<long int> order{0};
987  return tbb::make_filter<gen_value_type, gen_value_type>(
988  tbb::filter::serial,
989  [&, it=std::vector<input_value_type>(), rem=0](gen_value_type item) -> gen_value_type {
990  if (!item) return {};
991  reduce_obj.add_item(std::forward<Identity>(*item));
992  if (reduce_obj.reduction_needed()) {
993  constexpr sequential_execution seq;
994  return reduce_obj.reduce_window(seq);
995  }
996  return {};
997  })
998  &
999  this->template make_filter<input_value_type>(
1000  std::forward<OtherTransformers>(other_transform_ops)...);
1001 }
1002 
// Intermediate iteration-pattern stage chained with the stages built from
// other_transform_ops. The stage is a serial filter: for each non-empty item
// it applies iteration_obj.transform at least once and repeats until
// iteration_obj.predicate holds for the result. Empty items pass through as
// empty.
// NOTE(review): listing lines 1006-1007 (end of the template parameter list)
// are absent from this extract.
1003 template <typename Input, typename Transformer, typename Predicate,
1004  template <typename T, typename P> class Iteration,
1005  typename ... OtherTransformers,
1008 auto parallel_execution_tbb::make_filter(
1009  Iteration<Transformer,Predicate> && iteration_obj,
1010  OtherTransformers && ... other_transform_ops) const
1011 {
1012  using namespace std;
1013  using namespace experimental;
1014 
1015  using input_value_type = Input;
1016  using gen_value_type = optional<input_value_type>;
1017 
1018  return tbb::make_filter<gen_value_type, gen_value_type>(
1019  tbb::filter::serial,
1020  [&](gen_value_type item) -> gen_value_type {
1021  if (!item) return {};
1022  do {
1023  item = iteration_obj.transform(*item);
1024  } while (!iteration_obj.predicate(*item));
1025  return item;
1026  })
1027  &
1028  this->template make_filter<input_value_type>(
1029  std::forward<OtherTransformers>(other_transform_ops)...);
1030 }
1031 
// Second Iteration&& overload: not implemented. The body contains only a
// static_assert with the message "Not implemented" and returns nothing.
// NOTE(review): listing line 1036 (the last template parameter, which selects
// this overload) is absent from this extract.
1032 template <typename Input, typename Transformer, typename Predicate,
1033  template <typename T, typename P> class Iteration,
1034  typename ... OtherTransformers,
1035  requires_iteration<Iteration<Transformer,Predicate>>,
1037 auto parallel_execution_tbb::make_filter(
1038  Iteration<Transformer,Predicate> && iteration_obj,
1039  OtherTransformers && ... other_transform_ops) const
1040 {
1041  static_assert(!is_pipeline<Transformer>, "Not implemented");
1042 }
1043 
1044 
1045 
// Nested pipeline stage: flattens the nested pipeline by concatenating its
// transformers() with the remaining stages into one tuple, then lets
// make_filter_nested expand that tuple. The index sequence covers the
// combined number of stages.
1046 template <typename Input, typename ... Transformers,
1047  template <typename...> class Pipeline,
1048  typename ... OtherTransformers,
1049  requires_pipeline<Pipeline<Transformers...>>>
1050 auto parallel_execution_tbb::make_filter(
1051  Pipeline<Transformers...> && pipeline_obj,
1052  OtherTransformers && ... other_transform_ops) const
1053 {
1054  return this->template make_filter_nested<Input>(
1055  std::tuple_cat(pipeline_obj.transformers(),
1056  std::forward_as_tuple(other_transform_ops...)),
1057  std::make_index_sequence<sizeof...(Transformers)+sizeof...(OtherTransformers)>());
1058 }
1059 
// Expands a tuple of transformers into a single make_filter<Input> call, one
// argument per tuple element, selected through the index pack I.
1060 template <typename Input, typename ... Transformers,
1061  std::size_t ... I>
1062 auto parallel_execution_tbb::make_filter_nested(
1063  std::tuple<Transformers...> && transform_ops,
1064  std::index_sequence<I...>) const
1065 {
1066  return this->template make_filter<Input>(
1067  std::forward<Transformers>(std::get<I>(transform_ops))...);
1068 }
1069 
1070 
1071 } // end namespace grppi
1072 
1073 #else // GRPPI_TBB not defined
1074 
1075 namespace grppi {
1076 
// Empty placeholder so the type name still exists when GRPPI_TBB is not defined.
1079 struct parallel_execution_tbb {};
1080 
// Without GRPPI_TBB no type counts as the TBB policy: always returns false.
1086 template <typename E>
1087 constexpr bool is_parallel_execution_tbb() {
1088  return false;
1089 }
1090 
1091 } // end namespace grppi
1092 
1093 #endif // GRPPI_TBB
1094 
1095 #endif
Definition: callable_traits.h:26
std::enable_if_t< is_reduce< T >, int > requires_reduce
Definition: reduce_pattern.h:135
constexpr bool supports_reduce< parallel_execution_tbb >()
Determines if an execution policy supports the reduce pattern.
Definition: parallel_execution_tbb.h:509
constexpr bool is_supported< parallel_execution_tbb >()
Determines if an execution policy is supported in the current compilation.
Definition: parallel_execution_tbb.h:495
int tokens() const noexcept
Definition: parallel_execution_tbb.h:119
typename std::enable_if_t< is_iteration< T >, int > requires_iteration
Definition: iteration_pattern.h:88
void disable_ordering() noexcept
Disable ordering.
Definition: parallel_execution_tbb.h:90
constexpr bool is_parallel_execution_tbb()
Metafunction that determines if type E is parallel_execution_tbb.
Definition: parallel_execution_tbb.h:486
void pipeline(mpmc_queue< InputType > &input_queue, Transformer &&transform_op, mpmc_queue< OutputType > &output_queue) const
Invoke Pipeline pattern coming from another context that uses mpmc_queues as communication channels...
Definition: parallel_execution_tbb.h:263
STL namespace.
auto iterators_next(T &&t, int n)
Computes next n steps from a tuple of iterators.
Definition: iterator.h:175
decltype(auto) apply_iterators_indexed(F &&f, T &&t, std::size_t i)
Applies a callable object to the values obtained from the iterators in a tuple by indexing...
Definition: iterator.h:147
typename internal::output_value_type< I, T >::type output_value_type
Definition: pipeline_pattern.h:132
queue_mode
Definition: mpmc_queue.h:35
void set_concurrency_degree(int degree) noexcept
Set number of grppi threads.
Definition: parallel_execution_tbb.h:75
constexpr bool supports_map< parallel_execution_tbb >()
Determines if an execution policy supports the map pattern.
Definition: parallel_execution_tbb.h:502
void set_queue_attributes(int size, queue_mode mode, int tokens) noexcept
Sets the attributes for the queues built through make_queue<T>()
Definition: parallel_execution_tbb.h:100
Definition: mpmc_queue.h:38
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_tbb.h:112
parallel_execution_tbb() noexcept
Default construct a TBB parallel execution policy.
Definition: parallel_execution_tbb.h:55
parallel_execution_tbb(int concurrency_degree, bool order=true) noexcept
Constructs a TBB parallel execution policy.
Definition: parallel_execution_tbb.h:67
void map(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, Transformer transform_op) const
Applies a transformation to multiple sequences leaving the result in another sequence using available...
Definition: parallel_execution_tbb.h:541
TBB parallel execution policy.
Definition: parallel_execution_tbb.h:44
std::enable_if_t< is_no_pattern< T >, int > requires_no_pattern
Definition: patterns.h:92
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:111
void pipeline(Generator &&generate_op, Transformers &&...transform_op) const
Invoke Pipeline pattern.
Definition: parallel_execution_tbb.h:680
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_tbb.h:80
typename std::enable_if_t< is_filter< T >, int > requires_filter
Definition: filter_pattern.h:70
constexpr bool supports_divide_conquer< parallel_execution_tbb >()
Determines if an execution policy supports the divide/conquer pattern.
Definition: parallel_execution_tbb.h:530
Sequential execution policy.
Definition: sequential_execution.h:41
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke Divide/conquer pattern.
Definition: sequential_execution.h:543
typename std::result_of< Transformer(Input)>::type result_type
Determines the return type of applying a function to an input type.
Definition: patterns.h:110
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke Divide/conquer pattern.
Definition: parallel_execution_tbb.h:651
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_tbb.h:95
void enable_ordering() noexcept
Enable ordering.
Definition: parallel_execution_tbb.h:85
void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a transformation to multiple sequences leaving the result in another sequence.
Definition: parallel_execution_tbb.h:618
auto map_reduce(std::tuple< InputIterators... > firsts, std::size_t sequence_size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op) const
Applies a map/reduce operation to a sequence of data items.
Definition: parallel_execution_tbb.h:574
constexpr bool supports_map_reduce< parallel_execution_tbb >()
Determines if an execution policy supports the map-reduce pattern.
Definition: parallel_execution_tbb.h:516
auto reduce(InputIterator first, std::size_t sequence_size, Identity &&identity, Combiner &&combine_op) const
Applies a reduction to a sequence of data items.
Definition: parallel_execution_tbb.h:556
constexpr bool supports_pipeline< parallel_execution_tbb >()
Determines if an execution policy supports the pipeline pattern.
Definition: parallel_execution_tbb.h:537
T pop()
Definition: mpmc_queue.h:93
typename std::enable_if_t< is_context< T >, int > requires_context
Definition: common/context.h:95
bool push(T item)
Definition: mpmc_queue.h:128
typename std::enable_if_t< is_farm< T >, int > requires_farm
Definition: farm_pattern.h:89
constexpr bool supports_stencil< parallel_execution_tbb >()
Determines if an execution policy supports the stencil pattern.
Definition: parallel_execution_tbb.h:523