GrPPI  0.3.1
Generic and Reusable Parallel Pattern Interface
parallel_execution_native.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_NATIVE_PARALLEL_EXECUTION_NATIVE_H
22 #define GRPPI_NATIVE_PARALLEL_EXECUTION_NATIVE_H
23 
24 #include "worker_pool.h"
25 #include "../common/mpmc_queue.h"
26 #include "../common/iterator.h"
27 #include "../common/execution_traits.h"
28 
29 #include <thread>
30 #include <atomic>
31 #include <algorithm>
32 #include <vector>
33 #include <type_traits>
34 #include <tuple>
35 #include <experimental/optional>
36 
37 namespace grppi {
38 
54 public:
55  thread_registry() noexcept = default;
56 
60  void register_thread() noexcept;
61 
65  void deregister_thread() noexcept;
66 
72  int current_index() const noexcept;
73 
74 private:
75  mutable std::atomic_flag lock_ = ATOMIC_FLAG_INIT;
76  std::vector<std::thread::id> ids_;
77 };
78 
79 inline void thread_registry::register_thread() noexcept
80 {
81  using namespace std;
82  while (lock_.test_and_set(memory_order_acquire)) {}
83  auto this_id = this_thread::get_id();
84  ids_.push_back(this_id);
85  lock_.clear(memory_order_release);
86 }
87 
88 inline void thread_registry::deregister_thread() noexcept
89 {
90  using namespace std;
91  while (lock_.test_and_set(memory_order_acquire)) {}
92  auto this_id = this_thread::get_id();
93  auto current = find(begin(ids_), end(ids_), this_id);
94  *current = {}; //Empty thread
95  lock_.clear(memory_order_release);
96 }
97 
98 inline int thread_registry::current_index() const noexcept
99 {
100  using namespace std;
101  while (lock_.test_and_set(memory_order_acquire)) {}
102  auto this_id = this_thread::get_id();
103  auto current = find(begin(ids_), end(ids_), this_id);
104  auto index = distance(begin(ids_), current);
105  lock_.clear(memory_order_release);
106  return index;
107 };
108 
116 public:
121  : registry_{registry}
122  { registry_.register_thread(); }
123 
128  registry_.deregister_thread();
129  }
130 
131 private:
132  thread_registry & registry_;
133 };
134 
141 public:
142 
155  static_cast<int>(2 * std::thread::hardware_concurrency()),
156  true}
157  {}
158 
167  parallel_execution_native(int concurrency_degree, bool ordering=true) noexcept :
168  concurrency_degree_{concurrency_degree},
169  ordering_{ordering}
170  {}
171 
173  parallel_execution_native{ex.concurrency_degree_, ex.ordering_}
174  {}
175 
179  void set_concurrency_degree(int degree) noexcept { concurrency_degree_ = degree; }
180 
184  int concurrency_degree() const noexcept { return concurrency_degree_; }
185 
189  void enable_ordering() noexcept { ordering_=true; }
190 
194  void disable_ordering() noexcept { ordering_=false; }
195 
199  bool is_ordered() const noexcept { return ordering_; }
200 
206  return native_thread_manager{thread_registry_};
207  }
208 
213  int get_thread_id() const noexcept {
214  return thread_registry_.current_index();
215  }
216 
220  void set_queue_attributes(int size, queue_mode mode) noexcept {
221  queue_size_ = size;
222  queue_mode_ = mode;
223  }
224 
231  template <typename T>
233  return {queue_size_, queue_mode_};
234  }
235 
244  template <typename T, typename ... Transformers>
245  mpmc_queue<T>& get_output_queue(mpmc_queue<T> & queue, Transformers && ...) const {
246  return queue;
247  }
248 
256  template <typename T, typename ... Transformers>
257  mpmc_queue<T> get_output_queue(Transformers && ...) const{
258  return std::move(make_queue<T>());
259  }
260 
275  template <typename ... InputIterators, typename OutputIterator,
276  typename Transformer>
277  void map(std::tuple<InputIterators...> firsts,
278  OutputIterator first_out,
279  std::size_t sequence_size, Transformer transform_op) const;
280 
293  template <typename InputIterator, typename Identity, typename Combiner>
294  auto reduce(InputIterator first, std::size_t sequence_size,
295  Identity && identity, Combiner && combine_op) const;
296 
311  template <typename ... InputIterators, typename Identity,
312  typename Transformer, typename Combiner>
313  auto map_reduce(std::tuple<InputIterators...> firsts,
314  std::size_t sequence_size,
315  Identity && identity,
316  Transformer && transform_op, Combiner && combine_op) const;
317 
334  template <typename ... InputIterators, typename OutputIterator,
335  typename StencilTransformer, typename Neighbourhood>
336  void stencil(std::tuple<InputIterators...> firsts, OutputIterator first_out,
337  std::size_t sequence_size,
338  StencilTransformer && transform_op,
339  Neighbourhood && neighbour_op) const;
340 
353  template <typename Input, typename Divider, typename Solver, typename Combiner>
354  [[deprecated("Use new interface with predicate argument")]]
355  auto divide_conquer(Input && input,
356  Divider && divide_op,
357  Solver && solve_op,
358  Combiner && combine_op) const;
359 
374  template <typename Input, typename Divider, typename Predicate, typename Solver, typename Combiner>
375  auto divide_conquer(Input && input,
376  Divider && divide_op,
377  Predicate && predicate_op,
378  Solver && solve_op,
379  Combiner && combine_op) const;
380 
381 
382 
390  template <typename Generator, typename ... Transformers>
391  void pipeline(Generator && generate_op,
392  Transformers && ... transform_ops) const;
393 
404  template <typename InputType, typename Transformer, typename OutputType>
405  void pipeline(mpmc_queue<InputType> & input_queue, Transformer && transform_op,
406  mpmc_queue<OutputType> &output_queue) const
407  {
408  do_pipeline(input_queue, std::forward<Transformer>(transform_op), output_queue);
409  }
410 
411 private:
412 
413  template <typename Input, typename Divider, typename Solver, typename Combiner>
414  auto divide_conquer(Input && input,
415  Divider && divide_op,
416  Solver && solve_op,
417  Combiner && combine_op,
418  std::atomic<int> & num_threads) const;
419 
420  template <typename Input, typename Divider,typename Predicate, typename Solver, typename Combiner>
421  auto divide_conquer(Input && input,
422  Divider && divide_op,
423  Predicate && predicate_op,
424  Solver && solve_op,
425  Combiner && combine_op,
426  std::atomic<int> & num_threads) const;
427 
428 
429  template <typename Queue, typename Consumer,
431  void do_pipeline(Queue & input_queue, Consumer && consume_op) const;
432 
433  template <typename Inqueue, typename Transformer, typename output_type,
435  void do_pipeline(Inqueue & input_queue, Transformer && transform_op,
436  mpmc_queue<output_type> & output_queue) const;
437 
438  template <typename T, typename ... Others>
439  void do_pipeline(mpmc_queue<T> & in_q, mpmc_queue<T> & same_queue, Others &&... ops) const
440  { }
441 
442  template <typename T>
443  void do_pipeline(mpmc_queue<T> & in_q) const {}
444 
445 
446  template <typename Queue, typename Transformer, typename ... OtherTransformers,
447  requires_no_pattern<Transformer> = 0>
448  void do_pipeline(Queue & input_queue, Transformer && transform_op,
449  OtherTransformers && ... other_ops) const;
450 
451  template <typename Queue, typename FarmTransformer,
452  template <typename> class Farm,
454  void do_pipeline(Queue & input_queue,
455  Farm<FarmTransformer> & farm_obj) const
456  {
457  do_pipeline(input_queue, std::move(farm_obj));
458  }
459 
460  template <typename Queue, typename FarmTransformer,
461  template <typename> class Farm,
462  requires_farm<Farm<FarmTransformer>> = 0>
463  void do_pipeline( Queue & input_queue,
464  Farm<FarmTransformer> && farm_obj) const;
465 
466  template <typename Queue, typename Execution, typename Transformer,
467  template <typename, typename> class Context,
468  typename ... OtherTransformers,
470  void do_pipeline(Queue & input_queue, Context<Execution,Transformer> && context_op,
471  OtherTransformers &&... other_ops) const;
472 
473  template <typename Queue, typename Execution, typename Transformer,
474  template <typename, typename> class Context,
475  typename ... OtherTransformers,
476  requires_context<Context<Execution,Transformer>> = 0>
477  void do_pipeline(Queue & input_queue, Context<Execution,Transformer> & context_op,
478  OtherTransformers &&... other_ops) const
479  {
480  do_pipeline(input_queue, std::move(context_op),
481  std::forward<OtherTransformers>(other_ops)...);
482  }
483 
484  template <typename Queue, typename FarmTransformer,
485  template <typename> class Farm,
486  typename ... OtherTransformers,
487  requires_farm<Farm<FarmTransformer>> = 0>
488  void do_pipeline(Queue & input_queue,
489  Farm<FarmTransformer> & farm_obj,
490  OtherTransformers && ... other_transform_ops) const
491  {
492  do_pipeline(input_queue, std::move(farm_obj),
493  std::forward<OtherTransformers>(other_transform_ops)...);
494  }
495 
496  template <typename Queue, typename FarmTransformer,
497  template <typename> class Farm,
498  typename ... OtherTransformers,
499  requires_farm<Farm<FarmTransformer>> = 0>
500  void do_pipeline(Queue & input_queue,
501  Farm<FarmTransformer> && farm_obj,
502  OtherTransformers && ... other_transform_ops) const;
503 
504  template <typename Queue, typename Predicate,
505  template <typename> class Filter,
506  typename ... OtherTransformers,
508  void do_pipeline(Queue & input_queue,
509  Filter<Predicate> & filter_obj,
510  OtherTransformers && ... other_transform_ops) const
511  {
512  do_pipeline(input_queue, std::move(filter_obj),
513  std::forward<OtherTransformers>(other_transform_ops)...);
514  }
515 
516  template <typename Queue, typename Predicate,
517  template <typename> class Filter,
518  typename ... OtherTransformers,
519  requires_filter<Filter<Predicate>> =0>
520  void do_pipeline(Queue & input_queue,
521  Filter<Predicate> && farm_obj,
522  OtherTransformers && ... other_transform_ops) const;
523 
524  template <typename Queue, typename Combiner, typename Identity,
525  template <typename C, typename I> class Reduce,
526  typename ... OtherTransformers,
528  void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> & reduce_obj,
529  OtherTransformers && ... other_transform_ops) const
530  {
531  do_pipeline(input_queue, std::move(reduce_obj),
532  std::forward<OtherTransformers>(other_transform_ops)...);
533  };
534 
535  template <typename Queue, typename Combiner, typename Identity,
536  template <typename C, typename I> class Reduce,
537  typename ... OtherTransformers,
538  requires_reduce<Reduce<Combiner,Identity>> = 0>
539  void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> && reduce_obj,
540  OtherTransformers && ... other_transform_ops) const;
541 
542  template <typename Queue, typename Transformer, typename Predicate,
543  template <typename T, typename P> class Iteration,
544  typename ... OtherTransformers,
546  requires_no_pattern<Transformer> =0>
547  void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> & iteration_obj,
548  OtherTransformers && ... other_transform_ops) const
549  {
550  do_pipeline(input_queue, std::move(iteration_obj),
551  std::forward<OtherTransformers>(other_transform_ops)...);
552  }
553 
554  template <typename Queue, typename Transformer, typename Predicate,
555  template <typename T, typename P> class Iteration,
556  typename ... OtherTransformers,
557  requires_iteration<Iteration<Transformer,Predicate>> =0,
558  requires_no_pattern<Transformer> =0>
559  void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
560  OtherTransformers && ... other_transform_ops) const;
561 
562  template <typename Queue, typename Transformer, typename Predicate,
563  template <typename T, typename P> class Iteration,
564  typename ... OtherTransformers,
565  requires_iteration<Iteration<Transformer,Predicate>> =0,
567  void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
568  OtherTransformers && ... other_transform_ops) const;
569 
570 
571  template <typename Queue, typename ... Transformers,
572  template <typename...> class Pipeline,
573  requires_pipeline<Pipeline<Transformers...>> = 0>
574  void do_pipeline(Queue & input_queue,
575  Pipeline<Transformers...> & pipeline_obj) const
576  {
577  do_pipeline(input_queue, std::move(pipeline_obj));
578  }
579 
580  template <typename Queue, typename ... Transformers,
581  template <typename...> class Pipeline,
582  requires_pipeline<Pipeline<Transformers...>> = 0>
583  void do_pipeline(Queue & input_queue,
584  Pipeline<Transformers...> && pipeline_obj) const;
585 
586  template <typename Queue, typename ... Transformers,
587  template <typename...> class Pipeline,
588  typename ... OtherTransformers,
589  requires_pipeline<Pipeline<Transformers...>> = 0>
590  void do_pipeline(Queue & input_queue,
591  Pipeline<Transformers...> & pipeline_obj,
592  OtherTransformers && ... other_transform_ops) const
593  {
594  do_pipeline(input_queue, std::move(pipeline_obj),
595  std::forward<OtherTransformers>(other_transform_ops)...);
596  }
597 
598  template <typename Queue, typename ... Transformers,
599  template <typename...> class Pipeline,
600  typename ... OtherTransformers,
601  requires_pipeline<Pipeline<Transformers...>> = 0>
602  void do_pipeline(Queue & input_queue,
603  Pipeline<Transformers...> && pipeline_obj,
604  OtherTransformers && ... other_transform_ops) const;
605 
606  template <typename Queue, typename ... Transformers,
607  std::size_t ... I>
608  void do_pipeline_nested(
609  Queue & input_queue,
610  std::tuple<Transformers...> && transform_ops,
611  std::index_sequence<I...>) const;
612 
613 private:
614  mutable thread_registry thread_registry_;
615 
616  int concurrency_degree_;
617  bool ordering_;
618 
619  constexpr static int default_queue_size = 100;
620  int queue_size_ = default_queue_size;
621 
622  queue_mode queue_mode_ = queue_mode::blocking;
623 };
624 
629 template <typename E>
631  return std::is_same<E, parallel_execution_native>::value;
632 }
633 
// Execution-trait specializations: advertise that parallel_execution_native
// is a supported policy and implements every GrPPI pattern in this header
// (map, reduce, map/reduce, stencil, divide&conquer, pipeline).
638 template <>
639 constexpr bool is_supported<parallel_execution_native>() { return true; }
640 
645 template <>
646 constexpr bool supports_map<parallel_execution_native>() { return true; }
647 
652 template <>
653 constexpr bool supports_reduce<parallel_execution_native>() { return true; }
654 
659 template <>
660 constexpr bool supports_map_reduce<parallel_execution_native>() { return true; }
661 
666 template <>
667 constexpr bool supports_stencil<parallel_execution_native>() { return true; }
668 
673 template <>
674 constexpr bool supports_divide_conquer<parallel_execution_native>() { return true; }
675 
680 template <>
681 constexpr bool supports_pipeline<parallel_execution_native>() { return true; }
682 
// Parallel map: splits [0, sequence_size) into concurrency_degree_ chunks,
// runs transform_op element-wise on the first N-1 chunks in pool workers,
// and processes the last (possibly larger, remainder-absorbing) chunk on
// the calling thread. The worker_pool destructor joins all workers.
// NOTE(review): the qualified signature line (doxygen 685) is elided in
// this listing.
683 template <typename ... InputIterators, typename OutputIterator,
 684  typename Transformer>
 686  std::tuple<InputIterators...> firsts,
 687  OutputIterator first_out,
 688  std::size_t sequence_size, Transformer transform_op) const
 689 {
 690  using namespace std;
 691 
// Applies transform_op to one chunk: advances all input iterators in
// lock-step (apply_deref_increment) until the first iterator reaches 'l'.
 692  auto process_chunk =
 693  [&transform_op](auto fins, std::size_t size, auto fout)
 694  {
 695  const auto l = next(get<0>(fins), size);
 696  while (get<0>(fins)!=l) {
 697  *fout++ = apply_deref_increment(
 698  std::forward<Transformer>(transform_op), fins);
 699  }
 700  };
 701 
// Integer division: the remainder is handled by the final chunk below.
 702  const int chunk_size = sequence_size / concurrency_degree_;
 703 
 704  {
 705  worker_pool workers{concurrency_degree_};
 706  for (int i=0; i!=concurrency_degree_-1; ++i) {
 707  const auto delta = chunk_size * i;
 708  const auto chunk_firsts = iterators_next(firsts,delta);
 709  const auto chunk_first_out = next(first_out, delta);
 710  workers.launch(*this, process_chunk, chunk_firsts, chunk_size, chunk_first_out);
 711  }
 712 
// Last chunk runs on the calling thread and absorbs the division remainder.
 713  const auto delta = chunk_size * (concurrency_degree_ - 1);
 714  const auto chunk_firsts = iterators_next(firsts,delta);
 715  const auto chunk_first_out = next(first_out, delta);
 716  process_chunk(chunk_firsts, sequence_size - delta, chunk_first_out);
 717  } // Pool synch
 718 }
