21 #ifndef GRPPI_SEQ_STREAM_REDUCE_H 22 #define GRPPI_SEQ_STREAM_REDUCE_H 54 template <
typename Identity,
typename Combiner,
typename Consumer,
57 int window_size,
int offset, Identity identity,
58 Generator && generate_op, Combiner && combine_op,
59 Consumer && consume_op)
62 using generated_type =
typename result_of<Generator()>::type;
63 using generated_value_type =
typename generated_type::value_type;
65 vector<generated_value_type> values;
66 values.reserve(window_size);
68 auto item{generate_op()};
70 while (item && values.size()!=window_size) {
71 values.push_back(*item);
74 if (values.size()>0) {
75 auto reduced_value =
reduce(ex, values.begin(), values.end(), identity,
76 std::forward<Combiner>(combine_op));
77 consume_op(reduced_value);
79 if (offset <= window_size) {
80 values.erase(values.begin(), values.begin() + offset);
83 values.erase(values.begin(), values.end());
84 auto diff = offset - window_size;
85 while (diff > 0 && item) {
Definition: callable_traits.h:24
Sequential execution policy.
Definition: sequential_execution.h:31
void stream_reduce(parallel_execution_native &ex, int window_size, int offset, Identity identity, Generator &&generate_op, Combiner &&combine_op, Consumer &&consume_op)
Invoke Stream reduction pattern on a stream with native parallel execution.
Definition: native/stream_reduce.h:56
auto reduce(parallel_execution_native &ex, InputIt first, InputIt last, Identity identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with parallel native execution...
Definition: native/reduce.h:51