GrPPI  0.3.1
Generic and Reusable Parallel Pattern Interface
sequential_execution.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_SEQ_SEQUENTIAL_EXECUTION_H
22 #define GRPPI_SEQ_SEQUENTIAL_EXECUTION_H
23 
24 #include "../common/mpmc_queue.h"
25 #include "../common/iterator.h"
26 #include "../common/callable_traits.h"
27 #include "../common/execution_traits.h"
28 #include "../common/patterns.h"
29 #include "../common/pack_traits.h"
30 
31 #include <type_traits>
32 #include <tuple>
33 #include <iterator>
34 //#include <experimental/optional>
35 
36 namespace grppi {
37 
42 
43 public:
44 
46  constexpr sequential_execution() noexcept = default;
47 
52  constexpr void set_concurrency_degree(int n) const noexcept {}
53 
58  constexpr int concurrency_degree() const noexcept { return 1; }
59 
64  constexpr void enable_ordering() const noexcept {}
65 
70  constexpr void disable_ordering() const noexcept {}
71 
76  constexpr bool is_ordered() const noexcept { return true; }
77 
92  template <typename ... InputIterators, typename OutputIterator,
93  typename Transformer>
94  constexpr void map(std::tuple<InputIterators...> firsts,
95  OutputIterator first_out, std::size_t sequence_size,
96  Transformer && transform_op) const;
97 
111  template <typename InputIterator, typename Identity, typename Combiner>
112  constexpr auto reduce(InputIterator first, std::size_t sequence_size,
113  Identity && identity,
114  Combiner && combine_op) const;
115 
130  template <typename ... InputIterators, typename Identity,
131  typename Transformer, typename Combiner>
132  constexpr auto map_reduce(std::tuple<InputIterators...> firsts,
133  std::size_t sequence_size,
134  Identity && identity,
135  Transformer && transform_op, Combiner && combine_op) const;
136 
153  template <typename ... InputIterators, typename OutputIterator,
154  typename StencilTransformer, typename Neighbourhood>
155  constexpr void stencil(std::tuple<InputIterators...> firsts, OutputIterator first_out,
156  std::size_t sequence_size,
157  StencilTransformer && transform_op,
158  Neighbourhood && neighbour_op) const;
159 
172  template <typename Input, typename Divider, typename Solver, typename Combiner>
173  [[deprecated("Use new interface with predicate argument")]]
174  auto divide_conquer(Input && input,
175  Divider && divide_op,
176  Solver && solve_op,
177  Combiner && combine_op) const;
178 
193  template <typename Input, typename Divider,typename Predicate, typename Solver, typename Combiner>
194  auto divide_conquer(Input && input,
195  Divider && divide_op,
196  Predicate && predicate_op,
197  Solver && solve_op,
198  Combiner && combine_op) const;
199 
200 
208  template <typename Generator, typename ... Transformers>
209  void pipeline(Generator && generate_op,
210  Transformers && ... transform_op) const;
211 
222  template <typename InputType, typename Transformer, typename OutputType>
223  void pipeline(mpmc_queue<InputType> & input_queue, Transformer && transform_op,
224  mpmc_queue<OutputType> & output_queue) const
225  {
226  using namespace std;
227  using optional_output_type = typename OutputType::first_type;
228  for(;;){
229  auto item = input_queue.pop();
230  if(!item.first) break;
231  do_pipeline(*item.first, std::forward<Transformer>(transform_op),
232  [&](auto output_item) {
233  output_queue.push( make_pair(optional_output_type{output_item}, item.second) );
234  }
235  );
236  }
237  output_queue.push( make_pair(optional_output_type{}, -1) );
238  }
239 
240 
241 private:
242 
243  template <typename Item, typename Consumer,
245  void do_pipeline(Item && item, Consumer && consume_op) const;
246 
247  template <typename Item, typename Transformer, typename ... OtherTransformers,
249  void do_pipeline(Item && item, Transformer && transform_op,
250  OtherTransformers && ... other_ops) const;
251 
252  template <typename Item, typename FarmTransformer,
253  template <typename> class Farm,
255  void do_pipeline(Item && item, Farm<FarmTransformer> & farm_obj) const
256  {
257  do_pipeline(std::forward<Item>(item), std::move(farm_obj));
258  }
259 
260  template <typename Item, typename FarmTransformer,
261  template <typename> class Farm,
262  requires_farm<Farm<FarmTransformer>> = 0>
263  void do_pipeline(Item && item, Farm<FarmTransformer> && farm_obj) const;
264 
265  template <typename Item, typename Execution, typename Transformer,
266  template <typename, typename> class Context,
267  typename ... OtherTransformers,
269  void do_pipeline(Item && item, Context<Execution,Transformer> && context_op,
270  OtherTransformers &&... other_ops) const
271  {
272  do_pipeline(item, std::forward<Transformer>(context_op.transformer()),
273  std::forward<OtherTransformers>(other_ops)...);
274  }
275 
276  template <typename Item, typename Execution, typename Transformer,
277  template <typename, typename> class Context,
278  typename ... OtherTransformers,
279  requires_context<Context<Execution,Transformer>> = 0>
280  void do_pipeline(Item && item, Context<Execution,Transformer> & context_op,
281  OtherTransformers &&... other_ops) const
282  {
283  do_pipeline(item, std::move(context_op),
284  std::forward<OtherTransformers>(other_ops)...);
285  }
286 
287 
288  template <typename Item, typename FarmTransformer,
289  template <typename> class Farm,
290  typename... OtherTransformers,
291  requires_farm<Farm<FarmTransformer>> = 0>
292  void do_pipeline(Item && item, Farm<FarmTransformer> & farm_obj,
293  OtherTransformers && ... other_transform_ops) const
294  {
295  do_pipeline(std::forward<Item>(item), std::move(farm_obj),
296  std::forward<OtherTransformers>(other_transform_ops)...);
297  }
298 
299  template <typename Item, typename FarmTransformer,
300  template <typename> class Farm,
301  typename... OtherTransformers,
302  requires_farm<Farm<FarmTransformer>> = 0>
303  void do_pipeline(Item && item, Farm<FarmTransformer> && farm_obj,
304  OtherTransformers && ... other_transform_ops) const;
305 
306  template <typename Item, typename Predicate,
307  template <typename> class Filter,
308  typename ... OtherTransformers,
310  void do_pipeline(Item && item, Filter<Predicate> & filter_obj,
311  OtherTransformers && ... other_transform_ops) const
312  {
313  do_pipeline(std::forward<Item>(item), std::move(filter_obj),
314  std::forward<OtherTransformers>(other_transform_ops)...);
315  }
316 
317  template <typename Item, typename Predicate,
318  template <typename> class Filter,
319  typename ... OtherTransformers,
320  requires_filter<Filter<Predicate>> = 0>
321  void do_pipeline(Item && item, Filter<Predicate> && filter_obj,
322  OtherTransformers && ... other_transform_ops) const;
323 
324  template <typename Item, typename Combiner, typename Identity,
325  template <typename C, typename I> class Reduce,
326  typename ... OtherTransformers,
328  void do_pipeline(Item && item, Reduce<Combiner,Identity> & reduce_obj,
329  OtherTransformers && ... other_transform_ops) const
330  {
331  do_pipeline(std::forward<Item>(item), std::move(reduce_obj),
332  std::forward<OtherTransformers>(other_transform_ops)...);
333  };
334 
335 
336  template <typename Item, typename Combiner, typename Identity,
337  template <typename C, typename I> class Reduce,
338  typename ... OtherTransformers,
339  requires_reduce<Reduce<Combiner,Identity>> = 0>
340  void do_pipeline(Item && item, Reduce<Combiner,Identity> && reduce_obj,
341  OtherTransformers && ... other_transform_ops) const;
342 
343  template <typename Item, typename Transformer, typename Predicate,
344  template <typename T, typename P> class Iteration,
345  typename ... OtherTransformers,
347  void do_pipeline(Item && item,
348  Iteration<Transformer,Predicate> & iteration_obj,
349  OtherTransformers && ... other_transform_ops) const
350  {
351  do_pipeline(std::forward<Item>(item), std::move(iteration_obj),
352  std::forward<OtherTransformers>(other_transform_ops)...);
353  }
354 
355  template <typename Item, typename Transformer, typename Predicate,
356  template <typename T, typename P> class Iteration,
357  typename ...OtherTransformers,
358  requires_iteration<Iteration<Transformer,Predicate>> = 0,
359  requires_no_pattern<Transformer> = 0>
360  void do_pipeline(Item && item,
361  Iteration<Transformer,Predicate> && iteration_obj,
362  OtherTransformers && ... other_transform_ops) const;
363 
364  template <typename Item, typename Transformer, typename Predicate,
365  template <typename T, typename P> class Iteration,
366  typename ...OtherTransformers,
367  requires_iteration<Iteration<Transformer,Predicate>> = 0,
369  void do_pipeline(Item && item,
370  Iteration<Transformer,Predicate> && iteration_obj,
371  OtherTransformers && ... other_transform_ops) const;
372 
373  template <typename Item, typename ... Transformers,
374  template <typename...> class Pipeline,
375  typename ... OtherTransformers,
376  requires_pipeline<Pipeline<Transformers...>> = 0>
377  void do_pipeline(Item && item, Pipeline<Transformers...> & pipeline_obj,
378  OtherTransformers && ... other_transform_ops) const
379  {
380  do_pipeline(std::forward<Item>(item), std::move(pipeline_obj),
381  std::forward<OtherTransformers>(other_transform_ops)...);
382  }
383 
384  template <typename Item, typename ... Transformers,
385  template <typename...> class Pipeline,
386  typename ... OtherTransformers,
387  requires_pipeline<Pipeline<Transformers...>> = 0>
388  void do_pipeline(Item && item, Pipeline<Transformers...> && pipeline_obj,
389  OtherTransformers && ... other_transform_ops) const;
390 
391  template <typename Item, typename ... Transformers, std::size_t ... I>
392  void do_pipeline_nested(Item && item,
393  std::tuple<Transformers...> && transform_ops,
394  std::index_sequence<I...>) const;
395 
396 };
397 
399 template <typename E>
400 constexpr bool is_sequential_execution() {
401  return std::is_same<E, sequential_execution>::value;
402 }
403 
408 template <>
409 constexpr bool is_supported<sequential_execution>() { return true; }
410 
415 template <>
416 constexpr bool supports_map<sequential_execution>() { return true; }
417 
422 template <>
423 constexpr bool supports_reduce<sequential_execution>() { return true; }
424 
429 template <>
430 constexpr bool supports_map_reduce<sequential_execution>() { return true; }
431 
436 template <>
437 constexpr bool supports_stencil<sequential_execution>() { return true; }
438 
443 template <>
444 constexpr bool supports_divide_conquer<sequential_execution>() { return true; }
445 
450 template <>
451 constexpr bool supports_pipeline<sequential_execution>() { return true; }
452 
453 template <typename ... InputIterators, typename OutputIterator,
454  typename Transformer>
456  std::tuple<InputIterators...> firsts,
457  OutputIterator first_out,
458  std::size_t sequence_size,
459  Transformer && transform_op) const
460 {
461  const auto last = std::next(std::get<0>(firsts), sequence_size);
462  while (std::get<0>(firsts) != last) {
463  *first_out++ = apply_deref_increment(
464  std::forward<Transformer>(transform_op), firsts);
465  }
466 }
467 
468 template <typename InputIterator, typename Identity, typename Combiner>
470  InputIterator first,
471  std::size_t sequence_size,
472  Identity && identity,
473  Combiner && combine_op) const
474 {
475  const auto last = std::next(first, sequence_size);
476  auto result{identity};
477  while (first != last) {
478  result = combine_op(result, *first++);
479  }
480  return result;
481 }
482 
483 template <typename ... InputIterators, typename Identity,
484  typename Transformer, typename Combiner>
486  std::tuple<InputIterators...> firsts,
487  std::size_t sequence_size,
488  Identity && identity,
489  Transformer && transform_op, Combiner && combine_op) const
490 {
491  const auto last = std::next(std::get<0>(firsts), sequence_size);
492  auto result{identity};
493  while (std::get<0>(firsts) != last) {
494  result = combine_op(result, apply_deref_increment(
495  std::forward<Transformer>(transform_op), firsts));
496  }
497  return result;
498 }
499 
500 template <typename ... InputIterators, typename OutputIterator,
501  typename StencilTransformer, typename Neighbourhood>
503  std::tuple<InputIterators...> firsts, OutputIterator first_out,
504  std::size_t sequence_size,
505  StencilTransformer && transform_op,
506  Neighbourhood && neighbour_op) const
507 {
508  const auto last = std::next(std::get<0>(firsts), sequence_size);
509  while (std::get<0>(firsts) != last) {
510  const auto f = std::get<0>(firsts);
511  *first_out++ = transform_op(f,
512  apply_increment(std::forward<Neighbourhood>(neighbour_op), firsts));
513  }
514 }
515 
516 
517 template <typename Input, typename Divider, typename Predicate, typename Solver, typename Combiner>
519  Input && input,
520  Divider && divide_op,
521  Predicate && predicate_op,
522  Solver && solve_op,
523  Combiner && combine_op) const
524 {
525 
526  if (predicate_op(input)) { return solve_op(std::forward<Input>(input)); }
527  auto subproblems = divide_op(std::forward<Input>(input));
528 
529  using subproblem_type =
530  std::decay_t<typename std::result_of<Solver(Input)>::type>;
531  std::vector<subproblem_type> solutions;
532  for (auto && sp : subproblems) {
533  solutions.push_back(divide_conquer(sp,
534  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),std::forward<Solver>(solve_op),
535  std::forward<Combiner>(combine_op)));
536  }
537  return reduce(std::next(solutions.begin()), solutions.size()-1, solutions[0],
538  std::forward<Combiner>(combine_op));
539 }
540 
541 
542 template <typename Input, typename Divider, typename Solver, typename Combiner>
544  Input && input,
545  Divider && divide_op,
546  Solver && solve_op,
547  Combiner && combine_op) const
548 {
549 
550  auto subproblems = divide_op(std::forward<Input>(input));
551  if (subproblems.size()<=1) { return solve_op(std::forward<Input>(input)); }
552 
553  using subproblem_type =
554  std::decay_t<typename std::result_of<Solver(Input)>::type>;
555  std::vector<subproblem_type> solutions;
556  for (auto && sp : subproblems) {
557  solutions.push_back(divide_conquer(sp,
558  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
559  std::forward<Combiner>(combine_op)));
560  }
561  return reduce(std::next(solutions.begin()), solutions.size()-1, solutions[0],
562  std::forward<Combiner>(combine_op));
563 }
564 
565 template <typename Generator, typename ... Transformers>
567  Generator && generate_op,
568  Transformers && ... transform_ops) const
569 {
570  static_assert(is_generator<Generator>,
571  "First pipeline stage must be a generator");
572 
573  for (;;) {
574  auto x = generate_op();
575  if (!x) break;
576  do_pipeline(*x, std::forward<Transformers>(transform_ops)...);
577  }
578 }
579 
580 template <typename Item, typename Consumer,
582 void sequential_execution::do_pipeline(
583  Item && item,
584  Consumer && consume_op) const
585 {
586  consume_op(std::forward<Item>(item));
587 }
588 
589 template <typename Item, typename Transformer, typename ... OtherTransformers,
591 void sequential_execution::do_pipeline(
592  Item && item,
593  Transformer && transform_op,
594  OtherTransformers && ... other_ops) const
595 {
596  static_assert(!is_consumer<Transformer,Item>,
597  "Itermediate pipeline stage cannot be a consumer");
598 
599  do_pipeline(transform_op(std::forward<Item>(item)),
600  std::forward<OtherTransformers>(other_ops)...);
601 }
602 
603 template <typename Item, typename FarmTransformer,
604  template <typename> class Farm,
606 void sequential_execution::do_pipeline(
607  Item && item,
608  Farm<FarmTransformer> && farm_obj) const
609 {
610  farm_obj(std::forward<Item>(item));
611 }
612 
613 template <typename Item, typename FarmTransformer,
614  template <typename> class Farm,
615  typename... OtherTransformers,
617 void sequential_execution::do_pipeline(
618  Item && item,
619  Farm<FarmTransformer> && farm_obj,
620  OtherTransformers && ... other_transform_ops) const
621 {
622  static_assert(!is_consumer<Farm<FarmTransformer>,Item>,
623  "Itermediate pipeline stage cannot be a consumer");
624  do_pipeline(farm_obj(std::forward<Item>(item)),
625  std::forward<OtherTransformers>(other_transform_ops)...);
626 }
627 
628 template <typename Item, typename Predicate,
629  template <typename> class Filter,
630  typename ... OtherTransformers,
632 void sequential_execution::do_pipeline(
633  Item && item,
634  Filter<Predicate> && filter_obj,
635  OtherTransformers && ... other_transform_ops) const
636 {
637  if (filter_obj(std::forward<Item>(item))) {
638  do_pipeline(std::forward<Item>(item),
639  std::forward<OtherTransformers>(other_transform_ops)...);
640  }
641 }
642 
643 template <typename Item, typename Combiner, typename Identity,
644  template <typename C, typename I> class Reduce,
645  typename ... OtherTransformers,
647 void sequential_execution::do_pipeline(
648  Item && item,
649  Reduce<Combiner,Identity> && reduce_obj,
650  OtherTransformers && ... other_transform_ops) const
651 {
652  reduce_obj.add_item(std::forward<Identity>(item));
653  if (reduce_obj.reduction_needed()) {
654  auto red = reduce_obj.reduce_window(*this);
655  do_pipeline(red,
656  std::forward<OtherTransformers...>(other_transform_ops)...);
657  }
658 }
659 
660 template <typename Item, typename Transformer, typename Predicate,
661  template <typename T, typename P> class Iteration,
662  typename ... OtherTransformers,
665 void sequential_execution::do_pipeline(
666  Item && item,
667  Iteration<Transformer,Predicate> && iteration_obj,
668  OtherTransformers && ... other_transform_ops) const
669 {
670  auto new_item = iteration_obj.transform(std::forward<Item>(item));
671  while (!iteration_obj.predicate(new_item)) {
672  new_item = iteration_obj.transform(new_item);
673  }
674  do_pipeline(new_item,
675  std::forward<OtherTransformers...>(other_transform_ops)...);
676 }
677 
678 template <typename Item, typename Transformer, typename Predicate,
679  template <typename T, typename P> class Iteration,
680  typename ... OtherTransformers,
683 void sequential_execution::do_pipeline(
684  Item && item,
685  Iteration<Transformer,Predicate> && iteration_obj,
686  OtherTransformers && ... other_transform_ops) const
687 {
688  static_assert(!is_pipeline<Transformer>, "Not implemented");
689 }
690 
691 template <typename Item, typename ... Transformers,
692  template <typename...> class Pipeline,
693  typename ... OtherTransformers,
694  requires_pipeline<Pipeline<Transformers...>>>
695 void sequential_execution::do_pipeline(
696  Item && item,
697  Pipeline<Transformers...> && pipeline_obj,
698  OtherTransformers && ... other_transform_ops) const
699 {
700  do_pipeline_nested(
701  std::forward<Item>(item),
702  std::tuple_cat(pipeline_obj.transformers(),
703  std::forward_as_tuple(other_transform_ops...)),
704  std::make_index_sequence<sizeof...(Transformers)+sizeof...(OtherTransformers)>());
705 }
706 
707 template <typename Item, typename ... Transformers, std::size_t ... I>
708 void sequential_execution::do_pipeline_nested(
709  Item && item,
710  std::tuple<Transformers...> && transform_ops,
711  std::index_sequence<I...>) const
712 {
713  do_pipeline(
714  std::forward<Item>(item),
715  std::forward<Transformers>(std::get<I>(transform_ops))...);
716 }
717 
718 } // end namespace grppi
719 
720 #endif
constexpr bool supports_map< sequential_execution >()
Determines if an execution policy supports the map pattern.
Definition: sequential_execution.h:416
Definition: callable_traits.h:26
constexpr bool supports_pipeline< sequential_execution >()
Determines if an execution policy supports the pipeline pattern.
Definition: sequential_execution.h:451
constexpr int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: sequential_execution.h:58
std::enable_if_t< is_reduce< T >, int > requires_reduce
Definition: reduce_pattern.h:135
constexpr bool is_ordered() const noexcept
Is execution ordered.
Definition: sequential_execution.h:76
constexpr void enable_ordering() const noexcept
Enable ordering.
Definition: sequential_execution.h:64
constexpr void map(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, Transformer &&transform_op) const
Applies a transformation to multiple sequences leaving the result in another sequence.
Definition: sequential_execution.h:455
typename std::enable_if_t< is_iteration< T >, int > requires_iteration
Definition: iteration_pattern.h:88
void pipeline(Generator &&generate_op, Transformers &&...transform_op) const
Invoke Pipeline pattern.
Definition: sequential_execution.h:566
constexpr auto map_reduce(std::tuple< InputIterators... > firsts, std::size_t sequence_size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op) const
Applies a map/reduce operation to a sequence of data items.
Definition: sequential_execution.h:485
constexpr void disable_ordering() const noexcept
Disable ordering.
Definition: sequential_execution.h:70
constexpr bool supports_divide_conquer< sequential_execution >()
Determines if an execution policy supports the divide/conquer pattern.
Definition: sequential_execution.h:444
constexpr bool is_sequential_execution()
Determine if a type is a sequential execution policy.
Definition: sequential_execution.h:400
STL namespace.
constexpr bool is_supported< sequential_execution >()
Determines if an execution policy is supported in the current compilation.
Definition: sequential_execution.h:409
constexpr bool is_consumer
Definition: callable_traits.h:103
constexpr bool supports_map_reduce< sequential_execution >()
Determines if an execution policy supports the map-reduce pattern.
Definition: sequential_execution.h:430
Definition: mpmc_queue.h:38
constexpr void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: sequential_execution.h:502
std::enable_if_t< is_no_pattern< T >, int > requires_no_pattern
Definition: patterns.h:92
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:111
constexpr sequential_execution() noexcept=default
Default constructor.
typename std::enable_if_t< is_filter< T >, int > requires_filter
Definition: filter_pattern.h:70
constexpr bool supports_reduce< sequential_execution >()
Determines if an execution policy supports the reduce pattern.
Definition: sequential_execution.h:423
Sequential execution policy.
Definition: sequential_execution.h:41
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke Divide/conquer pattern.
Definition: sequential_execution.h:543
decltype(auto) apply_deref_increment(F &&f, T< Iterators... > &iterators)
Applies a callable object to the values obtained from the iterators in a tuple-like object...
Definition: iterator.h:63
constexpr void set_concurrency_degree(int n) const noexcept
Set number of grppi threads.
Definition: sequential_execution.h:52
constexpr auto reduce(InputIterator first, std::size_t sequence_size, Identity &&identity, Combiner &&combine_op) const
Applies a reduction to a sequence of data items.
Definition: sequential_execution.h:469
void pipeline(mpmc_queue< InputType > &input_queue, Transformer &&transform_op, mpmc_queue< OutputType > &output_queue) const
Invoke Pipeline pattern coming from another context that uses mpmc_queues as communication channels...
Definition: sequential_execution.h:223
T pop()
Definition: mpmc_queue.h:93
constexpr bool supports_stencil< sequential_execution >()
Determines if an execution policy supports the stencil pattern.
Definition: sequential_execution.h:437
typename std::enable_if_t< is_context< T >, int > requires_context
Definition: common/context.h:95
decltype(auto) apply_increment(F &&f, T< Iterators... > &iterators)
Applies a callable object to the iterators in a tuple-like object and then increments those iterators...
Definition: iterator.h:106
typename std::enable_if_t< is_farm< T >, int > requires_farm
Definition: farm_pattern.h:89