GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
omp/stream_filter.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_OMP_STREAM_FILTER_H
22 #define GRPPI_OMP_STREAM_FILTER_H
23 
24 #ifdef GRPPI_OMP
25 
26 #include "parallel_execution_omp.h"
27 
28 namespace grppi {
29 
51 template <typename Generator, typename Predicate, typename Consumer>
52 void keep(parallel_execution_omp & ex, Generator generate_op,
53  Predicate predicate_op, Consumer consume_op)
54 {
55  using namespace std;
56  using generated_type = typename result_of<Generator()>::type;
57  using item_type = pair<generated_type,long>;
58 
59  auto generated_queue = ex.make_queue<item_type>();
60  auto filtered_queue = ex.make_queue<item_type>();
61 
62  #pragma omp parallel
63  {
64  #pragma omp single nowait
65  {
66  // Generate the task for the filter threads
67  for (int i=0; i< ex.concurrency_degree() - 1; i++) {
68  #pragma omp task shared(generated_queue, filtered_queue)
69  {
70  // Dequeue a pair element - order
71  auto item{generated_queue.pop()};
72  while (item.first) {
73  if (predicate_op(*item.first)) {
74  filtered_queue.push(item);
75  }
76  else {
77  filtered_queue.push(make_pair(generated_type{}, item.second));
78  }
79  item = generated_queue.pop();
80  }
81  // If is the last item
82  filtered_queue.push(make_pair(item.first, -1 ));
83  }
84  }
85 
86  // Generate the task for the consumer thread
87  #pragma omp task shared(filtered_queue)
88  {
89  int done_threads = 0;
90 
91  vector<item_type> item_buffer;
92  long order = 0;
93  // Dequeue an element
94  auto item{filtered_queue.pop()};
95  while (done_threads!=ex.concurrency_degree()-1) {
96  // If the item is an end of stream
97  if (!item.first && item.second == -1) {
98  done_threads++;
99  if(done_threads == ex.concurrency_degree() - 1) break;
100  }
101  else {
102  // If the element is the next to be consumed
103  if(order == item.second) {
104  if(item.first) {
105  consume_op(*item.first);
106  }
107  order++;
108  }
109  else {
110  item_buffer.push_back(item);
111  }
112  }
113 
114  // Search in the buffer for the next element
115  auto itrm = remove_if(begin(item_buffer), end(item_buffer),
116  [&order](auto & item) {
117  bool res = item.second == order;
118  if (res) order++;
119  return res;
120  }
121  );
122  for_each (itrm, end(item_buffer),
123  [&consume_op](auto & item) {
124  if (item.first) { consume_op(*item.first); }
125  }
126  );
127  item_buffer.erase(itrm, end(item_buffer));
128 
129  item = filtered_queue.pop();
130  }
131  // Consume the last elements
132  for (;;) {
133  auto it_find = find_if(begin(item_buffer), end(item_buffer),
134  [order](auto & item) { return item.second == order; });
135  if (it_find == end(item_buffer)) break;
136  if (it_find->first) {
137  consume_op(*it_find->first);
138  }
139  item_buffer.erase(it_find);
140  order++;
141  }
142  }
143 
144  // Main thread acts as generator
145  long order = 0;
146  for (;;) {
147  auto item{generate_op()};
148  generated_queue.push(make_pair(item,order));
149  order++;
150  if (!item) {
151  for (int i = 0; i< ex.concurrency_degree()-1; i++) {
152  generated_queue.push(make_pair(item,-1));
153  }
154  break;
155  }
156  }
157 
158  #pragma omp taskwait
159  }
160  }
161 }
162 
176 template <typename Generator, typename Predicate, typename Consumer>
177 void discard(parallel_execution_omp & ex, Generator generate_op,
178  Predicate predicate_op, Consumer consume_op)
179 {
180  keep(ex,
181  std::forward<Generator>(generate_op),
182  [&](auto val) { return !predicate_op(val); },
183  std::forward<Consumer>(consume_op)
184  );
185 }
186 
187 }
188 
189 #endif
190 
191 #endif
Definition: callable_traits.h:24
STL namespace.
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T.
Definition: parallel_execution_omp.h:119
OpenMP parallel execution policy.
Definition: parallel_execution_omp.h:40
void discard(parallel_execution_native &ex, Generator generate_op, Predicate predicate_op, Consumer consume_op)
Invoke Filter pattern on a data sequence with native parallel execution policy. This function discards from the stream those items that satisfy the predicate.
Definition: native/stream_filter.h:177
void keep(parallel_execution_native &ex, Generator generate_op, Predicate predicate_op, Consumer consume_op)
Invoke Filter pattern on a data sequence with native parallel execution policy. This function keeps in the stream only those items that satisfy the predicate.
Definition: native/stream_filter.h:50
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_omp.h:85