GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
tbb/stream_filter.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_TBB_STREAM_FILTER_H
22 #define GRPPI_TBB_STREAM_FILTER_H
23 
24 #ifdef GRPPI_TBB
25 
26 #include "parallel_execution_tbb.h"
27 
28 #include <tbb/tbb.h>
29 
30 namespace grppi {
31 
57 template <typename Generator, typename Predicate, typename Consumer>
58 void keep(parallel_execution_tbb & ex, Generator generate_op,
59  Predicate predicate_op, Consumer consume_op)
60 {
61  using namespace std;
62  using generated_type = typename result_of<Generator()>::type;
63  using item_type = pair<generated_type,long>;
64 
65  auto generated_queue = ex.make_queue<item_type>();
66  auto filtered_queue =ex.make_queue<item_type>();
67 
68  //THREAD 1-(N-1) EXECUTE FILTER AND PUSH THE VALUE IF TRUE
69  tbb::task_group filterers;
70  for (int i=1; i< ex.concurrency_degree()-1; ++i) {
71  filterers.run([&](){
72  //dequeue a pair element - order
73  auto item = generated_queue.pop();
74  while (item.first) {
75  if (predicate_op(item.first.value())) {
76  filtered_queue.push(item);
77  }
78  else {
79  filtered_queue.push({{}, item.second});
80  }
81  item = generated_queue.pop();
82  }
83  //If is the last element
84  filtered_queue.push({item.first,-1});
85  });
86  }
87 
88  //LAST THREAD CALL FUNCTION OUT WITH THE FILTERED ELEMENTS
89  filterers.run([&](){
90  int done_tasks = 0;
91  vector<item_type> item_buffer;
92  long order = 0;
93  auto item{filtered_queue.pop()};
94  while (done_tasks!=ex.concurrency_degree()-1) {
95  //If is an end of stream element
96  if (!item.first && item.second==-1){
97  done_tasks++;
98  if (done_tasks==ex.concurrency_degree()-2) break;
99  }
100  else {
101  //If the element is the next one to be procesed
102  if (order==item.second) {
103  if(item.first) {
104  consume_op(*item.first);
105  }
106  order++;
107  }
108  else {
109  //If the incoming element is disordered
110  item_buffer.push_back(item);
111  }
112  }
113  //Search in the vector for next elements
114  // TODO: find+erase
115  for (auto it=item_buffer.begin(); it<item_buffer.end(); ++it) {
116  if(it->second==order) {
117  if (it->first) {
118  consume_op((*it).first.value());
119  }
120  item_buffer.erase(it);
121  order++;
122  }
123  }
124  item = filtered_queue.pop();
125  }
126  while (item_buffer.size()>0) {
127  // TODO: find+erase
128  for (auto it=item_buffer.begin(); it<item_buffer.end(); ++it) {
129  if(it->second==order) {
130  if(it->first) {
131  consume_op(it->first.value());
132  }
133  item_buffer.erase(it);
134  order++;
135  }
136  }
137  }
138 
139  });
140 
141  //THREAD 0 ENQUEUE ELEMENTS
142  long order = 0;
143  for (;;) {
144  auto item = generate_op();
145  generated_queue.push(make_pair(item,order));
146  order++;
147  if (!item) {
148  for (int i=0; i<ex.concurrency_degree()-2; ++i) {
149  generated_queue.push({item,-1});
150  }
151  break;
152  }
153  }
154 
155  filterers.wait();
156 }
157 
172 template <typename Generator, typename Predicate, typename Consumer>
173 void discard(parallel_execution_tbb & ex, Generator generate_op,
174  Predicate predicate_op, Consumer consume_op)
175 {
176  keep(ex,
177  std::forward<Generator>(generate_op),
178  [&](auto val) { return !predicate_op(val); },
179  std::forward<Consumer>(consume_op)
180  );
181 }
182 
183 }
184 
185 #endif
186 
187 #endif
Definition: callable_traits.h:24
STL namespace.
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_tbb.h:105
TBB parallel execution policy.
Definition: parallel_execution_tbb.h:37
void discard(parallel_execution_native &ex, Generator generate_op, Predicate predicate_op, Consumer consume_op)
Invoke Filter pattern on a data sequence with sequential execution policy. This function discards fro...
Definition: native/stream_filter.h:177
void keep(parallel_execution_native &ex, Generator generate_op, Predicate predicate_op, Consumer consume_op)
Invoke Filter pattern on a data sequence with sequential execution policy. This function keeps in the stream only those items that satisfy the predicate.
Definition: native/stream_filter.h:50
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_tbb.h:73