21 #ifndef GRPPI_COMMON_MPMC_QUEUE_H 22 #define GRPPI_COMMON_MPMC_QUEUE_H 29 #include <condition_variable> 44 size{q_size}, buffer{std::vector<T>(q_size)}, mode{q_mode}, pread{0}, pwrite{0}, internal_pread{0}, internal_pwrite{0} { }
48 buffer{std::move(q.buffer)},
50 pread{q.pread.load()},
51 pwrite{q.pwrite.load()},
52 internal_pread{q.internal_pread.load()},
53 internal_pwrite{q.internal_pwrite.load()},
59 bool is_empty ()
const noexcept;
64 bool is_full (
unsigned long long current)
const noexcept;
65 bool is_empty (
unsigned long long current)
const noexcept;
68 std::vector<T> buffer;
71 std::atomic<unsigned long long> pread;
72 std::atomic<unsigned long long> pwrite;
73 std::atomic<unsigned long long> internal_pread;
74 std::atomic<unsigned long long> internal_pwrite;
78 std::condition_variable empty;
79 std::condition_variable full;
86 return pread.load()==pwrite.load();
93 unsigned long long current;
96 current = internal_pread.load();
97 }
while(!internal_pread.compare_exchange_weak(current, current+1));
99 while(is_empty(current));
101 auto item = std::move(buffer[current%size]);
105 }
while(!pread.compare_exchange_weak(current, current+1));
107 return std::move(item);
110 std::unique_lock<std::mutex> lk(m);
111 while(is_empty(pread)){
114 auto item = std::move(buffer[pread%size]);
119 return std::move(item);
124 template <
typename T>
127 unsigned long long current;
129 current = internal_pwrite.load();
130 }
while(!internal_pwrite.compare_exchange_weak(current, current+1));
132 while(is_full(current));
134 buffer[current%size] = std::move(item);
139 }
while(!pwrite.compare_exchange_weak(current, current+1));
144 std::unique_lock<std::mutex> lk(m);
145 while(is_full(pwrite)){
148 buffer[pwrite%size] = std::move(item);
158 template <
typename T>
160 if(current >= pwrite.load())
return true;
164 template <
typename T>
166 if(current >= (pread.load()+size))
return true;
Definition: callable_traits.h:24
bool is_empty() const noexcept
Definition: mpmc_queue.h:85
mpmc_queue(mpmc_queue &&q)
Definition: mpmc_queue.h:46
queue_mode
Definition: mpmc_queue.h:35
Definition: mpmc_queue.h:38
T value_type
Definition: mpmc_queue.h:41
T pop()
Definition: mpmc_queue.h:90
bool push(T item)
Definition: mpmc_queue.h:125