21 #ifndef GRPPI_FF_DETAIL_REDUCE_NODES_H 22 #define GRPPI_FF_DETAIL_REDUCE_NODES_H 25 #include "../../reduce.h" 27 #include <ff/allocator.hpp> 28 #include <ff/node.hpp> 37 template <
typename Item,
typename Reducer>
41 void *
svc(
void * p_value);
44 void advance_large_offset(Item * p_item);
45 void advance_small_offset();
51 std::vector<Item> items_;
54 template <
typename Item,
typename Reducer>
56 window_size_{window_size},
61 items_.reserve(window_size);
64 template <
typename Item,
typename Reducer>
67 Item * p_item =
static_cast<Item*
>(p_value);
69 if(items_.size() != window_size_)
70 items_.push_back(*p_item);
72 if(items_.size() == window_size_) {
73 if(offset_ > window_size_) {
74 advance_large_offset(p_item);
77 advance_small_offset();
85 template <
typename Item,
typename Reducer>
89 auto * p_items_to_send =
new std::vector<Item>{items_};
90 ff_send_out(p_items_to_send);
93 else if (skip_ == (offset_ - window_size_)) {
96 items_.push_back(*p_item);
103 template <
typename Item,
typename Reducer>
106 auto * p_items_to_send =
new std::vector<Item>{
107 std::make_move_iterator(items_.begin()),
108 std::make_move_iterator(items_.end())};
109 auto it_last = (offset_ < window_size_) ?
110 std::next(items_.begin(), offset_) :
112 items_.erase(items_.begin(), it_last);
113 ff_send_out(p_items_to_send);
119 template <
typename Item,
typename Combiner>
124 void *
svc(
void * p_value);
127 Combiner combine_op_;
130 template <
typename Item,
typename Combiner>
132 std::vector<Item> * p_items =
static_cast<std::vector<Item>*
>(p_value);
135 constexpr ::grppi::sequential_execution seq{};
136 Item * p_result =
new (
ff_arena) Item{
138 identity, combine_op_)
152 void *
svc(
void * p_value) {
return p_value; }
Definition: callable_traits.h:26
Reduce worker.
Definition: reduce_nodes.h:120
auto reduce(const Execution &ex, InputIt first, std::size_t size, Result &&identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with sequential execution.
Definition: reduce.h:55
reduce_worker(Combiner &&combine_op)
Definition: reduce_nodes.h:123
constexpr ff_arena_t ff_arena
Fastflow arena object. This object will be passed to placement new/delete to use FastFlow allocation ...
Definition: fastflow_allocator.h:41
Reduce collector.
Definition: reduce_nodes.h:148
void * svc(void *p_value)
Definition: reduce_nodes.h:131
void * svc(void *p_value)
Definition: reduce_nodes.h:65
reduce_emitter(int window_size, int offset)
Definition: reduce_nodes.h:55
Reduce emitter.
Definition: reduce_nodes.h:38
void * svc(void *p_value)
Definition: reduce_nodes.h:152