719 
// Parallel reduce: each worker reduces one chunk sequentially into
// partial_results[id]; the partials are then combined sequentially.
// NOTE(review): identity is forwarded into every chunk (doxygen 731); if
// Identity is an rvalue type that is consumed by move, repeated forwarding
// would be suspect — verify seq.reduce only reads it.
// The qualified signature line (doxygen 721) is elided in this listing.
720 template <typename InputIterator, typename Identity, typename Combiner>
722  InputIterator first, std::size_t sequence_size,
723  Identity && identity,
724  Combiner && combine_op) const
725 {
726  using result_type = std::decay_t<Identity>;
727  std::vector<result_type> partial_results(concurrency_degree_);
728 
729  constexpr sequential_execution seq;
730  auto process_chunk = [&](InputIterator f, std::size_t sz, std::size_t id) {
731  partial_results[id] = seq.reduce(f,sz, std::forward<Identity>(identity),
732  std::forward<Combiner>(combine_op));
733  };
734 
735  const auto chunk_size = sequence_size / concurrency_degree_;
736 
737  {
738  worker_pool workers{concurrency_degree_};
739  for (int i=0; i<concurrency_degree_-1; ++i) {
740  const auto delta = chunk_size * i;
741  const auto chunk_first = std::next(first,delta);
742  workers.launch(*this, process_chunk, chunk_first, chunk_size, i);
743  }
744 
// Final chunk (with remainder) on the calling thread.
745  const auto delta = chunk_size * (concurrency_degree_-1);
746  const auto chunk_first = std::next(first, delta);
747  const auto chunk_sz = sequence_size - delta;
748  process_chunk(chunk_first, chunk_sz, concurrency_degree_-1);
749  } // Pool synch
750 
// Combine partials sequentially, seeding with the first partial result.
751  return seq.reduce(std::next(partial_results.begin()),
752  partial_results.size()-1, std::forward<result_type>(partial_results[0]),
753  std::forward<Combiner>(combine_op));
754 }
755 
// Parallel map/reduce: each worker map-reduces one chunk into
// partial_results[id]; partials are then reduced sequentially.
// NOTE(review): the 'identity' parameter is never referenced in this body;
// each chunk is seeded from the default-constructed partial_results[id]
// (doxygen 770) — verify this matches the intended identity semantics.
// The qualified signature line (doxygen 758) is elided in this listing.
756 template <typename ... InputIterators, typename Identity,
 757  typename Transformer, typename Combiner>
 759  std::tuple<InputIterators...> firsts,
 760  std::size_t sequence_size,
 761  Identity && identity,
 762  Transformer && transform_op, Combiner && combine_op) const
 763 {
 764  using result_type = std::decay_t<Identity>;
 765  std::vector<result_type> partial_results(concurrency_degree_);
 766 
 767  constexpr sequential_execution seq;
 768  auto process_chunk = [&](auto f, std::size_t sz, std::size_t id) {
 769  partial_results[id] = seq.map_reduce(f, sz,
 770  std::forward<Identity>(partial_results[id]),
 771  std::forward<Transformer>(transform_op),
 772  std::forward<Combiner>(combine_op));
 773  };
 774 
 775  const auto chunk_size = sequence_size / concurrency_degree_;
 776 
 777  {
 778  worker_pool workers{concurrency_degree_};
 779  for(int i=0;i<concurrency_degree_-1;++i){
 780  const auto delta = chunk_size * i;
 781  const auto chunk_firsts = iterators_next(firsts,delta);
 782  workers.launch(*this, process_chunk, chunk_firsts, chunk_size, i);
 783  }
 784 
// Final chunk (with remainder) on the calling thread.
 785  const auto delta = chunk_size * (concurrency_degree_-1);
 786  const auto chunk_firsts = iterators_next(firsts, delta);
 787  process_chunk(chunk_firsts, sequence_size - delta, concurrency_degree_-1);
 788  } // Pool synch
 789 
 790  return seq.reduce(std::next(partial_results.begin()),
 791  partial_results.size()-1, std::forward<result_type>(partial_results[0]),
 792  std::forward<Combiner>(combine_op));
 793 }
