GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
native/stream_reduce.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_NATIVE_STREAM_REDUCE_H
22 #define GRPPI_NATIVE_STREAM_REDUCE_H
23 
25 #include "reduce.h"
26 
27 #include <thread>
28 
29 namespace grppi {
30 
54 template <typename Identity, typename Combiner, typename Consumer,
55  typename Generator>
57  int window_size, int offset, Identity identity,
58  Generator && generate_op, Combiner && combine_op,
59  Consumer && consume_op)
60 {
61  using namespace std;
62  using generated_type = typename result_of<Generator()>::type;
63  using generated_value_type = typename generated_type::value_type;
64  // TODO: Evaluate better structure than vector
65  vector<generated_value_type> values;
66  values.reserve(window_size);
67 
68  // TODO: Set generator and consumer in separate threads
69  auto item = generate_op();
70  for (;;) {
71  while (item && values.size()!=window_size) {
72  values.push_back(*item);
73  item = generate_op();
74  }
75  if (values.size()>0) {
76  auto reduced_value = reduce(ex, values.begin(), values.end(), identity,
77  std::forward<Combiner>(combine_op));
78  consume_op(reduced_value);
79  if (item) {
80  if (offset <= window_size) {
81  values.erase(values.begin(), values.begin() + offset);
82  }
83  else {
84  values.erase(values.begin(), values.end());
85  auto diff = offset - window_size;
86  while (diff > 0 && item) {
87  item = generate_op();
88  diff--;
89  }
90  }
91  }
92  }
93  if (!item) break;
94  }
95 }
96 
102 }
103 
104 #endif
Definition: callable_traits.h:24
STL namespace.
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:136
void stream_reduce(parallel_execution_native &ex, int window_size, int offset, Identity identity, Generator &&generate_op, Combiner &&combine_op, Consumer &&consume_op)
Invoke Stream reduction pattern on a stream with native parallel execution.
Definition: native/stream_reduce.h:56
auto reduce(parallel_execution_native &ex, InputIt first, InputIt last, Identity identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with parallel native execution...
Definition: native/reduce.h:51