GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
omp/farm.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_OMP_FARM_H
22 #define GRPPI_OMP_FARM_H
23 
24 #ifdef GRPPI_OMP
25 
26 #include "parallel_execution_omp.h"
27 
28 #include <experimental/optional>
29 
30 namespace grppi {
31 
49 template <typename Generator, typename Consumer>
50 void farm(parallel_execution_omp & ex, Generator generate_op,
51  Consumer consume_op)
52 {
53  using namespace std;
54  using result_type = typename result_of<Generator()>::type;
55  auto queue = ex.make_queue<result_type>();
56 
57  #pragma omp parallel
58  {
59  #pragma omp single nowait
60  {
61  for (int i = 0; i<ex.concurrency_degree(); i++) {
62  #pragma omp task shared(queue)
63  {
64  auto item{queue.pop()};
65  while (item) {
66  consume_op(*item);
67  item = queue.pop() ;
68  }
69  }
70  }
71 
72  for (;;) {
73  auto item = generate_op();
74  queue.push(item) ;
75  if (!item) {
76  for (int i=1; i<ex.concurrency_degree(); ++i) {
77  queue.push(item);
78  }
79  break;
80  }
81  }
82 
83  #pragma omp taskwait
84  }
85  }
86 }
87 
99 template <typename Generator, typename Transformer, typename Consumer>
100 void farm(parallel_execution_omp & ex, Generator generate_op,
101  Transformer transform_op , Consumer consume_op)
102 {
103  using namespace std;
104  using namespace experimental;
105  using result_type = typename result_of<Generator()>::type;
106  using result_value_type = typename result_type::value_type;
107  using transformed_value_type =
108  typename result_of<Transformer(result_value_type)>::type;
109  using transformed_type = optional<transformed_value_type>;
110 
111  auto generated_queue = ex.make_queue<result_type>();
112  auto transformed_queue = ex.make_queue<transformed_type>();
113  atomic<int> done_threads{0};
114 
115  #pragma omp parallel
116  {
117  #pragma omp single nowait
118  {
119  for (int i=0; i<ex.concurrency_degree(); ++i) {
120  #pragma omp task shared(generated_queue, transformed_queue, transform_op)
121  {
122  auto item{generated_queue.pop()};
123  while (item) {
124  transformed_queue.push(transformed_type{transform_op(*item)});
125  item = generated_queue.pop( ) ;
126  }
127  generated_queue.push(item);
128  done_threads++;
129  if (done_threads == ex.concurrency_degree())
130  transformed_queue.push(transformed_type{});
131  }
132  }
133 
134  #pragma omp task shared(transformed_queue,consume_op)
135  {
136  auto item{transformed_queue.pop()};
137  while (item) {
138  consume_op( item.value() );
139  item = transformed_queue.pop( );
140  }
141  }
142 
143  for (;;) {
144  auto item = generate_op();
145  generated_queue.push(item);
146  if (!item) break;
147  }
148 
149  #pragma omp taskwait
150  }
151  }
152 }
153 
159 }
160 #endif
161 
162 #endif
Definition: callable_traits.h:24
STL namespace.
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T.
Definition: parallel_execution_omp.h:119
OpenMP parallel execution policy.
Definition: parallel_execution_omp.h:40
auto farm(Execution &ex, Transformer &&transform_op)
Invoke Farm pattern on a data stream that can be composed in other streaming patterns.
Definition: farm.h:51
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_omp.h:85