21 #ifndef GRPPI_NATIVE_STREAM_REDUCE_H 22 #define GRPPI_NATIVE_STREAM_REDUCE_H 54 template <
typename Identity,
typename Combiner,
typename Consumer,
57 int window_size,
int offset, Identity identity,
58 Generator && generate_op, Combiner && combine_op,
59 Consumer && consume_op)
62 using generated_type =
typename result_of<Generator()>::type;
63 using generated_value_type =
typename generated_type::value_type;
65 vector<generated_value_type> values;
66 values.reserve(window_size);
69 auto item = generate_op();
71 while (item && values.size()!=window_size) {
72 values.push_back(*item);
75 if (values.size()>0) {
76 auto reduced_value =
reduce(ex, values.begin(), values.end(), identity,
77 std::forward<Combiner>(combine_op));
78 consume_op(reduced_value);
80 if (offset <= window_size) {
81 values.erase(values.begin(), values.begin() + offset);
84 values.erase(values.begin(), values.end());
85 auto diff = offset - window_size;
86 while (diff > 0 && item) {
Definition: callable_traits.h:24
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:136
void stream_reduce(parallel_execution_native &ex, int window_size, int offset, Identity identity, Generator &&generate_op, Combiner &&combine_op, Consumer &&consume_op)
Invoke Stream reduction pattern on a stream with native parallel execution.
Definition: native/stream_reduce.h:56
auto reduce(parallel_execution_native &ex, InputIt first, InputIt last, Identity identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with parallel native execution...
Definition: native/reduce.h:51