GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
tbb/farm.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_TBB_FARM_H
22 #define GRPPI_TBB_FARM_H
23 
24 #ifdef GRPPI_TBB
25 
26 #include "parallel_execution_tbb.h"
27 
28 #include <experimental/optional>
29 
30 #include <tbb/tbb.h>
31 
32 namespace grppi {
33 
51 template <typename Generator, typename Consumer>
52 void farm(parallel_execution_tbb & ex, Generator generate_op,
53  Consumer consume_op)
54 {
55  using namespace std;
56 
57  using generated_type = typename result_of<Generator()>::type;
58  auto queue = ex.make_queue<generated_type>();
59 
60  tbb::task_group g;
61  for (int i=0; i<ex.concurrency_degree(); ++i) {
62  g.run([&](){
63  auto item{queue.pop()};
64  while (item) {
65  consume_op(*item);
66  item = queue.pop();
67  }
68  });
69  }
70 
71  //Generate elements
72  for (;;) {
73  auto item{generate_op()};
74  queue.push(item);
75  if (!item) {
76  for (int i=1; i<ex.concurrency_degree(); i++) {
77  queue.push(item);
78  }
79  break;
80  }
81  }
82 
83  //Join threads
84  g.wait();
85 }
86 
98 template <typename Generator, typename Transformer, typename Consumer>
99 void farm(parallel_execution_tbb & ex, Generator generate_op,
100  Transformer transform_op, Consumer consume_op)
101 {
102  using namespace std;
103  using namespace experimental;
104  using generated_type = typename result_of<Generator()>::type;
105  using generated_value_type = typename generated_type::value_type;
106  using transformed_value_type =
107  typename result_of<Transformer(generated_value_type)>::type;
108  using transformed_type = optional<transformed_value_type>;
109 
110  auto generated_queue = ex.make_queue<generated_type>();
111  auto transformed_queue = ex.make_queue<transformed_type>();
112 
113  atomic<int>done_threads{0};
114  tbb::task_group generators;
115  for (int i=0; i<ex.concurrency_degree(); ++i) {
116  generators.run([&](){
117  auto item{generated_queue.pop()};
118  while (item) {
119  auto result = transform_op(*item);
120  transformed_queue.push(transformed_type{result});
121  item = generated_queue.pop();
122  }
123  done_threads++;
124  if (done_threads==ex.concurrency_degree()) {
125  transformed_queue.push(transformed_type{});
126  }
127  });
128  }
129 
130  thread consumer_thread([&](){
131  auto item {transformed_queue.pop()};
132  while (item) {
133  consume_op(*item);
134  item = transformed_queue.pop( );
135  }
136  });
137 
138  //Generate elements
139  for (;;) {
140  auto item = generate_op();
141  generated_queue.push(item) ;
142  if(!item) {
143  for (int i=1; i<ex.concurrency_degree(); ++i) {
144  generated_queue.push(item) ;
145  }
146  break;
147  }
148  }
149 
150  generators.wait();
151  consumer_thread.join();
152 }
153 
159 }
160 #endif
161 
162 #endif
Definition: callable_traits.h:24
STL namespace.
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_tbb.h:105
TBB parallel execution policy.
Definition: parallel_execution_tbb.h:37
int concurrency_degree() const noexcept
Get number of grppi trheads.
Definition: parallel_execution_tbb.h:73
auto farm(Execution &ex, Transformer &&transform_op)
Invoke Farm pattern on a data stream that can be composed in other streaming patterns.
Definition: farm.h:51