794 
// Parallel stencil: delegates each chunk to the sequential stencil
// implementation; N-1 chunks run in pool workers, the last chunk (with the
// division remainder) runs on the calling thread.
// NOTE(review): workers receive iterators into the full sequence, so
// neighbour_op can still see elements outside its own chunk.
// The qualified signature line (doxygen 797) is elided in this listing.
795 template <typename ... InputIterators, typename OutputIterator,
 796  typename StencilTransformer, typename Neighbourhood>
 798  std::tuple<InputIterators...> firsts, OutputIterator first_out,
 799  std::size_t sequence_size,
 800  StencilTransformer && transform_op,
 801  Neighbourhood && neighbour_op) const
 802 {
 803  constexpr sequential_execution seq;
 804  auto process_chunk =
 805  [&transform_op, &neighbour_op,seq](auto fins, std::size_t sz, auto fout)
 806  {
 807  seq.stencil(fins, fout, sz,
 808  std::forward<StencilTransformer>(transform_op),
 809  std::forward<Neighbourhood>(neighbour_op));
 810  };
 811 
 812  const auto chunk_size = sequence_size / concurrency_degree_;
 813  {
 814  worker_pool workers{concurrency_degree_};
 815 
 816  for (int i=0; i!=concurrency_degree_-1; ++i) {
 817  const auto delta = chunk_size * i;
 818  const auto chunk_firsts = iterators_next(firsts,delta);
 819  const auto chunk_out = std::next(first_out,delta);
 820  workers.launch(*this, process_chunk, chunk_firsts, chunk_size, chunk_out);
 821  }
 822 
// Final chunk (with remainder) on the calling thread.
 823  const auto delta = chunk_size * (concurrency_degree_ - 1);
 824  const auto chunk_firsts = iterators_next(firsts,delta);
 825  const auto chunk_out = std::next(first_out,delta);
 826  process_chunk(chunk_firsts, sequence_size - delta, chunk_out);
 827  } // Pool synch
 828 }
