GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
native/mapreduce.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_NATIVE_MAPREDUCE_H
22 #define GRPPI_NATIVE_MAPREDUCE_H
23 
25 
26 namespace grppi {
27 
51 template <typename InputIt, typename Result, typename Transformer,
52  typename Combiner>
54  InputIt first, InputIt last, Result identity,
55  Transformer && transform_op, Combiner &&combine_op)
56 {
57  using namespace std;
58 
59  vector<Result> partial_results(ex.concurrency_degree());
60 
61  const int num_elements = last - first;
62  const int elements_per_thread = num_elements/ex.concurrency_degree();
64 
65  vector<thread> tasks;
66  for(int i=1;i<ex.concurrency_degree();i++){
67  const auto begin = first + (elements_per_thread * i);
68  const auto end = (i==ex.concurrency_degree()-1) ?
69  last :
70  (first + elements_per_thread * (i+1));
71 
72  tasks.emplace_back([&,begin,end,i](){
73  auto manager = ex.thread_manager();
74  partial_results[i] = map_reduce(seq, begin, end, partial_results[i],
75  forward<Transformer>(transform_op), forward<Combiner>(combine_op));
76  });
77  }
78 
79  partial_results[0] = map_reduce(seq,
80  first,( first+elements_per_thread ), partial_results[0],
81  forward<Transformer>(transform_op),
82  forward<Combiner>(combine_op));
83 
84  for (auto && t : tasks) { t.join(); }
85 
86  Result result = identity;
87  for (auto && p : partial_results) { result = combine_op(result, p); }
88 
89  return result;
90 }
91 
97 }
98 
99 #endif
Definition: callable_traits.h:24
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_native.h:178
STL namespace.
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:136
Result map_reduce(parallel_execution_native &ex, InputIt first, InputIt last, Result identity, Transformer &&transform_op, Combiner &&combine_op)
Invoke Map/reduce pattern on a data sequence with native parallel execution.
Definition: native/mapreduce.h:53
native_thread_manager thread_manager()
Get a manager object for registration/deregistration in the thread index table for current thread...
Definition: parallel_execution_native.h:199
Sequential execution policy.
Definition: sequential_execution.h:31