// NOTE(review): this text appears to be extracted from a rendered source
// listing. The original file's line numbers (21, 22, 28, ...) are fused into
// the code, and many original lines (comments, braces, some statements) are
// absent. The comments below describe only the tokens that are visible.
// NOTE(review): std::thread, std::atomic and std::vector are used in this
// file, but <thread>, <atomic> and <vector> are not among the visible
// #include directives. Some include lines are missing from this listing, so
// confirm they are included directly rather than reached transitively.
21 #ifndef GRPPI_FF_PARALLEL_EXECUTION_FF_H 22 #define GRPPI_FF_PARALLEL_EXECUTION_FF_H 28 #include "../common/iterator.h" 29 #include "../common/execution_traits.h" 31 #include <type_traits> 34 #include <experimental/optional> 36 #include <ff/parallel_for.hpp> 46 class parallel_execution_ff {
// Default constructor: delegates to the two-argument constructor, passing
// std::thread::hardware_concurrency() (cast to int) as the concurrency degree.
56 parallel_execution_ff() noexcept :
57 parallel_execution_ff{
58 static_cast<int>(std::thread::hardware_concurrency())}
// Constructor taking an explicit concurrency degree and an ordering flag
// (`order` defaults to true).
// NOTE(review): only the concurrency_degree_ initializer is visible. Whether
// `order` is stored into ordering_ (original line 71) cannot be seen here,
// so confirm it.
69 parallel_execution_ff(
int concurrency_degree,
bool order =
true) noexcept :
70 concurrency_degree_{concurrency_degree},
// Sets the concurrency degree used by later pattern invocations.
78 void set_concurrency_degree(
int degree) noexcept {
79 concurrency_degree_ = degree;
// Returns the configured concurrency degree.
85 int concurrency_degree() const noexcept {
86 return concurrency_degree_;
// Ordering flag: enable_ordering/disable_ordering set ordering_, and
// is_ordered returns it.
// NOTE(review): none of the pattern implementations visible in this listing
// read ordering_, so confirm whether ordering is honoured at all.
92 void enable_ordering() noexcept { ordering_=
true; }
97 void disable_ordering() noexcept { ordering_=
false; }
102 bool is_ordered() const noexcept {
return ordering_; }
// Map pattern: `firsts` holds the input iterators, results go through
// first_out, sequence_size is the number of elements, and transform_op is
// taken by value. Defined out of class below.
118 template <
typename ... InputIterators,
typename OutputIterator,
119 typename Transformer>
120 void map(std::tuple<InputIterators...> firsts,
121 OutputIterator first_out,
122 std::size_t sequence_size, Transformer transform_op)
const;
// Reduce pattern: folds sequence_size elements starting at `first` with
// combine_op, seeded by identity. Defined out of class below.
137 template <
typename InputIterator,
typename Identity,
typename Combiner>
138 auto reduce(InputIterator first,
139 std::size_t sequence_size,
140 Identity && identity,
141 Combiner && combine_op)
const;
// Map/reduce pattern: transform each position with transform_op, then fold
// the results with combine_op starting from identity. Defined out of class
// below.
157 template <
typename ... InputIterators,
typename Identity,
158 typename Transformer,
typename Combiner>
159 auto map_reduce(std::tuple<InputIterators...> firsts,
160 std::size_t sequence_size,
161 Identity && identity,
162 Transformer && transform_op,
163 Combiner && combine_op)
const;
// Stencil pattern: writes one result per position through first_out, using
// transform_op and neighbour_op. Defined out of class below.
179 template <
typename ... InputIterators,
typename OutputIterator,
180 typename StencilTransformer,
typename Neighbourhood>
181 void stencil(std::tuple<InputIterators...> firsts,
182 OutputIterator first_out,
183 std::size_t sequence_size,
184 StencilTransformer && transform_op,
185 Neighbourhood && neighbour_op)
const;
// Pipeline pattern: a generator followed by a pack of transformers. Defined
// out of class below.
194 template <
typename Generator,
typename ... Transformers>
195 void pipeline(Generator && generate_op,
196 Transformers && ... transform_op)
const;
// Queue-to-queue pipeline: consumes items from input_queue, applies
// transform_op and pushes results to output_queue. Several original lines
// of this body (212, 214-215, 218-219, 221, 223-225) are missing.
209 template <
typename InputType,
typename Transformer,
typename OutputType>
210 void pipeline(mpmc_queue<InputType> & input_queue, Transformer && transform_op,
211 mpmc_queue<OutputType> & output_queue)
const 213 ::std::atomic<long> order {0};
// An item whose `.first` is falsy is pushed back onto input_queue,
// presumably so that other consumers also observe the end-of-stream marker.
// Confirm against mpmc_queue usage.
216 auto item = input_queue.pop();
217 if(!item.first) input_queue.push(item);
220 std::forward<Transformer>(transform_op),
// Each result is pushed paired with the current value of `order`.
// NOTE(review): no increment of `order` is visible in this listing. If none
// exists in the full source, every item carries sequence number 0.
// NOTE(review): make_pair is unqualified and relies on ADL or a
// using-declaration that is not visible here. std::make_pair would be explicit.
222 output_queue.push(make_pair(
typename OutputType::first_type{item}, order.load()));
// After the pipeline, a default-constructed first_type is pushed, presumably
// as the end-of-stream marker for downstream consumers.
226 output_queue.push(make_pair(
typename OutputType::first_type{}, order.load()));
// Divide/conquer pattern declaration. The lines declaring the Input and
// Solver parameters (original 242 and 245) are missing from this listing.
240 template <
typename Input,
typename Divider,
typename Predicate,
241 typename Solver,
typename Combiner>
243 Divider && divide_op,
244 Predicate && condition_op,
246 Combiner && combine_op)
const;
// Default concurrency degree: the hardware concurrency reported by std::thread.
// NOTE(review): std::thread::hardware_concurrency() may return 0 when the
// value is not computable. A degree of 0 would then be handed to FastFlow,
// so consider clamping it to at least 1.
250 int concurrency_degree_ =
251 static_cast<int>(std::thread::hardware_concurrency());
// Ordering is enabled by default.
252 bool ordering_ =
true;
// Metafunction: returns std::is_same<E, parallel_execution_ff>::value. It is
// true only when E is exactly parallel_execution_ff, so cv- or
// reference-qualified types yield false.
// NOTE(review): the line declaring the function (original line 260) is
// missing from this listing.
259 template <
typename E>
261 return std::is_same<E, parallel_execution_ff>::value;
// Execution-trait specializations for parallel_execution_ff. Each returns
// true, declaring the policy supported and able to run map, reduce,
// map_reduce, stencil, divide_conquer and pipeline.
// NOTE(review): the original lines just before each `constexpr` (268, 275,
// 282, ...) are missing. They presumably hold the `template <>` headers
// these explicit specializations require.
269 constexpr
bool is_supported<parallel_execution_ff>() {
return true; }
276 constexpr
bool supports_map<parallel_execution_ff>() {
return true; }
283 constexpr
bool supports_reduce<parallel_execution_ff>() {
return true; }
290 constexpr
bool supports_map_reduce<parallel_execution_ff>() {
return true; }
297 constexpr
bool supports_stencil<parallel_execution_ff>() {
return true; }
304 constexpr
bool supports_divide_conquer<parallel_execution_ff>() {
return true; }
311 constexpr
bool supports_pipeline<parallel_execution_ff>() {
return true; }
// Out-of-class definition of parallel_execution_ff::map. The line naming the
// function (original line 316) and the loop body (original lines 324-325)
// are missing from this listing.
// Runs an ff::ParallelFor over the indices [0, sequence_size).
// concurrency_degree_ is passed both to the ParallelFor constructor and as
// the last argument of parallel_for().
314 template <
typename ... InputIterators,
typename OutputIterator,
315 typename Transformer>
317 std::tuple<InputIterators...> firsts,
318 OutputIterator first_out,
319 std::size_t sequence_size, Transformer transform_op)
// NOTE(review): the second constructor argument `true` is a FastFlow option
// flag (presumably spin-wait). Confirm against ff/parallel_for.hpp.
const 321 ff::ParallelFor pf{concurrency_degree_,
true};
// The per-index lambda captures by copy and receives the index as
// `const long`, so sequence_size (std::size_t) is converted to that index
// type.
322 pf.parallel_for(0, sequence_size,
323 [=](
const long delta) {
326 concurrency_degree_);
// Out-of-class definition of parallel_execution_ff::reduce. Original line
// 330, which names the function and declares `first`, is missing here.
// Uses ff::ParallelForReduce. `result` starts as a copy of identity. The
// body lambda folds element `delta` into the partial value it is given, and
// the final lambda stores combine_op(a, b) into `result`.
// NOTE(review): Identity is deduced from the forwarding reference
// `Identity && identity`. When the caller passes an lvalue, Identity is an
// lvalue-reference type. ParallelForReduce<Identity> is then instantiated
// with a reference, and `result` binds to the caller's object instead of
// being a local copy. std::decay_t<Identity> would be the safe accumulator
// type.
// NOTE(review): the return statement (original lines 344+) is not visible.
329 template <
typename InputIterator,
typename Identity,
typename Combiner>
331 std::size_t sequence_size,
332 Identity && identity,
333 Combiner && combine_op)
const 335 ff::ParallelForReduce<Identity> pfr{concurrency_degree_,
true};
336 Identity result{identity};
338 pfr.parallel_reduce(result, identity, 0, sequence_size,
// Both lambdas capture combine_op by copy; the body lambda also copies
// `first`.
// NOTE(review): std::next(first, delta) is O(delta) for non-random-access
// iterators, which makes the reduction quadratic for such inputs.
339 [combine_op,first](
long delta,
auto & value) {
340 value = combine_op(value, *std::next(first,delta));
342 [&result, combine_op](
auto a,
auto b) { result = combine_op(a,b); },
343 concurrency_degree_);
// Out-of-class definition of parallel_execution_ff::map_reduce. Original
// line 350, which names the function and declares `firsts`, is missing.
// The work is done in two phases:
//   1. map() writes transform_op's result for each of the sequence_size
//      positions into the temporary vector partial_outs.
//   2. reduce() folds that vector with combine_op, starting from identity,
//      and its result is returned.
// NOTE(review): partial_outs holds sequence_size value-initialised Identity
// objects. Identity must therefore be default-constructible, and extra
// memory grows linearly with the input size.
// NOTE(review): if identity is passed as an lvalue, Identity deduces to an
// lvalue-reference type and std::vector<Identity> is ill-formed.
// std::decay_t<Identity> as the element type would avoid this.
348 template <
typename ... InputIterators,
typename Identity,
349 typename Transformer,
typename Combiner>
351 std::size_t sequence_size,
352 Identity && identity,
353 Transformer && transform_op,
354 Combiner && combine_op)
const 356 std::vector<Identity> partial_outs(sequence_size);
357 map(firsts, partial_outs.begin(), sequence_size,
358 std::forward<Transformer>(transform_op));
360 return reduce(partial_outs.begin(), sequence_size,
361 std::forward<Identity>(identity),
362 std::forward<Combiner>(combine_op));
// Out-of-class definition of parallel_execution_ff::stencil. Several
// original lines are missing from this listing:
//   - 367, which names the function and declares `firsts`;
//   - 375, the lambda header;
//   - 379-380, the remaining transform_op arguments.
// Runs an ff::ParallelFor over [0, sequence_size). For each index `delta`,
// it stores into *std::next(first_out, delta) the value returned by
// transform_op. The first argument to transform_op is an iterator to element
// `delta` of the first input sequence (std::get<0>(firsts)).
// NOTE(review): whether the other iterators in `firsts` and neighbour_op are
// used cannot be determined from the visible lines, so confirm.
365 template <
typename ... InputIterators,
typename OutputIterator,
366 typename StencilTransformer,
typename Neighbourhood>
368 OutputIterator first_out,
369 std::size_t sequence_size,
370 StencilTransformer && transform_op,
371 Neighbourhood && neighbour_op)
const 373 ff::ParallelFor pf(concurrency_degree_,
true);
374 pf.parallel_for(0, sequence_size,
376 const auto first_it = std::get<0>(firsts);
378 *std::next(first_out,delta) = transform_op(std::next(first_it,delta),
381 concurrency_degree_);
// Out-of-class definition of parallel_execution_ff::pipeline (generator plus
// transformers). Original line 385, which names the function, and lines
// 390-391, the first constructor arguments, are missing.
// Builds a detail_ff::pipeline_impl from the forwarded generator and
// transformers, calls setFixedSize(false) on it, and then calls
// run_and_wait_end(), which presumably blocks until the pipeline completes.
// NOTE(review): the in-class declaration names the pack `transform_op`,
// while this definition uses `transform_ops`. This is harmless but
// inconsistent.
// NOTE(review): in FastFlow, run_and_wait_end() returns an int status. It is
// ignored here, so a start-up failure would go unnoticed. Confirm whether it
// should be checked.
384 template <
typename Generator,
typename ... Transformers>
386 Generator && generate_op,
387 Transformers && ... transform_ops)
const 389 detail_ff::pipeline_impl pipe{
392 std::forward<Generator>(generate_op),
393 std::forward<Transformers>(transform_ops)...};
395 pipe.setFixedSize(
false);
396 pipe.run_and_wait_end();
// Out-of-class definition of parallel_execution_ff::divide_conquer. Several
// original lines are missing from this listing:
//   - 401 and 404, which name the function and declare the Input and Solver
//     parameters;
//   - 416 and 422-423, part of the lambda bodies;
//   - 434-435 and 437-439, the remaining constructor arguments;
//   - the return statement.
// The grppi callables are adapted to ff::ff_DC<Input, output_type>:
//   - divide_fn stores divide_op(in) into subin;
//   - combine_fn folds in[1], in[2], ... into out with combine_op;
//   - seq_fn (body not visible) presumably solves a base case;
//   - cond_fn forwards to condition_op.
// NOTE(review): std::result_of is deprecated in C++17 and removed in C++20.
// std::invoke_result_t<Solver, Input> is the replacement.
399 template <
typename Input,
typename Divider,
typename Predicate,
400 typename Solver,
typename Combiner>
402 Divider && divide_op,
403 Predicate && condition_op,
405 Combiner && combine_op)
const 407 using output_type =
typename std::result_of<Solver(Input)>::type;
410 auto divide_fn = [&](
const Input &in, std::vector<Input> &subin) {
411 subin = divide_op(in);
// The loop starts at index 1. The missing line 416 presumably seeds `out`
// with in[0], so confirm behaviour when `in` is empty.
414 auto combine_fn = [&] (std::vector<output_type>& in, output_type& out) {
415 using index_t =
typename std::vector<output_type>::size_type;
417 for(index_t i = 1; i < in.size(); ++i)
418 out = combine_op(out, in[i]);
421 auto seq_fn = [&] (
const Input & in , output_type & out) {
425 auto cond_fn = [&] (
const Input &in) {
426 return condition_op(in);
428 output_type out_var{};
430 using dac_t = ff::ff_DC<Input,output_type>;
431 auto ncores =
static_cast<int>(std::thread::hardware_concurrency());
// NOTE(review): std::max means the worker cap is never below the hardware
// core count, even when a smaller concurrency degree was requested. Confirm
// intent: std::min, or concurrency_degree_ alone, would honour the setting.
// std::max also needs <algorithm>, which is not among the visible includes.
432 int max_nworkers = std::max(concurrency_degree_, ncores);
// The [&] captures above refer to this function's parameters and locals.
// This is safe because dac is a local that runs to completion via
// run_and_wait_end() before the function returns.
433 dac_t dac(divide_fn, combine_fn, seq_fn, cond_fn,
436 dac_t::DEFAULT_OUTSTANDING_TASKS, max_nworkers
440 dac.run_and_wait_end();
447 #else // GRPPI_FF undefined 461 template <
typename E>
Definition: callable_traits.h:26
void pipeline(const Execution &ex, Generator &&generate_op, Transformers &&...transform_ops)
Invoke Pipeline pattern on a data stream.
Definition: pipeline.h:51
void stencil(const Execution &ex, std::tuple< InputIterators... > firsts, std::size_t size, OutputIt out, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op)
Invoke Stencil pattern on a data sequence using the given execution policy.
Definition: stencil.h:59
constexpr bool is_parallel_execution_ff()
Metafunction that determines if type E is parallel_execution_ff. This metafunction evaluates to false ...
Definition: parallel_execution_ff.h:462
auto reduce(const Execution &ex, InputIt first, std::size_t size, Result &&identity, Combiner &&combine_op)
Invoke Reduce pattern with identity value on a data sequence using the given execution policy.
Definition: reduce.h:55
auto iterators_next(T &&t, int n)
Computes next n steps from a tuple of iterators.
Definition: iterator.h:175
decltype(auto) apply_iterators_indexed(F &&f, T &&t, std::size_t i)
Applies a callable object to the values obtained from the iterators in a tuple by indexing...
Definition: iterator.h:147
Definition: parallel_execution_ff.h:454
auto map_reduce(const Execution &ex, std::tuple< InputIterators... > firsts, std::size_t size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op)
Invoke Map/reduce pattern on a data sequence.
Definition: mapreduce.h:57
auto divide_conquer(const Execution &ex, Input &&input, Divider &&divider_op, Solver &&solver_op, Combiner &&combiner_op)
Invoke Divide/conquer pattern (template parameter Execution: the execution type).
Definition: divideconquer.h:53
decltype(auto) apply_increment(F &&f, T< Iterators... > &iterators)
Applies a callable object to the iterators in a tuple like-object and the increments those iterators...
Definition: iterator.h:106
void map(const Execution &ex, std::tuple< InputIterators... > firsts, InputIt last, OutputIt first_out, Transformer transform_op)
Invoke Map pattern on a data sequence.
Definition: map.h:56