GrPPI  1.0
Generic and Reusable Parallel Pattern Interface
parallel_execution_tbb.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Universidad Carlos III de Madrid
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef GRPPI_TBB_PARALLEL_EXECUTION_TBB_H
17 #define GRPPI_TBB_PARALLEL_EXECUTION_TBB_H
18 
19 #ifdef GRPPI_TBB
20 
#include "../common/mpmc_queue.h"
#include "../common/iterator.h"
#include "../common/patterns.h"
#include "../common/farm_pattern.h"
#include "../common/execution_traits.h"

#include <atomic>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

#include <tbb/tbb.h>
31 
32 namespace grppi {
33 
40 public:
41 
51  parallel_execution_tbb{default_concurrency_degree}
52  {}
53 
62  parallel_execution_tbb(int concurrency_degree, bool order = true) noexcept :
63  concurrency_degree_{concurrency_degree},
64  ordering_{order}
65  {}
66 
70  void set_concurrency_degree(int degree) noexcept { concurrency_degree_ = degree; }
71 
75  int concurrency_degree() const noexcept { return concurrency_degree_; }
76 
80  void enable_ordering() noexcept { ordering_=true; }
81 
85  void disable_ordering() noexcept { ordering_=false; }
86 
90  bool is_ordered() const noexcept { return ordering_; }
91 
95  void set_queue_attributes(int size, queue_mode mode, int tokens) noexcept {
96  queue_size_ = size;
97  queue_mode_ = mode;
98  num_tokens_ = tokens;
99  }
100 
106  template <typename T>
108  return {queue_size_, queue_mode_};
109  }
110 
114  int tokens() const noexcept { return num_tokens_; }
115 
/// Applies a transformation to multiple sequences, leaving the result in
/// another sequence. The out-of-class definition runs tbb::parallel_for over
/// the indices [0, sequence_size) and assigns first_out[index] the value of
/// apply_iterators_indexed(transform_op, firsts, index) (presumably
/// transform_op applied to the index-th element of every input sequence).
/// @param firsts Tuple of iterators to the first element of each input sequence.
/// @param first_out Iterator to the first element of the output sequence.
/// @param sequence_size Number of elements to process.
/// @param transform_op Transformation operation.
130  template <typename ... InputIterators, typename OutputIterator,
131  typename Transformer>
132  void map(std::tuple<InputIterators...> firsts,
133  OutputIterator first_out,
134  std::size_t sequence_size, Transformer transform_op) const;
135 
/// Applies a reduction to a sequence of data items. The definition uses
/// tbb::parallel_reduce over a blocked_range of iterators. Each sub-range is
/// reduced sequentially with combine_op, and sub-range results are joined with
/// combine_op.
/// @param first Iterator to the first element of the sequence.
/// @param sequence_size Number of elements to reduce.
/// @param identity Identity value for combine_op.
/// @param combine_op Combination operation.
/// @return The reduced value.
149  template <typename InputIterator, typename Identity, typename Combiner>
150  auto reduce(InputIterator first, std::size_t sequence_size,
151  Identity && identity, Combiner && combine_op) const;
152 
/// Applies a map/reduce operation to a sequence of data items. The definition
/// splits the input into concurrency_degree() chunks, map-reduces each chunk
/// with sequential_execution and combines the partial results with combine_op.
/// @param firsts Tuple of iterators to the first element of each input sequence.
/// @param sequence_size Number of elements in each input sequence.
/// @param identity Identity value for combine_op.
/// @param transform_op Transformation applied to each element.
/// @param combine_op Combination operation.
/// @return The combined result.
167  template <typename ... InputIterators, typename Identity,
168  typename Transformer, typename Combiner>
169  auto map_reduce(std::tuple<InputIterators...> firsts,
170  std::size_t sequence_size,
171  Identity && identity,
172  Transformer && transform_op, Combiner && combine_op) const;
173 
/// Applies a stencil to multiple sequences, leaving the result in another
/// sequence. The definition splits the input into concurrency_degree() chunks.
/// All but the last chunk run as tasks of a tbb::task_group, and the calling
/// thread processes the last one. Each chunk goes through
/// sequential_execution::stencil.
/// @param firsts Tuple of iterators to the first element of each input sequence.
/// @param first_out Iterator to the first element of the output sequence.
/// @param sequence_size Number of elements to process.
/// @param transform_op Stencil transformation.
/// @param neighbour_op Neighbourhood operation.
188  template <typename ... InputIterators, typename OutputIterator,
189  typename StencilTransformer, typename Neighbourhood>
190  void stencil(std::tuple<InputIterators...> firsts, OutputIterator first_out,
191  std::size_t sequence_size,
192  StencilTransformer && transform_op,
193  Neighbourhood && neighbour_op) const;
194 
/// Invokes the divide/conquer pattern (deprecated form without a predicate).
/// The definition starts a shared budget of concurrency_degree()-1 extra tasks
/// and delegates to the private overload taking that budget.
/// @param input Problem to solve.
/// @param divide_op Splits a problem into subproblems.
/// @param solve_op Solves a problem directly.
/// @param combine_op Combines two partial results.
/// @return The combined result.
207  template <typename Input, typename Divider, typename Solver, typename Combiner>
208  [[deprecated("Use new interface with predicate argument")]]
209  auto divide_conquer(Input && input,
210  Divider && divide_op,
211  Solver && solve_op,
212  Combiner && combine_op) const;
213 
214 
/// Invokes the divide/conquer pattern. predicate_op decides when a problem is
/// solved directly with solve_op. The definition starts a shared budget of
/// concurrency_degree()-1 extra tasks and delegates to the private overload.
/// @param input Problem to solve.
/// @param divide_op Splits a problem into subproblems.
/// @param predicate_op Returns true when a problem should be solved directly.
/// @param solve_op Solves a problem directly.
/// @param combine_op Combines two partial results.
/// @return The combined result.
229  template <typename Input, typename Divider,typename Predicate, typename Solver, typename Combiner>
230  auto divide_conquer(Input && input,
231  Divider && divide_op,
232  Predicate && predicate_op,
233  Solver && solve_op,
234  Combiner && combine_op) const;
235 
/// Runs a pipeline. generate_op is called until it returns an empty value.
/// The remaining stages are turned into TBB filters (see make_filter) and
/// executed with tbb::parallel_pipeline using tokens() tokens.
/// @param generate_op Generator producing optional-like values.
/// @param transform_op Remaining pipeline stages.
243  template <typename Generator, typename ... Transformers>
244  void pipeline(Generator && generate_op,
245  Transformers && ... transform_op) const;
246 
257  template <typename InputType, typename Transformer, typename OutputType>
258  void pipeline(mpmc_queue<InputType> & input_queue, Transformer && transform_op,
259  mpmc_queue<OutputType> & output_queue) const
260  {
261  ::std::atomic<long> order {0};
262  pipeline(
263  [&](){
264  auto item = input_queue.pop();
265  if(!item.first) input_queue.push(item);
266  return item.first;
267  },
268  std::forward<Transformer>(transform_op),
269  [&](auto & item ){
270  output_queue.push(make_pair(typename OutputType::first_type{item}, order.load()));
271  order++;
272  }
273  );
274  output_queue.push(make_pair(typename OutputType::first_type{}, order.load()));
275  //sequential_execution seq{};
276  //seq.pipeline(input_queue, std::forward<Transformer>(transform_op), output_queue);
277  }
278 
279 
280 private:
281 
/// Recursive divide/conquer step (no predicate). num_threads is the shared
/// budget of extra tasks that may still be spawned. When it is exhausted, the
/// definition falls back to sequential_execution.
282  template <typename Input, typename Divider, typename Solver, typename Combiner>
283  auto divide_conquer(Input && input,
284  Divider && divide_op,
285  Solver && solve_op,
286  Combiner && combine_op,
287  std::atomic<int> & num_threads) const;
288 
/// Recursive divide/conquer step with a predicate. num_threads is the shared
/// budget of extra tasks that may still be spawned. When it is exhausted, the
/// definition falls back to sequential_execution.
289  template <typename Input, typename Divider, typename Predicate, typename Solver, typename Combiner>
290  auto divide_conquer(Input && input,
291  Divider && divide_op,
292  Predicate && predicate_op,
293  Solver && solve_op,
294  Combiner && combine_op,
295  std::atomic<int> & num_threads) const;
296 
297  template <typename Input, typename Transformer,
299  auto make_filter(Transformer && transform_op) const;
300 
301  template <typename Input, typename Transformer, typename ... OtherTransformers,
303  auto make_filter(Transformer && transform_op,
304  OtherTransformers && ... other_transform_ops) const;
305 
306  template <typename Input, typename FarmTransformer,
307  template <typename> class Farm,
309  auto make_filter(Farm<FarmTransformer> & farm_obj) const
310  {
311  return this->template make_filter<Input>(std::move(farm_obj));
312  }
313 
/// Builds the terminal filter for a farm stage. The definition uses a
/// tbb::filter::parallel filter.
314  template <typename Input, typename FarmTransformer,
315  template <typename> class Farm,
316  requires_farm<Farm<FarmTransformer>> = 0>
317  auto make_filter(Farm<FarmTransformer> && farm_obj) const;
318 
/// Context stage: only the wrapped transformer is forwarded, together with the
/// remaining stages. This overload does not use the execution policy carried
/// by the context.
319  template <typename Input, typename Execution, typename Transformer,
320  template <typename, typename> class Context,
321  typename ... OtherTransformers,
322  requires_context<Context<Execution,Transformer>> = 0>
323  auto make_filter(Context<Execution,Transformer> && context_op,
324  OtherTransformers &&... other_ops) const
325  {
326  return this->template make_filter<Input>(std::forward<Transformer>(context_op.transformer()),
327  std::forward<OtherTransformers>(other_ops)...);
328  }
329 
/// Lvalue context stage: casts context_op to an rvalue and re-dispatches.
330  template <typename Input, typename Execution, typename Transformer,
331  template <typename, typename> class Context,
332  typename ... OtherTransformers,
333  requires_context<Context<Execution,Transformer>> = 0>
334  auto make_filter(Context<Execution,Transformer> & context_op,
335  OtherTransformers &&... other_ops) const
336  {
337  return this->template make_filter<Input>(std::move(context_op),
338  std::forward<OtherTransformers>(other_ops)...);
339  }
340 
341 
/// Lvalue farm followed by more stages: casts the farm (named filter_obj
/// here) to an rvalue and re-dispatches.
342  template <typename Input, typename FarmTransformer,
343  template <typename> class Farm,
344  typename ... OtherTransformers,
345  requires_farm<Farm<FarmTransformer>> = 0>
346  auto make_filter(Farm<FarmTransformer> & filter_obj,
347  OtherTransformers && ... other_transform_ops) const
348  {
349  return this->template make_filter<Input>(std::move(filter_obj),
350  std::forward<OtherTransformers>(other_transform_ops)...);
351  }
352 
/// Farm followed by more stages. Defined out of class as a
/// tbb::filter::parallel filter chained with the following stages.
353  template <typename Input, typename FarmTransformer,
354  template <typename> class Farm,
355  typename ... OtherTransformers,
356  requires_farm<Farm<FarmTransformer>> = 0>
357  auto make_filter(Farm<FarmTransformer> && filter_obj,
358  OtherTransformers && ... other_transform_ops) const;
359 
/// Lvalue filter as last stage: casts filter_obj to an rvalue and re-dispatches.
360  template <typename Input, typename Predicate,
361  template <typename> class Filter,
362  requires_filter<Filter<Predicate>> = 0>
363  auto make_filter(Filter<Predicate> & filter_obj) const
364  {
365  return this->template make_filter<Input>(std::move(filter_obj));
366  }
367 
/// Filter as last stage. Defined out of class as a tbb::filter::parallel filter.
368  template <typename Input, typename Predicate,
369  template <typename> class Filter,
370  requires_filter<Filter<Predicate>> = 0>
371  auto make_filter(Filter<Predicate> && filter_obj) const;
372 
/// Lvalue filter followed by more stages: casts filter_obj to an rvalue and
/// re-dispatches.
373  template <typename Input, typename Predicate,
374  template <typename> class Filter,
375  typename ... OtherTransformers,
376  requires_filter<Filter<Predicate>> = 0>
377  auto make_filter(Filter<Predicate> & filter_obj,
378  OtherTransformers && ... other_transform_ops) const
379  {
380  return this->template make_filter<Input>(std::move(filter_obj),
381  std::forward<OtherTransformers>(other_transform_ops)...);
382  }
383 
/// Filter followed by more stages. Defined out of class; items rejected by the
/// predicate become empty values.
384  template <typename Input, typename Predicate,
385  template <typename> class Filter,
386  typename ... OtherTransformers,
387  requires_filter<Filter<Predicate>> = 0>
388  auto make_filter(Filter<Predicate> && filter_obj,
389  OtherTransformers && ... other_transform_ops) const;
390 
/// Lvalue reduce stage: casts reduce_obj to an rvalue and re-dispatches.
391  template <typename Input, typename Combiner, typename Identity,
392  template <typename C, typename I> class Reduce,
393  typename ... OtherTransformers,
394  requires_reduce<Reduce<Combiner,Identity>> = 0>
395  auto make_filter(Reduce<Combiner,Identity> & reduce_obj,
396  OtherTransformers && ... other_transform_ops) const
397  {
398  return this->template make_filter<Input>(std::move(reduce_obj),
399  std::forward<OtherTransformers>(other_transform_ops)...);
400  }
401 
/// Reduce stage followed by more stages. Defined out of class as a serial filter.
402  template <typename Input, typename Combiner, typename Identity,
403  template <typename C, typename I> class Reduce,
404  typename ... OtherTransformers,
405  requires_reduce<Reduce<Combiner,Identity>> = 0>
406  auto make_filter(Reduce<Combiner,Identity> && reduce_obj,
407  OtherTransformers && ... other_transform_ops) const;
408 
/// Lvalue iteration stage (non-pattern transformer): casts iteration_obj to
/// an rvalue and re-dispatches.
409  template <typename Input, typename Transformer, typename Predicate,
410  template <typename T, typename P> class Iteration,
411  typename ... OtherTransformers,
412  requires_iteration<Iteration<Transformer,Predicate>> =0,
413  requires_no_pattern<Transformer> =0>
414  auto make_filter(Iteration<Transformer,Predicate> & iteration_obj,
415  OtherTransformers && ... other_transform_ops) const
416  {
417  return this->template make_filter<Input>(std::move(iteration_obj),
418  std::forward<OtherTransformers>(other_transform_ops)...);
419  }
420 
/// Iteration stage whose transformer is not a pattern. Defined out of class
/// as a serial filter.
421  template <typename Input, typename Transformer, typename Predicate,
422  template <typename T, typename P> class Iteration,
423  typename ... OtherTransformers,
424  requires_iteration<Iteration<Transformer,Predicate>> =0,
425  requires_no_pattern<Transformer> =0>
426  auto make_filter(Iteration<Transformer,Predicate> && iteration_obj,
427  OtherTransformers && ... other_transform_ops) const;
428 
/// Iteration stage whose transformer is a pipeline. Not implemented: the
/// out-of-class definition only contains a failing static_assert.
429  template <typename Input, typename Transformer, typename Predicate,
430  template <typename T, typename P> class Iteration,
431  typename ... OtherTransformers,
432  requires_iteration<Iteration<Transformer,Predicate>> =0,
433  requires_pipeline<Transformer> =0>
434  auto make_filter(Iteration<Transformer,Predicate> && iteration_obj,
435  OtherTransformers && ... other_transform_ops) const;
436 
/// Lvalue nested pipeline: casts pipeline_obj to an rvalue and re-dispatches.
437  template <typename Input, typename ... Transformers,
438  template <typename...> class Pipeline,
439  typename ... OtherTransformers,
440  requires_pipeline<Pipeline<Transformers...>> = 0>
441  auto make_filter(Pipeline<Transformers...> & pipeline_obj,
442  OtherTransformers && ... other_transform_ops) const
443  {
444  return this->template make_filter<Input>(std::move(pipeline_obj),
445  std::forward<OtherTransformers>(other_transform_ops)...);
446  }
447 
/// Nested pipeline followed by more stages. Defined out of class: it flattens
/// the nested stages and the following ones into one tuple and calls
/// make_filter_nested.
448  template <typename Input, typename ... Transformers,
449  template <typename...> class Pipeline,
450  typename ... OtherTransformers,
451  requires_pipeline<Pipeline<Transformers...>> = 0>
452  auto make_filter(Pipeline<Transformers...> && pipeline_obj,
453  OtherTransformers && ... other_transform_ops) const;
454 
/// Expands the tuple of stages into a single make_filter call, forwarding each
/// element with its tuple element type.
455  template <typename Input, typename ... Transformers,
456  std::size_t ... I>
457  auto make_filter_nested(std::tuple<Transformers...> && transform_ops,
458  std::index_sequence<I...>) const;
459 
460 private:
461 
462  constexpr static int default_concurrency_degree = 4;
463  int concurrency_degree_ = default_concurrency_degree;
464 
465  bool ordering_ = true;
466 
467  constexpr static int default_queue_size = 100;
468  int queue_size_ = default_queue_size;
469 
470  constexpr static int default_num_tokens_ = 100;
471  int num_tokens_ = default_num_tokens_;
472 
473  queue_mode queue_mode_ = queue_mode::blocking;
474 };
475 
480 template <typename E>
481 constexpr bool is_parallel_execution_tbb() {
482  return std::is_same<E, parallel_execution_tbb>::value;
483 }
484 
489 template <>
490 constexpr bool is_supported<parallel_execution_tbb>() { return true; }
491 
496 template <>
497 constexpr bool supports_map<parallel_execution_tbb>() { return true; }
498 
503 template <>
504 constexpr bool supports_reduce<parallel_execution_tbb>() { return true; }
505 
510 template <>
511 constexpr bool supports_map_reduce<parallel_execution_tbb>() { return true; }
512 
517 template <>
518 constexpr bool supports_stencil<parallel_execution_tbb>() { return true; }
519 
524 template <>
525 constexpr bool supports_divide_conquer<parallel_execution_tbb>() { return true; }
526 
531 template <>
532 constexpr bool supports_pipeline<parallel_execution_tbb>() { return true; }
533 
534 template <typename ... InputIterators, typename OutputIterator,
535  typename Transformer>
537  std::tuple<InputIterators...> firsts,
538  OutputIterator first_out,
539  std::size_t sequence_size, Transformer transform_op) const
540 {
541  tbb::parallel_for(
542  std::size_t{0}, sequence_size,
543  [&] (std::size_t index){
544  first_out[index] = apply_iterators_indexed(transform_op, firsts, index);
545  }
546  );
547 
548 }
549 
550 template <typename InputIterator, typename Identity, typename Combiner>
552  InputIterator first,
553  std::size_t sequence_size,
554  Identity && identity,
555  Combiner && combine_op) const
556 {
557  constexpr sequential_execution seq;
558  return tbb::parallel_reduce(
559  tbb::blocked_range<InputIterator>(first, std::next(first,sequence_size)),
560  identity,
561  [combine_op,seq](const auto & range, auto value) {
562  return seq.reduce(range.begin(), range.size(), value, combine_op);
563  },
564  combine_op);
565 }
566 
567 template <typename ... InputIterators, typename Identity,
568  typename Transformer, typename Combiner>
570  std::tuple<InputIterators...> firsts,
571  std::size_t sequence_size,
572  Identity &&,
573  Transformer && transform_op, Combiner && combine_op) const
574 {
575  constexpr sequential_execution seq;
576  tbb::task_group g;
577 
578  using result_type = std::decay_t<Identity>;
579  std::vector<result_type> partial_results(concurrency_degree_);
580 
581  auto process_chunk = [&](auto fins, std::size_t sz, std::size_t i) {
582  partial_results[i] = seq.map_reduce(fins, sz,
583  std::forward<result_type>(partial_results[i]),
584  std::forward<Transformer>(transform_op),
585  std::forward<Combiner>(combine_op));
586  };
587 
588  const auto chunk_size = sequence_size/concurrency_degree_;
589 
590  for(int i=0; i<concurrency_degree_-1;++i) {
591  const auto delta = chunk_size * i;
592  const auto chunk_firsts = iterators_next(firsts,delta);
593 
594  g.run([&, chunk_firsts, i]() {
595  process_chunk(chunk_firsts, chunk_size, i);
596  });
597  }
598 
599  const auto delta = chunk_size * (concurrency_degree_ - 1);
600  const auto chunk_firsts = iterators_next(firsts,delta);
601  process_chunk(chunk_firsts, sequence_size - delta, concurrency_degree_-1);
602 
603  g.wait();
604 
605  return seq.reduce(std::next(partial_results.begin()),
606  partial_results.size()-1, std::forward<result_type>(partial_results[0]),
607  std::forward<Combiner>(combine_op));
608 }
609 
610 template <typename ... InputIterators, typename OutputIterator,
611  typename StencilTransformer, typename Neighbourhood>
613  std::tuple<InputIterators...> firsts, OutputIterator first_out,
614  std::size_t sequence_size,
615  StencilTransformer && transform_op,
616  Neighbourhood && neighbour_op) const
617 {
618  const auto chunk_size = sequence_size / concurrency_degree_;
619  auto process_chunk = [&](auto f, std::size_t sz, std::size_t delta) {
620  constexpr sequential_execution seq{};
621  seq.stencil(f, std::next(first_out,delta), sz,
622  std::forward<StencilTransformer>(transform_op),
623  std::forward<Neighbourhood>(neighbour_op));
624  };
625 
626  tbb::task_group g;
627  for (int i=0; i<concurrency_degree_-1; ++i) {
628  g.run([=](){
629  const auto delta = chunk_size * i;
630  const auto chunk_firsts = iterators_next(firsts,delta);
631  process_chunk(chunk_firsts, chunk_size, delta);
632  });
633  }
634 
635  const auto delta = chunk_size * (concurrency_degree_ - 1);
636  const auto chunk_firsts = iterators_next(firsts,delta);
637  const auto chunk_last = std::next(std::get<0>(firsts), sequence_size);
638  process_chunk(chunk_firsts,
639  std::distance(std::get<0>(chunk_firsts), chunk_last), delta);
640 
641  g.wait();
642 }
643 
644 template <typename Input, typename Divider, typename Solver, typename Combiner>
646  Input && input,
647  Divider && divide_op,
648  Solver && solve_op,
649  Combiner && combine_op) const
650 {
651  std::atomic<int> num_threads{concurrency_degree_-1};
652  return divide_conquer(std::forward<Input>(input),
653  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
654  std::forward<Combiner>(combine_op), num_threads);
655 }
656 
657 template <typename Input, typename Divider, typename Predicate, typename Solver, typename Combiner>
659  Input && input,
660  Divider && divide_op,
661  Predicate && predicate_op,
662  Solver && solve_op,
663  Combiner && combine_op) const
664 {
665  std::atomic<int> num_threads{concurrency_degree_-1};
666  return divide_conquer(std::forward<Input>(input),
667  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
668  std::forward<Solver>(solve_op),
669  std::forward<Combiner>(combine_op), num_threads);
670 }
671 
672 
673 template <typename Generator, typename ... Transformers>
675  Generator && generate_op,
676  Transformers && ... transform_ops) const
677 {
678  using namespace std;
679  using namespace experimental;
680 
681  using result_type = decay_t<typename result_of<Generator()>::type>;
682  using output_value_type = typename result_type::value_type;
683  using output_type = optional<output_value_type>;
684 
685  auto generator = tbb::make_filter<void, output_type>(
686  tbb::filter::serial_in_order,
687  [&](tbb::flow_control & fc) -> output_type {
688  auto item = generate_op();
689  if (item) {
690  return *item;
691  }
692  else {
693  fc.stop();
694  return {};
695  }
696  }
697  );
698 
699  auto rest =
700  this->template make_filter<output_value_type>(forward<Transformers>(transform_ops)...);
701 
702 
703  tbb::task_group_context context;
704  tbb::parallel_pipeline(tokens(),
705  generator
706  &
707  rest);
708 }
709 
710 // PRIVATE MEMBERS
711 
712 template <typename Input, typename Divider, typename Solver, typename Combiner>
714  Input && input,
715  Divider && divide_op,
716  Solver && solve_op,
717  Combiner && combine_op,
718  std::atomic<int> & num_threads) const
719 {
720  constexpr sequential_execution seq;
721 
722  if (num_threads.load()<=0) {
723  return seq.divide_conquer(std::forward<Input>(input),
724  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
725  std::forward<Combiner>(combine_op));
726  }
727 
728  auto subproblems = divide_op(std::forward<Input>(input));
729 
730  if (subproblems.size()<=1) { return solve_op(std::forward<Input>(input)); }
731 
732  using subresult_type = std::decay_t<typename std::result_of<Solver(Input)>::type>;
733  std::vector<subresult_type> partials(subproblems.size()-1);
734  int division = 0;
735 
736  tbb::task_group g;
737  auto i = subproblems.begin()+1;
738  while (i!=subproblems.end() && num_threads.load()>0) {
739  g.run([&,this,it=i++,div=division++]() {
740  partials[div] = this->divide_conquer(std::forward<Input>(*it),
741  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
742  std::forward<Combiner>(combine_op), num_threads);
743  });
744  num_threads--;
745  }
746 
747  //Main thread works on the first subproblem.
748  while (i != subproblems.end()){
749  partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
750  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
751  std::forward<Combiner>(combine_op));
752  }
753 
754  auto out = divide_conquer(std::forward<Input>(*subproblems.begin()),
755  std::forward<Divider>(divide_op), std::forward<Solver>(solve_op),
756  std::forward<Combiner>(combine_op), num_threads);
757 
758  g.wait();
759 
760  return seq.reduce(partials.begin(), partials.size(),
761  std::forward<subresult_type>(out), std::forward<Combiner>(combine_op));
762 }
763 
764 template <typename Input, typename Divider, typename Predicate, typename Solver, typename Combiner>
766  Input && input,
767  Divider && divide_op,
768  Predicate && predicate_op,
769  Solver && solve_op,
770  Combiner && combine_op,
771  std::atomic<int> & num_threads) const
772 {
773  constexpr sequential_execution seq;
774 
775  if (num_threads.load()<=0) {
776  return seq.divide_conquer(std::forward<Input>(input),
777  std::forward<Divider>(divide_op),std::forward<Predicate>(predicate_op),
778  std::forward<Solver>(solve_op),
779  std::forward<Combiner>(combine_op));
780  }
781 
782 
783  if (predicate_op(input)) { return solve_op(std::forward<Input>(input)); }
784  auto subproblems = divide_op(std::forward<Input>(input));
785 
786  using subresult_type = std::decay_t<typename std::result_of<Solver(Input)>::type>;
787  std::vector<subresult_type> partials(subproblems.size()-1);
788  int division = 0;
789 
790  tbb::task_group g;
791  auto i = subproblems.begin()+1;
792  while (i!=subproblems.end() && num_threads.load()>0) {
793  g.run([&,this,it=i++,div=division++]() {
794  partials[div] = this->divide_conquer(std::forward<Input>(*it),
795  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
796  std::forward<Solver>(solve_op),
797  std::forward<Combiner>(combine_op), num_threads);
798  });
799  num_threads--;
800  }
801 
802  //Main thread works on the first subproblem.
803  while (i != subproblems.end()){
804  partials[division] = seq.divide_conquer(std::forward<Input>(*i++),
805  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op),
806  std::forward<Solver>(solve_op),
807  std::forward<Combiner>(combine_op));
808  }
809 
810  auto out = divide_conquer(std::forward<Input>(*subproblems.begin()),
811  std::forward<Divider>(divide_op), std::forward<Predicate>(predicate_op), std::forward<Solver>(solve_op),
812  std::forward<Combiner>(combine_op), num_threads);
813 
814  g.wait();
815 
816  return seq.reduce(partials.begin(), partials.size(),
817  std::forward<subresult_type>(out), std::forward<Combiner>(combine_op));
818 }
819 
820 template <typename Input, typename Transformer,
821  requires_no_pattern<Transformer>>
822 auto parallel_execution_tbb::make_filter(
823  Transformer && transform_op) const
824 {
825  using namespace std;
826  using namespace experimental;
827 
828  using input_value_type = Input;
829  using gen_value_type = optional<input_value_type>;
830 
831  return tbb::make_filter<gen_value_type, void>(
832  tbb::filter::serial_in_order,
833  [=](gen_value_type item) {
834  if (item) transform_op(*item);
835  });
836 }
837 
838 template <typename Input, typename Transformer, typename ... OtherTransformers,
839  requires_no_pattern<Transformer>>
840 auto parallel_execution_tbb::make_filter(
841  Transformer && transform_op,
842  OtherTransformers && ... other_transform_ops) const
843 {
844  using namespace std;
845  using namespace experimental;
846 
847  using input_value_type = Input;
848  static_assert(!is_void<input_value_type>::value,
849  "Transformer must take non-void argument");
850  using gen_value_type = optional<input_value_type>;
851  using output_value_type =
852  decay_t<typename result_of<Transformer(input_value_type)>::type>;
853  static_assert(!is_void<output_value_type>::value,
854  "Transformer must return a non-void result");
855  using output_type = optional<output_value_type>;
856 
857 
858  return
859  tbb::make_filter<gen_value_type, output_type>(
860  tbb::filter::serial_in_order,
861  [=](gen_value_type item) -> output_type {
862  if (item) return transform_op(*item);
863  else return {};
864  })
865  &
866  this->template make_filter<output_value_type>(
867  std::forward<OtherTransformers>(other_transform_ops)...);
868 }
869 
870 template <typename Input, typename FarmTransformer,
871  template <typename> class Farm,
872  requires_farm<Farm<FarmTransformer>>>
873 auto parallel_execution_tbb::make_filter(
874  Farm<FarmTransformer> && farm_obj) const
875 {
876  using namespace std;
877  using namespace experimental;
878 
879  using input_value_type = Input;
880  using gen_value_type = optional<input_value_type>;
881 
882  return tbb::make_filter<gen_value_type, void>(
883  tbb::filter::parallel,
884  [=](gen_value_type item) {
885  if (item) farm_obj(*item);
886  });
887 }
888 
889 template <typename Input, typename FarmTransformer,
890  template <typename> class Farm,
891  typename ... OtherTransformers,
892  requires_farm<Farm<FarmTransformer>>>
893 auto parallel_execution_tbb::make_filter(
894  Farm<FarmTransformer> && farm_obj,
895  OtherTransformers && ... other_transform_ops) const
896 {
897  using namespace std;
898  using namespace experimental;
899 
900  using input_value_type = Input;
901  static_assert(!is_void<input_value_type>::value,
902  "Farm must take non-void argument");
903  using gen_value_type = optional<input_value_type>;
904  using output_value_type = decay_t<typename result_of<FarmTransformer(input_value_type)>::type>;
905  static_assert(!is_void<output_value_type>::value,
906  "Farm must return a non-void result");
907  using output_type = optional<output_value_type>;
908 
909  return tbb::make_filter<gen_value_type, output_type>(
910  tbb::filter::parallel,
911  [&](gen_value_type item) -> output_type {
912  if (item) return farm_obj(*item);
913  else return {};
914  })
915  &
916  this->template make_filter<output_value_type>(
917  std::forward<OtherTransformers>(other_transform_ops)...);
918 }
919 
920 template <typename Input, typename Predicate,
921  template <typename> class Filter,
922  requires_filter<Filter<Predicate>>>
923 auto parallel_execution_tbb::make_filter(
924  Filter<Predicate> &&) const
925 {
926  using namespace std;
927  using namespace experimental;
928 
929  using input_value_type = Input;
930  using gen_value_type = optional<input_value_type>;
931 
932  return tbb::make_filter<gen_value_type, void>(
933  tbb::filter::parallel,
934  [=](gen_value_type item) {
935  if (item) filter_obj(*item);
936  });
937 }
938 
939 template <typename Input, typename Predicate,
940  template <typename> class Filter,
941  typename ... OtherTransformers,
942  requires_filter<Filter<Predicate>>>
943 auto parallel_execution_tbb::make_filter(
944  Filter<Predicate> && filter_obj,
945  OtherTransformers && ... other_transform_ops) const
946 {
947  using namespace std;
948  using namespace experimental;
949 
950  using input_value_type = Input;
951  static_assert(!is_void<input_value_type>::value,
952  "Filter must take non-void argument");
953  using gen_value_type = optional<input_value_type>;
954 
955  return tbb::make_filter<gen_value_type, gen_value_type>(
956  tbb::filter::parallel,
957  [&](gen_value_type item) -> gen_value_type {
958  if (item && filter_obj(*item)) return item;
959  else return {};
960  })
961  &
962  this->template make_filter<input_value_type>(
963  std::forward<OtherTransformers>(other_transform_ops)...);
964 }
965 
966 template <typename Input, typename Combiner, typename Identity,
967  template <typename C, typename I> class Reduce,
968  typename ... OtherTransformers,
969  requires_reduce<Reduce<Combiner,Identity>>>
970 auto parallel_execution_tbb::make_filter(
971  Reduce<Combiner,Identity> && reduce_obj,
972  OtherTransformers && ... other_transform_ops) const
973 {
974  using namespace std;
975  using namespace experimental;
976 
977  using input_value_type = Input;
978  using gen_value_type = optional<input_value_type>;
979 
980  return tbb::make_filter<gen_value_type, gen_value_type>(
981  tbb::filter::serial,
982  [&, it=std::vector<input_value_type>()](gen_value_type item) -> gen_value_type {
983  if (!item) return {};
984  reduce_obj.add_item(std::forward<Identity>(*item));
985  if (reduce_obj.reduction_needed()) {
986  constexpr sequential_execution seq;
987  return reduce_obj.reduce_window(seq);
988  }
989  return {};
990  })
991  &
992  this->template make_filter<input_value_type>(
993  std::forward<OtherTransformers>(other_transform_ops)...);
994 }
995 
996 template <typename Input, typename Transformer, typename Predicate,
997  template <typename T, typename P> class Iteration,
998  typename ... OtherTransformers,
999  requires_iteration<Iteration<Transformer,Predicate>>,
1000  requires_no_pattern<Transformer>>
1001 auto parallel_execution_tbb::make_filter(
1002  Iteration<Transformer,Predicate> && iteration_obj,
1003  OtherTransformers && ... other_transform_ops) const
1004 {
1005  using namespace std;
1006  using namespace experimental;
1007 
1008  using input_value_type = Input;
1009  using gen_value_type = optional<input_value_type>;
1010 
1011  return tbb::make_filter<gen_value_type, gen_value_type>(
1012  tbb::filter::serial,
1013  [&](gen_value_type item) -> gen_value_type {
1014  if (!item) return {};
1015  do {
1016  item = iteration_obj.transform(*item);
1017  } while (!iteration_obj.predicate(*item));
1018  return item;
1019  })
1020  &
1021  this->template make_filter<input_value_type>(
1022  std::forward<OtherTransformers>(other_transform_ops)...);
1023 }
1024 
/// Iteration whose transformer is itself a pipeline: not implemented.
/// This overload is selected only when requires_pipeline<Transformer> holds,
/// so the static_assert below fails on every instantiation, turning such use
/// into a compile-time error.
1025 template <typename Input, typename Transformer, typename Predicate,
1026  template <typename T, typename P> class Iteration,
1027  typename ... OtherTransformers,
1028  requires_iteration<Iteration<Transformer,Predicate>>,
1029  requires_pipeline<Transformer>>
1030 auto parallel_execution_tbb::make_filter(
1031  Iteration<Transformer,Predicate> &&,
1032  OtherTransformers && ...) const
1033 {
1034  static_assert(!is_pipeline<Transformer>, "Not implemented");
1035 }
1036 
1037 
1038 
1039 template <typename Input, typename ... Transformers,
1040  template <typename...> class Pipeline,
1041  typename ... OtherTransformers,
1042  requires_pipeline<Pipeline<Transformers...>>>
1043 auto parallel_execution_tbb::make_filter(
1044  Pipeline<Transformers...> && pipeline_obj,
1045  OtherTransformers && ... other_transform_ops) const
1046 {
1047  return this->template make_filter_nested<Input>(
1048  std::tuple_cat(pipeline_obj.transformers(),
1049  std::forward_as_tuple(other_transform_ops...)),
1050  std::make_index_sequence<sizeof...(Transformers)+sizeof...(OtherTransformers)>());
1051 }
1052 
/// Expands the tuple of stages into one make_filter call. Each element is
/// forwarded with its tuple element type, so value elements become rvalues and
/// reference elements keep their reference category.
1053 template <typename Input, typename ... Transformers,
1054  std::size_t ... I>
1055 auto parallel_execution_tbb::make_filter_nested(
1056  std::tuple<Transformers...> && transform_ops,
1057  std::index_sequence<I...>) const
1058 {
1059  return this->template make_filter<Input>(
1060  std::forward<Transformers>(std::get<I>(transform_ops))...);
1061 }
1062 
1063 
1064 } // end namespace grppi
1065 
1066 #else // GRPPI_TBB not defined
1067 
1068 namespace grppi {
1069 
/// Placeholder used when GRPPI_TBB is not defined: the type name still exists
/// so code can refer to it, but it provides no members.
1072 struct parallel_execution_tbb {};
1073 
/// Determines whether E is the TBB parallel execution policy. TBB support is
/// not compiled in on this branch (GRPPI_TBB undefined), so the answer is
/// always false.
template <typename E>
constexpr bool is_parallel_execution_tbb() { return false; }
1083 
1084 } // end namespace grppi
1085 
1086 #endif // GRPPI_TBB
1087 
1088 #endif
Definition: mpmc_queue.h:33
bool push(T item)
Definition: mpmc_queue.h:130
T pop()
Definition: mpmc_queue.h:95
TBB parallel execution policy.
Definition: parallel_execution_tbb.h:39
parallel_execution_tbb() noexcept
Default construct a TBB parallel execution policy.
Definition: parallel_execution_tbb.h:50
void map(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, Transformer transform_op) const
Applies a transformation to multiple sequences leaving the result in another sequence using available...
Definition: parallel_execution_tbb.h:536
auto map_reduce(std::tuple< InputIterators... > firsts, std::size_t sequence_size, Identity &&identity, Transformer &&transform_op, Combiner &&combine_op) const
Applies a map/reduce operation to a sequence of data items.
Definition: parallel_execution_tbb.h:569
auto reduce(InputIterator first, std::size_t sequence_size, Identity &&identity, Combiner &&combine_op) const
Applies a reduction to a sequence of data items.
Definition: parallel_execution_tbb.h:551
void disable_ordering() noexcept
Disable ordering.
Definition: parallel_execution_tbb.h:85
parallel_execution_tbb(int concurrency_degree, bool order=true) noexcept
Constructs a TBB parallel execution policy.
Definition: parallel_execution_tbb.h:62
int concurrency_degree() const noexcept
Get number of grppi threads.
Definition: parallel_execution_tbb.h:75
void enable_ordering() noexcept
Enable ordering.
Definition: parallel_execution_tbb.h:80
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke the divide/conquer pattern.
Definition: parallel_execution_tbb.h:645
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_tbb.h:107
void pipeline(mpmc_queue< InputType > &input_queue, Transformer &&transform_op, mpmc_queue< OutputType > &output_queue) const
Invoke Pipeline pattern coming from another context that uses mpmc_queues as communication channels.
Definition: parallel_execution_tbb.h:258
void set_concurrency_degree(int degree) noexcept
Set number of grppi threads.
Definition: parallel_execution_tbb.h:70
int tokens() const noexcept
Definition: parallel_execution_tbb.h:114
void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a transformation to multiple sequences leaving the result in another sequence.
Definition: parallel_execution_tbb.h:612
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_tbb.h:90
void pipeline(Generator &&generate_op, Transformers &&... transform_op) const
Invoke Pipeline pattern.
Definition: parallel_execution_tbb.h:674
void set_queue_attributes(int size, queue_mode mode, int tokens) noexcept
Sets the attributes for the queues built through make_queue<T>()
Definition: parallel_execution_tbb.h:95
Sequential execution policy.
Definition: sequential_execution.h:36
constexpr void stencil(std::tuple< InputIterators... > firsts, OutputIterator first_out, std::size_t sequence_size, StencilTransformer &&transform_op, Neighbourhood &&neighbour_op) const
Applies a stencil to multiple sequences leaving the result in another sequence.
Definition: sequential_execution.h:497
auto divide_conquer(Input &&input, Divider &&divide_op, Solver &&solve_op, Combiner &&combine_op) const
Invoke the divide/conquer pattern.
Definition: sequential_execution.h:538
Definition: callable_traits.h:21
typename std::enable_if_t< is_pipeline< T >, int > requires_pipeline
Definition: pipeline_pattern.h:107
typename internal::output_value_type< I, T >::type output_value_type
Definition: pipeline_pattern.h:128
typename std::enable_if_t< is_farm< T >, int > requires_farm
Definition: farm_pattern.h:85
constexpr bool supports_pipeline< parallel_execution_tbb >()
Determines if an execution policy supports the pipeline pattern.
Definition: parallel_execution_tbb.h:532
constexpr bool supports_map_reduce< parallel_execution_tbb >()
Determines if an execution policy supports the map-reduce pattern.
Definition: parallel_execution_tbb.h:511
typename std::result_of< Transformer(Input)>::type result_type
Determines the return type of applying a function to an input type.
Definition: patterns.h:105
constexpr bool is_supported< parallel_execution_tbb >()
Determines if an execution policy is supported in the current compilation.
Definition: parallel_execution_tbb.h:490
queue_mode
Definition: mpmc_queue.h:30
auto iterators_next(T &&t, int n)
Computes next n steps from a tuple of iterators.
Definition: iterator.h:170
constexpr bool supports_divide_conquer< parallel_execution_tbb >()
Determines if an execution policy supports the divide/conquer pattern.
Definition: parallel_execution_tbb.h:525
constexpr bool supports_reduce< parallel_execution_tbb >()
Determines if an execution policy supports the reduce pattern.
Definition: parallel_execution_tbb.h:504
decltype(auto) apply_iterators_indexed(F &&f, T &&t, std::size_t i)
Applies a callable object to the values obtained from the iterators in a tuple by indexing....
Definition: iterator.h:142
constexpr bool supports_map< parallel_execution_tbb >()
Determines if an execution policy supports the map pattern.
Definition: parallel_execution_tbb.h:497
std::enable_if_t< is_no_pattern< T >, int > requires_no_pattern
Definition: patterns.h:87
constexpr bool supports_stencil< parallel_execution_tbb >()
Determines if an execution policy supports the stencil pattern.
Definition: parallel_execution_tbb.h:518
constexpr bool is_parallel_execution_tbb()
Metafunction that determines if type E is parallel_execution_tbb.
Definition: parallel_execution_tbb.h:481