16 #ifndef GRPPI_NATIVE_PARALLEL_EXECUTION_NATIVE_H
17 #define GRPPI_NATIVE_PARALLEL_EXECUTION_NATIVE_H
20 #include "../common/mpmc_queue.h"
21 #include "../common/iterator.h"
22 #include "../common/execution_traits.h"
28 #include <type_traits>
30 #include <experimental/optional>
70 mutable std::atomic_flag lock_ = ATOMIC_FLAG_INIT;
71 std::vector<std::thread::
id> ids_{};
77 while (lock_.test_and_set(memory_order_acquire)) {}
78 auto this_id = this_thread::get_id();
79 ids_.push_back(this_id);
80 lock_.clear(memory_order_release);
86 while (lock_.test_and_set(memory_order_acquire)) {}
87 auto this_id = this_thread::get_id();
88 auto current = find(begin(ids_), end(ids_), this_id);
90 lock_.clear(memory_order_release);
96 while (lock_.test_and_set(memory_order_acquire)) {}
97 auto this_id = this_thread::get_id();
98 auto current = find(begin(ids_), end(ids_), this_id);
99 auto index = distance(begin(ids_), current);
100 lock_.clear(memory_order_release);
116 : registry_{registry}
150 static_cast<int>(2 * std::thread::hardware_concurrency()),
226 template <
typename T>
228 return {queue_size_, queue_mode_};
239 template <
typename T,
typename ... Transformers>
251 template <
typename T,
typename ... Transformers>
253 return std::move(make_queue<T>());
/// Applies a transformation to multiple input sequences, leaving the result
/// in another sequence. The out-of-line definition processes the data by
/// chunks sized from the policy's concurrency degree.
/// @param firsts Tuple with the first iterator of every input sequence.
/// @param first_out Iterator to the first element of the output sequence.
/// @param sequence_size Number of elements to process.
/// @param transform_op Transformation operation (taken by value).
270 template <
typename ... InputIterators,
typename OutputIterator,
271 typename Transformer>
272 void map(std::tuple<InputIterators...> firsts,
273 OutputIterator first_out,
274 std::size_t sequence_size, Transformer transform_op)
const;
/// Reduces a sequence of data items with a combination operation.
/// The out-of-line definition reduces each chunk sequentially into a vector
/// of partial results (one slot per unit of concurrency degree) and then
/// combines those partial results with a final sequential reduce.
/// @param first Iterator to the first element of the input sequence.
/// @param sequence_size Number of elements to reduce.
/// @param identity Identity value forwarded to the per-chunk reduction.
/// @param combine_op Combination operation used for chunks and partial results.
/// @return The result of the final reduction over the partial results.
288 template <
typename InputIterator,
typename Identity,
typename Combiner>
289 auto reduce(InputIterator first, std::size_t sequence_size,
290 Identity && identity, Combiner && combine_op)
const;
/// Applies a map/reduce operation to a sequence of data items.
/// The out-of-line definition computes one partial result per chunk and
/// combines the partial results with a final sequential reduce.
/// @param firsts Tuple with the first iterator of every input sequence.
/// @param sequence_size Number of elements to process.
/// @param identity Identity value for the combination operation
///        (NOTE(review): how it seeds each chunk is not visible here - confirm).
/// @param transform_op Transformation (map) operation.
/// @param combine_op Combination (reduce) operation.
/// @return The result of the final reduction over the partial results.
306 template <
typename ... InputIterators,
typename Identity,
307 typename Transformer,
typename Combiner>
308 auto map_reduce(std::tuple<InputIterators...> firsts,
309 std::size_t sequence_size,
310 Identity && identity,
311 Transformer && transform_op, Combiner && combine_op)
const;
/// Invokes the stencil pattern over multiple input sequences, writing to an
/// output sequence. The out-of-line definition splits the work into chunks
/// sized from the concurrency degree; the calling thread handles the last chunk.
/// @param firsts Tuple with the first iterator of every input sequence.
/// @param first_out Iterator to the first element of the output sequence.
/// @param sequence_size Number of elements to process.
/// @param transform_op Stencil transformation operation.
/// @param neighbour_op Neighbourhood operation (presumably yields the
///        neighbours of an element - TODO confirm against the definition).
329 template <
typename ... InputIterators,
typename OutputIterator,
330 typename StencilTransformer,
typename Neighbourhood>
331 void stencil(std::tuple<InputIterators...> firsts, OutputIterator first_out,
332 std::size_t sequence_size,
333 StencilTransformer && transform_op,
334 Neighbourhood && neighbour_op)
const;
348 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
349 [[deprecated(
"Use new interface with predicate argument")]]
351 Divider && divide_op,
353 Combiner && combine_op)
const;
369 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
371 Divider && divide_op,
372 Predicate && predicate_op,
374 Combiner && combine_op)
const;
/// Invokes the pipeline pattern.
/// The out-of-line definition runs the generator in its own thread, pushing
/// each generated item paired with an order tag into a queue. That queue is
/// handed to do_pipeline together with the transformers, and the generator
/// thread is joined afterwards.
/// @param generate_op Generator operation, called with no arguments.
/// @param transform_ops Transformation stages applied after the generator.
385 template <
typename Generator,
typename ... Transformers>
386 void pipeline(Generator && generate_op,
387 Transformers && ... transform_ops)
const;
399 template <
typename InputType,
typename Transformer,
typename OutputType>
403 do_pipeline(input_queue, std::forward<Transformer>(transform_op), output_queue);
408 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
410 Divider && divide_op,
412 Combiner && combine_op,
413 std::atomic<int> & num_threads)
const;
415 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
417 Divider && divide_op,
418 Predicate && predicate_op,
420 Combiner && combine_op,
421 std::atomic<int> & num_threads)
const;
424 template <
typename Queue,
typename Consumer,
426 void do_pipeline(Queue & input_queue, Consumer && consume_op)
const;
428 template <
typename Inqueue,
typename Transformer,
typename output_type,
430 void do_pipeline(Inqueue & input_queue, Transformer && transform_op,
433 template <
typename T,
typename ... Others>
436 template <
typename T>
440 template <
typename Queue,
typename Transformer,
typename ... OtherTransformers,
441 requires_no_pattern<Transformer> = 0>
442 void do_pipeline(Queue & input_queue, Transformer && transform_op,
443 OtherTransformers && ... other_ops)
const;
445 template <
typename Queue,
typename FarmTransformer,
446 template <
typename>
class Farm,
447 requires_farm<Farm<FarmTransformer>> = 0>
448 void do_pipeline(Queue & input_queue,
449 Farm<FarmTransformer> & farm_obj)
const
451 do_pipeline(input_queue, std::move(farm_obj));
454 template <
typename Queue,
typename FarmTransformer,
455 template <
typename>
class Farm,
456 requires_farm<Farm<FarmTransformer>> = 0>
457 void do_pipeline( Queue & input_queue,
458 Farm<FarmTransformer> && farm_obj)
const;
460 template <
typename Queue,
typename Execution,
typename Transformer,
461 template <
typename,
typename>
class Context,
462 typename ... OtherTransformers,
463 requires_context<Context<Execution,Transformer>> = 0>
464 void do_pipeline(Queue & input_queue, Context<Execution,Transformer> && context_op,
465 OtherTransformers &&... other_ops)
const;
467 template <
typename Queue,
typename Execution,
typename Transformer,
468 template <
typename,
typename>
class Context,
469 typename ... OtherTransformers,
470 requires_context<Context<Execution,Transformer>> = 0>
471 void do_pipeline(Queue & input_queue, Context<Execution,Transformer> & context_op,
472 OtherTransformers &&... other_ops)
const
474 do_pipeline(input_queue, std::move(context_op),
475 std::forward<OtherTransformers>(other_ops)...);
478 template <
typename Queue,
typename FarmTransformer,
479 template <
typename>
class Farm,
480 typename ... OtherTransformers,
481 requires_farm<Farm<FarmTransformer>> = 0>
482 void do_pipeline(Queue & input_queue,
483 Farm<FarmTransformer> & farm_obj,
484 OtherTransformers && ... other_transform_ops)
const
486 do_pipeline(input_queue, std::move(farm_obj),
487 std::forward<OtherTransformers>(other_transform_ops)...);
490 template <
typename Queue,
typename FarmTransformer,
491 template <
typename>
class Farm,
492 typename ... OtherTransformers,
493 requires_farm<Farm<FarmTransformer>> = 0>
494 void do_pipeline(Queue & input_queue,
495 Farm<FarmTransformer> && farm_obj,
496 OtherTransformers && ... other_transform_ops)
const;
498 template <
typename Queue,
typename Predicate,
499 template <
typename>
class Filter,
500 typename ... OtherTransformers,
501 requires_filter<Filter<Predicate>> =0>
502 void do_pipeline(Queue & input_queue,
503 Filter<Predicate> & filter_obj,
504 OtherTransformers && ... other_transform_ops)
const
506 do_pipeline(input_queue, std::move(filter_obj),
507 std::forward<OtherTransformers>(other_transform_ops)...);
510 template <
typename Queue,
typename Predicate,
511 template <
typename>
class Filter,
512 typename ... OtherTransformers,
513 requires_filter<Filter<Predicate>> =0>
514 void do_pipeline(Queue & input_queue,
515 Filter<Predicate> && farm_obj,
516 OtherTransformers && ... other_transform_ops)
const;
518 template <
typename Queue,
typename Combiner,
typename Identity,
519 template <
typename C,
typename I>
class Reduce,
520 typename ... OtherTransformers,
521 requires_reduce<Reduce<Combiner,Identity>> = 0>
522 void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> & reduce_obj,
523 OtherTransformers && ... other_transform_ops)
const
525 do_pipeline(input_queue, std::move(reduce_obj),
526 std::forward<OtherTransformers>(other_transform_ops)...);
529 template <
typename Queue,
typename Combiner,
typename Identity,
530 template <
typename C,
typename I>
class Reduce,
531 typename ... OtherTransformers,
532 requires_reduce<Reduce<Combiner,Identity>> = 0>
533 void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> && reduce_obj,
534 OtherTransformers && ... other_transform_ops)
const;
536 template <
typename Queue,
typename Transformer,
typename Predicate,
537 template <
typename T,
typename P>
class Iteration,
538 typename ... OtherTransformers,
539 requires_iteration<Iteration<Transformer,Predicate>> =0,
540 requires_no_pattern<Transformer> =0>
541 void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> & iteration_obj,
542 OtherTransformers && ... other_transform_ops)
const
544 do_pipeline(input_queue, std::move(iteration_obj),
545 std::forward<OtherTransformers>(other_transform_ops)...);
548 template <
typename Queue,
typename Transformer,
typename Predicate,
549 template <
typename T,
typename P>
class Iteration,
550 typename ... OtherTransformers,
551 requires_iteration<Iteration<Transformer,Predicate>> =0,
552 requires_no_pattern<Transformer> =0>
553 void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
554 OtherTransformers && ... other_transform_ops)
const;
556 template <
typename Queue,
typename Transformer,
typename Predicate,
557 template <
typename T,
typename P>
class Iteration,
558 typename ... OtherTransformers,
559 requires_iteration<Iteration<Transformer,Predicate>> =0,
560 requires_pipeline<Transformer> =0>
561 void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
562 OtherTransformers && ... other_transform_ops)
const;
565 template <
typename Queue,
typename ... Transformers,
566 template <
typename...>
class Pipeline,
568 void do_pipeline(Queue & input_queue,
569 Pipeline<Transformers...> & pipeline_obj)
const
571 do_pipeline(input_queue, std::move(pipeline_obj));
574 template <
typename Queue,
typename ... Transformers,
575 template <
typename...>
class Pipeline,
577 void do_pipeline(Queue & input_queue,
578 Pipeline<Transformers...> && pipeline_obj)
const;
580 template <
typename Queue,
typename ... Transformers,
581 template <
typename...>
class Pipeline,
582 typename ... OtherTransformers,
584 void do_pipeline(Queue & input_queue,
585 Pipeline<Transformers...> & pipeline_obj,
586 OtherTransformers && ... other_transform_ops)
const
588 do_pipeline(input_queue, std::move(pipeline_obj),
589 std::forward<OtherTransformers>(other_transform_ops)...);
592 template <
typename Queue,
typename ... Transformers,
593 template <
typename...>
class Pipeline,
594 typename ... OtherTransformers,
596 void do_pipeline(Queue & input_queue,
597 Pipeline<Transformers...> && pipeline_obj,
598 OtherTransformers && ... other_transform_ops)
const;
600 template <
typename Queue,
typename ... Transformers,
602 void do_pipeline_nested(
604 std::tuple<Transformers...> && transform_ops,
605 std::index_sequence<I...>)
const;
608 mutable thread_registry thread_registry_{};
610 int concurrency_degree_;
613 constexpr
static int default_queue_size = 100;
614 int queue_size_ = default_queue_size;
623 template <
typename E>
625 return std::is_same<E, parallel_execution_native>::value;
677 template <
typename ... InputIterators,
typename OutputIterator,
678 typename Transformer>
680 std::tuple<InputIterators...> firsts,
681 OutputIterator first_out,
682 std::size_t sequence_size, Transformer transform_op)
const
687 [&transform_op](
auto fins, std::size_t size,
auto fout)
689 const auto l = next(get<0>(fins), size);
690 while (get<0>(fins)!=l) {
692 std::forward<Transformer>(transform_op), fins);
696 const int chunk_size = sequence_size / concurrency_degree_;
700 for (
int i=0; i!=concurrency_degree_-1; ++i) {
701 const auto delta = chunk_size * i;
703 const auto chunk_first_out = next(first_out, delta);
704 workers.launch(*
this, process_chunk, chunk_firsts, chunk_size, chunk_first_out);
707 const auto delta = chunk_size * (concurrency_degree_ - 1);
709 const auto chunk_first_out = next(first_out, delta);
710 process_chunk(chunk_firsts, sequence_size - delta, chunk_first_out);
714 template <
typename InputIterator,
typename Identity,
typename Combiner>
716 InputIterator first, std::size_t sequence_size,
717 Identity && identity,
718 Combiner && combine_op)
const
721 std::vector<result_type> partial_results(concurrency_degree_);
724 auto process_chunk = [&](InputIterator f, std::size_t sz, std::size_t id) {
725 partial_results[id] = seq.
reduce(f,sz, std::forward<Identity>(identity),
726 std::forward<Combiner>(combine_op));
729 const auto chunk_size = sequence_size / concurrency_degree_;
733 for (
int i=0; i<concurrency_degree_-1; ++i) {
734 const auto delta = chunk_size * i;
735 const auto chunk_first = std::next(first,delta);
736 workers.launch(*
this, process_chunk, chunk_first, chunk_size, i);
739 const auto delta = chunk_size * (concurrency_degree_-1);
740 const auto chunk_first = std::next(first, delta);
741 const auto chunk_sz = sequence_size - delta;
742 process_chunk(chunk_first, chunk_sz, concurrency_degree_-1);
745 return seq.reduce(std::next(partial_results.begin()),
746 partial_results.size()-1, std::forward<result_type>(partial_results[0]),
747 std::forward<Combiner>(combine_op));
750 template <
typename ... InputIterators,
typename Identity,
751 typename Transformer,
typename Combiner>
753 std::tuple<InputIterators...> firsts,
754 std::size_t sequence_size,
756 Transformer && transform_op, Combiner && combine_op)
const
759 std::vector<result_type> partial_results(concurrency_degree_);
762 auto process_chunk = [&](
auto f, std::size_t sz, std::size_t id) {
764 std::forward<Identity>(partial_results[
id]),
765 std::forward<Transformer>(transform_op),
766 std::forward<Combiner>(combine_op));
769 const auto chunk_size = sequence_size / concurrency_degree_;
773 for(
int i=0;i<concurrency_degree_-1;++i){
774 const auto delta = chunk_size * i;
776 workers.launch(*
this, process_chunk, chunk_firsts, chunk_size, i);
779 const auto delta = chunk_size * (concurrency_degree_-1);
781 process_chunk(chunk_firsts, sequence_size - delta, concurrency_degree_-1);
784 return seq.reduce(std::next(partial_results.begin()),
785 partial_results.size()-1, std::forward<result_type>(partial_results[0]),
786 std::forward<Combiner>(combine_op));
789 template <
typename ... InputIterators,
typename OutputIterator,
790 typename StencilTransformer,
typename Neighbourhood>
792 std::tuple<InputIterators...> firsts, OutputIterator first_out,
793 std::size_t sequence_size,
794 StencilTransformer && transform_op,
795 Neighbourhood && neighbour_op)
const
799 [&transform_op, &neighbour_op,seq](
auto fins, std::size_t sz,
auto fout)
802 std::forward<StencilTransformer>(transform_op),
803 std::forward<Neighbourhood>(neighbour_op));
806 const auto chunk_size = sequence_size / concurrency_degree_;
810 for (
int i=0; i!=concurrency_degree_-1; ++i) {
811 const auto delta = chunk_size * i;
813 const auto chunk_out = std::next(first_out,delta);
814 workers.launch(*
this, process_chunk, chunk_firsts, chunk_size, chunk_out);
817 const auto delta = chunk_size * (concurrency_degree_ - 1);
819 const auto chunk_out = std::next(first_out,delta);
820 process_chunk(chunk_firsts, sequence_size - delta, chunk_out);
824 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
827 Divider && divide_op,
829 Combiner && combine_op)
const
831 std::atomic<int> num_threads{concurrency_degree_-1};
833 return divide_conquer(std::forward<Input>(problem), std::forward<Divider>(divide_op),
834 std::forward<Solver>(solve_op), std::forward<Combiner>(combine_op),
839 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
842 Divider && divide_op,
843 Predicate && predicate_op,
845 Combiner && combine_op)
const
847 std::atomic<int> num_threads{concurrency_degree_-1};
849 return divide_conquer(std::forward<Input>(problem), std::forward<Divider>(divide_op),
850 std::forward<Predicate>(predicate_op),
851 std::forward<Solver>(solve_op), std::forward<Combiner>(combine_op),
855 template <
typename Generator,
typename ... Transformers>
857 Generator && generate_op,
858 Transformers && ... transform_ops)
const
861 using result_type = decay_t<
typename result_of<Generator()>::type>;
862 using output_type = pair<result_type,long>;
863 auto output_queue = make_queue<output_type>();
865 thread generator_task([&,
this]() {
870 auto item{generate_op()};
871 output_queue.push(make_pair(item, order));
877 do_pipeline(output_queue, forward<Transformers>(transform_ops)...);
878 generator_task.join();
883 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
886 Divider && divide_op,
888 Combiner && combine_op,
889 std::atomic<int> & num_threads)
const
892 if (num_threads.load() <=0) {
894 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
895 std::forward<Combiner>(combine_op));
898 auto subproblems = divide_op(std::forward<Input>(input));
899 if (subproblems.size()<=1) {
return solve_op(std::forward<Input>(input)); }
901 using subresult_type =
902 std::decay_t<
typename std::result_of<Solver(Input)>::type>;
903 std::vector<subresult_type> partials(subproblems.size()-1);
905 auto process_subproblem = [&,
this](
auto it, std::size_t div) {
907 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
908 std::forward<Combiner>(combine_op), num_threads);
913 worker_pool workers{num_threads.load()};
914 auto i = subproblems.begin() + 1;
915 while (i!=subproblems.end() && num_threads.load()>0) {
916 workers.launch(*
this,process_subproblem, i++, division++);
920 while (i!=subproblems.end()) {
921 partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
922 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
923 std::forward<Combiner>(combine_op));
926 auto subresult =
divide_conquer(std::forward<Input>(*subproblems.begin()),
927 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
928 std::forward<Combiner>(combine_op), num_threads);
932 return seq.reduce(partials.begin(), partials.size(),
933 std::forward<subresult_type>(subresult), std::forward<Combiner>(combine_op));
936 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
939 Divider && divide_op,
940 Predicate && predicate_op,
942 Combiner && combine_op,
943 std::atomic<int> & num_threads)
const
945 constexpr sequential_execution seq;
946 if (num_threads.load() <=0) {
947 return seq.divide_conquer(std::forward<Input>(input),
948 std::forward<Divider>(divide_op),
949 std::forward<Predicate>(predicate_op),
950 std::forward<Solver>(solve_op),
951 std::forward<Combiner>(combine_op));
954 if (predicate_op(input)) {
return solve_op(std::forward<Input>(input)); }
955 auto subproblems = divide_op(std::forward<Input>(input));
957 using subresult_type =
958 std::decay_t<
typename std::result_of<Solver(Input)>::type>;
959 std::vector<subresult_type> partials(subproblems.size()-1);
961 auto process_subproblem = [&,
this](
auto it, std::size_t div) {
963 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
964 std::forward<Solver>(solve_op),
965 std::forward<Combiner>(combine_op), num_threads);
970 worker_pool workers{num_threads.load()};
971 auto i = subproblems.begin() + 1;
972 while (i!=subproblems.end() && num_threads.load()>0) {
973 workers.launch(*
this,process_subproblem, i++, division++);
977 while (i!=subproblems.end()) {
978 partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
979 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op), std::forward<Solver>(solve_op),
980 std::forward<Combiner>(combine_op));
983 auto subresult =
divide_conquer(std::forward<Input>(*subproblems.begin()),
984 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op), std::forward<Solver>(solve_op),
985 std::forward<Combiner>(combine_op), num_threads);
989 return seq.reduce(partials.begin(), partials.size(),
990 std::forward<subresult_type>(subresult), std::forward<Combiner>(combine_op));
992 template <
typename Queue,
typename Consumer,
993 requires_no_pattern<Consumer>>
994 void parallel_execution_native::do_pipeline(
996 Consumer && consume_op)
const
999 using gen_value_type =
typename Queue::value_type;
1004 auto item = input_queue.pop();
1005 if (!item.first)
break;
1006 consume_op(*item.first);
1010 vector<gen_value_type> elements;
1013 auto item = input_queue.pop();
1014 if (!item.first)
break;
1015 if(current == item.second){
1016 consume_op(*item.first);
1020 elements.push_back(item);
1022 auto it = find_if(elements.begin(), elements.end(),
1023 [&](
auto x) { return x.second== current; });
1024 if(it != elements.end()){
1025 consume_op(*it->first);
1030 while (elements.size()>0) {
1031 auto it = find_if(elements.begin(), elements.end(),
1032 [&](
auto x) { return x.second== current; });
1033 if(it != elements.end()){
1034 consume_op(*it->first);
1042 template <
typename Inqueue,
typename Transformer,
typename output_type,
1043 requires_no_pattern<Transformer>>
1044 void parallel_execution_native::do_pipeline(Inqueue & input_queue, Transformer && transform_op,
1045 mpmc_queue<output_type> & output_queue)
const
1047 using namespace std;
1048 using namespace experimental;
1050 using output_item_value_type =
typename output_type::first_type::value_type;
1052 auto item{input_queue.pop()};
1053 if(!item.first)
break;
1054 auto out = output_item_value_type{transform_op(*item.first)};
1055 output_queue.push(make_pair(out,item.second)) ;
1061 template <
typename Queue,
typename Transformer,
1062 typename ... OtherTransformers,
1063 requires_no_pattern<Transformer>>
1064 void parallel_execution_native::do_pipeline(
1065 Queue & input_queue,
1066 Transformer && transform_op,
1067 OtherTransformers && ... other_transform_ops)
const
1069 using namespace std;
1070 using namespace experimental;
1072 using input_item_type =
typename Queue::value_type;
1073 using input_item_value_type =
typename input_item_type::first_type::value_type;
1074 using transform_result_type =
1075 decay_t<
typename result_of<Transformer(input_item_value_type)>::type>;
1076 using output_item_value_type = optional<transform_result_type>;
1077 using output_item_type = pair<output_item_value_type,long>;
1079 decltype(
auto) output_queue =
1082 thread task([&,this]() {
1086 auto item{input_queue.pop()};
1087 if (!item.first)
break;
1088 auto out = output_item_value_type{transform_op(*item.first)};
1089 output_queue.push(make_pair(out, item.second));
1091 output_queue.push(make_pair(output_item_value_type{},-1));
1094 do_pipeline(output_queue,
1095 forward<OtherTransformers>(other_transform_ops)...);
1099 template <
typename Queue,
typename FarmTransformer,
1100 template <
typename>
class Farm,
1101 requires_farm<Farm<FarmTransformer>>>
1102 void parallel_execution_native::do_pipeline(
1103 Queue & input_queue,
1104 Farm<FarmTransformer> && farm_obj)
const
1106 using namespace std;
1108 auto farm_task = [&](int) {
1109 auto item{input_queue.pop()};
1110 while (item.first) {
1111 farm_obj(*item.first);
1112 item = input_queue.pop();
1114 input_queue.push(item);
1117 auto ntasks = farm_obj.cardinality();
1118 worker_pool workers{ntasks};
1119 workers.launch_tasks(*
this, farm_task, ntasks);
1124 template <
typename Queue,
typename Execution,
typename Transformer,
1125 template <
typename,
typename>
class Context,
1126 typename ... OtherTransformers,
1127 requires_context<Context<Execution,Transformer>>>
1128 void parallel_execution_native::do_pipeline(Queue & input_queue,
1129 Context<Execution,Transformer> && context_op,
1130 OtherTransformers &&... other_ops)
const
1132 using namespace std;
1133 using namespace experimental;
1135 using input_item_type =
typename Queue::value_type;
1136 using input_item_value_type =
typename input_item_type::first_type::value_type;
1139 using output_optional_type = experimental::optional<output_type>;
1140 using output_item_type = pair <output_optional_type, long> ;
1142 decltype(
auto) output_queue =
1145 auto context_task = [&]() {
1146 context_op.execution_policy().pipeline(input_queue, context_op.transformer(), output_queue);
1147 output_queue.push( make_pair(output_optional_type{}, -1) );
1150 worker_pool workers{1};
1151 workers.launch_tasks(*
this, context_task);
1153 do_pipeline(output_queue,
1154 forward<OtherTransformers>(other_ops)... );
1160 template <
typename Queue,
typename FarmTransformer,
1161 template <
typename>
class Farm,
1162 typename ... OtherTransformers,
1163 requires_farm<Farm<FarmTransformer>>>
1164 void parallel_execution_native::do_pipeline(
1165 Queue & input_queue,
1166 Farm<FarmTransformer> && farm_obj,
1167 OtherTransformers && ... other_transform_ops)
const
1169 using namespace std;
1170 using namespace experimental;
1172 using input_item_type =
typename Queue::value_type;
1173 using input_item_value_type =
typename input_item_type::first_type::value_type;
1176 using output_optional_type = experimental::optional<output_type>;
1177 using output_item_type = pair <output_optional_type, long> ;
1179 decltype(
auto) output_queue =
1182 atomic<
int> done_threads{0};
1184 auto farm_task = [&](
int nt) {
1185 do_pipeline(input_queue, farm_obj.transformer(), output_queue);
1187 if (done_threads == nt) {
1188 output_queue.push(make_pair(output_optional_type{}, -1));
1190 input_queue.push(input_item_type{});
1194 auto ntasks = farm_obj.cardinality();
1195 worker_pool workers{ntasks};
1196 workers.launch_tasks(*
this, farm_task, ntasks);
1197 do_pipeline(output_queue,
1198 forward<OtherTransformers>(other_transform_ops)... );
1203 template <
typename Queue,
typename Predicate,
1204 template <
typename>
class Filter,
1205 typename ... OtherTransformers,
1206 requires_filter<Filter<Predicate>>>
1207 void parallel_execution_native::do_pipeline(
1208 Queue & input_queue,
1209 Filter<Predicate> && filter_obj,
1210 OtherTransformers && ... other_transform_ops)
const
1212 using namespace std;
1213 using namespace experimental;
1215 using input_item_type =
typename Queue::value_type;
1216 using input_value_type =
typename input_item_type::first_type;
1217 auto filter_queue = make_queue<input_item_type>();
1219 auto filter_task = [&,
this]() {
1221 auto item{input_queue.pop()};
1222 while (item.first) {
1223 if (filter_obj(*item.first)) {
1224 filter_queue.push(item);
1227 filter_queue.push(make_pair(input_value_type{}, item.second));
1229 item = input_queue.pop();
1231 filter_queue.push(make_pair(input_value_type{}, -1));
1233 thread filter_thread{filter_task};
1235 decltype(
auto) output_queue =
1238 thread ordering_thread;
1240 auto ordering_task = [&]() {
1242 vector<input_item_type> elements;
1245 auto item{filter_queue.pop()};
1247 if(!item.first && item.second == -1)
break;
1248 if (item.second == current) {
1250 output_queue.push(make_pair(item.first,order));
1256 elements.push_back(item);
1258 auto it = find_if(elements.begin(), elements.end(),
1259 [&](
auto x) { return x.second== current; });
1260 if(it != elements.end()){
1262 output_queue.push(make_pair(it->first,order));
1268 item = filter_queue.pop();
1270 while (elements.size()>0) {
1271 auto it = find_if(elements.begin(), elements.end(),
1272 [&](
auto x) { return x.second== current; });
1273 if(it != elements.end()){
1275 output_queue.push(make_pair(it->first,order));
1282 output_queue.push(item);
1285 ordering_thread = thread{ordering_task};
1286 do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1287 filter_thread.join();
1288 ordering_thread.join();
1291 do_pipeline(filter_queue, forward<OtherTransformers>(other_transform_ops)...);
1292 filter_thread.join();
1296 template <
typename Queue,
typename Combiner,
typename Identity,
1297 template <
typename C,
typename I>
class Reduce,
1298 typename ... OtherTransformers,
1299 requires_reduce<Reduce<Combiner,Identity>>>
1300 void parallel_execution_native::do_pipeline(
1301 Queue && input_queue,
1302 Reduce<Combiner,Identity> && reduce_obj,
1303 OtherTransformers && ... other_transform_ops)
const
1305 using namespace std;
1306 using namespace experimental;
1308 using output_item_value_type = optional<decay_t<Identity>>;
1309 using output_item_type = pair<output_item_value_type,long>;
1310 decltype(
auto) output_queue =
1313 auto reduce_task = [&,this]() {
1315 auto item{input_queue.pop()};
1317 while (item.first) {
1318 reduce_obj.add_item(std::forward<Identity>(*item.first));
1319 item = input_queue.pop();
1320 if (reduce_obj.reduction_needed()) {
1321 constexpr sequential_execution seq;
1322 auto red = reduce_obj.reduce_window(seq);
1323 output_queue.push(make_pair(red, order++));
1326 output_queue.push(make_pair(output_item_value_type{}, -1));
1328 thread reduce_thread{reduce_task};
1329 do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1330 reduce_thread.join();
1333 template <
typename Queue,
typename Transformer,
typename Predicate,
1334 template <
typename T,
typename P>
class Iteration,
1335 typename ... OtherTransformers,
1336 requires_iteration<Iteration<Transformer,Predicate>>,
1337 requires_no_pattern<Transformer>>
1338 void parallel_execution_native::do_pipeline(
1339 Queue & input_queue,
1340 Iteration<Transformer,Predicate> && iteration_obj,
1341 OtherTransformers && ... other_transform_ops)
const
1343 using namespace std;
1344 using namespace experimental;
1346 using input_item_type =
typename decay_t<Queue>::value_type;
1348 decltype(
auto) output_queue =
1351 auto iteration_task = [&]() {
1353 auto item = input_queue.pop();
1354 if (!item.first)
break;
1355 auto value = iteration_obj.transform(*item.first);
1356 auto new_item = input_item_type{value,item.second};
1357 if (iteration_obj.predicate(value)) {
1358 output_queue.push(new_item);
1361 input_queue.push(new_item);
1364 while (!input_queue.is_empty()) {
1365 auto item = input_queue.pop();
1366 auto value = iteration_obj.transform(*item.first);
1367 auto new_item = input_item_type{value,item.second};
1368 if (iteration_obj.predicate(value)) {
1369 output_queue.push(new_item);
1372 input_queue.push(new_item);
1375 output_queue.push(input_item_type{{},-1});
1378 thread iteration_thread{iteration_task};
1379 do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1380 iteration_thread.join();
1383 template <
typename Queue,
typename Transformer,
typename Predicate,
1384 template <
typename T,
typename P>
class Iteration,
1385 typename ... OtherTransformers,
1386 requires_iteration<Iteration<Transformer,Predicate>>,
1387 requires_pipeline<Transformer>>
1388 void parallel_execution_native::do_pipeline(
1390 Iteration<Transformer,Predicate> &&,
1391 OtherTransformers && ...)
const
1393 static_assert(!is_pipeline<Transformer>,
"Not implemented");
1397 template <
typename Queue,
typename ... Transformers,
1398 template <
typename...>
class Pipeline,
1400 void parallel_execution_native::do_pipeline(
1401 Queue & input_queue,
1402 Pipeline<Transformers...> && pipeline_obj)
const
1406 pipeline_obj.transformers(),
1407 std::make_index_sequence<
sizeof...(Transformers)>());
1410 template <
typename Queue,
typename ... Transformers,
1411 template <
typename...>
class Pipeline,
1412 typename ... OtherTransformers,
1414 void parallel_execution_native::do_pipeline(
1415 Queue & input_queue,
1416 Pipeline<Transformers...> && pipeline_obj,
1417 OtherTransformers && ... other_transform_ops)
const
1421 std::tuple_cat(pipeline_obj.transformers(),
1422 std::forward_as_tuple(other_transform_ops...)),
1423 std::make_index_sequence<
sizeof...(Transformers)+
sizeof...(OtherTransformers)>());
1426 template <
typename Queue,
typename ... Transformers,
1428 void parallel_execution_native::do_pipeline_nested(
1429 Queue & input_queue,
1430 std::tuple<Transformers...> && transform_ops,
1431 std::index_sequence<I...>)
const
1433 do_pipeline(input_queue,
1434 std::forward<Transformers>(std::get<I>(transform_ops))...);
1437 template<
typename T,
typename... Others>
Definition: mpmc_queue.h:33
RAII class to manage registration/deregistration pairs. This class allows to manage automatic deregis...
Definition: parallel_execution_native.h:110
~native_thread_manager()
Deregisters current thread from the registry.
Definition: parallel_execution_native.h:122
native_thread_manager(thread_registry &registry)
Saves a reference to the registry and registers current thread.
Definition: parallel_execution_native.h:115
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:135
void pipeline(Generator &&generate_op, Transformers &&... transform_ops) const
Invoke Pipeline pattern.
Definition: parallel_execution_native.h:856
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_native.h:227
void set_concurrency_degree(int degree) noexcept
Set number of grppi threads.
Definition: parallel_execution_native.h:174
auto map_reduce(std::tuple< InputIterators... > firsts, std::size_t sequence_size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op) const
Applies a map/reduce operation to a sequence of data items.
Definition: parallel_execution_native.h:752
parallel_execution_native(int concurrency_degree, bool ordering=true) noexcept
Constructs a native parallel execution policy.
Definition: parallel_execution_native.h:162
void set_queue_attributes(int size, queue_mode mode) noexcept
Sets the attributes for the queues built through make_queue<T>()
Definition: parallel_execution_native.h:215
parallel_execution_native(const parallel_execution_native &ex)
Definition: parallel_execution_native.h:167
void disable_ordering() noexcept
Disable ordering.
Definition: parallel_execution_native.h:189
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke Divide/conquer pattern.
Definition: parallel_execution_native.h:825
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_native.h:194
void map(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, Transformer transform_op) const
Applies a transformation to multiple sequences leaving the result in another sequence by chunks accor...
Definition: parallel_execution_native.h:679
native_thread_manager thread_manager() const
Get a manager object for registration/deregistration in the thread index table for current thread.
Definition: parallel_execution_native.h:200
int get_thread_id() const noexcept
Get index of current thread in the thread table.
Definition: parallel_execution_native.h:208
mpmc_queue< T > get_output_queue(Transformers &&...) const
Makes a communication queue for elements of type T if the queue has not been created in an outer patt...
Definition: parallel_execution_native.h:252
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_native.h:179
void pipeline(mpmc_queue< InputType > &input_queue, Transformer &&transform_op, mpmc_queue< OutputType > &output_queue) const
Invoke Pipeline pattern coming from another context that uses mpmc_queues as communication channels.
Definition: parallel_execution_native.h:400
auto reduce(InputIterator first, std::size_t sequence_size, Identity &&identity, Combiner &&combine_op) const
Applies a reduction to a sequence of data items.
Definition: parallel_execution_native.h:715
void enable_ordering() noexcept
Enable ordering.
Definition: parallel_execution_native.h:184
void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: parallel_execution_native.h:791
parallel_execution_native() noexcept
Default construct a native parallel execution policy.
Definition: parallel_execution_native.h:148
mpmc_queue< T > & get_output_queue(mpmc_queue< T > &queue, Transformers &&...) const
Returns the reference of a communication queue for elements of type T if the queue has been created i...
Definition: parallel_execution_native.h:240
Sequential execution policy.
Definition: sequential_execution.h:36
constexpr auto reduce(InputIterator first, std::size_t sequence_size, Identity &&identity, Combiner &&combine_op) const
Applies a reduction to a sequence of data items.
Definition: sequential_execution.h:464
constexpr void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: sequential_execution.h:497
constexpr auto map_reduce(std::tuple< InputIterators... > firsts, std::size_t sequence_size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op) const
Applies a map/reduce operation to a sequence of data items.
Definition: sequential_execution.h:480
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke Divide/conquer pattern.
Definition: sequential_execution.h:538
Thread index table to provide portable natural thread indices.
Definition: parallel_execution_native.h:48
thread_registry()=default
void register_thread() noexcept
Adds the current thread id in the registry.
Definition: parallel_execution_native.h:74
int current_index() const noexcept
Integer index for current thread.
Definition: parallel_execution_native.h:93
void deregister_thread() noexcept
Removes current thread id from the registry.
Definition: parallel_execution_native.h:83
Pool of worker threads. This class offers a simple pool of worker threads.
Definition: worker_pool.h:29
Definition: callable_traits.h:21
constexpr bool supports_reduce< parallel_execution_native >()
Determines if an execution policy supports the reduce pattern.
Definition: parallel_execution_native.h:647
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:107
constexpr bool supports_map_reduce< parallel_execution_native >()
Determines if an execution policy supports the map-reduce pattern.
Definition: parallel_execution_native.h:654
constexpr bool is_parallel_execution_native()
Metafunction that determines if type E is parallel_execution_native.
Definition: parallel_execution_native.h:624
typename std::result_of< Transformer(Input)>::type result_type
Determines the return type of applying a function on a input type.
Definition: patterns.h:105
constexpr bool supports_stencil< parallel_execution_native >()
Determines if an execution policy supports the stencil pattern.
Definition: parallel_execution_native.h:661
constexpr bool supports_pipeline< parallel_execution_native >()
Determines if an execution policy supports the pipeline pattern.
Definition: parallel_execution_native.h:675
constexpr bool supports_divide_conquer< parallel_execution_native >()
Determines if an execution policy supports the divide/conquer pattern.
Definition: parallel_execution_native.h:668
queue_mode
Definition: mpmc_queue.h:30
auto iterators_next(T &&t, int n)
Computes next n steps from a tuple of iterators.
Definition: iterator.h:170
constexpr bool is_supported< parallel_execution_native >()
Determines if an execution policy is supported in the current compilation.
Definition: parallel_execution_native.h:633
constexpr bool supports_map< parallel_execution_native >()
Determines if an execution policy supports the map pattern.
Definition: parallel_execution_native.h:640
std::enable_if_t< is_no_pattern< T >, int > requires_no_pattern
Definition: patterns.h:87
decltype(auto) apply_deref_increment(F &&f, T< Iterators... > &iterators)
Applies a callable object to the values obtained from the iterators in a tuple-like object....
Definition: iterator.h:58
return_type type
Definition: patterns.h:98