GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
seq/stream_reduce.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_SEQ_STREAM_REDUCE_H
22 #define GRPPI_SEQ_STREAM_REDUCE_H
23 
24 #include "sequential_execution.h"
25 #include "reduce.h"
26 
27 #include <vector>
28 
29 namespace grppi {
30 
54 template <typename Identity, typename Combiner, typename Consumer,
55  typename Generator>
57  int window_size, int offset, Identity identity,
58  Generator && generate_op, Combiner && combine_op,
59  Consumer && consume_op)
60 {
61  using namespace std;
62  using generated_type = typename result_of<Generator()>::type;
63  using generated_value_type = typename generated_type::value_type;
64  // TODO: Evaluate better structure than vector
65  vector<generated_value_type> values;
66  values.reserve(window_size);
67 
68  auto item{generate_op()};
69  for (;;) {
70  while (item && values.size()!=window_size) {
71  values.push_back(*item);
72  item = generate_op();
73  }
74  if (values.size()>0) {
75  auto reduced_value = reduce(ex, values.begin(), values.end(), identity,
76  std::forward<Combiner>(combine_op));
77  consume_op(reduced_value);
78  if(item){
79  if (offset <= window_size) {
80  values.erase(values.begin(), values.begin() + offset);
81  }
82  else {
83  values.erase(values.begin(), values.end());
84  auto diff = offset - window_size;
85  while (diff > 0 && item) {
86  item = generate_op();
87  diff--;
88  }
89  }
90  }
91  }
92  if (!item ) break;
93  }
94 }
95 
101 }
102 
103 #endif
Definition: callable_traits.h:24
STL namespace.
Sequential execution policy.
Definition: sequential_execution.h:31
void stream_reduce(parallel_execution_native &ex, int window_size, int offset, Identity identity, Generator &&generate_op, Combiner &&combine_op, Consumer &&consume_op)
Invoke Stream reduction pattern on a stream with native parallel execution.
Definition: native/stream_reduce.h:56
auto reduce(parallel_execution_native &ex, InputIt first, InputIt last, Identity identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with parallel native execution...
Definition: native/reduce.h:51