GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
tbb/pipeline.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_TBB_PIPELINE_H
22 #define GRPPI_TBB_PIPELINE_H
23 
24 #ifdef GRPPI_TBB
25 
26 #include "parallel_execution_tbb.h"
27 
28 #include <experimental/optional>
29 
30 #include <tbb/tbb.h>
31 
32 namespace grppi {
33 
34 // TODO: Input could be only a template argument with no function argument.
35 
36 
37 //Last stage
38 template <typename Transformer, typename Input>
40  Input,
41  Transformer && transform_op)
42 {
43  return tbb::make_filter<std::experimental::optional<Input>, void>(
44  tbb::filter::serial_in_order,
45  [&](std::experimental::optional<Input> s) {
46  if(s) transform_op(*s);
47  }
48  );
49 }
50 
51 
52 //Intermediate stages
53 template <typename Transformer, typename ... MoreTransformers, typename Input>
55  Input in,
57  MoreTransformers && ... more_transform_ops)
58 {
60 
61  return pipeline_impl(ex, in, std::forward<farm_type>(farm_obj),
62  std::forward<MoreTransformers>(more_transform_ops)...);
63 }
64 
65 
66 template <typename Predicate, typename... MoreTransformers, typename Input>
68  Input in,
70  MoreTransformers && ... more_transform_ops)
71 {
73 
74  return pipeline_impl(ex, in, std::forward<filter_type>(filter_obj),
75  std::forward<MoreTransformers>(more_transform_ops)...);
76 }
77 
78 template <typename Predicate, typename... MoreTransformers, typename Input>
80  Input in,
82  MoreTransformers && ... more_transform_ops)
83 {
84  using namespace std;
85  using optional_input_type = experimental::optional<Input>;
86 
87  return
88  tbb::make_filter<optional_input_type,optional_input_type>(
89  tbb::filter::parallel,
90  [&](optional_input_type val) -> optional_input_type {
91  return (val && filter_obj.task(*val)) ?
92  (val) :
93  optional_input_type{};
94  }
95  )
96  &
97  pipeline_impl(ex, in,
98  std::forward<MoreTransformers>(more_transform_ops)...);
99 }
100 
101 
102 template <typename Transformer, typename ... MoreTransformers, typename Input>
104  Input,
106  MoreTransformers && ... more_transform_ops )
107 {
108  using namespace std;
109  using optional_input_type = experimental::optional<Input>;
110  using output_type = typename result_of<Transformer(Input)>::type;
111  using optional_output_type = experimental::optional<output_type>;
112 
113  return
114  tbb::make_filter<optional_input_type, optional_output_type>(
115  tbb::filter::parallel,
116  [&](optional_input_type val) -> optional_output_type {
117  return (val) ?
118  farm_obj.task(*val) :
119  optional_output_type{};
120  }
121  )
122  &
123  pipeline_impl(ex, output_type{},
124  forward<MoreTransformers>(more_transform_ops)...);
125 }
126 
127 template <typename Predicate, typename ... MoreTransformers, typename Input>
129  Input,
130  Predicate && predicate_op,
131  MoreTransformers && ... more_transform_ops)
132 {
133  using namespace std;
134  using optional_input_type = experimental::optional<Input>;
135  using output_type = typename result_of<Predicate(Input)>::type;
136  using optional_output_type = experimental::optional<output_type>;
137 
138  return
139  tbb::make_filter<optional_input_type, optional_output_type>(
140  tbb::filter::serial_in_order,
141  [&](optional_input_type val) -> optional_output_type {
142  return (val) ? predicate_op(*val) :
143  optional_output_type{};
144  }
145  )
146  &
147  pipeline_impl(ex, output_type{},
148  forward<MoreTransformers>(more_transform_ops)...);
149 }
150 
169 template <typename Generator, typename ... Transformers,
171 void pipeline(parallel_execution_tbb & ex, Generator generate_op,
172  Transformers && ... transform_ops)
173 {
174  using namespace std;
175  using result_type = typename result_of<Generator()>::type;
176  using output_value_type = typename result_type::value_type;
177  using output_type = experimental::optional<output_value_type>;
178 
179  const auto this_filter = tbb::make_filter<void, output_type>(
180  tbb::filter::serial_in_order,
181  [&](tbb::flow_control& fc) -> output_type {
182  auto item = generate_op();
183  if (!item) { fc.stop(); }
184  return (item) ? *item :output_type{};
185  }
186  );
187 
188  tbb::task_group_context context;
189  tbb::parallel_pipeline(ex.tokens(),
190  this_filter
191  &
192  pipeline_impl(ex, output_value_type{},
193  forward<Transformers>(transform_ops)...));
194 }
195 
201 }
202 
203 #endif
204 
205 #endif
Definition: callable_traits.h:24
Definition: patterns.h:61
int tokens() const noexcept
Definition: parallel_execution_tbb.h:112
typename std::enable_if_t<!internal::has_arguments< F >(), int > requires_no_arguments
Definition: callable_traits.h:86
STL namespace.
void pipeline_impl(parallel_execution_native &ex, InQueue &input_queue, Consumer &&consume)
Definition: native/pipeline.h:98
void pipeline(parallel_execution_native &ex, Generator &&generate_op, Transformers &&...transform_ops)
Invoke Pipeline pattern on a data stream with native parallel execution.
Definition: native/pipeline.h:500
TBB parallel execution policy.
Definition: parallel_execution_tbb.h:37
Definition: patterns.h:51