GrPPI  0.2
Generic and Reusable Parallel Pattern Interface
mpmc_queue.h
Go to the documentation of this file.
1 
21 #ifndef GRPPI_COMMON_MPMC_QUEUE_H
22 #define GRPPI_COMMON_MPMC_QUEUE_H
23 
24 
25 #include <vector>
26 #include <atomic>
27 #include <iostream>
28 #include <mutex>
29 #include <condition_variable>
30 
31 namespace grppi{
32 
33 
34 
/**
\brief Selects which synchronization strategy a queue uses.
*/
enum class queue_mode {
  /// Atomic counters with busy-waiting (no mutex involved).
  lockfree = true,
  /// Mutex plus condition variables.
  blocking = false
};
36 
37 template <typename T>
38 class mpmc_queue{
39 
40  public:
41  using value_type = T;
42 
43  mpmc_queue<T>(int q_size, queue_mode q_mode ):
44  size{q_size}, buffer{std::vector<T>(q_size)}, mode{q_mode}, pread{0}, pwrite{0}, internal_pread{0}, internal_pwrite{0} { }
45 
47  size{q.size},
48  buffer{std::move(q.buffer)},
49  mode{q.mode},
50  pread{q.pread.load()},
51  pwrite{q.pwrite.load()},
52  internal_pread{q.internal_pread.load()},
53  internal_pwrite{q.internal_pwrite.load()},
54  m{},
55  empty{},
56  full{}
57  {}
58 
59  bool is_empty () const noexcept;
60  T pop () ;
61  bool push (T item) ;
62 
63  private:
64  bool is_full (unsigned long long current) const noexcept;
65  bool is_empty (unsigned long long current) const noexcept;
66 
67  int size;
68  std::vector<T> buffer;
69  queue_mode mode;
70 
71  std::atomic<unsigned long long> pread;
72  std::atomic<unsigned long long> pwrite;
73  std::atomic<unsigned long long> internal_pread;
74  std::atomic<unsigned long long> internal_pwrite;
75 
76 
77  std::mutex m;
78  std::condition_variable empty;
79  std::condition_variable full;
80 
81 };
82 
83 
84 template <typename T>
85 bool mpmc_queue<T>::is_empty() const noexcept {
86  return pread.load()==pwrite.load();
87 }
88 
89 template <typename T>
91  if(mode == queue_mode::lockfree){
92 
93  unsigned long long current;
94 
95  do{
96  current = internal_pread.load();
97  }while(!internal_pread.compare_exchange_weak(current, current+1));
98 
99  while(is_empty(current));
100 
101  auto item = std::move(buffer[current%size]);
102  auto aux = current;
103  do{
104  current = aux;
105  }while(!pread.compare_exchange_weak(current, current+1));
106 
107  return std::move(item);
108  }else{
109 
110  std::unique_lock<std::mutex> lk(m);
111  while(is_empty(pread)){
112  empty.wait(lk);
113  }
114  auto item = std::move(buffer[pread%size]);
115  pread++;
116  lk.unlock();
117  full.notify_one();
118 
119  return std::move(item);
120  }
121 
122 }
123 
124 template <typename T>
125 bool mpmc_queue<T>::push(T item){
126  if(mode == queue_mode::lockfree){
127  unsigned long long current;
128  do{
129  current = internal_pwrite.load();
130  }while(!internal_pwrite.compare_exchange_weak(current, current+1));
131 
132  while(is_full(current));
133 
134  buffer[current%size] = std::move(item);
135 
136  auto aux = current;
137  do{
138  current = aux;
139  }while(!pwrite.compare_exchange_weak(current, current+1));
140 
141  return true;
142  }else{
143 
144  std::unique_lock<std::mutex> lk(m);
145  while(is_full(pwrite)){
146  full.wait(lk);
147  }
148  buffer[pwrite%size] = std::move(item);
149 
150  pwrite++;
151  lk.unlock();
152  empty.notify_one();
153 
154  return true;
155  }
156 }
157 
158 template <typename T>
159 bool mpmc_queue<T>::is_empty(unsigned long long current) const noexcept {
160  if(current >= pwrite.load()) return true;
161  return false;
162 }
163 
164 template <typename T>
165 bool mpmc_queue<T>::is_full(unsigned long long current) const noexcept{
166  if(current >= (pread.load()+size)) return true;
167  return false;
168 
169 }
170 
171 }
172 
173 #endif
Definition: callable_traits.h:24
bool is_empty() const noexcept
Definition: mpmc_queue.h:85
mpmc_queue(mpmc_queue &&q)
Definition: mpmc_queue.h:46
queue_mode
Definition: mpmc_queue.h:35
grppi::mpmc_queue
Definition: mpmc_queue.h:38
T value_type
Definition: mpmc_queue.h:41
T pop()
Definition: mpmc_queue.h:90
bool push(T item)
Definition: mpmc_queue.h:125