16 #ifndef GRPPI_FF_PARALLEL_EXECUTION_FF_H
17 #define GRPPI_FF_PARALLEL_EXECUTION_FF_H
23 #include "../common/iterator.h"
24 #include "../common/execution_traits.h"
26 #include <type_traits>
29 #include <experimental/optional>
31 #include <ff/parallel_for.hpp>
41 class parallel_execution_ff {
51 parallel_execution_ff() noexcept :
52 parallel_execution_ff{
53 static_cast<int>(std::thread::hardware_concurrency())}
64 parallel_execution_ff(
int concurrency_degree,
bool order =
true) noexcept :
65 concurrency_degree_{concurrency_degree},
73 void set_concurrency_degree(
int degree) noexcept {
74 concurrency_degree_ = degree;
80 int concurrency_degree() const noexcept {
81 return concurrency_degree_;
/**
\brief Turn ordering on for this execution policy.

Sets the ordering_ flag to true; the flag can be queried with is_ordered().
*/
87 void enable_ordering() noexcept { ordering_=
true; }
/**
\brief Turn ordering off for this execution policy.

Sets the ordering_ flag to false; the flag can be queried with is_ordered().
*/
92 void disable_ordering() noexcept { ordering_=
false; }
/**
\brief Query the ordering flag.
\return The current value of ordering_.
*/
97 bool is_ordered() const noexcept {
return ordering_; }
/**
\brief Declaration of the map operation of this execution policy.
\tparam InputIterators Types of the iterators held in firsts.
\tparam OutputIterator Type of the output iterator.
\tparam Transformer Type of the transformation callable.
\param firsts Tuple of input iterators.
\param first_out Output iterator.
\param sequence_size Element count; the definition in this file passes 0 and
sequence_size as the loop bounds of an ff::ParallelFor.
\param transform_op Transformation callable, taken by value.
\note NOTE(review): presumably each iterator in firsts marks the start of one
input sequence and transform_op is applied element-wise -- confirm against the
full definition.
*/
113 template <
typename ... InputIterators,
typename OutputIterator,
114 typename Transformer>
115 void map(std::tuple<InputIterators...> firsts,
116 OutputIterator first_out,
117 std::size_t sequence_size, Transformer transform_op)
const;
/**
\brief Declaration of the reduce operation of this execution policy.
\tparam InputIterator Type of the input iterator.
\tparam Identity Type of the identity value.
\tparam Combiner Type of the combination callable.
\param first Iterator into the input sequence; the definition in this file
reads elements as *std::next(first, delta).
\param sequence_size Element count; the definition passes 0 and sequence_size
as the loop bounds of ff::ParallelForReduce::parallel_reduce.
\param identity Value used to initialise the local result and also handed to
parallel_reduce.
\param combine_op Binary callable; the definition invokes it as
combine_op(value, element) and combine_op(a, b).
\return Deduced (auto). NOTE(review): presumably the combined result -- confirm
against the full definition.
*/
132 template <
typename InputIterator,
typename Identity,
typename Combiner>
133 auto reduce(InputIterator first,
134 std::size_t sequence_size,
135 Identity && identity,
136 Combiner && combine_op)
const;
/**
\brief Declaration of the map/reduce operation of this execution policy.

The definition in this file allocates a std::vector<Identity> with
sequence_size elements, fills it by calling map() with transform_op, and
returns the result of reduce() over that vector with identity and combine_op.

\tparam InputIterators Types of the iterators held in firsts.
\tparam Identity Type of the identity value (also the element type of the
intermediate vector).
\tparam Transformer Type of the transformation callable.
\tparam Combiner Type of the combination callable.
\param firsts Tuple of input iterators, forwarded to map().
\param sequence_size Element count, used for both the map and the reduce step.
\param identity Identity value forwarded to reduce().
\param transform_op Transformation callable forwarded to map().
\param combine_op Combination callable forwarded to reduce().
\return Deduced (auto): whatever reduce() returns.
*/
152 template <
typename ... InputIterators,
typename Identity,
153 typename Transformer,
typename Combiner>
154 auto map_reduce(std::tuple<InputIterators...> firsts,
155 std::size_t sequence_size,
156 Identity && identity,
157 Transformer && transform_op,
158 Combiner && combine_op)
const;
/**
\brief Declaration of the stencil operation of this execution policy.
\tparam InputIterators Types of the iterators held in firsts.
\tparam OutputIterator Type of the output iterator.
\tparam StencilTransformer Type of the stencil transformation callable.
\tparam Neighbourhood Type of the neighbourhood callable.
\param firsts Tuple of input iterators; the definition in this file takes
std::get<0>(firsts) as the primary iterator.
\param first_out Output iterator; the definition assigns through
*std::next(first_out, delta).
\param sequence_size Element count; the definition passes 0 and sequence_size
as the loop bounds of an ff::ParallelFor.
\param transform_op Stencil callable; the definition passes it the advanced
primary iterator as first argument.
\param neighbour_op Neighbourhood callable. NOTE(review): presumably its result
is the second argument of transform_op -- confirm against the full definition.
*/
174 template <
typename ... InputIterators,
typename OutputIterator,
175 typename StencilTransformer,
typename Neighbourhood>
176 void stencil(std::tuple<InputIterators...> firsts,
177 OutputIterator first_out,
178 std::size_t sequence_size,
179 StencilTransformer && transform_op,
180 Neighbourhood && neighbour_op)
const;
/**
\brief Declaration of the pipeline operation of this execution policy.

The definition in this file builds a detail_ff::pipeline_impl from the
generator and the transformers, calls setFixedSize(false) on it and then
run_and_wait_end().

\tparam Generator Type of the generator callable.
\tparam Transformers Types of the remaining stage callables.
\param generate_op Generator callable, forwarded to the pipeline object.
\param transform_op Remaining stage callables, forwarded to the pipeline object.
*/
189 template <
typename Generator,
typename ... Transformers>
190 void pipeline(Generator && generate_op,
191 Transformers && ... transform_op)
const;
204 template <
typename InputType,
typename Transformer,
typename OutputType>
205 void pipeline(mpmc_queue<InputType> & input_queue, Transformer && transform_op,
206 mpmc_queue<OutputType> & output_queue)
const
208 ::std::atomic<long> order {0};
211 auto item = input_queue.pop();
212 if(!item.first) input_queue.push(item);
215 std::forward<Transformer>(transform_op),
217 output_queue.push(make_pair(
typename OutputType::first_type{item}, order.load()));
221 output_queue.push(make_pair(
typename OutputType::first_type{}, order.load()));
235 template <
typename Input,
typename Divider,
typename Predicate,
236 typename Solver,
typename Combiner>
238 Divider && divide_op,
239 Predicate && condition_op,
241 Combiner && combine_op)
const;
// Concurrency degree handed to the FastFlow constructs created by this policy
// (e.g. ff::ParallelFor, ff::ParallelForReduce). Defaults to the value
// reported by std::thread::hardware_concurrency(), converted to int.
// NOTE(review): hardware_concurrency() may return 0 when the value is not
// computable -- confirm callers tolerate a zero default.
245 int concurrency_degree_ =
246 static_cast<int>(std::thread::hardware_concurrency());
// Ordering flag toggled by enable_ordering()/disable_ordering() and read by
// is_ordered(). Defaults to true.
247 bool ordering_ =
true;
254 template <
typename E>
256 return std::is_same<E, parallel_execution_ff>::value;
/**
\brief Specialization of is_supported for parallel_execution_ff.
\return Always true.
*/
264 constexpr
bool is_supported<parallel_execution_ff>() {
return true; }
/**
\brief Specialization of supports_map for parallel_execution_ff.
\return Always true.
*/
271 constexpr
bool supports_map<parallel_execution_ff>() {
return true; }
/**
\brief Specialization of supports_reduce for parallel_execution_ff.
\return Always true.
*/
278 constexpr
bool supports_reduce<parallel_execution_ff>() {
return true; }
/**
\brief Specialization of supports_map_reduce for parallel_execution_ff.
\return Always true.
*/
285 constexpr
bool supports_map_reduce<parallel_execution_ff>() {
return true; }
/**
\brief Specialization of supports_stencil for parallel_execution_ff.
\return Always true.
*/
292 constexpr
bool supports_stencil<parallel_execution_ff>() {
return true; }
/**
\brief Specialization of supports_divide_conquer for parallel_execution_ff.
\return Always true.
*/
299 constexpr
bool supports_divide_conquer<parallel_execution_ff>() {
return true; }
/**
\brief Specialization of supports_pipeline for parallel_execution_ff.
\return Always true.
*/
306 constexpr
bool supports_pipeline<parallel_execution_ff>() {
return true; }
309 template <
typename ... InputIterators,
typename OutputIterator,
310 typename Transformer>
312 std::tuple<InputIterators...> firsts,
313 OutputIterator first_out,
314 std::size_t sequence_size, Transformer transform_op)
const
316 ff::ParallelFor pf{concurrency_degree_,
true};
317 pf.parallel_for(0, sequence_size,
318 [=](
const long delta) {
321 concurrency_degree_);
324 template <
typename InputIterator,
typename Identity,
typename Combiner>
326 std::size_t sequence_size,
327 Identity && identity,
328 Combiner && combine_op)
const
330 ff::ParallelForReduce<Identity> pfr{concurrency_degree_,
true};
331 Identity result{identity};
333 pfr.parallel_reduce(result, identity, 0, sequence_size,
334 [combine_op,first](
long delta,
auto & value) {
335 value = combine_op(value, *std::next(first,delta));
337 [&result, combine_op](
auto a,
auto b) { result = combine_op(a,b); },
338 concurrency_degree_);
343 template <
typename ... InputIterators,
typename Identity,
344 typename Transformer,
typename Combiner>
346 std::size_t sequence_size,
347 Identity && identity,
348 Transformer && transform_op,
349 Combiner && combine_op)
const
351 std::vector<Identity> partial_outs(sequence_size);
352 map(firsts, partial_outs.begin(), sequence_size,
353 std::forward<Transformer>(transform_op));
355 return reduce(partial_outs.begin(), sequence_size,
356 std::forward<Identity>(identity),
357 std::forward<Combiner>(combine_op));
360 template <
typename ... InputIterators,
typename OutputIterator,
361 typename StencilTransformer,
typename Neighbourhood>
363 OutputIterator first_out,
364 std::size_t sequence_size,
365 StencilTransformer && transform_op,
366 Neighbourhood && neighbour_op)
const
368 ff::ParallelFor pf(concurrency_degree_,
true);
369 pf.parallel_for(0, sequence_size,
371 const auto first_it = std::get<0>(firsts);
373 *std::next(first_out,delta) = transform_op(std::next(first_it,delta),
376 concurrency_degree_);
379 template <
typename Generator,
typename ... Transformers>
381 Generator && generate_op,
382 Transformers && ... transform_ops)
const
384 detail_ff::pipeline_impl pipe{
387 std::forward<Generator>(generate_op),
388 std::forward<Transformers>(transform_ops)...};
390 pipe.setFixedSize(
false);
391 pipe.run_and_wait_end();
394 template <
typename Input,
typename Divider,
typename Predicate,
395 typename Solver,
typename Combiner>
397 Divider && divide_op,
398 Predicate && condition_op,
400 Combiner && combine_op)
const
402 using output_type =
typename std::result_of<Solver(Input)>::type;
405 auto divide_fn = [&](
const Input &in, std::vector<Input> &subin) {
406 subin = divide_op(in);
409 auto combine_fn = [&] (std::vector<output_type>& in, output_type& out) {
410 using index_t =
typename std::vector<output_type>::size_type;
412 for(index_t i = 1; i < in.size(); ++i)
413 out = combine_op(out, in[i]);
416 auto seq_fn = [&] (
const Input & in , output_type & out) {
420 auto cond_fn = [&] (
const Input &in) {
421 return condition_op(in);
423 output_type out_var{};
425 using dac_t = ff::ff_DC<Input,output_type>;
426 auto ncores =
static_cast<int>(std::thread::hardware_concurrency());
427 int max_nworkers = std::max(concurrency_degree_, ncores);
428 dac_t dac(divide_fn, combine_fn, seq_fn, cond_fn,
431 dac_t::DEFAULT_OUTSTANDING_TASKS, max_nworkers
435 dac.run_and_wait_end();
456 template <
typename E>
auto divide_conquer(const Execution &ex, Input &&input, Divider &&divider_op, Solver &&solver_op, Combiner &&combiner_op)
Invoke md_divide-conquer. \tparam Execution Execution type.
Definition: divideconquer.h:49
void map(const Execution &ex, std::tuple< InputIterators... > firsts, InputIt last, OutputIt first_out, Transformer transform_op)
Invoke Map pattern on a data sequence.
Definition: map.h:51
auto map_reduce(const Execution &ex, std::tuple< InputIterators... > firsts, std::size_t size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op)
Invoke md_map-reduce on a data sequence.
Definition: mapreduce.h:52
void pipeline(const Execution &ex, Generator &&generate_op, Transformers &&... transform_ops)
Invoke Pipeline pattern on a data stream.
Definition: pipeline.h:46
auto reduce(const Execution &ex, InputIt first, std::size_t size, Result &&identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence with sequential execution.
Definition: reduce.h:50
void stencil(const Execution &ex, std::tuple< InputIterators... > firsts, std::size_t size, OutputIt out, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op)
Invoke Stencil pattern on a data sequence with sequential execution.
Definition: stencil.h:54
Definition: callable_traits.h:21
decltype(auto) apply_increment(F &&f, T< Iterators... > &iterators)
Applies a callable object to the iterators in a tuple-like object and then increments those iterators....
Definition: iterator.h:101
constexpr bool is_parallel_execution_ff()
Metafunction that determines if type E is parallel_execution_ff. This metafunction evaluates to false ...
Definition: parallel_execution_ff.h:457
auto iterators_next(T &&t, int n)
Computes next n steps from a tuple of iterators.
Definition: iterator.h:170
decltype(auto) apply_iterators_indexed(F &&f, T &&t, std::size_t i)
Applies a callable object to the values obtained from the iterators in a tuple by indexing....
Definition: iterator.h:142
Definition: parallel_execution_ff.h:449