21 #ifndef GRPPI_NATIVE_PIPELINE_H 22 #define GRPPI_NATIVE_PIPELINE_H 25 #include "../common/pack_traits.h" 26 #include "../common/callable_traits.h" 29 #include <experimental/optional> 33 template <
typename InQueue,
typename OutQueue,
int Index,
34 typename ... MoreTransformers,
38 OutQueue & output_queue, std::vector<std::thread> & tasks)
41 std::get<Index>(pipe.
stages), output_queue, tasks);
45 template <
typename InQueue,
typename OutQueue,
int Index,
46 typename ... MoreTransformers,
50 OutQueue & output_queue, std::vector<std::thread> & tasks)
53 using namespace experimental;
56 typename tuple_element<Index,decltype(pipeline_obj.
stages)>::type;
57 using gen_value_type =
typename InQueue::value_type;
58 using input_value_type =
typename gen_value_type::value_type;
59 using result_value_type =
60 typename result_of<stage_type(input_value_type)>::type;
61 using result_type = optional<result_value_type>;
64 static auto tmp_queue = ex.
make_queue<result_type>();
67 get<Index>(pipeline_obj.
stages), tmp_queue, tasks);
68 composed_pipeline<mpmc_queue<result_type>,
69 OutQueue, Index+1, MoreTransformers ...>(
70 tmp_queue,pipeline_obj,output_queue,tasks);
73 template <
typename InQueue,
typename Transformer,
typename OutQueue>
75 Transformer && transform_op, OutQueue & output_queue,
76 std::vector<std::thread> & tasks)
79 tasks.emplace_back([&]() {
81 auto item = input_queue.pop();
83 using output_type =
typename OutQueue::value_type;
85 output_queue.push(output_type{});
89 output_queue.push(transform_op(*item));
91 item = input_queue.pop();
97 template <
typename InQueue,
typename Consumer>
102 using gen_value_type =
typename InQueue::value_type;
106 vector<gen_value_type> elements;
109 auto item = input_queue.pop();
111 if(current == item.second){
112 consume(*item.first);
116 elements.push_back(item);
119 for (
auto it=elements.begin(); it!=elements.end(); it++) {
120 if(it->second == current) {
127 item = input_queue.pop( );
129 while (elements.size()>0) {
131 for (
auto it = elements.begin(); it != elements.end(); it++) {
132 if(it->second == current) {
142 auto item = input_queue.pop( );
144 consume(*item.first);
145 item = input_queue.pop();
151 template <
typename Combiner,
typename Identity,
typename InQueue,
typename ...MoreTransformers>
155 using reduction_type =
158 pipeline_impl(ex, input_queue, std::forward<reduction_type&&>(reduction_obj), std::forward<MoreTransformers...>(more_transform_ops...));
161 template <
typename Combiner,
typename Identity,
typename InQueue,
typename ...MoreTransformers>
165 using reduction_type =
167 pipeline_impl_ordered(ex, input_queue, std::forward<reduction_type>(reduction_obj), std::forward<MoreTransformers...>(more_transform_ops...));
170 template <
typename Combiner,
typename Identity,
typename InQueue,
typename ...MoreTransformers>
175 using namespace std::experimental;
176 vector<thread> tasks;
177 using gen_value_type =
typename InQueue::value_type;
178 using input_value_type =
typename gen_value_type::first_type::value_type;
180 using result_value_type =
typename result_of<Combiner(input_value_type, input_value_type)>::type;
181 using result_type = pair<optional<result_value_type>,
long>;
183 auto output_queue = ex.
make_queue<result_type>();
185 thread windower_task([&](){
186 vector<input_value_type> values;
188 auto item {input_queue.pop()};
190 while (item.first && values.size() != reduction_obj.window_size) {
191 values.push_back(*item.first);
192 item = input_queue.pop();
194 if (values.size() > 0) {
195 auto reduced_value =
reduce(reduction_obj.exectype, values.begin(), values.end(), reduction_obj.identity,
196 std::forward<Combiner>(reduction_obj.combine_op));
197 output_queue.push({{reduced_value}, out_order});
200 if (reduction_obj.offset <= reduction_obj.window_size) {
201 values.erase(values.begin(), values.begin() + reduction_obj.offset);
204 values.erase(values.begin(), values.end());
205 auto diff = reduction_obj.offset - reduction_obj.window_size;
206 while (diff > 0 && item.first) {
207 item = input_queue.pop();
213 if (!item.first)
break;
215 output_queue.push({{},-1});
218 pipeline_impl(ex, output_queue, forward<MoreTransformers>(more_transform_ops) ... );
219 windower_task.join();
223 template <
typename Transformer,
typename InQueue,
typename... MoreTransformers>
226 MoreTransformers && ... more_transform_ops)
230 pipeline_impl(ex,input_queue, std::forward<filter_type>(filter_obj),
231 std::forward<MoreTransformers>(more_transform_ops)... );
234 template <
typename Transformer,
typename InQueue,
typename... MoreTransformers>
237 MoreTransformers && ... more_transform_ops )
240 vector<thread> tasks;
242 using gen_value_type =
typename InQueue::value_type;
243 using input_value_type =
typename gen_value_type::first_type;
244 auto tmp_queue = ex.
make_queue<gen_value_type>();
246 atomic<int> done_threads{0};
247 for (
int th=0; th<filter_obj.exectype.concurrency_degree(); th++) {
248 tasks.emplace_back([&]() {
249 auto manager = filter_obj.exectype.thread_manager();
251 auto item{input_queue.pop()};
253 if (filter_obj.task(*item.first)) {
254 tmp_queue.push(item);
257 tmp_queue.push(make_pair(input_value_type{},item.second) );
259 item = input_queue.pop();
262 if (done_threads==filter_obj.exectype.concurrency_degree()) {
263 tmp_queue.push(make_pair(input_value_type{}, -1));
266 input_queue.push(item);
271 auto output_queue = ex.
make_queue<gen_value_type>();
272 auto ordering_thread = thread{[&](){
274 vector<gen_value_type> elements;
277 auto item{tmp_queue.pop()};
279 if(!item.first && item.second == -1)
break;
280 if (item.second == current) {
282 output_queue.push(make_pair(item.first,order));
288 elements.push_back(item);
291 for (
auto it=elements.begin(); it<elements.end(); it++) {
292 if (it->second == current) {
294 output_queue.push(make_pair(it->first,order));
302 item = tmp_queue.pop();
304 while (elements.size()>0) {
306 for (
auto it=elements.begin(); it<elements.end(); it++) {
307 if (it->second == current) {
309 output_queue.push(make_pair(it->first,order));
318 output_queue.push(item);
321 pipeline_impl(ex, output_queue, forward<MoreTransformers>(more_transform_ops) ... );
322 ordering_thread.join();
323 for (
auto && t : tasks) { t.join(); }
326 template <
typename Transformer,
typename InQueue,
typename ... MoreTransformers>
329 MoreTransformers && ... more_transform_ops)
332 vector<thread> tasks;
334 using gen_value_type =
typename InQueue::value_type;
335 using input_value_type =
typename gen_value_type::first_type;
336 auto output_queue = ex.
make_queue<gen_value_type>();
338 atomic<int> done_threads{0};
340 for (
int th=0; th<filter_obj.exectype.concurrency_degree(); th++) {
341 tasks.emplace_back([&]() {
342 auto manager = filter_obj.exectype.thread_manager();
344 auto item{input_queue.pop()};
346 if (filter_obj.task(*item.first)) {
347 output_queue.push(item);
349 item = input_queue.pop();
352 if (done_threads==filter_obj.exectype.concurrency_degree()) {
353 output_queue.push( make_pair(input_value_type{}, -1) );
356 input_queue.push(item);
362 forward<MoreTransformers>(more_transform_ops) ... );
364 for (
auto && t : tasks) { t.join(); }
367 template <
typename Transformer,
typename InQueue,
typename ... MoreTransformers>
370 MoreTransformers && ... more_transform_ops)
375 std::forward<filter_type>(filter_obj),
376 std::forward<MoreTransformers>(more_transform_ops)...);
380 std::forward<filter_type>(filter_obj),
381 std::forward<MoreTransformers>(more_transform_ops)...);
385 template <
typename Transformer,
typename InQueue,
typename... MoreTransformers>
388 MoreTransformers && ... more_transform_ops)
391 pipeline_impl(ex, input_queue, std::forward<farm_type>(farm_obj),
392 std::forward< MoreTransformers>(more_transform_ops) ... );
397 template <
typename Transformer,
typename InQueue,
typename... MoreTransformers>
400 MoreTransformers && ... more_transform_ops)
404 using input_item_type =
typename InQueue::value_type;
405 using input_item_value_type =
406 typename input_item_type::first_type::value_type;
407 using transform_result_type =
408 typename result_of<Transformer(input_item_value_type)>::type;
409 using output_item_value_type =
410 experimental::optional<transform_result_type>;
411 using output_item_type =
412 pair<output_item_value_type,long>;
413 auto output_queue = p.
make_queue<output_item_type>();
415 atomic<int> done_threads{0};
416 vector<thread> tasks;
417 for(
int th = 0; th<farm_obj.exectype.concurrency_degree(); ++th){
418 tasks.emplace_back([&]() {
419 auto manager = farm_obj.exectype.thread_manager();
422 auto item{input_queue.pop()};
424 auto out = output_item_value_type{farm_obj.task(*item.first)};
425 output_queue.push(make_pair(out,item.second)) ;
426 item = input_queue.pop( );
428 input_queue.push(item);
430 if (done_threads == farm_obj.exectype.concurrency_degree()) {
431 output_queue.push(make_pair(output_item_value_type{}, -1));
436 forward<MoreTransformers>(more_transform_ops)... );
438 for (
auto && t : tasks) { t.join(); }
442 template <
typename Transformer,
typename InQueue,
typename... MoreTransformers>
444 Transformer && transform_op,
445 MoreTransformers && ... more_transform_ops)
449 using input_item_type =
typename InQueue::value_type;
450 using input_item_value_type =
typename input_item_type::first_type::value_type;
451 using transform_result_type =
452 typename result_of<Transformer(input_item_value_type)>::type;
453 using output_item_value_type =
454 experimental::optional<transform_result_type>;
455 using output_item_type =
456 pair<output_item_value_type,long>;
458 auto output_queue = ex.
make_queue<output_item_type>();
465 auto item{input_queue.pop()};
467 auto out = output_item_value_type{transform_op(*item.first)};
468 output_queue.push(make_pair(out, item.second));
469 item = input_queue.pop( ) ;
471 output_queue.push(make_pair(output_item_value_type{},-1));
476 forward<MoreTransformers>(more_transform_ops)...);
498 template <
typename Generator,
typename ... Transformers,
501 Transformers && ... transform_ops)
505 using result_type =
typename result_of<Generator()>::type;
506 using output_type = pair<result_type,long>;
507 auto first_queue = ex.
make_queue<output_type>();
509 thread generator_task(
515 auto item{generate_op()};
516 first_queue.push(make_pair(item, order));
523 pipeline_impl(ex, first_queue, forward<Transformers>(transform_ops)...);
524 generator_task.join();
Definition: callable_traits.h:24
void pipeline_impl_ordered(parallel_execution_native &ex, InQueue &input_queue, reduction_info< parallel_execution_native, Combiner, Identity > &&reduction_obj, MoreTransformers...more_transform_ops)
Definition: native/pipeline.h:171
Definition: patterns.h:61
Definition: patterns.h:38
Definition: patterns.h:29
typename std::enable_if_t<!internal::has_arguments< F >(), int > requires_no_arguments
Definition: callable_traits.h:86
void pipeline_impl(parallel_execution_native &ex, InQueue &input_queue, Consumer &&consume)
Definition: native/pipeline.h:98
E & exectype
Definition: patterns.h:31
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:136
std::enable_if_t<(Index< sizeof...(T)-1), int > requires_index_not_last
Definition: pack_traits.h:34
void pipeline(parallel_execution_native &ex, Generator &&generate_op, Transformers &&...transform_ops)
Invoke Pipeline pattern on a data stream with native parallel execution.
Definition: native/pipeline.h:500
std::tuple< Stage, Stages... > stages
Definition: patterns.h:32
native_thread_manager thread_manager()
Get a manager object for registration/deregistration in the thread index table for current thread...
Definition: parallel_execution_native.h:199
void pipeline_impl_unordered(parallel_execution_native &ex, InQueue &input_queue, filter_info< parallel_execution_native, Transformer > &&filter_obj, MoreTransformers &&...more_transform_ops)
Definition: native/pipeline.h:327
Definition: patterns.h:51
void composed_pipeline(InQueue &input_queue, const pipeline_info< parallel_execution_native, MoreTransformers... > &pipe, OutQueue &output_queue, std::vector< std::thread > &tasks)
Definition: native/pipeline.h:36
std::enable_if_t<(Index==sizeof...(T)-1), int > requires_index_last
Definition: pack_traits.h:30
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_native.h:225
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_native.h:193
auto reduce(parallel_execution_native &ex, InputIt first, InputIt last, Identity identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with parallel native execution...
Definition: native/reduce.h:51