GrPPI  0.3.1
Generic and Reusable Parallel Pattern Interface
parallel_execution_ff.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_FF_PARALLEL_EXECUTION_FF_H
22 #define GRPPI_FF_PARALLEL_EXECUTION_FF_H
23 
24 #ifdef GRPPI_FF
25 
#include "detail/pipeline_impl.h"

#include "../common/iterator.h"
#include "../common/execution_traits.h"

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <iterator>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
#include <experimental/optional>

#include <ff/parallel_for.hpp>
#include <ff/dc.hpp>
38 
39 namespace grppi {
40 
46 class parallel_execution_ff {
47 
48 public:
56  parallel_execution_ff() noexcept :
57  parallel_execution_ff{
58  static_cast<int>(std::thread::hardware_concurrency())}
59  {}
60 
69  parallel_execution_ff(int concurrency_degree, bool order = true) noexcept :
70  concurrency_degree_{concurrency_degree},
71  ordering_{order}
72  {
73  }
74 
78  void set_concurrency_degree(int degree) noexcept {
79  concurrency_degree_ = degree;
80  }
81 
85  int concurrency_degree() const noexcept {
86  return concurrency_degree_;
87  }
88 
92  void enable_ordering() noexcept { ordering_=true; }
93 
97  void disable_ordering() noexcept { ordering_=false; }
98 
102  bool is_ordered() const noexcept { return ordering_; }
103 
118  template <typename ... InputIterators, typename OutputIterator,
119  typename Transformer>
120  void map(std::tuple<InputIterators...> firsts,
121  OutputIterator first_out,
122  std::size_t sequence_size, Transformer transform_op) const;
123 
137  template <typename InputIterator, typename Identity, typename Combiner>
138  auto reduce(InputIterator first,
139  std::size_t sequence_size,
140  Identity && identity,
141  Combiner && combine_op) const;
142 
157  template <typename ... InputIterators, typename Identity,
158  typename Transformer, typename Combiner>
159  auto map_reduce(std::tuple<InputIterators...> firsts,
160  std::size_t sequence_size,
161  Identity && identity,
162  Transformer && transform_op,
163  Combiner && combine_op) const;
164 
179  template <typename ... InputIterators, typename OutputIterator,
180  typename StencilTransformer, typename Neighbourhood>
181  void stencil(std::tuple<InputIterators...> firsts,
182  OutputIterator first_out,
183  std::size_t sequence_size,
184  StencilTransformer && transform_op,
185  Neighbourhood && neighbour_op) const;
186 
194  template <typename Generator, typename ... Transformers>
195  void pipeline(Generator && generate_op,
196  Transformers && ... transform_op) const;
197 
198 
209  template <typename InputType, typename Transformer, typename OutputType>
210  void pipeline(mpmc_queue<InputType> & input_queue, Transformer && transform_op,
211  mpmc_queue<OutputType> & output_queue) const
212  {
213  ::std::atomic<long> order {0};
214  pipeline(
215  [&](){
216  auto item = input_queue.pop();
217  if(!item.first) input_queue.push(item);
218  return item.first;
219  },
220  std::forward<Transformer>(transform_op),
221  [&](auto & item ){
222  output_queue.push(make_pair(typename OutputType::first_type{item}, order.load()));
223  order++;
224  }
225  );
226  output_queue.push(make_pair(typename OutputType::first_type{}, order.load()));
227  }
228 
240  template <typename Input, typename Divider,typename Predicate,
241  typename Solver, typename Combiner>
242  auto divide_conquer(Input & input,
243  Divider && divide_op,
244  Predicate && condition_op,
245  Solver && solve_op,
246  Combiner && combine_op) const;
247 
248 private:
249 
250  int concurrency_degree_ =
251  static_cast<int>(std::thread::hardware_concurrency());
252  bool ordering_ = true;
253 };
254 
259 template <typename E>
260 constexpr bool is_parallel_execution_ff() {
261  return std::is_same<E, parallel_execution_ff>::value;
262 }
263 
268 template <>
269 constexpr bool is_supported<parallel_execution_ff>() { return true; }
270 
275 template <>
276 constexpr bool supports_map<parallel_execution_ff>() { return true; }
277 
282 template <>
283 constexpr bool supports_reduce<parallel_execution_ff>() { return true; }
284 
289 template <>
290 constexpr bool supports_map_reduce<parallel_execution_ff>() { return true; }
291 
296 template <>
297 constexpr bool supports_stencil<parallel_execution_ff>() { return true; }
298 
299 /*
300 \brief Determines if an execution policy supports the divide_conquer pattern.
301 \note Specialization for parallel_execution_ff when GRPPI_FF is enabled.
302 */
303 template <>
304 constexpr bool supports_divide_conquer<parallel_execution_ff>() { return true; }
305 
310 template <>
311 constexpr bool supports_pipeline<parallel_execution_ff>() { return true; }
312 
313 
314 template <typename ... InputIterators, typename OutputIterator,
315  typename Transformer>
317  std::tuple<InputIterators...> firsts,
318  OutputIterator first_out,
319  std::size_t sequence_size, Transformer transform_op) const
320 {
321  ff::ParallelFor pf{concurrency_degree_, true};
322  pf.parallel_for(0, sequence_size,
323  [=](const long delta) {
324  *std::next(first_out, delta) = apply_iterators_indexed(transform_op, firsts, delta);
325  },
326  concurrency_degree_);
327 }
328 
329 template <typename InputIterator, typename Identity, typename Combiner>
330 auto parallel_execution_ff::reduce(InputIterator first,
331  std::size_t sequence_size,
332  Identity && identity,
333  Combiner && combine_op) const
334 {
335  ff::ParallelForReduce<Identity> pfr{concurrency_degree_, true};
336  Identity result{identity};
337 
338  pfr.parallel_reduce(result, identity, 0, sequence_size,
339  [combine_op,first](long delta, auto & value) {
340  value = combine_op(value, *std::next(first,delta));
341  },
342  [&result, combine_op](auto a, auto b) { result = combine_op(a,b); },
343  concurrency_degree_);
344 
345  return result;
346 }
347 
348 template <typename ... InputIterators, typename Identity,
349  typename Transformer, typename Combiner>
350 auto parallel_execution_ff::map_reduce(std::tuple<InputIterators...> firsts,
351  std::size_t sequence_size,
352  Identity && identity,
353  Transformer && transform_op,
354  Combiner && combine_op) const
355 {
356  std::vector<Identity> partial_outs(sequence_size);
357  map(firsts, partial_outs.begin(), sequence_size,
358  std::forward<Transformer>(transform_op));
359 
360  return reduce(partial_outs.begin(), sequence_size,
361  std::forward<Identity>(identity),
362  std::forward<Combiner>(combine_op));
363 }
364 
365 template <typename ... InputIterators, typename OutputIterator,
366  typename StencilTransformer, typename Neighbourhood>
367 void parallel_execution_ff::stencil(std::tuple<InputIterators...> firsts,
368  OutputIterator first_out,
369  std::size_t sequence_size,
370  StencilTransformer && transform_op,
371  Neighbourhood && neighbour_op) const
372 {
373  ff::ParallelFor pf(concurrency_degree_, true);
374  pf.parallel_for(0, sequence_size,
375  [&](long delta) {
376  const auto first_it = std::get<0>(firsts);
377  auto next_chunks = iterators_next(firsts, delta);
378  *std::next(first_out,delta) = transform_op(std::next(first_it,delta),
379  apply_increment(neighbour_op, next_chunks) );
380  },
381  concurrency_degree_);
382 }
383 
384 template <typename Generator, typename ... Transformers>
386  Generator && generate_op,
387  Transformers && ... transform_ops) const
388 {
389  detail_ff::pipeline_impl pipe{
390  concurrency_degree_,
391  ordering_,
392  std::forward<Generator>(generate_op),
393  std::forward<Transformers>(transform_ops)...};
394 
395  pipe.setFixedSize(false);
396  pipe.run_and_wait_end();
397 }
398 
399 template <typename Input, typename Divider,typename Predicate,
400  typename Solver, typename Combiner>
401 auto parallel_execution_ff::divide_conquer(Input & input,
402  Divider && divide_op,
403  Predicate && condition_op,
404  Solver && solve_op,
405  Combiner && combine_op) const
406 {
407  using output_type = typename std::result_of<Solver(Input)>::type;
408 
409  // divide
410  auto divide_fn = [&](const Input &in, std::vector<Input> &subin) {
411  subin = divide_op(in);
412  };
413  // combine
414  auto combine_fn = [&] (std::vector<output_type>& in, output_type& out) {
415  using index_t = typename std::vector<output_type>::size_type;
416  out = in[0];
417  for(index_t i = 1; i < in.size(); ++i)
418  out = combine_op(out, in[i]);
419  };
420  // sequential solver (base-case)
421  auto seq_fn = [&] (const Input & in , output_type & out) {
422  out = solve_op(in);
423  };
424  // condition
425  auto cond_fn = [&] (const Input &in) {
426  return condition_op(in);
427  };
428  output_type out_var{};
429 
430  using dac_t = ff::ff_DC<Input,output_type>;
431  auto ncores = static_cast<int>(std::thread::hardware_concurrency());
432  int max_nworkers = std::max(concurrency_degree_, ncores);
433  dac_t dac(divide_fn, combine_fn, seq_fn, cond_fn, //kernel functions
434  input, out_var, //input/output variables
435  concurrency_degree_, //parallelism degree
436  dac_t::DEFAULT_OUTSTANDING_TASKS, max_nworkers //ff-specific params
437  );
438 
439  // run
440  dac.run_and_wait_end();
441 
442  return out_var;
443 }
444 
445 } // end namespace grppi
446 
447 #else // GRPPI_FF undefined
448 
449 namespace grppi {
450 
451 
455 
/**
\brief Metafunction that determines if type E is parallel_execution_ff.
This version is compiled when GRPPI_FF is not defined and always yields false.
\tparam E Execution policy type.
*/
template <class E>
constexpr bool is_parallel_execution_ff()
{
  return false;
}
465 
466 }
467 
468 #endif // GRPPI_FF
469 
470 #endif
Definition: callable_traits.h:26
void pipeline(const Execution &ex, Generator &&generate_op, Transformers &&...transform_ops)
Invoke Pipeline pattern on a data stream.
Definition: pipeline.h:51
void stencil(const Execution &ex, std::tuple< InputIterators... > firsts, std::size_t size, OutputIt out, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op)
Invoke Stencil pattern on a data sequence with sequential execution.
Definition: stencil.h:59
constexpr bool is_parallel_execution_ff()
Metafunction that determines if type E is parallel_execution_ff. This metafunction evaluates to false ...
Definition: parallel_execution_ff.h:462
auto reduce(const Execution &ex, InputIt first, std::size_t size, Result &&identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with sequential execution.
Definition: reduce.h:55
auto iterators_next(T &&t, int n)
Computes next n steps from a tuple of iterators.
Definition: iterator.h:175
decltype(auto) apply_iterators_indexed(F &&f, T &&t, std::size_t i)
Applies a callable object to the values obtained from the iterators in a tuple by indexing...
Definition: iterator.h:147
Definition: parallel_execution_ff.h:454
auto map_reduce(const Execution &ex, std::tuple< InputIterators... > firsts, std::size_t size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op)
Invoke Map/reduce pattern on a data sequence.
Definition: mapreduce.h:57
auto divide_conquer(const Execution &ex, Input &&input, Divider &&divider_op, Solver &&solver_op, Combiner &&combiner_op)
Invoke Divide/conquer pattern. Template parameter Execution: execution policy type.
Definition: divideconquer.h:53
decltype(auto) apply_increment(F &&f, T< Iterators... > &iterators)
Applies a callable object to the iterators in a tuple-like object and then increments those iterators...
Definition: iterator.h:106
void map(const Execution &ex, std::tuple< InputIterators... > firsts, InputIt last, OutputIt first_out, Transformer transform_op)
Invoke Map pattern on a data sequence.
Definition: map.h:56