GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
seq/stream_iteration.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_SEQ_STREAM_ITERATION_H
22 #define GRPPI_SEQ_STREAM_ITERATION_H
23 
24 #include "sequential_execution.h"
25 
26 namespace grppi{
27 
// Stream-iteration loop driven directly by a transformer callable.
//
// For every item produced by generate_op, the value is passed through
// transform_op repeatedly until predicate_op accepts it, and the accepted
// value is then handed to consume_op.
//
// Parameters visible in this extract:
//   generate_op  - called with no arguments; its result is tested with `!`
//                  and dereferenced with `*` (an optional-like object).
//   transform_op - called as transform_op(val); its result is assigned back
//                  to val.
//   predicate_op - called as predicate_op(val); a true result ends the
//                  inner refinement loop.
//   consume_op   - called as consume_op(val) with the final value.
//
// NOTE(review): the listing line holding the function name and first
// parameter (listing line 50) is missing from this extract; presumably it is
// `void repeat_until(sequential_execution & ex,` - confirm against the header.
48 template<typename Generator, typename Transformer, typename Predicate,
49  typename Consumer>
51  Generator generate_op, Transformer && transform_op,
52  Predicate predicate_op, Consumer consume_op)
53 {
54  for(;;) {
55  auto item = generate_op();
56  if (!item) break; // an empty item ends the whole stream
57  auto val = *item; // working copy that is refined in place below
58  do {
59  val = transform_op(val); // do/while: the transformer runs at least once
60  } while (!predicate_op(val)); // repeat until the predicate holds
61  consume_op(val);
62  }
63 }
64 
// Forwarding wrapper for a farm object: it does no work itself and
// delegates to another repeat_until overload.
//
// generate_op, predicate_op and consume_op are forwarding references and are
// passed on with std::forward. farm_obj is cast to
// `farm_info<sequential_execution,Transformer> &&`, i.e. it is handed on as
// an rvalue.
//
// NOTE(review): `ex` and `farm_obj` are declared on listing lines 81 and 83,
// which are missing from this extract; their exact declarations cannot be
// confirmed from here. Presumably the rvalue cast selects the overload that
// takes the farm by rvalue reference - confirm against the header.
79 template<typename Generator, typename Transformer, typename Predicate,
80  typename Consumer>
82  Generator && generate_op,
84  Predicate && predicate_op, Consumer && consume_op)
85 {
86  repeat_until(ex,
87  std::forward<Generator>(generate_op),
88  std::forward<farm_info<sequential_execution,Transformer> &&>(farm_obj), // passed on as an rvalue
89  std::forward<Predicate>(predicate_op),
90  std::forward<Consumer>(consume_op));
91 }
92 
93 
// Stream-iteration loop whose transformation step is the `task` member of a
// farm object.
//
// For every item produced by generate_op, the value is passed through
// farm_obj.task repeatedly until predicate_op accepts it, and the accepted
// value is then handed to consume_op.
//
// Parameters visible in this extract:
//   generate_op  - called with no arguments; its result is tested with `!`
//                  and dereferenced with `*`.
//   predicate_op - called as predicate_op(val); a true result ends the
//                  inner refinement loop.
//   consume_op   - called as consume_op(val) with the final value.
//
// NOTE(review): the function name / first parameter (listing line 109) and
// the declaration of `farm_obj` (listing line 111) are missing from this
// extract; confirm both against the header.
107 template<typename Generator, typename Transformer, typename Predicate,
108  typename Consumer>
110  Generator generate_op,
112  Predicate predicate_op,
113  Consumer consume_op)
114 {
115  for(;;) {
116  auto item = generate_op();
117  if (!item) break; // an empty item ends the whole stream
118  auto val = *item; // working copy that is refined in place below
119  do {
120  val = farm_obj.task(val); // do/while: the farm task runs at least once
121  } while (!predicate_op(val)); // repeat until the predicate holds
122  consume_op(val);
123  }
124 }
125 
// Forwarding wrapper for a pipeline object: it does no work itself and
// delegates to another repeat_until overload.
//
// generate_op, predicate_op and consume_op are forwarding references and are
// passed on with forward (std::forward, via the function-local
// using-directive). The pipeline argument is cast to an rvalue reference
// type ending in `Transformers...> &&`.
//
// NOTE(review): the only pipeline parameter visible in this signature is
// named `pipe_obj`, but the call below forwards `pipe`. Unless `pipe` is
// declared on one of the listing lines missing from this extract
// (141, 143, 150), this looks like a wrong identifier that should read
// `pipe_obj` - confirm against the header.
139 template<typename Generator, typename Predicate, typename Consumer,
140  typename ... Transformers>
142  Generator && generate_op,
144  Transformers...> & pipe_obj,
145  Predicate && predicate_op, Consumer && consume_op)
146 {
147  using namespace std; // function-local: lets `forward` be used unqualified
148  repeat_until(ex,
149  forward<Generator>(generate_op),
151  Transformers...> &&>(pipe), // NOTE(review): parameter above is `pipe_obj`
152  forward<Predicate>(predicate_op),
153  forward<Consumer>(consume_op));
154 }
155 
// Stream-iteration loop whose transformation step is a composed pipeline.
//
// For every item produced by generate_op, the value is passed through
// composed_pipeline repeatedly until predicate_op accepts it, and the
// accepted value is then handed to consume_op.
//
// Parameters visible in this extract:
//   generate_op  - called with no arguments; its result is tested with `!`
//                  and dereferenced with `*`. Its result type must expose a
//                  nested `value_type` (used by the aliases below).
//   predicate_op - called as predicate_op(val); a true result ends the
//                  inner refinement loop.
//   consume_op   - called as consume_op(val) with the final value.
//
// NOTE(review): the function name / first parameter (listing line 171) and
// the declaration of `pipe` (listing line 173) are missing from this
// extract; confirm both against the header.
169 template<typename Generator, typename Predicate, typename Consumer,
170  typename ...Transformers>
172  Generator generate_op,
174  Predicate predicate_op, Consumer consume_op)
175 {
176  using namespace std; // function-local: result_of / forward used unqualified
177  for (;;) {
178  auto item = generate_op();
179  if (!item) break; // an empty item ends the whole stream
180  auto val = *item; // working copy that is refined in place below
181  do {
// The three aliases below depend only on template parameters, not on the
// loop state.
// NOTE(review): std::result_of is deprecated in C++17 and removed in C++20;
// std::invoke_result is the replacement if the project moves to C++17.
182  using generated_type = typename result_of<Generator()>::type;
183  using generated_value_type = typename generated_type::value_type;
184  using pipeline_info_type = pipeline_info<sequential_execution,
185  Transformers...>;
// NOTE(review): `pipe` is forwarded as pipeline_info_type (an rvalue cast)
// on every pass of both loops; confirm composed_pipeline does not move from
// it, otherwise later passes would see a moved-from pipeline.
186  val = composed_pipeline<generated_value_type,0,Transformers...>(val,
187  forward<pipeline_info_type>(pipe));
188  } while (!predicate_op(val)); // repeat until the predicate holds
189  consume_op(val);
190  }
191 }
192 
193 }
194 
195 #endif
Definition: callable_traits.h:24
Definition: patterns.h:29
void repeat_until(parallel_execution_native &ex, Generator &&generate_op, pipeline_info< parallel_execution_native, MoreTransformers... > &&pipe, Predicate predicate_op, Consumer consume_op)
Invokes the stream iteration pattern on a data stream with native parallel execution with a generator...
Definition: native/stream_iteration.h:55
STL namespace.
Definition: patterns.h:51
Sequential execution policy.
Definition: sequential_execution.h:31
void composed_pipeline(InQueue &input_queue, const pipeline_info< parallel_execution_native, MoreTransformers... > &pipe, OutQueue &output_queue, std::vector< std::thread > &tasks)
Definition: native/pipeline.h:36