21 #ifndef GRPPI_FF_DETAIL_PIPELINE_IMPL_H 22 #define GRPPI_FF_DETAIL_PIPELINE_IMPL_H 32 #include "../../common/mpmc_queue.h" 35 #include <ff/allocator.hpp> 36 #include <ff/pipeline.hpp> 37 #include <ff/farm.hpp> 46 class pipeline_impl :
public ff::ff_pipeline {
// Constructor template declaration.
// Parameters: a worker count, an ordering flag, a generator callable and
// zero or more further stage objects (transform_ops).
// The out-of-class definition appears near the end of this listing.
49 template <
typename Generator,
typename ... Transformers>
50 pipeline_impl(
int nworkers,
bool ordered, Generator && gen,
51 Transformers && ... transform_ops);
// Takes a queue size and a queue_mode; declared noexcept.
// NOTE(review): the body lines are missing from this listing. Presumably it
// stores the arguments in queue_size_ / queue_mode_ (the members read by
// make_queue below) -- confirm against the original header.
58 void set_queue_attributes(
int size,
queue_mode mode) noexcept {
// Const member returning an mpmc_queue<T> brace-initialised from the stored
// queue_size_ and queue_mode_.
// NOTE(review): T is not declared in the visible lines; a
// `template <typename T>` header line appears to be missing from the listing.
70 mpmc_queue<T> make_queue()
const {
71 return {queue_size_, queue_mode_};
// Takes ownership of a node and registers it as the next pipeline stage.
// p_node: owning pointer to the node; it is moved into nodes_.
// Ownership is stored first and the raw pointer is registered afterwards, so
// that a throwing push_back cannot leave the base pipeline holding a pointer
// to a node that is about to be destroyed. The pointee address is unaffected
// by moving the unique_ptr into the vector.
// p_node is a plain rvalue reference (not a forwarding reference), hence
// std::move rather than std::forward.
74 void add_node(std::unique_ptr<ff_node> && p_node) {
75 nodes_.push_back(std::move(p_node));
76 ff::ff_pipeline::add_stage(nodes_.back().get());
// Terminal overload: adds a single stage constrained by requires_no_pattern.
// The node is a node_impl whose input type is the decayed Input and whose
// output type is void. Ownership of the new node is handed to add_node.
79 template <
typename Input,
typename Transformer,
80 requires_no_pattern<Transformer> = 0>
81 auto add_stages(Transformer &&stage)
83 using gen_value_type = std::decay_t<Input>;
84 using node_type = node_impl<gen_value_type,void,Transformer>;
86 auto p_stage = std::make_unique<node_type>(std::forward<Transformer>(stage));
87 add_node(std::move(p_stage));
// Adds one stage constrained by requires_no_pattern, followed by the
// remaining stages.
// Input:               type of the items this stage receives (must be non-void).
// transform_op:        callable invoked as Transformer(Input); its decayed
//                      result type is output_type (must be non-void).
// other_transform_ops: the stages that follow; they are added recursively.
90 template <
typename Input,
typename Transformer,
typename ... OtherTransformers,
91 requires_no_pattern<Transformer> = 0>
92 auto add_stages(Transformer && transform_op,
93 OtherTransformers && ... other_transform_ops)
95 static_assert(!std::is_void<Input>::value,
96 "Transformer must take non-void argument");
// NOTE(review): the line introducing the output_type alias is missing from
// this listing; the expression below is its right-hand side.
98 std::decay_t<typename std::result_of<Transformer(Input)>::type>;
99 static_assert(!std::is_void<output_type>::value,
100 "Transformer must return a non-void result");
102 using node_type = node_impl<Input,output_type,Transformer>;
103 auto p_stage = std::make_unique<node_type>(
104 std::forward<Transformer>(transform_op));
106 add_node(std::move(p_stage));
// The next stage consumes what this stage produces, so the recursion is
// instantiated with output_type (not Input), matching the farm overload.
107 add_stages<output_type>(std::forward<OtherTransformers>(other_transform_ops)...);
// Lvalue overload for a nested pipeline object followed by further stages.
// Casts pipeline_obj to an rvalue with std::move and delegates to
// add_stages<Input>, forwarding the remaining stages unchanged.
// Note that the caller's object is passed on as an rvalue.
// NOTE(review): the template parameter list ends with a trailing comma; its
// last line (presumably a constraint) is missing from this listing.
110 template <
typename Input,
typename ... Transformers,
111 template <
typename...>
class Pipeline,
112 typename ... OtherTransformers,
114 auto add_stages(Pipeline<Transformers...> & pipeline_obj,
115 OtherTransformers && ... other_transform_ops)
117 return this->
template add_stages<Input>(std::move(pipeline_obj),
118 std::forward<OtherTransformers>(other_transform_ops)...);
// Rvalue overload for a nested pipeline object followed by further stages.
// Passes pipeline_obj.transformers() together with a tuple of references to
// the remaining stages to add_stages_nested<Input>, along with an index
// sequence covering sizeof...(Transformers)+sizeof...(OtherTransformers)
// elements.
// NOTE(review): the lines that combine the two tuples into the single tuple
// argument add_stages_nested takes are missing from this listing, as is the
// last line of the template parameter list -- confirm against the original.
121 template <
typename Input,
typename ... Transformers,
122 template <
typename...>
class Pipeline,
123 typename ... OtherTransformers,
125 auto add_stages(Pipeline<Transformers...> && pipeline_obj,
126 OtherTransformers && ... other_transform_ops)
128 return this->
template add_stages_nested<Input>(
130 pipeline_obj.transformers(),
131 std::forward_as_tuple(other_transform_ops...)
133 std::make_index_sequence<
sizeof...(Transformers)+
sizeof...(OtherTransformers)>());
// Helper that unpacks a tuple of stages into an add_stages<Input> call.
// transform_ops: tuple holding the stages.
// The unnamed std::index_sequence parameter only supplies the indices I...
// used with std::get to expand the tuple element by element.
136 template <
typename Input,
typename ... Transformers, std::size_t ... I>
137 auto add_stages_nested(std::tuple<Transformers...> && transform_ops,
138 std::index_sequence<I...>)
140 return add_stages<Input>(std::forward<Transformers>(std::get<I>(transform_ops))...);
// Lvalue overload for a farm object given as the only remaining stage
// (constrained by requires_farm).
// Casts farm_obj to an rvalue with std::move and delegates to
// add_stages<Input>; the caller's object is therefore passed on as an rvalue.
143 template <
typename Input,
typename FarmTransformer,
144 template <
typename>
class Farm,
145 requires_farm<Farm<FarmTransformer>> = 0>
146 auto add_stages(Farm<FarmTransformer> & farm_obj)
148 return this->
template add_stages<Input>(std::move(farm_obj));
// Rvalue overload for a farm object given as the only remaining stage
// (constrained by requires_farm).
// Builds nworkers_ worker nodes from farm_obj, wraps them in a FastFlow farm
// node and hands that node to add_node.
151 template <
typename Input,
typename FarmTransformer,
152 template <
typename>
class Farm,
153 requires_farm<Farm<FarmTransformer>> = 0>
154 auto add_stages(Farm<FarmTransformer> && farm_obj)
156 static_assert(!std::is_void<Input>::value,
157 "Farm must take non-void argument");
// output_type is the decayed result of invoking FarmTransformer with Input.
158 using output_type = std::decay_t<
typename std::result_of<
159 FarmTransformer(Input)>::type>;
161 using worker_type = node_impl<Input,output_type,Farm<FarmTransformer>>;
162 std::vector<std::unique_ptr<ff::ff_node>> workers;
// NOTE(review): farm_obj is cast to an rvalue on every iteration. If
// worker_type's constructor moves from its argument, workers after the first
// would be built from a moved-from object -- confirm that it copies.
163 for(
int i=0; i<nworkers_; ++i) {
164 workers.push_back(std::make_unique<worker_type>(
165 std::forward<Farm<FarmTransformer>>(farm_obj))
// NOTE(review): the two near-identical sequences below (ff_OFarm vs ff_Farm)
// look like the two branches of a conditional whose if/else lines are missing
// from this listing -- presumably selected by the ordering flag; confirm.
170 using node_type = ff::ff_OFarm<Input,output_type>;
171 auto p_farm = std::make_unique<node_type>(std::move(workers));
172 add_node(std::move(p_farm));
175 using node_type = ff::ff_Farm<Input,output_type>;
176 auto p_farm = std::make_unique<node_type>(std::move(workers));
177 add_node(std::move(p_farm));
// Lvalue overload for a farm object followed by further stages
// (constrained by requires_farm).
// Casts farm_obj to an rvalue with std::move and delegates to
// add_stages<Input>, forwarding the remaining stages unchanged.
182 template <
typename Input,
typename FarmTransformer,
183 template <
typename>
class Farm,
184 typename ... OtherTransformers,
185 requires_farm<Farm<FarmTransformer>> = 0>
186 auto add_stages(Farm<FarmTransformer> & farm_obj,
187 OtherTransformers && ... other_transform_ops)
189 return this->
template add_stages<Input>(std::move(farm_obj),
190 std::forward<OtherTransformers>(other_transform_ops)...);
// Rvalue overload for a farm object followed by further stages
// (constrained by requires_farm).
// Builds nworkers_ worker nodes from farm_obj, wraps them in a FastFlow farm
// node, adds that node, then adds the remaining stages with output_type as
// their input type.
194 template <
typename Input,
typename FarmTransformer,
195 template <
typename>
class Farm,
196 typename ... OtherTransformers,
197 requires_farm<Farm<FarmTransformer>> = 0>
198 auto add_stages( Farm<FarmTransformer> && farm_obj,
199 OtherTransformers && ... other_transform_ops)
201 static_assert(!std::is_void<Input>::value,
202 "Farm must take non-void argument");
// NOTE(review): the line introducing the output_type alias is missing from
// this listing; the expression below is its right-hand side.
204 std::decay_t<typename std::result_of<FarmTransformer(Input)>::type>;
205 static_assert(!std::is_void<output_type>::value,
206 "Farm must return a non-void result");
208 using worker_type = node_impl<Input,output_type,Farm<FarmTransformer>>;
209 std::vector<std::unique_ptr<ff::ff_node>> workers;
// NOTE(review): farm_obj is cast to an rvalue on every iteration. If
// worker_type's constructor moves from its argument, workers after the first
// would be built from a moved-from object -- confirm that it copies.
211 for(
int i=0; i<nworkers_; ++i) {
212 workers.push_back(std::make_unique<worker_type>(
213 std::forward<Farm<FarmTransformer>>(farm_obj))
// NOTE(review): the two near-identical sequences below (ff_OFarm vs ff_Farm)
// look like the two branches of a conditional whose if/else lines are missing
// from this listing -- presumably selected by the ordering flag; confirm.
218 using node_type = ff::ff_OFarm<Input,output_type>;
219 auto p_farm = std::make_unique<node_type>(std::move(workers));
220 add_node(std::move(p_farm));
221 add_stages<output_type>(std::forward<OtherTransformers>(other_transform_ops)...);
224 using node_type = ff::ff_Farm<Input,output_type>;
225 auto p_farm = std::make_unique<node_type>(std::move(workers));
226 add_node(std::move(p_farm));
227 add_stages<output_type>(std::forward<OtherTransformers>(other_transform_ops)...);
// Lvalue overload for a filter object given as the only remaining stage
// (constrained by requires_filter).
// Casts filter_obj to an rvalue with std::move and delegates to
// add_stages<Input>; the caller's object is therefore passed on as an rvalue.
232 template <
typename Input,
typename Predicate,
233 template <
typename>
class Filter,
234 requires_filter<Filter<Predicate>> = 0>
235 auto add_stages(Filter<Predicate> & filter_obj)
237 return this->
template add_stages<Input>(std::move(filter_obj));
// Rvalue overload for a filter object given as the only remaining stage
// (constrained by requires_filter).
// Creates a stream-filter node from filter_obj and nworkers_ and hands it to
// add_node.
241 template <
typename Input,
typename Predicate,
242 template <
typename>
class Filter,
243 requires_filter<Filter<Predicate>> = 0>
244 auto add_stages(Filter<Predicate> && filter_obj)
246 static_assert(!std::is_void<Input>::value,
247 "Filter must take non-void argument");
// NOTE(review): the two near-identical sequences below (ordered_stream_filter
// vs unordered_stream_filter) look like the two branches of a conditional
// whose if/else lines are missing from this listing -- presumably selected by
// the ordering flag; confirm.
250 using node_type = ordered_stream_filter<Input,Filter<Predicate>>;
251 auto p_farm = std::make_unique<node_type>(
252 std::forward<Filter<Predicate>>(filter_obj), nworkers_);
253 add_node(std::move(p_farm));
256 using node_type = unordered_stream_filter<Input,Filter<Predicate>>;
257 auto p_farm = std::make_unique<node_type>(
258 std::forward<Filter<Predicate>>(filter_obj), nworkers_);
259 add_node(std::move(p_farm));
// Lvalue overload for a filter object followed by further stages
// (constrained by requires_filter).
// Casts filter_obj to an rvalue with std::move and delegates to
// add_stages<Input>, forwarding the remaining stages unchanged.
264 template <
typename Input,
typename Predicate,
265 template <
typename>
class Filter,
266 typename ... OtherTransformers,
267 requires_filter<Filter<Predicate>> = 0>
268 auto add_stages(Filter<Predicate> & filter_obj,
269 OtherTransformers && ... other_transform_ops)
271 return this->
template add_stages<Input>(std::move(filter_obj),
272 std::forward<OtherTransformers>(other_transform_ops)...);
// Rvalue overload for a filter object followed by further stages
// (constrained by requires_filter).
// Creates a stream-filter node from filter_obj and nworkers_, adds it, then
// adds the remaining stages with the same Input type.
276 template <
typename Input,
typename Predicate,
277 template <
typename>
class Filter,
278 typename ... OtherTransformers,
279 requires_filter<Filter<Predicate>> = 0>
280 auto add_stages(Filter<Predicate> && filter_obj,
281 OtherTransformers && ... other_transform_ops)
283 static_assert(!std::is_void<Input>::value,
284 "Filter must take non-void argument");
// NOTE(review): the two near-identical sequences below (ordered_stream_filter
// vs unordered_stream_filter) look like the two branches of a conditional
// whose if/else lines are missing from this listing -- presumably selected by
// the ordering flag; confirm.
287 using node_type = ordered_stream_filter<Input,Filter<Predicate>>;
288 auto p_farm = std::make_unique<node_type>(
289 std::forward<Filter<Predicate>>(filter_obj), nworkers_);
290 add_node(std::move(p_farm));
291 add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
294 using node_type = unordered_stream_filter<Input,Filter<Predicate>>;
295 auto p_farm = std::make_unique<node_type>(
296 std::forward<Filter<Predicate>>(filter_obj), nworkers_);
297 add_node(std::move(p_farm));
298 add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
// Lvalue overload for a reduce object followed by further stages
// (constrained by requires_reduce).
// Casts reduce_obj to an rvalue with std::move and delegates to
// add_stages<Input>, forwarding the remaining stages unchanged.
302 template <
typename Input,
typename Combiner,
typename Identity,
303 template <
typename C,
typename I>
class Reduce,
304 typename ... OtherTransformers,
305 requires_reduce<Reduce<Combiner,Identity>> = 0>
306 auto add_stages(Reduce<Combiner,Identity> & reduce_obj,
307 OtherTransformers && ... other_transform_ops)
309 return this->
template add_stages<Input>(std::move(reduce_obj),
310 std::forward<OtherTransformers>(other_transform_ops)...);
// Rvalue overload for a reduce object followed by further stages
// (constrained by requires_reduce).
// Creates a stream-reduce node from reduce_obj, adds it, then adds the
// remaining stages with the same Input type.
313 template <
typename Input,
typename Combiner,
typename Identity,
314 template <
typename C,
typename I>
class Reduce,
315 typename ... OtherTransformers,
316 requires_reduce<Reduce<Combiner,Identity>> = 0>
317 auto add_stages(Reduce<Combiner,Identity> && reduce_obj,
318 OtherTransformers && ... other_transform_ops)
320 static_assert(!std::is_void<Input>::value,
321 "Reduce must take non-void argument");
// NOTE(review): the two near-identical sequences below (ordered_stream_reduce
// vs unordered_stream_reduce) look like the two branches of a conditional
// whose if/else lines are missing from this listing. The final argument line
// of each make_unique call is also missing (the call ends on a trailing
// comma) -- confirm against the original header.
324 using reducer_type = Reduce<Combiner,Identity>;
325 using node_type = ordered_stream_reduce<Input,reducer_type,Combiner>;
326 auto p_farm = std::make_unique<node_type>(
327 std::forward<reducer_type>(reduce_obj),
329 add_node(std::move(p_farm));
330 add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
333 using reducer_type = Reduce<Combiner,Identity>;
334 using node_type = unordered_stream_reduce<Input,reducer_type,Combiner>;
335 auto p_farm = std::make_unique<node_type>(
336 std::forward<Reduce<Combiner,Identity>>(reduce_obj),
338 add_node(std::move(p_farm));
339 add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
// Lvalue overload for an iteration object followed by further stages.
// Constrained by requires_iteration on the iteration type and by
// requires_no_pattern on its Transformer.
// Casts iteration_obj to an rvalue with std::move and delegates to
// add_stages<Input>, forwarding the remaining stages unchanged.
347 template <
typename Input,
typename Transformer,
typename Predicate,
348 template <
typename T,
typename P>
class Iteration,
349 typename ... OtherTransformers,
350 requires_iteration<Iteration<Transformer,Predicate>> =0,
351 requires_no_pattern<Transformer> =0>
352 auto add_stages(Iteration<Transformer,Predicate> & iteration_obj,
353 OtherTransformers && ... other_transform_ops)
355 return this->
template add_stages<Input>(std::move(iteration_obj),
356 std::forward<OtherTransformers>(other_transform_ops)...);
// Rvalue overload for an iteration object followed by further stages.
// Constrained by requires_iteration on the iteration type and by
// requires_no_pattern on its Transformer.
// Builds iteration_worker nodes in a loop bounded by nworkers_, wraps them in
// a FastFlow farm node, adds it, then adds the remaining stages with the same
// Input type.
364 template <
typename Input,
typename Transformer,
typename Predicate,
365 template <
typename T,
typename P>
class Iteration,
366 typename ... OtherTransformers,
367 requires_iteration<Iteration<Transformer,Predicate>> =0,
368 requires_no_pattern<Transformer> =0>
369 auto add_stages(Iteration<Transformer,Predicate> && iteration_obj,
370 OtherTransformers && ... other_transform_ops)
372 std::vector<std::unique_ptr<ff::ff_node>> workers;
374 using iteration_type = Iteration<Transformer,Predicate>;
375 using worker_type = iteration_worker<Input,iteration_type>;
// NOTE(review): the line that consumes the make_unique result inside the loop
// is missing from this listing -- presumably it appends to `workers`; confirm.
// iteration_obj is cast to an rvalue on every iteration; if worker_type's
// constructor moves from it, later workers see a moved-from object.
376 for (
int i=0; i<nworkers_; ++i)
378 std::make_unique<worker_type>(
379 std::forward<iteration_type>(iteration_obj)));
// NOTE(review): the two near-identical sequences below (ff_OFarm vs ff_Farm)
// look like the two branches of a conditional whose if/else lines are missing
// from this listing -- presumably selected by the ordering flag; confirm.
382 using node_type = ff::ff_OFarm<Input>;
383 auto p_farm = std::make_unique<node_type>(std::move(workers));
384 add_node(std::move(p_farm));
385 add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
388 using node_type = ff::ff_Farm<Input>;
389 auto p_farm = std::make_unique<node_type>(std::move(workers));
390 add_node(std::move(p_farm));
391 add_stages<Input>(std::forward<OtherTransformers>(other_transform_ops)...);
// Overload for an iteration object whose Transformer satisfies
// requires_pipeline.
// The only visible statement is a static_assert that fails whenever
// is_pipeline<Transformer> is true, so selecting this overload produces the
// compile-time diagnostic "Not implemented".
400 template <
typename Input,
typename Transformer,
typename Predicate,
401 template <
typename T,
typename P>
class Iteration,
402 typename ... OtherTransformers,
403 requires_iteration<Iteration<Transformer,Predicate>> =0,
404 requires_pipeline<Transformer> =0>
405 auto add_stages(Iteration<Transformer,Predicate> && iteration_obj,
406 OtherTransformers && ... other_transform_ops)
408 static_assert(!is_pipeline<Transformer>,
"Not implemented");
// Lvalue overload for an execution-context object followed by further stages
// (constrained by requires_context).
// Casts context_op to an rvalue with std::move and delegates to
// add_stages<Input>, forwarding the remaining stages unchanged.
411 template <
typename Input,
typename Execution,
typename Transformer,
412 template <
typename,
typename>
class Context,
413 typename ... OtherTransformers,
414 requires_context<Context<Execution,Transformer>> = 0>
415 auto add_stages(Context<Execution,Transformer> & context_op,
416 OtherTransformers &&... other_ops)
418 return this->
template add_stages<Input>(std::move(context_op),
419 std::forward<OtherTransformers>(other_ops)...);
// Rvalue overload for an execution-context object followed by further stages
// (constrained by requires_context).
// Delegates to add_stages<Input> with the value returned by
// context_op.transformer() in place of the context object; nothing else of
// the context is used in the visible lines.
422 template <
typename Input,
typename Execution,
typename Transformer,
423 template <
typename,
typename>
class Context,
424 typename ... OtherTransformers,
425 requires_context<Context<Execution,Transformer>> = 0>
426 auto add_stages(Context<Execution,Transformer> && context_op,
427 OtherTransformers &&... other_ops)
430 return this->
template add_stages<Input>(context_op.transformer(),
431 std::forward<OtherTransformers>(other_ops)...);
// Owning storage for the nodes handed to add_node.
439 std::vector<std::unique_ptr<ff_node>> nodes_;
// Initial value of queue_size_.
443 constexpr
static int default_queue_size = 100;
// Queue size passed to mpmc_queue by make_queue.
444 int queue_size_ = default_queue_size;
// Out-of-class definition of the constructor template.
// Wraps the generator in the first node, then adds every remaining stage.
// NOTE(review): several lines of the parameter list and the lines between the
// signature and the body (numbers 450-452, 454-458) are missing from this
// listing.
448 template <
typename Generator,
typename ... Transformers>
449 pipeline_impl::pipeline_impl(
453 Transformers && ... transform_ops)
// The generator is invoked with no arguments; its decayed result type must
// expose a nested value_type, which becomes the item type of the stream.
459 using result_type = std::decay_t<typename std::result_of<Generator()>::type>;
460 using generator_value_type =
typename result_type::value_type;
// First node: void input, generator_value_type output.
461 using node_type = node_impl<void,generator_value_type,Generator>;
463 auto first_stage = std::make_unique<node_type>(
464 std::forward<Generator>(gen_op));
466 add_node(std::move(first_stage));
// The remaining stages start with generator_value_type as their input type.
468 add_stages<generator_value_type>(std::forward<Transformers>(transform_ops)...);
Definition: callable_traits.h:26
queue_mode
Definition: mpmc_queue.h:35
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:111
typename std::result_of< Transformer(Input)>::type result_type
Determines the return type of applying a function to an input type.
Definition: patterns.h:110