GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
native/map.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_NATIVE_MAP_H
22 #define GRPPI_NATIVE_MAP_H
23 
25 #include "../common/iterator.h"
26 
27 namespace grppi {
28 
49 template <typename InputIt, typename OutputIt, typename Transformer>
51  InputIt first, InputIt last, OutputIt first_out,
52  Transformer && transf_op)
53 {
54  std::vector<std::thread> tasks;
55  int numElements = last - first;
56  int elemperthr = numElements / ex.concurrency_degree();
57 
58  for(int i=1;i<ex.concurrency_degree();i++){
59  auto begin = first + (elemperthr * i);
60  auto end = first + (elemperthr * (i+1));
61 
62  if(i == ex.concurrency_degree()-1 ) end= last;
63 
64  auto out = first_out + (elemperthr * i);
65  tasks.emplace_back([&](InputIt begin, InputIt end, OutputIt out) {
66  auto manager = ex.thread_manager();
67 
68  while (begin!=end) {
69  *out = transf_op(*begin);
70  begin++;
71  out++;
72  }
73  }, begin, end, out);
74  }
75  //Map main threads
76  auto end = first+elemperthr;
77  while(first!=end) {
78  *first_out = transf_op(*first);
79  first++;
80  first_out++;
81  }
82 
83  //Join threads
84  for(int i=0;i<ex.concurrency_degree()-1;i++){
85  tasks[i].join();
86  }
87 }
88 
103 template <typename InputIt, typename OutputIt, typename Transformer,
104  typename ... OtherInputIts>
106  InputIt first, InputIt last, OutputIt first_out,
107  Transformer && transf_op,
108  OtherInputIts ... more_inputs)
109 {
110  std::vector<std::thread> tasks;
111 
112  //Calculate number of elements per thread
113  int numElements = last - first;
114  int elemperthr = numElements / ex.concurrency_degree();
115 
116  //Create tasks
117  for(int i=1;i<ex.concurrency_degree();i++){
118  //Calculate local input and output iterator
119  auto begin = first + (elemperthr * i);
120  auto end = first + (elemperthr * (i+1));
121  if( i == ex.concurrency_degree()-1) end = last;
122  auto out = first_out + (elemperthr * i);
123  //Begin task
124  tasks.emplace_back([&](InputIt begin, InputIt end, OutputIt out,
125  int tid, int nelem, OtherInputIts ... more_inputs) {
126  auto manager = ex.thread_manager();
127  advance_iterators(nelem*tid, more_inputs ...);
128  while (begin!=end) {
129  *out = transf_op(*begin, *more_inputs ...);
130  advance_iterators(more_inputs ...);
131  begin++;
132  out++;
133  }
134  }, begin, end, out, i, elemperthr, more_inputs...);
135  //End task
136  }
137 
138  //Map main thread
139  auto end = first + elemperthr;
140  while(first!=end) {
141  *first_out = transf_op(*first, *more_inputs ...);
142  advance_iterators(more_inputs ...);
143  first++;
144  first_out++;
145  }
146 
147  //Join threads
148  for(int i=0;i<ex.concurrency_degree()-1;i++) {
149  tasks[i].join();
150  }
151 }
152 
157 }
158 
159 #endif
Definition: callable_traits.h:24
int concurrency_degree() const noexcept
Get number of grppi trheads.
Definition: parallel_execution_native.h:178
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:136
native_thread_manager thread_manager()
Get a manager object for registration/deregistration in the thread index table for current thread...
Definition: parallel_execution_native.h:199
void advance_iterators(size_t delta, InputIt &...in)
Definition: iterator.h:29
void map(parallel_execution_native &ex, InputIt first, InputIt last, OutputIt first_out, Transformer &&transf_op)
Invoke Map pattern on a data sequence with native paralell execution.
Definition: native/map.h:50