GrPPI  1.0
Generic and Reusable Parallel Pattern Interface
parallel_execution_omp.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Universidad Carlos III de Madrid
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef GRPPI_OMP_PARALLEL_EXECUTION_OMP_H
17 #define GRPPI_OMP_PARALLEL_EXECUTION_OMP_H
18 
19 #ifdef GRPPI_OMP
20 
#include "../common/mpmc_queue.h"
#include "../common/iterator.h"
#include "../common/execution_traits.h"
#include "../seq/sequential_execution.h"

#include <algorithm>
#include <atomic>
#include <experimental/optional>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

#include <omp.h>
31 
32 
33 namespace grppi {
34 
41 
42 public:
52  parallel_execution_omp{impl_concurrency_degree()}
53  {}
54 
67  parallel_execution_omp(int concurrency_degree, bool order = true) noexcept :
68  concurrency_degree_{concurrency_degree},
69  ordering_{order}
70  {
71  omp_set_num_threads(concurrency_degree_);
72  }
73 
77  void set_concurrency_degree(int degree) noexcept {
78  concurrency_degree_ = degree;
79  omp_set_num_threads(concurrency_degree_);
80  }
81 
85  int concurrency_degree() const noexcept {
86  return concurrency_degree_;
87  }
88 
92  void enable_ordering() noexcept { ordering_=true; }
93 
97  void disable_ordering() noexcept { ordering_=false; }
98 
102  bool is_ordered() const noexcept { return ordering_; }
103 
107  void set_queue_attributes(int size, queue_mode mode) noexcept {
108  queue_size_ = size;
109  queue_mode_ = mode;
110  }
111 
118  template <typename T>
120  return {queue_size_, queue_mode_};
121  }
122 
131  template <typename T, typename ... Transformers>
132  mpmc_queue<T>& get_output_queue(mpmc_queue<T> & queue, Transformers &&...) const {
133  return queue;
134  }
135 
143  template <typename T, typename ... Transformers>
144  mpmc_queue<T> get_output_queue(Transformers &&... ) const{
145  return std::move(make_queue<T>());
146  }
147 
151  int get_thread_id() const noexcept {
152  int result;
153  #pragma omp parallel
154  {
155  result = omp_get_thread_num();
156  }
157  return result;
158  }
159 
174  template <typename ... InputIterators, typename OutputIterator,
175  typename Transformer>
176  void map(std::tuple<InputIterators...> firsts,
177  OutputIterator first_out,
178  std::size_t sequence_size, Transformer transform_op) const;
179 
192  template <typename InputIterator, typename Identity, typename Combiner>
193  auto reduce(InputIterator first, std::size_t sequence_size,
194  Identity && identity, Combiner && combine_op) const;
195 
210  template <typename ... InputIterators, typename Identity,
211  typename Transformer, typename Combiner>
212  auto map_reduce(std::tuple<InputIterators...> firsts,
213  std::size_t sequence_size,
214  Identity && identity,
215  Transformer && transform_op, Combiner && combine_op) const;
216 
233  template <typename ... InputIterators, typename OutputIterator,
234  typename StencilTransformer, typename Neighbourhood>
235  void stencil(std::tuple<InputIterators...> firsts, OutputIterator first_out,
236  std::size_t sequence_size,
237  StencilTransformer && transform_op,
238  Neighbourhood && neighbour_op) const;
239 
252  template <typename Input, typename Divider, typename Solver, typename Combiner>
253  [[deprecated("Use new interface with predicate argument")]]
254  auto divide_conquer(Input && input,
255  Divider && divide_op,
256  Solver && solve_op,
257  Combiner && combine_op) const;
258 
273  template <typename Input, typename Divider, typename Predicate, typename Solver, typename Combiner>
274  auto divide_conquer(Input && input,
275  Divider && divide_op,
276  Predicate && predicate_op,
277  Solver && solve_op,
278  Combiner && combine_op) const;
279 
280 
288  template <typename Generator, typename ... Transformers>
289  void pipeline(Generator && generate_op,
290  Transformers && ... transform_op) const;
291 
302  template <typename InputType, typename Transformer, typename OutputType>
303  void pipeline(mpmc_queue<InputType> & input_queue, Transformer && transform_op,
304  mpmc_queue<OutputType> &output_queue) const
305  {
306  do_pipeline(input_queue, std::forward<Transformer>(transform_op), output_queue);
307  }
308 
309 private:
310 
311  template <typename Input, typename Divider, typename Solver, typename Combiner>
312  auto divide_conquer(Input && input,
313  Divider && divide_op,
314  Solver && solve_op,
315  Combiner && combine_op,
316  std::atomic<int> & num_threads) const;
317 
318  template <typename Input, typename Divider, typename Predicate, typename Solver, typename Combiner>
319  auto divide_conquer(Input && input,
320  Divider && divide_op,
321  Predicate && predicate_op,
322  Solver && solve_op,
323  Combiner && combine_op,
324  std::atomic<int> & num_threads) const;
325 
326 
327  template <typename Queue, typename Consumer,
329  void do_pipeline(Queue & input_queue, Consumer && consume_op) const;
330 
331  template <typename Inqueue, typename Transformer, typename output_type,
333  void do_pipeline(Inqueue & input_queue, Transformer && transform_op,
334  mpmc_queue<output_type> & output_queue) const;
335 
336  template <typename T, typename ... Others>
337  void do_pipeline(mpmc_queue<T> & in_q, mpmc_queue<T> & same_queue, Others &&... ops) const;
338 
339  template <typename T>
340  void do_pipeline(mpmc_queue<T> &) const {}
341 
342  template <typename Queue, typename Transformer, typename ... OtherTransformers,
343  requires_no_pattern<Transformer> = 0>
344  void do_pipeline(Queue & input_queue, Transformer && transform_op,
345  OtherTransformers && ... other_ops) const;
346 
347  template <typename Queue, typename Execution, typename Transformer,
348  template <typename, typename> class Context,
349  typename ... OtherTransformers,
350  requires_context<Context<Execution,Transformer>> = 0>
351  void do_pipeline(Queue & input_queue, Context<Execution,Transformer> && context_op,
352  OtherTransformers &&... other_ops) const;
353 
354  template <typename Queue, typename Execution, typename Transformer,
355  template <typename, typename> class Context,
356  typename ... OtherTransformers,
357  requires_context<Context<Execution,Transformer>> = 0>
358  void do_pipeline(Queue & input_queue, Context<Execution,Transformer> & context_op,
359  OtherTransformers &&... other_ops) const
360  {
361  do_pipeline(input_queue, std::move(context_op),
362  std::forward<OtherTransformers>(other_ops)...);
363  }
364 
365  template <typename Queue, typename FarmTransformer,
366  template <typename> class Farm,
367  requires_farm<Farm<FarmTransformer>> = 0>
368  void do_pipeline(Queue & input_queue,
369  Farm<FarmTransformer> & farm_obj) const
370  {
371  do_pipeline(input_queue, std::move(farm_obj));
372  }
373 
374  template <typename Queue, typename FarmTransformer,
375  template <typename> class Farm,
376  requires_farm<Farm<FarmTransformer>> = 0>
377  void do_pipeline(Queue & input_queue,
378  Farm<FarmTransformer> && farm_obj) const;
379 
380  template <typename Queue, typename FarmTransformer,
381  template <typename> class Farm,
382  typename ... OtherTransformers,
383  requires_farm<Farm<FarmTransformer>> =0>
384  void do_pipeline(Queue & input_queue,
385  Farm<FarmTransformer> & farm_obj,
386  OtherTransformers && ... other_transform_ops) const
387  {
388  do_pipeline(input_queue, std::move(farm_obj),
389  std::forward<OtherTransformers>(other_transform_ops)...);
390  }
391 
392  template <typename Queue, typename FarmTransformer,
393  template <typename> class Farm,
394  typename ... OtherTransformers,
395  requires_farm<Farm<FarmTransformer>> =0>
396  void do_pipeline(Queue & input_queue,
397  Farm<FarmTransformer> && farm_obj,
398  OtherTransformers && ... other_transform_ops) const;
399 
400  template <typename Queue, typename Predicate,
401  template <typename> class Filter,
402  requires_filter<Filter<Predicate>> = 0>
403  void do_pipeline(Queue & input_queue,
404  Filter<Predicate> & filter_obj) const
405  {
406  do_pipeline(input_queue, std::move(filter_obj));
407  }
408 
409  template <typename Queue, typename Predicate,
410  template <typename> class Filter,
411  requires_filter<Filter<Predicate>> = 0>
412  void do_pipeline(Queue & input_queue,
413  Filter<Predicate> && filter_obj) const;
414 
415  template <typename Queue, typename Predicate,
416  template <typename> class Filter,
417  typename ... OtherTransformers,
418  requires_filter<Filter<Predicate>> =0>
419  void do_pipeline(Queue & input_queue,
420  Filter<Predicate> & filter_obj,
421  OtherTransformers && ... other_transform_ops) const
422  {
423  do_pipeline(input_queue, std::move(filter_obj),
424  std::forward<OtherTransformers>(other_transform_ops)...);
425  }
426 
427  template <typename Queue, typename Predicate,
428  template <typename> class Filter,
429  typename ... OtherTransformers,
430  requires_filter<Filter<Predicate>> =0>
431  void do_pipeline(Queue & input_queue,
432  Filter<Predicate> && filter_obj,
433  OtherTransformers && ... other_transform_ops) const;
434 
435  template <typename Queue, typename Combiner, typename Identity,
436  template <typename C, typename I> class Reduce,
437  typename ... OtherTransformers,
438  requires_reduce<Reduce<Combiner,Identity>> = 0>
439  void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> & reduce_obj,
440  OtherTransformers && ... other_transform_ops) const
441  {
442  do_pipeline(input_queue, std::move(reduce_obj),
443  std::forward<OtherTransformers>(other_transform_ops)...);
444  }
445 
446  template <typename Queue, typename Combiner, typename Identity,
447  template <typename C, typename I> class Reduce,
448  typename ... OtherTransformers,
449  requires_reduce<Reduce<Combiner,Identity>> = 0>
450  void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> && reduce_obj,
451  OtherTransformers && ... other_transform_ops) const;
452 
453  template <typename Queue, typename Transformer, typename Predicate,
454  template <typename T, typename P> class Iteration,
455  typename ... OtherTransformers,
456  requires_iteration<Iteration<Transformer,Predicate>> =0,
457  requires_no_pattern<Transformer> =0>
458  void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> & iteration_obj,
459  OtherTransformers && ... other_transform_ops) const
460  {
461  do_pipeline(input_queue, std::move(iteration_obj),
462  std::forward<OtherTransformers>(other_transform_ops)...);
463  }
464 
465  template <typename Queue, typename Transformer, typename Predicate,
466  template <typename T, typename P> class Iteration,
467  typename ... OtherTransformers,
468  requires_iteration<Iteration<Transformer,Predicate>> =0,
469  requires_no_pattern<Transformer> =0>
470  void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
471  OtherTransformers && ... other_transform_ops) const;
472 
473  template <typename Queue, typename Transformer, typename Predicate,
474  template <typename T, typename P> class Iteration,
475  typename ... OtherTransformers,
476  requires_iteration<Iteration<Transformer,Predicate>> =0,
477  requires_pipeline<Transformer> =0>
478  void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
479  OtherTransformers && ... other_transform_ops) const;
480 
481  template <typename Queue, typename ... Transformers,
482  template <typename...> class Pipeline,
483  typename ... OtherTransformers,
484  requires_pipeline<Pipeline<Transformers...>> = 0>
485  void do_pipeline(Queue & input_queue,
486  Pipeline<Transformers...> & pipeline_obj,
487  OtherTransformers && ... other_transform_ops) const
488  {
489  do_pipeline(input_queue, std::move(pipeline_obj),
490  std::forward<OtherTransformers>(other_transform_ops)...);
491  }
492 
493  template <typename Queue, typename ... Transformers,
494  template <typename...> class Pipeline,
495  typename ... OtherTransformers,
496  requires_pipeline<Pipeline<Transformers...>> = 0>
497  void do_pipeline(Queue & input_queue,
498  Pipeline<Transformers...> && pipeline_obj,
499  OtherTransformers && ... other_transform_ops) const;
500 
501  template <typename Queue, typename ... Transformers,
502  std::size_t ... I>
503  void do_pipeline_nested(
504  Queue & input_queue,
505  std::tuple<Transformers...> && transform_ops,
506  std::index_sequence<I...>) const;
507 
508 private:
509 
517  static int impl_concurrency_degree() {
518  int result;
519  #pragma omp parallel
520  {
521  result = omp_get_num_threads();
522  }
523  return result;
524  }
525 
526 private:
527 
528  int concurrency_degree_;
529 
530  bool ordering_;
531 
532  constexpr static int default_queue_size = 100;
533  int queue_size_ = default_queue_size;
534 
535  queue_mode queue_mode_ = queue_mode::blocking;
536 };
537 
542 template <typename E>
543 constexpr bool is_parallel_execution_omp() {
544  return std::is_same<E, parallel_execution_omp>::value;
545 }
546 
551 template <>
552 constexpr bool is_supported<parallel_execution_omp>() { return true; }
553 
558 template <>
559 constexpr bool supports_map<parallel_execution_omp>() { return true; }
560 
565 template <>
566 constexpr bool supports_reduce<parallel_execution_omp>() { return true; }
567 
572 template <>
573 constexpr bool supports_map_reduce<parallel_execution_omp>() { return true; }
574 
579 template <>
580 constexpr bool supports_stencil<parallel_execution_omp>() { return true; }
581 
586 template <>
587 constexpr bool supports_divide_conquer<parallel_execution_omp>() { return true; }
588 
593 template <>
594 constexpr bool supports_pipeline<parallel_execution_omp>() { return true; }
595 
596 template <typename ... InputIterators, typename OutputIterator,
597  typename Transformer>
599  std::tuple<InputIterators...> firsts,
600  OutputIterator first_out,
601  std::size_t sequence_size, Transformer transform_op) const
602 {
603  #pragma omp parallel for
604  for (std::size_t i=0; i<sequence_size; ++i) {
605  first_out[i] = apply_iterators_indexed(transform_op, firsts, i);
606  }
607 }
608 
609 template <typename InputIterator, typename Identity, typename Combiner>
611  InputIterator first, std::size_t sequence_size,
612  Identity && identity,
613  Combiner && combine_op) const
614 {
615  constexpr sequential_execution seq;
616 
617  using result_type = std::decay_t<Identity>;
618  std::vector<result_type> partial_results(concurrency_degree_);
619  auto process_chunk = [&](InputIterator f, std::size_t sz, std::size_t id) {
620  partial_results[id] = seq.reduce(f, sz, std::forward<Identity>(identity),
621  std::forward<Combiner>(combine_op));
622  };
623 
624  const auto chunk_size = sequence_size/concurrency_degree_;
625 
626  #pragma omp parallel
627  {
628  #pragma omp single nowait
629  {
630  for (int i=0 ;i<concurrency_degree_-1; ++i) {
631  const auto delta = chunk_size * i;
632  const auto chunk_first = std::next(first,delta);
633 
634  #pragma omp task firstprivate (chunk_first, chunk_size, i)
635  {
636  process_chunk(chunk_first, chunk_size, i);
637  }
638  }
639 
640  //Main thread
641  const auto delta = chunk_size * (concurrency_degree_ - 1);
642  const auto chunk_first= std::next(first,delta);
643  const auto chunk_sz = sequence_size - delta;
644  process_chunk(chunk_first, chunk_sz, concurrency_degree_-1);
645  #pragma omp taskwait
646  }
647  }
648 
649  return seq.reduce(std::next(partial_results.begin()),
650  partial_results.size()-1,
651  partial_results[0], std::forward<Combiner>(combine_op));
652 }
653 
654 template <typename ... InputIterators, typename Identity,
655  typename Transformer, typename Combiner>
657  std::tuple<InputIterators...> firsts,
658  std::size_t sequence_size,
659  Identity &&,
660  Transformer && transform_op, Combiner && combine_op) const
661 {
662  constexpr sequential_execution seq;
663 
664  using result_type = std::decay_t<Identity>;
665  std::vector<result_type> partial_results(concurrency_degree_);
666 
667  auto process_chunk = [&](auto f, std::size_t sz, std::size_t i) {
668  partial_results[i] = seq.map_reduce(
669  f, sz, partial_results[i],
670  std::forward<Transformer>(transform_op),
671  std::forward<Combiner>(combine_op));
672  };
673 
674  const auto chunk_size = sequence_size / concurrency_degree_;
675 
676  #pragma omp parallel
677  {
678  #pragma omp single nowait
679  {
680 
681  for (int i=0;i<concurrency_degree_-1;++i) {
682  #pragma omp task firstprivate(i)
683  {
684  const auto delta = chunk_size * i;
685  const auto chunk_firsts = iterators_next(firsts,delta);
686  process_chunk(chunk_firsts, chunk_size, i);
687  }
688  }
689 
690  const auto delta = chunk_size * (concurrency_degree_ - 1);
691  auto chunk_firsts = iterators_next(firsts,delta);
692  auto chunk_last = std::next(std::get<0>(firsts), sequence_size);
693  process_chunk(chunk_firsts,
694  std::distance(std::get<0>(chunk_firsts), chunk_last),
695  concurrency_degree_ - 1);
696  #pragma omp taskwait
697  }
698  }
699 
700  return seq.reduce(std::next(partial_results.begin()),
701  partial_results.size()-1,
702  partial_results[0], std::forward<Combiner>(combine_op));
703 }
704 
705 template <typename ... InputIterators, typename OutputIterator,
706  typename StencilTransformer, typename Neighbourhood>
708  std::tuple<InputIterators...> firsts, OutputIterator first_out,
709  std::size_t sequence_size,
710  StencilTransformer && transform_op,
711  Neighbourhood && neighbour_op) const
712 {
713  constexpr sequential_execution seq;
714  const auto chunk_size = sequence_size / concurrency_degree_;
715  auto process_chunk = [&](auto f, std::size_t sz, std::size_t delta) {
716  seq.stencil(f, std::next(first_out,delta), sz,
717  std::forward<StencilTransformer>(transform_op),
718  std::forward<Neighbourhood>(neighbour_op));
719  };
720 
721  #pragma omp parallel
722  {
723  #pragma omp single nowait
724  {
725  for (int i=0; i<concurrency_degree_-1; ++i) {
726  #pragma omp task firstprivate(i)
727  {
728  const auto delta = chunk_size * i;
729  const auto chunk_firsts = iterators_next(firsts,delta);
730  process_chunk(chunk_firsts, chunk_size, delta);
731  }
732  }
733 
734  const auto delta = chunk_size * (concurrency_degree_ - 1);
735  const auto chunk_firsts = iterators_next(firsts,delta);
736  const auto chunk_last = std::next(std::get<0>(firsts), sequence_size);
737  process_chunk(chunk_firsts,
738  std::distance(std::get<0>(chunk_firsts), chunk_last), delta);
739 
740  #pragma omp taskwait
741  }
742  }
743 }
744 
745 template <typename Input, typename Divider,typename Predicate, typename Solver, typename Combiner>
747  Input && input,
748  Divider && divide_op,
749  Predicate && predicate_op,
750  Solver && solve_op,
751  Combiner && combine_op) const
752 {
753  std::atomic<int> num_threads{concurrency_degree_-1};
754 
755  return divide_conquer(std::forward<Input>(input),
756  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
757  std::forward<Solver>(solve_op),
758  std::forward<Combiner>(combine_op),
759  num_threads);
760 }
761 
762 
763 template <typename Input, typename Divider, typename Solver, typename Combiner>
765  Input && input,
766  Divider && divide_op,
767  Solver && solve_op,
768  Combiner && combine_op) const
769 {
770  std::atomic<int> num_threads{concurrency_degree_-1};
771 
772  return divide_conquer(std::forward<Input>(input),
773  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
774  std::forward<Combiner>(combine_op),
775  num_threads);
776 }
777 
778 template <typename Generator, typename ... Transformers>
780  Generator && generate_op,
781  Transformers && ... transform_ops) const
782 {
783  using namespace std;
784 
785  using result_type = decay_t<typename result_of<Generator()>::type>;
786  auto output_queue = make_queue<pair<result_type,long>>();
787 
788  #pragma omp parallel
789  {
790  #pragma omp single nowait
791  {
792  #pragma omp task shared(generate_op,output_queue)
793  {
794  long order = 0;
795  for (;;) {
796  auto item = generate_op();
797  output_queue.push(make_pair(item,order++)) ;
798  if (!item) break;
799  }
800  }
801  do_pipeline(output_queue,
802  forward<Transformers>(transform_ops)...);
803  #pragma omp taskwait
804  }
805  }
806 }
807 
808 // PRIVATE MEMBERS
809 template <typename Input, typename Divider,typename Predicate, typename Solver, typename Combiner>
811  Input && input,
812  Divider && divide_op,
813  Predicate && predicate_op,
814  Solver && solve_op,
815  Combiner && combine_op,
816  std::atomic<int> & num_threads) const
817 {
818  constexpr sequential_execution seq;
819  if (num_threads.load()<=0) {
820  return seq.divide_conquer(std::forward<Input>(input),
821  std::forward<Divider>(divide_op),std::forward<Predicate>(predicate_op),
822  std::forward<Solver>(solve_op),
823  std::forward<Combiner>(combine_op));
824  }
825 
826  if (predicate_op(input)) { return solve_op(std::forward<Input>(input)); }
827  auto subproblems = divide_op(std::forward<Input>(input));
828 
829  using subresult_type =
830  std::decay_t<typename std::result_of<Solver(Input)>::type>;
831  std::vector<subresult_type> partials(subproblems.size()-1);
832 
833  auto process_subproblems = [&,this](auto it, std::size_t div) {
834  partials[div] = this->divide_conquer(std::forward<Input>(*it),
835  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
836  std::forward<Solver>(solve_op),
837  std::forward<Combiner>(combine_op), num_threads);
838  };
839 
840  int division = 0;
841  subresult_type subresult;
842 
843  #pragma omp parallel
844  {
845  #pragma omp single nowait
846  {
847  auto i = subproblems.begin() + 1;
848  while (i!=subproblems.end() && num_threads.load()>0) {
849  #pragma omp task firstprivate(i,division) \
850  shared(partials,divide_op,solve_op,combine_op,num_threads)
851  {
852  process_subproblems(i, division);
853  }
854  num_threads --;
855  i++;
856  division++;
857  }
858 
859  while (i!=subproblems.end()) {
860  partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
861  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
862  std::forward<Solver>(solve_op),
863  std::forward<Combiner>(combine_op));
864  }
865 
866  //Main thread works on the first subproblem.
867  if (num_threads.load()>0) {
868  subresult = divide_conquer(std::forward<Input>(*subproblems.begin()),
869  std::forward<Divider>(divide_op),std::forward<Predicate>(predicate_op),
870  std::forward<Solver>(solve_op),
871  std::forward<Combiner>(combine_op), num_threads);
872  }
873  else {
874  subresult = seq.divide_conquer(std::forward<Input>(*subproblems.begin()),
875  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
876  std::forward<Solver>(solve_op),
877  std::forward<Combiner>(combine_op));
878  }
879  #pragma omp taskwait
880  }
881  }
882  return seq.reduce(partials.begin(), partials.size(),
883  std::forward<subresult_type>(subresult), combine_op);
884 }
885 
886 
887 
888 template <typename Input, typename Divider, typename Solver, typename Combiner>
890  Input && input,
891  Divider && divide_op,
892  Solver && solve_op,
893  Combiner && combine_op,
894  std::atomic<int> & num_threads) const
895 {
896  constexpr sequential_execution seq;
897  if (num_threads.load()<=0) {
898  return seq.divide_conquer(std::forward<Input>(input),
899  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
900  std::forward<Combiner>(combine_op));
901  }
902 
903  auto subproblems = divide_op(std::forward<Input>(input));
904  if (subproblems.size()<=1) { return solve_op(std::forward<Input>(input)); }
905 
906  using subresult_type =
907  std::decay_t<typename std::result_of<Solver(Input)>::type>;
908  std::vector<subresult_type> partials(subproblems.size()-1);
909 
910  auto process_subproblems = [&,this](auto it, std::size_t div) {
911  partials[div] = this->divide_conquer(std::forward<Input>(*it),
912  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
913  std::forward<Combiner>(combine_op), num_threads);
914  };
915 
916  int division = 0;
917  subresult_type subresult;
918 
919  #pragma omp parallel
920  {
921  #pragma omp single nowait
922  {
923  auto i = subproblems.begin() + 1;
924  while (i!=subproblems.end() && num_threads.load()>0) {
925  #pragma omp task firstprivate(i,division) \
926  shared(partials,divide_op,solve_op,combine_op,num_threads)
927  {
928  process_subproblems(i, division);
929  }
930  num_threads --;
931  i++;
932  division++;
933  }
934 
935  while (i!=subproblems.end()) {
936  partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
937  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
938  std::forward<Combiner>(combine_op));
939  }
940 
941  //Main thread works on the first subproblem.
942  if (num_threads.load()>0) {
943  subresult = divide_conquer(std::forward<Input>(*subproblems.begin()),
944  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
945  std::forward<Combiner>(combine_op), num_threads);
946  }
947  else {
948  subresult = seq.divide_conquer(std::forward<Input>(*subproblems.begin()),
949  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
950  std::forward<Combiner>(combine_op));
951  }
952  #pragma omp taskwait
953  }
954  }
955  return seq.reduce(partials.begin(), partials.size(),
956  std::forward<subresult_type>(subresult), combine_op);
957 }
958 
959 template <typename Queue, typename Consumer,
960  requires_no_pattern<Consumer>>
961 void parallel_execution_omp::do_pipeline(Queue & input_queue, Consumer && consume_op) const
962 {
963  using namespace std;
964  using gen_value_type = typename Queue::value_type;
965 
966  if (!is_ordered()) {
967  for (;;) {
968  auto item = input_queue.pop();
969  if (!item.first) break;
970  consume_op(*item.first);
971  }
972  return;
973  }
974 
975  vector<gen_value_type> elements;
976  long current = 0;
977  auto item = input_queue.pop( );
978  while (item.first) {
979  if (current == item.second) {
980  consume_op(*item.first);
981  current ++;
982  }
983  else {
984  elements.push_back(item);
985  }
986  auto it = find_if(elements.begin(), elements.end(),
987  [&](auto x) { return x.second== current; });
988  if(it != elements.end()){
989  consume_op(*it->first);
990  elements.erase(it);
991  current++;
992  }
993  item = input_queue.pop( );
994  }
995  while(elements.size()>0){
996  auto it = find_if(elements.begin(), elements.end(),
997  [&](auto x) { return x.second== current; });
998  if(it != elements.end()){
999  consume_op(*it->first);
1000  elements.erase(it);
1001  current++;
1002  }
1003  }
1004 }
1005 
1006 
1007 template <typename Inqueue, typename Transformer, typename output_type,
1008  requires_no_pattern<Transformer>>
1009 void parallel_execution_omp::do_pipeline(Inqueue & input_queue, Transformer && transform_op,
1010  mpmc_queue<output_type> & output_queue) const
1011 {
1012  using namespace std;
1013  using namespace experimental;
1014 
1015  using output_item_value_type = typename output_type::first_type::value_type;
1016  for (;;) {
1017  auto item{input_queue.pop()};
1018  if(!item.first) break;
1019  auto out = output_item_value_type{transform_op(*item.first)};
1020  output_queue.push(make_pair(out,item.second)) ;
1021  }
1022 }
1023 
1024 
1025 template <typename Queue, typename Execution, typename Transformer,
1026  template <typename, typename> class Context,
1027  typename ... OtherTransformers,
1028  requires_context<Context<Execution,Transformer>>>
1029 void parallel_execution_omp::do_pipeline(Queue & input_queue,
1030  Context<Execution,Transformer> && context_op,
1031  OtherTransformers &&... other_ops) const
1032 {
1033  using namespace std;
1034  using namespace experimental;
1035 
1036  using input_item_type = typename Queue::value_type;
1037  using input_item_value_type = typename input_item_type::first_type::value_type;
1038 
1040  using output_optional_type = experimental::optional<output_type>;
1041  using output_item_type = pair <output_optional_type, long> ;
1042 
1043  decltype(auto) output_queue =
1044  get_output_queue<output_item_type>(other_ops...);
1045 
1046  #pragma omp task shared(input_queue,context_op,output_queue)
1047  {
1048  context_op.execution_policy().pipeline(input_queue, context_op.transformer(), output_queue);
1049  output_queue.push(make_pair(output_optional_type{},-1));
1050  }
1051 
1052  do_pipeline(output_queue,
1053  forward<OtherTransformers>(other_ops)... );
1054  #pragma omp taskwait
1055 }
1056 
1057 template <typename Queue, typename Transformer, typename ... OtherTransformers,
1058  requires_no_pattern<Transformer>>
1059 void parallel_execution_omp::do_pipeline(
1060  Queue & input_queue,
1061  Transformer && transform_op,
1062  OtherTransformers && ... other_ops) const
1063 {
1064  using namespace std;
1065  using gen_value_type = typename Queue::value_type;
1066  using input_value_type = typename gen_value_type::first_type::value_type;
1067  using result_type = typename result_of<Transformer(input_value_type)>::type;
1068  using output_value_type = experimental::optional<result_type>;
1069  using output_type = pair<output_value_type,long>;
1070 
1071  decltype(auto) output_queue =
1072  get_output_queue<output_type>(other_ops...);
1073 
1074  #pragma omp task shared(transform_op, input_queue, output_queue)
1075  {
1076  for (;;) {
1077  auto item = input_queue.pop();
1078  if (!item.first) break;
1079  auto out = output_value_type{transform_op(*item.first)};
1080  output_queue.push(make_pair(out, item.second));
1081  }
1082  output_queue.push(make_pair(output_value_type{}, -1));
1083  }
1084 
1085  do_pipeline(output_queue,
1086  forward<OtherTransformers>(other_ops)...);
1087 }
1088 
1089 template <typename Queue, typename FarmTransformer,
1090  template <typename> class Farm,
1091  requires_farm<Farm<FarmTransformer>>>
1092 void parallel_execution_omp::do_pipeline(
1093  Queue & input_queue,
1094  Farm<FarmTransformer> && farm_obj) const
1095 {
1096  using namespace std;
1097  using namespace experimental;
1098 
1099  for (int i=0; i<farm_obj.cardinality(); ++i) {
1100  #pragma omp task shared(farm_obj,input_queue)
1101  {
1102  auto item = input_queue.pop();
1103  while (item.first) {
1104  farm_obj(*item.first);
1105  item = input_queue.pop();
1106  }
1107  input_queue.push(item);
1108  }
1109  }
1110  #pragma omp taskwait
1111 }
1112 
1113 template <typename Queue, typename FarmTransformer,
1114  template <typename> class Farm,
1115  typename ... OtherTransformers,
1116  requires_farm<Farm<FarmTransformer>>>
1117 void parallel_execution_omp::do_pipeline(
1118  Queue & input_queue,
1119  Farm<FarmTransformer> && farm_obj,
1120  OtherTransformers && ... other_transform_ops) const
1121 {
1122  using namespace std;
1123  using namespace experimental;
1124  using gen_value_type = typename Queue::value_type;
1125  using input_value_type = typename gen_value_type::first_type::value_type;
1126 
1128  using output_optional_type = optional<result_type>;
1129  using output_type = pair<output_optional_type,long>;
1130 
1131  decltype(auto) output_queue =
1132  get_output_queue<output_type>(other_transform_ops...);
1133 
1134 // auto output_queue = make_queue<output_type>();
1135 
1136  atomic<int> done_threads{0};
1137  int ntask = farm_obj.cardinality();
1138  for (int i=0; i<farm_obj.cardinality(); ++i) {
1139  #pragma omp task shared(done_threads,output_queue,farm_obj,input_queue,ntask)
1140  {
1141  do_pipeline(input_queue, farm_obj.transformer(), output_queue);
1142  done_threads++;
1143  if (done_threads == ntask){
1144  output_queue.push(make_pair(output_optional_type{}, -1));
1145  }else{
1146  input_queue.push(gen_value_type{});
1147  }
1148  }
1149  }
1150  do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1151  #pragma omp taskwait
1152 }
1153 
1154 
1155 template <typename Queue, typename Predicate,
1156  template <typename> class Filter,
1157  requires_filter<Filter<Predicate>>>
1158 void parallel_execution_omp::do_pipeline(
1159  Queue &,
1160  Filter<Predicate> &&) const
1161 {
1162 }
1163 
1164 template <typename Queue, typename Predicate,
1165  template <typename> class Filter,
1166  typename ... OtherTransformers,
1167  requires_filter<Filter<Predicate>>>
1168 void parallel_execution_omp::do_pipeline(
1169  Queue & input_queue,
1170  Filter<Predicate> && filter_obj,
1171  OtherTransformers && ... other_transform_ops) const
1172 {
1173  using namespace std;
1174  using gen_value_type = typename Queue::value_type;
1175  using input_value_type = typename gen_value_type::first_type;
1176  auto filter_queue = make_queue<gen_value_type>();
1177 
1178  if (is_ordered()) {
1179  auto filter_task = [&]() {
1180  {
1181  auto item{input_queue.pop()};
1182  while (item.first) {
1183  if(filter_obj(*item.first)) {
1184  filter_queue.push(item);
1185  }
1186  else {
1187  filter_queue.push(make_pair(input_value_type{} ,item.second));
1188  }
1189  item = input_queue.pop();
1190  }
1191  filter_queue.push (make_pair(input_value_type{}, -1));
1192  }
1193  };
1194 
1195  decltype(auto) output_queue =
1196  get_output_queue<gen_value_type>(other_transform_ops...);
1197 
1198 
1199  auto reorder_task = [&]() {
1200  vector<gen_value_type> elements;
1201  int current = 0;
1202  long order = 0;
1203  auto item = filter_queue.pop();
1204  for (;;) {
1205  if (!item.first && item.second == -1) break;
1206  if (item.second == current) {
1207  if (item.first) {
1208  output_queue.push(make_pair(item.first, order++));
1209  }
1210  current++;
1211  }
1212  else {
1213  elements.push_back(item);
1214  }
1215  auto it = find_if(elements.begin(), elements.end(),
1216  [&](auto x) { return x.second== current; });
1217  if(it != elements.end()){
1218  if (it->first) {
1219  output_queue.push(make_pair(it->first,order));
1220  order++;
1221  }
1222  elements.erase(it);
1223  current++;
1224  }
1225  item = filter_queue.pop();
1226  }
1227 
1228  while (elements.size()>0) {
1229  auto it = find_if(elements.begin(), elements.end(),
1230  [&](auto x) { return x.second== current; });
1231  if(it != elements.end()){
1232  if (it->first) {
1233  output_queue.push(make_pair(it->first,order));
1234  order++;
1235  }
1236  elements.erase(it);
1237  current++;
1238  }
1239  item = filter_queue.pop();
1240  }
1241 
1242  output_queue.push(item);
1243  };
1244 
1245 
1246  #pragma omp task shared(filter_queue,filter_obj,input_queue)
1247  {
1248  filter_task();
1249  }
1250 
1251  #pragma omp task shared (output_queue,filter_queue)
1252  {
1253  reorder_task();
1254  }
1255 
1256  do_pipeline(output_queue,
1257  forward<OtherTransformers>(other_transform_ops)...);
1258 
1259  #pragma omp taskwait
1260  }
1261  else {
1262  auto filter_task = [&]() {
1263  auto item = input_queue.pop( ) ;
1264  while (item.first) {
1265  if (filter_obj(*item.first)) {
1266  filter_queue.push(item);
1267  }
1268  item = input_queue.pop();
1269  }
1270  filter_queue.push(make_pair(input_value_type{}, -1));
1271  };
1272 
1273  #pragma omp task shared(filter_queue,filter_obj,input_queue)
1274  {
1275  filter_task();
1276  }
1277  do_pipeline(filter_queue,
1278  std::forward<OtherTransformers>(other_transform_ops)...);
1279  #pragma omp taskwait
1280  }
1281 }
1282 
1283 
1284 template <typename Queue, typename Combiner, typename Identity,
1285  template <typename C, typename I> class Reduce,
1286  typename ... OtherTransformers,
1287  requires_reduce<Reduce<Combiner,Identity>>>
1288 void parallel_execution_omp::do_pipeline(
1289  Queue && input_queue,
1290  Reduce<Combiner,Identity> && reduce_obj,
1291  OtherTransformers && ... other_transform_ops) const
1292 {
1293  using namespace std;
1294  using namespace experimental;
1295 
1296  using output_item_value_type = optional<decay_t<Identity>>;
1297  using output_item_type = pair<output_item_value_type,long>;
1298 
1299  decltype(auto) output_queue =
1300  get_output_queue<output_item_type>(other_transform_ops...);
1301 
1302  auto reduce_task = [&]() {
1303  auto item{input_queue.pop()};
1304  int order = 0;
1305  while (item.first) {
1306  reduce_obj.add_item(std::forward<Identity>(*item.first));
1307  item = input_queue.pop();
1308  if (reduce_obj.reduction_needed()) {
1309  constexpr sequential_execution seq;
1310  auto red = reduce_obj.reduce_window(seq);
1311  output_queue.push(make_pair(red, order++));
1312  }
1313  }
1314  output_queue.push(make_pair(output_item_value_type{}, -1));
1315  };
1316 
1317  #pragma omp task shared(reduce_obj,input_queue, output_queue)
1318  {
1319  reduce_task();
1320  }
1321  do_pipeline(output_queue,
1322  std::forward<OtherTransformers>(other_transform_ops)...);
1323  #pragma omp taskwait
1324 }
1325 
1326 template <typename Queue, typename Transformer, typename Predicate,
1327  template <typename T, typename P> class Iteration,
1328  typename ... OtherTransformers,
1329  requires_iteration<Iteration<Transformer,Predicate>>,
1330  requires_no_pattern<Transformer>>
1331 void parallel_execution_omp::do_pipeline(
1332  Queue & input_queue,
1333  Iteration<Transformer,Predicate> && iteration_obj,
1334  OtherTransformers && ... other_transform_ops) const
1335 {
1336  using namespace std;
1337  using namespace experimental;
1338 
1339  using input_item_type = typename decay_t<Queue>::value_type;
1340  decltype(auto) output_queue =
1341  get_output_queue<input_item_type>(other_transform_ops...);
1342 
1343 
1344  auto iteration_task = [&]() {
1345  for (;;) {
1346  auto item = input_queue.pop();
1347  if (!item.first) break;
1348  auto value = iteration_obj.transform(*item.first);
1349  auto new_item = input_item_type{value,item.second};
1350  if (iteration_obj.predicate(value)) {
1351  output_queue.push(new_item);
1352  }
1353  else {
1354  input_queue.push(new_item);
1355  }
1356  }
1357  while (!input_queue.is_empty()) {
1358  auto item = input_queue.pop();
1359  auto value = iteration_obj.transform(*item.first);
1360  auto new_item = input_item_type{value,item.second};
1361  if (iteration_obj.predicate(value)) {
1362  output_queue.push(new_item);
1363  }
1364  else {
1365  input_queue.push(new_item);
1366  }
1367  }
1368  output_queue.push(input_item_type{{},-1});
1369  };
1370 
1371  #pragma omp task shared(iteration_obj,input_queue,output_queue)
1372  {
1373  iteration_task();
1374  }
1375  do_pipeline(output_queue,
1376  std::forward<OtherTransformers>(other_transform_ops)...);
1377  #pragma omp taskwait
1378 
1379 }
1380 
1381 template <typename Queue, typename Transformer, typename Predicate,
1382  template <typename T, typename P> class Iteration,
1383  typename ... OtherTransformers,
1384  requires_iteration<Iteration<Transformer,Predicate>>,
1385  requires_pipeline<Transformer>>
1386 void parallel_execution_omp::do_pipeline(
1387  Queue &,
1388  Iteration<Transformer,Predicate> &&,
1389  OtherTransformers && ...) const
1390 {
1391  static_assert(!is_pipeline<Transformer>, "Not implemented");
1392 }
1393 
1394 template <typename Queue, typename ... Transformers,
1395  template <typename...> class Pipeline,
1396  typename ... OtherTransformers,
1397  requires_pipeline<Pipeline<Transformers...>>>
1398 void parallel_execution_omp::do_pipeline(
1399  Queue & input_queue,
1400  Pipeline<Transformers...> && pipeline_obj,
1401  OtherTransformers && ... other_transform_ops) const
1402 {
1403  do_pipeline_nested(
1404  input_queue,
1405  std::tuple_cat(pipeline_obj.transformers(),
1406  std::forward_as_tuple(other_transform_ops...)),
1407  std::make_index_sequence<sizeof...(Transformers)+sizeof...(OtherTransformers)>());
1408 }
1409 
1410 template <typename Queue, typename ... Transformers,
1411  std::size_t ... I>
1412 void parallel_execution_omp::do_pipeline_nested(
1413  Queue & input_queue,
1414  std::tuple<Transformers...> && transform_ops,
1415  std::index_sequence<I...>) const
1416 {
1417  do_pipeline(input_queue,
1418  std::forward<Transformers>(std::get<I>(transform_ops))...);
1419 }
1420 
1421 template<typename T, typename... Others>
1422 void parallel_execution_omp::do_pipeline(mpmc_queue <T> &, mpmc_queue <T> &, Others &&...) const
1423 { }
1424 
1425 
1426 } // end namespace grppi
1427 
1428 #else // GRPPI_OMP undefined
1429 
namespace grppi {

/// Placeholder for the OpenMP execution policy, used when GRPPI_OMP is not
/// defined, so that code naming the type still compiles.
struct parallel_execution_omp {};

/// Metafunction that determines if type E is parallel_execution_omp.
/// Without OpenMP support the answer is always false.
template <typename E>
constexpr bool is_parallel_execution_omp() { return false; }

}
1448 
1449 #endif // GRPPI_OMP
1450 
1451 #endif
Definition: mpmc_queue.h:33
OpenMP parallel execution policy.
Definition: parallel_execution_omp.h:40
void disable_ordering() noexcept
Disable ordering.
Definition: parallel_execution_omp.h:97
void set_queue_attributes(int size, queue_mode mode) noexcept
Sets the attributes for the queues built through make_queue<T>()
Definition: parallel_execution_omp.h:107
void pipeline(Generator &&generate_op, Transformers &&... transform_op) const
Invoke Pipeline pattern.
Definition: parallel_execution_omp.h:779
void map(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, Transformer transform_op) const
Applies a transformation to multiple sequences leaving the result in another sequence using available...
Definition: parallel_execution_omp.h:598
mpmc_queue< T > & get_output_queue(mpmc_queue< T > &queue, Transformers &&...) const
Returns the reference of a communication queue for elements of type T if the queue has been created i...
Definition: parallel_execution_omp.h:132
void pipeline(mpmc_queue< InputType > &input_queue, Transformer &&transform_op, mpmc_queue< OutputType > &output_queue) const
Invoke Pipeline pattern coming from another context that uses mpmc_queues as communication channels.
Definition: parallel_execution_omp.h:303
mpmc_queue< T > get_output_queue(Transformers &&...) const
Makes a communication queue for elements of type T if the queue has not been created in an outer patt...
Definition: parallel_execution_omp.h:144
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_omp.h:102
parallel_execution_omp() noexcept
Default construct an OpenMP parallel execution policy.
Definition: parallel_execution_omp.h:51
void set_concurrency_degree(int degree) noexcept
Set number of grppi threads.
Definition: parallel_execution_omp.h:77
int get_thread_id() const noexcept
Get index of current thread in the thread table.
Definition: parallel_execution_omp.h:151
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke the divide/conquer pattern.
Definition: parallel_execution_omp.h:764
parallel_execution_omp(int concurrency_degree, bool order=true) noexcept
Set num_threads to concurrency_degree in order to run in parallel.
Definition: parallel_execution_omp.h:67
void enable_ordering() noexcept
Enable ordering.
Definition: parallel_execution_omp.h:92
auto map_reduce(std::tuple< InputIterators... > firsts, std::size_t sequence_size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op) const
Applies a map/reduce operation to a sequence of data items.
Definition: parallel_execution_omp.h:656
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T.
Definition: parallel_execution_omp.h:119
void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: parallel_execution_omp.h:707
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_omp.h:85
auto reduce(InputIterator first, std::size_t sequence_size, Identity &&identity, Combiner &&combine_op) const
Applies a reduction to a sequence of data items.
Definition: parallel_execution_omp.h:610
Sequential execution policy.
Definition: sequential_execution.h:36
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke the divide/conquer pattern.
Definition: sequential_execution.h:538
Definition: callable_traits.h:21
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:107
constexpr bool supports_pipeline< parallel_execution_omp >()
Determines if an execution policy supports the pipeline pattern.
Definition: parallel_execution_omp.h:594
typename internal::output_value_type< I, T >::type output_value_type
Definition: pipeline_pattern.h:128
constexpr bool is_supported< parallel_execution_omp >()
Determines if an execution policy is supported in the current compilation.
Definition: parallel_execution_omp.h:552
constexpr bool supports_stencil< parallel_execution_omp >()
Determines if an execution policy supports the stencil pattern.
Definition: parallel_execution_omp.h:580
typename std::result_of< Transformer(Input)>::type result_type
Determines the return type of applying a function on a input type.
Definition: patterns.h:105
constexpr bool is_parallel_execution_omp()
Metafunction that determines if type E is parallel_execution_omp.
Definition: parallel_execution_omp.h:543
queue_mode
Definition: mpmc_queue.h:30
constexpr bool supports_reduce< parallel_execution_omp >()
Determines if an execution policy supports the reduce pattern.
Definition: parallel_execution_omp.h:566
auto iterators_next(T &&t, int n)
Computes next n steps from a tuple of iterators.
Definition: iterator.h:170
constexpr bool supports_map< parallel_execution_omp >()
Determines if an execution policy supports the map pattern.
Definition: parallel_execution_omp.h:559
constexpr bool supports_divide_conquer< parallel_execution_omp >()
Determines if an execution policy supports the divide/conquer pattern.
Definition: parallel_execution_omp.h:587
decltype(auto) apply_iterators_indexed(F &&f, T &&t, std::size_t i)
Applies a callable object to the values obtained from the iterators in a tuple by indexing....
Definition: iterator.h:142
constexpr bool supports_map_reduce< parallel_execution_omp >()
Determines if an execution policy supports the map-reduce pattern.
Definition: parallel_execution_omp.h:573
std::enable_if_t< is_no_pattern< T >, int > requires_no_pattern
Definition: patterns.h:87
return_type type
Definition: patterns.h:98