21 #ifndef GRPPI_OMP_FARM_H 22 #define GRPPI_OMP_FARM_H 28 #include <experimental/optional> 49 template <
typename Generator,
typename Consumer>
54 using result_type =
typename result_of<Generator()>::type;
59 #pragma omp single nowait 62 #pragma omp task shared(queue) 64 auto item{queue.pop()};
73 auto item = generate_op();
99 template <
typename Generator,
typename Transformer,
typename Consumer>
101 Transformer transform_op , Consumer consume_op)
104 using namespace experimental;
105 using result_type =
typename result_of<Generator()>::type;
106 using result_value_type =
typename result_type::value_type;
107 using transformed_value_type =
108 typename result_of<Transformer(result_value_type)>::type;
109 using transformed_type = optional<transformed_value_type>;
111 auto generated_queue = ex.
make_queue<result_type>();
112 auto transformed_queue = ex.
make_queue<transformed_type>();
113 atomic<int> done_threads{0};
117 #pragma omp single nowait 120 #pragma omp task shared(generated_queue, transformed_queue, transform_op) 122 auto item{generated_queue.pop()};
124 transformed_queue.push(transformed_type{transform_op(*item)});
125 item = generated_queue.pop( ) ;
127 generated_queue.push(item);
130 transformed_queue.push(transformed_type{});
134 #pragma omp task shared(transformed_queue,consume_op) 136 auto item{transformed_queue.pop()};
138 consume_op( item.value() );
139 item = transformed_queue.pop( );
144 auto item = generate_op();
145 generated_queue.push(item);
Definition: callable_traits.h:24
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T.
Definition: parallel_execution_omp.h:119
OpenMP parallel execution policy.
Definition: parallel_execution_omp.h:40
auto farm(Execution &ex, Transformer &&transform_op)
Invoke Farm pattern on a data stream that can be composed in other streaming patterns.
Definition: farm.h:51
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_omp.h:85