21 #ifndef GRPPI_NATIVE_STREAM_FILTER_H 22 #define GRPPI_NATIVE_STREAM_FILTER_H 49 template <
typename Generator,
typename Predicate,
typename Consumer>
51 Predicate predicate_op, Consumer consume_op)
54 using generated_type =
typename result_of<Generator()>::type;
55 using item_type = pair<generated_type,long>;
57 auto generated_queue = ex.
make_queue<item_type>();
58 auto filtered_queue = ex.
make_queue<item_type>();
63 tasks.emplace_back([&](){
67 auto item{generated_queue.pop()};
70 if(predicate_op(*item.first)) {
71 filtered_queue.push(item);
74 filtered_queue.push(make_pair(generated_type{}, item.second));
76 item = generated_queue.pop();
79 filtered_queue.push(make_pair(item.first, -1 ));
84 thread consumer([&](){
89 vector<item_type> item_buffer;
93 auto item{filtered_queue.pop()};
96 if (!item.first && item.second==-1) {
103 if (order==item.second) {
105 consume_op(*item.first);
111 item_buffer.push_back(item);
116 auto itrm = remove_if(begin(item_buffer), end(item_buffer),
117 [&order](
auto & item) {
118 bool res = item.second == order;
123 for_each (itrm, end(item_buffer),
124 [&consume_op](
auto & item) {
125 if (item.first) { consume_op(*item.first); }
128 item_buffer.erase(itrm, end(item_buffer));
130 item = filtered_queue.pop();
134 auto it_find = find_if(begin(item_buffer), end(item_buffer),
135 [order](
auto & item) {
return item.second == order; });
136 if (it_find == end(item_buffer))
break;
137 if (it_find->first) {
138 consume_op(*it_find->first);
140 item_buffer.erase(it_find);
148 auto item = generate_op();
149 generated_queue.push(make_pair(item,order));
153 generated_queue.push(make_pair(item,-1));
159 for (
auto && t : tasks) { t.join(); }
176 template <
typename Generator,
typename Predicate,
typename Consumer>
178 Predicate predicate_op, Consumer consume_op)
181 std::forward<Generator>(generate_op),
182 [&](
auto val) {
return !predicate_op(val); },
183 std::forward<Consumer>(consume_op)
Definition: callable_traits.h:24
int concurrency_degree() const noexcept
Get number of grppi trheads.
Definition: parallel_execution_native.h:178
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:136
native_thread_manager thread_manager()
Get a manager object for registration/deregistration in the thread index table for current thread...
Definition: parallel_execution_native.h:199
void discard(parallel_execution_native &ex, Generator generate_op, Predicate predicate_op, Consumer consume_op)
Invoke Filter pattern on a data sequence with sequential execution policy. This function discards fro...
Definition: native/stream_filter.h:177
void keep(parallel_execution_native &ex, Generator generate_op, Predicate predicate_op, Consumer consume_op)
Invoke Filter pattern pattern on a data sequence with sequential execution policy. This function keeps in the stream only those items that satisfy the predicate.
Definition: native/stream_filter.h:50
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_native.h:225