GrPPI  1.0
Generic and Reusable Parallel Pattern Interface
unordered_stream_reduce.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Universidad Carlos III de Madrid
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef GRPPI_FF_DETAIL_UNORDERED_STREAM_REDUCE_H
17 #define GRPPI_FF_DETAIL_UNORDERED_STREAM_REDUCE_H
18 
19 #include "reduce_nodes.h"
20 
21 #include <ff/farm.hpp>
22 
23 namespace grppi {
24 
25 namespace detail_ff {
26 
27 template <typename Item, typename Reducer, typename Combiner>
28 class unordered_stream_reduce : public ff::ff_farm<> {
29 public:
30 
31  unordered_stream_reduce(Reducer && reducer, int num_workers);
32 
33 private:
34  std::vector<ff::ff_node*> workers_;
35 
37  std::unique_ptr<emitter_type> p_emitter_;
38 
39  std::unique_ptr<reduce_collector> p_collector_;
40 };
41 
42 template <typename Item, typename Reducer, typename Combiner>
44  Reducer && reducer,
45  int num_workers)
46 :
47  ff_farm<>{false, DEF_IN_BUFF_ENTRIES, DEF_OUT_BUFF_ENTRIES, true,
48  static_cast<std::size_t>(num_workers)},
49  workers_{},
50  p_emitter_{std::make_unique<emitter_type>(reducer.window_size(), reducer.offset())},
51  p_collector_{std::make_unique<reduce_collector>()}
52 {
53  for (int i=0; i<num_workers; ++i) {
54  workers.push_back(new reduce_worker<Item,Combiner>{
55  std::forward<Combiner>(reducer.combiner())});
56  }
57  this->add_workers(workers_);
58  this->add_emitter(p_emitter_.get());
59  this->add_collector(p_collector_.get());
60  }
61 
62 } // namespace detail_ff
63 
64 } // namespace grppi
65 
66 #endif
Reduce emitter.
Definition: reduce_nodes.h:33
Definition: unordered_stream_reduce.h:28
unordered_stream_reduce(Reducer &&reducer, int num_workers)
Definition: unordered_stream_reduce.h:43
Definition: callable_traits.h:21