21 #ifndef GRPPI_TBB_STREAM_REDUCE_H 22 #define GRPPI_TBB_STREAM_REDUCE_H 56 template <
typename Identity,
typename Generator,
typename Combiner,
59 int window_size,
int offset, Identity identity,
60 Generator && generate_op, Combiner && combine_op,
64 using generated_type =
typename result_of<Generator()>::type;
65 using generated_value_type =
typename generated_type::value_type;
67 vector<generated_value_type> values;
68 values.reserve(window_size);
71 auto item = generate_op();
73 while (item && values.size()!=window_size) {
74 values.push_back(*item);
77 if (values.size()>0) {
78 auto reduced_value =
reduce(ex, values.begin(), values.end(), identity,
79 std::forward<Combiner>(combine_op));
80 consume_op(reduced_value);
82 if (offset <= window_size) {
83 values.erase(values.begin(), values.begin() + offset);
86 values.erase(values.begin(), values.end());
87 auto diff = offset - window_size;
88 while (diff > 0 && item) {
Definition: callable_traits.h:24
TBB parallel execution policy.
Definition: parallel_execution_tbb.h:37
void stream_reduce(parallel_execution_native &ex, int window_size, int offset, Identity identity, Generator &&generate_op, Combiner &&combine_op, Consumer &&consume_op)
Invoke Stream reduction pattern on a stream with native parallel execution.
Definition: native/stream_reduce.h:56
auto reduce(parallel_execution_native &ex, InputIt first, InputIt last, Identity identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with parallel native execution.
Definition: native/reduce.h:51