GrPPI  0.3.1
Generic and Reusable Parallel Pattern Interface
pipeline_impl.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_FF_DETAIL_PIPELINE_IMPL_H
22 #define GRPPI_FF_DETAIL_PIPELINE_IMPL_H
23 
24 #ifdef GRPPI_FF
25 
26 #include "simple_node.h"
27 #include "ordered_stream_reduce.h"
29 #include "ordered_stream_filter.h"
31 #include "iteration_nodes.h"
32 #include "../../common/mpmc_queue.h"
33 
34 
35 #include <ff/allocator.hpp>
36 #include <ff/pipeline.hpp>
37 #include <ff/farm.hpp>
38 
39 #include <atomic>
40 
41 namespace grppi {
42 
43 namespace detail_ff {
44 
45 
46 class pipeline_impl : public ff::ff_pipeline {
47 public:
48 
49  template <typename Generator, typename ... Transformers>
50  pipeline_impl(int nworkers, bool ordered, Generator && gen,
51  Transformers && ... transform_ops);
52 
53 private:
54 
58  void set_queue_attributes(int size, queue_mode mode) noexcept {
59  queue_size_ = size;
60  queue_mode_ = mode;
61  }
62 
69  template <typename T>
70  mpmc_queue<T> make_queue() const {
71  return {queue_size_, queue_mode_};
72  }
73 
74  void add_node(std::unique_ptr<ff_node> && p_node) {
75  ff::ff_pipeline::add_stage(p_node.get());
76  nodes_.push_back(std::forward<std::unique_ptr<ff_node>>(p_node));
77  }
78 
79  template <typename Input, typename Transformer,
80  requires_no_pattern<Transformer> = 0>
81  auto add_stages(Transformer &&stage)
82  {
83  using gen_value_type = std::decay_t<Input>;
84  using node_type = node_impl<gen_value_type,void,Transformer>;
85 
86  auto p_stage = std::make_unique<node_type>(std::forward<Transformer>(stage));
87  add_node(std::move(p_stage));
88  }
89 
90  template <typename Input, typename Transformer, typename ... OtherTransformers,
91  requires_no_pattern<Transformer> = 0>
92  auto add_stages(Transformer && transform_op,
93  OtherTransformers && ... other_transform_ops)
94  {
95  static_assert(!std::is_void<Input>::value,
96  "Transformer must take non-void argument");
97  using output_type =
98  std::decay_t<typename std::result_of<Transformer(Input)>::type>;
99  static_assert(!std::is_void<output_type>::value,
100  "Transformer must return a non-void result");
101 
102  using node_type = node_impl<Input,output_type,Transformer>;
103  auto p_stage = std::make_unique<node_type>(
104  std::forward<Transformer>(transform_op));
105 
106  add_node(std::move(p_stage));
107  add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
108  }
109 
110  template <typename Input, typename ... Transformers,
111  template <typename...> class Pipeline,
112  typename ... OtherTransformers,
113  requires_pipeline<Pipeline<Transformers...>> = 0>
114  auto add_stages(Pipeline<Transformers...> & pipeline_obj,
115  OtherTransformers && ... other_transform_ops)
116  {
117  return this->template add_stages<Input>(std::move(pipeline_obj),
118  std::forward<OtherTransformers>(other_transform_ops)...);
119  }
120 
121  template <typename Input, typename ... Transformers,
122  template <typename...> class Pipeline,
123  typename ... OtherTransformers,
124  requires_pipeline<Pipeline<Transformers...>> = 0>
125  auto add_stages(Pipeline<Transformers...> && pipeline_obj,
126  OtherTransformers && ... other_transform_ops)
127  {
128  return this->template add_stages_nested<Input>(
129  std::tuple_cat(
130  pipeline_obj.transformers(),
131  std::forward_as_tuple(other_transform_ops...)
132  ),
133  std::make_index_sequence<sizeof...(Transformers)+sizeof...(OtherTransformers)>());
134  }
135 
136  template <typename Input, typename ... Transformers, std::size_t ... I>
137  auto add_stages_nested(std::tuple<Transformers...> && transform_ops,
138  std::index_sequence<I...>)
139  {
140  return add_stages<Input>(std::forward<Transformers>(std::get<I>(transform_ops))...);
141  }
142 
143  template <typename Input, typename FarmTransformer,
144  template <typename> class Farm,
145  requires_farm<Farm<FarmTransformer>> = 0>
146  auto add_stages(Farm<FarmTransformer> & farm_obj)
147  {
148  return this->template add_stages<Input>(std::move(farm_obj));
149  }
150 
151  template <typename Input, typename FarmTransformer,
152  template <typename> class Farm,
153  requires_farm<Farm<FarmTransformer>> = 0>
154  auto add_stages(Farm<FarmTransformer> && farm_obj)
155  {
156  static_assert(!std::is_void<Input>::value,
157  "Farm must take non-void argument");
158  using output_type = std::decay_t<typename std::result_of<
159  FarmTransformer(Input)>::type>;
160 
161  using worker_type = node_impl<Input,output_type,Farm<FarmTransformer>>;
162  std::vector<std::unique_ptr<ff::ff_node>> workers;
163  for(int i=0; i<nworkers_; ++i) {
164  workers.push_back(std::make_unique<worker_type>(
165  std::forward<Farm<FarmTransformer>>(farm_obj))
166  );
167  }
168 
169  if(ordered_) {
170  using node_type = ff::ff_OFarm<Input,output_type>;
171  auto p_farm = std::make_unique<node_type>(std::move(workers));
172  add_node(std::move(p_farm));
173  }
174  else {
175  using node_type = ff::ff_Farm<Input,output_type>;
176  auto p_farm = std::make_unique<node_type>(std::move(workers));
177  add_node(std::move(p_farm));
178  }
179  }
180 
181  // parallel stage -- Farm pattern ref with variadic
182  template <typename Input, typename FarmTransformer,
183  template <typename> class Farm,
184  typename ... OtherTransformers,
185  requires_farm<Farm<FarmTransformer>> = 0>
186  auto add_stages(Farm<FarmTransformer> & farm_obj,
187  OtherTransformers && ... other_transform_ops)
188  {
189  return this->template add_stages<Input>(std::move(farm_obj),
190  std::forward<OtherTransformers>(other_transform_ops)...);
191  }
192 
193  // parallel stage -- Farm pattern with variadic
194  template <typename Input, typename FarmTransformer,
195  template <typename> class Farm,
196  typename ... OtherTransformers,
197  requires_farm<Farm<FarmTransformer>> = 0>
198  auto add_stages( Farm<FarmTransformer> && farm_obj,
199  OtherTransformers && ... other_transform_ops)
200  {
201  static_assert(!std::is_void<Input>::value,
202  "Farm must take non-void argument");
203  using output_type =
204  std::decay_t<typename std::result_of<FarmTransformer(Input)>::type>;
205  static_assert(!std::is_void<output_type>::value,
206  "Farm must return a non-void result");
207 
208  using worker_type = node_impl<Input,output_type,Farm<FarmTransformer>>;
209  std::vector<std::unique_ptr<ff::ff_node>> workers;
210 
211  for(int i=0; i<nworkers_; ++i) {
212  workers.push_back(std::make_unique<worker_type>(
213  std::forward<Farm<FarmTransformer>>(farm_obj))
214  );
215  }
216 
217  if(ordered_) {
218  using node_type = ff::ff_OFarm<Input,output_type>;
219  auto p_farm = std::make_unique<node_type>(std::move(workers));
220  add_node(std::move(p_farm));
221  add_stages<output_type>(std::forward<OtherTransformers>(other_transform_ops)...);
222  }
223  else {
224  using node_type = ff::ff_Farm<Input,output_type>;
225  auto p_farm = std::make_unique<node_type>(std::move(workers));
226  add_node(std::move(p_farm));
227  add_stages<output_type>(std::forward<OtherTransformers>(other_transform_ops)...);
228  }
229  }
230 
231  // parallel stage -- Filter pattern ref
232  template <typename Input, typename Predicate,
233  template <typename> class Filter,
234  requires_filter<Filter<Predicate>> = 0>
235  auto add_stages(Filter<Predicate> & filter_obj)
236  {
237  return this->template add_stages<Input>(std::move(filter_obj));
238  }
239 
240  // parallel stage -- Filter pattern
241  template <typename Input, typename Predicate,
242  template <typename> class Filter,
243  requires_filter<Filter<Predicate>> = 0>
244  auto add_stages(Filter<Predicate> && filter_obj)
245 {
246  static_assert(!std::is_void<Input>::value,
247  "Filter must take non-void argument");
248 
249  if(ordered_) {
250  using node_type = ordered_stream_filter<Input,Filter<Predicate>>;
251  auto p_farm = std::make_unique<node_type>(
252  std::forward<Filter<Predicate>>(filter_obj), nworkers_);
253  add_node(std::move(p_farm));
254  }
255  else {
256  using node_type = unordered_stream_filter<Input,Filter<Predicate>>;
257  auto p_farm = std::make_unique<node_type>(
258  std::forward<Filter<Predicate>>(filter_obj), nworkers_);
259  add_node(std::move(p_farm));
260  }
261  }
262 
263  // parallel stage -- Filter pattern ref with variadics
264  template <typename Input, typename Predicate,
265  template <typename> class Filter,
266  typename ... OtherTransformers,
267  requires_filter<Filter<Predicate>> = 0>
268  auto add_stages(Filter<Predicate> & filter_obj,
269  OtherTransformers && ... other_transform_ops)
270  {
271  return this->template add_stages<Input>(std::move(filter_obj),
272  std::forward<OtherTransformers>(other_transform_ops)...);
273  }
274 
275  // parallel stage -- Filter pattern with variadics
276  template <typename Input, typename Predicate,
277  template <typename> class Filter,
278  typename ... OtherTransformers,
279  requires_filter<Filter<Predicate>> = 0>
280  auto add_stages(Filter<Predicate> && filter_obj,
281  OtherTransformers && ... other_transform_ops)
282  {
283  static_assert(!std::is_void<Input>::value,
284  "Filter must take non-void argument");
285 
286  if(ordered_) {
287  using node_type = ordered_stream_filter<Input,Filter<Predicate>>;
288  auto p_farm = std::make_unique<node_type>(
289  std::forward<Filter<Predicate>>(filter_obj), nworkers_);
290  add_node(std::move(p_farm));
291  add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
292  }
293  else {
294  using node_type = unordered_stream_filter<Input,Filter<Predicate>>;
295  auto p_farm = std::make_unique<node_type>(
296  std::forward<Filter<Predicate>>(filter_obj), nworkers_);
297  add_node(std::move(p_farm));
298  add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
299  }
300  }
301 
302  template <typename Input, typename Combiner, typename Identity,
303  template <typename C, typename I> class Reduce,
304  typename ... OtherTransformers,
305  requires_reduce<Reduce<Combiner,Identity>> = 0>
306  auto add_stages(Reduce<Combiner,Identity> & reduce_obj,
307  OtherTransformers && ... other_transform_ops)
308  {
309  return this->template add_stages<Input>(std::move(reduce_obj),
310  std::forward<OtherTransformers>(other_transform_ops)...);
311  }
312 
313  template <typename Input, typename Combiner, typename Identity,
314  template <typename C, typename I> class Reduce,
315  typename ... OtherTransformers,
316  requires_reduce<Reduce<Combiner,Identity>> = 0>
317  auto add_stages(Reduce<Combiner,Identity> && reduce_obj,
318  OtherTransformers && ... other_transform_ops)
319  {
320  static_assert(!std::is_void<Input>::value,
321  "Reduce must take non-void argument");
322 
323  if(ordered_) {
324  using reducer_type = Reduce<Combiner,Identity>;
325  using node_type = ordered_stream_reduce<Input,reducer_type,Combiner>;
326  auto p_farm = std::make_unique<node_type>(
327  std::forward<reducer_type>(reduce_obj),
328  nworkers_);
329  add_node(std::move(p_farm));
330  add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
331  }
332  else {
333  using reducer_type = Reduce<Combiner,Identity>;
334  using node_type = unordered_stream_reduce<Input,reducer_type,Combiner>;
335  auto p_farm = std::make_unique<node_type>(
336  std::forward<Reduce<Combiner,Identity>>(reduce_obj),
337  nworkers_);
338  add_node(std::move(p_farm));
339  add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
340  }
341  }
342 
347  template <typename Input, typename Transformer, typename Predicate,
348  template <typename T, typename P> class Iteration,
349  typename ... OtherTransformers,
350  requires_iteration<Iteration<Transformer,Predicate>> =0,
351  requires_no_pattern<Transformer> =0>
352  auto add_stages(Iteration<Transformer,Predicate> & iteration_obj,
353  OtherTransformers && ... other_transform_ops)
354  {
355  return this->template add_stages<Input>(std::move(iteration_obj),
356  std::forward<OtherTransformers>(other_transform_ops)...);
357  }
358 
359 
364  template <typename Input, typename Transformer, typename Predicate,
365  template <typename T, typename P> class Iteration,
366  typename ... OtherTransformers,
367  requires_iteration<Iteration<Transformer,Predicate>> =0,
368  requires_no_pattern<Transformer> =0>
369  auto add_stages(Iteration<Transformer,Predicate> && iteration_obj,
370  OtherTransformers && ... other_transform_ops)
371  {
372  std::vector<std::unique_ptr<ff::ff_node>> workers;
373 
374  using iteration_type = Iteration<Transformer,Predicate>;
375  using worker_type = iteration_worker<Input,iteration_type>;
376  for (int i=0; i<nworkers_; ++i)
377  workers.push_back(
378  std::make_unique<worker_type>(
379  std::forward<iteration_type>(iteration_obj)));
380 
381  if (ordered_) {
382  using node_type = ff::ff_OFarm<Input>;
383  auto p_farm = std::make_unique<node_type>(std::move(workers));
384  add_node(std::move(p_farm));
385  add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
386  }
387  else {
388  using node_type = ff::ff_Farm<Input>;
389  auto p_farm = std::make_unique<node_type>(std::move(workers));
390  add_node(std::move(p_farm));
391  add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
392  }
393  }
394 
400  template <typename Input, typename Transformer, typename Predicate,
401  template <typename T, typename P> class Iteration,
402  typename ... OtherTransformers,
403  requires_iteration<Iteration<Transformer,Predicate>> =0,
404  requires_pipeline<Transformer> =0>
405  auto add_stages(Iteration<Transformer,Predicate> && iteration_obj,
406  OtherTransformers && ... other_transform_ops)
407  {
408  static_assert(!is_pipeline<Transformer>, "Not implemented");
409  }
410 
411  template <typename Input, typename Execution, typename Transformer,
412  template <typename, typename> class Context,
413  typename ... OtherTransformers,
414  requires_context<Context<Execution,Transformer>> = 0>
415  auto add_stages(Context<Execution,Transformer> & context_op,
416  OtherTransformers &&... other_ops)
417  {
418  return this->template add_stages<Input>(std::move(context_op),
419  std::forward<OtherTransformers>(other_ops)...);
420  }
421 
422  template <typename Input, typename Execution, typename Transformer,
423  template <typename, typename> class Context,
424  typename ... OtherTransformers,
425  requires_context<Context<Execution,Transformer>> = 0>
426  auto add_stages(Context<Execution,Transformer> && context_op,
427  OtherTransformers &&... other_ops)
428  {
429 
430  return this->template add_stages<Input>(context_op.transformer(),
431  std::forward<OtherTransformers>(other_ops)...);
432  }
433 
434 
435 private:
436 
437  int nworkers_;
438  bool ordered_;
439  std::vector<std::unique_ptr<ff_node>> nodes_;
440 
441  queue_mode queue_mode_ = queue_mode::blocking;
442 
443  constexpr static int default_queue_size = 100;
444  int queue_size_ = default_queue_size;
445 
446 };
447 
448 template <typename Generator, typename ... Transformers>
449 pipeline_impl::pipeline_impl(
450  int nworkers,
451  bool ordered,
452  Generator && gen_op,
453  Transformers && ... transform_ops)
454  :
455  nworkers_{nworkers},
456  ordered_{ordered},
457  nodes_{}
458 {
459  using result_type = std::decay_t<typename std::result_of<Generator()>::type>;
460  using generator_value_type = typename result_type::value_type;
461  using node_type = node_impl<void,generator_value_type,Generator>;
462 
463  auto first_stage = std::make_unique<node_type>(
464  std::forward<Generator>(gen_op));
465 
466  add_node(std::move(first_stage));
467 
468  add_stages<generator_value_type>(std::forward<Transformers>(transform_ops)...);
469 }
470 
471 
472 } // namespace detail_ff
473 
474 } // namespace grppi
475 
476 #else
477 
478 #endif // GRPPI_FF
479 
480 #endif
Definition: callable_traits.h:26
queue_mode
Definition: mpmc_queue.h:35
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:111
typename std::result_of< Transformer(Input)>::type result_type
Determines the return type of appliying a function on a input type.
Definition: patterns.h:110