GrPPI  0.3.1
Generic and Reusable Parallel Pattern Interface
unordered_stream_reduce.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_FF_DETAIL_UNORDERED_STREAM_REDUCE_H
22 #define GRPPI_FF_DETAIL_UNORDERED_STREAM_REDUCE_H
23 
24 #include "reduce_nodes.h"
25 
26 #include <ff/farm.hpp>
27 
28 namespace grppi {
29 
30 namespace detail_ff {
31 
32 template <typename Item, typename Reducer, typename Combiner>
33 class unordered_stream_reduce : public ff::ff_farm<> {
34 public:
35 
36  unordered_stream_reduce(Reducer && reducer, int num_workers);
37 
38 private:
39  std::vector<ff::ff_node*> workers_;
40 
42  std::unique_ptr<emitter_type> p_emitter_;
43 
44  std::unique_ptr<reduce_collector> p_collector_;
45 };
46 
47 template <typename Item, typename Reducer, typename Combiner>
49  Reducer && reducer,
50  int num_workers)
51 :
52  ff_farm<>{false, DEF_IN_BUFF_ENTRIES, DEF_OUT_BUFF_ENTRIES, true,
53  static_cast<std::size_t>(num_workers)},
54  workers_{},
55  p_emitter_{std::make_unique<emitter_type>(reducer.window_size(), reducer.offset())},
56  p_collector_{std::make_unique<reduce_collector>()}
57 {
58  for (int i=0; i<num_workers; ++i) {
59  workers.push_back(new reduce_worker<Item,Combiner>{
60  std::forward<Combiner>(reducer.combiner())});
61  }
62  this->add_workers(workers_);
63  this->add_emitter(p_emitter_.get());
64  this->add_collector(p_collector_.get());
65  }
66 
67 } // namespace detail_ff
68 
69 } // namespace grppi
70 
71 #endif
Definition: callable_traits.h:26
Reduce worker.
Definition: reduce_nodes.h:120
unordered_stream_reduce(Reducer &&reducer, int num_workers)
Definition: unordered_stream_reduce.h:48
Reduce emitter.
Definition: reduce_nodes.h:38
Definition: unordered_stream_reduce.h:33