GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
parallel_execution_native.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_NATIVE_PARALLEL_EXECUTION_NATIVE_H
22 #define GRPPI_NATIVE_PARALLEL_EXECUTION_NATIVE_H
23 
24 #include "../common/mpmc_queue.h"
25 #include "pool.h"
26 
27 #include <thread>
28 #include <atomic>
29 #include <algorithm>
30 #include <vector>
31 #include <type_traits>
32 
33 namespace grppi {
34 
50 public:
51  thread_registry() noexcept = default;
52 
56  void register_thread() noexcept;
57 
61  void deregister_thread() noexcept;
62 
68  int current_index() const noexcept;
69 
70 private:
71  mutable std::atomic_flag lock_ = ATOMIC_FLAG_INIT;
72  std::vector<std::thread::id> ids_;
73 };
74 
75 inline void thread_registry::register_thread() noexcept
76 {
77  using namespace std;
78  while (lock_.test_and_set(memory_order_acquire)) {}
79  auto this_id = this_thread::get_id();
80  ids_.push_back(this_id);
81  lock_.clear(memory_order_release);
82 }
83 
84 inline void thread_registry::deregister_thread() noexcept
85 {
86  using namespace std;
87  while (lock_.test_and_set(memory_order_acquire)) {}
88  auto this_id = this_thread::get_id();
89  auto current = find(begin(ids_), end(ids_), this_id);
90  *current = {}; //Empty thread
91  lock_.clear(memory_order_release);
92 }
93 
94 inline int thread_registry::current_index() const noexcept
95 {
96  using namespace std;
97  while (lock_.test_and_set(memory_order_acquire)) {}
98  auto this_id = this_thread::get_id();
99  auto current = find(begin(ids_), end(ids_), this_id);
100  auto index = distance(begin(ids_), current);
101  lock_.clear(memory_order_release);
102  return index;
103 };
104 
112 public:
117  : registry_{registry}
118  { registry_.register_thread(); }
119 
124  registry_.deregister_thread();
125  }
126 
127 private:
128  thread_registry & registry_;
129 };
130 
137 public:
138 
151  static_cast<int>(2 * std::thread::hardware_concurrency()),
152  true}
153  {}
154 
163  parallel_execution_native(int concurrency_degree, bool ordering=true) noexcept :
164  concurrency_degree_{concurrency_degree},
165  ordering_{ordering}
166  {
167  pool.initialise(concurrency_degree_);
168  }
169 
173  void set_concurrency_degree(int degree) noexcept { concurrency_degree_ = degree; }
174 
178  int concurrency_degree() const noexcept { return concurrency_degree_; }
179 
183  void enable_ordering() noexcept { ordering_=true; }
184 
188  void disable_ordering() noexcept { ordering_=false; }
189 
193  bool is_ordered() const noexcept { return ordering_; }
194 
200  return native_thread_manager{thread_registry_};
201  }
202 
207  int get_thread_id() const noexcept {
208  return thread_registry_.current_index();
209  }
210 
214  void set_queue_attributes(int size, queue_mode mode) noexcept {
215  queue_size_ = size;
216  queue_mode_ = mode;
217  }
218 
224  template <typename T>
226  return {queue_size_, queue_mode_};
227  }
228 
229 public:
236 
237 private:
238  thread_registry thread_registry_;
239 
240  int concurrency_degree_;
241  bool ordering_;
242 
243  constexpr static int default_queue_size = 100;
244  int queue_size_ = default_queue_size;
245 
246  queue_mode queue_mode_ = queue_mode::blocking;
247 };
248 
253 template <typename E>
255  return std::is_same<E, parallel_execution_native>::value;
256 }
257 
262 template <typename E>
263 constexpr bool is_supported();
264 
268 template <>
270  return true;
271 }
272 
273 
274 } // end namespace grppi
275 
276 
277 #endif
Definition: callable_traits.h:24
void set_concurrency_degree(int degree) noexcept
Set number of grppi threads.
Definition: parallel_execution_native.h:173
constexpr bool is_supported< parallel_execution_native >()
Specialization stating that parallel_execution_native is supported.
Definition: parallel_execution_native.h:269
native_thread_manager(thread_registry &registry)
Saves a reference to the registry and registers current thread.
Definition: parallel_execution_native.h:116
constexpr bool is_supported()
Metafunction that determines if type E is supported in the current build.
void enable_ordering() noexcept
Enable ordering.
Definition: parallel_execution_native.h:183
int concurrency_degree() const noexcept
Get number of grppi trheads.
Definition: parallel_execution_native.h:178
void set_queue_attributes(int size, queue_mode mode) noexcept
Sets the attributes for the queues built through make_queue<T>()
Definition: parallel_execution_native.h:214
STL namespace.
thread_pool pool
Thread pool for lanching workers.
Definition: parallel_execution_native.h:235
queue_mode
Definition: mpmc_queue.h:35
void deregister_thread() noexcept
Removes current thread id from the registry.
Definition: parallel_execution_native.h:84
Native parallel execution policy. This policy uses ISO C++ threads as implementation building block a...
Definition: parallel_execution_native.h:136
thread_registry() noexcept=default
Definition: mpmc_queue.h:38
parallel_execution_native() noexcept
Default construct a native parallel execution policy.
Definition: parallel_execution_native.h:149
~native_thread_manager()
Deregisters current thread from the registry.
Definition: parallel_execution_native.h:123
constexpr bool is_parallel_execution_native()
Metafunction that determines if type E is parallel_execution_native.
Definition: parallel_execution_native.h:254
void register_thread() noexcept
Adds the current thread id in the registry.
Definition: parallel_execution_native.h:75
native_thread_manager thread_manager()
Get a manager object for registration/deregistration in the thread index table for current thread...
Definition: parallel_execution_native.h:199
int current_index() const noexcept
Integer index for current thread.
Definition: parallel_execution_native.h:94
parallel_execution_native(int concurrency_degree, bool ordering=true) noexcept
Constructs a native parallel execution policy.
Definition: parallel_execution_native.h:163
Thread index table to provide portable natural thread indices.
Definition: parallel_execution_native.h:49
void disable_ordering() noexcept
Disable ordering.
Definition: parallel_execution_native.h:188
RAII class to manage registration/deregistration pairs. This class allows to manage automatic deregis...
Definition: parallel_execution_native.h:111
Definition: pool.h:30
mpmc_queue< T > make_queue() const
Makes a communication queue for elements of type T. Constructs a queue using the attributes that can ...
Definition: parallel_execution_native.h:225
int get_thread_id() const noexcept
Get index of current thread in the thread table.
Definition: parallel_execution_native.h:207
bool is_ordered() const noexcept
Is execution ordered.
Definition: parallel_execution_native.h:193