21 #ifndef GRPPI_OMP_STREAM_FILTER_H 22 #define GRPPI_OMP_STREAM_FILTER_H 51 template <
typename Generator,
typename Predicate,
typename Consumer>
53 Predicate predicate_op, Consumer consume_op)
56 using generated_type =
typename result_of<Generator()>::type;
57 using item_type = pair<generated_type,long>;
59 auto generated_queue = ex.
make_queue<item_type>();
60 auto filtered_queue = ex.
make_queue<item_type>();
64 #pragma omp single nowait 68 #pragma omp task shared(generated_queue, filtered_queue) 71 auto item{generated_queue.pop()};
73 if (predicate_op(*item.first)) {
74 filtered_queue.push(item);
77 filtered_queue.push(make_pair(generated_type{}, item.second));
79 item = generated_queue.pop();
82 filtered_queue.push(make_pair(item.first, -1 ));
87 #pragma omp task shared(filtered_queue) 91 vector<item_type> item_buffer;
94 auto item{filtered_queue.pop()};
97 if (!item.first && item.second == -1) {
103 if(order == item.second) {
105 consume_op(*item.first);
110 item_buffer.push_back(item);
115 auto itrm = remove_if(begin(item_buffer), end(item_buffer),
116 [&order](
auto & item) {
117 bool res = item.second == order;
122 for_each (itrm, end(item_buffer),
123 [&consume_op](
auto & item) {
124 if (item.first) { consume_op(*item.first); }
127 item_buffer.erase(itrm, end(item_buffer));
129 item = filtered_queue.pop();
133 auto it_find = find_if(begin(item_buffer), end(item_buffer),
134 [order](
auto & item) {
return item.second == order; });
135 if (it_find == end(item_buffer))
break;
136 if (it_find->first) {
137 consume_op(*it_find->first);
139 item_buffer.erase(it_find);
147 auto item{generate_op()};
148 generated_queue.push(make_pair(item,order));
152 generated_queue.push(make_pair(item,-1));
176 template <
typename Generator,
typename Predicate,
typename Consumer>
178 Predicate predicate_op, Consumer consume_op)
181 std::forward<Generator>(generate_op),
182 [&](
auto val) {
return !predicate_op(val); },
183 std::forward<Consumer>(consume_op)
Definition: callable_traits.h:24
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T.
Definition: parallel_execution_omp.h:119
OpenMP parallel execution policy.
Definition: parallel_execution_omp.h:40
void discard(parallel_execution_native &ex, Generator generate_op, Predicate predicate_op, Consumer consume_op)
Invoke Filter pattern on a data sequence with sequential execution policy. This function discards fro...
Definition: native/stream_filter.h:177
void keep(parallel_execution_native &ex, Generator generate_op, Predicate predicate_op, Consumer consume_op)
Invoke Filter pattern pattern on a data sequence with sequential execution policy. This function keeps in the stream only those items that satisfy the predicate.
Definition: native/stream_filter.h:50
int concurrency_degree() const noexcept
Get number of grppi trheads.
Definition: parallel_execution_omp.h:85