GrPPI  1.0
Generic and Reusable Parallel Pattern Interface
reduce_nodes.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Universidad Carlos III de Madrid
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef GRPPI_FF_DETAIL_REDUCE_NODES_H
17 #define GRPPI_FF_DETAIL_REDUCE_NODES_H
18 
19 #include "fastflow_allocator.h"
20 #include "../../reduce.h"
21 
22 #include <ff/allocator.hpp>
23 #include <ff/node.hpp>
24 
25 namespace grppi {
26 
27 namespace detail_ff {
28 
32 template <typename Item, typename Reducer>
33 class reduce_emitter : public ff::ff_node {
34 public:
35  reduce_emitter(int window_size, int offset);
36  void * svc(void * p_value);
37 
38 private:
39  void advance_large_offset(Item * p_item);
40  void advance_small_offset();
41 
42 private:
43  int window_size_;
44  int offset_;
45  int skip_;
46  std::vector<Item> items_;
47 };
48 
49 template <typename Item, typename Reducer>
50 reduce_emitter<Item,Reducer>::reduce_emitter(int window_size, int offset) :
51  window_size_{window_size},
52  offset_{offset},
53  skip_{-1},
54  items_{}
55 {
56  items_.reserve(window_size);
57 }
58 
59 template <typename Item, typename Reducer>
60 void * reduce_emitter<Item,Reducer>::svc(void * p_value)
61 {
62  Item * p_item = static_cast<Item*>(p_value);
63 
64  if(static_cast<int>(items_.size()) != window_size_)
65  items_.push_back(*p_item);
66 
67  if(static_cast<int>(items_.size()) == window_size_) {
68  if(offset_ > window_size_) {
69  advance_large_offset(p_item);
70  }
71  else {
72  advance_small_offset();
73  }
74  }
75 
76  operator delete(p_item, ff_arena);
77  return GO_ON;
78 }
79 
80 template <typename Item, typename Reducer>
82 {
83  if (skip_==-1) {
84  auto * p_items_to_send = new std::vector<Item>{items_};
85  ff_send_out(p_items_to_send);
86  skip_++;
87  }
88  else if (skip_ == (offset_ - window_size_)) {
89  skip_ = -1;
90  items_.clear();
91  items_.push_back(*p_item);
92  }
93  else {
94  skip_++;
95  }
96 }
97 
98 template <typename Item, typename Reducer>
99 void reduce_emitter<Item,Reducer>::advance_small_offset()
100 {
101  auto * p_items_to_send = new std::vector<Item>{
102  std::make_move_iterator(items_.begin()),
103  std::make_move_iterator(items_.end())};
104  auto it_last = (offset_ < window_size_) ?
105  std::next(items_.begin(), offset_) :
106  items_.end();
107  items_.erase(items_.begin(), it_last);
108  ff_send_out(p_items_to_send);
109 }
110 
114 template <typename Item, typename Combiner>
115 class reduce_worker : public ff::ff_node {
116 public:
117 
118  reduce_worker(Combiner && combine_op) : combine_op_{combine_op} {}
119  void * svc(void * p_value);
120 
121 private:
122  Combiner combine_op_;
123 };
124 
125 template <typename Item, typename Combiner>
126 void * reduce_worker<Item,Combiner>::svc(void * p_value) {
127  std::vector<Item> * p_items = static_cast<std::vector<Item>*>(p_value);
128 
129  Item identity{};
130  constexpr ::grppi::sequential_execution seq{};
131  Item * p_result = new (ff_arena) Item{
132  ::grppi::reduce(seq, p_items->begin(), p_items->end(),
133  identity, combine_op_)
134  };
135 
136  delete p_items;
137  return p_result;
138 }
139 
143 class reduce_collector : public ff::ff_node {
144 public:
145  reduce_collector() = default;
146 
147  void * svc(void * p_value) { return p_value; }
148 };
149 
150 
151 } // namespace detail_ff
152 
153 } // namespace grppi
154 
155 #endif
Reduce collector.
Definition: reduce_nodes.h:143
void * svc(void *p_value)
Definition: reduce_nodes.h:147
Reduce emitter.
Definition: reduce_nodes.h:33
void * svc(void *p_value)
Definition: reduce_nodes.h:60
reduce_emitter(int window_size, int offset)
Definition: reduce_nodes.h:50
Reduce worker.
Definition: reduce_nodes.h:115
void * svc(void *p_value)
Definition: reduce_nodes.h:126
reduce_worker(Combiner &&combine_op)
Definition: reduce_nodes.h:118
auto reduce(const Execution &ex, InputIt first, std::size_t size, Result &&identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with sequential execution.
Definition: reduce.h:50
constexpr ff_arena_t ff_arena
Fastflow arena object. This object will be passed to placement new/delete to use FastFlow allocation ...
Definition: fastflow_allocator.h:36
Definition: callable_traits.h:21