GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
seq/pipeline.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_SEQ_PIPELINE_H
22 #define GRPPI_SEQ_PIPELINE_H
23 
24 #include "sequential_execution.h"
25 
26 #include "../common/pack_traits.h"
27 #include "../common/patterns.h"
28 
29 namespace grppi {
30 
31 template <typename Input, int Index, typename ... MoreTransformers,
32  internal::requires_index_last<Index,MoreTransformers...> = 0>
33 auto composed_pipeline(Input in,
34  const pipeline_info<
36  MoreTransformers...> & pipeline_obj)
37 {
38  return std::get<Index>(pipeline_obj.stages)(in);
39 }
40 
41 template <typename Input, int Index, typename ... MoreTransformers,
42  internal::requires_index_not_last<Index,MoreTransformers...> = 0>
43 auto composed_pipeline(Input in,
44  const pipeline_info<
46  MoreTransformers...> & pipeline_obj)
47 {
48  return composed_pipeline<Input, Index+1, MoreTransformers...>(
49  std::get<Index>(pipeline_obj.stages)(in),
50  pipeline_obj);
51 }
52 
53 //Last stage
54 template <typename Item, typename Transformer>
55 void pipeline_impl(sequential_execution &s, Item && item,
56  Transformer && transform_op)
57 {
58  transform_op(std::forward<Item>(item));
59 }
60 
61 //Filter stage
62 template <typename Item, typename Filter, typename... MoreTransformers>
63 void pipeline_impl(sequential_execution & ex, Item && item,
64  const filter_info<sequential_execution,Filter> & filter_obj,
65  MoreTransformers && ... more_transform_ops)
66 {
67  pipeline_impl(ex, std::forward<Item>(item),
68  std::forward<filter_info<sequential_execution,Filter>>(filter_obj),
69  std::forward<MoreTransformers>(more_transform_ops)...);
70 }
71 
72 template <typename Item, typename Transformer, typename... MoreTransformers>
73 void pipeline_impl(sequential_execution & ex, Item && item,
75  MoreTransformers && ... more_transform_ops)
76 {
77  if(transf_filter.task(item))
78  pipeline_impl(ex, std::forward<Item>(item),
79  std::forward<MoreTransformers>(more_transform_ops)...);
80 }
81 
82 //Farm stage
83 template <typename Item, typename Transformer, typename... MoreTransformers>
84 void pipeline_impl(sequential_execution & ex, Item && item,
86  MoreTransformers && ... more_transform_ops)
87 {
88  pipeline_impl(ex, std::forward<Item>(item),
89  std::forward<farm_info<sequential_execution,Transformer>>(transf_farm),
90  std::forward<MoreTransformers>(more_transform_ops)... );
91 }
92 
93 template <typename Item, typename Transformer, typename... MoreTransformers>
94 void pipeline_impl(sequential_execution & ex, Item && item,
96  MoreTransformers && ... more_transform_ops)
97 {
98  pipeline_impl(ex, transform_farm.task(item),
99  std::forward<MoreTransformers>(more_transform_ops)...);
100 }
101 
102 
103 
104 //Intermediate stages
105 template <typename Item, typename Transformer, typename ... MoreTransformers>
106 void pipeline_impl(sequential_execution & ex, Item && item,
107  Transformer && transform_op,
108  MoreTransformers && ... more_transform_ops)
109 {
110  pipeline_impl(ex, transform_op(item),
111  std::forward<MoreTransformers>(more_transform_ops)...);
112 }
113 
131 template <typename Generator,
132  typename ... Transformers,
133  typename = typename std::result_of<Generator()>::type> // TODO: Intention?
134 void pipeline(sequential_execution & ex, Generator && generator_op,
135  Transformers && ... transform_ops)
136 {
137  for (;;) {
138  auto item = generator_op();
139  if(!item) break;
140  pipeline_impl(ex, *item, std::forward<Transformers>(transform_ops) ... );
141  }
142 }
143 
149 }
150 
151 #endif
Definition: callable_traits.h:24
Definition: patterns.h:61
Definition: patterns.h:29
void pipeline_impl(parallel_execution_native &ex, InQueue &input_queue, Consumer &&consume)
Definition: native/pipeline.h:98
std::enable_if_t<(Index< sizeof...(T)-1), int > requires_index_not_last
Definition: pack_traits.h:34
void pipeline(parallel_execution_native &ex, Generator &&generate_op, Transformers &&...transform_ops)
Invoke Pipeline pattern on a data stream with native parallel execution.
Definition: native/pipeline.h:500
Definition: patterns.h:51
Sequential execution policy.
Definition: sequential_execution.h:31
void composed_pipeline(InQueue &input_queue, const pipeline_info< parallel_execution_native, MoreTransformers... > &pipe, OutQueue &output_queue, std::vector< std::thread > &tasks)
Definition: native/pipeline.h:36
std::enable_if_t<(Index==sizeof...(T)-1), int > requires_index_last
Definition: pack_traits.h:30