// Include guard for this header, the <experimental/optional> include, and the
// start of the first stage-building template.
// NOTE(review): this listing carries source line numbers at the start of lines
// and skips some lines (e.g. 41 -> 43, 46 -> 53), so the function name, its
// leading parameters and the closing braces are not visible here. Judging from
// the calls to pipeline_impl(ex, in, ...) further down, this is presumably the
// pipeline_impl overload for the last (consumer) stage -- confirm against the
// real header.
//
// Visible behavior: returns a TBB filter that takes optional<Input>, produces
// void, and is created with the serial_in_order mode.
21 #ifndef GRPPI_TBB_PIPELINE_H 22 #define GRPPI_TBB_PIPELINE_H 28 #include <experimental/optional> 38 template <
typename Transformer,
typename Input>
41 Transformer && transform_op)
43 return tbb::make_filter<std::experimental::optional<Input>,
void>(
44 tbb::filter::serial_in_order,
// The lambda captures by reference, so transform_op is used in place rather
// than being copied into the filter.
45 [&](std::experimental::optional<Input> s) {
// Empty optionals are skipped; only engaged values are passed to transform_op.
46 if(s) transform_op(*s);
// Stage overload taking a first stage (Transformer) plus any number of further
// stages (MoreTransformers).
// Visible behavior: delegates to pipeline_impl, passing `ex` and `in` through,
// forwarding `farm_obj` as `farm_type`, and perfectly forwarding the remaining
// stages.
// NOTE(review): the numbering jumps from 53 to 57 and from 57 to 61, so the
// function name, the leading parameters and the definitions of `farm_type` and
// `farm_obj` are not visible -- presumably this adapts a farm object received
// in one value category to the overload that actually builds the stage; confirm
// against the real header.
53 template <
typename Transformer,
typename ... MoreTransformers,
typename Input>
57 MoreTransformers && ... more_transform_ops)
61 return pipeline_impl(ex, in, std::forward<farm_type>(farm_obj),
62 std::forward<MoreTransformers>(more_transform_ops)...);
// Stage overload taking a first stage parameterized on Predicate plus any
// number of further stages (MoreTransformers).
// Visible behavior: delegates to pipeline_impl, passing `ex` and `in` through,
// forwarding `filter_obj` as `filter_type`, and perfectly forwarding the
// remaining stages.
// NOTE(review): the numbering jumps from 66 to 70 and from 70 to 74, so the
// function name, the leading parameters and the definitions of `filter_type`
// and `filter_obj` are not visible -- presumably this adapts a filter object
// received in one value category to the overload that actually builds the
// stage; confirm against the real header.
66 template <
typename Predicate,
typename... MoreTransformers,
typename Input>
70 MoreTransformers && ... more_transform_ops)
74 return pipeline_impl(ex, in, std::forward<filter_type>(filter_obj),
75 std::forward<MoreTransformers>(more_transform_ops)...);
// Filter stage followed by further stages.
// Visible behavior: creates a filter with mode tbb::filter::parallel that maps
// optional<Input> to optional<Input>, then forwards the remaining stages
// (line 98).
// NOTE(review): the function name, the parameter that introduces `filter_obj`,
// the true branch of the conditional (line 92) and the expression that
// consumes the forwarded stages (lines 94-97) are missing from this listing.
// Presumably the true branch passes `val` through and the rest is a recursive
// pipeline_impl call -- confirm against the real header.
78 template <
typename Predicate,
typename... MoreTransformers,
typename Input>
82 MoreTransformers && ... more_transform_ops)
// Items travel between stages wrapped in an optional.
85 using optional_input_type = experimental::optional<Input>;
88 tbb::make_filter<optional_input_type,optional_input_type>(
89 tbb::filter::parallel,
90 [&](optional_input_type val) -> optional_input_type {
// filter_obj.task is only evaluated on an engaged optional (short-circuit &&);
// when val is empty or the task result is false, an empty optional is returned.
91 return (val && filter_obj.task(*val)) ?
93 optional_input_type{};
98 std::forward<MoreTransformers>(more_transform_ops)...);
// Farm stage followed by further stages.
// Visible behavior: creates a filter with mode tbb::filter::parallel that maps
// optional<Input> to optional<output_type>, where output_type is the result of
// invoking Transformer on an Input; then forwards the remaining stages
// (line 124).
// NOTE(review): the function name, the parameter that introduces `farm_obj`,
// the condition of the conditional expression (line 117) and the expression
// that consumes the forwarded stages (lines 120-123) are missing from this
// listing. Presumably the condition tests whether `val` is engaged -- confirm
// against the real header.
102 template <
typename Transformer,
typename ... MoreTransformers,
typename Input>
106 MoreTransformers && ... more_transform_ops )
109 using optional_input_type = experimental::optional<Input>;
// Type produced by calling the Transformer with an Input.
110 using output_type =
typename result_of<Transformer(Input)>::type;
111 using optional_output_type = experimental::optional<output_type>;
114 tbb::make_filter<optional_input_type, optional_output_type>(
115 tbb::filter::parallel,
116 [&](optional_input_type val) -> optional_output_type {
// One branch yields farm_obj.task(*val); the other yields an empty optional.
118 farm_obj.task(*val) :
119 optional_output_type{};
124 forward<MoreTransformers>(more_transform_ops)...);
// Intermediate stage built from a plain callable, followed by further stages.
// Visible behavior: creates a filter with mode tbb::filter::serial_in_order
// that maps optional<Input> to optional<output_type>, where output_type is the
// result of invoking predicate_op's type on an Input; then forwards the
// remaining stages (line 148).
// Although the template parameter is named Predicate, its result is used as
// the value handed to the next stage rather than as a keep/drop decision.
// NOTE(review): the function name, the leading parameters and the expression
// that consumes the forwarded stages (lines 144-147) are missing from this
// listing -- presumably a recursive pipeline_impl call; confirm against the
// real header.
127 template <
typename Predicate,
typename ... MoreTransformers,
typename Input>
130 Predicate && predicate_op,
131 MoreTransformers && ... more_transform_ops)
134 using optional_input_type = experimental::optional<Input>;
// Type produced by calling the stage callable with an Input.
135 using output_type =
typename result_of<Predicate(Input)>::type;
136 using optional_output_type = experimental::optional<output_type>;
139 tbb::make_filter<optional_input_type, optional_output_type>(
140 tbb::filter::serial_in_order,
141 [&](optional_input_type val) -> optional_output_type {
// Engaged input: apply the stage. Empty input: propagate an empty optional
// without calling predicate_op.
142 return (val) ? predicate_op(*val) :
143 optional_output_type{};
148 forward<MoreTransformers>(more_transform_ops)...);
// Pipeline entry point: a generator stage followed by the given transform
// stages, run through tbb::parallel_pipeline.
// Visible behavior:
//  - result_type is what generate_op() returns; its value_type is the item
//    type, and items are passed on wrapped in experimental::optional.
//  - The first filter (void input, serial_in_order) calls generate_op() once
//    per token and stops the flow when the generated value tests false.
//  - tbb::parallel_pipeline is called with ex.tokens() as its first argument
//    and the forwarded transform stages at the end of its argument list.
// NOTE(review): lines 170-171 and 190-192 of the original are missing from this
// listing, so the rest of the template parameter list, the function name, the
// leading parameters and the middle arguments of parallel_pipeline cannot be
// seen. Presumably this is `pipeline(parallel_execution_tbb &, Generator &&,
// Transformers && ...)` -- confirm against the real header.
169 template <
typename Generator,
typename ... Transformers,
172 Transformers && ... transform_ops)
175 using result_type =
typename result_of<Generator()>::type;
176 using output_value_type =
typename result_type::value_type;
177 using output_type = experimental::optional<output_value_type>;
179 const auto this_filter = tbb::make_filter<void, output_type>(
180 tbb::filter::serial_in_order,
181 [&](tbb::flow_control& fc) -> output_type {
182 auto item = generate_op();
// A false-testing item marks the end of the stream: tell TBB to stop.
183 if (!item) { fc.stop(); }
// Unwrap the generated item, or hand back an empty optional on the final call.
184 return (item) ? *item :output_type{};
// Declared here; whether it is passed to parallel_pipeline is on lines that
// are not visible in this listing.
188 tbb::task_group_context context;
189 tbb::parallel_pipeline(ex.
tokens(),
193 forward<Transformers>(transform_ops)...));
Definition: callable_traits.h:24
Definition: patterns.h:61
int tokens() const noexcept
Definition: parallel_execution_tbb.h:112
typename std::enable_if_t<!internal::has_arguments< F >(), int > requires_no_arguments
Definition: callable_traits.h:86
void pipeline_impl(parallel_execution_native &ex, InQueue &input_queue, Consumer &&consume)
Definition: native/pipeline.h:98
void pipeline(parallel_execution_native &ex, Generator &&generate_op, Transformers &&...transform_ops)
Invoke Pipeline pattern on a data stream with native parallel execution.
Definition: native/pipeline.h:500
TBB parallel execution policy.
Definition: parallel_execution_tbb.h:37
Definition: patterns.h:51