GrPPI  1.0
Generic and Reusable Parallel Pattern Interface
mpmc_queue.h
Go to the documentation of this file.
1 /*
2  * Copyright 2018 Universidad Carlos III de Madrid
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef GRPPI_COMMON_MPMC_QUEUE_H
17 #define GRPPI_COMMON_MPMC_QUEUE_H
18 
19 
20 #include <vector>
21 #include <atomic>
22 #include <iostream>
23 #include <mutex>
24 #include <condition_variable>
25 
26 namespace grppi{
27 
28 
29 
30 enum class queue_mode {lockfree = true, blocking = false};
31 
32 template <typename T>
33 class mpmc_queue{
34 
35  public:
36  using value_type = T;
37 
38  mpmc_queue<T>(int q_size, queue_mode q_mode ) :
39  size{q_size},
40  buffer{std::vector<T>(q_size)},
41  mode{q_mode},
42  pread{0},
43  pwrite{0},
44  internal_pread{0},
45  internal_pwrite{0}
46  {}
47 
49  size{q.size},
50  buffer{std::move(q.buffer)},
51  mode{q.mode},
52  pread{q.pread.load()},
53  pwrite{q.pwrite.load()},
54  internal_pread{q.internal_pread.load()},
55  internal_pwrite{q.internal_pwrite.load()},
56  m{},
57  empty{},
58  full{}
59  {}
60 
61  mpmc_queue(const mpmc_queue &) = delete;
62  mpmc_queue & operator=(const mpmc_queue &) = delete;
63 
64  bool is_empty () const noexcept;
65  T pop () ;
66  bool push (T item) ;
67 
68  private:
69  bool is_full (unsigned long long current) const noexcept;
70  bool is_empty (unsigned long long current) const noexcept;
71 
72  int size;
73  std::vector<T> buffer;
74  queue_mode mode;
75 
76  std::atomic<unsigned long long> pread;
77  std::atomic<unsigned long long> pwrite;
78  std::atomic<unsigned long long> internal_pread;
79  std::atomic<unsigned long long> internal_pwrite;
80 
81 
82  std::mutex m {};
83  std::condition_variable empty{};
84  std::condition_variable full{};
85 
86 };
87 
88 
89 template <typename T>
90 bool mpmc_queue<T>::is_empty() const noexcept {
91  return pread.load()==pwrite.load();
92 }
93 
94 template <typename T>
96  if(mode == queue_mode::lockfree){
97 
98  unsigned long long current;
99 
100  do{
101  current = internal_pread.load();
102  }while(!internal_pread.compare_exchange_weak(current, current+1));
103 
104  while(is_empty(current));
105 
106  auto item = std::move(buffer[current%size]);
107  auto aux = current;
108  do{
109  current = aux;
110  }while(!pread.compare_exchange_weak(current, current+1));
111 
112  return std::move(item);
113  }else{
114 
115  std::unique_lock<std::mutex> lk(m);
116  while(is_empty(pread)){
117  empty.wait(lk);
118  }
119  auto item = std::move(buffer[pread%size]);
120  pread++;
121  lk.unlock();
122  full.notify_one();
123 
124  return std::move(item);
125  }
126 
127 }
128 
129 template <typename T>
130 bool mpmc_queue<T>::push(T item){
131  if(mode == queue_mode::lockfree){
132  unsigned long long current;
133  do{
134  current = internal_pwrite.load();
135  }while(!internal_pwrite.compare_exchange_weak(current, current+1));
136 
137  while(is_full(current));
138 
139  buffer[current%size] = std::move(item);
140 
141  auto aux = current;
142  do{
143  current = aux;
144  }while(!pwrite.compare_exchange_weak(current, current+1));
145 
146  return true;
147  }else{
148 
149  std::unique_lock<std::mutex> lk(m);
150  while(is_full(pwrite)){
151  full.wait(lk);
152  }
153  buffer[pwrite%size] = std::move(item);
154 
155  pwrite++;
156  lk.unlock();
157  empty.notify_one();
158 
159  return true;
160  }
161 }
162 
163 template <typename T>
164 bool mpmc_queue<T>::is_empty(unsigned long long current) const noexcept {
165  if(current >= pwrite.load()) return true;
166  return false;
167 }
168 
169 template <typename T>
170 bool mpmc_queue<T>::is_full(unsigned long long current) const noexcept{
171  if(current >= (pread.load()+size)) return true;
172  return false;
173 
174 }
175 
176 namespace internal {
177 
178 template <typename T>
179 struct is_queue : std::false_type {};
180 
181 template <typename T>
182 struct is_queue<mpmc_queue<T>> : std::true_type {};
183 
184 }
185 
186 template <typename T>
187 constexpr bool is_queue = internal::is_queue<T>();
188 
189 template <typename T>
190 using requires_queue = std::enable_if_t<is_queue<T>, int>;
191 
192 }
193 
194 #endif
Definition: mpmc_queue.h:33
mpmc_queue & operator=(const mpmc_queue &)=delete
mpmc_queue(const mpmc_queue &)=delete
T value_type
Definition: mpmc_queue.h:36
mpmc_queue(mpmc_queue &&q)
Definition: mpmc_queue.h:48
bool is_empty() const noexcept
Definition: mpmc_queue.h:90
bool push(T item)
Definition: mpmc_queue.h:130
T pop()
Definition: mpmc_queue.h:95
Definition: callable_traits.h:21
constexpr bool is_queue
Definition: mpmc_queue.h:187
queue_mode
Definition: mpmc_queue.h:30
std::enable_if_t< is_queue< T >, int > requires_queue
Definition: mpmc_queue.h:190
Definition: mpmc_queue.h:179