21 #ifndef GRPPI_OMP_PARALLEL_EXECUTION_OMP_H 22 #define GRPPI_OMP_PARALLEL_EXECUTION_OMP_H 26 #include "../common/mpmc_queue.h" 27 #include "../common/iterator.h" 28 #include "../common/execution_traits.h" 29 #include "../seq/sequential_execution.h" 31 #include <type_traits> 33 #include <experimental/optional> 73 concurrency_degree_{concurrency_degree},
76 omp_set_num_threads(concurrency_degree_);
83 concurrency_degree_ = degree;
84 omp_set_num_threads(concurrency_degree_);
91 return concurrency_degree_;
123 template <
typename T>
125 return {queue_size_, queue_mode_};
136 template <
typename T,
typename ... Transformers>
148 template <
typename T,
typename ... Transformers>
150 return std::move(make_queue<T>());
160 result = omp_get_thread_num();
/**
\brief Declaration of the map pattern for the OpenMP execution policy.
\param firsts Tuple holding one iterator per input sequence.
\param first_out Output iterator (presumably the start of the result
       sequence -- the loop body of the definition is not visible here).
\param sequence_size Number of elements; the out-of-line definition loops
       an index over [0, sequence_size) under `#pragma omp parallel for`.
\param transform_op Callable taken by value.
       NOTE(review): how it is applied per element should be confirmed
       against the full definition.
*/
179 template <
typename ... InputIterators,
typename OutputIterator,
180 typename Transformer>
181 void map(std::tuple<InputIterators...> firsts,
182 OutputIterator first_out,
183 std::size_t sequence_size, Transformer transform_op)
const;
/**
\brief Declaration of the reduce pattern for the OpenMP execution policy.

The out-of-line definition allocates one partial result per unit of
`concurrency_degree_`, uses chunks of `sequence_size/concurrency_degree_`
elements (the last chunk takes whatever remains), reduces each chunk with
`seq.reduce`, and finally combines the partial results with `seq.reduce`
using `partial_results[0]` as the starting value.

\param first Iterator to the first element of the input sequence.
\param sequence_size Number of elements in the input sequence.
\param identity Value forwarded to each per-chunk `seq.reduce` call.
\param combine_op Combination operation forwarded to `seq.reduce`.
\return Result of the final `seq.reduce` over the partial results.
*/
197 template <
typename InputIterator,
typename Identity,
typename Combiner>
198 auto reduce(InputIterator first, std::size_t sequence_size,
199 Identity && identity, Combiner && combine_op)
const;
/**
\brief Applies a map/reduce operation to a sequence of data items.

The out-of-line definition keeps one partial result per unit of
`concurrency_degree_`, processes chunks of
`sequence_size / concurrency_degree_` elements through `seq.map_reduce`
(the last chunk runs up to `sequence_size`), and then combines the partial
results with `seq.reduce`, starting from `partial_results[0]`.

\param firsts Tuple holding one iterator per input sequence.
\param sequence_size Number of elements in the input sequences.
\param identity Identity value.
       NOTE(review): the visible part of the definition does not show
       where it is consumed -- confirm against the full source.
\param transform_op Transformation forwarded to `seq.map_reduce`.
\param combine_op Combination forwarded to `seq.map_reduce` and to the
       final `seq.reduce`.
\return Result of the final `seq.reduce` over the partial results.
*/
215 template <
typename ... InputIterators,
typename Identity,
216 typename Transformer,
typename Combiner>
217 auto map_reduce(std::tuple<InputIterators...> firsts,
218 std::size_t sequence_size,
219 Identity && identity,
220 Transformer && transform_op, Combiner && combine_op)
const;
/**
\brief Declaration of the stencil pattern for the OpenMP execution policy.

The out-of-line definition uses chunks of
`sequence_size / concurrency_degree_` elements and runs `seq.stencil` on
each one, writing from `std::next(first_out, delta)` where `delta` is the
chunk's starting offset; the last chunk extends to `sequence_size`.

\param firsts Tuple holding one iterator per input sequence.
\param first_out Iterator to the start of the output sequence.
\param sequence_size Number of elements in the input sequences.
\param transform_op Stencil transformation forwarded to `seq.stencil`.
\param neighbour_op Neighbourhood operation forwarded to `seq.stencil`.
*/
238 template <
typename ... InputIterators,
typename OutputIterator,
239 typename StencilTransformer,
typename Neighbourhood>
240 void stencil(std::tuple<InputIterators...> firsts, OutputIterator first_out,
241 std::size_t sequence_size,
242 StencilTransformer && transform_op,
243 Neighbourhood && neighbour_op)
const;
257 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
258 [[deprecated(
"Use new interface with predicate argument")]]
260 Divider && divide_op,
262 Combiner && combine_op)
const;
278 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
280 Divider && divide_op,
281 Predicate && predicate_op,
283 Combiner && combine_op)
const;
/**
\brief Declaration of the pipeline pattern for the OpenMP execution policy.

The out-of-line definition creates a queue of
`pair<result_type,long>` (where `result_type` is the decayed result of
calling the generator), runs an OpenMP task that calls `generate_op()` and
pushes each item paired with an increasing order index, and hands the queue
plus the remaining stages to `do_pipeline`.

\param generate_op Generator invoked with no arguments to produce items.
       NOTE(review): the end-of-stream condition is not visible in this
       listing -- confirm against the full source.
\param transform_op Remaining pipeline stages, forwarded to `do_pipeline`
       (named `transform_ops` in the definition).
*/
293 template <
typename Generator,
typename ... Transformers>
294 void pipeline(Generator && generate_op,
295 Transformers && ... transform_op)
const;
307 template <
typename InputType,
typename Transformer,
typename OutputType>
311 do_pipeline(input_queue, std::forward<Transformer>(transform_op), output_queue);
316 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
318 Divider && divide_op,
320 Combiner && combine_op,
321 std::atomic<int> & num_threads)
const;
323 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
325 Divider && divide_op,
326 Predicate && predicate_op,
328 Combiner && combine_op,
329 std::atomic<int> & num_threads)
const;
332 template <
typename Queue,
typename Consumer,
334 void do_pipeline(Queue & input_queue, Consumer && consume_op)
const;
336 template <
typename Inqueue,
typename Transformer,
typename output_type,
338 void do_pipeline(Inqueue & input_queue, Transformer && transform_op,
341 template <
typename T,
typename ... Others>
345 template <
typename T>
348 template <
typename Queue,
typename Transformer,
typename ... OtherTransformers,
349 requires_no_pattern<Transformer> = 0>
350 void do_pipeline(Queue & input_queue, Transformer && transform_op,
351 OtherTransformers && ... other_ops)
const;
353 template <
typename Queue,
typename Execution,
typename Transformer,
354 template <
typename,
typename>
class Context,
355 typename ... OtherTransformers,
357 void do_pipeline(Queue & input_queue, Context<Execution,Transformer> && context_op,
358 OtherTransformers &&... other_ops)
const;
360 template <
typename Queue,
typename Execution,
typename Transformer,
361 template <
typename,
typename>
class Context,
362 typename ... OtherTransformers,
363 requires_context<Context<Execution,Transformer>> = 0>
364 void do_pipeline(Queue & input_queue, Context<Execution,Transformer> & context_op,
365 OtherTransformers &&... other_ops)
const 367 do_pipeline(input_queue, std::move(context_op),
368 std::forward<OtherTransformers>(other_ops)...);
371 template <
typename Queue,
typename FarmTransformer,
372 template <
typename>
class Farm,
374 void do_pipeline(Queue & input_queue,
375 Farm<FarmTransformer> & farm_obj)
const 377 do_pipeline(input_queue, std::move(farm_obj));
380 template <
typename Queue,
typename FarmTransformer,
381 template <
typename>
class Farm,
382 requires_farm<Farm<FarmTransformer>> = 0>
383 void do_pipeline(Queue & input_queue,
384 Farm<FarmTransformer> && farm_obj)
const;
386 template <
typename Queue,
typename FarmTransformer,
387 template <
typename>
class Farm,
388 typename ... OtherTransformers,
389 requires_farm<Farm<FarmTransformer>> =0>
390 void do_pipeline(Queue & input_queue,
391 Farm<FarmTransformer> & farm_obj,
392 OtherTransformers && ... other_transform_ops)
const 394 do_pipeline(input_queue, std::move(farm_obj),
395 std::forward<OtherTransformers>(other_transform_ops)...);
398 template <
typename Queue,
typename FarmTransformer,
399 template <
typename>
class Farm,
400 typename ... OtherTransformers,
401 requires_farm<Farm<FarmTransformer>> =0>
402 void do_pipeline(Queue & input_queue,
403 Farm<FarmTransformer> && farm_obj,
404 OtherTransformers && ... other_transform_ops)
const;
406 template <
typename Queue,
typename Predicate,
407 template <
typename>
class Filter,
409 void do_pipeline(Queue & input_queue,
410 Filter<Predicate> & filter_obj)
const 412 do_pipeline(input_queue, std::move(filter_obj));
415 template <
typename Queue,
typename Predicate,
416 template <
typename>
class Filter,
417 requires_filter<Filter<Predicate>> = 0>
418 void do_pipeline(Queue & input_queue,
419 Filter<Predicate> && filter_obj)
const;
421 template <
typename Queue,
typename Predicate,
422 template <
typename>
class Filter,
423 typename ... OtherTransformers,
424 requires_filter<Filter<Predicate>> =0>
425 void do_pipeline(Queue & input_queue,
426 Filter<Predicate> & filter_obj,
427 OtherTransformers && ... other_transform_ops)
const 429 do_pipeline(input_queue, std::move(filter_obj),
430 std::forward<OtherTransformers>(other_transform_ops)...);
433 template <
typename Queue,
typename Predicate,
434 template <
typename>
class Filter,
435 typename ... OtherTransformers,
436 requires_filter<Filter<Predicate>> =0>
437 void do_pipeline(Queue & input_queue,
438 Filter<Predicate> && filter_obj,
439 OtherTransformers && ... other_transform_ops)
const;
441 template <
typename Queue,
typename Combiner,
typename Identity,
442 template <
typename C,
typename I>
class Reduce,
443 typename ... OtherTransformers,
445 void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> & reduce_obj,
446 OtherTransformers && ... other_transform_ops)
const 448 do_pipeline(input_queue, std::move(reduce_obj),
449 std::forward<OtherTransformers>(other_transform_ops)...);
452 template <
typename Queue,
typename Combiner,
typename Identity,
453 template <
typename C,
typename I>
class Reduce,
454 typename ... OtherTransformers,
455 requires_reduce<Reduce<Combiner,Identity>> = 0>
456 void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> && reduce_obj,
457 OtherTransformers && ... other_transform_ops)
const;
459 template <
typename Queue,
typename Transformer,
typename Predicate,
460 template <
typename T,
typename P>
class Iteration,
461 typename ... OtherTransformers,
463 requires_no_pattern<Transformer> =0>
464 void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> & iteration_obj,
465 OtherTransformers && ... other_transform_ops)
const 467 do_pipeline(input_queue, std::move(iteration_obj),
468 std::forward<OtherTransformers>(other_transform_ops)...);
471 template <
typename Queue,
typename Transformer,
typename Predicate,
472 template <
typename T,
typename P>
class Iteration,
473 typename ... OtherTransformers,
474 requires_iteration<Iteration<Transformer,Predicate>> =0,
475 requires_no_pattern<Transformer> =0>
476 void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
477 OtherTransformers && ... other_transform_ops)
const;
479 template <
typename Queue,
typename Transformer,
typename Predicate,
480 template <
typename T,
typename P>
class Iteration,
481 typename ... OtherTransformers,
482 requires_iteration<Iteration<Transformer,Predicate>> =0,
484 void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
485 OtherTransformers && ... other_transform_ops)
const;
487 template <
typename Queue,
typename ... Transformers,
488 template <
typename...>
class Pipeline,
489 typename ... OtherTransformers,
491 void do_pipeline(Queue & input_queue,
492 Pipeline<Transformers...> & pipeline_obj,
493 OtherTransformers && ... other_transform_ops)
const 495 do_pipeline(input_queue, std::move(pipeline_obj),
496 std::forward<OtherTransformers>(other_transform_ops)...);
499 template <
typename Queue,
typename ... Transformers,
500 template <
typename...>
class Pipeline,
501 typename ... OtherTransformers,
503 void do_pipeline(Queue & input_queue,
504 Pipeline<Transformers...> && pipeline_obj,
505 OtherTransformers && ... other_transform_ops)
const;
507 template <
typename Queue,
typename ... Transformers,
509 void do_pipeline_nested(
511 std::tuple<Transformers...> && transform_ops,
512 std::index_sequence<I...>)
const;
523 static int impl_concurrency_degree() {
527 result = omp_get_num_threads();
534 int concurrency_degree_;
538 constexpr
static int default_queue_size = 100;
539 int queue_size_ = default_queue_size;
548 template <
typename E>
550 return std::is_same<E, parallel_execution_omp>::value;
602 template <
typename ... InputIterators,
typename OutputIterator,
603 typename Transformer>
605 std::tuple<InputIterators...> firsts,
606 OutputIterator first_out,
607 std::size_t sequence_size, Transformer transform_op)
const 609 #pragma omp parallel for 610 for (std::size_t i=0; i<sequence_size; ++i) {
615 template <
typename InputIterator,
typename Identity,
typename Combiner>
617 InputIterator first, std::size_t sequence_size,
618 Identity && identity,
619 Combiner && combine_op)
const 624 std::vector<result_type> partial_results(concurrency_degree_);
625 auto process_chunk = [&](InputIterator f, std::size_t sz, std::size_t id) {
626 partial_results[id] = seq.reduce(f, sz, std::forward<Identity>(identity),
627 std::forward<Combiner>(combine_op));
630 const auto chunk_size = sequence_size/concurrency_degree_;
634 #pragma omp single nowait 636 for (
int i=0 ;i<concurrency_degree_-1; ++i) {
637 const auto delta = chunk_size * i;
638 const auto chunk_first = std::next(first,delta);
640 #pragma omp task firstprivate (chunk_first, chunk_size, i) 642 process_chunk(chunk_first, chunk_size, i);
647 const auto delta = chunk_size * (concurrency_degree_ - 1);
648 const auto chunk_first= std::next(first,delta);
649 const auto chunk_sz = sequence_size - delta;
650 process_chunk(chunk_first, chunk_sz, concurrency_degree_-1);
655 return seq.reduce(std::next(partial_results.begin()),
656 partial_results.size()-1,
657 partial_results[0], std::forward<Combiner>(combine_op));
660 template <
typename ... InputIterators,
typename Identity,
661 typename Transformer,
typename Combiner>
663 std::tuple<InputIterators...> firsts,
664 std::size_t sequence_size,
665 Identity && identity,
666 Transformer && transform_op, Combiner && combine_op)
const 671 std::vector<result_type> partial_results(concurrency_degree_);
673 auto process_chunk = [&](
auto f, std::size_t sz, std::size_t i) {
674 partial_results[i] = seq.map_reduce(
675 f, sz, partial_results[i],
676 std::forward<Transformer>(transform_op),
677 std::forward<Combiner>(combine_op));
680 const auto chunk_size = sequence_size / concurrency_degree_;
684 #pragma omp single nowait 687 for (
int i=0;i<concurrency_degree_-1;++i) {
688 #pragma omp task firstprivate(i) 690 const auto delta = chunk_size * i;
692 const auto chunk_last = std::next(std::get<0>(chunk_firsts), chunk_size);
693 process_chunk(chunk_firsts, chunk_size, i);
697 const auto delta = chunk_size * (concurrency_degree_ - 1);
699 auto chunk_last = std::next(std::get<0>(firsts), sequence_size);
700 process_chunk(chunk_firsts,
701 std::distance(std::get<0>(chunk_firsts), chunk_last),
702 concurrency_degree_ - 1);
707 return seq.reduce(std::next(partial_results.begin()),
708 partial_results.size()-1,
709 partial_results[0], std::forward<Combiner>(combine_op));
712 template <
typename ... InputIterators,
typename OutputIterator,
713 typename StencilTransformer,
typename Neighbourhood>
715 std::tuple<InputIterators...> firsts, OutputIterator first_out,
716 std::size_t sequence_size,
717 StencilTransformer && transform_op,
718 Neighbourhood && neighbour_op)
const 721 const auto chunk_size = sequence_size / concurrency_degree_;
722 auto process_chunk = [&](
auto f, std::size_t sz, std::size_t delta) {
723 seq.stencil(f, std::next(first_out,delta), sz,
724 std::forward<StencilTransformer>(transform_op),
725 std::forward<Neighbourhood>(neighbour_op));
730 #pragma omp single nowait 732 for (
int i=0; i<concurrency_degree_-1; ++i) {
733 #pragma omp task firstprivate(i) 735 const auto delta = chunk_size * i;
737 process_chunk(chunk_firsts, chunk_size, delta);
741 const auto delta = chunk_size * (concurrency_degree_ - 1);
743 const auto chunk_last = std::next(std::get<0>(firsts), sequence_size);
744 process_chunk(chunk_firsts,
745 std::distance(std::get<0>(chunk_firsts), chunk_last), delta);
752 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
755 Divider && divide_op,
756 Predicate && predicate_op,
758 Combiner && combine_op)
const 760 std::atomic<int> num_threads{concurrency_degree_-1};
763 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
764 std::forward<Solver>(solve_op),
765 std::forward<Combiner>(combine_op),
770 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
773 Divider && divide_op,
775 Combiner && combine_op)
const 777 std::atomic<int> num_threads{concurrency_degree_-1};
780 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
781 std::forward<Combiner>(combine_op),
785 template <
typename Generator,
typename ... Transformers>
787 Generator && generate_op,
788 Transformers && ... transform_ops)
const 792 using result_type = decay_t<typename result_of<Generator()>::type>;
793 auto output_queue = make_queue<pair<result_type,long>>();
797 #pragma omp single nowait 799 #pragma omp task shared(generate_op,output_queue) 803 auto item = generate_op();
804 output_queue.push(make_pair(item,order++)) ;
808 do_pipeline(output_queue,
809 forward<Transformers>(transform_ops)...);
816 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
819 Divider && divide_op,
820 Predicate && predicate_op,
822 Combiner && combine_op,
823 std::atomic<int> & num_threads)
const 826 if (num_threads.load()<=0) {
828 std::forward<Divider>(divide_op),std::forward<Predicate>(predicate_op),
829 std::forward<Solver>(solve_op),
830 std::forward<Combiner>(combine_op));
833 if (predicate_op(input)) {
return solve_op(std::forward<Input>(input)); }
834 auto subproblems = divide_op(std::forward<Input>(input));
836 using subresult_type =
837 std::decay_t<typename std::result_of<Solver(Input)>::type>;
838 std::vector<subresult_type> partials(subproblems.size()-1);
840 auto process_subproblems = [&,
this](
auto it, std::size_t div) {
842 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
843 std::forward<Solver>(solve_op),
844 std::forward<Combiner>(combine_op), num_threads);
848 subresult_type subresult;
852 #pragma omp single nowait 854 auto i = subproblems.begin() + 1;
855 while (i!=subproblems.end() && num_threads.load()>0) {
856 #pragma omp task firstprivate(i,division) \ 857 shared(partials,divide_op,solve_op,combine_op,num_threads) 859 process_subproblems(i, division);
866 while (i!=subproblems.end()) {
867 partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
868 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
869 std::forward<Solver>(solve_op),
870 std::forward<Combiner>(combine_op));
874 if (num_threads.load()>0) {
875 subresult =
divide_conquer(std::forward<Input>(*subproblems.begin()),
876 std::forward<Divider>(divide_op),std::forward<Predicate>(predicate_op),
877 std::forward<Solver>(solve_op),
878 std::forward<Combiner>(combine_op), num_threads);
881 subresult = seq.divide_conquer(std::forward<Input>(*subproblems.begin()),
882 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
883 std::forward<Solver>(solve_op),
884 std::forward<Combiner>(combine_op));
889 return seq.reduce(partials.begin(), partials.size(),
890 std::forward<subresult_type>(subresult), combine_op);
895 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
898 Divider && divide_op,
900 Combiner && combine_op,
901 std::atomic<int> & num_threads)
const 904 if (num_threads.load()<=0) {
906 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
907 std::forward<Combiner>(combine_op));
910 auto subproblems = divide_op(std::forward<Input>(input));
911 if (subproblems.size()<=1) {
return solve_op(std::forward<Input>(input)); }
913 using subresult_type =
914 std::decay_t<typename std::result_of<Solver(Input)>::type>;
915 std::vector<subresult_type> partials(subproblems.size()-1);
917 auto process_subproblems = [&,
this](
auto it, std::size_t div) {
919 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
920 std::forward<Combiner>(combine_op), num_threads);
924 subresult_type subresult;
928 #pragma omp single nowait 930 auto i = subproblems.begin() + 1;
931 while (i!=subproblems.end() && num_threads.load()>0) {
932 #pragma omp task firstprivate(i,division) \ 933 shared(partials,divide_op,solve_op,combine_op,num_threads) 935 process_subproblems(i, division);
942 while (i!=subproblems.end()) {
943 partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
944 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
945 std::forward<Combiner>(combine_op));
949 if (num_threads.load()>0) {
950 subresult =
divide_conquer(std::forward<Input>(*subproblems.begin()),
951 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
952 std::forward<Combiner>(combine_op), num_threads);
955 subresult = seq.divide_conquer(std::forward<Input>(*subproblems.begin()),
956 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
957 std::forward<Combiner>(combine_op));
962 return seq.reduce(partials.begin(), partials.size(),
963 std::forward<subresult_type>(subresult), combine_op);
966 template <
typename Queue,
typename Consumer,
968 void parallel_execution_omp::do_pipeline(Queue & input_queue, Consumer && consume_op)
const 971 using gen_value_type =
typename Queue::value_type;
975 auto item = input_queue.pop();
976 if (!item.first)
break;
977 consume_op(*item.first);
982 vector<gen_value_type> elements;
984 auto item = input_queue.pop( );
986 if (current == item.second) {
987 consume_op(*item.first);
991 elements.push_back(item);
993 for (
auto it=elements.begin(); it!=elements.end(); it++) {
994 if (it->second == current) {
995 consume_op(*it->first);
1001 item = input_queue.pop( );
1003 while(elements.size()>0){
1004 for(
auto it = elements.begin(); it != elements.end(); it++){
1005 if(it->second == current) {
1006 consume_op(*it->first);
1016 template <
typename Inqueue,
typename Transformer,
typename output_type,
1018 void parallel_execution_omp::do_pipeline(Inqueue & input_queue, Transformer && transform_op,
1021 using namespace std;
1022 using namespace experimental;
1024 using input_item_type =
typename Inqueue::value_type;
1025 using input_item_value_type =
typename input_item_type::first_type::value_type;
1027 using output_optional_type =
typename output_type::first_type;
1028 using output_item_value_type =
typename output_type::first_type::value_type;
1030 auto item{input_queue.pop()};
1031 if(!item.first)
break;
1032 auto out = output_item_value_type{transform_op(*item.first)};
1033 output_queue.
push(make_pair(out,item.second)) ;
1038 template <
typename Queue,
typename Execution,
typename Transformer,
1039 template <
typename,
typename>
class Context,
1040 typename ... OtherTransformers,
1042 void parallel_execution_omp::do_pipeline(Queue & input_queue,
1043 Context<Execution,Transformer> && context_op,
1044 OtherTransformers &&... other_ops)
const 1046 using namespace std;
1047 using namespace experimental;
1049 using input_item_type =
typename Queue::value_type;
1050 using input_item_value_type =
typename input_item_type::first_type::value_type;
1053 using output_optional_type = experimental::optional<output_type>;
1054 using output_item_type = pair <output_optional_type, long> ;
1056 decltype(
auto) output_queue =
1057 get_output_queue<output_item_type>(other_ops...);
1059 #pragma omp task shared(input_queue,context_op,output_queue) 1061 context_op.execution_policy().pipeline(input_queue, context_op.transformer(), output_queue);
1062 output_queue.
push(make_pair(output_optional_type{},-1));
1065 do_pipeline(output_queue,
1066 forward<OtherTransformers>(other_ops)... );
1067 #pragma omp taskwait 1070 template <
typename Queue,
typename Transformer,
typename ... OtherTransformers,
1072 void parallel_execution_omp::do_pipeline(
1073 Queue & input_queue,
1074 Transformer && transform_op,
1075 OtherTransformers && ... other_ops)
const 1077 using namespace std;
1078 using gen_value_type =
typename Queue::value_type;
1079 using input_value_type =
typename gen_value_type::first_type::value_type;
1080 using result_type =
typename result_of<Transformer(input_value_type)>::type;
1082 using output_type = pair<output_value_type,long>;
1084 decltype(
auto) output_queue =
1085 get_output_queue<output_type>(other_ops...);
1087 #pragma omp task shared(transform_op, input_queue, output_queue) 1090 auto item = input_queue.pop();
1091 if (!item.first)
break;
1093 output_queue.
push(make_pair(out, item.second));
1098 do_pipeline(output_queue,
1099 forward<OtherTransformers>(other_ops)...);
1102 template <
typename Queue,
typename FarmTransformer,
1103 template <
typename>
class Farm,
1105 void parallel_execution_omp::do_pipeline(
1106 Queue & input_queue,
1107 Farm<FarmTransformer> && farm_obj)
const 1109 using namespace std;
1110 using namespace experimental;
1111 using gen_value_type =
typename Queue::value_type;
1112 using input_value_type =
typename gen_value_type::first_type::value_type;
1114 for (
int i=0; i<farm_obj.cardinality(); ++i) {
1115 #pragma omp task shared(farm_obj,input_queue) 1117 auto item = input_queue.pop();
1118 while (item.first) {
1119 farm_obj(*item.first);
1120 item = input_queue.pop();
1122 input_queue.push(item);
1125 #pragma omp taskwait 1128 template <
typename Queue,
typename FarmTransformer,
1129 template <
typename>
class Farm,
1130 typename ... OtherTransformers,
1132 void parallel_execution_omp::do_pipeline(
1133 Queue & input_queue,
1134 Farm<FarmTransformer> && farm_obj,
1135 OtherTransformers && ... other_transform_ops)
const 1137 using namespace std;
1138 using namespace experimental;
1139 using gen_value_type =
typename Queue::value_type;
1140 using input_value_type =
typename gen_value_type::first_type::value_type;
1143 using output_optional_type = optional<result_type>;
1144 using output_type = pair<output_optional_type,long>;
1146 decltype(
auto) output_queue =
1147 get_output_queue<output_type>(other_transform_ops...);
1151 atomic<int> done_threads{0};
1152 int ntask = farm_obj.cardinality();
1153 for (
int i=0; i<farm_obj.cardinality(); ++i) {
1154 #pragma omp task shared(done_threads,output_queue,farm_obj,input_queue,ntask) 1156 do_pipeline(input_queue, farm_obj.transformer(), output_queue);
1158 if (done_threads == ntask){
1159 output_queue.
push(make_pair(output_optional_type{}, -1));
1161 input_queue.push(gen_value_type{});
1165 do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1166 #pragma omp taskwait 1170 template <
typename Queue,
typename Predicate,
1171 template <
typename>
class Filter,
1173 void parallel_execution_omp::do_pipeline(
1174 Queue & input_queue,
1175 Filter<Predicate> && filter_obj)
const 1179 template <
typename Queue,
typename Predicate,
1180 template <
typename>
class Filter,
1181 typename ... OtherTransformers,
1183 void parallel_execution_omp::do_pipeline(
1184 Queue & input_queue,
1185 Filter<Predicate> && filter_obj,
1186 OtherTransformers && ... other_transform_ops)
const 1188 using namespace std;
1189 using gen_value_type =
typename Queue::value_type;
1190 using input_value_type =
typename gen_value_type::first_type;
1191 auto filter_queue = make_queue<gen_value_type>();
1194 auto filter_task = [&]() {
1196 auto item{input_queue.pop()};
1197 while (item.first) {
1198 if(filter_obj(*item.first)) {
1199 filter_queue.push(item);
1202 filter_queue.push(make_pair(input_value_type{} ,item.second));
1204 item = input_queue.pop();
1206 filter_queue.push (make_pair(input_value_type{}, -1));
1210 decltype(
auto) output_queue =
1211 get_output_queue<gen_value_type>(other_transform_ops...);
1214 auto reorder_task = [&]() {
1215 vector<gen_value_type> elements;
1218 auto item = filter_queue.pop();
1220 if (!item.first && item.second == -1)
break;
1221 if (item.second == current) {
1223 output_queue.
push(make_pair(item.first, order++));
1228 elements.push_back(item);
1230 for (
auto it=elements.begin(); it<elements.end(); it++) {
1231 if ((*it).second==current) {
1233 output_queue.
push(make_pair((*it).first,order++));
1240 item = filter_queue.pop();
1243 while (elements.size()>0) {
1244 for (
auto it=elements.begin(); it<elements.end(); it++) {
1245 if ((*it).second == current) {
1247 output_queue.
push(make_pair((*it).first,order++));
1256 output_queue.
push(item);
1260 #pragma omp task shared(filter_queue,filter_obj,input_queue) 1265 #pragma omp task shared (output_queue,filter_queue) 1270 do_pipeline(output_queue,
1271 forward<OtherTransformers>(other_transform_ops)...);
1273 #pragma omp taskwait 1276 auto filter_task = [&]() {
1277 auto item = input_queue.pop( ) ;
1278 while (item.first) {
1279 if (filter_obj(*item.first)) {
1280 filter_queue.push(item);
1282 item = input_queue.pop();
1284 filter_queue.push(make_pair(input_value_type{}, -1));
1287 #pragma omp task shared(filter_queue,filter_obj,input_queue) 1291 do_pipeline(filter_queue,
1292 std::forward<OtherTransformers>(other_transform_ops)...);
1293 #pragma omp taskwait 1298 template <
typename Queue,
typename Combiner,
typename Identity,
1299 template <
typename C,
typename I>
class Reduce,
1300 typename ... OtherTransformers,
1302 void parallel_execution_omp::do_pipeline(
1303 Queue && input_queue,
1304 Reduce<Combiner,Identity> && reduce_obj,
1305 OtherTransformers && ... other_transform_ops)
const 1307 using namespace std;
1308 using namespace experimental;
1310 using input_item_type =
typename decay_t<Queue>::value_type;
1311 using input_item_value_type =
typename input_item_type::first_type::value_type;
1312 using output_item_value_type = optional<decay_t<Identity>>;
1313 using output_item_type = pair<output_item_value_type,long>;
1315 decltype(
auto) output_queue =
1316 get_output_queue<output_item_type>(other_transform_ops...);
1318 auto reduce_task = [&,
this]() {
1319 auto item{input_queue.pop()};
1321 while (item.first) {
1322 reduce_obj.add_item(std::forward<Identity>(*item.first));
1323 item = input_queue.pop();
1324 if (reduce_obj.reduction_needed()) {
1326 auto red = reduce_obj.reduce_window(seq);
1327 output_queue.
push(make_pair(red, order++));
1330 output_queue.
push(make_pair(output_item_value_type{}, -1));
1333 #pragma omp task shared(reduce_obj,input_queue, output_queue) 1337 do_pipeline(output_queue,
1338 std::forward<OtherTransformers>(other_transform_ops)...);
1339 #pragma omp taskwait 1342 template <
typename Queue,
typename Transformer,
typename Predicate,
1343 template <
typename T,
typename P>
class Iteration,
1344 typename ... OtherTransformers,
1347 void parallel_execution_omp::do_pipeline(
1348 Queue & input_queue,
1349 Iteration<Transformer,Predicate> && iteration_obj,
1350 OtherTransformers && ... other_transform_ops)
const 1352 using namespace std;
1353 using namespace experimental;
1355 using input_item_type =
typename decay_t<Queue>::value_type;
1356 using input_item_value_type =
typename input_item_type::first_type::value_type;
1357 decltype(
auto) output_queue =
1358 get_output_queue<input_item_type>(other_transform_ops...);
1361 auto iteration_task = [&]() {
1363 auto item = input_queue.pop();
1364 if (!item.first)
break;
1365 std::cerr <<
"Processing: <" << *item.first <<
" , " << item.second <<
">\n";
1366 auto value = iteration_obj.transform(*item.first);
1367 auto new_item = input_item_type{value,item.second};
1368 if (iteration_obj.predicate(value)) {
1369 std::cerr <<
"Sending to output" 1370 << *new_item.first <<
" , " << new_item.second <<
">\n";
1371 output_queue.
push(new_item);
1374 std::cerr <<
"Sending to input" 1375 << *new_item.first <<
" , " << new_item.second <<
">\n";
1376 input_queue.push(new_item);
1379 while (!input_queue.is_empty()) {
1380 auto item = input_queue.pop();
1381 std::cerr <<
"Processing: <" << *item.first <<
" , " << item.second <<
">\n";
1382 auto value = iteration_obj.transform(*item.first);
1383 auto new_item = input_item_type{value,item.second};
1384 if (iteration_obj.predicate(value)) {
1385 output_queue.
push(new_item);
1388 input_queue.push(new_item);
1391 output_queue.
push(input_item_type{{},-1});
1394 #pragma omp task shared(iteration_obj,input_queue,output_queue) 1398 do_pipeline(output_queue,
1399 std::forward<OtherTransformers>(other_transform_ops)...);
1400 #pragma omp taskwait 1404 template <
typename Queue,
typename Transformer,
typename Predicate,
1405 template <
typename T,
typename P>
class Iteration,
1406 typename ... OtherTransformers,
1407 requires_iteration<Iteration<Transformer,Predicate>>,
1409 void parallel_execution_omp::do_pipeline(
1410 Queue & input_queue,
1411 Iteration<Transformer,Predicate> && iteration_obj,
1412 OtherTransformers && ... other_transform_ops)
const 1414 static_assert(!is_pipeline<Transformer>,
"Not implemented");
1417 template <
typename Queue,
typename ... Transformers,
1418 template <
typename...>
class Pipeline,
1419 typename ... OtherTransformers,
1421 void parallel_execution_omp::do_pipeline(
1422 Queue & input_queue,
1423 Pipeline<Transformers...> && pipeline_obj,
1424 OtherTransformers && ... other_transform_ops)
const 1428 std::tuple_cat(pipeline_obj.transformers(),
1429 std::forward_as_tuple(other_transform_ops...)),
1430 std::make_index_sequence<
sizeof...(Transformers)+
sizeof...(OtherTransformers)>());
1433 template <
typename Queue,
typename ... Transformers,
1435 void parallel_execution_omp::do_pipeline_nested(
1436 Queue & input_queue,
1437 std::tuple<Transformers...> && transform_ops,
1438 std::index_sequence<I...>)
const 1440 do_pipeline(input_queue,
1441 std::forward<Transformers>(std::get<I>(transform_ops))...);
1447 #else // GRPPI_OMP undefined 1461 template <
typename E>
Definition: callable_traits.h:26
parallel_execution_omp(int concurrency_degree, bool order=true) noexcept
Set num_threads to _threads in order to run in parallel.
Definition: parallel_execution_omp.h:72
std::enable_if_t< is_reduce< T >, int > requires_reduce
Definition: reduce_pattern.h:135
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_omp.h:107
typename std::enable_if_t< is_iteration< T >, int > requires_iteration
Definition: iteration_pattern.h:88
void disable_ordering() noexcept
Disable ordering.
Definition: parallel_execution_omp.h:102
parallel_execution_omp() noexcept
Default construct an OpenMP parallel execution policy.
Definition: parallel_execution_omp.h:56
int get_thread_id() const noexcept
Get index of current thread in the thread table.
Definition: parallel_execution_omp.h:156
constexpr bool is_supported< parallel_execution_omp >()
Determines if an execution policy is supported in the current compilation.
Definition: parallel_execution_omp.h:558
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke Divide/conquer pattern.
Definition: parallel_execution_omp.h:771
auto iterators_next(T &&t, int n)
Computes next n steps from a tuple of iterators.
Definition: iterator.h:175
decltype(auto) apply_iterators_indexed(F &&f, T &&t, std::size_t i)
Applies a callable object to the values obtained from the iterators in a tuple by indexing...
Definition: iterator.h:147
constexpr bool is_parallel_execution_omp()
Metafunction that determines if type E is parallel_execution_omp.
Definition: parallel_execution_omp.h:549
typename internal::output_value_type< I, T >::type output_value_type
Definition: pipeline_pattern.h:132
queue_mode
Definition: mpmc_queue.h:35
constexpr bool supports_map< parallel_execution_omp >()
Determines if an execution policy supports the map pattern.
Definition: parallel_execution_omp.h:565
Definition: mpmc_queue.h:38
auto map_reduce(std::tuple< InputIterators... > firsts, std::size_t sequence_size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op) const
Applies a map/reduce operation to a sequence of data items.
Definition: parallel_execution_omp.h:662
mpmc_queue< T > get_output_queue(Transformers &&...) const
Makes a communication queue for elements of type T if the queue has not been created in an outer patt...
Definition: parallel_execution_omp.h:149
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T.
Definition: parallel_execution_omp.h:124
void set_concurrency_degree(int degree) noexcept
Set number of grppi threads.
Definition: parallel_execution_omp.h:82
mpmc_queue< T > & get_output_queue(mpmc_queue< T > &queue, Transformers &&...) const
Returns the reference of a communication queue for elements of type T if the queue has been created i...
Definition: parallel_execution_omp.h:137
void enable_ordering() noexcept
Enable ordering.
Definition: parallel_execution_omp.h:97
constexpr bool supports_stencil< parallel_execution_omp >()
Determines if an execution policy supports the stencil pattern.
Definition: parallel_execution_omp.h:586
constexpr bool supports_map_reduce< parallel_execution_omp >()
Determines if an execution policy supports the map-reduce pattern.
Definition: parallel_execution_omp.h:579
OpenMP parallel execution policy.
Definition: parallel_execution_omp.h:45
std::enable_if_t< is_no_pattern< T >, int > requires_no_pattern
Definition: patterns.h:92
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:111
auto reduce(InputIterator first, std::size_t sequence_size, Identity &&identity, Combiner &&combine_op) const
Applies a reduction to a sequence of data items.
Definition: parallel_execution_omp.h:616
void pipeline(mpmc_queue< InputType > &input_queue, Transformer &&transform_op, mpmc_queue< OutputType > &output_queue) const
Invoke Pipeline pattern coming from another context that uses mpmc_queues as communication channels...
Definition: parallel_execution_omp.h:308
return_type type
Definition: patterns.h:103
typename std::enable_if_t< is_filter< T >, int > requires_filter
Definition: filter_pattern.h:70
Sequential execution policy.
Definition: sequential_execution.h:41
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_omp.h:90
constexpr bool supports_pipeline< parallel_execution_omp >()
Determines if an execution policy supports the pipeline pattern.
Definition: parallel_execution_omp.h:600
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke Divide/conquer pattern.
Definition: sequential_execution.h:543
typename std::result_of< Transformer(Input)>::type result_type
Determines the return type of applying a function on an input type.
Definition: patterns.h:110
void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: parallel_execution_omp.h:714
void set_queue_attributes(int size, queue_mode mode) noexcept
Sets the attributes for the queues built through make_queue<T>()
Definition: parallel_execution_omp.h:112
constexpr bool supports_reduce< parallel_execution_omp >()
Determines if an execution policy supports the reduce pattern.
Definition: parallel_execution_omp.h:572
void map(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, Transformer transform_op) const
Applies a transformation to multiple sequences leaving the result in another sequence using available...
Definition: parallel_execution_omp.h:604
void pipeline(Generator &&generate_op, Transformers &&...transform_op) const
Invoke Pipeline pattern.
Definition: parallel_execution_omp.h:786
typename std::enable_if_t< is_context< T >, int > requires_context
Definition: common/context.h:95
bool push(T item)
Definition: mpmc_queue.h:128
typename std::enable_if_t< is_farm< T >, int > requires_farm
Definition: farm_pattern.h:89
constexpr bool supports_divide_conquer< parallel_execution_omp >()
Determines if an execution policy supports the divide/conquer pattern.
Definition: parallel_execution_omp.h:593