GrPPI  0.3.1
Generic and Reusable Parallel Pattern Interface
mpmc_queue.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_COMMON_MPMC_QUEUE_H
22 #define GRPPI_COMMON_MPMC_QUEUE_H
23 
24 
25 #include <vector>
26 #include <atomic>
27 #include <iostream>
28 #include <mutex>
29 #include <condition_variable>
30 
31 namespace grppi{
32 
33 
34 
/**
\brief Synchronization strategy selected when a queue is constructed.

The enumerators are initialized from boolean literals, so `lockfree`
has the underlying value 1 and `blocking` has the underlying value 0.
*/
enum class queue_mode {
  lockfree = true,  ///< Atomic counters with busy-waiting.
  blocking = false  ///< Mutex plus condition variables.
};
36 
37 template <typename T>
38 class mpmc_queue{
39 
40  public:
41  using value_type = T;
42 
43  mpmc_queue<T>(int q_size, queue_mode q_mode ):
44  size{q_size}, buffer{std::vector<T>(q_size)}, mode{q_mode}, pread{0}, pwrite{0}, internal_pread{0}, internal_pwrite{0} { }
45 
47  size{q.size},
48  buffer{std::move(q.buffer)},
49  mode{q.mode},
50  pread{q.pread.load()},
51  pwrite{q.pwrite.load()},
52  internal_pread{q.internal_pread.load()},
53  internal_pwrite{q.internal_pwrite.load()},
54  m{},
55  empty{},
56  full{}
57  {}
58 
59  mpmc_queue(const mpmc_queue &) = delete;
60  mpmc_queue & operator=(const mpmc_queue &) = delete;
61 
62  bool is_empty () const noexcept;
63  T pop () ;
64  bool push (T item) ;
65 
66  private:
67  bool is_full (unsigned long long current) const noexcept;
68  bool is_empty (unsigned long long current) const noexcept;
69 
70  int size;
71  std::vector<T> buffer;
72  queue_mode mode;
73 
74  std::atomic<unsigned long long> pread;
75  std::atomic<unsigned long long> pwrite;
76  std::atomic<unsigned long long> internal_pread;
77  std::atomic<unsigned long long> internal_pwrite;
78 
79 
80  std::mutex m;
81  std::condition_variable empty;
82  std::condition_variable full;
83 
84 };
85 
86 
87 template <typename T>
88 bool mpmc_queue<T>::is_empty() const noexcept {
89  return pread.load()==pwrite.load();
90 }
91 
92 template <typename T>
94  if(mode == queue_mode::lockfree){
95 
96  unsigned long long current;
97 
98  do{
99  current = internal_pread.load();
100  }while(!internal_pread.compare_exchange_weak(current, current+1));
101 
102  while(is_empty(current));
103 
104  auto item = std::move(buffer[current%size]);
105  auto aux = current;
106  do{
107  current = aux;
108  }while(!pread.compare_exchange_weak(current, current+1));
109 
110  return std::move(item);
111  }else{
112 
113  std::unique_lock<std::mutex> lk(m);
114  while(is_empty(pread)){
115  empty.wait(lk);
116  }
117  auto item = std::move(buffer[pread%size]);
118  pread++;
119  lk.unlock();
120  full.notify_one();
121 
122  return std::move(item);
123  }
124 
125 }
126 
127 template <typename T>
128 bool mpmc_queue<T>::push(T item){
129  if(mode == queue_mode::lockfree){
130  unsigned long long current;
131  do{
132  current = internal_pwrite.load();
133  }while(!internal_pwrite.compare_exchange_weak(current, current+1));
134 
135  while(is_full(current));
136 
137  buffer[current%size] = std::move(item);
138 
139  auto aux = current;
140  do{
141  current = aux;
142  }while(!pwrite.compare_exchange_weak(current, current+1));
143 
144  return true;
145  }else{
146 
147  std::unique_lock<std::mutex> lk(m);
148  while(is_full(pwrite)){
149  full.wait(lk);
150  }
151  buffer[pwrite%size] = std::move(item);
152 
153  pwrite++;
154  lk.unlock();
155  empty.notify_one();
156 
157  return true;
158  }
159 }
160 
161 template <typename T>
162 bool mpmc_queue<T>::is_empty(unsigned long long current) const noexcept {
163  if(current >= pwrite.load()) return true;
164  return false;
165 }
166 
167 template <typename T>
168 bool mpmc_queue<T>::is_full(unsigned long long current) const noexcept{
169  if(current >= (pread.load()+size)) return true;
170  return false;
171 
172 }
173 
174 namespace internal {
175 
176 template <typename T>
177 struct is_queue : std::false_type {};
178 
179 template <typename T>
180 struct is_queue<mpmc_queue<T>> : std::true_type {};
181 
182 }
183 
184 template <typename T>
185 constexpr bool is_queue = internal::is_queue<T>();
186 
187 template <typename T>
188 using requires_queue = std::enable_if_t<is_queue<T>, int>;
189 
190 }
191 
192 #endif
Definition: callable_traits.h:26
bool is_empty() const noexcept
Definition: mpmc_queue.h:88
mpmc_queue(mpmc_queue &&q)
Definition: mpmc_queue.h:46
constexpr bool is_queue
Definition: mpmc_queue.h:185
queue_mode
Definition: mpmc_queue.h:35
Definition: mpmc_queue.h:38
std::enable_if_t< is_queue< T >, int > requires_queue
Definition: mpmc_queue.h:188
T value_type
Definition: mpmc_queue.h:41
T pop()
Definition: mpmc_queue.h:93
Definition: mpmc_queue.h:177
bool push(T item)
Definition: mpmc_queue.h:128