16 #ifndef GRPPI_OMP_PARALLEL_EXECUTION_OMP_H
17 #define GRPPI_OMP_PARALLEL_EXECUTION_OMP_H
21 #include "../common/mpmc_queue.h"
22 #include "../common/iterator.h"
23 #include "../common/execution_traits.h"
24 #include "../seq/sequential_execution.h"
26 #include <type_traits>
28 #include <experimental/optional>
71 omp_set_num_threads(concurrency_degree_);
78 concurrency_degree_ = degree;
79 omp_set_num_threads(concurrency_degree_);
86 return concurrency_degree_;
118 template <
typename T>
120 return {queue_size_, queue_mode_};
131 template <
typename T,
typename ... Transformers>
143 template <
typename T,
typename ... Transformers>
145 return std::move(make_queue<T>());
155 result = omp_get_thread_num();
// Declaration of the map pattern for the OpenMP execution policy.
// Applies transform_op to the sequences whose start iterators are packed in
// `firsts` and leaves the results in the sequence starting at `first_out`.
//   firsts        - tuple holding the first iterator of every input sequence.
//   first_out     - iterator to the start of the output sequence.
//   sequence_size - number of elements to process (the definition iterates
//                   i = 0 .. sequence_size-1 under `#pragma omp parallel for`).
//   transform_op  - transformation applied to the elements (taken by value).
174 template <
typename ... InputIterators,
typename OutputIterator,
175 typename Transformer>
176 void map(std::tuple<InputIterators...> firsts,
177 OutputIterator first_out,
178 std::size_t sequence_size, Transformer transform_op)
const;
// Declaration of the reduce pattern for the OpenMP execution policy.
//   first         - iterator to the start of the input sequence.
//   sequence_size - number of elements to reduce.
//   identity      - forwarded to the sequential reduce run on every chunk.
//   combine_op    - combination operation; also used to merge the per-chunk
//                   partial results into the returned value.
// The return type is deduced from the final sequential reduce over the
// partial results.
192 template <
typename InputIterator,
typename Identity,
typename Combiner>
193 auto reduce(InputIterator first, std::size_t sequence_size,
194 Identity && identity, Combiner && combine_op)
const;
// Declaration of the map/reduce pattern for the OpenMP execution policy.
// Applies a map/reduce operation to a sequence of data items.
//   firsts        - tuple holding the first iterator of every input sequence.
//   sequence_size - number of elements to process.
//   identity      - identity value for the combination
//                   (NOTE(review): its use is not visible in the listed
//                   definition lines - confirm against the full source).
//   transform_op  - transformation forwarded to the per-chunk sequential
//                   map_reduce.
//   combine_op    - combination operation; also used to merge the per-chunk
//                   partial results into the returned value.
210 template <
typename ... InputIterators,
typename Identity,
211 typename Transformer,
typename Combiner>
212 auto map_reduce(std::tuple<InputIterators...> firsts,
213 std::size_t sequence_size,
214 Identity && identity,
215 Transformer && transform_op, Combiner && combine_op)
const;
// Declaration of the stencil pattern for the OpenMP execution policy.
// Applies a stencil to multiple sequences leaving the result in another
// sequence.
//   firsts        - tuple holding the first iterator of every input sequence.
//   first_out     - iterator to the start of the output sequence; each chunk
//                   writes at std::next(first_out, delta) in the definition.
//   sequence_size - number of elements to process.
//   transform_op  - stencil transformation, forwarded to the sequential
//                   stencil run on every chunk.
//   neighbour_op  - neighbourhood operation, forwarded alongside transform_op.
233 template <
typename ... InputIterators,
typename OutputIterator,
234 typename StencilTransformer,
typename Neighbourhood>
235 void stencil(std::tuple<InputIterators...> firsts, OutputIterator first_out,
236 std::size_t sequence_size,
237 StencilTransformer && transform_op,
238 Neighbourhood && neighbour_op)
const;
252 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
253 [[deprecated(
"Use new interface with predicate argument")]]
255 Divider && divide_op,
257 Combiner && combine_op)
const;
273 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
275 Divider && divide_op,
276 Predicate && predicate_op,
278 Combiner && combine_op)
const;
// Declaration of the pipeline pattern for the OpenMP execution policy.
//   generate_op  - generator invoked with no arguments; in the definition
//                  each produced item is pushed, paired with an increasing
//                  order index, into the first communication queue.
//   transform_op - the remaining pipeline stages, forwarded to do_pipeline
//                  together with that queue.
288 template <
typename Generator,
typename ... Transformers>
289 void pipeline(Generator && generate_op,
290 Transformers && ... transform_op)
const;
302 template <
typename InputType,
typename Transformer,
typename OutputType>
306 do_pipeline(input_queue, std::forward<Transformer>(transform_op), output_queue);
311 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
313 Divider && divide_op,
315 Combiner && combine_op,
316 std::atomic<int> & num_threads)
const;
318 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
320 Divider && divide_op,
321 Predicate && predicate_op,
323 Combiner && combine_op,
324 std::atomic<int> & num_threads)
const;
327 template <
typename Queue,
typename Consumer,
329 void do_pipeline(Queue & input_queue, Consumer && consume_op)
const;
331 template <
typename Inqueue,
typename Transformer,
typename output_type,
333 void do_pipeline(Inqueue & input_queue, Transformer && transform_op,
336 template <
typename T,
typename ... Others>
339 template <
typename T>
342 template <
typename Queue,
typename Transformer,
typename ... OtherTransformers,
343 requires_no_pattern<Transformer> = 0>
344 void do_pipeline(Queue & input_queue, Transformer && transform_op,
345 OtherTransformers && ... other_ops)
const;
347 template <
typename Queue,
typename Execution,
typename Transformer,
348 template <
typename,
typename>
class Context,
349 typename ... OtherTransformers,
350 requires_context<Context<Execution,Transformer>> = 0>
351 void do_pipeline(Queue & input_queue, Context<Execution,Transformer> && context_op,
352 OtherTransformers &&... other_ops)
const;
354 template <
typename Queue,
typename Execution,
typename Transformer,
355 template <
typename,
typename>
class Context,
356 typename ... OtherTransformers,
357 requires_context<Context<Execution,Transformer>> = 0>
358 void do_pipeline(Queue & input_queue, Context<Execution,Transformer> & context_op,
359 OtherTransformers &&... other_ops)
const
361 do_pipeline(input_queue, std::move(context_op),
362 std::forward<OtherTransformers>(other_ops)...);
365 template <
typename Queue,
typename FarmTransformer,
366 template <
typename>
class Farm,
367 requires_farm<Farm<FarmTransformer>> = 0>
368 void do_pipeline(Queue & input_queue,
369 Farm<FarmTransformer> & farm_obj)
const
371 do_pipeline(input_queue, std::move(farm_obj));
374 template <
typename Queue,
typename FarmTransformer,
375 template <
typename>
class Farm,
376 requires_farm<Farm<FarmTransformer>> = 0>
377 void do_pipeline(Queue & input_queue,
378 Farm<FarmTransformer> && farm_obj)
const;
380 template <
typename Queue,
typename FarmTransformer,
381 template <
typename>
class Farm,
382 typename ... OtherTransformers,
383 requires_farm<Farm<FarmTransformer>> =0>
384 void do_pipeline(Queue & input_queue,
385 Farm<FarmTransformer> & farm_obj,
386 OtherTransformers && ... other_transform_ops)
const
388 do_pipeline(input_queue, std::move(farm_obj),
389 std::forward<OtherTransformers>(other_transform_ops)...);
392 template <
typename Queue,
typename FarmTransformer,
393 template <
typename>
class Farm,
394 typename ... OtherTransformers,
395 requires_farm<Farm<FarmTransformer>> =0>
396 void do_pipeline(Queue & input_queue,
397 Farm<FarmTransformer> && farm_obj,
398 OtherTransformers && ... other_transform_ops)
const;
400 template <
typename Queue,
typename Predicate,
401 template <
typename>
class Filter,
402 requires_filter<Filter<Predicate>> = 0>
403 void do_pipeline(Queue & input_queue,
404 Filter<Predicate> & filter_obj)
const
406 do_pipeline(input_queue, std::move(filter_obj));
409 template <
typename Queue,
typename Predicate,
410 template <
typename>
class Filter,
411 requires_filter<Filter<Predicate>> = 0>
412 void do_pipeline(Queue & input_queue,
413 Filter<Predicate> && filter_obj)
const;
415 template <
typename Queue,
typename Predicate,
416 template <
typename>
class Filter,
417 typename ... OtherTransformers,
418 requires_filter<Filter<Predicate>> =0>
419 void do_pipeline(Queue & input_queue,
420 Filter<Predicate> & filter_obj,
421 OtherTransformers && ... other_transform_ops)
const
423 do_pipeline(input_queue, std::move(filter_obj),
424 std::forward<OtherTransformers>(other_transform_ops)...);
427 template <
typename Queue,
typename Predicate,
428 template <
typename>
class Filter,
429 typename ... OtherTransformers,
430 requires_filter<Filter<Predicate>> =0>
431 void do_pipeline(Queue & input_queue,
432 Filter<Predicate> && filter_obj,
433 OtherTransformers && ... other_transform_ops)
const;
435 template <
typename Queue,
typename Combiner,
typename Identity,
436 template <
typename C,
typename I>
class Reduce,
437 typename ... OtherTransformers,
438 requires_reduce<Reduce<Combiner,Identity>> = 0>
439 void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> & reduce_obj,
440 OtherTransformers && ... other_transform_ops)
const
442 do_pipeline(input_queue, std::move(reduce_obj),
443 std::forward<OtherTransformers>(other_transform_ops)...);
446 template <
typename Queue,
typename Combiner,
typename Identity,
447 template <
typename C,
typename I>
class Reduce,
448 typename ... OtherTransformers,
449 requires_reduce<Reduce<Combiner,Identity>> = 0>
450 void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> && reduce_obj,
451 OtherTransformers && ... other_transform_ops)
const;
453 template <
typename Queue,
typename Transformer,
typename Predicate,
454 template <
typename T,
typename P>
class Iteration,
455 typename ... OtherTransformers,
456 requires_iteration<Iteration<Transformer,Predicate>> =0,
457 requires_no_pattern<Transformer> =0>
458 void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> & iteration_obj,
459 OtherTransformers && ... other_transform_ops)
const
461 do_pipeline(input_queue, std::move(iteration_obj),
462 std::forward<OtherTransformers>(other_transform_ops)...);
465 template <
typename Queue,
typename Transformer,
typename Predicate,
466 template <
typename T,
typename P>
class Iteration,
467 typename ... OtherTransformers,
468 requires_iteration<Iteration<Transformer,Predicate>> =0,
469 requires_no_pattern<Transformer> =0>
470 void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
471 OtherTransformers && ... other_transform_ops)
const;
473 template <
typename Queue,
typename Transformer,
typename Predicate,
474 template <
typename T,
typename P>
class Iteration,
475 typename ... OtherTransformers,
476 requires_iteration<Iteration<Transformer,Predicate>> =0,
477 requires_pipeline<Transformer> =0>
478 void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
479 OtherTransformers && ... other_transform_ops)
const;
481 template <
typename Queue,
typename ... Transformers,
482 template <
typename...>
class Pipeline,
483 typename ... OtherTransformers,
485 void do_pipeline(Queue & input_queue,
486 Pipeline<Transformers...> & pipeline_obj,
487 OtherTransformers && ... other_transform_ops)
const
489 do_pipeline(input_queue, std::move(pipeline_obj),
490 std::forward<OtherTransformers>(other_transform_ops)...);
493 template <
typename Queue,
typename ... Transformers,
494 template <
typename...>
class Pipeline,
495 typename ... OtherTransformers,
497 void do_pipeline(Queue & input_queue,
498 Pipeline<Transformers...> && pipeline_obj,
499 OtherTransformers && ... other_transform_ops)
const;
501 template <
typename Queue,
typename ... Transformers,
503 void do_pipeline_nested(
505 std::tuple<Transformers...> && transform_ops,
506 std::index_sequence<I...>)
const;
517 static int impl_concurrency_degree() {
521 result = omp_get_num_threads();
528 int concurrency_degree_;
532 constexpr
static int default_queue_size = 100;
533 int queue_size_ = default_queue_size;
542 template <
typename E>
544 return std::is_same<E, parallel_execution_omp>::value;
596 template <
typename ... InputIterators,
typename OutputIterator,
597 typename Transformer>
599 std::tuple<InputIterators...> firsts,
600 OutputIterator first_out,
601 std::size_t sequence_size, Transformer transform_op)
const
603 #pragma omp parallel for
604 for (std::size_t i=0; i<sequence_size; ++i) {
609 template <
typename InputIterator,
typename Identity,
typename Combiner>
611 InputIterator first, std::size_t sequence_size,
612 Identity && identity,
613 Combiner && combine_op)
const
618 std::vector<result_type> partial_results(concurrency_degree_);
619 auto process_chunk = [&](InputIterator f, std::size_t sz, std::size_t id) {
620 partial_results[id] = seq.reduce(f, sz, std::forward<Identity>(identity),
621 std::forward<Combiner>(combine_op));
624 const auto chunk_size = sequence_size/concurrency_degree_;
628 #pragma omp single nowait
630 for (
int i=0 ;i<concurrency_degree_-1; ++i) {
631 const auto delta = chunk_size * i;
632 const auto chunk_first = std::next(first,delta);
634 #pragma omp task firstprivate (chunk_first, chunk_size, i)
636 process_chunk(chunk_first, chunk_size, i);
641 const auto delta = chunk_size * (concurrency_degree_ - 1);
642 const auto chunk_first= std::next(first,delta);
643 const auto chunk_sz = sequence_size - delta;
644 process_chunk(chunk_first, chunk_sz, concurrency_degree_-1);
649 return seq.reduce(std::next(partial_results.begin()),
650 partial_results.size()-1,
651 partial_results[0], std::forward<Combiner>(combine_op));
654 template <
typename ... InputIterators,
typename Identity,
655 typename Transformer,
typename Combiner>
657 std::tuple<InputIterators...> firsts,
658 std::size_t sequence_size,
660 Transformer && transform_op, Combiner && combine_op)
const
665 std::vector<result_type> partial_results(concurrency_degree_);
667 auto process_chunk = [&](
auto f, std::size_t sz, std::size_t i) {
668 partial_results[i] = seq.map_reduce(
669 f, sz, partial_results[i],
670 std::forward<Transformer>(transform_op),
671 std::forward<Combiner>(combine_op));
674 const auto chunk_size = sequence_size / concurrency_degree_;
678 #pragma omp single nowait
681 for (
int i=0;i<concurrency_degree_-1;++i) {
682 #pragma omp task firstprivate(i)
684 const auto delta = chunk_size * i;
686 process_chunk(chunk_firsts, chunk_size, i);
690 const auto delta = chunk_size * (concurrency_degree_ - 1);
692 auto chunk_last = std::next(std::get<0>(firsts), sequence_size);
693 process_chunk(chunk_firsts,
694 std::distance(std::get<0>(chunk_firsts), chunk_last),
695 concurrency_degree_ - 1);
700 return seq.reduce(std::next(partial_results.begin()),
701 partial_results.size()-1,
702 partial_results[0], std::forward<Combiner>(combine_op));
705 template <
typename ... InputIterators,
typename OutputIterator,
706 typename StencilTransformer,
typename Neighbourhood>
708 std::tuple<InputIterators...> firsts, OutputIterator first_out,
709 std::size_t sequence_size,
710 StencilTransformer && transform_op,
711 Neighbourhood && neighbour_op)
const
714 const auto chunk_size = sequence_size / concurrency_degree_;
715 auto process_chunk = [&](
auto f, std::size_t sz, std::size_t delta) {
716 seq.stencil(f, std::next(first_out,delta), sz,
717 std::forward<StencilTransformer>(transform_op),
718 std::forward<Neighbourhood>(neighbour_op));
723 #pragma omp single nowait
725 for (
int i=0; i<concurrency_degree_-1; ++i) {
726 #pragma omp task firstprivate(i)
728 const auto delta = chunk_size * i;
730 process_chunk(chunk_firsts, chunk_size, delta);
734 const auto delta = chunk_size * (concurrency_degree_ - 1);
736 const auto chunk_last = std::next(std::get<0>(firsts), sequence_size);
737 process_chunk(chunk_firsts,
738 std::distance(std::get<0>(chunk_firsts), chunk_last), delta);
745 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
748 Divider && divide_op,
749 Predicate && predicate_op,
751 Combiner && combine_op)
const
753 std::atomic<int> num_threads{concurrency_degree_-1};
756 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
757 std::forward<Solver>(solve_op),
758 std::forward<Combiner>(combine_op),
763 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
766 Divider && divide_op,
768 Combiner && combine_op)
const
770 std::atomic<int> num_threads{concurrency_degree_-1};
773 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
774 std::forward<Combiner>(combine_op),
778 template <
typename Generator,
typename ... Transformers>
780 Generator && generate_op,
781 Transformers && ... transform_ops)
const
785 using result_type = decay_t<
typename result_of<Generator()>::type>;
786 auto output_queue = make_queue<pair<result_type,long>>();
790 #pragma omp single nowait
792 #pragma omp task shared(generate_op,output_queue)
796 auto item = generate_op();
797 output_queue.push(make_pair(item,order++)) ;
801 do_pipeline(output_queue,
802 forward<Transformers>(transform_ops)...);
809 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
812 Divider && divide_op,
813 Predicate && predicate_op,
815 Combiner && combine_op,
816 std::atomic<int> & num_threads)
const
819 if (num_threads.load()<=0) {
821 std::forward<Divider>(divide_op),std::forward<Predicate>(predicate_op),
822 std::forward<Solver>(solve_op),
823 std::forward<Combiner>(combine_op));
826 if (predicate_op(input)) {
return solve_op(std::forward<Input>(input)); }
827 auto subproblems = divide_op(std::forward<Input>(input));
829 using subresult_type =
830 std::decay_t<
typename std::result_of<Solver(Input)>::type>;
831 std::vector<subresult_type> partials(subproblems.size()-1);
833 auto process_subproblems = [&,
this](
auto it, std::size_t div) {
835 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
836 std::forward<Solver>(solve_op),
837 std::forward<Combiner>(combine_op), num_threads);
841 subresult_type subresult;
845 #pragma omp single nowait
847 auto i = subproblems.begin() + 1;
848 while (i!=subproblems.end() && num_threads.load()>0) {
849 #pragma omp task firstprivate(i,division) \
850 shared(partials,divide_op,solve_op,combine_op,num_threads)
852 process_subproblems(i, division);
859 while (i!=subproblems.end()) {
860 partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
861 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
862 std::forward<Solver>(solve_op),
863 std::forward<Combiner>(combine_op));
867 if (num_threads.load()>0) {
868 subresult =
divide_conquer(std::forward<Input>(*subproblems.begin()),
869 std::forward<Divider>(divide_op),std::forward<Predicate>(predicate_op),
870 std::forward<Solver>(solve_op),
871 std::forward<Combiner>(combine_op), num_threads);
874 subresult = seq.divide_conquer(std::forward<Input>(*subproblems.begin()),
875 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
876 std::forward<Solver>(solve_op),
877 std::forward<Combiner>(combine_op));
882 return seq.reduce(partials.begin(), partials.size(),
883 std::forward<subresult_type>(subresult), combine_op);
888 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
891 Divider && divide_op,
893 Combiner && combine_op,
894 std::atomic<int> & num_threads)
const
896 constexpr sequential_execution seq;
897 if (num_threads.load()<=0) {
898 return seq.divide_conquer(std::forward<Input>(input),
899 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
900 std::forward<Combiner>(combine_op));
903 auto subproblems = divide_op(std::forward<Input>(input));
904 if (subproblems.size()<=1) {
return solve_op(std::forward<Input>(input)); }
906 using subresult_type =
907 std::decay_t<
typename std::result_of<Solver(Input)>::type>;
908 std::vector<subresult_type> partials(subproblems.size()-1);
910 auto process_subproblems = [&,
this](
auto it, std::size_t div) {
912 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
913 std::forward<Combiner>(combine_op), num_threads);
917 subresult_type subresult;
921 #pragma omp single nowait
923 auto i = subproblems.begin() + 1;
924 while (i!=subproblems.end() && num_threads.load()>0) {
925 #pragma omp task firstprivate(i,division) \
926 shared(partials,divide_op,solve_op,combine_op,num_threads)
928 process_subproblems(i, division);
935 while (i!=subproblems.end()) {
936 partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
937 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
938 std::forward<Combiner>(combine_op));
942 if (num_threads.load()>0) {
943 subresult =
divide_conquer(std::forward<Input>(*subproblems.begin()),
944 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
945 std::forward<Combiner>(combine_op), num_threads);
948 subresult = seq.divide_conquer(std::forward<Input>(*subproblems.begin()),
949 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
950 std::forward<Combiner>(combine_op));
955 return seq.reduce(partials.begin(), partials.size(),
956 std::forward<subresult_type>(subresult), combine_op);
959 template <
typename Queue,
typename Consumer,
960 requires_no_pattern<Consumer>>
961 void parallel_execution_omp::do_pipeline(Queue & input_queue, Consumer && consume_op)
const
964 using gen_value_type =
typename Queue::value_type;
968 auto item = input_queue.pop();
969 if (!item.first)
break;
970 consume_op(*item.first);
975 vector<gen_value_type> elements;
977 auto item = input_queue.pop( );
979 if (current == item.second) {
980 consume_op(*item.first);
984 elements.push_back(item);
986 auto it = find_if(elements.begin(), elements.end(),
987 [&](
auto x) { return x.second== current; });
988 if(it != elements.end()){
989 consume_op(*it->first);
993 item = input_queue.pop( );
995 while(elements.size()>0){
996 auto it = find_if(elements.begin(), elements.end(),
997 [&](
auto x) { return x.second== current; });
998 if(it != elements.end()){
999 consume_op(*it->first);
1007 template <
typename Inqueue,
typename Transformer,
typename output_type,
1008 requires_no_pattern<Transformer>>
1009 void parallel_execution_omp::do_pipeline(Inqueue & input_queue, Transformer && transform_op,
1010 mpmc_queue<output_type> & output_queue)
const
1012 using namespace std;
1013 using namespace experimental;
1015 using output_item_value_type =
typename output_type::first_type::value_type;
1017 auto item{input_queue.pop()};
1018 if(!item.first)
break;
1019 auto out = output_item_value_type{transform_op(*item.first)};
1020 output_queue.push(make_pair(out,item.second)) ;
1025 template <
typename Queue,
typename Execution,
typename Transformer,
1026 template <
typename,
typename>
class Context,
1027 typename ... OtherTransformers,
1028 requires_context<Context<Execution,Transformer>>>
1029 void parallel_execution_omp::do_pipeline(Queue & input_queue,
1030 Context<Execution,Transformer> && context_op,
1031 OtherTransformers &&... other_ops)
const
1033 using namespace std;
1034 using namespace experimental;
1036 using input_item_type =
typename Queue::value_type;
1037 using input_item_value_type =
typename input_item_type::first_type::value_type;
1040 using output_optional_type = experimental::optional<output_type>;
1041 using output_item_type = pair <output_optional_type, long> ;
1043 decltype(
auto) output_queue =
1046 #pragma omp task shared(input_queue,context_op,output_queue)
1048 context_op.execution_policy().pipeline(input_queue, context_op.transformer(), output_queue);
1049 output_queue.push(make_pair(output_optional_type{},-1));
1052 do_pipeline(output_queue,
1053 forward<OtherTransformers>(other_ops)... );
1054 #pragma omp taskwait
1057 template <
typename Queue,
typename Transformer,
typename ... OtherTransformers,
1058 requires_no_pattern<Transformer>>
1059 void parallel_execution_omp::do_pipeline(
1060 Queue & input_queue,
1061 Transformer && transform_op,
1062 OtherTransformers && ... other_ops)
const
1064 using namespace std;
1065 using gen_value_type =
typename Queue::value_type;
1066 using input_value_type =
typename gen_value_type::first_type::value_type;
1067 using result_type =
typename result_of<Transformer(input_value_type)>::type;
1069 using output_type = pair<output_value_type,long>;
1071 decltype(
auto) output_queue =
1074 #pragma omp task shared(transform_op, input_queue, output_queue)
1077 auto item = input_queue.pop();
1078 if (!item.first)
break;
1080 output_queue.push(make_pair(out, item.second));
1085 do_pipeline(output_queue,
1086 forward<OtherTransformers>(other_ops)...);
1089 template <
typename Queue,
typename FarmTransformer,
1090 template <
typename>
class Farm,
1091 requires_farm<Farm<FarmTransformer>>>
1092 void parallel_execution_omp::do_pipeline(
1093 Queue & input_queue,
1094 Farm<FarmTransformer> && farm_obj)
const
1096 using namespace std;
1097 using namespace experimental;
1099 for (
int i=0; i<farm_obj.cardinality(); ++i) {
1100 #pragma omp task shared(farm_obj,input_queue)
1102 auto item = input_queue.pop();
1103 while (item.first) {
1104 farm_obj(*item.first);
1105 item = input_queue.pop();
1107 input_queue.push(item);
1110 #pragma omp taskwait
1113 template <
typename Queue,
typename FarmTransformer,
1114 template <
typename>
class Farm,
1115 typename ... OtherTransformers,
1116 requires_farm<Farm<FarmTransformer>>>
1117 void parallel_execution_omp::do_pipeline(
1118 Queue & input_queue,
1119 Farm<FarmTransformer> && farm_obj,
1120 OtherTransformers && ... other_transform_ops)
const
1122 using namespace std;
1123 using namespace experimental;
1124 using gen_value_type =
typename Queue::value_type;
1125 using input_value_type =
typename gen_value_type::first_type::value_type;
1128 using output_optional_type = optional<result_type>;
1129 using output_type = pair<output_optional_type,long>;
1131 decltype(
auto) output_queue =
1136 atomic<
int> done_threads{0};
1137 int ntask = farm_obj.cardinality();
1138 for (
int i=0; i<farm_obj.cardinality(); ++i) {
1139 #pragma omp task shared(done_threads,output_queue,farm_obj,input_queue,ntask)
1141 do_pipeline(input_queue, farm_obj.transformer(), output_queue);
1143 if (done_threads == ntask){
1144 output_queue.push(make_pair(output_optional_type{}, -1));
1146 input_queue.push(gen_value_type{});
1150 do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1151 #pragma omp taskwait
1155 template <
typename Queue,
typename Predicate,
1156 template <
typename>
class Filter,
1157 requires_filter<Filter<Predicate>>>
1158 void parallel_execution_omp::do_pipeline(
1160 Filter<Predicate> &&)
const
1164 template <
typename Queue,
typename Predicate,
1165 template <
typename>
class Filter,
1166 typename ... OtherTransformers,
1167 requires_filter<Filter<Predicate>>>
1168 void parallel_execution_omp::do_pipeline(
1169 Queue & input_queue,
1170 Filter<Predicate> && filter_obj,
1171 OtherTransformers && ... other_transform_ops)
const
1173 using namespace std;
1174 using gen_value_type =
typename Queue::value_type;
1175 using input_value_type =
typename gen_value_type::first_type;
1176 auto filter_queue = make_queue<gen_value_type>();
1179 auto filter_task = [&]() {
1181 auto item{input_queue.pop()};
1182 while (item.first) {
1183 if(filter_obj(*item.first)) {
1184 filter_queue.push(item);
1187 filter_queue.push(make_pair(input_value_type{} ,item.second));
1189 item = input_queue.pop();
1191 filter_queue.push (make_pair(input_value_type{}, -1));
1195 decltype(
auto) output_queue =
1199 auto reorder_task = [&]() {
1200 vector<gen_value_type> elements;
1203 auto item = filter_queue.pop();
1205 if (!item.first && item.second == -1)
break;
1206 if (item.second == current) {
1208 output_queue.push(make_pair(item.first, order++));
1213 elements.push_back(item);
1215 auto it = find_if(elements.begin(), elements.end(),
1216 [&](
auto x) { return x.second== current; });
1217 if(it != elements.end()){
1219 output_queue.push(make_pair(it->first,order));
1225 item = filter_queue.pop();
1228 while (elements.size()>0) {
1229 auto it = find_if(elements.begin(), elements.end(),
1230 [&](
auto x) { return x.second== current; });
1231 if(it != elements.end()){
1233 output_queue.push(make_pair(it->first,order));
1239 item = filter_queue.pop();
1242 output_queue.push(item);
1246 #pragma omp task shared(filter_queue,filter_obj,input_queue)
1251 #pragma omp task shared (output_queue,filter_queue)
1256 do_pipeline(output_queue,
1257 forward<OtherTransformers>(other_transform_ops)...);
1259 #pragma omp taskwait
1262 auto filter_task = [&]() {
1263 auto item = input_queue.pop( ) ;
1264 while (item.first) {
1265 if (filter_obj(*item.first)) {
1266 filter_queue.push(item);
1268 item = input_queue.pop();
1270 filter_queue.push(make_pair(input_value_type{}, -1));
1273 #pragma omp task shared(filter_queue,filter_obj,input_queue)
1277 do_pipeline(filter_queue,
1278 std::forward<OtherTransformers>(other_transform_ops)...);
1279 #pragma omp taskwait
1284 template <
typename Queue,
typename Combiner,
typename Identity,
1285 template <
typename C,
typename I>
class Reduce,
1286 typename ... OtherTransformers,
1287 requires_reduce<Reduce<Combiner,Identity>>>
1288 void parallel_execution_omp::do_pipeline(
1289 Queue && input_queue,
1290 Reduce<Combiner,Identity> && reduce_obj,
1291 OtherTransformers && ... other_transform_ops)
const
1293 using namespace std;
1294 using namespace experimental;
1296 using output_item_value_type = optional<decay_t<Identity>>;
1297 using output_item_type = pair<output_item_value_type,long>;
1299 decltype(
auto) output_queue =
1302 auto reduce_task = [&]() {
1303 auto item{input_queue.pop()};
1305 while (item.first) {
1306 reduce_obj.add_item(std::forward<Identity>(*item.first));
1307 item = input_queue.pop();
1308 if (reduce_obj.reduction_needed()) {
1309 constexpr sequential_execution seq;
1310 auto red = reduce_obj.reduce_window(seq);
1311 output_queue.push(make_pair(red, order++));
1314 output_queue.push(make_pair(output_item_value_type{}, -1));
1317 #pragma omp task shared(reduce_obj,input_queue, output_queue)
1321 do_pipeline(output_queue,
1322 std::forward<OtherTransformers>(other_transform_ops)...);
1323 #pragma omp taskwait
1326 template <
typename Queue,
typename Transformer,
typename Predicate,
1327 template <
typename T,
typename P>
class Iteration,
1328 typename ... OtherTransformers,
1329 requires_iteration<Iteration<Transformer,Predicate>>,
1330 requires_no_pattern<Transformer>>
1331 void parallel_execution_omp::do_pipeline(
1332 Queue & input_queue,
1333 Iteration<Transformer,Predicate> && iteration_obj,
1334 OtherTransformers && ... other_transform_ops)
const
1336 using namespace std;
1337 using namespace experimental;
1339 using input_item_type =
typename decay_t<Queue>::value_type;
1340 decltype(
auto) output_queue =
1344 auto iteration_task = [&]() {
1346 auto item = input_queue.pop();
1347 if (!item.first)
break;
1348 auto value = iteration_obj.transform(*item.first);
1349 auto new_item = input_item_type{value,item.second};
1350 if (iteration_obj.predicate(value)) {
1351 output_queue.push(new_item);
1354 input_queue.push(new_item);
1357 while (!input_queue.is_empty()) {
1358 auto item = input_queue.pop();
1359 auto value = iteration_obj.transform(*item.first);
1360 auto new_item = input_item_type{value,item.second};
1361 if (iteration_obj.predicate(value)) {
1362 output_queue.push(new_item);
1365 input_queue.push(new_item);
1368 output_queue.push(input_item_type{{},-1});
1371 #pragma omp task shared(iteration_obj,input_queue,output_queue)
1375 do_pipeline(output_queue,
1376 std::forward<OtherTransformers>(other_transform_ops)...);
1377 #pragma omp taskwait
1381 template <
typename Queue,
typename Transformer,
typename Predicate,
1382 template <
typename T,
typename P>
class Iteration,
1383 typename ... OtherTransformers,
1384 requires_iteration<Iteration<Transformer,Predicate>>,
1385 requires_pipeline<Transformer>>
1386 void parallel_execution_omp::do_pipeline(
1388 Iteration<Transformer,Predicate> &&,
1389 OtherTransformers && ...)
const
1391 static_assert(!is_pipeline<Transformer>,
"Not implemented");
1394 template <
typename Queue,
typename ... Transformers,
1395 template <
typename...>
class Pipeline,
1396 typename ... OtherTransformers,
1398 void parallel_execution_omp::do_pipeline(
1399 Queue & input_queue,
1400 Pipeline<Transformers...> && pipeline_obj,
1401 OtherTransformers && ... other_transform_ops)
const
1405 std::tuple_cat(pipeline_obj.transformers(),
1406 std::forward_as_tuple(other_transform_ops...)),
1407 std::make_index_sequence<
sizeof...(Transformers)+
sizeof...(OtherTransformers)>());
1410 template <
typename Queue,
typename ... Transformers,
1412 void parallel_execution_omp::do_pipeline_nested(
1413 Queue & input_queue,
1414 std::tuple<Transformers...> && transform_ops,
1415 std::index_sequence<I...>)
const
1417 do_pipeline(input_queue,
1418 std::forward<Transformers>(std::get<I>(transform_ops))...);
1421 template<
typename T,
typename... Others>
1435 struct parallel_execution_omp {};
1442 template <
typename E>
Definition: mpmc_queue.h:33
OpenMP parallel execution policy.
Definition: parallel_execution_omp.h:40
void disable_ordering() noexcept
Disable ordering.
Definition: parallel_execution_omp.h:97
void set_queue_attributes(int size, queue_mode mode) noexcept
Sets the attributes for the queues built through make_queue<T>().
Definition: parallel_execution_omp.h:107
void pipeline(Generator &&generate_op, Transformers &&... transform_op) const
Invoke Pipeline pattern.
Definition: parallel_execution_omp.h:779
void map(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, Transformer transform_op) const
Applies a transformation to multiple sequences leaving the result in another sequence using available...
Definition: parallel_execution_omp.h:598
mpmc_queue< T > & get_output_queue(mpmc_queue< T > &queue, Transformers &&...) const
Returns the reference of a communication queue for elements of type T if the queue has been created i...
Definition: parallel_execution_omp.h:132
void pipeline(mpmc_queue< InputType > &input_queue, Transformer &&transform_op, mpmc_queue< OutputType > &output_queue) const
Invoke Pipeline pattern coming from another context that uses mpmc_queues as communication channels.
Definition: parallel_execution_omp.h:303
mpmc_queue< T > get_output_queue(Transformers &&...) const
Makes a communication queue for elements of type T if the queue has not been created in an outer patt...
Definition: parallel_execution_omp.h:144
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_omp.h:102
parallel_execution_omp() noexcept
Default construct an OpenMP parallel execution policy.
Definition: parallel_execution_omp.h:51
void set_concurrency_degree(int degree) noexcept
Set number of grppi threads.
Definition: parallel_execution_omp.h:77
int get_thread_id() const noexcept
Get index of current thread in the thread table.
Definition: parallel_execution_omp.h:151
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke the divide/conquer pattern.
Definition: parallel_execution_omp.h:764
parallel_execution_omp(int concurrency_degree, bool order=true) noexcept
Set the number of OpenMP threads to concurrency_degree in order to run in parallel.
Definition: parallel_execution_omp.h:67
void enable_ordering() noexcept
Enable ordering.
Definition: parallel_execution_omp.h:92
auto map_reduce(std::tuple< InputIterators... > firsts, std::size_t sequence_size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op) const
Applies a map/reduce operation to a sequence of data items.
Definition: parallel_execution_omp.h:656
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T.
Definition: parallel_execution_omp.h:119
void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: parallel_execution_omp.h:707
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_omp.h:85
auto reduce(InputIterator first, std::size_t sequence_size, Identity &&identity, Combiner &&combine_op) const
Applies a reduction to a sequence of data items.
Definition: parallel_execution_omp.h:610
Sequential execution policy.
Definition: sequential_execution.h:36
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke the divide/conquer pattern.
Definition: sequential_execution.h:538
Definition: callable_traits.h:21
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:107
constexpr bool supports_pipeline< parallel_execution_omp >()
Determines if an execution policy supports the pipeline pattern.
Definition: parallel_execution_omp.h:594
typename internal::output_value_type< I, T >::type output_value_type
Definition: pipeline_pattern.h:128
constexpr bool is_supported< parallel_execution_omp >()
Determines if an execution policy is supported in the current compilation.
Definition: parallel_execution_omp.h:552
constexpr bool supports_stencil< parallel_execution_omp >()
Determines if an execution policy supports the stencil pattern.
Definition: parallel_execution_omp.h:580
typename std::result_of< Transformer(Input)>::type result_type
Determines the return type of applying a function on a input type.
Definition: patterns.h:105
constexpr bool is_parallel_execution_omp()
Metafunction that determines if type E is parallel_execution_omp.
Definition: parallel_execution_omp.h:543
queue_mode
Definition: mpmc_queue.h:30
constexpr bool supports_reduce< parallel_execution_omp >()
Determines if an execution policy supports the reduce pattern.
Definition: parallel_execution_omp.h:566
auto iterators_next(T &&t, int n)
Computes next n steps from a tuple of iterators.
Definition: iterator.h:170
constexpr bool supports_map< parallel_execution_omp >()
Determines if an execution policy supports the map pattern.
Definition: parallel_execution_omp.h:559
constexpr bool supports_divide_conquer< parallel_execution_omp >()
Determines if an execution policy supports the divide/conquer pattern.
Definition: parallel_execution_omp.h:587
decltype(auto) apply_iterators_indexed(F &&f, T &&t, std::size_t i)
Applies a callable object to the values obtained from the iterators in a tuple by indexing...
Definition: iterator.h:142
constexpr bool supports_map_reduce< parallel_execution_omp >()
Determines if an execution policy supports the map-reduce pattern.
Definition: parallel_execution_omp.h:573
std::enable_if_t< is_no_pattern< T >, int > requires_no_pattern
Definition: patterns.h:87
return_type type
Definition: patterns.h:98