GrPPI  0.3.1
Generic and Reusable Parallel Pattern Interface
unordered_stream_filter.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_FF_DETAIL_UNORDERED_STREAM_FILTER_H
22 #define GRPPI_FF_DETAIL_UNORDERED_STREAM_FILTER_H
23 
24 #include "filter_nodes.h"
25 
26 #include <ff/farm.hpp>
27 
28 namespace grppi {
29 
30 namespace detail_ff {
31 
32 template <typename Item, typename Filter>
33 class unordered_stream_filter : public ff::ff_ofarm {
34 public:
35  unordered_stream_filter(Filter && filter, int num_workers);
36 
37 private:
38  Filter filter_;
39  std::vector<ff::ff_node *> workers_;
40  std::unique_ptr<filter_emitter<Item>> p_emitter_;
41  std::unique_ptr<filter_collector<Item>> p_collector_;
42 };
43 
44 template <typename Item, typename Filter>
46  Filter && filter,
47  int num_workers)
48 :
49  ff::ff_ofarm{false, DEF_IN_BUFF_ENTRIES, DEF_OUT_BUFF_ENTRIES, true, num_workers},
50  filter_{std::move(filter)},
51  workers_{},
52  p_emitter_{std::make_unique<filter_emitter<Item>>()},
53  p_collector_{std::make_unique<filter_collector<Item>>()}
54 {
55  for(int i=0;i<num_workers;++i) {
56  workers_.push_back(new filter_worker<Item,Filter>{
57  std::forward<Filter>(filter_)});
58  }
59  add_workers(workers_);
60  add_emitter(p_emitter_.get());
61  add_collector(p_collector_.get());
62 }
63 
64 } // namespace detail_ff
65 
66 } // namespace grppi
67 
68 
69 #endif
Definition: callable_traits.h:26
Worker that passes a value to the next stage if the predicate is satisfied, or the filtered_value constant otherwise.
Definition: filter_nodes.h:47
Definition: unordered_stream_filter.h:33
unordered_stream_filter(Filter &&filter, int num_workers)
Definition: unordered_stream_filter.h:45