GrPPI  1.0
Generic and Reusable Parallel Pattern Interface
unordered_stream_filter.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Universidad Carlos III de Madrid
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef GRPPI_FF_DETAIL_UNORDERED_STREAM_FILTER_H
17 #define GRPPI_FF_DETAIL_UNORDERED_STREAM_FILTER_H
18 
19 #include "filter_nodes.h"
20 
21 #include <ff/farm.hpp>
22 
23 namespace grppi {
24 
25 namespace detail_ff {
26 
27 template <typename Item, typename Filter>
28 class unordered_stream_filter : public ff::ff_ofarm {
29 public:
30  unordered_stream_filter(Filter && filter, int num_workers);
31 
32 private:
33  Filter filter_;
34  std::vector<ff::ff_node *> workers_;
35  std::unique_ptr<filter_emitter<Item>> p_emitter_;
36  std::unique_ptr<filter_collector<Item>> p_collector_;
37 };
38 
39 template <typename Item, typename Filter>
41  Filter && filter,
42  int num_workers)
43 :
44  ff::ff_ofarm{false, DEF_IN_BUFF_ENTRIES, DEF_OUT_BUFF_ENTRIES, true, num_workers},
45  filter_{std::move(filter)},
46  workers_{},
47  p_emitter_{std::make_unique<filter_emitter<Item>>()},
48  p_collector_{std::make_unique<filter_collector<Item>>()}
49 {
50  for(int i=0;i<num_workers;++i) {
51  workers_.push_back(new filter_worker<Item,Filter>{
52  std::forward<Filter>(filter_)});
53  }
54  add_workers(workers_);
55  add_emitter(p_emitter_.get());
56  add_collector(p_collector_.get());
57 }
58 
59 } // namespace detail_ff
60 
61 } // namespace grppi
62 
63 
64 #endif
Collector node for a filter.
Definition: filter_nodes.h:67
Emitter for a filter stage.
Definition: filter_nodes.h:85
Worker that passes a value to next stage if the predicate is satisfied or the filtered_value constant...
Definition: filter_nodes.h:43
Definition: unordered_stream_filter.h:28
unordered_stream_filter(Filter &&filter, int num_workers)
Definition: unordered_stream_filter.h:40
Definition: callable_traits.h:21