16 #ifndef GRPPI_FF_DETAIL_UNORDERED_STREAM_REDUCE_H
17 #define GRPPI_FF_DETAIL_UNORDERED_STREAM_REDUCE_H
21 #include <ff/farm.hpp>
// Template header and data members of the stream-reduce farm class.
// NOTE(review): this listing is incomplete -- the class head, the
// `emitter_type` alias and the closing brace are not visible here (the embedded
// line numbers jump from 27 to 34), so only the visible members are described.
27 template <
typename Item,
typename Reducer,
typename Combiner>
// Worker node pointers; the constructor passes this vector to add_workers().
34 std::vector<ff::ff_node*> workers_;
// Owning pointer to the emitter node; the constructor registers the raw
// pointer with add_emitter().
37 std::unique_ptr<emitter_type> p_emitter_;
// Owning pointer to the collector node; the constructor registers the raw
// pointer with add_collector().
39 std::unique_ptr<reduce_collector> p_collector_;
// Constructor definition: unordered_stream_reduce(Reducer &&reducer, int num_workers).
// NOTE(review): the signature line and the surrounding braces are missing from
// this listing; only the member-initializer list and the body statements are
// visible.
42 template <
typename Item,
typename Reducer,
typename Combiner>
// Base farm initialization. The last argument is num_workers converted to
// std::size_t; the meaning of the two boolean flags is defined by the ff_farm
// constructor and is not visible here.
47 ff_farm<>{
false, DEF_IN_BUFF_ENTRIES, DEF_OUT_BUFF_ENTRIES,
true,
48 static_cast<std::size_t
>(num_workers)},
// The emitter is constructed from the reducer's window size and offset.
50 p_emitter_{std::make_unique<emitter_type>(reducer.window_size(), reducer.offset())},
// The collector is default-constructed.
51 p_collector_{std::make_unique<reduce_collector>()}
// Allocate one reduce_worker per requested worker, each built from the
// reducer's combiner. The nodes are created with raw `new`; who deletes them
// is not visible in this listing.
53 for (
int i=0; i<num_workers; ++i) {
// NOTE(review): this pushes into `workers`, whereas the member declared in
// this class is `workers_` and `workers_` is what add_workers() receives below.
// Confirm whether `workers` is an intended (e.g. inherited) name or a typo.
54 workers.push_back(
new reduce_worker<Item,Combiner>{
// NOTE(review): std::forward<Combiner> is applied to reducer.combiner() on
// every iteration; if Combiner is a non-reference type this may move from the
// same object more than once -- confirm.
55 std::forward<Combiner>(reducer.combiner())});
// Register the workers, the emitter and the collector with the farm.
57 this->add_workers(workers_);
58 this->add_emitter(p_emitter_.get());
59 this->add_collector(p_collector_.get());
Reduce emitter.
Definition: reduce_nodes.h:33
Definition: unordered_stream_reduce.h:28
unordered_stream_reduce(Reducer &&reducer, int num_workers)
Definition: unordered_stream_reduce.h:43
Definition: callable_traits.h:21