21 #ifndef GRPPI_TBB_FARM_H 22 #define GRPPI_TBB_FARM_H 28 #include <experimental/optional> 51 template <
typename Generator,
typename Consumer>
57 using generated_type =
typename result_of<Generator()>::type;
63 auto item{queue.pop()};
73 auto item{generate_op()};
98 template <
typename Generator,
typename Transformer,
typename Consumer>
100 Transformer transform_op, Consumer consume_op)
103 using namespace experimental;
104 using generated_type =
typename result_of<Generator()>::type;
105 using generated_value_type =
typename generated_type::value_type;
106 using transformed_value_type =
107 typename result_of<Transformer(generated_value_type)>::type;
108 using transformed_type = optional<transformed_value_type>;
110 auto generated_queue = ex.
make_queue<generated_type>();
111 auto transformed_queue = ex.
make_queue<transformed_type>();
113 atomic<int>done_threads{0};
114 tbb::task_group generators;
116 generators.run([&](){
117 auto item{generated_queue.pop()};
119 auto result = transform_op(*item);
120 transformed_queue.push(transformed_type{result});
121 item = generated_queue.pop();
125 transformed_queue.push(transformed_type{});
130 thread consumer_thread([&](){
131 auto item {transformed_queue.pop()};
134 item = transformed_queue.pop( );
140 auto item = generate_op();
141 generated_queue.push(item) ;
144 generated_queue.push(item) ;
151 consumer_thread.join();
Definition: callable_traits.h:24
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_tbb.h:105
TBB parallel execution policy.
Definition: parallel_execution_tbb.h:37
int concurrency_degree() const noexcept
Get number of grppi trheads.
Definition: parallel_execution_tbb.h:73
auto farm(Execution &ex, Transformer &&transform_op)
Invoke Farm pattern on a data stream that can be composed in other streaming patterns.
Definition: farm.h:51