GrPPI  0.3.1
Generic and Reusable Parallel Pattern Interface
parallel_execution_omp.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_OMP_PARALLEL_EXECUTION_OMP_H
22 #define GRPPI_OMP_PARALLEL_EXECUTION_OMP_H
23 
24 #ifdef GRPPI_OMP
25 
26 #include "../common/mpmc_queue.h"
27 #include "../common/iterator.h"
28 #include "../common/execution_traits.h"
29 #include "../seq/sequential_execution.h"
30 
31 #include <type_traits>
32 #include <tuple>
33 #include <experimental/optional>
34 
35 #include <omp.h>
36 
37 
38 namespace grppi {
39 
46 
47 public:
/**
\brief Default-constructor fragment: forwards the value returned by
impl_concurrency_degree() to another constructor of this class and has an
empty body.
NOTE(review): the declarator (listing line 56) is absent from this extract —
confirm the exact signature against the original header.
*/
57  parallel_execution_omp{impl_concurrency_degree()}
58  {}
59 
72  parallel_execution_omp(int concurrency_degree, bool order = true) noexcept :
73  concurrency_degree_{concurrency_degree},
74  ordering_{order}
75  {
76  omp_set_num_threads(concurrency_degree_);
77  }
78 
82  void set_concurrency_degree(int degree) noexcept {
83  concurrency_degree_ = degree;
84  omp_set_num_threads(concurrency_degree_);
85  }
86 
90  int concurrency_degree() const noexcept {
91  return concurrency_degree_;
92  }
93 
97  void enable_ordering() noexcept { ordering_=true; }
98 
102  void disable_ordering() noexcept { ordering_=false; }
103 
107  bool is_ordered() const noexcept { return ordering_; }
108 
112  void set_queue_attributes(int size, queue_mode mode) noexcept {
113  queue_size_ = size;
114  queue_mode_ = mode;
115  }
116 
/**
\brief Returns an object brace-initialised from the stored queue_size_ and
queue_mode_.
NOTE(review): the declarator (listing line 124) is absent from this extract;
get_output_queue() calls this as make_queue<T>() and returns the result as
mpmc_queue<T> — confirm the signature against the original header.
*/
123  template <typename T>
125  return {queue_size_, queue_mode_};
126  }
127 
/**
\brief Output-queue selection when the first remaining argument is already an
mpmc_queue<T>: that queue is returned by reference.
\param queue Queue supplied by the caller; returned unchanged.
\note Any further arguments are accepted and ignored.
*/
136  template <typename T, typename ... Transformers>
137  mpmc_queue<T>& get_output_queue(mpmc_queue<T> & queue, Transformers &&...) const {
138  return queue;
139  }
140 
148  template <typename T, typename ... Transformers>
149  mpmc_queue<T> get_output_queue(Transformers &&... ) const{
150  return std::move(make_queue<T>());
151  }
152 
156  int get_thread_id() const noexcept {
157  int result;
158  #pragma omp parallel
159  {
160  result = omp_get_thread_num();
161  }
162  return result;
163  }
164 
/**
\brief Map pattern: for each i in [0, sequence_size) the definition stores
apply_iterators_indexed(transform_op, firsts, i) into first_out[i].
\param firsts Tuple of iterators to the start of each input sequence.
\param first_out Iterator to the start of the output sequence.
\param sequence_size Number of positions to process.
\param transform_op Operation passed to apply_iterators_indexed().
*/
179  template <typename ... InputIterators, typename OutputIterator,
180  typename Transformer>
181  void map(std::tuple<InputIterators...> firsts,
182  OutputIterator first_out,
183  std::size_t sequence_size, Transformer transform_op) const;
184 
/**
\brief Reduce pattern: the definition splits the sequence into
concurrency_degree() chunks, reduces each chunk with the sequential policy and
then combines the partial results.
\param first Iterator to the start of the input sequence.
\param sequence_size Number of elements in the sequence.
\param identity Value handed to each per-chunk sequential reduction.
\param combine_op Binary combination operation.
\return The result of the final sequential reduction over the partial results.
*/
197  template <typename InputIterator, typename Identity, typename Combiner>
198  auto reduce(InputIterator first, std::size_t sequence_size,
199  Identity && identity, Combiner && combine_op) const;
200 
/**
\brief Map/reduce pattern: the definition runs a sequential map_reduce on each
of concurrency_degree() chunks and then reduces the partial results.
\param firsts Tuple of iterators to the start of each input sequence.
\param sequence_size Number of elements in each sequence.
\param identity Identity value; its decayed type is the partial-result type.
\param transform_op Transformation forwarded to the per-chunk map_reduce.
\param combine_op Combination operation.
\return The result of the final sequential reduction over the partial results.
*/
215  template <typename ... InputIterators, typename Identity,
216  typename Transformer, typename Combiner>
217  auto map_reduce(std::tuple<InputIterators...> firsts,
218  std::size_t sequence_size,
219  Identity && identity,
220  Transformer && transform_op, Combiner && combine_op) const;
221 
/**
\brief Stencil pattern: the definition runs the sequential stencil on each of
concurrency_degree() chunks of the input.
\param firsts Tuple of iterators to the start of each input sequence.
\param first_out Iterator to the start of the output sequence.
\param sequence_size Number of elements to process.
\param transform_op Stencil operation forwarded to the sequential policy.
\param neighbour_op Neighbourhood operation forwarded to the sequential policy.
*/
238  template <typename ... InputIterators, typename OutputIterator,
239  typename StencilTransformer, typename Neighbourhood>
240  void stencil(std::tuple<InputIterators...> firsts, OutputIterator first_out,
241  std::size_t sequence_size,
242  StencilTransformer && transform_op,
243  Neighbourhood && neighbour_op) const;
244 
/**
\brief Divide/conquer pattern without an explicit base-case predicate
(deprecated in favour of the overload taking a predicate).
\param input Problem to solve.
\param divide_op Splits a problem into subproblems.
\param solve_op Solves a problem directly.
\param combine_op Combines partial solutions.
\return The combined solution.
\note The definition seeds a thread budget of concurrency_degree()-1 and
delegates to the private overload.
*/
257  template <typename Input, typename Divider, typename Solver, typename Combiner>
258  [[deprecated("Use new interface with predicate argument")]]
259  auto divide_conquer(Input && input,
260  Divider && divide_op,
261  Solver && solve_op,
262  Combiner && combine_op) const;
263 
/**
\brief Divide/conquer pattern with an explicit base-case predicate.
\param input Problem to solve.
\param divide_op Splits a problem into subproblems.
\param predicate_op Returns true when a problem must be solved directly.
\param solve_op Solves a problem directly.
\param combine_op Combines partial solutions.
\return The combined solution.
\note The definition seeds a thread budget of concurrency_degree()-1 and
delegates to the private overload.
*/
278  template <typename Input, typename Divider, typename Predicate, typename Solver, typename Combiner>
279  auto divide_conquer(Input && input,
280  Divider && divide_op,
281  Predicate && predicate_op,
282  Solver && solve_op,
283  Combiner && combine_op) const;
284 
285 
/**
\brief Pipeline pattern driven by a generator.
\param generate_op Called with no arguments to produce items; the definition
stops generating after the first item that converts to false.
\param transform_op Remaining pipeline stages, forwarded to do_pipeline().
*/
293  template <typename Generator, typename ... Transformers>
294  void pipeline(Generator && generate_op,
295  Transformers && ... transform_op) const;
296 
/**
\brief Runs a single stage between two caller-supplied queues by forwarding
to do_pipeline().
\param input_queue Queue the stage reads from.
\param transform_op Stage operation, perfectly forwarded.
\param output_queue Queue the stage writes to.
*/
307  template <typename InputType, typename Transformer, typename OutputType>
308  void pipeline(mpmc_queue<InputType> & input_queue, Transformer && transform_op,
309  mpmc_queue<OutputType> &output_queue) const
310  {
311  do_pipeline(input_queue, std::forward<Transformer>(transform_op), output_queue);
312  }
313 
314 private:
315 
/**
\brief Recursive worker for the predicate-less divide/conquer.
\param num_threads Shared budget of tasks that may still be spawned; the
definition decrements it per task and falls back to the sequential policy once
it is not positive.
*/
316  template <typename Input, typename Divider, typename Solver, typename Combiner>
317  auto divide_conquer(Input && input,
318  Divider && divide_op,
319  Solver && solve_op,
320  Combiner && combine_op,
321  std::atomic<int> & num_threads) const;
322 
/**
\brief Recursive worker for the divide/conquer with a base-case predicate.
\param num_threads Shared budget of tasks that may still be spawned; the
definition decrements it per task and falls back to the sequential policy once
it is not positive.
*/
323  template <typename Input, typename Divider, typename Predicate, typename Solver, typename Combiner>
324  auto divide_conquer(Input && input,
325  Divider && divide_op,
326  Predicate && predicate_op,
327  Solver && solve_op,
328  Combiner && combine_op,
329  std::atomic<int> & num_threads) const;
330 
331 
/**
\brief Final (consumer) pipeline stage: the definition pops items from
input_queue and passes each to consume_op.
NOTE(review): listing line 333 (the end of the template parameter list) is
absent from this extract — restore it from the original header.
*/
332  template <typename Queue, typename Consumer,
334  void do_pipeline(Queue & input_queue, Consumer && consume_op) const;
335 
/**
\brief Single stage between two queues: the definition pops from input_queue,
applies transform_op and pushes the result to output_queue.
NOTE(review): listing line 337 (the end of the template parameter list) is
absent from this extract — restore it from the original header.
*/
336  template <typename Inqueue, typename Transformer, typename output_type,
338  void do_pipeline(Inqueue & input_queue, Transformer && transform_op,
339  mpmc_queue<output_type> & output_queue) const;
340 
/**
\brief No-op overload taken when the next argument is a queue with the same
element type as the input queue; nothing is executed.
NOTE(review): presumably the terminal case after a stage has written straight
into a caller-supplied output queue — confirm against the callers.
*/
341  template <typename T, typename ... Others>
342  void do_pipeline(mpmc_queue<T> & in_q, mpmc_queue<T> & same_queue, Others &&... ops) const
343  { }
344 
/**
\brief No-op overload taken when no stages remain after the input queue.
*/
345  template <typename T>
346  void do_pipeline(mpmc_queue<T> & in_q) const {}
347 
/**
\brief Intermediate stage for a plain (non-pattern) transformer followed by
further stages; constrained by requires_no_pattern<Transformer>.
\param input_queue Queue the stage reads from.
\param transform_op Operation applied to every item.
\param other_ops Remaining stages.
*/
348  template <typename Queue, typename Transformer, typename ... OtherTransformers,
349  requires_no_pattern<Transformer> = 0>
350  void do_pipeline(Queue & input_queue, Transformer && transform_op,
351  OtherTransformers && ... other_ops) const;
352 
/**
\brief Intermediate stage for an rvalue execution-context object followed by
further stages.
NOTE(review): listing line 356 (the end of the template parameter list) is
absent from this extract — restore it from the original header.
*/
353  template <typename Queue, typename Execution, typename Transformer,
354  template <typename, typename> class Context,
355  typename ... OtherTransformers,
357  void do_pipeline(Queue & input_queue, Context<Execution,Transformer> && context_op,
358  OtherTransformers &&... other_ops) const;
359 
/**
\brief Lvalue execution-context overload: casts context_op to an rvalue with
std::move and forwards to do_pipeline() together with the remaining stages.
*/
360  template <typename Queue, typename Execution, typename Transformer,
361  template <typename, typename> class Context,
362  typename ... OtherTransformers,
363  requires_context<Context<Execution,Transformer>> = 0>
364  void do_pipeline(Queue & input_queue, Context<Execution,Transformer> & context_op,
365  OtherTransformers &&... other_ops) const
366  {
367  do_pipeline(input_queue, std::move(context_op),
368  std::forward<OtherTransformers>(other_ops)...);
369  }
370 
/**
\brief Lvalue farm overload for a farm in last position: casts farm_obj to an
rvalue with std::move and forwards to do_pipeline().
NOTE(review): listing line 373 (the end of the template parameter list) is
absent from this extract — restore it from the original header.
*/
371  template <typename Queue, typename FarmTransformer,
372  template <typename> class Farm,
374  void do_pipeline(Queue & input_queue,
375  Farm<FarmTransformer> & farm_obj) const
376  {
377  do_pipeline(input_queue, std::move(farm_obj));
378  }
379 
/**
\brief Final farm stage (rvalue farm, no further stages); constrained by
requires_farm. The definition spawns farm_obj.cardinality() tasks that each
consume items from input_queue.
*/
380  template <typename Queue, typename FarmTransformer,
381  template <typename> class Farm,
382  requires_farm<Farm<FarmTransformer>> = 0>
383  void do_pipeline(Queue & input_queue,
384  Farm<FarmTransformer> && farm_obj) const;
385 
/**
\brief Lvalue farm overload with further stages: casts farm_obj to an rvalue
with std::move and forwards to do_pipeline() together with the remaining
stages.
*/
386  template <typename Queue, typename FarmTransformer,
387  template <typename> class Farm,
388  typename ... OtherTransformers,
389  requires_farm<Farm<FarmTransformer>> =0>
390  void do_pipeline(Queue & input_queue,
391  Farm<FarmTransformer> & farm_obj,
392  OtherTransformers && ... other_transform_ops) const
393  {
394  do_pipeline(input_queue, std::move(farm_obj),
395  std::forward<OtherTransformers>(other_transform_ops)...);
396  }
397 
/**
\brief Intermediate farm stage (rvalue farm followed by further stages);
constrained by requires_farm. The definition spawns farm_obj.cardinality()
tasks and then continues with the remaining stages.
*/
398  template <typename Queue, typename FarmTransformer,
399  template <typename> class Farm,
400  typename ... OtherTransformers,
401  requires_farm<Farm<FarmTransformer>> =0>
402  void do_pipeline(Queue & input_queue,
403  Farm<FarmTransformer> && farm_obj,
404  OtherTransformers && ... other_transform_ops) const;
405 
/**
\brief Lvalue filter overload for a filter in last position: casts filter_obj
to an rvalue with std::move and forwards to do_pipeline().
NOTE(review): listing line 408 (the end of the template parameter list) is
absent from this extract — restore it from the original header.
*/
406  template <typename Queue, typename Predicate,
407  template <typename> class Filter,
409  void do_pipeline(Queue & input_queue,
410  Filter<Predicate> & filter_obj) const
411  {
412  do_pipeline(input_queue, std::move(filter_obj));
413  }
414 
/**
\brief Final filter stage (rvalue filter, no further stages); constrained by
requires_filter. The definition has an empty body.
*/
415  template <typename Queue, typename Predicate,
416  template <typename> class Filter,
417  requires_filter<Filter<Predicate>> = 0>
418  void do_pipeline(Queue & input_queue,
419  Filter<Predicate> && filter_obj) const;
420 
/**
\brief Lvalue filter overload with further stages: casts filter_obj to an
rvalue with std::move and forwards to do_pipeline() together with the
remaining stages.
*/
421  template <typename Queue, typename Predicate,
422  template <typename> class Filter,
423  typename ... OtherTransformers,
424  requires_filter<Filter<Predicate>> =0>
425  void do_pipeline(Queue & input_queue,
426  Filter<Predicate> & filter_obj,
427  OtherTransformers && ... other_transform_ops) const
428  {
429  do_pipeline(input_queue, std::move(filter_obj),
430  std::forward<OtherTransformers>(other_transform_ops)...);
431  }
432 
/**
\brief Intermediate filter stage (rvalue filter followed by further stages);
constrained by requires_filter.
\param input_queue Queue the stage reads from.
\param filter_obj Filter whose call operator decides which items are kept.
\param other_transform_ops Remaining stages.
*/
433  template <typename Queue, typename Predicate,
434  template <typename> class Filter,
435  typename ... OtherTransformers,
436  requires_filter<Filter<Predicate>> =0>
437  void do_pipeline(Queue & input_queue,
438  Filter<Predicate> && filter_obj,
439  OtherTransformers && ... other_transform_ops) const;
440 
/**
\brief Lvalue reduce overload: casts reduce_obj to an rvalue with std::move
and forwards to do_pipeline() together with the remaining stages.
\note input_queue is declared as a forwarding reference but is passed on as an
lvalue.
NOTE(review): listing line 444 (the end of the template parameter list) is
absent from this extract — restore it from the original header.
*/
441  template <typename Queue, typename Combiner, typename Identity,
442  template <typename C, typename I> class Reduce,
443  typename ... OtherTransformers,
445  void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> & reduce_obj,
446  OtherTransformers && ... other_transform_ops) const
447  {
448  do_pipeline(input_queue, std::move(reduce_obj),
449  std::forward<OtherTransformers>(other_transform_ops)...);
450  }
451 
/**
\brief Intermediate reduce stage (rvalue reduce object followed by further
stages); constrained by requires_reduce.
\note The definition is not visible in this chunk.
*/
452  template <typename Queue, typename Combiner, typename Identity,
453  template <typename C, typename I> class Reduce,
454  typename ... OtherTransformers,
455  requires_reduce<Reduce<Combiner,Identity>> = 0>
456  void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> && reduce_obj,
457  OtherTransformers && ... other_transform_ops) const;
458 
/**
\brief Lvalue iteration overload: casts iteration_obj to an rvalue with
std::move and forwards to do_pipeline() together with the remaining stages.
NOTE(review): listing line 462 (one template-parameter line) is absent from
this extract — restore it from the original header.
*/
459  template <typename Queue, typename Transformer, typename Predicate,
460  template <typename T, typename P> class Iteration,
461  typename ... OtherTransformers,
463  requires_no_pattern<Transformer> =0>
464  void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> & iteration_obj,
465  OtherTransformers && ... other_transform_ops) const
466  {
467  do_pipeline(input_queue, std::move(iteration_obj),
468  std::forward<OtherTransformers>(other_transform_ops)...);
469  }
470 
/**
\brief Intermediate iteration stage (rvalue iteration object whose transformer
satisfies requires_no_pattern) followed by further stages.
\note The definition is not visible in this chunk.
*/
471  template <typename Queue, typename Transformer, typename Predicate,
472  template <typename T, typename P> class Iteration,
473  typename ... OtherTransformers,
474  requires_iteration<Iteration<Transformer,Predicate>> =0,
475  requires_no_pattern<Transformer> =0>
476  void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
477  OtherTransformers && ... other_transform_ops) const;
478 
/**
\brief Second rvalue iteration-stage declaration, with the same function
parameters as the requires_no_pattern variant.
NOTE(review): listing line 483 (the last template-parameter line, which is
what distinguishes this overload) is absent from this extract — restore it
from the original header.
\note The definition is not visible in this chunk.
*/
479  template <typename Queue, typename Transformer, typename Predicate,
480  template <typename T, typename P> class Iteration,
481  typename ... OtherTransformers,
482  requires_iteration<Iteration<Transformer,Predicate>> =0,
484  void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
485  OtherTransformers && ... other_transform_ops) const;
486 
/**
\brief Lvalue nested-pipeline overload: casts pipeline_obj to an rvalue with
std::move and forwards to do_pipeline() together with the remaining stages.
*/
487  template <typename Queue, typename ... Transformers,
488  template <typename...> class Pipeline,
489  typename ... OtherTransformers,
490  requires_pipeline<Pipeline<Transformers...>> = 0>
491  void do_pipeline(Queue & input_queue,
492  Pipeline<Transformers...> & pipeline_obj,
493  OtherTransformers && ... other_transform_ops) const
494  {
495  do_pipeline(input_queue, std::move(pipeline_obj),
496  std::forward<OtherTransformers>(other_transform_ops)...);
497  }
498 
/**
\brief Intermediate nested-pipeline stage (rvalue pipeline object followed by
further stages); constrained by requires_pipeline.
\note The definition is not visible in this chunk.
*/
499  template <typename Queue, typename ... Transformers,
500  template <typename...> class Pipeline,
501  typename ... OtherTransformers,
502  requires_pipeline<Pipeline<Transformers...>> = 0>
503  void do_pipeline(Queue & input_queue,
504  Pipeline<Transformers...> && pipeline_obj,
505  OtherTransformers && ... other_transform_ops) const;
506 
/**
\brief Helper taking an input queue, a tuple of stages and an index sequence.
NOTE(review): presumably expands the tuple into a do_pipeline() call using the
indices I... — the definition is not visible in this chunk, confirm there.
*/
507  template <typename Queue, typename ... Transformers,
508  std::size_t ... I>
509  void do_pipeline_nested(
510  Queue & input_queue,
511  std::tuple<Transformers...> && transform_ops,
512  std::index_sequence<I...>) const;
513 
514 private:
515 
523  static int impl_concurrency_degree() {
524  int result;
525  #pragma omp parallel
526  {
527  result = omp_get_num_threads();
528  }
529  return result;
530  }
531 
532 private:
533 
// Concurrency degree; set by the constructors and set_concurrency_degree().
534  int concurrency_degree_;
535 
// Ordering flag reported by is_ordered().
536  bool ordering_;
537 
// Queue size used unless set_queue_attributes() overrides it.
538  constexpr static int default_queue_size = 100;
// Queue size passed to queues created by make_queue().
539  int queue_size_ = default_queue_size;
540 
// Queue mode passed to queues created by make_queue(); blocking by default.
541  queue_mode queue_mode_ = queue_mode::blocking;
542 };
543 
548 template <typename E>
549 constexpr bool is_parallel_execution_omp() {
550  return std::is_same<E, parallel_execution_omp>::value;
551 }
552 
// Execution-trait specialisations for parallel_execution_omp. Each one makes
// the corresponding trait function return true for this policy.

