GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
native/stream_iteration.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_NATIVE_STREAM_ITERATION_H
22 #define GRPPI_NATIVE_STREAM_ITERATION_H
23 
25 
26 #include <thread>
27 #include <utility>
28 #include <memory>
29 
30 namespace grppi {
31 
// Stream-iteration pattern (repeat_until) for the native parallel execution
// policy, where the per-item transformation is a composed pipeline.
//
// Items produced by generate_op are fed through the pipeline stages; each
// result is tested with predicate_op. Results that satisfy the predicate are
// handed to consume_op, the others are pushed back into the pipeline input
// queue for another pass.
//
// Parameters (as visible in this listing):
//   generate_op  - callable invoked with no arguments; its result is tested
//                  with `!item` and dereferenced with `*item`, and a falsy
//                  result ends generation.
//   pipe         - pipeline description forwarded to composed_pipeline.
//   predicate_op - called on the dereferenced item; true means "done iterating".
//   consume_op   - called on every dereferenced item that satisfies predicate_op.
//
// NOTE(review): listing lines 55 and 57 (the function name with the `ex`
// parameter, and the start of the pipeline_info parameter type) are missing
// from this extract; the reference entry for repeat_until at the end of the
// page shows the complete signature.
53 template<typename Generator, typename Predicate, typename Consumer,
54  typename ...MoreTransformers>
56  Generator && generate_op,
58  MoreTransformers...> && pipe,
59  Predicate predicate_op, Consumer consume_op)
60 {
61  using namespace std;
62  using generated_type = typename result_of<Generator()>::type;
63  using pipeline_info_type = pipeline_info<parallel_execution_native, MoreTransformers...>;
64 
 // Both queues carry generated_type: the pipeline output is re-injected into
 // the pipeline input when the predicate is not yet satisfied.
65  auto generated_queue = ex.make_queue<generated_type>();
66  auto transformed_queue = ex.make_queue<generated_type>();
 // num_elements: items generated but not yet accepted by predicate_op.
 // send_finish: set by the generator thread once generate_op returns a falsy item.
67  atomic<int> num_elements{0};
68  atomic<bool> send_finish{false};
69 
 // Generator thread: pushes every generated item into the pipeline input.
 // The terminating (falsy) item itself is NOT pushed here; the loop below
 // pushes an end marker once all outstanding items are consumed.
70  thread generator_task([&](){
71  auto manager = ex.thread_manager();
72  for (;;) {
73  auto item{generate_op()};
74  if (!item) {
75  send_finish=true;
76  break;
77  }
78  num_elements++;
79  generated_queue.push(item);
80  }
81  });
82 
 // Connect generated_queue -> pipeline stages -> transformed_queue.
 // Presumably composed_pipeline starts the stage threads and stores them in
 // pipe_threads (they are joined at the end of this function) - confirm in
 // native/pipeline.h.
83  vector<thread> pipe_threads;
84  composed_pipeline<decltype(generated_queue),
85  decltype(transformed_queue), 0, MoreTransformers ...>(
86  generated_queue,
87  forward<pipeline_info_type>(pipe),
88  transformed_queue, pipe_threads);
89 
 // The calling thread acts as the predicate/consumer stage.
90  auto manager = ex.thread_manager();
91  for (;;) {
92  //If every element has been processed
93  if (send_finish && num_elements==0) {
 // Push a default-constructed item into the pipeline input; presumably this
 // is the end-of-stream marker that lets the pipeline stages terminate.
94  generated_queue.push({});
95  send_finish = false;
96  break;
97  }
98 
 // NOTE(review): the termination test above runs only before this pop. If
 // the last outstanding item is consumed before the generator thread sets
 // send_finish, the loop reaches pop() with nothing left in flight; if pop()
 // blocks on an empty queue this looks like it can hang - confirm.
99  auto item{transformed_queue.pop()};
100  if (predicate_op(*item)) {
101  num_elements--;
102  consume_op(*item);
103  }
104  else {
105  //If the condition is not met reintroduce the element in the input queue
106  generated_queue.push(item);
107  }
108  }
109 
110  generator_task.join();
111  for (auto && t : pipe_threads) { t.join(); }
112 }
113 
// Stream-iteration pattern (repeat_until) for the native parallel execution
// policy, where the per-item transformation is a farm.
//
// Each generated item is repeatedly passed through farm.task until
// predicate_op holds for the result, which is then handed to consume_op on a
// dedicated consumer thread.
//
// Parameters (as visible in this listing):
//   generate_op  - callable invoked with no arguments; a falsy result ends
//                  generation.
//   predicate_op - called on the transformed value; true stops iterating it.
//   consume_op   - called on every finished value.
//
// NOTE(review): listing lines 129 and 131 (the function name with the `ex`
// parameter, and the `farm` parameter) are missing from this extract. The
// body uses `ex`, `farm.task` and `farm.exectype`; the Transformer template
// parameter presumably appears in the missing `farm` parameter type.
127 template<typename Generator, typename Transformer, typename Predicate,
128  typename Consumer>
130  Generator generate_op,
132  Predicate predicate_op, Consumer consume_op)
133 {
134  using namespace std;
135  using generated_type = typename result_of<Generator()>::type;
136  auto generated_queue = ex.make_queue<generated_type>();
137  auto transformed_queue = ex.make_queue<generated_type>();
 // Number of worker threads (including the generator thread) that have seen
 // the end-of-stream item.
138  atomic<int> done_threads{0};
139  vector<thread> tasks;
140 
 // Generator thread: first generates the whole stream, then takes part in the
 // farm as one of its workers.
141  thread generator_task([&](){
142  auto manager = farm.exectype.thread_manager();
143  for (;;) {
144  auto item = generate_op();
 // The falsy terminating item is pushed too; workers use it as end marker.
145  generated_queue.push(item);
146  if (!item) break;
147  }
148 
149  //When generation is finished start working on the farm
150  auto item{generated_queue.pop()};
151  while (item) {
152  auto out = *item;
 // Apply the farm task at least once, and again until the predicate holds.
153  do {
154  out = farm.task(out);
155  } while (!predicate_op(out));
156  transformed_queue.push(out);
157  item = generated_queue.pop();
158  }
 // End marker received: the last worker to finish notifies the consumer,
 // every other worker hands the marker on to the next worker.
159  done_threads++;
160  if(done_threads == farm.exectype.concurrency_degree()) {
161  transformed_queue.push({});
162  }
163  else {
164  generated_queue.push(item);
165  }
166  });
167  //Farm workers
 // Starts at 1 because the generator thread above already acts as one worker.
168  for(int th = 1; th < farm.exectype.concurrency_degree(); th++) {
169  tasks.emplace_back([&]() {
170  auto manager = farm.exectype.thread_manager();
171  auto item{generated_queue.pop()};
172  while (item) {
173  auto out = *item;
174  do {
175  out = farm.task(out);
176  } while (!predicate_op(out));
177  transformed_queue.push(out);
178  item = generated_queue.pop();
179  }
 // Same end-marker hand-off as in the generator thread.
180  done_threads++;
181  if (done_threads == farm.exectype.concurrency_degree()) {
182  transformed_queue.push({});
183  }
184  else {
185  generated_queue.push(item);
186  }
187  });
188  }
189 
 // Consumer thread: drains transformed_queue until the falsy end marker.
190  thread consumer_task([&](){
191  auto manager = ex.thread_manager();
192  for (;;){
193  auto item{transformed_queue.pop()};
194  if(!item) break;
195  consume_op(*item);
196  }
197  });
198 
199  for(auto && t : tasks) { t.join(); }
200  generator_task.join();
201  consumer_task.join();
202 }
203 
204 
// Stream-iteration pattern (repeat_until) for the native parallel execution
// policy with a single transformer.
//
// Runs three threads connected by two queues: a producer calling
// generate_op, a transformer that applies transform_op to each item until
// predicate_op holds, and a consumer calling consume_op on each result.
//
// Parameters (as visible in this listing):
//   generate_op  - callable invoked with no arguments; a falsy result ends
//                  the stream.
//   transform_op - applied to the dereferenced item, and re-applied to its own
//                  result while predicate_op is false.
//   predicate_op - called on the transformed value; true stops iterating it.
//   consume_op   - called on every finished value.
//
// NOTE(review): listing line 220 (the function name with the `ex` parameter)
// is missing from this extract.
218 template<typename Generator, typename Transformer, typename Predicate,
219  typename Consumer>
221  Generator generate_op, Transformer transform_op,
222  Predicate predicate_op, Consumer consume_op)
223 {
224  using namespace std;
225  using namespace experimental;
226  using generated_type = typename result_of<Generator()>::type;
227  using generated_value_type = typename generated_type::value_type;
 // NOTE(review): transformed_type is declared but not used in this body; both
 // queues below are created for generated_type.
228  using transformed_type =
229  typename result_of<Transformer(generated_value_type)>::type;
230 
231  auto generated_queue = ex.make_queue<generated_type>();
232  auto transformed_queue = ex.make_queue<generated_type>();
233 
 // Producer: pushes every generated item, including the final falsy one,
 // which the transformer uses as end-of-stream marker.
234  thread producer_task([&generate_op, &generated_queue, &ex](){
235  auto manager = ex.thread_manager();
236  for(;;) {
237  auto item{generate_op()};
238  generated_queue.push(item);
239  if (!item) break;
240  }
241  });
242 
 // Transformer: iterates transform_op on each value (at least once) until
 // predicate_op is satisfied, then forwards the value.
243  thread transformer_task([&generated_queue,&transform_op,&predicate_op,
244  &transformed_queue, &ex](){
245  auto manager = ex.thread_manager();
246  auto item{generated_queue.pop()};
247  while (item) {
248  auto val = *item;
249  do {
250  val = transform_op(val);
251  } while (!predicate_op(val));
252  transformed_queue.push(val);
253  item = generated_queue.pop();
254  }
 // Propagate end-of-stream to the consumer as a default-constructed item.
255  transformed_queue.push({});
256  });
257 
 // Consumer: drains transformed_queue until a falsy item is popped.
258  thread consumer_task([&consume_op,&transformed_queue,&ex](){
259  auto manager = ex.thread_manager();
260  auto item{transformed_queue.pop()};
261  while (item) {
262  consume_op(*item);
263  item=transformed_queue.pop();
264  }
265  });
266 
267  producer_task.join();
268  transformer_task.join();
269  consumer_task.join();
270 }
271 
272 }
273 
274 #endif
Definition: callable_traits.h:24
Definition: patterns.h:29
void repeat_until(parallel_execution_native &ex, Generator &&generate_op, pipeline_info< parallel_execution_native, MoreTransformers... > &&pipe, Predicate predicate_op, Consumer consume_op)
Invoke Stream iteration pattern on a data stream with native parallel execution with a generator...
Definition: native/stream_iteration.h:55
STL namespace.
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:136
native_thread_manager thread_manager()
Get a manager object for registration/deregistration in the thread index table for current thread...
Definition: parallel_execution_native.h:199
Definition: patterns.h:51
auto farm(Execution &ex, Transformer &&transform_op)
Invoke Farm pattern on a data stream that can be composed in other streaming patterns.
Definition: farm.h:51
void composed_pipeline(InQueue &input_queue, const pipeline_info< parallel_execution_native, MoreTransformers... > &pipe, OutQueue &output_queue, std::vector< std::thread > &tasks)
Definition: native/pipeline.h:36
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_native.h:225