21 #ifndef GRPPI_SEQ_PIPELINE_H 22 #define GRPPI_SEQ_PIPELINE_H 26 #include "../common/pack_traits.h" 27 #include "../common/patterns.h" 31 template <
typename Input,
int Index,
typename ... MoreTransformers,
36 MoreTransformers...> & pipeline_obj)
38 return std::get<Index>(pipeline_obj.stages)(in);
41 template <
typename Input,
int Index,
typename ... MoreTransformers,
46 MoreTransformers...> & pipeline_obj)
49 std::get<Index>(pipeline_obj.stages)(in),
54 template <
typename Item,
typename Transformer>
56 Transformer && transform_op)
58 transform_op(std::forward<Item>(item));
62 template <
typename Item,
typename Filter,
typename... MoreTransformers>
65 MoreTransformers && ... more_transform_ops)
69 std::forward<MoreTransformers>(more_transform_ops)...);
72 template <
typename Item,
typename Transformer,
typename... MoreTransformers>
75 MoreTransformers && ... more_transform_ops)
77 if(transf_filter.task(item))
79 std::forward<MoreTransformers>(more_transform_ops)...);
83 template <
typename Item,
typename Transformer,
typename... MoreTransformers>
86 MoreTransformers && ... more_transform_ops)
90 std::forward<MoreTransformers>(more_transform_ops)... );
93 template <
typename Item,
typename Transformer,
typename... MoreTransformers>
96 MoreTransformers && ... more_transform_ops)
99 std::forward<MoreTransformers>(more_transform_ops)...);
105 template <
typename Item,
typename Transformer,
typename ... MoreTransformers>
107 Transformer && transform_op,
108 MoreTransformers && ... more_transform_ops)
111 std::forward<MoreTransformers>(more_transform_ops)...);
131 template <
typename Generator,
132 typename ... Transformers,
133 typename =
typename std::result_of<Generator()>::type>
135 Transformers && ... transform_ops)
138 auto item = generator_op();
140 pipeline_impl(ex, *item, std::forward<Transformers>(transform_ops) ... );
Definition: callable_traits.h:24
Definition: patterns.h:61
Definition: patterns.h:29
void pipeline_impl(parallel_execution_native &ex, InQueue &input_queue, Consumer &&consume)
Definition: native/pipeline.h:98
std::enable_if_t<(Index< sizeof...(T)-1), int > requires_index_not_last
Definition: pack_traits.h:34
void pipeline(parallel_execution_native &ex, Generator &&generate_op, Transformers &&...transform_ops)
Invoke Pipeline pattern on a data stream with native parallel execution.
Definition: native/pipeline.h:500
Definition: patterns.h:51
Sequential execution policy.
Definition: sequential_execution.h:31
void composed_pipeline(InQueue &input_queue, const pipeline_info< parallel_execution_native, MoreTransformers... > &pipe, OutQueue &output_queue, std::vector< std::thread > &tasks)
Definition: native/pipeline.h:36
std::enable_if_t<(Index==sizeof...(T)-1), int > requires_index_last
Definition: pack_traits.h:30