21 #ifndef GRPPI_FF_DETAIL_UNORDERED_STREAM_REDUCE_H 22 #define GRPPI_FF_DETAIL_UNORDERED_STREAM_REDUCE_H 26 #include <ff/farm.hpp> 32 template <
typename Item,
typename Reducer,
typename Combiner>
39 std::vector<ff::ff_node*> workers_;
42 std::unique_ptr<emitter_type> p_emitter_;
44 std::unique_ptr<reduce_collector> p_collector_;
47 template <
typename Item,
typename Reducer,
typename Combiner>
52 ff_farm<>{
false, DEF_IN_BUFF_ENTRIES, DEF_OUT_BUFF_ENTRIES,
true,
53 static_cast<std::size_t
>(num_workers)},
55 p_emitter_{std::make_unique<emitter_type>(reducer.window_size(), reducer.offset())},
56 p_collector_{std::make_unique<reduce_collector>()}
58 for (
int i=0; i<num_workers; ++i) {
60 std::forward<Combiner>(reducer.combiner())});
62 this->add_workers(workers_);
63 this->add_emitter(p_emitter_.get());
64 this->add_collector(p_collector_.get());
Definition: callable_traits.h:26
Reduce worker.
Definition: reduce_nodes.h:120
unordered_stream_reduce(Reducer &&reducer, int num_workers)
Definition: unordered_stream_reduce.h:48
Reduce emitter.
Definition: reduce_nodes.h:38
Definition: unordered_stream_reduce.h:33