21 #ifndef GRPPI_NATIVE_FARM_H 22 #define GRPPI_NATIVE_FARM_H 29 #include <experimental/optional> 50 template <
typename Generator,
typename Consumer>
55 using result_type =
typename result_of<Generator()>::type;
60 tasks.emplace_back([&](){
63 auto item{queue.pop()};
73 auto item{generate_op()};
78 for (
auto && t : tasks) { t.join(); }
92 template <
typename Generator,
typename Transformer,
typename Consumer>
94 Transformer transform_op , Consumer consume_op)
97 using namespace experimental;
98 using generated_type =
typename result_of<Generator()>::type;
99 using generated_value_type =
typename generated_type::value_type;
100 using transformed_value_type =
101 typename result_of<Transformer(generated_value_type)>::type;
102 using transformed_type = optional<transformed_value_type>;
104 auto generated_queue = ex.
make_queue<generated_type>();
105 auto transformed_queue = ex.
make_queue<transformed_type>();
107 atomic<int> done_threads(0);
108 vector<thread> tasks;
111 tasks.emplace_back([&](){
114 auto item{generated_queue.pop()};
116 transformed_queue.push(transformed_type{transform_op(*item)});
117 item = generated_queue.pop();
119 generated_queue.push(item);
122 transformed_queue.push(transformed_type{});
127 tasks.emplace_back([&](){
130 auto item{transformed_queue.pop()};
132 consume_op( item.value() );
133 item = transformed_queue.pop( );
138 auto item{generate_op()};
139 generated_queue.push(item);
143 for (
auto && t : tasks) { t.join(); }
Definition: callable_traits.h:24
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_native.h:178
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:136
native_thread_manager thread_manager()
Get a manager object for registration/deregistration in the thread index table for current thread...
Definition: parallel_execution_native.h:199
auto farm(Execution &ex, Transformer &&transform_op)
Invoke Farm pattern on a data stream that can be composed in other streaming patterns.
Definition: farm.h:51
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_native.h:225