829 
// Public divide&conquer entry point (deprecated interface, no predicate):
// sets up the shared thread budget (calling thread excluded) and delegates
// to the private overload that takes the atomic counter.
// NOTE(review): the qualified signature line (doxygen 831) is elided.
830 template <typename Input, typename Divider, typename Solver, typename Combiner>
832  Input && problem,
833  Divider && divide_op,
834  Solver && solve_op,
835  Combiner && combine_op) const
836 {
837  std::atomic<int> num_threads{concurrency_degree_-1};
838 
839  return divide_conquer(std::forward<Input>(problem), std::forward<Divider>(divide_op),
840  std::forward<Solver>(solve_op), std::forward<Combiner>(combine_op),
841  num_threads);
842 }
843 
844 
// Public divide&conquer entry point (current interface, with base-case
// predicate): sets up the shared thread budget and delegates to the
// private overload that takes the atomic counter.
// NOTE(review): the qualified signature line (doxygen 846) is elided.
845 template <typename Input, typename Divider,typename Predicate, typename Solver, typename Combiner>
847  Input && problem,
848  Divider && divide_op,
849  Predicate && predicate_op,
850  Solver && solve_op,
851  Combiner && combine_op) const
852 {
853  std::atomic<int> num_threads{concurrency_degree_-1};
854 
855  return divide_conquer(std::forward<Input>(problem), std::forward<Divider>(divide_op),
856  std::forward<Predicate>(predicate_op),
857  std::forward<Solver>(solve_op), std::forward<Combiner>(combine_op),
858  num_threads);
859 }
860 
// Public pipeline entry point: spawns a generator thread that pushes
// (value, order) pairs into the first queue, then recursively builds the
// remaining stages via do_pipeline on the calling thread.
// NOTE(review): the generator pushes the final empty item (the one that
// fails the '!item' test) before breaking — that empty item is the
// end-of-stream sentinel consumed by the downstream stages.
// The qualified signature line (doxygen 862) is elided in this listing.
861 template <typename Generator, typename ... Transformers>
863  Generator && generate_op,
864  Transformers && ... transform_ops) const
865 {
866  using namespace std;
867  using result_type = decay_t<typename result_of<Generator()>::type>;
868  using output_type = pair<result_type,long>;
869  auto output_queue = make_queue<output_type>();
870 
871  thread generator_task([&,this]() {
// Register this thread so get_thread_id() works inside the generator.
872  auto manager = thread_manager();
873 
874  long order = 0;
875  for (;;) {
876  auto item{generate_op()};
877  output_queue.push(make_pair(item, order));
878  order++;
879  if (!item) break;
880  }
881  });
882 
883  do_pipeline(output_queue, forward<Transformers>(transform_ops)...);
884  generator_task.join();
885 }
886 
887 // PRIVATE MEMBERS
888 
// Private divide&conquer worker (no predicate). Falls back to fully
// sequential execution once the shared thread budget is exhausted;
// otherwise divides, launches subproblems [1..n) on pool workers while
// budget remains, solves the rest sequentially, and recurses on
// subproblem 0 on the calling thread before reducing all partials.
// NOTE(review): the sequential fallback loop (doxygen 926-930) never
// increments 'division', so successive sequential results overwrite the
// same partials slot — compare with the threaded loop above; looks like a
// bug, confirm against upstream GrPPI.
// NOTE(review): 'input' is forwarded more than once (divide_op then
// solve_op) — suspect if Input binds to an rvalue consumed by move.
// The qualified signature line (doxygen 890) is elided in this listing.
889 template <typename Input, typename Divider, typename Solver, typename Combiner>
891  Input && input,
892  Divider && divide_op,
893  Solver && solve_op,
894  Combiner && combine_op,
895  std::atomic<int> & num_threads) const
896 {
897  constexpr sequential_execution seq;
898  if (num_threads.load() <=0) {
899  return seq.divide_conquer(std::forward<Input>(input),
900  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
901  std::forward<Combiner>(combine_op));
902  }
903 
904  auto subproblems = divide_op(std::forward<Input>(input));
905  if (subproblems.size()<=1) { return solve_op(std::forward<Input>(input)); }
906 
907  using subresult_type =
908  std::decay_t<typename std::result_of<Solver(Input)>::type>;
909  std::vector<subresult_type> partials(subproblems.size()-1);
910 
911  auto process_subproblem = [&,this](auto it, std::size_t div) {
912  partials[div] = this->divide_conquer(std::forward<Input>(*it),
913  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
914  std::forward<Combiner>(combine_op), num_threads);
915  };
916 
917  int division = 0;
918 
// Launch subproblems 1..n-1 on workers while the budget allows.
919  worker_pool workers{num_threads.load()};
920  auto i = subproblems.begin() + 1;
921  while (i!=subproblems.end() && num_threads.load()>0) {
922  workers.launch(*this,process_subproblem, i++, division++);
923  num_threads--;
924  }
925 
// Remaining subproblems: solved sequentially on the calling thread.
926  while (i!=subproblems.end()) {
927  partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
928  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
929  std::forward<Combiner>(combine_op));
930  }
931 
// Subproblem 0 is processed recursively by the calling thread itself.
932  auto subresult = divide_conquer(std::forward<Input>(*subproblems.begin()),
933  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
934  std::forward<Combiner>(combine_op), num_threads);
935 
936  workers.wait();
937 
938  return seq.reduce(partials.begin(), partials.size(),
939  std::forward<subresult_type>(subresult), std::forward<Combiner>(combine_op));
940 }
941 
// Private divide&conquer worker (with base-case predicate). Same scheme
// as the non-predicate overload: sequential fallback when the thread
// budget is spent, otherwise divide / launch / solve-rest / recurse / reduce.
// NOTE(review): the sequential fallback loop (doxygen 983-987) never
// increments 'division' — same suspected bug as in the overload above.
// NOTE(review): unlike the non-predicate overload, there is no guard for
// divide_op returning fewer than 2 subproblems; subproblems.size()-1
// would wrap for an empty result (doxygen 965) — confirm divide_op's
// contract.
// The qualified signature line (doxygen 943) is elided in this listing.
942 template <typename Input, typename Divider,typename Predicate, typename Solver, typename Combiner>
944  Input && input,
945  Divider && divide_op,
946  Predicate && predicate_op,
947  Solver && solve_op,
948  Combiner && combine_op,
949  std::atomic<int> & num_threads) const
950 {
951  constexpr sequential_execution seq;
952  if (num_threads.load() <=0) {
953  return seq.divide_conquer(std::forward<Input>(input),
954  std::forward<Divider>(divide_op),
955  std::forward<Predicate>(predicate_op),
956  std::forward<Solver>(solve_op),
957  std::forward<Combiner>(combine_op));
958  }
959 
// Base case reached: solve directly without dividing.
960  if (predicate_op(input)) { return solve_op(std::forward<Input>(input)); }
961  auto subproblems = divide_op(std::forward<Input>(input));
962 
963  using subresult_type =
964  std::decay_t<typename std::result_of<Solver(Input)>::type>;
965  std::vector<subresult_type> partials(subproblems.size()-1);
966 
967  auto process_subproblem = [&,this](auto it, std::size_t div) {
968  partials[div] = this->divide_conquer(std::forward<Input>(*it),
969  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
970  std::forward<Solver>(solve_op),
971  std::forward<Combiner>(combine_op), num_threads);
972  };
973 
974  int division = 0;
975 
// Launch subproblems 1..n-1 on workers while the budget allows.
976  worker_pool workers{num_threads.load()};
977  auto i = subproblems.begin() + 1;
978  while (i!=subproblems.end() && num_threads.load()>0) {
979  workers.launch(*this,process_subproblem, i++, division++);
980  num_threads--;
981  }
982 
// Remaining subproblems: solved sequentially on the calling thread.
983  while (i!=subproblems.end()) {
984  partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
985  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op), std::forward<Solver>(solve_op),
986  std::forward<Combiner>(combine_op));
987  }
988 
// Subproblem 0 is processed recursively by the calling thread itself.
989  auto subresult = divide_conquer(std::forward<Input>(*subproblems.begin()),
990  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op), std::forward<Solver>(solve_op),
991  std::forward<Combiner>(combine_op), num_threads);
992 
993  workers.wait();
994 
995  return seq.reduce(partials.begin(), partials.size(),
996  std::forward<subresult_type>(subresult), std::forward<Combiner>(combine_op));
997 }
// Pipeline terminator: consumes (optional, order) items from input_queue
// until the empty-optional sentinel arrives. In unordered mode items are
// consumed as they arrive; in ordered mode out-of-order items are buffered
// in 'elements' and released strictly by ascending sequence number.
// NOTE(review): the final drain loop (doxygen 1039-1049) spins forever if
// a sequence number never arrives — presumably upstream guarantees a
// gap-free sequence; confirm.
// NOTE(review): 'input_value_type' (doxygen 1006) is declared but unused.
// The 'requires_no_pattern' template line (doxygen 999) is elided here.
998 template <typename Queue, typename Consumer,
1000 void parallel_execution_native::do_pipeline(
1001  Queue & input_queue,
1002  Consumer && consume_op) const
1003 {
1004  using namespace std;
1005  using gen_value_type = typename Queue::value_type;
1006  using input_value_type = typename gen_value_type::first_type;
1007 
// Register this thread in the registry for the duration of the stage.
1008  auto manager = thread_manager();
1009  if (!is_ordered()) {
1010  for (;;) {
1011  auto item = input_queue.pop();
1012  if (!item.first) break;
1013  consume_op(*item.first);
1014  }
1015  return;
1016  }
// Ordered mode: release buffered items in sequence-number order.
1017  vector<gen_value_type> elements;
1018  long current = 0;
1019  for (;;) {
1020  auto item = input_queue.pop();
1021  if (!item.first) break;
1022  if(current == item.second){
1023  consume_op(*item.first);
1024  current ++;
1025  }
1026  else {
1027  elements.push_back(item);
1028  }
// Release at most one buffered item that has become next-in-order.
// (erase-then-break keeps the iterator valid)
1029  // TODO: Probably find_if() + erase
1030  for (auto it=elements.begin(); it!=elements.end(); it++) {
1031  if(it->second == current) {
1032  consume_op(*it->first);
1033  elements.erase(it);
1034  current++;
1035  break;
1036  }
1037  }
1038  }
// Drain whatever is still buffered after the sentinel, in order.
1039  while (elements.size()>0) {
1040  // TODO: Probably find_if() + erase
1041  for (auto it = elements.begin(); it != elements.end(); it++) {
1042  if(it->second == current) {
1043  consume_op(*it->first);
1044  elements.erase(it);
1045  current++;
1046  break;
1047  }
1048  }
1049  }
1050 }
1051 
1052 
// Bridge stage: pops items from input_queue, applies transform_op, and
// pushes the (wrapped value, order) result into an explicit output queue.
// Used by farm/context stages that fan results into a shared queue. Does
// NOT push a sentinel itself — the caller is responsible for termination.
// NOTE(review): aliases 'input_item_value_type' and 'output_optional_type'
// (doxygen 1062, 1064) are declared but unused.
// The SFINAE template line (doxygen 1054) is elided in this listing.
1053 template <typename Inqueue, typename Transformer, typename output_type,
1055 void parallel_execution_native::do_pipeline(Inqueue & input_queue, Transformer && transform_op,
1056  mpmc_queue<output_type> & output_queue) const
1057 {
1058  using namespace std;
1059  using namespace experimental;
1060 
1061  using input_item_type = typename Inqueue::value_type;
1062  using input_item_value_type = typename input_item_type::first_type::value_type;
1063 
1064  using output_optional_type = typename output_type::first_type;
1065  using output_item_value_type = typename output_type::first_type::value_type;
1066  for (;;) {
1067  auto item{input_queue.pop()};
1068  if(!item.first) break;
// Wrap the transformed value in the output optional, preserve ordering tag.
1069  auto out = output_item_value_type{transform_op(*item.first)};
1070  output_queue.push(make_pair(out,item.second)) ;
1071  }
1072 }
1073 
1074 
1075 
// Plain-function pipeline stage: spawns one thread that maps items from
// input_queue into output_queue (preserving each item's order tag), pushes
// the (empty, -1) sentinel when upstream is exhausted, then recursively
// builds the remaining stages and joins.
// NOTE(review): 'order' (doxygen 1100) is declared but unused.
// The 'requires_no_pattern' line (doxygen 1078) is elided in this listing.
1076 template <typename Queue, typename Transformer,
1077  typename ... OtherTransformers,
1079 void parallel_execution_native::do_pipeline(
1080  Queue & input_queue,
1081  Transformer && transform_op,
1082  OtherTransformers && ... other_transform_ops) const
1083 {
1084  using namespace std;
1085  using namespace experimental;
1086 
1087  using input_item_type = typename Queue::value_type;
1088  using input_item_value_type = typename input_item_type::first_type::value_type;
1089  using transform_result_type =
1090  decay_t<typename result_of<Transformer(input_item_value_type)>::type>;
1091  using output_item_value_type = optional<transform_result_type>;
1092  using output_item_type = pair<output_item_value_type,long>;
1093 
// Reuses the next stage's queue when one is supplied; otherwise makes one.
1094  decltype(auto) output_queue =
1095  get_output_queue<output_item_type>(other_transform_ops...);
1096 
1097  thread task([&,this]() {
// Register this thread in the registry for the duration of the stage.
1098  auto manager = thread_manager();
1099 
1100  long order = 0;
1101  for (;;) {
1102  auto item{input_queue.pop()};
1103  if (!item.first) break;
1104  auto out = output_item_value_type{transform_op(*item.first)};
1105  output_queue.push(make_pair(out, item.second));
1106  }
// Propagate the end-of-stream sentinel downstream.
1107  output_queue.push(make_pair(output_item_value_type{},-1));
1108  });
1109 
1110  do_pipeline(output_queue,
1111  forward<OtherTransformers>(other_transform_ops)...);
1112  task.join();
1113 }
1114 
// Farm as the LAST pipeline stage: runs farm_obj.cardinality() workers
// that each consume items until the empty-optional sentinel. When a worker
// sees the sentinel it pushes it back (doxygen 1137) so its sibling
// workers also terminate.
// NOTE(review): 'order' (doxygen 1131) and the output_* aliases
// (doxygen 1125-1128) are declared but unused.
// The 'requires_farm' template line (doxygen 1117) is elided here.
1115 template <typename Queue, typename FarmTransformer,
1116  template <typename> class Farm,
1118 void parallel_execution_native::do_pipeline(
1119  Queue & input_queue,
1120  Farm<FarmTransformer> && farm_obj) const
1121 {
1122  using namespace std;
1123  using input_item_type = typename Queue::value_type;
1124  using input_item_value_type = typename input_item_type::first_type::value_type;
1125  using transform_result_type =
1126  decay_t<typename result_of<FarmTransformer(input_item_value_type)>::type>;
1127  using output_item_value_type = experimental::optional<transform_result_type>;
1128  using output_item_type = pair<output_item_value_type,long>;
1129 
1130  auto farm_task = [&](int nt) {
1131  long order = 0;
1132  auto item{input_queue.pop()};
1133  while (item.first) {
1134  farm_obj(*item.first);
1135  item = input_queue.pop();
1136  }
// Re-publish the sentinel so the other farm workers can shut down too.
1137  input_queue.push(item);
1138  };
1139 
1140  auto ntasks = farm_obj.cardinality();
1141  worker_pool workers{ntasks};
1142  workers.launch_tasks(*this, farm_task, ntasks);
1143  workers.wait();
1144 }
1145 
1146 
// Context stage: runs a nested execution policy's pipeline (e.g. a
// different back end) as one stage of this pipeline on a single worker,
// then pushes the (empty, -1) sentinel and continues with the remaining
// stages on the calling thread.
// NOTE(review): the 'output_type' alias line (doxygen 1161) and the
// 'requires_context' template line (doxygen 1150) are elided here.
1147 template <typename Queue, typename Execution, typename Transformer,
1148  template <typename, typename> class Context,
1149  typename ... OtherTransformers,
1151 void parallel_execution_native::do_pipeline(Queue & input_queue,
1152  Context<Execution,Transformer> && context_op,
1153  OtherTransformers &&... other_ops) const
1154 {
1155  using namespace std;
1156  using namespace experimental;
1157 
1158  using input_item_type = typename Queue::value_type;
1159  using input_item_value_type = typename input_item_type::first_type::value_type;
1160 
1162  using output_optional_type = experimental::optional<output_type>;
1163  using output_item_type = pair <output_optional_type, long> ;
1164 
// Reuses the next stage's queue when one is supplied; otherwise makes one.
1165  decltype(auto) output_queue =
1166  get_output_queue<output_item_type>(other_ops...);
1167 
1168  auto context_task = [&](int nt) {
// Delegate this stage to the nested execution policy's pipeline.
1169  context_op.execution_policy().pipeline(input_queue, context_op.transformer(), output_queue);
1170  output_queue.push( make_pair(output_optional_type{}, -1) );
1171  };
1172 
1173  worker_pool workers{1};
1174  workers.launch_tasks(*this, context_task, 1);
1175 
1176  do_pipeline(output_queue,
1177  forward<OtherTransformers>(other_ops)... );
1178 
1179  workers.wait();
1180 }
1181 
1182 
// Farm as an INTERMEDIATE pipeline stage: runs cardinality() workers, each
// feeding results into a shared output queue via the bridge overload.
// The last worker to finish (tracked by done_threads) pushes the output
// sentinel; the others re-publish the input sentinel so their siblings
// terminate.
// NOTE(review): the 'output_type' alias line (doxygen 1198) and the
// 'requires_farm' template line (doxygen 1186) are elided here.
1183 template <typename Queue, typename FarmTransformer,
1184  template <typename> class Farm,
1185  typename ... OtherTransformers,
1187 void parallel_execution_native::do_pipeline(
1188  Queue & input_queue,
1189  Farm<FarmTransformer> && farm_obj,
1190  OtherTransformers && ... other_transform_ops) const
1191 {
1192  using namespace std;
1193  using namespace experimental;
1194 
1195  using input_item_type = typename Queue::value_type;
1196  using input_item_value_type = typename input_item_type::first_type::value_type;
1197 
1199  using output_optional_type = experimental::optional<output_type>;
1200  using output_item_type = pair <output_optional_type, long> ;
1201 
// Reuses the next stage's queue when one is supplied; otherwise makes one.
1202  decltype(auto) output_queue =
1203  get_output_queue<output_item_type>(other_transform_ops...);
1204 
1205  atomic<int> done_threads{0};
1206 
1207  auto farm_task = [&](int nt) {
1208  do_pipeline(input_queue, farm_obj.transformer(), output_queue);
1209  done_threads++;
1210  if (done_threads == nt) {
// Last worker out: signal end-of-stream to the downstream stages.
1211  output_queue.push(make_pair(output_optional_type{}, -1));
1212  }else{
// Not last: re-publish the input sentinel for sibling workers.
1213  input_queue.push(input_item_type{});
1214  }
1215  };
1216 
1217  auto ntasks = farm_obj.cardinality();
1218  worker_pool workers{ntasks};
1219  workers.launch_tasks(*this, farm_task, ntasks);
// Continue building the downstream stages on the calling thread.
1220  do_pipeline(output_queue,
1221  forward<OtherTransformers>(other_transform_ops)... );
1222 
1223  workers.wait();
1224 }
1225 
1226 template <typename Queue, typename Predicate,
1227  template <typename> class Filter,
1228  typename ... OtherTransformers,
1230 void parallel_execution_native::do_pipeline(
1231  Queue & input_queue,
1232  Filter<Predicate> && filter_obj,
1233  OtherTransformers && ... other_transform_ops) const
1234 {
1235  using namespace std;
1236  using namespace experimental;
1237 
1238  using input_item_type = typename Queue::value_type;
1239  using input_value_type = typename input_item_type::first_type;
1240  auto filter_queue = make_queue<input_item_type>();
1241 
1242  auto filter_task = [&,this]() {
1243  auto manager = thread_manager();
1244  auto item{input_queue.pop()};
1245  while (item.first) {
1246  if (filter_obj(*item.first)) {
1247  filter_queue.push(item);
1248  }
1249  else {
1250  filter_queue.push(make_pair(input_value_type{}, item.second));
1251  }
1252  item = input_queue.pop();
1253  }
1254  filter_queue.push(make_pair(input_value_type{}, -1));
1255  };
1256  thread filter_thread{filter_task};
1257 
1258  using queue_type = mpmc_queue<input_item_type>;
1259  decltype(auto) output_queue =
1260  get_output_queue<input_item_type>(other_transform_ops...);
1261 
1262  thread ordering_thread;
1263  if (is_ordered()) {
1264  auto ordering_task = [&]() {
1265  auto manager = thread_manager();
1266  vector<input_item_type> elements;
1267  int current = 0;
1268  long order = 0;
1269  auto item{filter_queue.pop()};
1270  for (;;) {
1271  if(!item.first && item.second == -1) break;
1272  if (item.second == current) {
1273  if (item.first) {
1274  output_queue.push(make_pair(item.first,order));
1275  order++;
1276  }
1277  current++;
1278  }
1279  else {
1280  elements.push_back(item);
1281  }
1282  // TODO: Probably find_if() + erase
1283  for (auto it=elements.begin(); it<elements.end(); it++) {
1284  if (it->second == current) {
1285  if (it->first) {
1286  output_queue.push(make_pair(it->first,order));
1287  order++;
1288  }
1289  elements.erase(it);
1290  current++;
1291  break;
1292  }
1293  }
1294  item = filter_queue.pop();
1295  }
1296  while (elements.size()>0) {
1297  // TODO: Probably find_if() + erase
1298  for (auto it=elements.begin(); it<elements.end(); it++) {
1299  if (it->second == current) {
1300  if(it->first) {
1301  output_queue.push(make_pair(it->first,order));
1302  order++;
1303  }
1304  elements.erase(it);
1305  current++;
1306  break;
1307  }
1308  }
1309  }
1310  output_queue.push(item);
1311  };
1312 
1313  ordering_thread = thread{ordering_task};
1314  do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1315  filter_thread.join();
1316  ordering_thread.join();
1317  }
1318  else {
1319  do_pipeline(filter_queue, forward<OtherTransformers>(other_transform_ops)...);
1320  filter_thread.join();
1321  }
1322 }
1323 
1324 template <typename Queue, typename Combiner, typename Identity,
1325  template <typename C, typename I> class Reduce,
1326  typename ... OtherTransformers,
1328 void parallel_execution_native::do_pipeline(
1329  Queue && input_queue,
1330  Reduce<Combiner,Identity> && reduce_obj,
1331  OtherTransformers && ... other_transform_ops) const
1332 {
1333  using namespace std;
1334  using namespace experimental;
1335 
1336  using input_item_type = typename decay_t<Queue>::value_type;
1337  using input_item_value_type = typename input_item_type::first_type::value_type;
1338  using output_item_value_type = optional<decay_t<Identity>>;
1339  using output_item_type = pair<output_item_value_type,long>;
1340  decltype(auto) output_queue =
1341  get_output_queue<output_item_type>(other_transform_ops...);
1342 
1343  auto reduce_task = [&,this]() {
1344  auto manager = thread_manager();
1345  auto item{input_queue.pop()};
1346  int order = 0;
1347  while (item.first) {
1348  reduce_obj.add_item(std::forward<Identity>(*item.first));
1349  item = input_queue.pop();
1350  if (reduce_obj.reduction_needed()) {
1351  constexpr sequential_execution seq;
1352  auto red = reduce_obj.reduce_window(seq);
1353  output_queue.push(make_pair(red, order++));
1354  }
1355  }
1356  output_queue.push(make_pair(output_item_value_type{}, -1));
1357  };
1358  thread reduce_thread{reduce_task};
1359  do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1360  reduce_thread.join();
1361 }
1362 
1363 template <typename Queue, typename Transformer, typename Predicate,
1364  template <typename T, typename P> class Iteration,
1365  typename ... OtherTransformers,
1368 void parallel_execution_native::do_pipeline(
1369  Queue & input_queue,
1370  Iteration<Transformer,Predicate> && iteration_obj,
1371  OtherTransformers && ... other_transform_ops) const
1372 {
1373  using namespace std;
1374  using namespace experimental;
1375 
1376  using input_item_type = typename decay_t<Queue>::value_type;
1377  using input_item_value_type = typename input_item_type::first_type::value_type;
1378 
1379  decltype(auto) output_queue =
1380  get_output_queue<input_item_type>(other_transform_ops...);
1381 
1382  auto iteration_task = [&]() {
1383  for (;;) {
1384  auto item = input_queue.pop();
1385  if (!item.first) break;
1386  auto value = iteration_obj.transform(*item.first);
1387  auto new_item = input_item_type{value,item.second};
1388  if (iteration_obj.predicate(value)) {
1389  output_queue.push(new_item);
1390  }
1391  else {
1392  input_queue.push(new_item);
1393  }
1394  }
1395  while (!input_queue.is_empty()) {
1396  auto item = input_queue.pop();
1397  auto value = iteration_obj.transform(*item.first);
1398  auto new_item = input_item_type{value,item.second};
1399  if (iteration_obj.predicate(value)) {
1400  output_queue.push(new_item);
1401  }
1402  else {
1403  input_queue.push(new_item);
1404  }
1405  }
1406  output_queue.push(input_item_type{{},-1});
1407  };
1408 
1409  thread iteration_thread{iteration_task};
1410  do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1411  iteration_thread.join();
1412 }
1413 
1414 template <typename Queue, typename Transformer, typename Predicate,
1415  template <typename T, typename P> class Iteration,
1416  typename ... OtherTransformers,
1417  requires_iteration<Iteration<Transformer,Predicate>>,
1419 void parallel_execution_native::do_pipeline(
1420  Queue & input_queue,
1421  Iteration<Transformer,Predicate> && iteration_obj,
1422  OtherTransformers && ... other_transform_ops) const
1423 {
1424  static_assert(!is_pipeline<Transformer>, "Not implemented");
1425 }
1426 
1427 
1428 template <typename Queue, typename ... Transformers,
1429  template <typename...> class Pipeline,
1430  requires_pipeline<Pipeline<Transformers...>>>
1431 void parallel_execution_native::do_pipeline(
1432  Queue & input_queue,
1433  Pipeline<Transformers...> && pipeline_obj) const
1434 {
1435  do_pipeline_nested(
1436  input_queue,
1437  pipeline_obj.transformers(),
1438  std::make_index_sequence<sizeof...(Transformers)>());
1439 }
1440 
1441 template <typename Queue, typename ... Transformers,
1442  template <typename...> class Pipeline,
1443  typename ... OtherTransformers,
1444  requires_pipeline<Pipeline<Transformers...>>>
1445 void parallel_execution_native::do_pipeline(
1446  Queue & input_queue,
1447  Pipeline<Transformers...> && pipeline_obj,
1448  OtherTransformers && ... other_transform_ops) const
1449 {
1450  do_pipeline_nested(
1451  input_queue,
1452  std::tuple_cat(pipeline_obj.transformers(),
1453  std::forward_as_tuple(other_transform_ops...)),
1454  std::make_index_sequence<sizeof...(Transformers)+sizeof...(OtherTransformers)>());
1455 }
1456 
1457 template <typename Queue, typename ... Transformers,
1458  std::size_t ... I>
1459 void parallel_execution_native::do_pipeline_nested(
1460  Queue & input_queue,
1461  std::tuple<Transformers...> && transform_ops,
1462  std::index_sequence<I...>) const
1463 {
1464  do_pipeline(input_queue,
1465  std::forward<Transformers>(std::get<I>(transform_ops))...);
1466 }
1467 
1468 } // end namespace grppi
1469 
1470 #endif
Definition: callable_traits.h:26
void set_concurrency_degree(int degree) noexcept
Set number of grppi threads.
Definition: parallel_execution_native.h:179
void pipeline(const Execution &ex, Generator &&generate_op, Transformers &&...transform_ops)
Invoke Pipeline pattern on a data stream.
Definition: pipeline.h:51
void launch(const E &ex, F f, Args &&...args)
Launch a function in the pool.
Definition: worker_pool.h:62
constexpr bool is_supported< parallel_execution_native >()
Determines if an execution policy is supported in the current compilation.
Definition: parallel_execution_native.h:639
void stencil(const Execution &ex, std::tuple< InputIterators... > firsts, std::size_t size, OutputIt out, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op)
Invoke Stencil pattern on a data sequence with sequential execution.
Definition: stencil.h:59
native_thread_manager(thread_registry &registry)
Saves a reference to the registry and registers current thread.
Definition: parallel_execution_native.h:120
std::enable_if_t< is_reduce< T >, int > requires_reduce
Definition: reduce_pattern.h:135
constexpr bool supports_map< parallel_execution_native >()
Determines if an execution policy supports the map pattern.
Definition: parallel_execution_native.h:646
auto reduce(const Execution &ex, InputIt first, std::size_t size, Result &&identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with sequential execution.
Definition: reduce.h:55
Pool of worker threads. This class offers a simple pool of worker threads.
Definition: worker_pool.h:34
constexpr bool supports_reduce< parallel_execution_native >()
Determines if an execution policy supports the reduce pattern.
Definition: parallel_execution_native.h:653
void enable_ordering() noexcept
Enable ordering.
Definition: parallel_execution_native.h:189
native_thread_manager thread_manager() const
Get a manager object for registration/deregistration in the thread index table for current thread...
Definition: parallel_execution_native.h:205
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_native.h:184
mpmc_queue< T > get_output_queue(Transformers &&...) const
Makes a communication queue for elements of type T if the queue has not been created in an outer patt...
Definition: parallel_execution_native.h:257
void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: parallel_execution_native.h:797
typename std::enable_if_t< is_iteration< T >, int > requires_iteration
Definition: iteration_pattern.h:88
constexpr bool supports_divide_conquer< parallel_execution_native >()
Determines if an execution policy supports the divide/conquer pattern.
Definition: parallel_execution_native.h:674
void set_queue_attributes(int size, queue_mode mode) noexcept
Sets the attributes for the queues built through make_queue<T>()
Definition: parallel_execution_native.h:220
auto reduce(InputIterator first, std::size_t sequence_size, Identity &&identity, Combiner &&combine_op) const
Applies a reduction to a sequence of data items.
Definition: parallel_execution_native.h:721
STL namespace.
auto iterators_next(T &&t, int n)
Computes next n steps from a tuple of iterators.
Definition: iterator.h:175
constexpr bool supports_map_reduce< parallel_execution_native >()
Determines if an execution policy supports the map-reduce pattern.
Definition: parallel_execution_native.h:660
queue_mode
Definition: mpmc_queue.h:35
void deregister_thread() noexcept
Removes current thread id from the registry.
Definition: parallel_execution_native.h:88
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:140
thread_registry() noexcept=default
Definition: mpmc_queue.h:38
void launch_tasks(const E &ex, F &&f, Args &&...args)
Definition: worker_pool.h:70
parallel_execution_native() noexcept
Default construct a native parallel execution policy.
Definition: parallel_execution_native.h:153
~native_thread_manager()
Deregisters current thread from the registry.
Definition: parallel_execution_native.h:127
constexpr bool is_parallel_execution_native()
Metafunction that determines if type E is parallel_execution_native.
Definition: parallel_execution_native.h:630
void register_thread() noexcept
Adds the current thread id in the registry.
Definition: parallel_execution_native.h:79
constexpr void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: sequential_execution.h:502
void pipeline(mpmc_queue< InputType > &input_queue, Transformer &&transform_op, mpmc_queue< OutputType > &output_queue) const
Invoke Pipeline pattern coming from another context that uses mpmc_queues as communication channels...
Definition: parallel_execution_native.h:405
std::enable_if_t< is_no_pattern< T >, int > requires_no_pattern
Definition: patterns.h:92
auto map_reduce(const Execution &ex, std::tuple< InputIterators... > firsts, std::size_t size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op)
Invoke Map/reduce pattern on a data sequence.
Definition: mapreduce.h:57
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:111
auto map_reduce(std::tuple< InputIterators... > firsts, std::size_t sequence_size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op) const
Applies a map/reduce operation to a sequence of data items.
Definition: parallel_execution_native.h:758
constexpr bool supports_stencil< parallel_execution_native >()
Determines if an execution policy supports the stencil pattern.
Definition: parallel_execution_native.h:667
int current_index() const noexcept
Integer index for current thread.
Definition: parallel_execution_native.h:98
void map(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, Transformer transform_op) const
Applies a transformation to multiple sequences leaving the result in another sequence by chunks accor...
Definition: parallel_execution_native.h:685
return_type type
Definition: patterns.h:103
typename std::enable_if_t< is_filter< T >, int > requires_filter
Definition: filter_pattern.h:70
Sequential execution policy.
Definition: sequential_execution.h:41
parallel_execution_native(int concurrency_degree, bool ordering=true) noexcept
Constructs a native parallel execution policy.
Definition: parallel_execution_native.h:167
void pipeline(Generator &&generate_op, Transformers &&...transform_ops) const
Invoke Pipeline pattern.
Definition: parallel_execution_native.h:862
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke Divide/conquer pattern.
Definition: sequential_execution.h:543
decltype(auto) apply_deref_increment(F &&f, T< Iterators... > &iterators)
Applies a callable object to the values obtained from the iterators in a tuple-like object...
Definition: iterator.h:63
typename std::result_of< Transformer(Input)>::type result_type
Determines the return type of appliying a function on a input type.
Definition: patterns.h:110
Thread index table to provide portable natural thread indices.
Definition: parallel_execution_native.h:53
void disable_ordering() noexcept
Disable ordering.
Definition: parallel_execution_native.h:194
RAII class to manage registration/deregistration pairs. This class allows to manage automatic deregis...
Definition: parallel_execution_native.h:115
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke Divide/conquer pattern.
Definition: parallel_execution_native.h:831
parallel_execution_native(const parallel_execution_native &ex)
Definition: parallel_execution_native.h:172
auto divide_conquer(const Execution &ex, Input &&input, Divider &&divider_op, Solver &&solver_op, Combiner &&combiner_op)
Invoke Divide/conquer pattern. Execution Execution type.
Definition: divideconquer.h:53
mpmc_queue< T > & get_output_queue(mpmc_queue< T > &queue, Transformers &&...) const
Returns the reference of a communication queue for elements of type T if the queue has been created i...
Definition: parallel_execution_native.h:245
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_native.h:232
constexpr bool supports_pipeline< parallel_execution_native >()
Determines if an execution policy supports the pipeline pattern.
Definition: parallel_execution_native.h:681
int get_thread_id() const noexcept
Get index of current thread in the thread table.
Definition: parallel_execution_native.h:213
typename std::enable_if_t< is_context< T >, int > requires_context
Definition: common/context.h:95
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_native.h:199
void map(const Execution &ex, std::tuple< InputIterators... > firsts, InputIt last, OutputIt first_out, Transformer transform_op)
Invoke Map pattern on a data sequence.
Definition: map.h:56
bool push(T item)
Definition: mpmc_queue.h:128
typename std::enable_if_t< is_farm< T >, int > requires_farm
Definition: farm_pattern.h:89