GrPPI  1.0
Generic and Reusable Parallel Pattern Interface
parallel_execution_native.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Universidad Carlos III de Madrid
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef GRPPI_NATIVE_PARALLEL_EXECUTION_NATIVE_H
17 #define GRPPI_NATIVE_PARALLEL_EXECUTION_NATIVE_H
18 
19 #include "worker_pool.h"
20 #include "../common/mpmc_queue.h"
21 #include "../common/iterator.h"
22 #include "../common/execution_traits.h"
23 
24 #include <thread>
25 #include <atomic>
26 #include <algorithm>
27 #include <vector>
28 #include <type_traits>
29 #include <tuple>
30 #include <experimental/optional>
31 
32 namespace grppi {
33 
49 public:
50  thread_registry() = default;
51 
55  void register_thread() noexcept;
56 
60  void deregister_thread() noexcept;
61 
67  int current_index() const noexcept;
68 
69 private:
70  mutable std::atomic_flag lock_ = ATOMIC_FLAG_INIT;
71  std::vector<std::thread::id> ids_{};
72 };
73 
74 inline void thread_registry::register_thread() noexcept
75 {
76  using namespace std;
77  while (lock_.test_and_set(memory_order_acquire)) {}
78  auto this_id = this_thread::get_id();
79  ids_.push_back(this_id);
80  lock_.clear(memory_order_release);
81 }
82 
83 inline void thread_registry::deregister_thread() noexcept
84 {
85  using namespace std;
86  while (lock_.test_and_set(memory_order_acquire)) {}
87  auto this_id = this_thread::get_id();
88  auto current = find(begin(ids_), end(ids_), this_id);
89  *current = {}; //Empty thread
90  lock_.clear(memory_order_release);
91 }
92 
93 inline int thread_registry::current_index() const noexcept
94 {
95  using namespace std;
96  while (lock_.test_and_set(memory_order_acquire)) {}
97  auto this_id = this_thread::get_id();
98  auto current = find(begin(ids_), end(ids_), this_id);
99  auto index = distance(begin(ids_), current);
100  lock_.clear(memory_order_release);
101  return index;
102 }
103 
111 public:
116  : registry_{registry}
117  { registry_.register_thread(); }
118 
123  registry_.deregister_thread();
124  }
125 
126 private:
127  thread_registry & registry_;
128 };
129 
136 public:
137 
150  static_cast<int>(2 * std::thread::hardware_concurrency()),
151  true}
152  {}
153 
162  parallel_execution_native(int concurrency_degree, bool ordering=true) noexcept :
163  concurrency_degree_{concurrency_degree},
164  ordering_{ordering}
165  {}
166 
168  parallel_execution_native{ex.concurrency_degree_, ex.ordering_}
169  {}
170 
174  void set_concurrency_degree(int degree) noexcept { concurrency_degree_ = degree; }
175 
179  int concurrency_degree() const noexcept { return concurrency_degree_; }
180 
184  void enable_ordering() noexcept { ordering_=true; }
185 
189  void disable_ordering() noexcept { ordering_=false; }
190 
194  bool is_ordered() const noexcept { return ordering_; }
195 
201  return native_thread_manager{thread_registry_};
202  }
203 
208  int get_thread_id() const noexcept {
209  return thread_registry_.current_index();
210  }
211 
215  void set_queue_attributes(int size, queue_mode mode) noexcept {
216  queue_size_ = size;
217  queue_mode_ = mode;
218  }
219 
226  template <typename T>
228  return {queue_size_, queue_mode_};
229  }
230 
239  template <typename T, typename ... Transformers>
240  mpmc_queue<T>& get_output_queue(mpmc_queue<T> & queue, Transformers && ...) const {
241  return queue;
242  }
243 
251  template <typename T, typename ... Transformers>
252  mpmc_queue<T> get_output_queue(Transformers && ...) const{
253  return std::move(make_queue<T>());
254  }
255 
270  template <typename ... InputIterators, typename OutputIterator,
271  typename Transformer>
272  void map(std::tuple<InputIterators...> firsts,
273  OutputIterator first_out,
274  std::size_t sequence_size, Transformer transform_op) const;
275 
288  template <typename InputIterator, typename Identity, typename Combiner>
289  auto reduce(InputIterator first, std::size_t sequence_size,
290  Identity && identity, Combiner && combine_op) const;
291 
306  template <typename ... InputIterators, typename Identity,
307  typename Transformer, typename Combiner>
308  auto map_reduce(std::tuple<InputIterators...> firsts,
309  std::size_t sequence_size,
310  Identity && identity,
311  Transformer && transform_op, Combiner && combine_op) const;
312 
329  template <typename ... InputIterators, typename OutputIterator,
330  typename StencilTransformer, typename Neighbourhood>
331  void stencil(std::tuple<InputIterators...> firsts, OutputIterator first_out,
332  std::size_t sequence_size,
333  StencilTransformer && transform_op,
334  Neighbourhood && neighbour_op) const;
335 
348  template <typename Input, typename Divider, typename Solver, typename Combiner>
349  [[deprecated("Use new interface with predicate argument")]]
350  auto divide_conquer(Input && input,
351  Divider && divide_op,
352  Solver && solve_op,
353  Combiner && combine_op) const;
354 
369  template <typename Input, typename Divider, typename Predicate, typename Solver, typename Combiner>
370  auto divide_conquer(Input && input,
371  Divider && divide_op,
372  Predicate && predicate_op,
373  Solver && solve_op,
374  Combiner && combine_op) const;
375 
376 
377 
385  template <typename Generator, typename ... Transformers>
386  void pipeline(Generator && generate_op,
387  Transformers && ... transform_ops) const;
388 
399  template <typename InputType, typename Transformer, typename OutputType>
400  void pipeline(mpmc_queue<InputType> & input_queue, Transformer && transform_op,
401  mpmc_queue<OutputType> &output_queue) const
402  {
403  do_pipeline(input_queue, std::forward<Transformer>(transform_op), output_queue);
404  }
405 
406 private:
407 
408  template <typename Input, typename Divider, typename Solver, typename Combiner>
409  auto divide_conquer(Input && input,
410  Divider && divide_op,
411  Solver && solve_op,
412  Combiner && combine_op,
413  std::atomic<int> & num_threads) const;
414 
415  template <typename Input, typename Divider,typename Predicate, typename Solver, typename Combiner>
416  auto divide_conquer(Input && input,
417  Divider && divide_op,
418  Predicate && predicate_op,
419  Solver && solve_op,
420  Combiner && combine_op,
421  std::atomic<int> & num_threads) const;
422 
423 
424  template <typename Queue, typename Consumer,
426  void do_pipeline(Queue & input_queue, Consumer && consume_op) const;
427 
428  template <typename Inqueue, typename Transformer, typename output_type,
430  void do_pipeline(Inqueue & input_queue, Transformer && transform_op,
431  mpmc_queue<output_type> & output_queue) const;
432 
433  template <typename T, typename ... Others>
434  void do_pipeline(mpmc_queue<T> & in_q, mpmc_queue<T> & same_queue, Others &&... ops) const;
435 
436  template <typename T>
437  void do_pipeline(mpmc_queue<T> &) const {}
438 
439 
440  template <typename Queue, typename Transformer, typename ... OtherTransformers,
441  requires_no_pattern<Transformer> = 0>
442  void do_pipeline(Queue & input_queue, Transformer && transform_op,
443  OtherTransformers && ... other_ops) const;
444 
445  template <typename Queue, typename FarmTransformer,
446  template <typename> class Farm,
447  requires_farm<Farm<FarmTransformer>> = 0>
448  void do_pipeline(Queue & input_queue,
449  Farm<FarmTransformer> & farm_obj) const
450  {
451  do_pipeline(input_queue, std::move(farm_obj));
452  }
453 
454  template <typename Queue, typename FarmTransformer,
455  template <typename> class Farm,
456  requires_farm<Farm<FarmTransformer>> = 0>
457  void do_pipeline( Queue & input_queue,
458  Farm<FarmTransformer> && farm_obj) const;
459 
460  template <typename Queue, typename Execution, typename Transformer,
461  template <typename, typename> class Context,
462  typename ... OtherTransformers,
463  requires_context<Context<Execution,Transformer>> = 0>
464  void do_pipeline(Queue & input_queue, Context<Execution,Transformer> && context_op,
465  OtherTransformers &&... other_ops) const;
466 
467  template <typename Queue, typename Execution, typename Transformer,
468  template <typename, typename> class Context,
469  typename ... OtherTransformers,
470  requires_context<Context<Execution,Transformer>> = 0>
471  void do_pipeline(Queue & input_queue, Context<Execution,Transformer> & context_op,
472  OtherTransformers &&... other_ops) const
473  {
474  do_pipeline(input_queue, std::move(context_op),
475  std::forward<OtherTransformers>(other_ops)...);
476  }
477 
478  template <typename Queue, typename FarmTransformer,
479  template <typename> class Farm,
480  typename ... OtherTransformers,
481  requires_farm<Farm<FarmTransformer>> = 0>
482  void do_pipeline(Queue & input_queue,
483  Farm<FarmTransformer> & farm_obj,
484  OtherTransformers && ... other_transform_ops) const
485  {
486  do_pipeline(input_queue, std::move(farm_obj),
487  std::forward<OtherTransformers>(other_transform_ops)...);
488  }
489 
490  template <typename Queue, typename FarmTransformer,
491  template <typename> class Farm,
492  typename ... OtherTransformers,
493  requires_farm<Farm<FarmTransformer>> = 0>
494  void do_pipeline(Queue & input_queue,
495  Farm<FarmTransformer> && farm_obj,
496  OtherTransformers && ... other_transform_ops) const;
497 
498  template <typename Queue, typename Predicate,
499  template <typename> class Filter,
500  typename ... OtherTransformers,
501  requires_filter<Filter<Predicate>> =0>
502  void do_pipeline(Queue & input_queue,
503  Filter<Predicate> & filter_obj,
504  OtherTransformers && ... other_transform_ops) const
505  {
506  do_pipeline(input_queue, std::move(filter_obj),
507  std::forward<OtherTransformers>(other_transform_ops)...);
508  }
509 
510  template <typename Queue, typename Predicate,
511  template <typename> class Filter,
512  typename ... OtherTransformers,
513  requires_filter<Filter<Predicate>> =0>
514  void do_pipeline(Queue & input_queue,
515  Filter<Predicate> && farm_obj,
516  OtherTransformers && ... other_transform_ops) const;
517 
518  template <typename Queue, typename Combiner, typename Identity,
519  template <typename C, typename I> class Reduce,
520  typename ... OtherTransformers,
521  requires_reduce<Reduce<Combiner,Identity>> = 0>
522  void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> & reduce_obj,
523  OtherTransformers && ... other_transform_ops) const
524  {
525  do_pipeline(input_queue, std::move(reduce_obj),
526  std::forward<OtherTransformers>(other_transform_ops)...);
527  }
528 
529  template <typename Queue, typename Combiner, typename Identity,
530  template <typename C, typename I> class Reduce,
531  typename ... OtherTransformers,
532  requires_reduce<Reduce<Combiner,Identity>> = 0>
533  void do_pipeline(Queue && input_queue, Reduce<Combiner,Identity> && reduce_obj,
534  OtherTransformers && ... other_transform_ops) const;
535 
536  template <typename Queue, typename Transformer, typename Predicate,
537  template <typename T, typename P> class Iteration,
538  typename ... OtherTransformers,
539  requires_iteration<Iteration<Transformer,Predicate>> =0,
540  requires_no_pattern<Transformer> =0>
541  void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> & iteration_obj,
542  OtherTransformers && ... other_transform_ops) const
543  {
544  do_pipeline(input_queue, std::move(iteration_obj),
545  std::forward<OtherTransformers>(other_transform_ops)...);
546  }
547 
548  template <typename Queue, typename Transformer, typename Predicate,
549  template <typename T, typename P> class Iteration,
550  typename ... OtherTransformers,
551  requires_iteration<Iteration<Transformer,Predicate>> =0,
552  requires_no_pattern<Transformer> =0>
553  void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
554  OtherTransformers && ... other_transform_ops) const;
555 
556  template <typename Queue, typename Transformer, typename Predicate,
557  template <typename T, typename P> class Iteration,
558  typename ... OtherTransformers,
559  requires_iteration<Iteration<Transformer,Predicate>> =0,
560  requires_pipeline<Transformer> =0>
561  void do_pipeline(Queue & input_queue, Iteration<Transformer,Predicate> && iteration_obj,
562  OtherTransformers && ... other_transform_ops) const;
563 
564 
565  template <typename Queue, typename ... Transformers,
566  template <typename...> class Pipeline,
567  requires_pipeline<Pipeline<Transformers...>> = 0>
568  void do_pipeline(Queue & input_queue,
569  Pipeline<Transformers...> & pipeline_obj) const
570  {
571  do_pipeline(input_queue, std::move(pipeline_obj));
572  }
573 
574  template <typename Queue, typename ... Transformers,
575  template <typename...> class Pipeline,
576  requires_pipeline<Pipeline<Transformers...>> = 0>
577  void do_pipeline(Queue & input_queue,
578  Pipeline<Transformers...> && pipeline_obj) const;
579 
580  template <typename Queue, typename ... Transformers,
581  template <typename...> class Pipeline,
582  typename ... OtherTransformers,
583  requires_pipeline<Pipeline<Transformers...>> = 0>
584  void do_pipeline(Queue & input_queue,
585  Pipeline<Transformers...> & pipeline_obj,
586  OtherTransformers && ... other_transform_ops) const
587  {
588  do_pipeline(input_queue, std::move(pipeline_obj),
589  std::forward<OtherTransformers>(other_transform_ops)...);
590  }
591 
592  template <typename Queue, typename ... Transformers,
593  template <typename...> class Pipeline,
594  typename ... OtherTransformers,
595  requires_pipeline<Pipeline<Transformers...>> = 0>
596  void do_pipeline(Queue & input_queue,
597  Pipeline<Transformers...> && pipeline_obj,
598  OtherTransformers && ... other_transform_ops) const;
599 
600  template <typename Queue, typename ... Transformers,
601  std::size_t ... I>
602  void do_pipeline_nested(
603  Queue & input_queue,
604  std::tuple<Transformers...> && transform_ops,
605  std::index_sequence<I...>) const;
606 
607 private:
608  mutable thread_registry thread_registry_{};
609 
610  int concurrency_degree_;
611  bool ordering_;
612 
613  constexpr static int default_queue_size = 100;
614  int queue_size_ = default_queue_size;
615 
616  queue_mode queue_mode_ = queue_mode::blocking;
617 };
618 
623 template <typename E>
625  return std::is_same<E, parallel_execution_native>::value;
626 }
627 
632 template <>
633 constexpr bool is_supported<parallel_execution_native>() { return true; }
634 
639 template <>
640 constexpr bool supports_map<parallel_execution_native>() { return true; }
641 
646 template <>
647 constexpr bool supports_reduce<parallel_execution_native>() { return true; }
648 
653 template <>
654 constexpr bool supports_map_reduce<parallel_execution_native>() { return true; }
655 
660 template <>
661 constexpr bool supports_stencil<parallel_execution_native>() { return true; }
662 
667 template <>
668 constexpr bool supports_divide_conquer<parallel_execution_native>() { return true; }
669 
674 template <>
675 constexpr bool supports_pipeline<parallel_execution_native>() { return true; }
676 
677 template <typename ... InputIterators, typename OutputIterator,
678  typename Transformer>
680  std::tuple<InputIterators...> firsts,
681  OutputIterator first_out,
682  std::size_t sequence_size, Transformer transform_op) const
683 {
684  using namespace std;
685 
686  auto process_chunk =
687  [&transform_op](auto fins, std::size_t size, auto fout)
688  {
689  const auto l = next(get<0>(fins), size);
690  while (get<0>(fins)!=l) {
691  *fout++ = apply_deref_increment(
692  std::forward<Transformer>(transform_op), fins);
693  }
694  };
695 
696  const int chunk_size = sequence_size / concurrency_degree_;
697 
698  {
699  worker_pool workers{concurrency_degree_};
700  for (int i=0; i!=concurrency_degree_-1; ++i) {
701  const auto delta = chunk_size * i;
702  const auto chunk_firsts = iterators_next(firsts,delta);
703  const auto chunk_first_out = next(first_out, delta);
704  workers.launch(*this, process_chunk, chunk_firsts, chunk_size, chunk_first_out);
705  }
706 
707  const auto delta = chunk_size * (concurrency_degree_ - 1);
708  const auto chunk_firsts = iterators_next(firsts,delta);
709  const auto chunk_first_out = next(first_out, delta);
710  process_chunk(chunk_firsts, sequence_size - delta, chunk_first_out);
711  } // Pool synch
712 }
713 
714 template <typename InputIterator, typename Identity, typename Combiner>
716  InputIterator first, std::size_t sequence_size,
717  Identity && identity,
718  Combiner && combine_op) const
719 {
720  using result_type = std::decay_t<Identity>;
721  std::vector<result_type> partial_results(concurrency_degree_);
722 
723  constexpr sequential_execution seq;
724  auto process_chunk = [&](InputIterator f, std::size_t sz, std::size_t id) {
725  partial_results[id] = seq.reduce(f,sz, std::forward<Identity>(identity),
726  std::forward<Combiner>(combine_op));
727  };
728 
729  const auto chunk_size = sequence_size / concurrency_degree_;
730 
731  {
732  worker_pool workers{concurrency_degree_};
733  for (int i=0; i<concurrency_degree_-1; ++i) {
734  const auto delta = chunk_size * i;
735  const auto chunk_first = std::next(first,delta);
736  workers.launch(*this, process_chunk, chunk_first, chunk_size, i);
737  }
738 
739  const auto delta = chunk_size * (concurrency_degree_-1);
740  const auto chunk_first = std::next(first, delta);
741  const auto chunk_sz = sequence_size - delta;
742  process_chunk(chunk_first, chunk_sz, concurrency_degree_-1);
743  } // Pool synch
744 
745  return seq.reduce(std::next(partial_results.begin()),
746  partial_results.size()-1, std::forward<result_type>(partial_results[0]),
747  std::forward<Combiner>(combine_op));
748 }
749 
750 template <typename ... InputIterators, typename Identity,
751  typename Transformer, typename Combiner>
753  std::tuple<InputIterators...> firsts,
754  std::size_t sequence_size,
755  Identity &&,
756  Transformer && transform_op, Combiner && combine_op) const
757 {
758  using result_type = std::decay_t<Identity>;
759  std::vector<result_type> partial_results(concurrency_degree_);
760 
761  constexpr sequential_execution seq;
762  auto process_chunk = [&](auto f, std::size_t sz, std::size_t id) {
763  partial_results[id] = seq.map_reduce(f, sz,
764  std::forward<Identity>(partial_results[id]),
765  std::forward<Transformer>(transform_op),
766  std::forward<Combiner>(combine_op));
767  };
768 
769  const auto chunk_size = sequence_size / concurrency_degree_;
770 
771  {
772  worker_pool workers{concurrency_degree_};
773  for(int i=0;i<concurrency_degree_-1;++i){
774  const auto delta = chunk_size * i;
775  const auto chunk_firsts = iterators_next(firsts,delta);
776  workers.launch(*this, process_chunk, chunk_firsts, chunk_size, i);
777  }
778 
779  const auto delta = chunk_size * (concurrency_degree_-1);
780  const auto chunk_firsts = iterators_next(firsts, delta);
781  process_chunk(chunk_firsts, sequence_size - delta, concurrency_degree_-1);
782  } // Pool synch
783 
784  return seq.reduce(std::next(partial_results.begin()),
785  partial_results.size()-1, std::forward<result_type>(partial_results[0]),
786  std::forward<Combiner>(combine_op));
787 }
788 
789 template <typename ... InputIterators, typename OutputIterator,
790  typename StencilTransformer, typename Neighbourhood>
792  std::tuple<InputIterators...> firsts, OutputIterator first_out,
793  std::size_t sequence_size,
794  StencilTransformer && transform_op,
795  Neighbourhood && neighbour_op) const
796 {
797  constexpr sequential_execution seq;
798  auto process_chunk =
799  [&transform_op, &neighbour_op,seq](auto fins, std::size_t sz, auto fout)
800  {
801  seq.stencil(fins, fout, sz,
802  std::forward<StencilTransformer>(transform_op),
803  std::forward<Neighbourhood>(neighbour_op));
804  };
805 
806  const auto chunk_size = sequence_size / concurrency_degree_;
807  {
808  worker_pool workers{concurrency_degree_};
809 
810  for (int i=0; i!=concurrency_degree_-1; ++i) {
811  const auto delta = chunk_size * i;
812  const auto chunk_firsts = iterators_next(firsts,delta);
813  const auto chunk_out = std::next(first_out,delta);
814  workers.launch(*this, process_chunk, chunk_firsts, chunk_size, chunk_out);
815  }
816 
817  const auto delta = chunk_size * (concurrency_degree_ - 1);
818  const auto chunk_firsts = iterators_next(firsts,delta);
819  const auto chunk_out = std::next(first_out,delta);
820  process_chunk(chunk_firsts, sequence_size - delta, chunk_out);
821  } // Pool synch
822 }
823 
824 template <typename Input, typename Divider, typename Solver, typename Combiner>
826  Input && problem,
827  Divider && divide_op,
828  Solver && solve_op,
829  Combiner && combine_op) const
830 {
831  std::atomic<int> num_threads{concurrency_degree_-1};
832 
833  return divide_conquer(std::forward<Input>(problem), std::forward<Divider>(divide_op),
834  std::forward<Solver>(solve_op), std::forward<Combiner>(combine_op),
835  num_threads);
836 }
837 
838 
839 template <typename Input, typename Divider,typename Predicate, typename Solver, typename Combiner>
841  Input && problem,
842  Divider && divide_op,
843  Predicate && predicate_op,
844  Solver && solve_op,
845  Combiner && combine_op) const
846 {
847  std::atomic<int> num_threads{concurrency_degree_-1};
848 
849  return divide_conquer(std::forward<Input>(problem), std::forward<Divider>(divide_op),
850  std::forward<Predicate>(predicate_op),
851  std::forward<Solver>(solve_op), std::forward<Combiner>(combine_op),
852  num_threads);
853 }
854 
855 template <typename Generator, typename ... Transformers>
857  Generator && generate_op,
858  Transformers && ... transform_ops) const
859 {
860  using namespace std;
861  using result_type = decay_t<typename result_of<Generator()>::type>;
862  using output_type = pair<result_type,long>;
863  auto output_queue = make_queue<output_type>();
864 
865  thread generator_task([&,this]() {
866  auto manager = thread_manager();
867 
868  long order = 0;
869  for (;;) {
870  auto item{generate_op()};
871  output_queue.push(make_pair(item, order));
872  order++;
873  if (!item) break;
874  }
875  });
876 
877  do_pipeline(output_queue, forward<Transformers>(transform_ops)...);
878  generator_task.join();
879 }
880 
881 // PRIVATE MEMBERS
882 
883 template <typename Input, typename Divider, typename Solver, typename Combiner>
885  Input && input,
886  Divider && divide_op,
887  Solver && solve_op,
888  Combiner && combine_op,
889  std::atomic<int> & num_threads) const
890 {
891  constexpr sequential_execution seq;
892  if (num_threads.load() <=0) {
893  return seq.divide_conquer(std::forward<Input>(input),
894  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
895  std::forward<Combiner>(combine_op));
896  }
897 
898  auto subproblems = divide_op(std::forward<Input>(input));
899  if (subproblems.size()<=1) { return solve_op(std::forward<Input>(input)); }
900 
901  using subresult_type =
902  std::decay_t<typename std::result_of<Solver(Input)>::type>;
903  std::vector<subresult_type> partials(subproblems.size()-1);
904 
905  auto process_subproblem = [&,this](auto it, std::size_t div) {
906  partials[div] = this->divide_conquer(std::forward<Input>(*it),
907  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
908  std::forward<Combiner>(combine_op), num_threads);
909  };
910 
911  int division = 0;
912 
913  worker_pool workers{num_threads.load()};
914  auto i = subproblems.begin() + 1;
915  while (i!=subproblems.end() && num_threads.load()>0) {
916  workers.launch(*this,process_subproblem, i++, division++);
917  num_threads--;
918  }
919 
920  while (i!=subproblems.end()) {
921  partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
922  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
923  std::forward<Combiner>(combine_op));
924  }
925 
926  auto subresult = divide_conquer(std::forward<Input>(*subproblems.begin()),
927  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
928  std::forward<Combiner>(combine_op), num_threads);
929 
930  workers.wait();
931 
932  return seq.reduce(partials.begin(), partials.size(),
933  std::forward<subresult_type>(subresult), std::forward<Combiner>(combine_op));
934 }
935 
936 template <typename Input, typename Divider,typename Predicate, typename Solver, typename Combiner>
938  Input && input,
939  Divider && divide_op,
940  Predicate && predicate_op,
941  Solver && solve_op,
942  Combiner && combine_op,
943  std::atomic<int> & num_threads) const
944 {
945  constexpr sequential_execution seq;
946  if (num_threads.load() <=0) {
947  return seq.divide_conquer(std::forward<Input>(input),
948  std::forward<Divider>(divide_op),
949  std::forward<Predicate>(predicate_op),
950  std::forward<Solver>(solve_op),
951  std::forward<Combiner>(combine_op));
952  }
953 
954  if (predicate_op(input)) { return solve_op(std::forward<Input>(input)); }
955  auto subproblems = divide_op(std::forward<Input>(input));
956 
957  using subresult_type =
958  std::decay_t<typename std::result_of<Solver(Input)>::type>;
959  std::vector<subresult_type> partials(subproblems.size()-1);
960 
961  auto process_subproblem = [&,this](auto it, std::size_t div) {
962  partials[div] = this->divide_conquer(std::forward<Input>(*it),
963  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
964  std::forward<Solver>(solve_op),
965  std::forward<Combiner>(combine_op), num_threads);
966  };
967 
968  int division = 0;
969 
970  worker_pool workers{num_threads.load()};
971  auto i = subproblems.begin() + 1;
972  while (i!=subproblems.end() && num_threads.load()>0) {
973  workers.launch(*this,process_subproblem, i++, division++);
974  num_threads--;
975  }
976 
977  while (i!=subproblems.end()) {
978  partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
979  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op), std::forward<Solver>(solve_op),
980  std::forward<Combiner>(combine_op));
981  }
982 
983  auto subresult = divide_conquer(std::forward<Input>(*subproblems.begin()),
984  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op), std::forward<Solver>(solve_op),
985  std::forward<Combiner>(combine_op), num_threads);
986 
987  workers.wait();
988 
989  return seq.reduce(partials.begin(), partials.size(),
990  std::forward<subresult_type>(subresult), std::forward<Combiner>(combine_op));
991 }
992 template <typename Queue, typename Consumer,
993  requires_no_pattern<Consumer>>
994 void parallel_execution_native::do_pipeline(
995  Queue & input_queue,
996  Consumer && consume_op) const
997 {
998  using namespace std;
999  using gen_value_type = typename Queue::value_type;
1000 
1001  auto manager = thread_manager();
1002  if (!is_ordered()) {
1003  for (;;) {
1004  auto item = input_queue.pop();
1005  if (!item.first) break;
1006  consume_op(*item.first);
1007  }
1008  return;
1009  }
1010  vector<gen_value_type> elements;
1011  long current = 0;
1012  for (;;) {
1013  auto item = input_queue.pop();
1014  if (!item.first) break;
1015  if(current == item.second){
1016  consume_op(*item.first);
1017  current ++;
1018  }
1019  else {
1020  elements.push_back(item);
1021  }
1022  auto it = find_if(elements.begin(), elements.end(),
1023  [&](auto x) { return x.second== current; });
1024  if(it != elements.end()){
1025  consume_op(*it->first);
1026  elements.erase(it);
1027  current++;
1028  }
1029  }
1030  while (elements.size()>0) {
1031  auto it = find_if(elements.begin(), elements.end(),
1032  [&](auto x) { return x.second== current; });
1033  if(it != elements.end()){
1034  consume_op(*it->first);
1035  elements.erase(it);
1036  current++;
1037  }
1038  }
1039 }
1040 
1041 
1042 template <typename Inqueue, typename Transformer, typename output_type,
1043  requires_no_pattern<Transformer>>
1044 void parallel_execution_native::do_pipeline(Inqueue & input_queue, Transformer && transform_op,
1045  mpmc_queue<output_type> & output_queue) const
1046 {
1047  using namespace std;
1048  using namespace experimental;
1049 
1050  using output_item_value_type = typename output_type::first_type::value_type;
1051  for (;;) {
1052  auto item{input_queue.pop()};
1053  if(!item.first) break;
1054  auto out = output_item_value_type{transform_op(*item.first)};
1055  output_queue.push(make_pair(out,item.second)) ;
1056  }
1057 }
1058 
1059 
1060 
1061 template <typename Queue, typename Transformer,
1062  typename ... OtherTransformers,
1063  requires_no_pattern<Transformer>>
1064 void parallel_execution_native::do_pipeline(
1065  Queue & input_queue,
1066  Transformer && transform_op,
1067  OtherTransformers && ... other_transform_ops) const
1068 {
1069  using namespace std;
1070  using namespace experimental;
1071 
1072  using input_item_type = typename Queue::value_type;
1073  using input_item_value_type = typename input_item_type::first_type::value_type;
1074  using transform_result_type =
1075  decay_t<typename result_of<Transformer(input_item_value_type)>::type>;
1076  using output_item_value_type = optional<transform_result_type>;
1077  using output_item_type = pair<output_item_value_type,long>;
1078 
1079  decltype(auto) output_queue =
1080  get_output_queue<output_item_type>(other_transform_ops...);
1081 
1082  thread task([&,this]() {
1083  auto manager = thread_manager();
1084 
1085  for (;;) {
1086  auto item{input_queue.pop()};
1087  if (!item.first) break;
1088  auto out = output_item_value_type{transform_op(*item.first)};
1089  output_queue.push(make_pair(out, item.second));
1090  }
1091  output_queue.push(make_pair(output_item_value_type{},-1));
1092  });
1093 
1094  do_pipeline(output_queue,
1095  forward<OtherTransformers>(other_transform_ops)...);
1096  task.join();
1097 }
1098 
1099 template <typename Queue, typename FarmTransformer,
1100  template <typename> class Farm,
1101  requires_farm<Farm<FarmTransformer>>>
1102 void parallel_execution_native::do_pipeline(
1103  Queue & input_queue,
1104  Farm<FarmTransformer> && farm_obj) const
1105 {
1106  using namespace std;
1107 
1108  auto farm_task = [&](int) {
1109  auto item{input_queue.pop()};
1110  while (item.first) {
1111  farm_obj(*item.first);
1112  item = input_queue.pop();
1113  }
1114  input_queue.push(item);
1115  };
1116 
1117  auto ntasks = farm_obj.cardinality();
1118  worker_pool workers{ntasks};
1119  workers.launch_tasks(*this, farm_task, ntasks);
1120  workers.wait();
1121 }
1122 
1123 
1124 template <typename Queue, typename Execution, typename Transformer,
1125  template <typename, typename> class Context,
1126  typename ... OtherTransformers,
1127  requires_context<Context<Execution,Transformer>>>
1128 void parallel_execution_native::do_pipeline(Queue & input_queue,
1129  Context<Execution,Transformer> && context_op,
1130  OtherTransformers &&... other_ops) const
1131 {
1132  using namespace std;
1133  using namespace experimental;
1134 
1135  using input_item_type = typename Queue::value_type;
1136  using input_item_value_type = typename input_item_type::first_type::value_type;
1137 
1139  using output_optional_type = experimental::optional<output_type>;
1140  using output_item_type = pair <output_optional_type, long> ;
1141 
1142  decltype(auto) output_queue =
1143  get_output_queue<output_item_type>(other_ops...);
1144 
1145  auto context_task = [&]() {
1146  context_op.execution_policy().pipeline(input_queue, context_op.transformer(), output_queue);
1147  output_queue.push( make_pair(output_optional_type{}, -1) );
1148  };
1149 
1150  worker_pool workers{1};
1151  workers.launch_tasks(*this, context_task);
1152 
1153  do_pipeline(output_queue,
1154  forward<OtherTransformers>(other_ops)... );
1155 
1156  workers.wait();
1157 }
1158 
1159 
1160 template <typename Queue, typename FarmTransformer,
1161  template <typename> class Farm,
1162  typename ... OtherTransformers,
1163  requires_farm<Farm<FarmTransformer>>>
1164 void parallel_execution_native::do_pipeline(
1165  Queue & input_queue,
1166  Farm<FarmTransformer> && farm_obj,
1167  OtherTransformers && ... other_transform_ops) const
1168 {
1169  using namespace std;
1170  using namespace experimental;
1171 
1172  using input_item_type = typename Queue::value_type;
1173  using input_item_value_type = typename input_item_type::first_type::value_type;
1174 
1176  using output_optional_type = experimental::optional<output_type>;
1177  using output_item_type = pair <output_optional_type, long> ;
1178 
1179  decltype(auto) output_queue =
1180  get_output_queue<output_item_type>(other_transform_ops...);
1181 
1182  atomic<int> done_threads{0};
1183 
1184  auto farm_task = [&](int nt) {
1185  do_pipeline(input_queue, farm_obj.transformer(), output_queue);
1186  done_threads++;
1187  if (done_threads == nt) {
1188  output_queue.push(make_pair(output_optional_type{}, -1));
1189  }else{
1190  input_queue.push(input_item_type{});
1191  }
1192  };
1193 
1194  auto ntasks = farm_obj.cardinality();
1195  worker_pool workers{ntasks};
1196  workers.launch_tasks(*this, farm_task, ntasks);
1197  do_pipeline(output_queue,
1198  forward<OtherTransformers>(other_transform_ops)... );
1199 
1200  workers.wait();
1201 }
1202 
1203 template <typename Queue, typename Predicate,
1204  template <typename> class Filter,
1205  typename ... OtherTransformers,
1206  requires_filter<Filter<Predicate>>>
1207 void parallel_execution_native::do_pipeline(
1208  Queue & input_queue,
1209  Filter<Predicate> && filter_obj,
1210  OtherTransformers && ... other_transform_ops) const
1211 {
1212  using namespace std;
1213  using namespace experimental;
1214 
1215  using input_item_type = typename Queue::value_type;
1216  using input_value_type = typename input_item_type::first_type;
1217  auto filter_queue = make_queue<input_item_type>();
1218 
1219  auto filter_task = [&,this]() {
1220  auto manager = thread_manager();
1221  auto item{input_queue.pop()};
1222  while (item.first) {
1223  if (filter_obj(*item.first)) {
1224  filter_queue.push(item);
1225  }
1226  else {
1227  filter_queue.push(make_pair(input_value_type{}, item.second));
1228  }
1229  item = input_queue.pop();
1230  }
1231  filter_queue.push(make_pair(input_value_type{}, -1));
1232  };
1233  thread filter_thread{filter_task};
1234 
1235  decltype(auto) output_queue =
1236  get_output_queue<input_item_type>(other_transform_ops...);
1237 
1238  thread ordering_thread;
1239  if (is_ordered()) {
1240  auto ordering_task = [&]() {
1241  auto manager = thread_manager();
1242  vector<input_item_type> elements;
1243  int current = 0;
1244  long order = 0;
1245  auto item{filter_queue.pop()};
1246  for (;;) {
1247  if(!item.first && item.second == -1) break;
1248  if (item.second == current) {
1249  if (item.first) {
1250  output_queue.push(make_pair(item.first,order));
1251  order++;
1252  }
1253  current++;
1254  }
1255  else {
1256  elements.push_back(item);
1257  }
1258  auto it = find_if(elements.begin(), elements.end(),
1259  [&](auto x) { return x.second== current; });
1260  if(it != elements.end()){
1261  if (it->first) {
1262  output_queue.push(make_pair(it->first,order));
1263  order++;
1264  }
1265  elements.erase(it);
1266  current++;
1267  }
1268  item = filter_queue.pop();
1269  }
1270  while (elements.size()>0) {
1271  auto it = find_if(elements.begin(), elements.end(),
1272  [&](auto x) { return x.second== current; });
1273  if(it != elements.end()){
1274  if (it->first) {
1275  output_queue.push(make_pair(it->first,order));
1276  order++;
1277  }
1278  elements.erase(it);
1279  current++;
1280  }
1281  }
1282  output_queue.push(item);
1283  };
1284 
1285  ordering_thread = thread{ordering_task};
1286  do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1287  filter_thread.join();
1288  ordering_thread.join();
1289  }
1290  else {
1291  do_pipeline(filter_queue, forward<OtherTransformers>(other_transform_ops)...);
1292  filter_thread.join();
1293  }
1294 }
1295 
1296 template <typename Queue, typename Combiner, typename Identity,
1297  template <typename C, typename I> class Reduce,
1298  typename ... OtherTransformers,
1299  requires_reduce<Reduce<Combiner,Identity>>>
1300 void parallel_execution_native::do_pipeline(
1301  Queue && input_queue,
1302  Reduce<Combiner,Identity> && reduce_obj,
1303  OtherTransformers && ... other_transform_ops) const
1304 {
1305  using namespace std;
1306  using namespace experimental;
1307 
1308  using output_item_value_type = optional<decay_t<Identity>>;
1309  using output_item_type = pair<output_item_value_type,long>;
1310  decltype(auto) output_queue =
1311  get_output_queue<output_item_type>(other_transform_ops...);
1312 
1313  auto reduce_task = [&,this]() {
1314  auto manager = thread_manager();
1315  auto item{input_queue.pop()};
1316  int order = 0;
1317  while (item.first) {
1318  reduce_obj.add_item(std::forward<Identity>(*item.first));
1319  item = input_queue.pop();
1320  if (reduce_obj.reduction_needed()) {
1321  constexpr sequential_execution seq;
1322  auto red = reduce_obj.reduce_window(seq);
1323  output_queue.push(make_pair(red, order++));
1324  }
1325  }
1326  output_queue.push(make_pair(output_item_value_type{}, -1));
1327  };
1328  thread reduce_thread{reduce_task};
1329  do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1330  reduce_thread.join();
1331 }
1332 
1333 template <typename Queue, typename Transformer, typename Predicate,
1334  template <typename T, typename P> class Iteration,
1335  typename ... OtherTransformers,
1336  requires_iteration<Iteration<Transformer,Predicate>>,
1337  requires_no_pattern<Transformer>>
1338 void parallel_execution_native::do_pipeline(
1339  Queue & input_queue,
1340  Iteration<Transformer,Predicate> && iteration_obj,
1341  OtherTransformers && ... other_transform_ops) const
1342 {
1343  using namespace std;
1344  using namespace experimental;
1345 
1346  using input_item_type = typename decay_t<Queue>::value_type;
1347 
1348  decltype(auto) output_queue =
1349  get_output_queue<input_item_type>(other_transform_ops...);
1350 
1351  auto iteration_task = [&]() {
1352  for (;;) {
1353  auto item = input_queue.pop();
1354  if (!item.first) break;
1355  auto value = iteration_obj.transform(*item.first);
1356  auto new_item = input_item_type{value,item.second};
1357  if (iteration_obj.predicate(value)) {
1358  output_queue.push(new_item);
1359  }
1360  else {
1361  input_queue.push(new_item);
1362  }
1363  }
1364  while (!input_queue.is_empty()) {
1365  auto item = input_queue.pop();
1366  auto value = iteration_obj.transform(*item.first);
1367  auto new_item = input_item_type{value,item.second};
1368  if (iteration_obj.predicate(value)) {
1369  output_queue.push(new_item);
1370  }
1371  else {
1372  input_queue.push(new_item);
1373  }
1374  }
1375  output_queue.push(input_item_type{{},-1});
1376  };
1377 
1378  thread iteration_thread{iteration_task};
1379  do_pipeline(output_queue, forward<OtherTransformers>(other_transform_ops)...);
1380  iteration_thread.join();
1381 }
1382 
1383 template <typename Queue, typename Transformer, typename Predicate,
1384  template <typename T, typename P> class Iteration,
1385  typename ... OtherTransformers,
1386  requires_iteration<Iteration<Transformer,Predicate>>,
1387  requires_pipeline<Transformer>>
1388 void parallel_execution_native::do_pipeline(
1389  Queue &,
1390  Iteration<Transformer,Predicate> &&,
1391  OtherTransformers && ...) const
1392 {
1393  static_assert(!is_pipeline<Transformer>, "Not implemented");
1394 }
1395 
1396 
1397 template <typename Queue, typename ... Transformers,
1398  template <typename...> class Pipeline,
1399  requires_pipeline<Pipeline<Transformers...>>>
1400 void parallel_execution_native::do_pipeline(
1401  Queue & input_queue,
1402  Pipeline<Transformers...> && pipeline_obj) const
1403 {
1404  do_pipeline_nested(
1405  input_queue,
1406  pipeline_obj.transformers(),
1407  std::make_index_sequence<sizeof...(Transformers)>());
1408 }
1409 
1410 template <typename Queue, typename ... Transformers,
1411  template <typename...> class Pipeline,
1412  typename ... OtherTransformers,
1413  requires_pipeline<Pipeline<Transformers...>>>
1414 void parallel_execution_native::do_pipeline(
1415  Queue & input_queue,
1416  Pipeline<Transformers...> && pipeline_obj,
1417  OtherTransformers && ... other_transform_ops) const
1418 {
1419  do_pipeline_nested(
1420  input_queue,
1421  std::tuple_cat(pipeline_obj.transformers(),
1422  std::forward_as_tuple(other_transform_ops...)),
1423  std::make_index_sequence<sizeof...(Transformers)+sizeof...(OtherTransformers)>());
1424 }
1425 
1426 template <typename Queue, typename ... Transformers,
1427  std::size_t ... I>
1428 void parallel_execution_native::do_pipeline_nested(
1429  Queue & input_queue,
1430  std::tuple<Transformers...> && transform_ops,
1431  std::index_sequence<I...>) const
1432 {
1433  do_pipeline(input_queue,
1434  std::forward<Transformers>(std::get<I>(transform_ops))...);
1435 }
1436 
1437 template<typename T, typename... Others>
1438 void parallel_execution_native::do_pipeline(mpmc_queue<T> &, mpmc_queue<T> &, Others &&...) const
1439 {
1440 }
1441 
1442 } // end namespace grppi
1443 
1444 #endif
Definition: mpmc_queue.h:33
RAII class to manage registration/deregistration pairs. This class allows to manage automatic deregis...
Definition: parallel_execution_native.h:110
~native_thread_manager()
Deregisters current thread from the registry.
Definition: parallel_execution_native.h:122
native_thread_manager(thread_registry &registry)
Saves a reference to the registry and registers current thread.
Definition: parallel_execution_native.h:115
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:135
void pipeline(Generator &&generate_op, Transformers &&... transform_ops) const
Invoke Pipeline pattern.
Definition: parallel_execution_native.h:856
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_native.h:227
void set_concurrency_degree(int degree) noexcept
Set number of grppi threads.
Definition: parallel_execution_native.h:174
auto map_reduce(std::tuple< InputIterators... > firsts, std::size_t sequence_size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op) const
Applies a map/reduce operation to a sequence of data items.
Definition: parallel_execution_native.h:752
parallel_execution_native(int concurrency_degree, bool ordering=true) noexcept
Constructs a native parallel execution policy.
Definition: parallel_execution_native.h:162
void set_queue_attributes(int size, queue_mode mode) noexcept
Sets the attributes for the queues built through make_queue<T>()
Definition: parallel_execution_native.h:215
parallel_execution_native(const parallel_execution_native &ex)
Definition: parallel_execution_native.h:167
void disable_ordering() noexcept
Disable ordering.
Definition: parallel_execution_native.h:189
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke the divide/conquer pattern.
Definition: parallel_execution_native.h:825
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_native.h:194
void map(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, Transformer transform_op) const
Applies a transformation to multiple sequences leaving the result in another sequence by chunks accor...
Definition: parallel_execution_native.h:679
native_thread_manager thread_manager() const
Get a manager object for registration/deregistration in the thread index table for current thread.
Definition: parallel_execution_native.h:200
int get_thread_id() const noexcept
Get index of current thread in the thread table.
Definition: parallel_execution_native.h:208
mpmc_queue< T > get_output_queue(Transformers &&...) const
Makes a communication queue for elements of type T if the queue has not been created in an outer patt...
Definition: parallel_execution_native.h:252
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_native.h:179
void pipeline(mpmc_queue< InputType > &input_queue, Transformer &&transform_op, mpmc_queue< OutputType > &output_queue) const
Invoke Pipeline pattern coming from another context that uses mpmc_queues as communication channels.
Definition: parallel_execution_native.h:400
auto reduce(InputIterator first, std::size_t sequence_size, Identity &&identity, Combiner &&combine_op) const
Applies a reduction to a sequence of data items.
Definition: parallel_execution_native.h:715
void enable_ordering() noexcept
Enable ordering.
Definition: parallel_execution_native.h:184
void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: parallel_execution_native.h:791
parallel_execution_native() noexcept
Default construct a native parallel execution policy.
Definition: parallel_execution_native.h:148
mpmc_queue< T > & get_output_queue(mpmc_queue< T > &queue, Transformers &&...) const
Returns the reference of a communication queue for elements of type T if the queue has been created i...
Definition: parallel_execution_native.h:240
Sequential execution policy.
Definition: sequential_execution.h:36
constexpr auto reduce(InputIterator first, std::size_t sequence_size, Identity &&identity, Combiner &&combine_op) const
Applies a reduction to a sequence of data items.
Definition: sequential_execution.h:464
constexpr void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: sequential_execution.h:497
constexpr auto map_reduce(std::tuple< InputIterators... > firsts, std::size_t sequence_size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op) const
Applies a map/reduce operation to a sequence of data items.
Definition: sequential_execution.h:480
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke the divide/conquer pattern.
Definition: sequential_execution.h:538
Thread index table to provide portable natural thread indices.
Definition: parallel_execution_native.h:48
void register_thread() noexcept
Adds the current thread id in the registry.
Definition: parallel_execution_native.h:74
int current_index() const noexcept
Integer index for current thread.
Definition: parallel_execution_native.h:93
void deregister_thread() noexcept
Removes current thread id from the registry.
Definition: parallel_execution_native.h:83
Pool of worker threads. This class offers a simple pool of worker threads.
Definition: worker_pool.h:29
Definition: callable_traits.h:21
constexpr bool supports_reduce< parallel_execution_native >()
Determines if an execution policy supports the reduce pattern.
Definition: parallel_execution_native.h:647
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:107
constexpr bool supports_map_reduce< parallel_execution_native >()
Determines if an execution policy supports the map-reduce pattern.
Definition: parallel_execution_native.h:654
constexpr bool is_parallel_execution_native()
Metafunction that determines if type E is parallel_execution_native.
Definition: parallel_execution_native.h:624
typename std::result_of< Transformer(Input)>::type result_type
Determines the return type of applying a function on a input type.
Definition: patterns.h:105
constexpr bool supports_stencil< parallel_execution_native >()
Determines if an execution policy supports the stencil pattern.
Definition: parallel_execution_native.h:661
constexpr bool supports_pipeline< parallel_execution_native >()
Determines if an execution policy supports the pipeline pattern.
Definition: parallel_execution_native.h:675
constexpr bool supports_divide_conquer< parallel_execution_native >()
Determines if an execution policy supports the divide/conquer pattern.
Definition: parallel_execution_native.h:668
queue_mode
Definition: mpmc_queue.h:30
auto iterators_next(T &&t, int n)
Computes next n steps from a tuple of iterators.
Definition: iterator.h:170
constexpr bool is_supported< parallel_execution_native >()
Determines if an execution policy is supported in the current compilation.
Definition: parallel_execution_native.h:633
constexpr bool supports_map< parallel_execution_native >()
Determines if an execution policy supports the map pattern.
Definition: parallel_execution_native.h:640
std::enable_if_t< is_no_pattern< T >, int > requires_no_pattern
Definition: patterns.h:87
decltype(auto) apply_deref_increment(F &&f, T< Iterators... > &iterators)
Applies a callable object to the values obtained from the iterators in a tuple-like object....
Definition: iterator.h:58
return_type type
Definition: patterns.h:98