// The policy is reported as supported.
557 template <>
558 constexpr bool is_supported<parallel_execution_omp>() { return true; }
559 
// The map pattern is reported as supported.
564 template <>
565 constexpr bool supports_map<parallel_execution_omp>() { return true; }
566 
// The reduce pattern is reported as supported.
571 template <>
572 constexpr bool supports_reduce<parallel_execution_omp>() { return true; }
573 
// The map/reduce pattern is reported as supported.
578 template <>
579 constexpr bool supports_map_reduce<parallel_execution_omp>() { return true; }
580 
// The stencil pattern is reported as supported.
585 template <>
586 constexpr bool supports_stencil<parallel_execution_omp>() { return true; }
587 
// The divide/conquer pattern is reported as supported.
592 template <>
593 constexpr bool supports_divide_conquer<parallel_execution_omp>() { return true; }
594 
// The pipeline pattern is reported as supported.
599 template <>
600 constexpr bool supports_pipeline<parallel_execution_omp>() { return true; }
601 
/**
\brief Map implementation: one OpenMP `parallel for` over the index range
[0, sequence_size), storing apply_iterators_indexed(transform_op, firsts, i)
into first_out[i].
NOTE(review): listing line 604 (the declarator) is absent from this extract —
restore it from the original header.
*/
602 template <typename ... InputIterators, typename OutputIterator,
603  typename Transformer>
605  std::tuple<InputIterators...> firsts,
606  OutputIterator first_out,
607  std::size_t sequence_size, Transformer transform_op) const
608 {
609  #pragma omp parallel for
610  for (std::size_t i=0; i<sequence_size; ++i) {
611  first_out[i] = apply_iterators_indexed(transform_op, firsts, i);
612  }
613 }
614 
/**
\brief Reduce implementation: splits the input into concurrency_degree_
chunks, reduces each chunk sequentially (all but the last in OpenMP tasks),
then reduces the partial results sequentially.
NOTE(review): listing line 616 (the declarator) is absent from this extract —
restore it from the original header.
NOTE(review): a concurrency degree of 0 would divide by zero and index an
empty partial_results — confirm that callers never set it.
*/
615 template <typename InputIterator, typename Identity, typename Combiner>
617  InputIterator first, std::size_t sequence_size,
618  Identity && identity,
619  Combiner && combine_op) const
620 {
621  constexpr sequential_execution seq;
622 
623  using result_type = std::decay_t<Identity>;
  // One slot per chunk; each worker writes only its own slot.
624  std::vector<result_type> partial_results(concurrency_degree_);
  // NOTE(review): identity and combine_op are forwarded on every call of this
  // lambda; with rvalue arguments that is a repeated forward — confirm the
  // sequential reduce does not move from them.
625  auto process_chunk = [&](InputIterator f, std::size_t sz, std::size_t id) {
626  partial_results[id] = seq.reduce(f, sz, std::forward<Identity>(identity),
627  std::forward<Combiner>(combine_op));
628  };
629 
630  const auto chunk_size = sequence_size/concurrency_degree_;
631 
632  #pragma omp parallel
633  {
634  #pragma omp single nowait
635  {
  // The first concurrency_degree_-1 chunks run as tasks.
636  for (int i=0 ;i<concurrency_degree_-1; ++i) {
637  const auto delta = chunk_size * i;
638  const auto chunk_first = std::next(first,delta);
639 
640  #pragma omp task firstprivate (chunk_first, chunk_size, i)
641  {
642  process_chunk(chunk_first, chunk_size, i);
643  }
644  }
645 
646  //Main thread
  // The last chunk also takes the remainder of the integer division.
647  const auto delta = chunk_size * (concurrency_degree_ - 1);
648  const auto chunk_first= std::next(first,delta);
649  const auto chunk_sz = sequence_size - delta;
650  process_chunk(chunk_first, chunk_sz, concurrency_degree_-1);
651  #pragma omp taskwait
652  }
653  }
654 
  // Final combination: the first partial result seeds the reduction of the rest.
655  return seq.reduce(std::next(partial_results.begin()),
656  partial_results.size()-1,
657  partial_results[0], std::forward<Combiner>(combine_op));
658 }
659 
/**
\brief Map/reduce implementation: runs a sequential map_reduce on each of
concurrency_degree_ chunks (all but the last in OpenMP tasks), then reduces
the partial results sequentially.
NOTE(review): listing line 662 (the declarator) is absent from this extract —
restore it from the original header.
NOTE(review): `identity` is never read in this body; each chunk is seeded with
the value-initialised partial_results[i] instead. Confirm that this is
intended for identities that differ from a value-initialised result_type.
*/
660 template <typename ... InputIterators, typename Identity,
661  typename Transformer, typename Combiner>
663  std::tuple<InputIterators...> firsts,
664  std::size_t sequence_size,
665  Identity && identity,
666  Transformer && transform_op, Combiner && combine_op) const
667 {
668  constexpr sequential_execution seq;
669 
670  using result_type = std::decay_t<Identity>;
  // One value-initialised slot per chunk.
671  std::vector<result_type> partial_results(concurrency_degree_);
672 
673  auto process_chunk = [&](auto f, std::size_t sz, std::size_t i) {
674  partial_results[i] = seq.map_reduce(
675  f, sz, partial_results[i],
676  std::forward<Transformer>(transform_op),
677  std::forward<Combiner>(combine_op));
678  };
679 
680  const auto chunk_size = sequence_size / concurrency_degree_;
681 
682  #pragma omp parallel
683  {
684  #pragma omp single nowait
685  {
686 
687  for (int i=0;i<concurrency_degree_-1;++i) {
688  #pragma omp task firstprivate(i)
689  {
690  const auto delta = chunk_size * i;
691  const auto chunk_firsts = iterators_next(firsts,delta);
  // NOTE(review): chunk_last is computed but never used in this task.
692  const auto chunk_last = std::next(std::get<0>(chunk_firsts), chunk_size);
693  process_chunk(chunk_firsts, chunk_size, i);
694  }
695  }
696 
  // The last chunk runs on the thread executing the single region and
  // extends to the end of the sequence.
697  const auto delta = chunk_size * (concurrency_degree_ - 1);
698  auto chunk_firsts = iterators_next(firsts,delta);
699  auto chunk_last = std::next(std::get<0>(firsts), sequence_size);
700  process_chunk(chunk_firsts,
701  std::distance(std::get<0>(chunk_firsts), chunk_last),
702  concurrency_degree_ - 1);
703  #pragma omp taskwait
704  }
705  }
706 
  // Final combination: the first partial result seeds the reduction of the rest.
707  return seq.reduce(std::next(partial_results.begin()),
708  partial_results.size()-1,
709  partial_results[0], std::forward<Combiner>(combine_op));
710 }
711 
/**
\brief Stencil implementation: runs the sequential stencil on each of
concurrency_degree_ chunks (all but the last in OpenMP tasks). Each chunk
writes to the output offset by the chunk's starting position.
NOTE(review): listing line 714 (the declarator) is absent from this extract —
restore it from the original header.
*/
712 template <typename ... InputIterators, typename OutputIterator,
713  typename StencilTransformer, typename Neighbourhood>
715  std::tuple<InputIterators...> firsts, OutputIterator first_out,
716  std::size_t sequence_size,
717  StencilTransformer && transform_op,
718  Neighbourhood && neighbour_op) const
719 {
720  constexpr sequential_execution seq;
721  const auto chunk_size = sequence_size / concurrency_degree_;
  // delta is the chunk's offset from the start; it positions the output iterator.
722  auto process_chunk = [&](auto f, std::size_t sz, std::size_t delta) {
723  seq.stencil(f, std::next(first_out,delta), sz,
724  std::forward<StencilTransformer>(transform_op),
725  std::forward<Neighbourhood>(neighbour_op));
726  };
727 
728  #pragma omp parallel
729  {
730  #pragma omp single nowait
731  {
732  for (int i=0; i<concurrency_degree_-1; ++i) {
733  #pragma omp task firstprivate(i)
734  {
735  const auto delta = chunk_size * i;
736  const auto chunk_firsts = iterators_next(firsts,delta);
737  process_chunk(chunk_firsts, chunk_size, delta);
738  }
739  }
740 
  // The last chunk runs on the thread executing the single region and
  // extends to the end of the sequence.
741  const auto delta = chunk_size * (concurrency_degree_ - 1);
742  const auto chunk_firsts = iterators_next(firsts,delta);
743  const auto chunk_last = std::next(std::get<0>(firsts), sequence_size);
744  process_chunk(chunk_firsts,
745  std::distance(std::get<0>(chunk_firsts), chunk_last), delta);
746 
747  #pragma omp taskwait
748  }
749  }
750 }
751 
/**
\brief Public divide/conquer entry point (predicate version): creates a task
budget of concurrency_degree_-1 and delegates to the private overload that
takes the budget.
NOTE(review): listing line 753 (the declarator) is absent from this extract —
restore it from the original header.
*/
752 template <typename Input, typename Divider,typename Predicate, typename Solver, typename Combiner>
754  Input && input,
755  Divider && divide_op,
756  Predicate && predicate_op,
757  Solver && solve_op,
758  Combiner && combine_op) const
759 {
760  std::atomic<int> num_threads{concurrency_degree_-1};
761 
762  return divide_conquer(std::forward<Input>(input),
763  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
764  std::forward<Solver>(solve_op),
765  std::forward<Combiner>(combine_op),
766  num_threads);
767 }
768 
769 
/**
\brief Public divide/conquer entry point (version without a predicate):
creates a task budget of concurrency_degree_-1 and delegates to the private
overload that takes the budget.
NOTE(review): listing line 771 (the declarator) is absent from this extract —
restore it from the original header.
*/
770 template <typename Input, typename Divider, typename Solver, typename Combiner>
772  Input && input,
773  Divider && divide_op,
774  Solver && solve_op,
775  Combiner && combine_op) const
776 {
777  std::atomic<int> num_threads{concurrency_degree_-1};
778 
779  return divide_conquer(std::forward<Input>(input),
780  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
781  std::forward<Combiner>(combine_op),
782  num_threads);
783 }
784 
/**
\brief Pipeline implementation: one OpenMP task runs the generator and pushes
(item, sequence number) pairs into a queue, while the thread executing the
single region runs the remaining stages through do_pipeline().
NOTE(review): listing line 786 (the declarator) is absent from this extract —
restore it from the original header.
*/
785 template <typename Generator, typename ... Transformers>
787  Generator && generate_op,
788  Transformers && ... transform_ops) const
789 {
790  using namespace std;
791 
792  using result_type = decay_t<typename result_of<Generator()>::type>;
793  auto output_queue = make_queue<pair<result_type,long>>();
794 
795  #pragma omp parallel
796  {
797  #pragma omp single nowait
798  {
799  #pragma omp task shared(generate_op,output_queue)
800  {
801  long order = 0;
802  for (;;) {
803  auto item = generate_op();
  // The item that converts to false is pushed too; later stages stop on it.
804  output_queue.push(make_pair(item,order++)) ;
805  if (!item) break;
806  }
807  }
808  do_pipeline(output_queue,
809  forward<Transformers>(transform_ops)...);
810  #pragma omp taskwait
811  }
812  }
813 }
814 
815 // PRIVATE MEMBERS
816 template <typename Input, typename Divider,typename Predicate, typename Solver, typename Combiner>
818  Input && input,
819  Divider && divide_op,
820  Predicate && predicate_op,
821  Solver && solve_op,
822  Combiner && combine_op,
823  std::atomic<int> & num_threads) const
824 {
825  constexpr sequential_execution seq;
826  if (num_threads.load()<=0) {
827  return seq.divide_conquer(std::forward<Input>(input),
828  std::forward<Divider>(divide_op),std::forward<Predicate>(predicate_op),
829  std::forward<Solver>(solve_op),
830  std::forward<Combiner>(combine_op));
831  }
832 
833  if (predicate_op(input)) { return solve_op(std::forward<Input>(input)); }
834  auto subproblems = divide_op(std::forward<Input>(input));
835 
836  using subresult_type =
837  std::decay_t<typename std::result_of<Solver(Input)>::type>;
838  std::vector<subresult_type> partials(subproblems.size()-1);
839 
840  auto process_subproblems = [&,this](auto it, std::size_t div) {
841  partials[div] = this->divide_conquer(std::forward<Input>(*it),
842  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
843  std::forward<Solver>(solve_op),
844  std::forward<Combiner>(combine_op), num_threads);
845  };
846 
847  int division = 0;
848  subresult_type subresult;
849 
850  #pragma omp parallel
851  {
852  #pragma omp single nowait
853  {
854  auto i = subproblems.begin() + 1;
855  while (i!=subproblems.end() && num_threads.load()>0) {
856  #pragma omp task firstprivate(i,division) \
857  shared(partials,divide_op,solve_op,combine_op,num_threads)
858  {
859  process_subproblems(i, division);
860  }
861  num_threads --;
862  i++;
863  division++;
864  }
865 
866  while (i!=subproblems.end()) {
867  partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
868  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
869  std::forward<Solver>(solve_op),
870  std::forward<Combiner>(combine_op));
871  }
872 
873  //Main thread works on the first subproblem.
874  if (num_threads.load()>0) {
875  subresult = divide_conquer(std::forward<Input>(*subproblems.begin()),
876  std::forward<Divider>(divide_op),std::forward<Predicate>(predicate_op),
877  std::forward<Solver>(solve_op),
878  std::forward<Combiner>(combine_op), num_threads);
879  }
880  else {
881  subresult = seq.divide_conquer(std::forward<Input>(*subproblems.begin()),
882  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
883  std::forward<Solver>(solve_op),
884  std::forward<Combiner>(combine_op));
885  }
886  #pragma omp taskwait
887  }
888  }
889  return seq.reduce(partials.begin(), partials.size(),
890  std::forward<subresult_type>(subresult), combine_op);
891 }
892 
893 
894 
895 template <typename Input, typename Divider, typename Solver, typename Combiner>
897  Input && input,
898  Divider && divide_op,
899  Solver && solve_op,
900  Combiner && combine_op,
901  std::atomic<int> & num_threads) const
902 {
903  constexpr sequential_execution seq;
904  if (num_threads.load()<=0) {
905  return seq.divide_conquer(std::forward<Input>(input),
906  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
907  std::forward<Combiner>(combine_op));
908  }
909 
910  auto subproblems = divide_op(std::forward<Input>(input));
911  if (subproblems.size()<=1) { return solve_op(std::forward<Input>(input)); }
912 
913  using subresult_type =
914  std::decay_t<typename std::result_of<Solver(Input)>::type>;
915  std::vector<subresult_type> partials(subproblems.size()-1);
916 
917  auto process_subproblems = [&,this](auto it, std::size_t div) {
918  partials[div] = this->divide_conquer(std::forward<Input>(*it),
919  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
920  std::forward<Combiner>(combine_op), num_threads);
921  };
922 
923  int division = 0;
924  subresult_type subresult;
925 
926  #pragma omp parallel
927  {
928  #pragma omp single nowait
929  {
930  auto i = subproblems.begin() + 1;
931  while (i!=subproblems.end() && num_threads.load()>0) {
932  #pragma omp task firstprivate(i,division) \
933  shared(partials,divide_op,solve_op,combine_op,num_threads)
934  {
935  process_subproblems(i, division);
936  }
937  num_threads --;
938  i++;
939  division++;
940  }
941 
942  while (i!=subproblems.end()) {
943  partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
944  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
945  std::forward<Combiner>(combine_op));
946  }
947 
948  //Main thread works on the first subproblem.
949  if (num_threads.load()>0) {
950  subresult = divide_conquer(std::forward<Input>(*subproblems.begin()),
951  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
952  std::forward<Combiner>(combine_op), num_threads);
953  }
954  else {
955  subresult = seq.divide_conquer(std::forward<Input>(*subproblems.begin()),
956  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
957  std::forward<Combiner>(combine_op));
958  }
959  #pragma omp taskwait
960  }
961  }
962  return seq.reduce(partials.begin(), partials.size(),
963  std::forward<subresult_type>(subresult), combine_op);
964 }
965 
/**
\brief Consumer stage: pops (item, sequence number) pairs until an item that
converts to false arrives, passing each item to consume_op. When ordering is
enabled, items are consumed in increasing sequence-number order and
out-of-order arrivals are buffered.
NOTE(review): listing line 967 (the end of the template parameter list) is
absent from this extract — restore it from the original header.
*/
966 template <typename Queue, typename Consumer,
968 void parallel_execution_omp::do_pipeline(Queue & input_queue, Consumer && consume_op) const
969 {
970  using namespace std;
971  using gen_value_type = typename Queue::value_type;
972 
  // Unordered mode: consume in arrival order.
973  if (!is_ordered()) {
974  for (;;) {
975  auto item = input_queue.pop();
976  if (!item.first) break;
977  consume_op(*item.first);
978  }
979  return;
980  }
981 
  // Ordered mode: `current` is the next sequence number to consume and
  // `elements` buffers items that arrived early.
982  vector<gen_value_type> elements;
983  long current = 0;
984  auto item = input_queue.pop( );
985  while (item.first) {
986  if (current == item.second) {
987  consume_op(*item.first);
988  current ++;
989  }
990  else {
991  elements.push_back(item);
992  }
  // At most one buffered item is released per popped item; the break follows
  // erase() so the invalidated iterator is not reused.
993  for (auto it=elements.begin(); it!=elements.end(); it++) {
994  if (it->second == current) {
995  consume_op(*it->first);
996  elements.erase(it);
997  current++;
998  break;
999  }
1000  }
1001  item = input_queue.pop( );
1002  }
  // Drain what is still buffered after the end marker.
  // NOTE(review): if no buffered item carries sequence number `current`, this
  // loop never terminates — confirm the sequence numbers are always contiguous.
1003  while(elements.size()>0){
1004  for(auto it = elements.begin(); it != elements.end(); it++){
1005  if(it->second == current) {
1006  consume_op(*it->first);
1007  elements.erase(it);
1008  current++;
1009  break;
1010  }
1011  }
1012  }
1013 }
1014 
1015 
/**
\brief Single stage between two queues: pops pairs from input_queue until an
item that converts to false arrives, applies transform_op to each item and
pushes (result, same sequence number) to output_queue.
\note No end marker is pushed to output_queue here; the farm stage that calls
this overload pushes it.
NOTE(review): listing line 1017 (the end of the template parameter list) is
absent from this extract — restore it from the original header.
*/
1016 template <typename Inqueue, typename Transformer, typename output_type,
1018 void parallel_execution_omp::do_pipeline(Inqueue & input_queue, Transformer && transform_op,
1019  mpmc_queue<output_type> & output_queue) const
1020 {
1021  using namespace std;
1022  using namespace experimental;
1023 
1024  using input_item_type = typename Inqueue::value_type;
1025  using input_item_value_type = typename input_item_type::first_type::value_type;
1026 
1027  using output_optional_type = typename output_type::first_type;
1028  using output_item_value_type = typename output_type::first_type::value_type;
1029  for (;;) {
1030  auto item{input_queue.pop()};
1031  if(!item.first) break;
1032  auto out = output_item_value_type{transform_op(*item.first)};
1033  output_queue.push(make_pair(out,item.second)) ;
1034  }
1035 }
1036 
1037 
/**
\brief Execution-context stage: an OpenMP task runs the stage's transformer
under the context's own execution policy and then pushes an end marker (empty
optional, sequence number -1). The calling thread continues with the remaining
stages and waits for the task.
NOTE(review): listing lines 1041 (end of the template parameter list) and 1052
(presumably the alias defining `output_type`, which is used below) are absent
from this extract — restore them from the original header.
*/
1038 template <typename Queue, typename Execution, typename Transformer,
1039  template <typename, typename> class Context,
1040  typename ... OtherTransformers,
1042 void parallel_execution_omp::do_pipeline(Queue & input_queue,
1043  Context<Execution,Transformer> && context_op,
1044  OtherTransformers &&... other_ops) const
1045 {
1046  using namespace std;
1047  using namespace experimental;
1048 
1049  using input_item_type = typename Queue::value_type;
1050  using input_item_value_type = typename input_item_type::first_type::value_type;
1051 
1053  using output_optional_type = experimental::optional<output_type>;
1054  using output_item_type = pair <output_optional_type, long> ;
1055 
  // Either a reference to a caller-supplied queue or a newly created queue,
  // depending on which get_output_queue overload is selected.
1056  decltype(auto) output_queue =
1057  get_output_queue<output_item_type>(other_ops...);
1058 
1059  #pragma omp task shared(input_queue,context_op,output_queue)
1060  {
1061  context_op.execution_policy().pipeline(input_queue, context_op.transformer(), output_queue);
1062  output_queue.push(make_pair(output_optional_type{},-1));
1063  }
1064 
1065  do_pipeline(output_queue,
1066  forward<OtherTransformers>(other_ops)... );
1067  #pragma omp taskwait
1068 }
1069 
/**
\brief Plain-transformer stage: an OpenMP task pops pairs until an item that
converts to false arrives, pushes (transform_op(item), same sequence number)
to the output queue and finally pushes an end marker (empty optional, -1). The
calling thread continues with the remaining stages.
NOTE(review): listing line 1071 (the end of the template parameter list) is
absent from this extract — restore it from the original header.
NOTE(review): unlike the context and farm stages, this body has no
`#pragma omp taskwait` before returning, although the task shares the local
output_queue — confirm that the task cannot outlive it.
*/
1070 template <typename Queue, typename Transformer, typename ... OtherTransformers,
1072 void parallel_execution_omp::do_pipeline(
1073  Queue & input_queue,
1074  Transformer && transform_op,
1075  OtherTransformers && ... other_ops) const
1076 {
1077  using namespace std;
1078  using gen_value_type = typename Queue::value_type;
1079  using input_value_type = typename gen_value_type::first_type::value_type;
1080  using result_type = typename result_of<Transformer(input_value_type)>::type;
1081  using output_value_type = experimental::optional<result_type>;
1082  using output_type = pair<output_value_type,long>;
1083 
1084  decltype(auto) output_queue =
1085  get_output_queue<output_type>(other_ops...);
1086 
1087  #pragma omp task shared(transform_op, input_queue, output_queue)
1088  {
1089  for (;;) {
1090  auto item = input_queue.pop();
1091  if (!item.first) break;
1092  auto out = output_value_type{transform_op(*item.first)};
1093  output_queue.push(make_pair(out, item.second));
1094  }
1095  output_queue.push(make_pair(output_value_type{}, -1));
1096  }
1097 
1098  do_pipeline(output_queue,
1099  forward<OtherTransformers>(other_ops)...);
1100 }
1101 
/**
\brief Final farm stage: spawns farm_obj.cardinality() OpenMP tasks. Each task
applies farm_obj to popped items until it pops one that converts to false,
then pushes that item back so the next task also sees it. The function waits
for all tasks before returning.
NOTE(review): listing line 1104 (the end of the template parameter list) is
absent from this extract — restore it from the original header.
*/
1102 template <typename Queue, typename FarmTransformer,
1103  template <typename> class Farm,
1105 void parallel_execution_omp::do_pipeline(
1106  Queue & input_queue,
1107  Farm<FarmTransformer> && farm_obj) const
1108 {
1109  using namespace std;
1110  using namespace experimental;
1111  using gen_value_type = typename Queue::value_type;
1112  using input_value_type = typename gen_value_type::first_type::value_type;
1113 
1114  for (int i=0; i<farm_obj.cardinality(); ++i) {
1115  #pragma omp task shared(farm_obj,input_queue)
1116  {
1117  auto item = input_queue.pop();
1118  while (item.first) {
1119  farm_obj(*item.first);
1120  item = input_queue.pop();
1121  }
  // Re-queue the end marker for the remaining workers.
1122  input_queue.push(item);
1123  }
1124  }
1125  #pragma omp taskwait
1126 }
1127 
/**
\brief Intermediate farm stage: spawns farm_obj.cardinality() OpenMP tasks that
each run the queue-to-queue do_pipeline() with the farm's transformer. The task
that observes all tasks done pushes the end marker to the output queue; every
other task pushes an empty item to the input queue. The calling thread
continues with the remaining stages and waits for the tasks.
NOTE(review): listing lines 1131 (end of the template parameter list) and 1142
(presumably the alias defining `result_type`, which is used below) are absent
from this extract — restore them from the original header.
*/
1128 template <typename Queue, typename FarmTransformer,
1129  template <typename> class Farm,
1130  typename ... OtherTransformers,
1132 void parallel_execution_omp::do_pipeline(
1133  Queue & input_queue,
1134  Farm<FarmTransformer> && farm_obj,
1135  OtherTransformers && ... other_transform_ops) const
1136 {
1137  using namespace std;
1138  using namespace experimental;
1139  using gen_value_type = typename Queue::value_type;
1140  using input_value_type = typename gen_value_type::first_type::value_type;
1141 
1143  using output_optional_type = optional<result_type>;
1144  using output_type = pair<output_optional_type,long>;
1145 
1146  decltype(auto) output_queue =
1147  get_output_queue<output_type>(other_transform_ops...);
1148 
1149 // auto output_queue = make_queue<output_type>();
1150 
1151  atomic<int> done_threads{0};
1152  int ntask = farm_obj.cardinality();
1153  for (int i=0; i<farm_obj.cardinality(); ++i) {
1154  #pragma omp task shared(done_threads,output_queue,farm_obj,input_queue,ntask)
1155  {
1156  do_pipeline(input_queue, farm_obj.transformer(), output_queue);
  // NOTE(review): the increment and the comparison are two separate atomic
  // operations, so two tasks may both read the final count — confirm whether
  // the end marker can be pushed twice.
1157  done_threads++;
1158  if (done_threads == ntask){
1159  output_queue.push(make_pair(output_optional_type{}, -1));
1160  }else{
  // Hand an empty item to the input queue so another worker can stop too.
1161  input_queue.push(gen_value_type{});
1162  }
1163  }
1164  }
1165  do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1166  #pragma omp taskwait
1167 }
1168 
1169 
/**
\brief Final filter stage: the body is empty, so nothing is popped from
input_queue and filter_obj is never invoked.
NOTE(review): listing line 1172 (the end of the template parameter list) is
absent from this extract — restore it from the original header.
*/
1170 template <typename Queue, typename Predicate,
1171  template <typename> class Filter,
1173 void parallel_execution_omp::do_pipeline(
1174  Queue & input_queue,
1175  Filter<Predicate> && filter_obj) const
1176 {
1177 }
1178 
// Pipeline stage: a filter followed by further stages.
//
// Items are pairs (optional value, sequence number); an empty optional ends
// the input loop of the filter task.
//
// Ordered mode (is_ordered()):
//  - filter_task forwards accepted items to filter_queue and replaces rejected
//    ones by an empty placeholder carrying the same sequence number, then
//    pushes the terminator (empty, -1).
//  - reorder_task releases accepted items to output_queue in increasing
//    sequence-number order, renumbering them with a fresh counter, and drops
//    the placeholders.
//  - the remaining stages run on output_queue in the calling thread.
// Unordered mode:
//  - rejected items are dropped, and the remaining stages read filter_queue
//    directly.
//
// NOTE(review): original line 1182 (end of the template parameter list) is
// missing from this listing.
1179 template <typename Queue, typename Predicate,
1180  template <typename> class Filter,
1181  typename ... OtherTransformers,
1183 void parallel_execution_omp::do_pipeline(
1184  Queue & input_queue,
1185  Filter<Predicate> && filter_obj,
1186  OtherTransformers && ... other_transform_ops) const
1187 {
1188  using namespace std;
1189  using gen_value_type = typename Queue::value_type;
1190  using input_value_type = typename gen_value_type::first_type;
1191  auto filter_queue = make_queue<gen_value_type>();
1192 
1193  if (is_ordered()) {
 // Applies the predicate; every input item yields exactly one queue entry
 // (the item itself or an empty placeholder with the same sequence number).
1194  auto filter_task = [&]() {
1195  {
1196  auto item{input_queue.pop()};
1197  while (item.first) {
1198  if(filter_obj(*item.first)) {
1199  filter_queue.push(item);
1200  }
1201  else {
1202  filter_queue.push(make_pair(input_value_type{} ,item.second));
1203  }
1204  item = input_queue.pop();
1205  }
 // Terminator for reorder_task: empty value with sequence number -1.
1206  filter_queue.push (make_pair(input_value_type{}, -1));
1207  }
1208  };
1209 
1210  decltype(auto) output_queue =
1211  get_output_queue<gen_value_type>(other_transform_ops...);
1212 
1213 
 // Emits items in sequence order. `current` is the next expected input
 // sequence number, `order` the next output sequence number, and `elements`
 // buffers items that arrived ahead of `current`.
1214  auto reorder_task = [&]() {
1215  vector<gen_value_type> elements;
1216  int current = 0;
1217  long order = 0;
1218  auto item = filter_queue.pop();
1219  for (;;) {
 // Stop on the terminator pushed by filter_task.
1220  if (!item.first && item.second == -1) break;
1221  if (item.second == current) {
 // Placeholders (empty value) only advance `current`.
1222  if (item.first) {
1223  output_queue.push(make_pair(item.first, order++));
1224  }
1225  current++;
1226  }
1227  else {
1228  elements.push_back(item);
1229  }
 // Release at most one buffered item per received item (the loop breaks
 // after the first match).
1230  for (auto it=elements.begin(); it<elements.end(); it++) {
1231  if ((*it).second==current) {
1232  if((*it).first){
1233  output_queue.push(make_pair((*it).first,order++));
1234  }
1235  elements.erase(it);
1236  current++;
1237  break;
1238  }
1239  }
1240  item = filter_queue.pop();
1241  }
1242 
 // Drain what is still buffered after the terminator.
 // NOTE(review): this loop only ends if every buffered sequence number is
 // eventually reached by `current`.
1243  while (elements.size()>0) {
1244  for (auto it=elements.begin(); it<elements.end(); it++) {
1245  if ((*it).second == current) {
1246  if((*it).first) {
1247  output_queue.push(make_pair((*it).first,order++));
1248  }
1249  elements.erase(it);
1250  current++;
1251  break;
1252  }
1253  }
1254  }
1255 
 // Forward the terminator to the next stage.
1256  output_queue.push(item);
1257  };
1258 
1259 
1260  #pragma omp task shared(filter_queue,filter_obj,input_queue)
1261  {
1262  filter_task();
1263  }
1264 
1265  #pragma omp task shared (output_queue,filter_queue)
1266  {
1267  reorder_task();
1268  }
1269 
1270  do_pipeline(output_queue,
1271  forward<OtherTransformers>(other_transform_ops)...);
1272 
1273  #pragma omp taskwait
1274  }
1275  else {
 // Unordered: forward accepted items only, then push the terminator.
1276  auto filter_task = [&]() {
1277  auto item = input_queue.pop( ) ;
1278  while (item.first) {
1279  if (filter_obj(*item.first)) {
1280  filter_queue.push(item);
1281  }
1282  item = input_queue.pop();
1283  }
1284  filter_queue.push(make_pair(input_value_type{}, -1));
1285  };
1286 
1287  #pragma omp task shared(filter_queue,filter_obj,input_queue)
1288  {
1289  filter_task();
1290  }
1291  do_pipeline(filter_queue,
1292  std::forward<OtherTransformers>(other_transform_ops)...);
1293  #pragma omp taskwait
1294  }
1295 }
1296 
1297 
// Pipeline stage: a windowed reduction followed by further stages.
//
// A single OpenMP task feeds every incoming value into reduce_obj. Whenever
// reduce_obj.reduction_needed() is true it reduces the current window with a
// sequential_execution policy and pushes the result, numbered by a local
// counter, to output_queue. After the input ends it pushes the terminator
// (empty, -1). The remaining stages run on output_queue in the calling thread,
// which then waits for the task.
//
// NOTE(review): original line 1301 (end of the template parameter list) is
// missing from this listing.
1298 template <typename Queue, typename Combiner, typename Identity,
1299  template <typename C, typename I> class Reduce,
1300  typename ... OtherTransformers,
1302 void parallel_execution_omp::do_pipeline(
1303  Queue && input_queue,
1304  Reduce<Combiner,Identity> && reduce_obj,
1305  OtherTransformers && ... other_transform_ops) const
1306 {
1307  using namespace std;
1308  using namespace experimental;
1309 
1310  using input_item_type = typename decay_t<Queue>::value_type;
1311  using input_item_value_type = typename input_item_type::first_type::value_type;
1312  using output_item_value_type = optional<decay_t<Identity>>;
1313  using output_item_type = pair<output_item_value_type,long>;
1314 
1315  decltype(auto) output_queue =
1316  get_output_queue<output_item_type>(other_transform_ops...);
1317 
1318  auto reduce_task = [&,this]() {
1319  auto item{input_queue.pop()};
1320  int order = 0;
1321  while (item.first) {
1322  reduce_obj.add_item(std::forward<Identity>(*item.first));
 // The next item is popped before the window check, so a completed window
 // is emitted only once this pop has returned.
1323  item = input_queue.pop();
1324  if (reduce_obj.reduction_needed()) {
1325  constexpr sequential_execution seq;
1326  auto red = reduce_obj.reduce_window(seq);
1327  output_queue.push(make_pair(red, order++));
1328  }
1329  }
 // End-of-stream marker for the next stage.
1330  output_queue.push(make_pair(output_item_value_type{}, -1));
1331  };
1332 
1333  #pragma omp task shared(reduce_obj,input_queue, output_queue)
1334  {
1335  reduce_task();
1336  }
1337  do_pipeline(output_queue,
1338  std::forward<OtherTransformers>(other_transform_ops)...);
1339  #pragma omp taskwait
1340 }
1341 
// Pipeline stage: an iteration (transform until a predicate holds) followed by
// further stages.
//
// A single OpenMP task pops items from input_queue and applies
// iteration_obj.transform to each. If iteration_obj.predicate accepts the new
// value the item goes to output_queue; otherwise it is pushed back to
// input_queue for another round. The item keeps its sequence number in both
// cases. Once an empty item (end of stream) is seen, the task keeps processing
// until input_queue is empty and then pushes the terminator (empty, -1) to
// output_queue. The remaining stages run on output_queue in the calling
// thread, which then waits for the task.
//
// The debug tracing to std::cerr has been removed: library code should not
// write to the caller's error stream for every item.
//
// NOTE(review): original lines 1345-1346 (end of the template parameter list)
// are missing from this listing.
1342 template <typename Queue, typename Transformer, typename Predicate,
1343  template <typename T, typename P> class Iteration,
1344  typename ... OtherTransformers,
1347 void parallel_execution_omp::do_pipeline(
1348  Queue & input_queue,
1349  Iteration<Transformer,Predicate> && iteration_obj,
1350  OtherTransformers && ... other_transform_ops) const
1351 {
1352  using namespace std;
1353  using namespace experimental;
1354 
1355  using input_item_type = typename decay_t<Queue>::value_type;
1357  decltype(auto) output_queue =
1358  get_output_queue<input_item_type>(other_transform_ops...);
1359 
1360 
1361  auto iteration_task = [&]() {
 // Main loop: runs until the end-of-stream item (empty value) is popped.
1362  for (;;) {
1363  auto item = input_queue.pop();
1364  if (!item.first) break;
1366  auto value = iteration_obj.transform(*item.first);
1367  auto new_item = input_item_type{value,item.second};
1368  if (iteration_obj.predicate(value)) {
1371  output_queue.push(new_item);
1372  }
1373  else {
1376  input_queue.push(new_item);
1377  }
1378  }
 // After end of stream: finish the items that were re-queued for another
 // round.
1379  while (!input_queue.is_empty()) {
1380  auto item = input_queue.pop();
1382  auto value = iteration_obj.transform(*item.first);
1383  auto new_item = input_item_type{value,item.second};
1384  if (iteration_obj.predicate(value)) {
1385  output_queue.push(new_item);
1386  }
1387  else {
1388  input_queue.push(new_item);
1389  }
1390  }
 // End-of-stream marker for the next stage.
1391  output_queue.push(input_item_type{{},-1});
1392  };
1393 
1394  #pragma omp task shared(iteration_obj,input_queue,output_queue)
1395  {
1396  iteration_task();
1397  }
1398  do_pipeline(output_queue,
1399  std::forward<OtherTransformers>(other_transform_ops)...);
1400  #pragma omp taskwait
1401 
1402 }
1403 
// Pipeline overload for an iteration stage that is not implemented: the
// static_assert fails with "Not implemented" whenever Transformer is a
// pipeline. The function has no runtime behavior.
// NOTE(review): original line 1408 (last template constraint) is missing from
// this listing; presumably it selects this overload for pipeline transformers
// - confirm against the real header.
1404 template <typename Queue, typename Transformer, typename Predicate,
1405  template <typename T, typename P> class Iteration,
1406  typename ... OtherTransformers,
1407  requires_iteration<Iteration<Transformer,Predicate>>,
1409 void parallel_execution_omp::do_pipeline(
1410  Queue & input_queue,
1411  Iteration<Transformer,Predicate> && iteration_obj,
1412  OtherTransformers && ... other_transform_ops) const
1413 {
1414  static_assert(!is_pipeline<Transformer>, "Not implemented");
1415 }
1416 
// Pipeline stage: a nested pipeline followed by further stages.
//
// Flattens the nesting: the stages of pipeline_obj (pipeline_obj.transformers())
// are concatenated with references to the remaining stages into one tuple,
// which is handed to do_pipeline_nested together with an index sequence
// covering all sizeof...(Transformers) + sizeof...(OtherTransformers) entries.
1417 template <typename Queue, typename ... Transformers,
1418  template <typename...> class Pipeline,
1419  typename ... OtherTransformers,
1420  requires_pipeline<Pipeline<Transformers...>>>
1421 void parallel_execution_omp::do_pipeline(
1422  Queue & input_queue,
1423  Pipeline<Transformers...> && pipeline_obj,
1424  OtherTransformers && ... other_transform_ops) const
1425 {
1426  do_pipeline_nested(
1427  input_queue,
1428  std::tuple_cat(pipeline_obj.transformers(),
1429  std::forward_as_tuple(other_transform_ops...)),
1430  std::make_index_sequence<sizeof...(Transformers)+sizeof...(OtherTransformers)>());
1431 }
1432 
// Helper for nested pipelines: unpacks the tuple of stages into individual
// arguments (one std::get<I> per index in the sequence) and calls do_pipeline
// on input_queue with them.
1433 template <typename Queue, typename ... Transformers,
1434  std::size_t ... I>
1435 void parallel_execution_omp::do_pipeline_nested(
1436  Queue & input_queue,
1437  std::tuple<Transformers...> && transform_ops,
1438  std::index_sequence<I...>) const
1439 {
1440  do_pipeline(input_queue,
1441  std::forward<Transformers>(std::get<I>(transform_ops))...);
1442 }
1443 
1444 
1445 } // end namespace grppi
1446 
1447 #else // GRPPI_OMP undefined
1448 
// Fallback definitions used when GRPPI_OMP is not defined.
1449 namespace grppi {
1450 
1451 
// Empty placeholder so the type name still exists without OpenMP support.
1454 struct parallel_execution_omp {};
1455 
// In this branch the metafunction returns false for every type E.
1461 template <typename E>
1462 constexpr bool is_parallel_execution_omp() {
1463  return false;
1464 }
1465 
1466 }
1467 
1468 #endif // GRPPI_OMP
1469 
1470 #endif
Definition: callable_traits.h:26
parallel_execution_omp(int concurrency_degree, bool order=true) noexcept
Set the number of OpenMP threads to concurrency_degree in order to run in parallel.
Definition: parallel_execution_omp.h:72
std::enable_if_t< is_reduce< T >, int > requires_reduce
Definition: reduce_pattern.h:135
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_omp.h:107
typename std::enable_if_t< is_iteration< T >, int > requires_iteration
Definition: iteration_pattern.h:88
void disable_ordering() noexcept
Disable ordering.
Definition: parallel_execution_omp.h:102
parallel_execution_omp() noexcept
Default construct an OpenMP parallel execution policy.
Definition: parallel_execution_omp.h:56
int get_thread_id() const noexcept
Get index of current thread in the thread table.
Definition: parallel_execution_omp.h:156
constexpr bool is_supported< parallel_execution_omp >()
Determines if an execution policy is supported in the current compilation.
Definition: parallel_execution_omp.h:558
STL namespace.
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke Divide/conquer pattern.
Definition: parallel_execution_omp.h:771
auto iterators_next(T &&t, int n)
Computes next n steps from a tuple of iterators.
Definition: iterator.h:175
decltype(auto) apply_iterators_indexed(F &&f, T &&t, std::size_t i)
Applies a callable object to the values obtained from the iterators in a tuple by indexing...
Definition: iterator.h:147
constexpr bool is_parallel_execution_omp()
Metafunction that determines if type E is parallel_execution_omp.
Definition: parallel_execution_omp.h:549
typename internal::output_value_type< I, T >::type output_value_type
Definition: pipeline_pattern.h:132
queue_mode
Definition: mpmc_queue.h:35
constexpr bool supports_map< parallel_execution_omp >()
Determines if an execution policy supports the map pattern.
Definition: parallel_execution_omp.h:565
Definition: mpmc_queue.h:38
auto map_reduce(std::tuple< InputIterators... > firsts, std::size_t sequence_size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op) const
Applies a map/reduce operation to a sequence of data items.
Definition: parallel_execution_omp.h:662
mpmc_queue< T > get_output_queue(Transformers &&...) const
Makes a communication queue for elements of type T if the queue has not been created in an outer patt...
Definition: parallel_execution_omp.h:149
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T.
Definition: parallel_execution_omp.h:124
void set_concurrency_degree(int degree) noexcept
Set number of grppi threads.
Definition: parallel_execution_omp.h:82
mpmc_queue< T > & get_output_queue(mpmc_queue< T > &queue, Transformers &&...) const
Returns the reference of a communication queue for elements of type T if the queue has been created i...
Definition: parallel_execution_omp.h:137
void enable_ordering() noexcept
Enable ordering.
Definition: parallel_execution_omp.h:97
constexpr bool supports_stencil< parallel_execution_omp >()
Determines if an execution policy supports the stencil pattern.
Definition: parallel_execution_omp.h:586
constexpr bool supports_map_reduce< parallel_execution_omp >()
Determines if an execution policy supports the map-reduce pattern.
Definition: parallel_execution_omp.h:579
OpenMP parallel execution policy.
Definition: parallel_execution_omp.h:45
std::enable_if_t< is_no_pattern< T >, int > requires_no_pattern
Definition: patterns.h:92
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:111
auto reduce(InputIterator first, std::size_t sequence_size, Identity &&identity, Combiner &&combine_op) const
Applies a reduction to a sequence of data items.
Definition: parallel_execution_omp.h:616
void pipeline(mpmc_queue< InputType > &input_queue, Transformer &&transform_op, mpmc_queue< OutputType > &output_queue) const
Invoke Pipeline pattern coming from another context that uses mpmc_queues as communication channels...
Definition: parallel_execution_omp.h:308
return_type type
Definition: patterns.h:103
typename std::enable_if_t< is_filter< T >, int > requires_filter
Definition: filter_pattern.h:70
Sequential execution policy.
Definition: sequential_execution.h:41
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_omp.h:90
constexpr bool supports_pipeline< parallel_execution_omp >()
Determines if an execution policy supports the pipeline pattern.
Definition: parallel_execution_omp.h:600
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke Divide/conquer pattern.
Definition: sequential_execution.h:543
typename std::result_of< Transformer(Input)>::type result_type
Determines the return type of applying a function to an input type.
Definition: patterns.h:110
void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: parallel_execution_omp.h:714
void set_queue_attributes(int size, queue_mode mode) noexcept
Sets the attributes for the queues built through make_queue<T>().
Definition: parallel_execution_omp.h:112
constexpr bool supports_reduce< parallel_execution_omp >()
Determines if an execution policy supports the reduce pattern.
Definition: parallel_execution_omp.h:572
void map(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, Transformer transform_op) const
Applies a transformation to multiple sequences leaving the result in another sequence using available...
Definition: parallel_execution_omp.h:604
void pipeline(Generator &&generate_op, Transformers &&...transform_op) const
Invoke Pipeline pattern.
Definition: parallel_execution_omp.h:786
typename std::enable_if_t< is_context< T >, int > requires_context
Definition: common/context.h:95
bool push(T item)
Definition: mpmc_queue.h:128
typename std::enable_if_t< is_farm< T >, int > requires_farm
Definition: farm_pattern.h:89
constexpr bool supports_divide_conquer< parallel_execution_omp >()
Determines if an execution policy supports the divide/conquer pattern.
Definition: parallel_execution_omp.h:593