21 #ifndef GRPPI_NATIVE_PARALLEL_EXECUTION_NATIVE_H 22 #define GRPPI_NATIVE_PARALLEL_EXECUTION_NATIVE_H 25 #include "../common/mpmc_queue.h" 26 #include "../common/iterator.h" 27 #include "../common/execution_traits.h" 33 #include <type_traits> 35 #include <experimental/optional> 75 mutable std::atomic_flag lock_ = ATOMIC_FLAG_INIT;
76 std::vector<std::thread::id> ids_;
82 while (lock_.test_and_set(memory_order_acquire)) {}
83 auto this_id = this_thread::get_id();
84 ids_.push_back(this_id);
85 lock_.clear(memory_order_release);
91 while (lock_.test_and_set(memory_order_acquire)) {}
92 auto this_id = this_thread::get_id();
93 auto current = find(begin(ids_), end(ids_), this_id);
95 lock_.clear(memory_order_release);
101 while (lock_.test_and_set(memory_order_acquire)) {}
102 auto this_id = this_thread::get_id();
103 auto current = find(begin(ids_), end(ids_), this_id);
104 auto index = distance(begin(ids_), current);
105 lock_.clear(memory_order_release);
121 : registry_{registry}
128 registry_.deregister_thread();
155 static_cast<int>(2 * std::thread::hardware_concurrency()),
168 concurrency_degree_{concurrency_degree},
214 return thread_registry_.current_index();
231 template <
typename T>
233 return {queue_size_, queue_mode_};
244 template <
typename T,
typename ... Transformers>
256 template <
typename T,
typename ... Transformers>
258 return std::move(make_queue<T>());
275 template <
typename ... InputIterators,
typename OutputIterator,
276 typename Transformer>
277 void map(std::tuple<InputIterators...> firsts,
278 OutputIterator first_out,
279 std::size_t sequence_size, Transformer transform_op)
const;
293 template <
typename InputIterator,
typename Identity,
typename Combiner>
294 auto reduce(InputIterator first, std::size_t sequence_size,
295 Identity && identity, Combiner && combine_op)
const;
311 template <
typename ... InputIterators,
typename Identity,
312 typename Transformer,
typename Combiner>
313 auto map_reduce(std::tuple<InputIterators...> firsts,
314 std::size_t sequence_size,
315 Identity && identity,
316 Transformer && transform_op, Combiner && combine_op)
const;
334 template <
typename ... InputIterators,
typename OutputIterator,
335 typename StencilTransformer,
typename Neighbourhood>
336 void stencil(std::tuple<InputIterators...> firsts, OutputIterator first_out,
337 std::size_t sequence_size,
338 StencilTransformer && transform_op,
339 Neighbourhood && neighbour_op)
const;
353 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
354 [[deprecated(
"Use new interface with predicate argument")]]
356 Divider && divide_op,
358 Combiner && combine_op)
const;
374 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
376 Divider && divide_op,
377 Predicate && predicate_op,
379 Combiner && combine_op)
const;
390 template <
typename Generator,
typename ... Transformers>
391 void pipeline(Generator && generate_op,
392 Transformers && ... transform_ops)
const;
404 template <
typename InputType,
typename Transformer,
typename OutputType>
408 do_pipeline(input_queue, std::forward<Transformer>(transform_op), output_queue);
413 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
415 Divider && divide_op,
417 Combiner && combine_op,
418 std::atomic<int> & num_threads)
const;
420 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
422 Divider && divide_op,
423 Predicate && predicate_op,
425 Combiner && combine_op,
426 std::atomic<int> & num_threads)
const;
429 template <
typename Queue,
typename Consumer,
431 void do_pipeline(Queue & input_queue, Consumer && consume_op)
const;
433 template <
typename Inqueue,
typename Transformer,
typename output_type,
435 void do_pipeline(Inqueue & input_queue, Transformer && transform_op,
438 template <
typename T,
typename ... Others>
442 template <
typename T>
446 template <
typename Queue,
typename Transformer,
typename ... OtherTransformers,
447 requires_no_pattern<Transformer> = 0>
448 void do_pipeline(Queue & input_queue, Transformer && transform_op,
449 OtherTransformers && ... other_ops)
const;
451 template <
typename Queue,
typename FarmTransformer,
452 template <
typename>
class Farm,
454 void do_pipeline(Queue & input_queue,
455 Farm<FarmTransformer> & farm_obj)
const 457 do_pipeline(input_queue, std::move(farm_obj));
460 template <
typename Queue,
typename FarmTransformer,
461 template <
typename>
class Farm,
462 requires_farm<Farm<FarmTransformer>> = 0>
463 void do_pipeline( Queue & input_queue,
464 Farm<FarmTransformer> && farm_obj)
const;
466 template <
typename Queue,
typename Execution,
typename Transformer,
467 template <
typename,
typename>
class Context,
468 typename ... OtherTransformers,
470 void do_pipeline(Queue & input_queue, Context<Execution,Transformer> && context_op,
471 OtherTransformers &&... other_ops)
const;
473 template <
typename Queue,
typename Execution,
typename Transformer,
474 template <
typename,
typename>
class Context,
475 typename ... OtherTransformers,
476 requires_context<Context<Execution,Transformer>> = 0>
477 void do_pipeline(Queue & input_queue, Context<Execution,Transformer> & context_op,
478 OtherTransformers &&... other_ops)
const 480 do_pipeline(input_queue, std::move(context_op),
481 std::forward<OtherTransformers>(other_ops)...);
484 template <
typename Queue,
typename FarmTransformer,
485 template <
typename>
class Farm,
486 typename ... OtherTransformers,
487 requires_farm<Farm<FarmTransformer>> = 0>
488 void do_pipeline(Queue & input_queue,
489 Farm<FarmTransformer> & farm_obj,
490 OtherTransformers && ... other_transform_ops)
const 492 do_pipeline(input_queue, std::move(farm_obj),
493 std::forward<OtherTransformers>(other_transform_ops)...);
496 template <
typename Queue,
typename FarmTransformer,
497 template <
typename>
class Farm,
498 typename ... OtherTransformers,
499 requires_farm<Farm<FarmTransformer>> = 0>
500 void do_pipeline(Queue & input_queue,
501 Farm<FarmTransformer> && farm_obj,
502 OtherTransformers && ... other_transform_ops)
const;
504 template <
typename Queue,
typename Predicate,
505 template <
typename>
class Filter,
506 typename ... OtherTransformers,
508 void do_pipeline(Queue & input_queue,
509 Filter<Predicate> & filter_obj,
510 OtherTransformers && ... other_transform_ops)
const 512 do_pipeline(input_queue, std::move(filter_obj),
513 std::forward<OtherTransformers>(other_transform_ops)...);
516 template <
typename Queue,
typename Predicate,
517 template <
typename>
class Filter,
518 typename ... OtherTransformers,
519 requires_filter<Filter<Predicate>> =0>
520 void do_pipeline(Queue & input_queue,
521 Filter<Predicate> && farm_obj,
522 OtherTransformers && ... other_transform_ops)
const;
524 template <
typename Queue,
typename Combiner,
typename Identity,
525 template <
typename C,
typename I>
class Reduce,
526 typename ... OtherTransformers,
528 void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> & reduce_obj,
529 OtherTransformers && ... other_transform_ops)
const 531 do_pipeline(input_queue, std::move(reduce_obj),
532 std::forward<OtherTransformers>(other_transform_ops)...);
535 template <
typename Queue,
typename Combiner,
typename Identity,
536 template <
typename C,
typename I>
class Reduce,
537 typename ... OtherTransformers,
538 requires_reduce<Reduce<Combiner,Identity>> = 0>
539 void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> && reduce_obj,
540 OtherTransformers && ... other_transform_ops)
const;
542 template <
typename Queue,
typename Transformer,
typename Predicate,
543 template <
typename T,
typename P>
class Iteration,
544 typename ... OtherTransformers,
546 requires_no_pattern<Transformer> =0>
547 void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> & iteration_obj,
548 OtherTransformers && ... other_transform_ops)
const 550 do_pipeline(input_queue, std::move(iteration_obj),
551 std::forward<OtherTransformers>(other_transform_ops)...);
554 template <
typename Queue,
typename Transformer,
typename Predicate,
555 template <
typename T,
typename P>
class Iteration,
556 typename ... OtherTransformers,
557 requires_iteration<Iteration<Transformer,Predicate>> =0,
558 requires_no_pattern<Transformer> =0>
559 void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
560 OtherTransformers && ... other_transform_ops)
const;
562 template <
typename Queue,
typename Transformer,
typename Predicate,
563 template <
typename T,
typename P>
class Iteration,
564 typename ... OtherTransformers,
565 requires_iteration<Iteration<Transformer,Predicate>> =0,
567 void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
568 OtherTransformers && ... other_transform_ops)
const;
571 template <
typename Queue,
typename ... Transformers,
572 template <
typename...>
class Pipeline,
574 void do_pipeline(Queue & input_queue,
575 Pipeline<Transformers...> & pipeline_obj)
const 577 do_pipeline(input_queue, std::move(pipeline_obj));
580 template <
typename Queue,
typename ... Transformers,
581 template <
typename...>
class Pipeline,
583 void do_pipeline(Queue & input_queue,
584 Pipeline<Transformers...> && pipeline_obj)
const;
586 template <
typename Queue,
typename ... Transformers,
587 template <
typename...>
class Pipeline,
588 typename ... OtherTransformers,
590 void do_pipeline(Queue & input_queue,
591 Pipeline<Transformers...> & pipeline_obj,
592 OtherTransformers && ... other_transform_ops)
const 594 do_pipeline(input_queue, std::move(pipeline_obj),
595 std::forward<OtherTransformers>(other_transform_ops)...);
598 template <
typename Queue,
typename ... Transformers,
599 template <
typename...>
class Pipeline,
600 typename ... OtherTransformers,
602 void do_pipeline(Queue & input_queue,
603 Pipeline<Transformers...> && pipeline_obj,
604 OtherTransformers && ... other_transform_ops)
const;
606 template <
typename Queue,
typename ... Transformers,
608 void do_pipeline_nested(
610 std::tuple<Transformers...> && transform_ops,
611 std::index_sequence<I...>)
const;
616 int concurrency_degree_;
619 constexpr
static int default_queue_size = 100;
620 int queue_size_ = default_queue_size;
629 template <
typename E>
631 return std::is_same<E, parallel_execution_native>::value;
683 template <
typename ... InputIterators,
typename OutputIterator,
684 typename Transformer>
686 std::tuple<InputIterators...> firsts,
687 OutputIterator first_out,
688 std::size_t sequence_size, Transformer transform_op)
const 693 [&transform_op](
auto fins, std::size_t size,
auto fout)
695 const auto l = next(get<0>(fins), size);
696 while (get<0>(fins)!=l) {
698 std::forward<Transformer>(transform_op), fins);
702 const int chunk_size = sequence_size / concurrency_degree_;
706 for (
int i=0; i!=concurrency_degree_-1; ++i) {
707 const auto delta = chunk_size * i;
709 const auto chunk_first_out = next(first_out, delta);
710 workers.launch(*
this, process_chunk, chunk_firsts, chunk_size, chunk_first_out);
713 const auto delta = chunk_size * (concurrency_degree_ - 1);
715 const auto chunk_first_out = next(first_out, delta);
716 process_chunk(chunk_firsts, sequence_size - delta, chunk_first_out);
720 template <
typename InputIterator,
typename Identity,
typename Combiner>
722 InputIterator first, std::size_t sequence_size,
723 Identity && identity,
724 Combiner && combine_op)
const 727 std::vector<result_type> partial_results(concurrency_degree_);
730 auto process_chunk = [&](InputIterator f, std::size_t sz, std::size_t id) {
731 partial_results[id] = seq.reduce(f,sz, std::forward<Identity>(identity),
732 std::forward<Combiner>(combine_op));
735 const auto chunk_size = sequence_size / concurrency_degree_;
739 for (
int i=0; i<concurrency_degree_-1; ++i) {
740 const auto delta = chunk_size * i;
741 const auto chunk_first = std::next(first,delta);
742 workers.launch(*
this, process_chunk, chunk_first, chunk_size, i);
745 const auto delta = chunk_size * (concurrency_degree_-1);
746 const auto chunk_first = std::next(first, delta);
747 const auto chunk_sz = sequence_size - delta;
748 process_chunk(chunk_first, chunk_sz, concurrency_degree_-1);
751 return seq.reduce(std::next(partial_results.begin()),
752 partial_results.size()-1, std::forward<result_type>(partial_results[0]),
753 std::forward<Combiner>(combine_op));
756 template <
typename ... InputIterators,
typename Identity,
757 typename Transformer,
typename Combiner>
759 std::tuple<InputIterators...> firsts,
760 std::size_t sequence_size,
761 Identity && identity,
762 Transformer && transform_op, Combiner && combine_op)
const 765 std::vector<result_type> partial_results(concurrency_degree_);
768 auto process_chunk = [&](
auto f, std::size_t sz, std::size_t id) {
769 partial_results[id] = seq.map_reduce(f, sz,
770 std::forward<Identity>(partial_results[
id]),
771 std::forward<Transformer>(transform_op),
772 std::forward<Combiner>(combine_op));
775 const auto chunk_size = sequence_size / concurrency_degree_;
779 for(
int i=0;i<concurrency_degree_-1;++i){
780 const auto delta = chunk_size * i;
782 workers.launch(*
this, process_chunk, chunk_firsts, chunk_size, i);
785 const auto delta = chunk_size * (concurrency_degree_-1);
787 process_chunk(chunk_firsts, sequence_size - delta, concurrency_degree_-1);
790 return seq.reduce(std::next(partial_results.begin()),
791 partial_results.size()-1, std::forward<result_type>(partial_results[0]),
792 std::forward<Combiner>(combine_op));
795 template <
typename ... InputIterators,
typename OutputIterator,
796 typename StencilTransformer,
typename Neighbourhood>
798 std::tuple<InputIterators...> firsts, OutputIterator first_out,
799 std::size_t sequence_size,
800 StencilTransformer && transform_op,
801 Neighbourhood && neighbour_op)
const 805 [&transform_op, &neighbour_op,seq](
auto fins, std::size_t sz,
auto fout)
808 std::forward<StencilTransformer>(transform_op),
809 std::forward<Neighbourhood>(neighbour_op));
812 const auto chunk_size = sequence_size / concurrency_degree_;
816 for (
int i=0; i!=concurrency_degree_-1; ++i) {
817 const auto delta = chunk_size * i;
819 const auto chunk_out = std::next(first_out,delta);
820 workers.launch(*
this, process_chunk, chunk_firsts, chunk_size, chunk_out);
823 const auto delta = chunk_size * (concurrency_degree_ - 1);
825 const auto chunk_out = std::next(first_out,delta);
826 process_chunk(chunk_firsts, sequence_size - delta, chunk_out);
830 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
833 Divider && divide_op,
835 Combiner && combine_op)
const 837 std::atomic<int> num_threads{concurrency_degree_-1};
839 return divide_conquer(std::forward<Input>(problem), std::forward<Divider>(divide_op),
840 std::forward<Solver>(solve_op), std::forward<Combiner>(combine_op),
845 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
848 Divider && divide_op,
849 Predicate && predicate_op,
851 Combiner && combine_op)
const 853 std::atomic<int> num_threads{concurrency_degree_-1};
855 return divide_conquer(std::forward<Input>(problem), std::forward<Divider>(divide_op),
856 std::forward<Predicate>(predicate_op),
857 std::forward<Solver>(solve_op), std::forward<Combiner>(combine_op),
861 template <
typename Generator,
typename ... Transformers>
863 Generator && generate_op,
864 Transformers && ... transform_ops)
const 867 using result_type = decay_t<typename result_of<Generator()>::type>;
868 using output_type = pair<result_type,long>;
869 auto output_queue = make_queue<output_type>();
871 thread generator_task([&,
this]() {
872 auto manager = thread_manager();
876 auto item{generate_op()};
877 output_queue.push(make_pair(item, order));
883 do_pipeline(output_queue, forward<Transformers>(transform_ops)...);
884 generator_task.join();
889 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
892 Divider && divide_op,
894 Combiner && combine_op,
895 std::atomic<int> & num_threads)
const 898 if (num_threads.load() <=0) {
900 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
901 std::forward<Combiner>(combine_op));
904 auto subproblems = divide_op(std::forward<Input>(input));
905 if (subproblems.size()<=1) {
return solve_op(std::forward<Input>(input)); }
907 using subresult_type =
908 std::decay_t<typename std::result_of<Solver(Input)>::type>;
909 std::vector<subresult_type> partials(subproblems.size()-1);
911 auto process_subproblem = [&,
this](
auto it, std::size_t div) {
913 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
914 std::forward<Combiner>(combine_op), num_threads);
920 auto i = subproblems.begin() + 1;
921 while (i!=subproblems.end() && num_threads.load()>0) {
922 workers.
launch(*
this,process_subproblem, i++, division++);
926 while (i!=subproblems.end()) {
927 partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
928 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
929 std::forward<Combiner>(combine_op));
932 auto subresult =
divide_conquer(std::forward<Input>(*subproblems.begin()),
933 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
934 std::forward<Combiner>(combine_op), num_threads);
938 return seq.reduce(partials.begin(), partials.size(),
939 std::forward<subresult_type>(subresult), std::forward<Combiner>(combine_op));
942 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
945 Divider && divide_op,
946 Predicate && predicate_op,
948 Combiner && combine_op,
949 std::atomic<int> & num_threads)
const 952 if (num_threads.load() <=0) {
954 std::forward<Divider>(divide_op),
955 std::forward<Predicate>(predicate_op),
956 std::forward<Solver>(solve_op),
957 std::forward<Combiner>(combine_op));
960 if (predicate_op(input)) {
return solve_op(std::forward<Input>(input)); }
961 auto subproblems = divide_op(std::forward<Input>(input));
963 using subresult_type =
964 std::decay_t<typename std::result_of<Solver(Input)>::type>;
965 std::vector<subresult_type> partials(subproblems.size()-1);
967 auto process_subproblem = [&,
this](
auto it, std::size_t div) {
969 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
970 std::forward<Solver>(solve_op),
971 std::forward<Combiner>(combine_op), num_threads);
977 auto i = subproblems.begin() + 1;
978 while (i!=subproblems.end() && num_threads.load()>0) {
979 workers.
launch(*
this,process_subproblem, i++, division++);
983 while (i!=subproblems.end()) {
984 partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
985 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op), std::forward<Solver>(solve_op),
986 std::forward<Combiner>(combine_op));
989 auto subresult =
divide_conquer(std::forward<Input>(*subproblems.begin()),
990 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op), std::forward<Solver>(solve_op),
991 std::forward<Combiner>(combine_op), num_threads);
995 return seq.reduce(partials.begin(), partials.size(),
996 std::forward<subresult_type>(subresult), std::forward<Combiner>(combine_op));
998 template <
typename Queue,
typename Consumer,
1000 void parallel_execution_native::do_pipeline(
1001 Queue & input_queue,
1002 Consumer && consume_op)
const 1004 using namespace std;
1005 using gen_value_type =
typename Queue::value_type;
1006 using input_value_type =
typename gen_value_type::first_type;
1008 auto manager = thread_manager();
1009 if (!is_ordered()) {
1011 auto item = input_queue.pop();
1012 if (!item.first)
break;
1013 consume_op(*item.first);
1017 vector<gen_value_type> elements;
1020 auto item = input_queue.pop();
1021 if (!item.first)
break;
1022 if(current == item.second){
1023 consume_op(*item.first);
1027 elements.push_back(item);
1030 for (
auto it=elements.begin(); it!=elements.end(); it++) {
1031 if(it->second == current) {
1032 consume_op(*it->first);
1039 while (elements.size()>0) {
1041 for (
auto it = elements.begin(); it != elements.end(); it++) {
1042 if(it->second == current) {
1043 consume_op(*it->first);
1053 template <
typename Inqueue,
typename Transformer,
typename output_type,
1055 void parallel_execution_native::do_pipeline(Inqueue & input_queue, Transformer && transform_op,
1058 using namespace std;
1059 using namespace experimental;
1061 using input_item_type =
typename Inqueue::value_type;
1062 using input_item_value_type =
typename input_item_type::first_type::value_type;
1064 using output_optional_type =
typename output_type::first_type;
1065 using output_item_value_type =
typename output_type::first_type::value_type;
1067 auto item{input_queue.pop()};
1068 if(!item.first)
break;
1069 auto out = output_item_value_type{transform_op(*item.first)};
1070 output_queue.
push(make_pair(out,item.second)) ;
1076 template <
typename Queue,
typename Transformer,
1077 typename ... OtherTransformers,
1079 void parallel_execution_native::do_pipeline(
1080 Queue & input_queue,
1081 Transformer && transform_op,
1082 OtherTransformers && ... other_transform_ops)
const 1084 using namespace std;
1085 using namespace experimental;
1087 using input_item_type =
typename Queue::value_type;
1088 using input_item_value_type =
typename input_item_type::first_type::value_type;
1089 using transform_result_type =
1090 decay_t<typename result_of<Transformer(input_item_value_type)>::type>;
1091 using output_item_value_type = optional<transform_result_type>;
1092 using output_item_type = pair<output_item_value_type,long>;
1094 decltype(
auto) output_queue =
1095 get_output_queue<output_item_type>(other_transform_ops...);
1097 thread task([&,
this]() {
1098 auto manager = thread_manager();
1102 auto item{input_queue.pop()};
1103 if (!item.first)
break;
1104 auto out = output_item_value_type{transform_op(*item.first)};
1105 output_queue.
push(make_pair(out, item.second));
1107 output_queue.
push(make_pair(output_item_value_type{},-1));
1110 do_pipeline(output_queue,
1111 forward<OtherTransformers>(other_transform_ops)...);
1115 template <
typename Queue,
typename FarmTransformer,
1116 template <
typename>
class Farm,
1118 void parallel_execution_native::do_pipeline(
1119 Queue & input_queue,
1120 Farm<FarmTransformer> && farm_obj)
const 1122 using namespace std;
1123 using input_item_type =
typename Queue::value_type;
1124 using input_item_value_type =
typename input_item_type::first_type::value_type;
1125 using transform_result_type =
1126 decay_t<typename result_of<FarmTransformer(input_item_value_type)>::type>;
1127 using output_item_value_type = experimental::optional<transform_result_type>;
1128 using output_item_type = pair<output_item_value_type,long>;
1130 auto farm_task = [&](
int nt) {
1132 auto item{input_queue.pop()};
1133 while (item.first) {
1134 farm_obj(*item.first);
1135 item = input_queue.pop();
1137 input_queue.push(item);
1140 auto ntasks = farm_obj.cardinality();
1147 template <
typename Queue,
typename Execution,
typename Transformer,
1148 template <
typename,
typename>
class Context,
1149 typename ... OtherTransformers,
1151 void parallel_execution_native::do_pipeline(Queue & input_queue,
1152 Context<Execution,Transformer> && context_op,
1153 OtherTransformers &&... other_ops)
const 1155 using namespace std;
1156 using namespace experimental;
1158 using input_item_type =
typename Queue::value_type;
1159 using input_item_value_type =
typename input_item_type::first_type::value_type;
1162 using output_optional_type = experimental::optional<output_type>;
1163 using output_item_type = pair <output_optional_type, long> ;
1165 decltype(
auto) output_queue =
1166 get_output_queue<output_item_type>(other_ops...);
1168 auto context_task = [&](
int nt) {
1169 context_op.execution_policy().pipeline(input_queue, context_op.transformer(), output_queue);
1170 output_queue.
push( make_pair(output_optional_type{}, -1) );
1176 do_pipeline(output_queue,
1177 forward<OtherTransformers>(other_ops)... );
1183 template <
typename Queue,
typename FarmTransformer,
1184 template <
typename>
class Farm,
1185 typename ... OtherTransformers,
1187 void parallel_execution_native::do_pipeline(
1188 Queue & input_queue,
1189 Farm<FarmTransformer> && farm_obj,
1190 OtherTransformers && ... other_transform_ops)
const 1192 using namespace std;
1193 using namespace experimental;
1195 using input_item_type =
typename Queue::value_type;
1196 using input_item_value_type =
typename input_item_type::first_type::value_type;
1199 using output_optional_type = experimental::optional<output_type>;
1200 using output_item_type = pair <output_optional_type, long> ;
1202 decltype(
auto) output_queue =
1203 get_output_queue<output_item_type>(other_transform_ops...);
1205 atomic<int> done_threads{0};
1207 auto farm_task = [&](
int nt) {
1208 do_pipeline(input_queue, farm_obj.transformer(), output_queue);
1210 if (done_threads == nt) {
1211 output_queue.
push(make_pair(output_optional_type{}, -1));
1213 input_queue.push(input_item_type{});
1217 auto ntasks = farm_obj.cardinality();
1220 do_pipeline(output_queue,
1221 forward<OtherTransformers>(other_transform_ops)... );
1226 template <
typename Queue,
typename Predicate,
1227 template <
typename>
class Filter,
1228 typename ... OtherTransformers,
1230 void parallel_execution_native::do_pipeline(
1231 Queue & input_queue,
1232 Filter<Predicate> && filter_obj,
1233 OtherTransformers && ... other_transform_ops)
const 1235 using namespace std;
1236 using namespace experimental;
1238 using input_item_type =
typename Queue::value_type;
1239 using input_value_type =
typename input_item_type::first_type;
1240 auto filter_queue = make_queue<input_item_type>();
1242 auto filter_task = [&,
this]() {
1243 auto manager = thread_manager();
1244 auto item{input_queue.pop()};
1245 while (item.first) {
1246 if (filter_obj(*item.first)) {
1247 filter_queue.push(item);
1250 filter_queue.push(make_pair(input_value_type{}, item.second));
1252 item = input_queue.pop();
1254 filter_queue.push(make_pair(input_value_type{}, -1));
1256 thread filter_thread{filter_task};
1259 decltype(
auto) output_queue =
1260 get_output_queue<input_item_type>(other_transform_ops...);
1262 thread ordering_thread;
1264 auto ordering_task = [&]() {
1265 auto manager = thread_manager();
1266 vector<input_item_type> elements;
1269 auto item{filter_queue.pop()};
1271 if(!item.first && item.second == -1)
break;
1272 if (item.second == current) {
1274 output_queue.
push(make_pair(item.first,order));
1280 elements.push_back(item);
1283 for (
auto it=elements.begin(); it<elements.end(); it++) {
1284 if (it->second == current) {
1286 output_queue.
push(make_pair(it->first,order));
1294 item = filter_queue.pop();
1296 while (elements.size()>0) {
1298 for (
auto it=elements.begin(); it<elements.end(); it++) {
1299 if (it->second == current) {
1301 output_queue.
push(make_pair(it->first,order));
1310 output_queue.
push(item);
1313 ordering_thread = thread{ordering_task};
1314 do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1315 filter_thread.join();
1316 ordering_thread.join();
1319 do_pipeline(filter_queue, forward<OtherTransformers>(other_transform_ops)...);
1320 filter_thread.join();
1324 template <
typename Queue,
typename Combiner,
typename Identity,
1325 template <
typename C,
typename I>
class Reduce,
1326 typename ... OtherTransformers,
1328 void parallel_execution_native::do_pipeline(
1329 Queue && input_queue,
1330 Reduce<Combiner,Identity> && reduce_obj,
1331 OtherTransformers && ... other_transform_ops)
const 1333 using namespace std;
1334 using namespace experimental;
1336 using input_item_type =
typename decay_t<Queue>::value_type;
1337 using input_item_value_type =
typename input_item_type::first_type::value_type;
1338 using output_item_value_type = optional<decay_t<Identity>>;
1339 using output_item_type = pair<output_item_value_type,long>;
1340 decltype(
auto) output_queue =
1341 get_output_queue<output_item_type>(other_transform_ops...);
1343 auto reduce_task = [&,
this]() {
1344 auto manager = thread_manager();
1345 auto item{input_queue.pop()};
1347 while (item.first) {
1348 reduce_obj.add_item(std::forward<Identity>(*item.first));
1349 item = input_queue.pop();
1350 if (reduce_obj.reduction_needed()) {
1352 auto red = reduce_obj.reduce_window(seq);
1353 output_queue.
push(make_pair(red, order++));
1356 output_queue.
push(make_pair(output_item_value_type{}, -1));
1358 thread reduce_thread{reduce_task};
1359 do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1360 reduce_thread.join();
1363 template <
typename Queue,
typename Transformer,
typename Predicate,
1364 template <
typename T,
typename P>
class Iteration,
1365 typename ... OtherTransformers,
1368 void parallel_execution_native::do_pipeline(
1369 Queue & input_queue,
1370 Iteration<Transformer,Predicate> && iteration_obj,
1371 OtherTransformers && ... other_transform_ops)
const 1373 using namespace std;
1374 using namespace experimental;
1376 using input_item_type =
typename decay_t<Queue>::value_type;
1377 using input_item_value_type =
typename input_item_type::first_type::value_type;
1379 decltype(
auto) output_queue =
1380 get_output_queue<input_item_type>(other_transform_ops...);
1382 auto iteration_task = [&]() {
1384 auto item = input_queue.pop();
1385 if (!item.first)
break;
1386 auto value = iteration_obj.transform(*item.first);
1387 auto new_item = input_item_type{value,item.second};
1388 if (iteration_obj.predicate(value)) {
1389 output_queue.
push(new_item);
1392 input_queue.push(new_item);
1395 while (!input_queue.is_empty()) {
1396 auto item = input_queue.pop();
1397 auto value = iteration_obj.transform(*item.first);
1398 auto new_item = input_item_type{value,item.second};
1399 if (iteration_obj.predicate(value)) {
1400 output_queue.
push(new_item);
1403 input_queue.push(new_item);
1406 output_queue.
push(input_item_type{{},-1});
1409 thread iteration_thread{iteration_task};
1410 do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1411 iteration_thread.join();
1414 template <
typename Queue,
typename Transformer,
typename Predicate,
1415 template <
typename T,
typename P>
class Iteration,
1416 typename ... OtherTransformers,
1417 requires_iteration<Iteration<Transformer,Predicate>>,
1419 void parallel_execution_native::do_pipeline(
1420 Queue & input_queue,
1421 Iteration<Transformer,Predicate> && iteration_obj,
1422 OtherTransformers && ... other_transform_ops)
const 1424 static_assert(!is_pipeline<Transformer>,
"Not implemented");
1428 template <
typename Queue,
typename ... Transformers,
1429 template <
typename...>
class Pipeline,
1431 void parallel_execution_native::do_pipeline(
1432 Queue & input_queue,
1433 Pipeline<Transformers...> && pipeline_obj)
const 1437 pipeline_obj.transformers(),
1438 std::make_index_sequence<
sizeof...(Transformers)>());
1441 template <
typename Queue,
typename ... Transformers,
1442 template <
typename...>
class Pipeline,
1443 typename ... OtherTransformers,
1445 void parallel_execution_native::do_pipeline(
1446 Queue & input_queue,
1447 Pipeline<Transformers...> && pipeline_obj,
1448 OtherTransformers && ... other_transform_ops)
const 1452 std::tuple_cat(pipeline_obj.transformers(),
1453 std::forward_as_tuple(other_transform_ops...)),
1454 std::make_index_sequence<
sizeof...(Transformers)+
sizeof...(OtherTransformers)>());
1457 template <
typename Queue,
typename ... Transformers,
1459 void parallel_execution_native::do_pipeline_nested(
1460 Queue & input_queue,
1461 std::tuple<Transformers...> && transform_ops,
1462 std::index_sequence<I...>)
const 1464 do_pipeline(input_queue,
1465 std::forward<Transformers>(std::get<I>(transform_ops))...);
Definition: callable_traits.h:26
void set_concurrency_degree(int degree) noexcept
Set number of grppi threads.
Definition: parallel_execution_native.h:179
void pipeline(const Execution &ex, Generator &&generate_op, Transformers &&...transform_ops)
Invoke Pipeline pattern on a data stream.
Definition: pipeline.h:51
void launch(const E &ex, F f, Args &&...args)
Launch a function in the pool.
Definition: worker_pool.h:62
constexpr bool is_supported< parallel_execution_native >()
Determines if an execution policy is supported in the current compilation.
Definition: parallel_execution_native.h:639
void stencil(const Execution &ex, std::tuple< InputIterators... > firsts, std::size_t size, OutputIt out, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op)
Invoke Stencil pattern on a data sequence with sequential execution.
Definition: stencil.h:59
native_thread_manager(thread_registry &registry)
Saves a reference to the registry and registers current thread.
Definition: parallel_execution_native.h:120
std::enable_if_t< is_reduce< T >, int > requires_reduce
Definition: reduce_pattern.h:135
constexpr bool supports_map< parallel_execution_native >()
Determines if an execution policy supports the map pattern.
Definition: parallel_execution_native.h:646
auto reduce(const Execution &ex, InputIt first, std::size_t size, Result &&identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with sequential execution.
Definition: reduce.h:55
Pool of worker threads. This class offers a simple pool of worker threads.
Definition: worker_pool.h:34
constexpr bool supports_reduce< parallel_execution_native >()
Determines if an execution policy supports the reduce pattern.
Definition: parallel_execution_native.h:653
void enable_ordering() noexcept
Enable ordering.
Definition: parallel_execution_native.h:189
native_thread_manager thread_manager() const
Get a manager object for registration/deregistration in the thread index table for current thread...
Definition: parallel_execution_native.h:205
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_native.h:184
mpmc_queue< T > get_output_queue(Transformers &&...) const
Makes a communication queue for elements of type T if the queue has not been created in an outer patt...
Definition: parallel_execution_native.h:257
void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: parallel_execution_native.h:797
typename std::enable_if_t< is_iteration< T >, int > requires_iteration
Definition: iteration_pattern.h:88
constexpr bool supports_divide_conquer< parallel_execution_native >()
Determines if an execution policy supports the divide/conquer pattern.
Definition: parallel_execution_native.h:674
void set_queue_attributes(int size, queue_mode mode) noexcept
Sets the attributes for the queues built through make_queue<T>()
Definition: parallel_execution_native.h:220
auto reduce(InputIterator first, std::size_t sequence_size, Identity &&identity, Combiner &&combine_op) const
Applies a reduction to a sequence of data items.
Definition: parallel_execution_native.h:721
auto iterators_next(T &&t, int n)
Computes next n steps from a tuple of iterators.
Definition: iterator.h:175
constexpr bool supports_map_reduce< parallel_execution_native >()
Determines if an execution policy supports the map-reduce pattern.
Definition: parallel_execution_native.h:660
queue_mode
Definition: mpmc_queue.h:35
void deregister_thread() noexcept
Removes current thread id from the registry.
Definition: parallel_execution_native.h:88
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:140
thread_registry() noexcept=default
Definition: mpmc_queue.h:38
void launch_tasks(const E &ex, F &&f, Args &&...args)
Definition: worker_pool.h:70
parallel_execution_native() noexcept
Default construct a native parallel execution policy.
Definition: parallel_execution_native.h:153
~native_thread_manager()
Deregisters current thread from the registry.
Definition: parallel_execution_native.h:127
constexpr bool is_parallel_execution_native()
Metafunction that determines if type E is parallel_execution_native.
Definition: parallel_execution_native.h:630
void register_thread() noexcept
Adds the current thread id in the registry.
Definition: parallel_execution_native.h:79
constexpr void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: sequential_execution.h:502
void pipeline(mpmc_queue< InputType > &input_queue, Transformer &&transform_op, mpmc_queue< OutputType > &output_queue) const
Invoke Pipeline pattern coming from another context that uses mpmc_queues as communication channels...
Definition: parallel_execution_native.h:405
std::enable_if_t< is_no_pattern< T >, int > requires_no_pattern
Definition: patterns.h:92
auto map_reduce(const Execution &ex, std::tuple< InputIterators... > firsts, std::size_t size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op)
Invoke Map/reduce pattern on a data sequence.
Definition: mapreduce.h:57
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:111
auto map_reduce(std::tuple< InputIterators... > firsts, std::size_t sequence_size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op) const
Applies a map/reduce operation to a sequence of data items.
Definition: parallel_execution_native.h:758
constexpr bool supports_stencil< parallel_execution_native >()
Determines if an execution policy supports the stencil pattern.
Definition: parallel_execution_native.h:667
int current_index() const noexcept
Integer index for current thread.
Definition: parallel_execution_native.h:98
void map(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, Transformer transform_op) const
Applies a transformation to multiple sequences leaving the result in another sequence by chunks accor...
Definition: parallel_execution_native.h:685
return_type type
Definition: patterns.h:103
typename std::enable_if_t< is_filter< T >, int > requires_filter
Definition: filter_pattern.h:70
Sequential execution policy.
Definition: sequential_execution.h:41
parallel_execution_native(int concurrency_degree, bool ordering=true) noexcept
Constructs a native parallel execution policy.
Definition: parallel_execution_native.h:167
void pipeline(Generator &&generate_op, Transformers &&...transform_ops) const
Invoke Pipeline pattern.
Definition: parallel_execution_native.h:862
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke Divide/conquer pattern.
Definition: sequential_execution.h:543
decltype(auto) apply_deref_increment(F &&f, T< Iterators... > &iterators)
Applies a callable object to the values obtained from the iterators in a tuple-like object...
Definition: iterator.h:63
typename std::result_of< Transformer(Input)>::type result_type
Determines the return type of applying a function on an input type.
Definition: patterns.h:110
Thread index table to provide portable natural thread indices.
Definition: parallel_execution_native.h:53
void disable_ordering() noexcept
Disable ordering.
Definition: parallel_execution_native.h:194
RAII class to manage registration/deregistration pairs. This class allows to manage automatic deregis...
Definition: parallel_execution_native.h:115
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke Divide/conquer pattern.
Definition: parallel_execution_native.h:831
parallel_execution_native(const parallel_execution_native &ex)
Definition: parallel_execution_native.h:172
auto divide_conquer(const Execution &ex, Input &&input, Divider &&divider_op, Solver &&solver_op, Combiner &&combiner_op)
Invoke Divide/conquer pattern. Execution Execution type.
Definition: divideconquer.h:53
mpmc_queue< T > & get_output_queue(mpmc_queue< T > &queue, Transformers &&...) const
Returns the reference of a communication queue for elements of type T if the queue has been created i...
Definition: parallel_execution_native.h:245
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_native.h:232
constexpr bool supports_pipeline< parallel_execution_native >()
Determines if an execution policy supports the pipeline pattern.
Definition: parallel_execution_native.h:681
int get_thread_id() const noexcept
Get index of current thread in the thread table.
Definition: parallel_execution_native.h:213
typename std::enable_if_t< is_context< T >, int > requires_context
Definition: common/context.h:95
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_native.h:199
void map(const Execution &ex, std::tuple< InputIterators... > firsts, InputIt last, OutputIt first_out, Transformer transform_op)
Invoke Map pattern on a data sequence.
Definition: map.h:56
bool push(T item)
Definition: mpmc_queue.h:128
typename std::enable_if_t< is_farm< T >, int > requires_farm
Definition: farm_pattern.h:89