GrPPI  0.3.1
Generic and Reusable Parallel Pattern Interface
reduce_nodes.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_FF_DETAIL_REDUCE_NODES_H
22 #define GRPPI_FF_DETAIL_REDUCE_NODES_H
23 
24 #include "fastflow_allocator.h"
25 #include "../../reduce.h"
26 
27 #include <ff/allocator.hpp>
28 #include <ff/node.hpp>
29 
30 namespace grppi {
31 
32 namespace detail_ff {
33 
37 template <typename Item, typename Reducer>
38 class reduce_emitter : public ff::ff_node {
39 public:
40  reduce_emitter(int window_size, int offset);
41  void * svc(void * p_value);
42 
43 private:
44  void advance_large_offset(Item * p_item);
45  void advance_small_offset();
46 
47 private:
48  int window_size_;
49  int offset_;
50  int skip_;
51  std::vector<Item> items_;
52 };
53 
54 template <typename Item, typename Reducer>
55 reduce_emitter<Item,Reducer>::reduce_emitter(int window_size, int offset) :
56  window_size_{window_size},
57  offset_{offset},
58  skip_{-1},
59  items_{}
60 {
61  items_.reserve(window_size);
62 }
63 
64 template <typename Item, typename Reducer>
65 void * reduce_emitter<Item,Reducer>::svc(void * p_value)
66 {
67  Item * p_item = static_cast<Item*>(p_value);
68 
69  if(items_.size() != window_size_)
70  items_.push_back(*p_item);
71 
72  if(items_.size() == window_size_) {
73  if(offset_ > window_size_) {
74  advance_large_offset(p_item);
75  }
76  else {
77  advance_small_offset();
78  }
79  }
80 
81  operator delete(p_item, ff_arena);
82  return GO_ON;
83 }
84 
85 template <typename Item, typename Reducer>
87 {
88  if (skip_==-1) {
89  auto * p_items_to_send = new std::vector<Item>{items_};
90  ff_send_out(p_items_to_send);
91  skip_++;
92  }
93  else if (skip_ == (offset_ - window_size_)) {
94  skip_ = -1;
95  items_.clear();
96  items_.push_back(*p_item);
97  }
98  else {
99  skip_++;
100  }
101 }
102 
103 template <typename Item, typename Reducer>
105 {
106  auto * p_items_to_send = new std::vector<Item>{
107  std::make_move_iterator(items_.begin()),
108  std::make_move_iterator(items_.end())};
109  auto it_last = (offset_ < window_size_) ?
110  std::next(items_.begin(), offset_) :
111  items_.end();
112  items_.erase(items_.begin(), it_last);
113  ff_send_out(p_items_to_send);
114 }
115 
119 template <typename Item, typename Combiner>
120 class reduce_worker : public ff::ff_node {
121 public:
122 
123  reduce_worker(Combiner && combine_op) : combine_op_{combine_op} {}
124  void * svc(void * p_value);
125 
126 private:
127  Combiner combine_op_;
128 };
129 
130 template <typename Item, typename Combiner>
131 void * reduce_worker<Item,Combiner>::svc(void * p_value) {
132  std::vector<Item> * p_items = static_cast<std::vector<Item>*>(p_value);
133 
134  Item identity{};
135  constexpr ::grppi::sequential_execution seq{};
136  Item * p_result = new (ff_arena) Item{
137  ::grppi::reduce(seq, p_items->begin(), p_items->end(),
138  identity, combine_op_)
139  };
140 
141  delete p_items;
142  return p_result;
143 }
144 
148 class reduce_collector : public ff::ff_node {
149 public:
150  reduce_collector() = default;
151 
152  void * svc(void * p_value) { return p_value; }
153 };
154 
155 
156 } // namespace detail_ff
157 
158 } // namespace grppi
159 
160 #endif
Definition: callable_traits.h:26
Reduce worker.
Definition: reduce_nodes.h:120
auto reduce(const Execution &ex, InputIt first, std::size_t size, Result &&identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with sequential execution.
Definition: reduce.h:55
reduce_worker(Combiner &&combine_op)
Definition: reduce_nodes.h:123
constexpr ff_arena_t ff_arena
Fastflow arena object. This object will be passed to placement new/delete to use FastFlow allocation ...
Definition: fastflow_allocator.h:41
Reduce collector.
Definition: reduce_nodes.h:148
void * svc(void *p_value)
Definition: reduce_nodes.h:131
void * svc(void *p_value)
Definition: reduce_nodes.h:65
reduce_emitter(int window_size, int offset)
Definition: reduce_nodes.h:55
Reduce emitter.
Definition: reduce_nodes.h:38
void * svc(void *p_value)
Definition: reduce_nodes.h:152