21 #ifndef GRPPI_OMP_PIPELINE_H 22 #define GRPPI_OMP_PIPELINE_H 28 #include <experimental/optional> 33 template <
typename InQueue,
typename Consumer>
35 Consumer && consume_op)
38 using gen_value_type =
typename InQueue::value_type;
41 vector<gen_value_type> elements;
43 auto item = input_queue.pop( );
45 if (current == item.second) {
46 consume_op(*item.first);
50 elements.push_back(item);
52 for (
auto it=elements.begin(); it!=elements.end(); it++) {
53 if (it->second == current) {
54 consume_op(*it->first);
60 item = input_queue.pop( );
62 while(elements.size()>0){
63 for(
auto it = elements.begin(); it != elements.end(); it++){
64 if(it->second == current) {
65 consume_op(*it->first);
74 auto item = input_queue.pop();
76 consume_op(*item.first);
77 item = input_queue.pop();
85 template <
typename Combiner,
typename Identity,
typename InQueue,
typename ...MoreTransformers>
89 using reduction_type =
92 pipeline_impl(ex, input_queue, std::forward<reduction_type&&>(reduction_obj), std::forward<MoreTransformers...>(more_transform_ops...));
95 template <
typename Combiner,
typename Identity,
typename InQueue,
typename ...MoreTransformers>
99 using reduction_type =
101 pipeline_impl_ordered(ex, input_queue, std::forward<reduction_type>(reduction_obj), std::forward<MoreTransformers...>(more_transform_ops...));
104 template <
typename Combiner,
typename Identity,
typename InQueue,
typename ...MoreTransformers>
109 using namespace std::experimental;
110 using gen_value_type =
typename InQueue::value_type;
111 using input_value_type =
typename gen_value_type::first_type::value_type;
113 using result_value_type =
typename result_of<Combiner(input_value_type, input_value_type)>::type;
114 using result_type = pair<optional<result_value_type>,
long>;
116 auto output_queue = ex.
make_queue<result_type>();
118 #pragma omp task shared(output_queue, input_queue, reduction_obj) 120 vector<input_value_type> values;
122 auto item {input_queue.pop()};
124 while (item.first && values.size() != reduction_obj.window_size) {
125 values.push_back(*item.first);
126 item = input_queue.pop();
128 if (values.size() > 0) {
129 auto reduced_value =
reduce(reduction_obj.exectype, values.begin(), values.end(), reduction_obj.identity,
130 std::forward<Combiner>(reduction_obj.combine_op));
131 output_queue.push({{reduced_value}, out_order});
134 if (reduction_obj.offset <= reduction_obj.window_size) {
135 values.erase(values.begin(), values.begin() + reduction_obj.offset);
138 values.erase(values.begin(), values.end());
139 auto diff = reduction_obj.offset - reduction_obj.window_size;
140 while (diff > 0 && item.first) {
141 item = input_queue.pop();
147 if (!item.first)
break;
149 output_queue.push({{},-1});
152 pipeline_impl(ex, output_queue, forward<MoreTransformers>(more_transform_ops) ... );
159 template <
typename Transformer,
typename InQueue,
typename... MoreTransformers>
162 MoreTransformers && ... more_transform_ops)
166 pipeline_impl(ex,input_queue, std::forward<filter_type>(filter_obj),
167 std::forward<MoreTransformers>(more_transform_ops)...) ;
170 template <
typename Transformer,
typename InQueue,
171 typename... MoreTransformers>
173 InQueue & input_queue,
175 MoreTransformers && ... more_transform_ops)
178 using gen_value_type =
typename InQueue::value_type;
179 using input_value_type =
typename gen_value_type::first_type;
180 auto tmp_queue = ex.
make_queue<gen_value_type>();
182 atomic<int> done_threads{0};
183 for(
int th = 0; th<filter_obj.exectype.concurrency_degree(); th++) {
184 #pragma omp task shared(tmp_queue,filter_obj,input_queue,done_threads) 186 auto item{input_queue.pop()};
188 if(filter_obj.task(*item.first)) {
189 tmp_queue.push(item);
192 tmp_queue.push(make_pair(input_value_type{} ,item.second));
194 item = input_queue.pop();
197 if (done_threads==filter_obj.exectype.concurrency_degree()) {
198 tmp_queue.push (make_pair(input_value_type{}, -1));
201 input_queue.push(item);
206 auto output_queue = ex.
make_queue<gen_value_type>();
207 #pragma omp task shared (output_queue,tmp_queue) 209 vector<gen_value_type> elements;
212 auto item = tmp_queue.pop();
214 if (!item.first && item.second == -1)
break;
215 if (item.second == current) {
217 output_queue.push(make_pair(item.first, order++));
222 elements.push_back(item);
224 for (
auto it=elements.begin(); it<elements.end(); it++) {
225 if ((*it).second==current) {
227 output_queue.push(make_pair((*it).first,order++));
234 item = tmp_queue.pop();
236 while (elements.size()>0) {
237 for (
auto it=elements.begin(); it<elements.end(); it++) {
238 if ((*it).second == current) {
240 output_queue.push(make_pair((*it).first,order++));
248 output_queue.push(item);
251 forward<MoreTransformers>(more_transform_ops)...);
255 template <
typename Transformer,
typename InQueue,
typename... MoreTransformers>
258 MoreTransformers && ... more_transform_ops)
260 using gen_value_type =
typename InQueue::value_type;
261 using input_value_type =
typename gen_value_type::first_type;
262 auto output_queue = ex.
make_queue<gen_value_type>();
264 std::atomic<int> done_threads{0};
265 for (
int th=0; th<farm_obj.exectype.concurrency_degree(); th++) {
266 #pragma omp task shared(output_queue,farm_obj,input_queue,done_threads) 268 auto item = input_queue.pop( ) ;
270 if (farm_obj.task(*item.first)) {
271 output_queue.push(item);
273 item = input_queue.pop();
276 if (done_threads==farm_obj.exectype.concurrency_degree()) {
277 output_queue.push(make_pair(input_value_type{}, -1));
280 input_queue.push(item);
285 std::forward<MoreTransformers>(more_transform_ops)...);
289 template <
typename Transformer,
typename InQueue,
typename... MoreTransformers>
292 MoreTransformers && ... more_transform_ops)
298 std::forward<filter_type>(filter_obj),
299 std::forward<MoreTransformers>(more_transform_ops)...);
303 std::forward<filter_type>(filter_obj),
304 std::forward<MoreTransformers>(more_transform_ops)...);
309 template <
typename Transformer,
typename InQueue,
typename... MoreTransformers>
312 MoreTransformers && ... more_transform_ops)
315 pipeline_impl(ex, input_queue, std::forward<farm_type>(farm_obj),
316 std::forward<MoreTransformers>(more_transform_ops)...) ;
319 template <
typename Transformer,
typename InQueue,
typename... MoreTransformers>
322 MoreTransformers && ... sgs )
325 using gen_value_type =
typename InQueue::value_type;
326 using input_value_type =
typename gen_value_type::first_type::value_type;
327 using result_type =
typename result_of<Transformer(input_value_type)>::type;
328 using output_value_type = experimental::optional<result_type>;
329 using output_type = pair<output_value_type,long>;
331 auto output_queue = ex.
make_queue<output_type>();
332 atomic<int> done_threads{0};
333 for (
int th=0; th<farm_obj.exectype.concurrency_degree(); th++) {
334 #pragma omp task shared(done_threads,output_queue,farm_obj,input_queue) 336 auto item = input_queue.pop();
338 auto out = output_value_type{farm_obj.task(*item.first)};
339 output_queue.push(make_pair(out,item.second));
340 item = input_queue.pop();
342 input_queue.push(item);
344 if (done_threads==farm_obj.exectype.concurrency_degree()) {
345 output_queue.push(make_pair(output_value_type{}, -1));
349 pipeline_impl(ex, output_queue, forward<MoreTransformers>(sgs) ... );
354 template <
typename Transformer,
typename InQueue,
typename ... MoreTransformers>
356 Transformer && transform_op,
357 MoreTransformers && ... more_transform_ops)
360 using gen_value_type =
typename InQueue::value_type;
361 using input_value_type =
typename gen_value_type::first_type::value_type;
362 using result_type =
typename result_of<Transformer(input_value_type)>::type;
363 using output_value_type = experimental::optional<result_type>;
364 using output_type = pair<output_value_type,long>;
365 auto output_queue = ex.
make_queue<output_type>();
368 #pragma omp task shared(transform_op, input_queue, output_queue) 370 auto item = input_queue.pop();
372 auto out = output_value_type{transform_op(*item.first)};
373 output_queue.push(make_pair(out, item.second));
374 item = input_queue.pop() ;
376 output_queue.push(make_pair(output_value_type{}, -1));
381 forward<MoreTransformers>(more_transform_ops)...);
402 template <
typename Generator,
typename ... Transformers,
405 Transformers && ... transform_ops)
409 using result_type =
typename result_of<Generator()>::type;
410 auto output_queue = ex.
make_queue<pair<result_type,long>>();
414 #pragma omp single nowait 416 #pragma omp task shared(generate_op,output_queue) 420 auto item = generate_op();
421 output_queue.push(make_pair(item,order++)) ;
426 forward<Transformers>(transform_ops)...);
Definition: callable_traits.h:24
void pipeline_impl_ordered(parallel_execution_native &ex, InQueue &input_queue, reduction_info< parallel_execution_native, Combiner, Identity > &&reduction_obj, MoreTransformers...more_transform_ops)
Definition: native/pipeline.h:171
Definition: patterns.h:61
Definition: patterns.h:38
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_omp.h:102
typename std::enable_if_t<!internal::has_arguments< F >(), int > requires_no_arguments
Definition: callable_traits.h:86
void pipeline_impl(parallel_execution_native &ex, InQueue &input_queue, Consumer &&consume)
Definition: native/pipeline.h:98
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T.
Definition: parallel_execution_omp.h:119
void pipeline(parallel_execution_native &ex, Generator &&generate_op, Transformers &&...transform_ops)
Invoke Pipeline pattern on a data stream with native parallel execution.
Definition: native/pipeline.h:500
OpenMP parallel execution policy.
Definition: parallel_execution_omp.h:40
void pipeline_impl_unordered(parallel_execution_native &ex, InQueue &input_queue, filter_info< parallel_execution_native, Transformer > &&filter_obj, MoreTransformers &&...more_transform_ops)
Definition: native/pipeline.h:327
Definition: patterns.h:51
auto reduce(parallel_execution_native &ex, InputIt first, InputIt last, Identity identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with parallel native execution...
Definition: native/reduce.h:51