GrPPI  0.3.1
Generic and Reusable Parallel Pattern Interface
ordered_stream_reduce.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_FF_DETAIL_ORDERED_STREAM_REDUCE_H
22 #define GRPPI_FF_DETAIL_ORDERED_STREAM_REDUCE_H
23 
24 #include "reduce_nodes.h"
25 
26 #include <ff/farm.hpp>
27 
28 
29 namespace grppi {
30 
31 namespace detail_ff {
32 
// Ordered stream reduce for FastFlow.
//
// Class template deriving from ff::ff_ofarm. It keeps the list of worker
// nodes registered with the farm and owns the emitter node through a
// unique_ptr.
//
// Template parameters:
//   Item      - first template argument passed to reduce_worker by the constructor.
//   Reducer   - type of the object handed to the constructor; the constructor
//               calls window_size(), offset() and combiner() on it.
//   CombineOp - second template argument passed to reduce_worker by the constructor.
36 template <typename Item, typename Reducer, typename CombineOp>
37 class ordered_stream_reduce : public ff::ff_ofarm {
38 public:
 // Sets up the farm with num_workers workers configured from red_obj.
39  ordered_stream_reduce(Reducer && red_obj, int num_workers);
40 
41 private:
 // Worker nodes created by the constructor and passed to add_workers().
42  std::vector<ff_node*> workers_;
43 
 // NOTE(review): the declaration of emitter_type (listing line 44) is not
 // visible in this view; presumably an alias for the reduce emitter declared
 // in reduce_nodes.h - confirm against the original header.
 // Emitter node; its raw pointer is handed to setEmitterF() in the constructor.
45  std::unique_ptr<emitter_type> p_emitter_;
46 };
47 
// Constructs the ordered stream-reduce farm.
//
// Parameters:
//   red_obj     - reducer object; supplies window_size() and offset() for the
//                 emitter and combiner() for every worker.
//   num_workers - number of reduce_worker nodes to create and register.
//
// The third template parameter is spelled Combiner here and CombineOp in the
// class declaration; template parameter names need not match between a class
// template and its out-of-class member definitions.
48 template <typename Item, typename Reducer, typename Combiner>
49 ordered_stream_reduce<Item,Reducer,Combiner>::ordered_stream_reduce(
50  Reducer && red_obj,
51  int num_workers)
52 :
53  ff::ff_ofarm{false, DEF_IN_BUFF_ENTRIES, DEF_OUT_BUFF_ENTRIES, true, num_workers},
54  workers_{},
 // The emitter is created up front from the reducer's window size and offset.
55  p_emitter_{std::make_unique<emitter_type>(red_obj.window_size(), red_obj.offset())}
56 {
 // One worker per requested slot, each built from the reducer's combiner.
 // NOTE(review): workers are allocated with new and never deleted in this
 // file; confirm whether the farm takes ownership of them.
57  for(int i=0; i<num_workers; ++i) {
58  reduce_worker<Item,Combiner> * p_worker =
59  new reduce_worker<Item,Combiner>{red_obj.combiner()};
60  workers_.push_back(p_worker);
61  }
62 
 // Register the workers and the emitter with the underlying farm.
63  add_workers(workers_);
64  setEmitterF(p_emitter_.get());
65 }
66 
67 
68 } // namespace detail_ff
69 
70 } // namespace grppi
71 
72 #endif
Definition: callable_traits.h:26
Reduce worker.
Definition: reduce_nodes.h:120
Reduce emitter.
Definition: reduce_nodes.h:38
Ordered stream reduce for FastFlow.
Definition: ordered_stream_reduce.h:37
ordered_stream_reduce(Reducer &&red_obj, int num_workers)
Definition: ordered_stream_reduce.h:49