16 #ifndef GRPPI_SEQ_SEQUENTIAL_EXECUTION_H
17 #define GRPPI_SEQ_SEQUENTIAL_EXECUTION_H
19 #include "../common/mpmc_queue.h"
20 #include "../common/iterator.h"
21 #include "../common/callable_traits.h"
22 #include "../common/execution_traits.h"
23 #include "../common/patterns.h"
24 #include "../common/pack_traits.h"
26 #include <type_traits>
71 constexpr
bool is_ordered() const noexcept {
return true; }
87 template <
typename ... InputIterators,
typename OutputIterator,
89 constexpr
void map(std::tuple<InputIterators...> firsts,
90 OutputIterator first_out, std::size_t sequence_size,
91 Transformer && transform_op)
const;
106 template <
typename InputIterator,
typename Identity,
typename Combiner>
107 constexpr
auto reduce(InputIterator first, std::size_t sequence_size,
108 Identity && identity,
109 Combiner && combine_op)
const;
125 template <
typename ... InputIterators,
typename Identity,
126 typename Transformer,
typename Combiner>
127 constexpr
auto map_reduce(std::tuple<InputIterators...> firsts,
128 std::size_t sequence_size,
129 Identity && identity,
130 Transformer && transform_op, Combiner && combine_op)
const;
148 template <
typename ... InputIterators,
typename OutputIterator,
149 typename StencilTransformer,
typename Neighbourhood>
150 constexpr
void stencil(std::tuple<InputIterators...> firsts, OutputIterator first_out,
151 std::size_t sequence_size,
152 StencilTransformer && transform_op,
153 Neighbourhood && neighbour_op)
const;
167 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
168 [[deprecated(
"Use new interface with predicate argument")]]
170 Divider && divide_op,
172 Combiner && combine_op)
const;
188 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
190 Divider && divide_op,
191 Predicate && predicate_op,
193 Combiner && combine_op)
const;
203 template <
typename Generator,
typename ... Transformers>
204 void pipeline(Generator && generate_op,
205 Transformers && ... transform_op)
const;
217 template <
typename InputType,
typename Transformer,
typename OutputType>
222 using optional_output_type =
typename OutputType::first_type;
224 auto item = input_queue.
pop();
225 if(!item.first)
break;
226 do_pipeline(*item.first, std::forward<Transformer>(transform_op),
227 [&](
auto output_item) {
228 output_queue.push( make_pair(optional_output_type{output_item}, item.second) );
232 output_queue.push( make_pair(optional_output_type{}, -1) );
238 template <
typename Item,
typename Consumer,
239 requires_no_pattern<Consumer> = 0>
240 void do_pipeline(Item && item, Consumer && consume_op)
const;
242 template <
typename Item,
typename Transformer,
typename ... OtherTransformers,
243 requires_no_pattern<Transformer> = 0>
244 void do_pipeline(Item && item, Transformer && transform_op,
245 OtherTransformers && ... other_ops)
const;
247 template <
typename Item,
typename FarmTransformer,
248 template <
typename>
class Farm,
249 requires_farm<Farm<FarmTransformer>> = 0>
250 void do_pipeline(Item && item, Farm<FarmTransformer> & farm_obj)
const
252 do_pipeline(std::forward<Item>(item), std::move(farm_obj));
255 template <
typename Item,
typename FarmTransformer,
256 template <
typename>
class Farm,
257 requires_farm<Farm<FarmTransformer>> = 0>
258 void do_pipeline(Item && item, Farm<FarmTransformer> && farm_obj)
const;
260 template <
typename Item,
typename Execution,
typename Transformer,
261 template <
typename,
typename>
class Context,
262 typename ... OtherTransformers,
263 requires_context<Context<Execution,Transformer>> = 0>
264 void do_pipeline(Item && item, Context<Execution,Transformer> && context_op,
265 OtherTransformers &&... other_ops)
const
267 do_pipeline(item, std::forward<Transformer>(context_op.transformer()),
268 std::forward<OtherTransformers>(other_ops)...);
271 template <
typename Item,
typename Execution,
typename Transformer,
272 template <
typename,
typename>
class Context,
273 typename ... OtherTransformers,
274 requires_context<Context<Execution,Transformer>> = 0>
275 void do_pipeline(Item && item, Context<Execution,Transformer> & context_op,
276 OtherTransformers &&... other_ops)
const
278 do_pipeline(item, std::move(context_op),
279 std::forward<OtherTransformers>(other_ops)...);
283 template <
typename Item,
typename FarmTransformer,
284 template <
typename>
class Farm,
285 typename... OtherTransformers,
286 requires_farm<Farm<FarmTransformer>> = 0>
287 void do_pipeline(Item && item, Farm<FarmTransformer> & farm_obj,
288 OtherTransformers && ... other_transform_ops)
const
290 do_pipeline(std::forward<Item>(item), std::move(farm_obj),
291 std::forward<OtherTransformers>(other_transform_ops)...);
294 template <
typename Item,
typename FarmTransformer,
295 template <
typename>
class Farm,
296 typename... OtherTransformers,
297 requires_farm<Farm<FarmTransformer>> = 0>
298 void do_pipeline(Item && item, Farm<FarmTransformer> && farm_obj,
299 OtherTransformers && ... other_transform_ops)
const;
301 template <
typename Item,
typename Predicate,
302 template <
typename>
class Filter,
303 typename ... OtherTransformers,
304 requires_filter<Filter<Predicate>> = 0>
305 void do_pipeline(Item && item, Filter<Predicate> & filter_obj,
306 OtherTransformers && ... other_transform_ops)
const
308 do_pipeline(std::forward<Item>(item), std::move(filter_obj),
309 std::forward<OtherTransformers>(other_transform_ops)...);
312 template <
typename Item,
typename Predicate,
313 template <
typename>
class Filter,
314 typename ... OtherTransformers,
315 requires_filter<Filter<Predicate>> = 0>
316 void do_pipeline(Item && item, Filter<Predicate> && filter_obj,
317 OtherTransformers && ... other_transform_ops)
const;
319 template <
typename Item,
typename Combiner,
typename Identity,
320 template <
typename C,
typename I>
class Reduce,
321 typename ... OtherTransformers,
322 requires_reduce<Reduce<Combiner,Identity>> = 0>
323 void do_pipeline(Item && item, Reduce<Combiner,Identity> & reduce_obj,
324 OtherTransformers && ... other_transform_ops)
const
326 do_pipeline(std::forward<Item>(item), std::move(reduce_obj),
327 std::forward<OtherTransformers>(other_transform_ops)...);
331 template <
typename Item,
typename Combiner,
typename Identity,
332 template <
typename C,
typename I>
class Reduce,
333 typename ... OtherTransformers,
334 requires_reduce<Reduce<Combiner,Identity>> = 0>
335 void do_pipeline(Item && item, Reduce<Combiner,Identity> && reduce_obj,
336 OtherTransformers && ... other_transform_ops)
const;
338 template <
typename Item,
typename Transformer,
typename Predicate,
339 template <
typename T,
typename P>
class Iteration,
340 typename ... OtherTransformers,
341 requires_iteration<Iteration<Transformer,Predicate>> = 0>
342 void do_pipeline(Item && item,
343 Iteration<Transformer,Predicate> & iteration_obj,
344 OtherTransformers && ... other_transform_ops)
const
346 do_pipeline(std::forward<Item>(item), std::move(iteration_obj),
347 std::forward<OtherTransformers>(other_transform_ops)...);
350 template <
typename Item,
typename Transformer,
typename Predicate,
351 template <
typename T,
typename P>
class Iteration,
352 typename ...OtherTransformers,
353 requires_iteration<Iteration<Transformer,Predicate>> = 0,
354 requires_no_pattern<Transformer> = 0>
355 void do_pipeline(Item && item,
356 Iteration<Transformer,Predicate> && iteration_obj,
357 OtherTransformers && ... other_transform_ops)
const;
359 template <
typename Item,
typename Transformer,
typename Predicate,
360 template <
typename T,
typename P>
class Iteration,
361 typename ...OtherTransformers,
362 requires_iteration<Iteration<Transformer,Predicate>> = 0,
363 requires_pipeline<Transformer> = 0>
364 void do_pipeline(Item && item,
365 Iteration<Transformer,Predicate> && iteration_obj,
366 OtherTransformers && ... other_transform_ops)
const;
368 template <
typename Item,
typename ... Transformers,
369 template <
typename...>
class Pipeline,
370 typename ... OtherTransformers,
372 void do_pipeline(Item && item, Pipeline<Transformers...> & pipeline_obj,
373 OtherTransformers && ... other_transform_ops)
const
375 do_pipeline(std::forward<Item>(item), std::move(pipeline_obj),
376 std::forward<OtherTransformers>(other_transform_ops)...);
379 template <
typename Item,
typename ... Transformers,
380 template <
typename...>
class Pipeline,
381 typename ... OtherTransformers,
383 void do_pipeline(Item && item, Pipeline<Transformers...> && pipeline_obj,
384 OtherTransformers && ... other_transform_ops)
const;
386 template <
typename Item,
typename ... Transformers, std::size_t ... I>
387 void do_pipeline_nested(Item && item,
388 std::tuple<Transformers...> && transform_ops,
389 std::index_sequence<I...>)
const;
394 template <
typename E>
396 return std::is_same<E, sequential_execution>::value;
448 template <
typename ... InputIterators,
typename OutputIterator,
449 typename Transformer>
451 std::tuple<InputIterators...> firsts,
452 OutputIterator first_out,
453 std::size_t sequence_size,
454 Transformer && transform_op)
const
456 const auto last = std::next(std::get<0>(firsts), sequence_size);
457 while (std::get<0>(firsts) != last) {
459 std::forward<Transformer>(transform_op), firsts);
463 template <
typename InputIterator,
typename Identity,
typename Combiner>
466 std::size_t sequence_size,
467 Identity && identity,
468 Combiner && combine_op)
const
470 const auto last = std::next(first, sequence_size);
471 auto result{identity};
472 while (first != last) {
473 result = combine_op(result, *first++);
478 template <
typename ... InputIterators,
typename Identity,
479 typename Transformer,
typename Combiner>
481 std::tuple<InputIterators...> firsts,
482 std::size_t sequence_size,
483 Identity && identity,
484 Transformer && transform_op, Combiner && combine_op)
const
486 const auto last = std::next(std::get<0>(firsts), sequence_size);
487 auto result{identity};
488 while (std::get<0>(firsts) != last) {
490 std::forward<Transformer>(transform_op), firsts));
495 template <
typename ... InputIterators,
typename OutputIterator,
496 typename StencilTransformer,
typename Neighbourhood>
498 std::tuple<InputIterators...> firsts, OutputIterator first_out,
499 std::size_t sequence_size,
500 StencilTransformer && transform_op,
501 Neighbourhood && neighbour_op)
const
503 const auto last = std::next(std::get<0>(firsts), sequence_size);
504 while (std::get<0>(firsts) != last) {
505 const auto f = std::get<0>(firsts);
506 *first_out++ = transform_op(f,
512 template <
typename Input,
typename Div
ider,
typename Predicate,
typename Solver,
typename Combiner>
515 Divider && divide_op,
516 Predicate && predicate_op,
518 Combiner && combine_op)
const
521 if (predicate_op(input)) {
return solve_op(std::forward<Input>(input)); }
522 auto subproblems = divide_op(std::forward<Input>(input));
524 using subproblem_type =
525 std::decay_t<
typename std::result_of<Solver(Input)>::type>;
526 std::vector<subproblem_type> solutions;
527 for (
auto && sp : subproblems) {
529 std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),std::forward<Solver>(solve_op),
530 std::forward<Combiner>(combine_op)));
532 return reduce(std::next(solutions.begin()), solutions.size()-1, solutions[0],
533 std::forward<Combiner>(combine_op));
// Fragment of the out-of-class definition of the divide_conquer overload that
// takes no predicate (template parameters Input, Divider, Solver, Combiner).
// Several lines of this listing are missing (the head of the signature, the
// braces and the first line of the loop body); only the visible lines are
// documented here.
//
// Visible behaviour: the input is split with divide_op; when at most one
// subproblem results, the input is solved directly with solve_op. Otherwise a
// vector of partial solutions is combined through reduce(), using
// solutions[0] as the initial value and the remaining size()-1 elements as
// the sequence.
537 template <
typename Input,
typename Div
ider,
typename Solver,
typename Combiner>
540 Divider && divide_op,
542 Combiner && combine_op)
const
// NOTE(review): input is forwarded to divide_op here and forwarded again to
// solve_op two lines below. If Input is an rvalue and divide_op takes its
// argument by value (or otherwise moves from it), solve_op receives a
// moved-from object. Passing input as an lvalue to divide_op would avoid it.
545 auto subproblems = divide_op(std::forward<Input>(input));
546 if (subproblems.size()<=1) {
return solve_op(std::forward<Input>(input)); }
// Element type of the partial-solution vector: the decayed result of calling
// Solver with an Input.
548 using subproblem_type =
549 std::decay_t<
typename std::result_of<Solver(Input)>::type>;
550 std::vector<subproblem_type> solutions;
551 for (
auto && sp : subproblems) {
// The callables are forwarded once per subproblem on every iteration.
553 std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
554 std::forward<Combiner>(combine_op)));
// solutions is non-empty here only if the loop above stored one entry per
// subproblem (presumably; the storing line is not visible) -- TODO confirm.
556 return reduce(std::next(solutions.begin()), solutions.size()-1, solutions[0],
557 std::forward<Combiner>(combine_op));
560 template <
typename Generator,
typename ... Transformers>
562 Generator && generate_op,
563 Transformers && ... transform_ops)
const
565 static_assert(is_generator<Generator>,
566 "First pipeline stage must be a generator");
569 auto x = generate_op();
571 do_pipeline(*x, std::forward<Transformers>(transform_ops)...);
575 template <
typename Item,
typename Consumer,
577 void sequential_execution::do_pipeline(
579 Consumer && consume_op)
const
581 consume_op(std::forward<Item>(item));
584 template <
typename Item,
typename Transformer,
typename ... OtherTransformers,
585 requires_no_pattern<Transformer>>
586 void sequential_execution::do_pipeline(
588 Transformer && transform_op,
589 OtherTransformers && ... other_ops)
const
591 static_assert(!is_consumer<Transformer,Item>,
592 "Intermediate pipeline stage cannot be a consumer");
594 do_pipeline(transform_op(std::forward<Item>(item)),
595 std::forward<OtherTransformers>(other_ops)...);
598 template <
typename Item,
typename FarmTransformer,
599 template <
typename>
class Farm,
600 requires_farm<Farm<FarmTransformer>>>
601 void sequential_execution::do_pipeline(
603 Farm<FarmTransformer> && farm_obj)
const
605 farm_obj(std::forward<Item>(item));
608 template <
typename Item,
typename FarmTransformer,
609 template <
typename>
class Farm,
610 typename... OtherTransformers,
611 requires_farm<Farm<FarmTransformer>>>
612 void sequential_execution::do_pipeline(
614 Farm<FarmTransformer> && farm_obj,
615 OtherTransformers && ... other_transform_ops)
const
617 static_assert(!
is_consumer<Farm<FarmTransformer>,Item>,
618 "Intermediate pipeline stage cannot be a consumer");
619 do_pipeline(farm_obj(std::forward<Item>(item)),
620 std::forward<OtherTransformers>(other_transform_ops)...);
623 template <
typename Item,
typename Predicate,
624 template <
typename>
class Filter,
625 typename ... OtherTransformers,
626 requires_filter<Filter<Predicate>>>
627 void sequential_execution::do_pipeline(
629 Filter<Predicate> && filter_obj,
630 OtherTransformers && ... other_transform_ops)
const
632 if (filter_obj(std::forward<Item>(item))) {
633 do_pipeline(std::forward<Item>(item),
634 std::forward<OtherTransformers>(other_transform_ops)...);
638 template <
typename Item,
typename Combiner,
typename Identity,
639 template <
typename C,
typename I>
class Reduce,
640 typename ... OtherTransformers,
641 requires_reduce<Reduce<Combiner,Identity>>>
642 void sequential_execution::do_pipeline(
644 Reduce<Combiner,Identity> && reduce_obj,
645 OtherTransformers && ... other_transform_ops)
const
647 reduce_obj.add_item(std::forward<Identity>(item));
648 if (reduce_obj.reduction_needed()) {
649 auto red = reduce_obj.reduce_window(*
this);
651 std::forward<OtherTransformers...>(other_transform_ops)...);
655 template <
typename Item,
typename Transformer,
typename Predicate,
656 template <
typename T,
typename P>
class Iteration,
657 typename ... OtherTransformers,
658 requires_iteration<Iteration<Transformer,Predicate>>,
659 requires_no_pattern<Transformer>>
660 void sequential_execution::do_pipeline(
662 Iteration<Transformer,Predicate> && iteration_obj,
663 OtherTransformers && ... other_transform_ops)
const
665 auto new_item = iteration_obj.transform(std::forward<Item>(item));
666 while (!iteration_obj.predicate(new_item)) {
667 new_item = iteration_obj.transform(new_item);
669 do_pipeline(new_item,
670 std::forward<OtherTransformers...>(other_transform_ops)...);
673 template <
typename Item,
typename Transformer,
typename Predicate,
674 template <
typename T,
typename P>
class Iteration,
675 typename ... OtherTransformers,
676 requires_iteration<Iteration<Transformer,Predicate>>,
677 requires_pipeline<Transformer>>
678 void sequential_execution::do_pipeline(
680 Iteration<Transformer,Predicate> &&,
681 OtherTransformers && ...)
const
683 static_assert(!is_pipeline<Transformer>,
"Not implemented");
686 template <
typename Item,
typename ... Transformers,
687 template <
typename...>
class Pipeline,
688 typename ... OtherTransformers,
690 void sequential_execution::do_pipeline(
692 Pipeline<Transformers...> && pipeline_obj,
693 OtherTransformers && ... other_transform_ops)
const
696 std::forward<Item>(item),
697 std::tuple_cat(pipeline_obj.transformers(),
698 std::forward_as_tuple(other_transform_ops...)),
699 std::make_index_sequence<
sizeof...(Transformers)+
sizeof...(OtherTransformers)>());
702 template <
typename Item,
typename ... Transformers, std::size_t ... I>
703 void sequential_execution::do_pipeline_nested(
705 std::tuple<Transformers...> && transform_ops,
706 std::index_sequence<I...>)
const
709 std::forward<Item>(item),
710 std::forward<Transformers>(std::get<I>(transform_ops))...);
Definition: mpmc_queue.h:33
T pop()
Definition: mpmc_queue.h:95
Sequential execution policy.
Definition: sequential_execution.h:36
constexpr bool is_ordered() const noexcept
Is execution ordered.
Definition: sequential_execution.h:71
constexpr int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: sequential_execution.h:53
constexpr auto reduce(InputIterator first, std::size_t sequence_size, Identity &&identity, Combiner &&combine_op) const
Applies a reduction to a sequence of data items.
Definition: sequential_execution.h:464
constexpr void disable_ordering() const noexcept
Disable ordering.
Definition: sequential_execution.h:65
constexpr void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: sequential_execution.h:497
void pipeline(Generator &&generate_op, Transformers &&... transform_op) const
Invoke Pipeline pattern.
Definition: sequential_execution.h:561
constexpr void set_concurrency_degree(int) const noexcept
Set number of grppi threads.
Definition: sequential_execution.h:47
constexpr auto map_reduce(std::tuple< InputIterators... > firsts, std::size_t sequence_size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op) const
Applies a map/reduce operation to a sequence of data items.
Definition: sequential_execution.h:480
void pipeline(mpmc_queue< InputType > &input_queue, Transformer &&transform_op, mpmc_queue< OutputType > &output_queue) const
Invoke Pipeline pattern coming from another context that uses mpmc_queues as communication channels.
Definition: sequential_execution.h:218
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke the divide/conquer pattern.
Definition: sequential_execution.h:538
constexpr sequential_execution() noexcept=default
Default constructor.
constexpr void map(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, Transformer &&transform_op) const
Applies a transformation to multiple sequences leaving the result in another sequence.
Definition: sequential_execution.h:450
constexpr void enable_ordering() const noexcept
Enable ordering.
Definition: sequential_execution.h:59
auto divide_conquer(const Execution &ex, Input &&input, Divider &&divider_op, Solver &&solver_op, Combiner &&combiner_op)
Invoke the divide/conquer pattern. \tparam Execution Execution type.
Definition: divideconquer.h:49
void map(const Execution &ex, std::tuple< InputIterators... > firsts, InputIt last, OutputIt first_out, Transformer transform_op)
Invoke Map pattern on a data sequence.
Definition: map.h:51
auto map_reduce(const Execution &ex, std::tuple< InputIterators... > firsts, std::size_t size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op)
Invoke the map/reduce pattern on a data sequence.
Definition: mapreduce.h:52
void pipeline(const Execution &ex, Generator &&generate_op, Transformers &&... transform_ops)
Invoke Pipeline pattern on a data stream.
Definition: pipeline.h:46
auto reduce(const Execution &ex, InputIt first, std::size_t size, Result &&identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with sequential execution.
Definition: reduce.h:50
void stencil(const Execution &ex, std::tuple< InputIterators... > firsts, std::size_t size, OutputIt out, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op)
Invoke Stencil pattern on a data sequence with sequential execution.
Definition: stencil.h:54
decltype(auto) apply_deref_increment(F &&f, T< Iterators... > &iterators, std::index_sequence< I... >)
Definition: iterator.h:27
decltype(auto) apply_increment(F &&f, T< Iterators... > &iterators, std::index_sequence< I... >)
Definition: iterator.h:71
Definition: callable_traits.h:21
constexpr bool supports_pipeline< sequential_execution >()
Determines if an execution policy supports the pipeline pattern.
Definition: sequential_execution.h:446
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:107
constexpr bool supports_stencil< sequential_execution >()
Determines if an execution policy supports the stencil pattern.
Definition: sequential_execution.h:432
constexpr bool supports_reduce< sequential_execution >()
Determines if an execution policy supports the reduce pattern.
Definition: sequential_execution.h:418
constexpr bool is_sequential_execution()
Determine if a type is a sequential execution policy.
Definition: sequential_execution.h:395
constexpr bool supports_map_reduce< sequential_execution >()
Determines if an execution policy supports the map-reduce pattern.
Definition: sequential_execution.h:425
constexpr bool is_consumer
Definition: callable_traits.h:98
constexpr bool supports_divide_conquer< sequential_execution >()
Determines if an execution policy supports the divide/conquer pattern.
Definition: sequential_execution.h:439
constexpr bool supports_map< sequential_execution >()
Determines if an execution policy supports the map pattern.
Definition: sequential_execution.h:411
std::enable_if_t< is_no_pattern< T >, int > requires_no_pattern
Definition: patterns.h:87
constexpr bool is_supported< sequential_execution >()
Determines if an execution policy is supported in the current compilation.
Definition: sequential_execution.h:404