GrPPI  1.0
Generic and Reusable Parallel Pattern Interface
ordered_stream_reduce.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Universidad Carlos III de Madrid
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef GRPPI_FF_DETAIL_ORDERED_STREAM_REDUCE_H
17 #define GRPPI_FF_DETAIL_ORDERED_STREAM_REDUCE_H
18 
19 #include "reduce_nodes.h"
20 
21 #include <ff/farm.hpp>
22 
23 
24 namespace grppi {
25 
26 namespace detail_ff {
27 
31 template <typename Item, typename Reducer, typename CombineOp>
32 class ordered_stream_reduce : public ff::ff_ofarm {
33 public:
34  ordered_stream_reduce(Reducer && red_obj, int num_workers);
35 
36 private:
37  std::vector<ff_node*> workers_;
38 
40  std::unique_ptr<emitter_type> p_emitter_;
41 };
42 
43 template <typename Item, typename Reducer, typename Combiner>
45  Reducer && red_obj,
46  int num_workers)
47 :
48  ff::ff_ofarm{false, DEF_IN_BUFF_ENTRIES, DEF_OUT_BUFF_ENTRIES, true, num_workers},
49  workers_{},
50  p_emitter_{std::make_unique<emitter_type>(red_obj.window_size(), red_obj.offset())}
51 {
52  for(int i=0; i<num_workers; ++i) {
53  reduce_worker<Item,Combiner> * p_worker =
54  new reduce_worker<Item,Combiner>{red_obj.combiner()};
55  workers_.push_back(p_worker);
56  }
57 
58  add_workers(workers_);
59  setEmitterF(p_emitter_.get());
60 }
61 
62 
63 } // namespace detail_ff
64 
65 } // namespace grppi
66 
67 #endif
Cross-references from this listing:
- Class ordered_stream_reduce: Ordered stream reduce for FastFlow. Definition: ordered_stream_reduce.h:32
- Constructor ordered_stream_reduce(Reducer &&red_obj, int num_workers). Definition: ordered_stream_reduce.h:44
- Reduce emitter. Definition: reduce_nodes.h:33
- Reduce worker. Definition: reduce_nodes.h:115
- (entry listed without a description) Definition: callable_traits.h:21