GrPPI  1.0
Generic and Reusable Parallel Pattern Interface
pipeline_impl.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Universidad Carlos III de Madrid
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef GRPPI_FF_DETAIL_PIPELINE_IMPL_H
17 #define GRPPI_FF_DETAIL_PIPELINE_IMPL_H
18 
19 #ifdef GRPPI_FF
20 
21 #include "simple_node.h"
22 #include "ordered_stream_reduce.h"
24 #include "ordered_stream_filter.h"
26 #include "iteration_nodes.h"
27 #include "../../common/mpmc_queue.h"
28 
29 
30 #include <ff/allocator.hpp>
31 #include <ff/pipeline.hpp>
32 #include <ff/farm.hpp>
33 
34 #include <atomic>
35 
36 namespace grppi {
37 
38 namespace detail_ff {
39 
40 
41 class pipeline_impl : public ff::ff_pipeline {
42 public:
43 
44  template <typename Generator, typename ... Transformers>
45  pipeline_impl(int nworkers, bool ordered, Generator && gen,
46  Transformers && ... transform_ops);
47 
48 private:
49 
53  void set_queue_attributes(int size, queue_mode mode) noexcept {
54  queue_size_ = size;
55  queue_mode_ = mode;
56  }
57 
64  template <typename T>
65  mpmc_queue<T> make_queue() const {
66  return {queue_size_, queue_mode_};
67  }
68 
69  void add_node(std::unique_ptr<ff_node> && p_node) {
70  ff::ff_pipeline::add_stage(p_node.get());
71  nodes_.push_back(std::forward<std::unique_ptr<ff_node>>(p_node));
72  }
73 
74  template <typename Input, typename Transformer,
75  requires_no_pattern<Transformer> = 0>
76  auto add_stages(Transformer &&stage)
77  {
78  using gen_value_type = std::decay_t<Input>;
79  using node_type = node_impl<gen_value_type,void,Transformer>;
80 
81  auto p_stage = std::make_unique<node_type>(std::forward<Transformer>(stage));
82  add_node(std::move(p_stage));
83  }
84 
85  template <typename Input, typename Transformer, typename ... OtherTransformers,
86  requires_no_pattern<Transformer> = 0>
87  auto add_stages(Transformer && transform_op,
88  OtherTransformers && ... other_transform_ops)
89  {
90  static_assert(!std::is_void<Input>::value,
91  "Transformer must take non-void argument");
92  using output_type =
93  std::decay_t<typename std::result_of<Transformer(Input)>::type>;
94  static_assert(!std::is_void<output_type>::value,
95  "Transformer must return a non-void result");
96 
97  using node_type = node_impl<Input,output_type,Transformer>;
98  auto p_stage = std::make_unique<node_type>(
99  std::forward<Transformer>(transform_op));
100 
101  add_node(std::move(p_stage));
102  add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
103  }
104 
105  template <typename Input, typename ... Transformers,
106  template <typename...> class Pipeline,
107  typename ... OtherTransformers,
108  requires_pipeline<Pipeline<Transformers...>> = 0>
109  auto add_stages(Pipeline<Transformers...> & pipeline_obj,
110  OtherTransformers && ... other_transform_ops)
111  {
112  return this->template add_stages<Input>(std::move(pipeline_obj),
113  std::forward<OtherTransformers>(other_transform_ops)...);
114  }
115 
116  template <typename Input, typename ... Transformers,
117  template <typename...> class Pipeline,
118  typename ... OtherTransformers,
119  requires_pipeline<Pipeline<Transformers...>> = 0>
120  auto add_stages(Pipeline<Transformers...> && pipeline_obj,
121  OtherTransformers && ... other_transform_ops)
122  {
123  return this->template add_stages_nested<Input>(
124  std::tuple_cat(
125  pipeline_obj.transformers(),
126  std::forward_as_tuple(other_transform_ops...)
127  ),
128  std::make_index_sequence<sizeof...(Transformers)+sizeof...(OtherTransformers)>());
129  }
130 
131  template <typename Input, typename ... Transformers, std::size_t ... I>
132  auto add_stages_nested(std::tuple<Transformers...> && transform_ops,
133  std::index_sequence<I...>)
134  {
135  return add_stages<Input>(std::forward<Transformers>(std::get<I>(transform_ops))...);
136  }
137 
138  template <typename Input, typename FarmTransformer,
139  template <typename> class Farm,
140  requires_farm<Farm<FarmTransformer>> = 0>
141  auto add_stages(Farm<FarmTransformer> & farm_obj)
142  {
143  return this->template add_stages<Input>(std::move(farm_obj));
144  }
145 
146  template <typename Input, typename FarmTransformer,
147  template <typename> class Farm,
148  requires_farm<Farm<FarmTransformer>> = 0>
149  auto add_stages(Farm<FarmTransformer> && farm_obj)
150  {
151  static_assert(!std::is_void<Input>::value,
152  "Farm must take non-void argument");
153  using output_type = std::decay_t<typename std::result_of<
154  FarmTransformer(Input)>::type>;
155 
156  using worker_type = node_impl<Input,output_type,Farm<FarmTransformer>>;
157  std::vector<std::unique_ptr<ff::ff_node>> workers;
158  for(int i=0; i<nworkers_; ++i) {
159  workers.push_back(std::make_unique<worker_type>(
160  std::forward<Farm<FarmTransformer>>(farm_obj))
161  );
162  }
163 
164  if(ordered_) {
165  using node_type = ff::ff_OFarm<Input,output_type>;
166  auto p_farm = std::make_unique<node_type>(std::move(workers));
167  add_node(std::move(p_farm));
168  }
169  else {
170  using node_type = ff::ff_Farm<Input,output_type>;
171  auto p_farm = std::make_unique<node_type>(std::move(workers));
172  add_node(std::move(p_farm));
173  }
174  }
175 
176  // parallel stage -- Farm pattern ref with variadic
177  template <typename Input, typename FarmTransformer,
178  template <typename> class Farm,
179  typename ... OtherTransformers,
180  requires_farm<Farm<FarmTransformer>> = 0>
181  auto add_stages(Farm<FarmTransformer> & farm_obj,
182  OtherTransformers && ... other_transform_ops)
183  {
184  return this->template add_stages<Input>(std::move(farm_obj),
185  std::forward<OtherTransformers>(other_transform_ops)...);
186  }
187 
188  // parallel stage -- Farm pattern with variadic
189  template <typename Input, typename FarmTransformer,
190  template <typename> class Farm,
191  typename ... OtherTransformers,
192  requires_farm<Farm<FarmTransformer>> = 0>
193  auto add_stages( Farm<FarmTransformer> && farm_obj,
194  OtherTransformers && ... other_transform_ops)
195  {
196  static_assert(!std::is_void<Input>::value,
197  "Farm must take non-void argument");
198  using output_type =
199  std::decay_t<typename std::result_of<FarmTransformer(Input)>::type>;
200  static_assert(!std::is_void<output_type>::value,
201  "Farm must return a non-void result");
202 
203  using worker_type = node_impl<Input,output_type,Farm<FarmTransformer>>;
204  std::vector<std::unique_ptr<ff::ff_node>> workers;
205 
206  for(int i=0; i<nworkers_; ++i) {
207  workers.push_back(std::make_unique<worker_type>(
208  std::forward<Farm<FarmTransformer>>(farm_obj))
209  );
210  }
211 
212  if(ordered_) {
213  using node_type = ff::ff_OFarm<Input,output_type>;
214  auto p_farm = std::make_unique<node_type>(std::move(workers));
215  add_node(std::move(p_farm));
216  add_stages<output_type>(std::forward<OtherTransformers>(other_transform_ops)...);
217  }
218  else {
219  using node_type = ff::ff_Farm<Input,output_type>;
220  auto p_farm = std::make_unique<node_type>(std::move(workers));
221  add_node(std::move(p_farm));
222  add_stages<output_type>(std::forward<OtherTransformers>(other_transform_ops)...);
223  }
224  }
225 
226  // parallel stage -- Filter pattern ref
227  template <typename Input, typename Predicate,
228  template <typename> class Filter,
229  requires_filter<Filter<Predicate>> = 0>
230  auto add_stages(Filter<Predicate> & filter_obj)
231  {
232  return this->template add_stages<Input>(std::move(filter_obj));
233  }
234 
235  // parallel stage -- Filter pattern
236  template <typename Input, typename Predicate,
237  template <typename> class Filter,
238  requires_filter<Filter<Predicate>> = 0>
239  auto add_stages(Filter<Predicate> && filter_obj)
240 {
241  static_assert(!std::is_void<Input>::value,
242  "Filter must take non-void argument");
243 
244  if(ordered_) {
245  using node_type = ordered_stream_filter<Input,Filter<Predicate>>;
246  auto p_farm = std::make_unique<node_type>(
247  std::forward<Filter<Predicate>>(filter_obj), nworkers_);
248  add_node(std::move(p_farm));
249  }
250  else {
251  using node_type = unordered_stream_filter<Input,Filter<Predicate>>;
252  auto p_farm = std::make_unique<node_type>(
253  std::forward<Filter<Predicate>>(filter_obj), nworkers_);
254  add_node(std::move(p_farm));
255  }
256  }
257 
258  // parallel stage -- Filter pattern ref with variadics
259  template <typename Input, typename Predicate,
260  template <typename> class Filter,
261  typename ... OtherTransformers,
262  requires_filter<Filter<Predicate>> = 0>
263  auto add_stages(Filter<Predicate> & filter_obj,
264  OtherTransformers && ... other_transform_ops)
265  {
266  return this->template add_stages<Input>(std::move(filter_obj),
267  std::forward<OtherTransformers>(other_transform_ops)...);
268  }
269 
270  // parallel stage -- Filter pattern with variadics
271  template <typename Input, typename Predicate,
272  template <typename> class Filter,
273  typename ... OtherTransformers,
274  requires_filter<Filter<Predicate>> = 0>
275  auto add_stages(Filter<Predicate> && filter_obj,
276  OtherTransformers && ... other_transform_ops)
277  {
278  static_assert(!std::is_void<Input>::value,
279  "Filter must take non-void argument");
280 
281  if(ordered_) {
282  using node_type = ordered_stream_filter<Input,Filter<Predicate>>;
283  auto p_farm = std::make_unique<node_type>(
284  std::forward<Filter<Predicate>>(filter_obj), nworkers_);
285  add_node(std::move(p_farm));
286  add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
287  }
288  else {
289  using node_type = unordered_stream_filter<Input,Filter<Predicate>>;
290  auto p_farm = std::make_unique<node_type>(
291  std::forward<Filter<Predicate>>(filter_obj), nworkers_);
292  add_node(std::move(p_farm));
293  add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
294  }
295  }
296 
297  template <typename Input, typename Combiner, typename Identity,
298  template <typename C, typename I> class Reduce,
299  typename ... OtherTransformers,
300  requires_reduce<Reduce<Combiner,Identity>> = 0>
301  auto add_stages(Reduce<Combiner,Identity> & reduce_obj,
302  OtherTransformers && ... other_transform_ops)
303  {
304  return this->template add_stages<Input>(std::move(reduce_obj),
305  std::forward<OtherTransformers>(other_transform_ops)...);
306  }
307 
308  template <typename Input, typename Combiner, typename Identity,
309  template <typename C, typename I> class Reduce,
310  typename ... OtherTransformers,
311  requires_reduce<Reduce<Combiner,Identity>> = 0>
312  auto add_stages(Reduce<Combiner,Identity> && reduce_obj,
313  OtherTransformers && ... other_transform_ops)
314  {
315  static_assert(!std::is_void<Input>::value,
316  "Reduce must take non-void argument");
317 
318  if(ordered_) {
319  using reducer_type = Reduce<Combiner,Identity>;
320  using node_type = ordered_stream_reduce<Input,reducer_type,Combiner>;
321  auto p_farm = std::make_unique<node_type>(
322  std::forward<reducer_type>(reduce_obj),
323  nworkers_);
324  add_node(std::move(p_farm));
325  add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
326  }
327  else {
328  using reducer_type = Reduce<Combiner,Identity>;
329  using node_type = unordered_stream_reduce<Input,reducer_type,Combiner>;
330  auto p_farm = std::make_unique<node_type>(
331  std::forward<Reduce<Combiner,Identity>>(reduce_obj),
332  nworkers_);
333  add_node(std::move(p_farm));
334  add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
335  }
336  }
337 
342  template <typename Input, typename Transformer, typename Predicate,
343  template <typename T, typename P> class Iteration,
344  typename ... OtherTransformers,
345  requires_iteration<Iteration<Transformer,Predicate>> =0,
346  requires_no_pattern<Transformer> =0>
347  auto add_stages(Iteration<Transformer,Predicate> & iteration_obj,
348  OtherTransformers && ... other_transform_ops)
349  {
350  return this->template add_stages<Input>(std::move(iteration_obj),
351  std::forward<OtherTransformers>(other_transform_ops)...);
352  }
353 
354 
359  template <typename Input, typename Transformer, typename Predicate,
360  template <typename T, typename P> class Iteration,
361  typename ... OtherTransformers,
362  requires_iteration<Iteration<Transformer,Predicate>> =0,
363  requires_no_pattern<Transformer> =0>
364  auto add_stages(Iteration<Transformer,Predicate> && iteration_obj,
365  OtherTransformers && ... other_transform_ops)
366  {
367  std::vector<std::unique_ptr<ff::ff_node>> workers;
368 
369  using iteration_type = Iteration<Transformer,Predicate>;
370  using worker_type = iteration_worker<Input,iteration_type>;
371  for (int i=0; i<nworkers_; ++i)
372  workers.push_back(
373  std::make_unique<worker_type>(
374  std::forward<iteration_type>(iteration_obj)));
375 
376  if (ordered_) {
377  using node_type = ff::ff_OFarm<Input>;
378  auto p_farm = std::make_unique<node_type>(std::move(workers));
379  add_node(std::move(p_farm));
380  add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
381  }
382  else {
383  using node_type = ff::ff_Farm<Input>;
384  auto p_farm = std::make_unique<node_type>(std::move(workers));
385  add_node(std::move(p_farm));
386  add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
387  }
388  }
389 
395  template <typename Input, typename Transformer, typename Predicate,
396  template <typename T, typename P> class Iteration,
397  typename ... OtherTransformers,
398  requires_iteration<Iteration<Transformer,Predicate>> =0,
399  requires_pipeline<Transformer> =0>
400  auto add_stages(Iteration<Transformer,Predicate> && iteration_obj,
401  OtherTransformers && ... other_transform_ops)
402  {
403  static_assert(!is_pipeline<Transformer>, "Not implemented");
404  }
405 
406  template <typename Input, typename Execution, typename Transformer,
407  template <typename, typename> class Context,
408  typename ... OtherTransformers,
409  requires_context<Context<Execution,Transformer>> = 0>
410  auto add_stages(Context<Execution,Transformer> & context_op,
411  OtherTransformers &&... other_ops)
412  {
413  return this->template add_stages<Input>(std::move(context_op),
414  std::forward<OtherTransformers>(other_ops)...);
415  }
416 
417  template <typename Input, typename Execution, typename Transformer,
418  template <typename, typename> class Context,
419  typename ... OtherTransformers,
420  requires_context<Context<Execution,Transformer>> = 0>
421  auto add_stages(Context<Execution,Transformer> && context_op,
422  OtherTransformers &&... other_ops)
423  {
424 
425  return this->template add_stages<Input>(context_op.transformer(),
426  std::forward<OtherTransformers>(other_ops)...);
427  }
428 
429 
430 private:
431 
432  int nworkers_;
433  bool ordered_;
434  std::vector<std::unique_ptr<ff_node>> nodes_;
435 
436  queue_mode queue_mode_ = queue_mode::blocking;
437 
438  constexpr static int default_queue_size = 100;
439  int queue_size_ = default_queue_size;
440 
441 };
442 
443 template <typename Generator, typename ... Transformers>
444 pipeline_impl::pipeline_impl(
445  int nworkers,
446  bool ordered,
447  Generator && gen_op,
448  Transformers && ... transform_ops)
449  :
450  nworkers_{nworkers},
451  ordered_{ordered},
452  nodes_{}
453 {
454  using result_type = std::decay_t<typename std::result_of<Generator()>::type>;
455  using generator_value_type = typename result_type::value_type;
456  using node_type = node_impl<void,generator_value_type,Generator>;
457 
458  auto first_stage = std::make_unique<node_type>(
459  std::forward<Generator>(gen_op));
460 
461  add_node(std::move(first_stage));
462 
463  add_stages<generator_value_type>(std::forward<Transformers>(transform_ops)...);
464 }
465 
466 
467 } // namespace detail_ff
468 
469 } // namespace grppi
470 
471 #else
472 
473 #endif // GRPPI_FF
474 
475 #endif
Definition: callable_traits.h:21
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:107
typename std::result_of< Transformer(Input)>::type result_type
Determines the return type of applying a function on a input type.
Definition: patterns.h:105
queue_mode
Definition: mpmc_queue.h:30