GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
native/farm.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_NATIVE_FARM_H
22 #define GRPPI_NATIVE_FARM_H
23 
25 
26 #include <thread>
27 #include <utility>
28 #include <memory>
29 #include <experimental/optional>
30 
31 namespace grppi {
32 
50 template <typename Generator, typename Consumer>
51 void farm(parallel_execution_native & ex, Generator generate_op,
52  Consumer consume_op)
53 {
54  using namespace std;
55  using result_type = typename result_of<Generator()>::type;
56  auto queue = ex.make_queue<result_type>();
57 
58  vector<thread> tasks;
59  for (int i=0; i<ex.concurrency_degree(); ++i) {
60  tasks.emplace_back([&](){
61  auto manager = ex.thread_manager();
62 
63  auto item{queue.pop()};
64  while(item) {
65  consume_op(*item);
66  item = queue.pop();
67  }
68  queue.push(item);
69  });
70  }
71 
72  for (;;) {
73  auto item{generate_op()};
74  queue.push(item);
75  if (!item) break;
76  }
77 
78  for (auto && t : tasks) { t.join(); }
79 }
80 
92 template <typename Generator, typename Transformer, typename Consumer>
93 void farm(parallel_execution_native & ex, Generator generate_op,
94  Transformer transform_op , Consumer consume_op)
95 {
96  using namespace std;
97  using namespace experimental;
98  using generated_type = typename result_of<Generator()>::type;
99  using generated_value_type = typename generated_type::value_type;
100  using transformed_value_type =
101  typename result_of<Transformer(generated_value_type)>::type;
102  using transformed_type = optional<transformed_value_type>;
103 
104  auto generated_queue = ex.make_queue<generated_type>();
105  auto transformed_queue = ex.make_queue<transformed_type>();
106 
107  atomic<int> done_threads(0);
108  vector<thread> tasks;
109 
110  for (int i=0; i<ex.concurrency_degree(); ++i) {
111  tasks.emplace_back([&](){
112  auto manager = ex.thread_manager();
113 
114  auto item{generated_queue.pop()};
115  while (item) {
116  transformed_queue.push(transformed_type{transform_op(*item)});
117  item = generated_queue.pop();
118  }
119  generated_queue.push(item);
120  done_threads++;
121  if (done_threads==ex.concurrency_degree()) {
122  transformed_queue.push(transformed_type{});
123  }
124  });
125  }
126 
127  tasks.emplace_back([&](){
128  auto manager = ex.thread_manager();
129 
130  auto item{transformed_queue.pop()};
131  while (item) {
132  consume_op( item.value() );
133  item = transformed_queue.pop( );
134  }
135  });
136 
137  for (;;) {
138  auto item{generate_op()};
139  generated_queue.push(item);
140  if(!item) break;
141  }
142 
143  for (auto && t : tasks) { t.join(); }
144 }
145 
146 }
147 
148 #endif
Definition: callable_traits.h:24
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_native.h:178
STL namespace.
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:136
native_thread_manager thread_manager()
Get a manager object for registration/deregistration in the thread index table for current thread...
Definition: parallel_execution_native.h:199
auto farm(Execution &ex, Transformer &&transform_op)
Invoke Farm pattern on a data stream that can be composed in other streaming patterns.
Definition: farm.h:51
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_native.h:225