16 #ifndef GRPPI_FF_DETAIL_REDUCE_NODES_H
17 #define GRPPI_FF_DETAIL_REDUCE_NODES_H
20 #include "../../reduce.h"
22 #include <ff/allocator.hpp>
23 #include <ff/node.hpp>
32 template <
typename Item,
typename Reducer>
36 void *
svc(
void * p_value);
39 void advance_large_offset(Item * p_item);
40 void advance_small_offset();
46 std::vector<Item> items_;
49 template <
typename Item,
typename Reducer>
51 window_size_{window_size},
56 items_.reserve(window_size);
59 template <
typename Item,
typename Reducer>
62 Item * p_item =
static_cast<Item*
>(p_value);
64 if(
static_cast<int>(items_.size()) != window_size_)
65 items_.push_back(*p_item);
67 if(
static_cast<int>(items_.size()) == window_size_) {
68 if(offset_ > window_size_) {
69 advance_large_offset(p_item);
72 advance_small_offset();
80 template <
typename Item,
typename Reducer>
84 auto * p_items_to_send =
new std::vector<Item>{items_};
85 ff_send_out(p_items_to_send);
88 else if (skip_ == (offset_ - window_size_)) {
91 items_.push_back(*p_item);
98 template <
typename Item,
typename Reducer>
99 void reduce_emitter<Item,Reducer>::advance_small_offset()
101 auto * p_items_to_send =
new std::vector<Item>{
102 std::make_move_iterator(items_.begin()),
103 std::make_move_iterator(items_.end())};
104 auto it_last = (offset_ < window_size_) ?
105 std::next(items_.begin(), offset_) :
107 items_.erase(items_.begin(), it_last);
108 ff_send_out(p_items_to_send);
114 template <
typename Item,
typename Combiner>
119 void *
svc(
void * p_value);
122 Combiner combine_op_;
125 template <
typename Item,
typename Combiner>
127 std::vector<Item> * p_items =
static_cast<std::vector<Item>*
>(p_value);
130 constexpr ::grppi::sequential_execution seq{};
131 Item * p_result =
new (
ff_arena) Item{
133 identity, combine_op_)
147 void *
svc(
void * p_value) {
return p_value; }
Reduce collector.
Definition: reduce_nodes.h:143
void * svc(void *p_value)
Definition: reduce_nodes.h:147
reduce_collector()=default
Reduce emitter.
Definition: reduce_nodes.h:33
void * svc(void *p_value)
Definition: reduce_nodes.h:60
reduce_emitter(int window_size, int offset)
Definition: reduce_nodes.h:50
Reduce worker.
Definition: reduce_nodes.h:115
void * svc(void *p_value)
Definition: reduce_nodes.h:126
reduce_worker(Combiner &&combine_op)
Definition: reduce_nodes.h:118
auto reduce(const Execution &ex, InputIt first, std::size_t size, Result &&identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with sequential execution.
Definition: reduce.h:50
constexpr ff_arena_t ff_arena
Fastflow arena object. This object will be passed to placement new/delete to use FastFlow allocation ...
Definition: fastflow_allocator.h:36
Definition: callable_traits.h:21