16 #ifndef GRPPI_FF_DETAIL_PIPELINE_IMPL_H
17 #define GRPPI_FF_DETAIL_PIPELINE_IMPL_H
27 #include "../../common/mpmc_queue.h"
30 #include <ff/allocator.hpp>
31 #include <ff/pipeline.hpp>
32 #include <ff/farm.hpp>
41 class pipeline_impl :
public ff::ff_pipeline {
44 template <
typename Generator,
typename ... Transformers>
45 pipeline_impl(
int nworkers,
bool ordered, Generator && gen,
46 Transformers && ... transform_ops);
53 void set_queue_attributes(
int size,
queue_mode mode) noexcept {
65 mpmc_queue<T> make_queue()
const {
66 return {queue_size_, queue_mode_};
69 void add_node(std::unique_ptr<ff_node> && p_node) {
70 ff::ff_pipeline::add_stage(p_node.get());
71 nodes_.push_back(std::forward<std::unique_ptr<ff_node>>(p_node));
74 template <
typename Input,
typename Transformer,
75 requires_no_pattern<Transformer> = 0>
76 auto add_stages(Transformer &&stage)
78 using gen_value_type = std::decay_t<Input>;
79 using node_type = node_impl<gen_value_type,void,Transformer>;
81 auto p_stage = std::make_unique<node_type>(std::forward<Transformer>(stage));
82 add_node(std::move(p_stage));
85 template <
typename Input,
typename Transformer,
typename ... OtherTransformers,
86 requires_no_pattern<Transformer> = 0>
87 auto add_stages(Transformer && transform_op,
88 OtherTransformers && ... other_transform_ops)
90 static_assert(!std::is_void<Input>::value,
91 "Transformer must take non-void argument");
93 std::decay_t<
typename std::result_of<Transformer(Input)>::type>;
94 static_assert(!std::is_void<output_type>::value,
95 "Transformer must return a non-void result");
97 using node_type = node_impl<Input,output_type,Transformer>;
98 auto p_stage = std::make_unique<node_type>(
99 std::forward<Transformer>(transform_op));
101 add_node(std::move(p_stage));
102 add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
105 template <
typename Input,
typename ... Transformers,
106 template <
typename...>
class Pipeline,
107 typename ... OtherTransformers,
109 auto add_stages(Pipeline<Transformers...> & pipeline_obj,
110 OtherTransformers && ... other_transform_ops)
112 return this->
template add_stages<Input>(std::move(pipeline_obj),
113 std::forward<OtherTransformers>(other_transform_ops)...);
116 template <
typename Input,
typename ... Transformers,
117 template <
typename...>
class Pipeline,
118 typename ... OtherTransformers,
120 auto add_stages(Pipeline<Transformers...> && pipeline_obj,
121 OtherTransformers && ... other_transform_ops)
123 return this->
template add_stages_nested<Input>(
125 pipeline_obj.transformers(),
126 std::forward_as_tuple(other_transform_ops...)
128 std::make_index_sequence<
sizeof...(Transformers)+
sizeof...(OtherTransformers)>());
131 template <
typename Input,
typename ... Transformers, std::size_t ... I>
132 auto add_stages_nested(std::tuple<Transformers...> && transform_ops,
133 std::index_sequence<I...>)
135 return add_stages<Input>(std::forward<Transformers>(std::get<I>(transform_ops))...);
138 template <
typename Input,
typename FarmTransformer,
139 template <
typename>
class Farm,
140 requires_farm<Farm<FarmTransformer>> = 0>
141 auto add_stages(Farm<FarmTransformer> & farm_obj)
143 return this->
template add_stages<Input>(std::move(farm_obj));
146 template <
typename Input,
typename FarmTransformer,
147 template <
typename>
class Farm,
148 requires_farm<Farm<FarmTransformer>> = 0>
149 auto add_stages(Farm<FarmTransformer> && farm_obj)
151 static_assert(!std::is_void<Input>::value,
152 "Farm must take non-void argument");
153 using output_type = std::decay_t<
typename std::result_of<
154 FarmTransformer(Input)>::type>;
156 using worker_type = node_impl<Input,output_type,Farm<FarmTransformer>>;
157 std::vector<std::unique_ptr<ff::ff_node>> workers;
158 for(
int i=0; i<nworkers_; ++i) {
159 workers.push_back(std::make_unique<worker_type>(
160 std::forward<Farm<FarmTransformer>>(farm_obj))
165 using node_type = ff::ff_OFarm<Input,output_type>;
166 auto p_farm = std::make_unique<node_type>(std::move(workers));
167 add_node(std::move(p_farm));
170 using node_type = ff::ff_Farm<Input,output_type>;
171 auto p_farm = std::make_unique<node_type>(std::move(workers));
172 add_node(std::move(p_farm));
177 template <
typename Input,
typename FarmTransformer,
178 template <
typename>
class Farm,
179 typename ... OtherTransformers,
180 requires_farm<Farm<FarmTransformer>> = 0>
181 auto add_stages(Farm<FarmTransformer> & farm_obj,
182 OtherTransformers && ... other_transform_ops)
184 return this->
template add_stages<Input>(std::move(farm_obj),
185 std::forward<OtherTransformers>(other_transform_ops)...);
189 template <
typename Input,
typename FarmTransformer,
190 template <
typename>
class Farm,
191 typename ... OtherTransformers,
192 requires_farm<Farm<FarmTransformer>> = 0>
193 auto add_stages( Farm<FarmTransformer> && farm_obj,
194 OtherTransformers && ... other_transform_ops)
196 static_assert(!std::is_void<Input>::value,
197 "Farm must take non-void argument");
199 std::decay_t<
typename std::result_of<FarmTransformer(Input)>::type>;
200 static_assert(!std::is_void<output_type>::value,
201 "Farm must return a non-void result");
203 using worker_type = node_impl<Input,output_type,Farm<FarmTransformer>>;
204 std::vector<std::unique_ptr<ff::ff_node>> workers;
206 for(
int i=0; i<nworkers_; ++i) {
207 workers.push_back(std::make_unique<worker_type>(
208 std::forward<Farm<FarmTransformer>>(farm_obj))
213 using node_type = ff::ff_OFarm<Input,output_type>;
214 auto p_farm = std::make_unique<node_type>(std::move(workers));
215 add_node(std::move(p_farm));
216 add_stages<output_type>(std::forward<OtherTransformers>(other_transform_ops)...);
219 using node_type = ff::ff_Farm<Input,output_type>;
220 auto p_farm = std::make_unique<node_type>(std::move(workers));
221 add_node(std::move(p_farm));
222 add_stages<output_type>(std::forward<OtherTransformers>(other_transform_ops)...);
227 template <
typename Input,
typename Predicate,
228 template <
typename>
class Filter,
229 requires_filter<Filter<Predicate>> = 0>
230 auto add_stages(Filter<Predicate> & filter_obj)
232 return this->
template add_stages<Input>(std::move(filter_obj));
236 template <
typename Input,
typename Predicate,
237 template <
typename>
class Filter,
238 requires_filter<Filter<Predicate>> = 0>
239 auto add_stages(Filter<Predicate> && filter_obj)
241 static_assert(!std::is_void<Input>::value,
242 "Filter must take non-void argument");
245 using node_type = ordered_stream_filter<Input,Filter<Predicate>>;
246 auto p_farm = std::make_unique<node_type>(
247 std::forward<Filter<Predicate>>(filter_obj), nworkers_);
248 add_node(std::move(p_farm));
251 using node_type = unordered_stream_filter<Input,Filter<Predicate>>;
252 auto p_farm = std::make_unique<node_type>(
253 std::forward<Filter<Predicate>>(filter_obj), nworkers_);
254 add_node(std::move(p_farm));
259 template <
typename Input,
typename Predicate,
260 template <
typename>
class Filter,
261 typename ... OtherTransformers,
262 requires_filter<Filter<Predicate>> = 0>
263 auto add_stages(Filter<Predicate> & filter_obj,
264 OtherTransformers && ... other_transform_ops)
266 return this->
template add_stages<Input>(std::move(filter_obj),
267 std::forward<OtherTransformers>(other_transform_ops)...);
271 template <
typename Input,
typename Predicate,
272 template <
typename>
class Filter,
273 typename ... OtherTransformers,
274 requires_filter<Filter<Predicate>> = 0>
275 auto add_stages(Filter<Predicate> && filter_obj,
276 OtherTransformers && ... other_transform_ops)
278 static_assert(!std::is_void<Input>::value,
279 "Filter must take non-void argument");
282 using node_type = ordered_stream_filter<Input,Filter<Predicate>>;
283 auto p_farm = std::make_unique<node_type>(
284 std::forward<Filter<Predicate>>(filter_obj), nworkers_);
285 add_node(std::move(p_farm));
286 add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
289 using node_type = unordered_stream_filter<Input,Filter<Predicate>>;
290 auto p_farm = std::make_unique<node_type>(
291 std::forward<Filter<Predicate>>(filter_obj), nworkers_);
292 add_node(std::move(p_farm));
293 add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
297 template <
typename Input,
typename Combiner,
typename Identity,
298 template <
typename C,
typename I>
class Reduce,
299 typename ... OtherTransformers,
300 requires_reduce<Reduce<Combiner,Identity>> = 0>
301 auto add_stages(Reduce<Combiner,Identity> & reduce_obj,
302 OtherTransformers && ... other_transform_ops)
304 return this->
template add_stages<Input>(std::move(reduce_obj),
305 std::forward<OtherTransformers>(other_transform_ops)...);
308 template <
typename Input,
typename Combiner,
typename Identity,
309 template <
typename C,
typename I>
class Reduce,
310 typename ... OtherTransformers,
311 requires_reduce<Reduce<Combiner,Identity>> = 0>
312 auto add_stages(Reduce<Combiner,Identity> && reduce_obj,
313 OtherTransformers && ... other_transform_ops)
315 static_assert(!std::is_void<Input>::value,
316 "Reduce must take non-void argument");
319 using reducer_type = Reduce<Combiner,Identity>;
320 using node_type = ordered_stream_reduce<Input,reducer_type,Combiner>;
321 auto p_farm = std::make_unique<node_type>(
322 std::forward<reducer_type>(reduce_obj),
324 add_node(std::move(p_farm));
325 add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
328 using reducer_type = Reduce<Combiner,Identity>;
329 using node_type = unordered_stream_reduce<Input,reducer_type,Combiner>;
330 auto p_farm = std::make_unique<node_type>(
331 std::forward<Reduce<Combiner,Identity>>(reduce_obj),
333 add_node(std::move(p_farm));
334 add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
342 template <
typename Input,
typename Transformer,
typename Predicate,
343 template <
typename T,
typename P>
class Iteration,
344 typename ... OtherTransformers,
345 requires_iteration<Iteration<Transformer,Predicate>> =0,
346 requires_no_pattern<Transformer> =0>
347 auto add_stages(Iteration<Transformer,Predicate> & iteration_obj,
348 OtherTransformers && ... other_transform_ops)
350 return this->
template add_stages<Input>(std::move(iteration_obj),
351 std::forward<OtherTransformers>(other_transform_ops)...);
359 template <
typename Input,
typename Transformer,
typename Predicate,
360 template <
typename T,
typename P>
class Iteration,
361 typename ... OtherTransformers,
362 requires_iteration<Iteration<Transformer,Predicate>> =0,
363 requires_no_pattern<Transformer> =0>
364 auto add_stages(Iteration<Transformer,Predicate> && iteration_obj,
365 OtherTransformers && ... other_transform_ops)
367 std::vector<std::unique_ptr<ff::ff_node>> workers;
369 using iteration_type = Iteration<Transformer,Predicate>;
370 using worker_type = iteration_worker<Input,iteration_type>;
371 for (
int i=0; i<nworkers_; ++i)
373 std::make_unique<worker_type>(
374 std::forward<iteration_type>(iteration_obj)));
377 using node_type = ff::ff_OFarm<Input>;
378 auto p_farm = std::make_unique<node_type>(std::move(workers));
379 add_node(std::move(p_farm));
380 add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
383 using node_type = ff::ff_Farm<Input>;
384 auto p_farm = std::make_unique<node_type>(std::move(workers));
385 add_node(std::move(p_farm));
386 add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
395 template <
typename Input,
typename Transformer,
typename Predicate,
396 template <
typename T,
typename P>
class Iteration,
397 typename ... OtherTransformers,
398 requires_iteration<Iteration<Transformer,Predicate>> =0,
399 requires_pipeline<Transformer> =0>
400 auto add_stages(Iteration<Transformer,Predicate> && iteration_obj,
401 OtherTransformers && ... other_transform_ops)
403 static_assert(!is_pipeline<Transformer>,
"Not implemented");
406 template <
typename Input,
typename Execution,
typename Transformer,
407 template <
typename,
typename>
class Context,
408 typename ... OtherTransformers,
409 requires_context<Context<Execution,Transformer>> = 0>
410 auto add_stages(Context<Execution,Transformer> & context_op,
411 OtherTransformers &&... other_ops)
413 return this->
template add_stages<Input>(std::move(context_op),
414 std::forward<OtherTransformers>(other_ops)...);
417 template <
typename Input,
typename Execution,
typename Transformer,
418 template <
typename,
typename>
class Context,
419 typename ... OtherTransformers,
420 requires_context<Context<Execution,Transformer>> = 0>
421 auto add_stages(Context<Execution,Transformer> && context_op,
422 OtherTransformers &&... other_ops)
425 return this->
template add_stages<Input>(context_op.transformer(),
426 std::forward<OtherTransformers>(other_ops)...);
434 std::vector<std::unique_ptr<ff_node>> nodes_;
438 constexpr
static int default_queue_size = 100;
439 int queue_size_ = default_queue_size;
443 template <
typename Generator,
typename ... Transformers>
444 pipeline_impl::pipeline_impl(
448 Transformers && ... transform_ops)
454 using result_type = std::decay_t<
typename std::result_of<Generator()>::type>;
455 using generator_value_type =
typename result_type::value_type;
456 using node_type = node_impl<void,generator_value_type,Generator>;
458 auto first_stage = std::make_unique<node_type>(
459 std::forward<Generator>(gen_op));
461 add_node(std::move(first_stage));
463 add_stages<generator_value_type>(std::forward<Transformers>(transform_ops)...);
Definition: callable_traits.h:21
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:107
typename std::result_of< Transformer(Input)>::type result_type
Determines the return type of applying a function on a input type.
Definition: patterns.h:105
queue_mode
Definition: mpmc_queue.h:30