// NOTE(review): this listing is incomplete. The number that starts some lines
// is the line number of the original header, and several of those original
// lines (e.g. 55, 57, 60-61, most braces and loop headers) do not appear here.
// The comments below describe only what the visible lines show.
//
// Include guard for the header, followed by the visible parts of a function
// template parameterised on a generator, a predicate, a consumer and a pack of
// further transformers. The visible parameters are generate_op, pipe,
// predicate_op and consume_op; `ex` is used below but its declaration is not
// visible in this fragment.
21 #ifndef GRPPI_NATIVE_STREAM_ITERATION_H 22 #define GRPPI_NATIVE_STREAM_ITERATION_H 53 template<
typename Generator,
typename Predicate,
typename Consumer,
54 typename ...MoreTransformers>
56 Generator && generate_op,
58 MoreTransformers...> && pipe,
59 Predicate predicate_op, Consumer consume_op)
// Element type of both queues: whatever calling the generator returns.
62 using generated_type =
typename result_of<Generator()>::type;
// Two queues obtained from ex.make_queue, both carrying generated_type.
65 auto generated_queue = ex.
make_queue<generated_type>();
66 auto transformed_queue = ex.
make_queue<generated_type>();
// Shared atomics; they are read in the check at original line 93. The code
// that updates them is not visible in this fragment.
67 atomic<int> num_elements{0};
68 atomic<bool> send_finish{
false};
// Generator thread (captures everything by reference): obtains an item from
// generate_op() and pushes it into generated_queue. The surrounding loop and
// its exit condition are not visible.
70 thread generator_task([&](){
73 auto item{generate_op()};
79 generated_queue.push(item);
// Threads collected here are joined at the end of the fragment.
83 vector<thread> pipe_threads;
// Tail of a call whose callee name is not visible (presumably
// composed_pipeline — TODO confirm). It receives the forwarded pipe,
// transformed_queue and pipe_threads.
85 decltype(transformed_queue), 0, MoreTransformers ...>(
87 forward<pipeline_info_type>(pipe),
88 transformed_queue, pipe_threads);
// Once send_finish is set and num_elements is zero, a default-constructed item
// ({}) is pushed into generated_queue — presumably the end-of-stream marker;
// confirm against the queue consumers.
93 if (send_finish && num_elements==0) {
94 generated_queue.push({});
// An item popped from transformed_queue is dereferenced and tested with
// predicate_op. The branch bodies are not visible; on one visible path the
// item is pushed back into generated_queue.
99 auto item{transformed_queue.pop()};
100 if (predicate_op(*item)) {
106 generated_queue.push(item);
// Join the generator thread and every thread stored in pipe_threads.
110 generator_task.join();
111 for (
auto && t : pipe_threads) { t.join(); }
// NOTE(review): incomplete listing — the leading numbers are original header
// line numbers and several original lines (function name, the declaration of
// `farm` and `ex`, braces, loop headers) are not visible. Comments describe
// only the visible lines.
//
// Visible parts of a function template parameterised on a generator, a
// transformer and a predicate. The work applied to each item is farm.task, and
// the worker count comes from farm.exectype.concurrency_degree().
127 template<
typename Generator,
typename Transformer,
typename Predicate,
130 Generator generate_op,
132 Predicate predicate_op, Consumer consume_op)
// Element type of both queues: whatever calling the generator returns.
135 using generated_type =
typename result_of<Generator()>::type;
136 auto generated_queue = ex.
make_queue<generated_type>();
137 auto transformed_queue = ex.
make_queue<generated_type>();
// Counter compared against the concurrency degree below; the statement that
// increments it is not visible in this fragment.
138 atomic<int> done_threads{0};
// Worker threads created in the loop at original line 168; joined at the end.
139 vector<thread> tasks;
// Generator thread (captures by reference): obtains a thread manager object
// from farm.exectype, then pushes items produced by generate_op() into
// generated_queue.
141 thread generator_task([&](){
142 auto manager =
farm.exectype.thread_manager();
144 auto item = generate_op();
145 generated_queue.push(item);
// Worker sequence (same shape as the one inside the loop below): pop an item,
// apply farm.task to `out` repeatedly until predicate_op(out) holds, push the
// result into transformed_queue, then pop the next item.
// NOTE(review): whether this sequence runs inside generator_task's lambda
// cannot be determined from the visible lines — confirm against the full file.
150 auto item{generated_queue.pop()};
154 out =
farm.task(out);
155 }
while (!predicate_op(out));
156 transformed_queue.push(out);
157 item = generated_queue.pop();
// When done_threads equals the concurrency degree, a default-constructed item
// ({}) is pushed into transformed_queue. The visible push of `item` back into
// generated_queue presumably belongs to the other branch, handing the
// terminating item to the remaining workers — TODO confirm.
160 if(done_threads ==
farm.exectype.concurrency_degree()) {
161 transformed_queue.push({});
164 generated_queue.push(item);
// Spawns concurrency_degree() - 1 additional workers (th starts at 1), each
// stored in `tasks`.
168 for(
int th = 1; th <
farm.exectype.concurrency_degree(); th++) {
169 tasks.emplace_back([&]() {
170 auto manager =
farm.exectype.thread_manager();
171 auto item{generated_queue.pop()};
// Re-apply farm.task until the predicate accepts the value, then emit it.
175 out =
farm.task(out);
176 }
while (!predicate_op(out));
177 transformed_queue.push(out);
178 item = generated_queue.pop();
// Same termination check as above.
181 if (done_threads ==
farm.exectype.concurrency_degree()) {
182 transformed_queue.push({});
185 generated_queue.push(item);
// Consumer thread: pops from transformed_queue. The call that hands items to
// consume_op is not visible in this fragment.
190 thread consumer_task([&](){
193 auto item{transformed_queue.pop()};
// Join the workers first, then the generator thread, then the consumer thread.
199 for(
auto && t : tasks) { t.join(); }
200 generator_task.join();
201 consumer_task.join();
// NOTE(review): incomplete listing — the leading numbers are original header
// line numbers and several original lines (function name, the declaration of
// `ex`, braces, loop headers) are not visible. Comments describe only the
// visible lines.
//
// Visible parts of a function template taking a generator, a single
// transformer, a predicate and a consumer. Three threads (producer,
// transformer, consumer) are connected by two queues.
218 template<
typename Generator,
typename Transformer,
typename Predicate,
221 Generator generate_op, Transformer transform_op,
222 Predicate predicate_op, Consumer consume_op)
225 using namespace experimental;
// generated_type is what the generator returns; generated_value_type is its
// nested value_type (so generated_type is presumably an optional-like wrapper
// — TODO confirm); transformed_type is the transformer's result for that value.
226 using generated_type =
typename result_of<Generator()>::type;
227 using generated_value_type =
typename generated_type::value_type;
228 using transformed_type =
229 typename result_of<Transformer(generated_value_type)>::type;
// NOTE(review): both queues are created with generated_type; transformed_type
// is not used in any visible line — confirm whether transformed_queue was
// meant to carry transformed_type.
231 auto generated_queue = ex.
make_queue<generated_type>();
232 auto transformed_queue = ex.
make_queue<generated_type>();
// Producer thread: pushes items produced by generate_op() into generated_queue.
234 thread producer_task([&generate_op, &generated_queue, &ex](){
237 auto item{generate_op()};
238 generated_queue.push(item);
// Transformer thread: pops an item, applies transform_op to `val` repeatedly
// until predicate_op(val) holds, pushes the result into transformed_queue and
// pops the next item.
243 thread transformer_task([&generated_queue,&transform_op,&predicate_op,
244 &transformed_queue, &ex](){
246 auto item{generated_queue.pop()};
250 val = transform_op(val);
251 }
while (!predicate_op(val));
252 transformed_queue.push(val);
253 item = generated_queue.pop();
// A default-constructed item ({}) is pushed last — presumably the
// end-of-stream marker for the consumer; confirm against the full file.
255 transformed_queue.push({});
// Consumer thread: pops items from transformed_queue. The call to consume_op
// is not visible in this fragment.
258 thread consumer_task([&consume_op,&transformed_queue,&ex](){
260 auto item{transformed_queue.pop()};
263 item=transformed_queue.pop();
// Join the three threads in producer, transformer, consumer order.
267 producer_task.join();
268 transformer_task.join();
269 consumer_task.join();
Definition: callable_traits.h:24
Definition: patterns.h:29
void repeat_until(parallel_execution_native &ex, Generator &&generate_op, pipeline_info< parallel_execution_native, MoreTransformers... > &&pipe, Predicate predicate_op, Consumer consume_op)
Invoke Stream iteration pattern on a data stream with native parallel execution with a generator...
Definition: native/stream_iteration.h:55
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:136
native_thread_manager thread_manager()
Get a manager object for registration/deregistration in the thread index table for current thread...
Definition: parallel_execution_native.h:199
Definition: patterns.h:51
auto farm(Execution &ex, Transformer &&transform_op)
Invoke Farm pattern on a data stream that can be composed in other streaming patterns.
Definition: farm.h:51
void composed_pipeline(InQueue &input_queue, const pipeline_info< parallel_execution_native, MoreTransformers... > &pipe, OutQueue &output_queue, std::vector< std::thread > &tasks)
Definition: native/pipeline.h:36
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_native.h:225