GrPPI  1.0
Generic and Reusable Parallel Pattern Interface
parallel_execution_ff.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Universidad Carlos III de Madrid
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef GRPPI_FF_PARALLEL_EXECUTION_FF_H
17 #define GRPPI_FF_PARALLEL_EXECUTION_FF_H
18 
19 #ifdef GRPPI_FF
20 
21 #include "detail/pipeline_impl.h"
22 
23 #include "../common/iterator.h"
24 #include "../common/execution_traits.h"
25 
26 #include <type_traits>
27 #include <tuple>
28 #include <thread>
29 #include <experimental/optional>
30 
31 #include <ff/parallel_for.hpp>
32 #include <ff/dc.hpp>
33 
34 namespace grppi {
35 
41 class parallel_execution_ff {
42 
43 public:
51  parallel_execution_ff() noexcept :
52  parallel_execution_ff{
53  static_cast<int>(std::thread::hardware_concurrency())}
54  {}
55 
64  parallel_execution_ff(int concurrency_degree, bool order = true) noexcept :
65  concurrency_degree_{concurrency_degree},
66  ordering_{order}
67  {
68  }
69 
73  void set_concurrency_degree(int degree) noexcept {
74  concurrency_degree_ = degree;
75  }
76 
80  int concurrency_degree() const noexcept {
81  return concurrency_degree_;
82  }
83 
87  void enable_ordering() noexcept { ordering_=true; }
88 
92  void disable_ordering() noexcept { ordering_=false; }
93 
97  bool is_ordered() const noexcept { return ordering_; }
98 
113  template <typename ... InputIterators, typename OutputIterator,
114  typename Transformer>
115  void map(std::tuple<InputIterators...> firsts,
116  OutputIterator first_out,
117  std::size_t sequence_size, Transformer transform_op) const;
118 
132  template <typename InputIterator, typename Identity, typename Combiner>
133  auto reduce(InputIterator first,
134  std::size_t sequence_size,
135  Identity && identity,
136  Combiner && combine_op) const;
137 
152  template <typename ... InputIterators, typename Identity,
153  typename Transformer, typename Combiner>
154  auto map_reduce(std::tuple<InputIterators...> firsts,
155  std::size_t sequence_size,
156  Identity && identity,
157  Transformer && transform_op,
158  Combiner && combine_op) const;
159 
174  template <typename ... InputIterators, typename OutputIterator,
175  typename StencilTransformer, typename Neighbourhood>
176  void stencil(std::tuple<InputIterators...> firsts,
177  OutputIterator first_out,
178  std::size_t sequence_size,
179  StencilTransformer && transform_op,
180  Neighbourhood && neighbour_op) const;
181 
189  template <typename Generator, typename ... Transformers>
190  void pipeline(Generator && generate_op,
191  Transformers && ... transform_op) const;
192 
193 
204  template <typename InputType, typename Transformer, typename OutputType>
205  void pipeline(mpmc_queue<InputType> & input_queue, Transformer && transform_op,
206  mpmc_queue<OutputType> & output_queue) const
207  {
208  ::std::atomic<long> order {0};
209  pipeline(
210  [&](){
211  auto item = input_queue.pop();
212  if(!item.first) input_queue.push(item);
213  return item.first;
214  },
215  std::forward<Transformer>(transform_op),
216  [&](auto & item ){
217  output_queue.push(make_pair(typename OutputType::first_type{item}, order.load()));
218  order++;
219  }
220  );
221  output_queue.push(make_pair(typename OutputType::first_type{}, order.load()));
222  }
223 
235  template <typename Input, typename Divider,typename Predicate,
236  typename Solver, typename Combiner>
237  auto divide_conquer(Input & input,
238  Divider && divide_op,
239  Predicate && condition_op,
240  Solver && solve_op,
241  Combiner && combine_op) const;
242 
243 private:
244 
245  int concurrency_degree_ =
246  static_cast<int>(std::thread::hardware_concurrency());
247  bool ordering_ = true;
248 };
249 
// Metafunction: yields true exactly when E is the type parallel_execution_ff.
254 template <typename E>
255 constexpr bool is_parallel_execution_ff() {
256  return std::is_same<parallel_execution_ff, E>{};
257 }
258 
// Specialization of is_supported for parallel_execution_ff: always true in
// this branch, which is compiled only when GRPPI_FF is defined.
263 template <>
264 constexpr bool is_supported<parallel_execution_ff>() { return true; }
265 
// Specialization of supports_map for parallel_execution_ff: always true.
270 template <>
271 constexpr bool supports_map<parallel_execution_ff>() { return true; }
272 
// Specialization of supports_reduce for parallel_execution_ff: always true.
277 template <>
278 constexpr bool supports_reduce<parallel_execution_ff>() { return true; }
279 
// Specialization of supports_map_reduce for parallel_execution_ff: always true.
284 template <>
285 constexpr bool supports_map_reduce<parallel_execution_ff>() { return true; }
286 
// Specialization of supports_stencil for parallel_execution_ff: always true.
291 template <>
292 constexpr bool supports_stencil<parallel_execution_ff>() { return true; }
293 
294 /*
295 \brief Determines if an execution policy supports the divide_conquer pattern.
296 \note Specialization for parallel_execution_ff when GRPPI_FF is enabled.
\return Always true for parallel_execution_ff.
297 */
298 template <>
299 constexpr bool supports_divide_conquer<parallel_execution_ff>() { return true; }
300 
// Specialization of supports_pipeline for parallel_execution_ff: always true.
305 template <>
306 constexpr bool supports_pipeline<parallel_execution_ff>() { return true; }
307 
308 
// Map pattern implementation.
// Runs an ff::ParallelFor over the index range [0, sequence_size). For each
// index, the result of apply_iterators_indexed(transform_op, firsts, index) is
// assigned to the output position index steps after first_out.
// \param firsts Tuple of iterators to the beginning of every input sequence.
// \param first_out Iterator to the beginning of the output sequence.
// \param sequence_size Number of positions to process.
// \param transform_op Callable applied at every position.
309 template <typename ... InputIterators, typename OutputIterator,
310  typename Transformer>
311 void parallel_execution_ff::map(
312  std::tuple<InputIterators...> firsts,
313  OutputIterator first_out,
314  std::size_t sequence_size, Transformer transform_op) const
315 {
316  ff::ParallelFor pf{concurrency_degree_, true};
317  pf.parallel_for(0, sequence_size,
// Captures by copy: the iterators and the callable are copied into the closure.
318  [=](const long delta) {
319  *std::next(first_out, delta) = apply_iterators_indexed(transform_op, firsts, delta);
320  },
321  concurrency_degree_);
322 }
323 
// Reduce pattern implementation.
// Combines sequence_size elements starting at first through an
// ff::ParallelForReduce, starting from a copy of identity.
// \param first Iterator to the first element.
// \param sequence_size Number of elements to combine.
// \param identity Identity value for combine_op.
// \param combine_op Binary combination callable.
// \return The combined value, of type std::decay_t<Identity>.
324 template <typename InputIterator, typename Identity, typename Combiner>
325 auto parallel_execution_ff::reduce(InputIterator first,
326  std::size_t sequence_size,
327  Identity && identity,
328  Combiner && combine_op) const
329 {
// Identity is a forwarding reference and deduces to a reference type for
// lvalue arguments. The decayed type keeps the accumulator a separate object,
// so the caller's identity value is never modified.
330  ff::ParallelForReduce<std::decay_t<Identity>> pfr{concurrency_degree_, true};
331  std::decay_t<Identity> result{identity};
332 
333  pfr.parallel_reduce(result, identity, 0, sequence_size,
// Per-element step: folds the element at offset delta into the running value.
334  [combine_op,first](long delta, auto & value) {
335  value = combine_op(value, *std::next(first,delta));
336  },
// Final step: stores the combination of its two arguments into result.
337  [&result, combine_op](auto a, auto b) { result = combine_op(a,b); },
338  concurrency_degree_);
339 
340  return result;
341 }
342 
// Map/reduce pattern implementation.
// First maps the inputs into a temporary vector with map(), then combines
// that vector with reduce().
// \param firsts Tuple of iterators to the beginning of every input sequence.
// \param sequence_size Number of positions to process.
// \param identity Identity value for combine_op.
// \param transform_op Callable applied at every position.
// \param combine_op Binary combination callable.
// \return The value returned by reduce() over the mapped elements.
343 template <typename ... InputIterators, typename Identity,
344  typename Transformer, typename Combiner>
345 auto parallel_execution_ff::map_reduce(std::tuple<InputIterators...> firsts,
346  std::size_t sequence_size,
347  Identity && identity,
348  Transformer && transform_op,
349  Combiner && combine_op) const
350 {
// Identity may deduce to a reference type (lvalue argument); a vector cannot
// hold references, so the element type is the decayed type.
351  std::vector<std::decay_t<Identity>> partial_outs(sequence_size);
352  map(firsts, partial_outs.begin(), sequence_size,
353  std::forward<Transformer>(transform_op));
354 
355  return reduce(partial_outs.begin(), sequence_size,
356  std::forward<Identity>(identity),
357  std::forward<Combiner>(combine_op));
358 }
359 
// Stencil pattern implementation.
// Iterates over [0, sequence_size) with an ff::ParallelFor. At each offset the
// transformer is called with an iterator to the current element of the first
// input sequence plus the neighbourhood computed by neighbour_op, and its
// result is stored at the matching output position.
360 template <typename ... InputIterators, typename OutputIterator,
361  typename StencilTransformer, typename Neighbourhood>
362 void parallel_execution_ff::stencil(std::tuple<InputIterators...> firsts,
363  OutputIterator first_out,
364  std::size_t sequence_size,
365  StencilTransformer && transform_op,
366  Neighbourhood && neighbour_op) const
367 {
368  ff::ParallelFor loop_runner(concurrency_degree_, true);
369  loop_runner.parallel_for(0, sequence_size,
370  [&](long offset) {
// All input iterators advanced to the current offset.
371  auto shifted = iterators_next(firsts, offset);
372  const auto origin = std::get<0>(firsts);
373  *std::next(first_out, offset) = transform_op(std::next(origin, offset),
374  apply_increment(neighbour_op, shifted) );
375  },
376  concurrency_degree_);
377 }
378 
// Pipeline pattern implementation.
// Builds a detail_ff::pipeline_impl from the concurrency degree, the ordering
// flag, the generator and the remaining stages, then runs it and waits until
// it has finished.
// \param generate_op Callable that produces the items of the stream.
// \param transform_ops Remaining stages of the pipeline.
379 template <typename Generator, typename ... Transformers>
380 void parallel_execution_ff::pipeline(
381  Generator && generate_op,
382  Transformers && ... transform_ops) const
383 {
384  detail_ff::pipeline_impl pipe{
385  concurrency_degree_,
386  ordering_,
387  std::forward<Generator>(generate_op),
388  std::forward<Transformers>(transform_ops)...};
389 
390  pipe.setFixedSize(false);
391  pipe.run_and_wait_end();
392 }
393 
// Divide/conquer pattern implementation.
// Wraps the four user callables in the signatures passed to ff::ff_DC,
// runs the resulting object to completion and returns the value it wrote
// into the output variable.
394 template <typename Input, typename Divider,typename Predicate,
395  typename Solver, typename Combiner>
396 auto parallel_execution_ff::divide_conquer(Input & input,
397  Divider && divide_op,
398  Predicate && condition_op,
399  Solver && solve_op,
400  Combiner && combine_op) const
401 {
402  using output_type = typename std::result_of<Solver(Input)>::type;
403 
404  // split a problem into its subproblems
405  auto split_step = [&](const Input & problem, std::vector<Input> & parts) {
406  parts = divide_op(problem);
407  };
408  // fold all partial solutions, left to right, into one
409  auto merge_step = [&] (std::vector<output_type> & partials, output_type & merged) {
410  merged = partials.front();
411  auto it = std::next(partials.begin());
412  for(; it != partials.end(); ++it)
413  merged = combine_op(merged, *it);
414  };
415  // direct solution of a base case
416  auto base_step = [&] (const Input & problem, output_type & solution) {
417  solution = solve_op(problem);
418  };
419  // base-case predicate
420  auto is_base = [&] (const Input & problem) {
421  return condition_op(problem);
422  };
423  output_type out_var{};
424 
425  using dac_t = ff::ff_DC<Input,output_type>;
426  const int hw_threads = static_cast<int>(std::thread::hardware_concurrency());
427  const int worker_cap = std::max(concurrency_degree_, hw_threads);
428  dac_t solver(split_step, merge_step, base_step, is_base, // kernel functions
429  input, out_var, // input/output variables
430  concurrency_degree_, // parallelism degree
431  dac_t::DEFAULT_OUTSTANDING_TASKS, worker_cap // ff-specific params
432  );
433 
434  // execute and block until finished
435  solver.run_and_wait_end();
436 
437  return out_var;
438 }
439 
440 } // end namespace grppi
441 
442 #else // GRPPI_FF undefined
443 
444 namespace grppi {
445 
446 
450 
// Metafunction that determines if type E is parallel_execution_ff.
// This is the definition compiled when GRPPI_FF is not defined; it evaluates
// to false for every E.
456 template <typename E>
457 constexpr bool is_parallel_execution_ff() {
458  return false;
459 }
460 
461 }
462 
463 #endif // GRPPI_FF
464 
465 #endif
auto divide_conquer(const Execution &ex, Input &&input, Divider &&divider_op, Solver &&solver_op, Combiner &&combiner_op)
Invoke Divide/conquer pattern. \param Execution Execution type.
Definition: divideconquer.h:49
void map(const Execution &ex, std::tuple< InputIterators... > firsts, InputIt last, OutputIt first_out, Transformer transform_op)
Invoke Map pattern on a data sequence.
Definition: map.h:51
auto map_reduce(const Execution &ex, std::tuple< InputIterators... > firsts, std::size_t size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op)
Invoke Map/reduce pattern on a data sequence.
Definition: mapreduce.h:52
void pipeline(const Execution &ex, Generator &&generate_op, Transformers &&... transform_ops)
Invoke Pipeline pattern on a data stream.
Definition: pipeline.h:46
auto reduce(const Execution &ex, InputIt first, std::size_t size, Result &&identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with sequential execution.
Definition: reduce.h:50
void stencil(const Execution &ex, std::tuple< InputIterators... > firsts, std::size_t size, OutputIt out, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op)
Invoke Stencil pattern on a data sequence with sequential execution.
Definition: stencil.h:54
Definition: callable_traits.h:21
decltype(auto) apply_increment(F &&f, T< Iterators... > &iterators)
Applies a callable object to the iterators in a tuple-like object and then increments those iterators....
Definition: iterator.h:101
constexpr bool is_parallel_execution_ff()
Metafunction that determines if type E is parallel_execution_ff. This metafunction evaluates to false ...
Definition: parallel_execution_ff.h:457
auto iterators_next(T &&t, int n)
Computes next n steps from a tuple of iterators.
Definition: iterator.h:170
decltype(auto) apply_iterators_indexed(F &&f, T &&t, std::size_t i)
Applies a callable object to the values obtained from the iterators in a tuple by indexing....
Definition: iterator.h:142
Definition: parallel_execution_ff.h:449