21 #ifndef GRPPI_TBB_STREAM_FILTER_H 22 #define GRPPI_TBB_STREAM_FILTER_H 57 template <
typename Generator,
typename Predicate,
typename Consumer>
59 Predicate predicate_op, Consumer consume_op)
62 using generated_type =
typename result_of<Generator()>::type;
63 using item_type = pair<generated_type,long>;
65 auto generated_queue = ex.
make_queue<item_type>();
66 auto filtered_queue =ex.
make_queue<item_type>();
69 tbb::task_group filterers;
73 auto item = generated_queue.pop();
75 if (predicate_op(item.first.value())) {
76 filtered_queue.push(item);
79 filtered_queue.push({{}, item.second});
81 item = generated_queue.pop();
84 filtered_queue.push({item.first,-1});
91 vector<item_type> item_buffer;
93 auto item{filtered_queue.pop()};
96 if (!item.first && item.second==-1){
102 if (order==item.second) {
104 consume_op(*item.first);
110 item_buffer.push_back(item);
115 for (
auto it=item_buffer.begin(); it<item_buffer.end(); ++it) {
116 if(it->second==order) {
118 consume_op((*it).first.value());
120 item_buffer.erase(it);
124 item = filtered_queue.pop();
126 while (item_buffer.size()>0) {
128 for (
auto it=item_buffer.begin(); it<item_buffer.end(); ++it) {
129 if(it->second==order) {
131 consume_op(it->first.value());
133 item_buffer.erase(it);
144 auto item = generate_op();
145 generated_queue.push(make_pair(item,order));
149 generated_queue.push({item,-1});
172 template <
typename Generator,
typename Predicate,
typename Consumer>
174 Predicate predicate_op, Consumer consume_op)
177 std::forward<Generator>(generate_op),
178 [&](
auto val) {
return !predicate_op(val); },
179 std::forward<Consumer>(consume_op)
Definition: callable_traits.h:24
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_tbb.h:105
TBB parallel execution policy.
Definition: parallel_execution_tbb.h:37
void discard(parallel_execution_native &ex, Generator generate_op, Predicate predicate_op, Consumer consume_op)
Invoke Filter pattern on a data sequence with native parallel execution policy. This function discards fro...
Definition: native/stream_filter.h:177
void keep(parallel_execution_native &ex, Generator generate_op, Predicate predicate_op, Consumer consume_op)
Invoke Filter pattern on a data sequence with native parallel execution policy. This function keeps in the stream only those items that satisfy the predicate.
Definition: native/stream_filter.h:50
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_tbb.h:73