GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
native/stream_filter.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_NATIVE_STREAM_FILTER_H
22 #define GRPPI_NATIVE_STREAM_FILTER_H
23 
25 
26 namespace grppi {
27 
49 template <typename Generator, typename Predicate, typename Consumer>
50 void keep(parallel_execution_native & ex, Generator generate_op,
51  Predicate predicate_op, Consumer consume_op)
52 {
53  using namespace std;
54  using generated_type = typename result_of<Generator()>::type;
55  using item_type = pair<generated_type,long>;
56 
57  auto generated_queue = ex.make_queue<item_type>();
58  auto filtered_queue = ex.make_queue<item_type>();
59 
60  //THREAD 1-(N-1) EXECUTE FILTER AND PUSH THE VALUE IF TRUE
61  vector<thread> tasks;
62  for (int i=0; i<ex.concurrency_degree()-1; ++i) {
63  tasks.emplace_back([&](){
64  auto manager = ex.thread_manager();
65 
66  // queue a pair element - order
67  auto item{generated_queue.pop()};
68 
69  while (item.first) {
70  if(predicate_op(*item.first)) {
71  filtered_queue.push(item);
72  }
73  else {
74  filtered_queue.push(make_pair(generated_type{}, item.second));
75  }
76  item = generated_queue.pop();
77  }
78  //If is the last element
79  filtered_queue.push(make_pair(item.first, -1 ));
80  });
81  }
82 
83  //LAST THREAD CALL FUNCTION OUT WITH THE FILTERED ELEMENTS
84  thread consumer([&](){
85  auto manager = ex.thread_manager();
86 
87  int done_threads = 0;
88 
89  vector<item_type> item_buffer;
90  long order = 0;
91 
92  // queue an element
93  auto item{filtered_queue.pop()};
94  while (done_threads != ex.concurrency_degree()-1) {
95  //If is an end of stream element
96  if (!item.first && item.second==-1) {
97  done_threads++;
98  if (done_threads==ex.concurrency_degree()-1) break;
99  }
100  //If there is not an end element
101  else {
102  //If the element is the next one to be procesed
103  if (order==item.second) {
104  if (item.first) {
105  consume_op(*item.first);
106  }
107  order++;
108  }
109  else {
110  //If the incoming element is out of order
111  item_buffer.push_back(item);
112  }
113  }
114 
115  //Search in the buffer for next elements
116  auto itrm = remove_if(begin(item_buffer), end(item_buffer),
117  [&order](auto & item) {
118  bool res = item.second == order;
119  if (res) order++;
120  return res;
121  }
122  );
123  for_each (itrm, end(item_buffer),
124  [&consume_op](auto & item) {
125  if (item.first) { consume_op(*item.first); }
126  }
127  );
128  item_buffer.erase(itrm, end(item_buffer));
129 
130  item = filtered_queue.pop();
131  }
132 
133  for (;;) {
134  auto it_find = find_if(begin(item_buffer), end(item_buffer),
135  [order](auto & item) { return item.second == order; });
136  if (it_find == end(item_buffer)) break;
137  if (it_find->first) {
138  consume_op(*it_find->first);
139  }
140  item_buffer.erase(it_find);
141  order++;
142  }
143  });
144 
145  //THREAD 0 ENQUEUE ELEMENTS
146  long order = 0;
147  for (;;) {
148  auto item = generate_op();
149  generated_queue.push(make_pair(item,order));
150  order++;
151  if(!item) {
152  for (int i=0; i<ex.concurrency_degree()-1; ++i) {
153  generated_queue.push(make_pair(item,-1));
154  }
155  break;
156  }
157  }
158 
159  for (auto && t : tasks) { t.join(); }
160  consumer.join();
161 }
162 
176 template <typename Generator, typename Predicate, typename Consumer>
177 void discard(parallel_execution_native & ex, Generator generate_op,
178  Predicate predicate_op, Consumer consume_op)
179 {
180  keep(ex,
181  std::forward<Generator>(generate_op),
182  [&](auto val) { return !predicate_op(val); },
183  std::forward<Consumer>(consume_op)
184  );
185 }
186 
187 }
188 
189 #endif
Definition: callable_traits.h:24
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_native.h:178
STL namespace.
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:136
native_thread_manager thread_manager()
Get a manager object for registration/deregistration in the thread index table for current thread...
Definition: parallel_execution_native.h:199
void discard(parallel_execution_native &ex, Generator generate_op, Predicate predicate_op, Consumer consume_op)
Invoke Filter pattern on a data sequence with native parallel execution policy. This function discards fro...
Definition: native/stream_filter.h:177
void keep(parallel_execution_native &ex, Generator generate_op, Predicate predicate_op, Consumer consume_op)
Invoke Filter pattern on a data sequence with native parallel execution policy. This function keeps in the stream only those items that satisfy the predicate.
Definition: native/stream_filter.h:50
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_native.h:225