16 #ifndef GRPPI_COMMON_MPMC_QUEUE_H
17 #define GRPPI_COMMON_MPMC_QUEUE_H
24 #include <condition_variable>
// Constructor initializer: the storage is created up front with q_size
// value-initialized elements of T.
40 buffer{std::vector<T>(q_size)},
// Member initializers taking state from another queue `q` (apparently the
// move constructor, given std::move(q.buffer)). The element storage is moved.
50 buffer{std::move(q.buffer)},
// std::atomic is neither copyable nor movable, so each counter is rebuilt
// from a load() snapshot of the corresponding counter in q.
// NOTE(review): the four loads are independent, so the snapshot is only
// consistent if no other thread is using q at this point -- confirm callers.
52 pread{q.pread.load()},
53 pwrite{q.pwrite.load()},
54 internal_pread{q.internal_pread.load()},
55 internal_pwrite{q.internal_pwrite.load()},
// Position-based fullness test: the definition returns true when `current`
// is at or beyond pread + size. It is used by push on the position it is
// about to write.
69 bool is_full (
unsigned long long current) const noexcept;
// Position-based emptiness test: the definition returns true when `current`
// is at or beyond pwrite. It is used by pop on the position it is about to
// read.
70 bool is_empty (
unsigned long long current) const noexcept;
// Element storage. pop and push address it as buffer[position % size], so
// positions grow monotonically and wrap around the storage.
73 std::vector<T> buffer;
// Read position. pop advances it after an element has been moved out, and
// is_full() compares against it.
76 std::atomic<
unsigned long long> pread;
// Write position. push advances it after an element has been stored, and
// is_empty() compares against it.
77 std::atomic<
unsigned long long> pwrite;
// Reservation counter for readers: the compare-exchange loop in pop
// increments it first to pick the position that thread will read.
78 std::atomic<
unsigned long long> internal_pread;
// Reservation counter for writers: the compare-exchange loop in push
// increments it first to pick the position that thread will write.
79 std::atomic<
unsigned long long> internal_pwrite;
// Condition variables, presumably waited on and notified by the
// mutex-protected pop/push paths. The wait/notify calls are not visible in
// this excerpt -- TODO confirm.
83 std::condition_variable empty{};
84 std::condition_variable full{};
// The queue holds no published elements when the read and write positions
// coincide. The two loads are separate, so the result is only a snapshot.
91 return pread.load()==pwrite.load();
// pop(): two code paths appear below. The branch choosing between them is
// not visible in this excerpt (presumably it depends on the queue mode).
//
// --- Path using atomics only ---
98 unsigned long long current;
// Reserve a read position: retry until internal_pread is advanced from
// `current` to current+1. On success `current` is the reserved position.
101 current = internal_pread.load();
102 }
while(!internal_pread.compare_exchange_weak(current, current+1));
// Busy-wait until the reserved position has been published by a writer,
// i.e. until is_empty(current) stops returning true.
104 while(is_empty(current));
// Move the element out of its slot.
106 auto item = std::move(buffer[current%size]);
// Tail of a second retry loop whose body is not shown here: it repeats until
// pread is advanced from `current` to current+1.
110 }
while(!pread.compare_exchange_weak(current, current+1));
// NOTE(review): std::move on a returned local inhibits copy elision; a plain
// `return item;` already moves when elision is not applied.
112 return std::move(item);
// --- Path serialized by mutex m ---
115 std::unique_lock<std::mutex> lk(m);
// Loop while nothing is available at the read position. The loop body is not
// visible here; presumably it waits on a condition variable -- TODO confirm.
116 while(is_empty(pread)){
// Move the element at the current read position out of its slot.
119 auto item = std::move(buffer[pread%size]);
// NOTE(review): same remark as above about `return std::move(item);`.
124 return std::move(item);
// push(item): two code paths appear below. The branch choosing between them
// and the return statements are not visible in this excerpt.
129 template <
typename T>
// --- Path using atomics only ---
132 unsigned long long current;
// Reserve a write position: retry until internal_pwrite is advanced from
// `current` to current+1. On success `current` is the reserved position.
134 current = internal_pwrite.load();
135 }
while(!internal_pwrite.compare_exchange_weak(current, current+1));
// Busy-wait until the reserved position is less than pread + size, i.e.
// until is_full(current) stops returning true.
137 while(is_full(current));
// Store the element in its slot.
139 buffer[current%size] = std::move(item);
// Tail of a second retry loop whose body is not shown here: it repeats until
// pwrite is advanced from `current` to current+1, which is what makes the
// slot pass pop's is_empty(current) check.
144 }
while(!pwrite.compare_exchange_weak(current, current+1));
// --- Path serialized by mutex m ---
149 std::unique_lock<std::mutex> lk(m);
// Loop while the write position is at or beyond pread + size. The loop body
// is not visible here; presumably it waits on a condition variable --
// TODO confirm.
150 while(is_full(pwrite)){
// Store the element at the current write position.
153 buffer[pwrite%size] = std::move(item);
// Position-based emptiness test (the signature line is missing from this
// excerpt): a position at or beyond the write position has not been
// published yet. The remaining branch is not visible here.
163 template <
typename T>
165 if(current >= pwrite.load())
return true;
// Position-based fullness test: `current` cannot be written while it is a
// full lap (`size` positions) or more ahead of the read position, because
// the slot it maps to would still hold an element that has not been read.
// The remaining branch is not visible in this excerpt.
169 template <
typename T>
170 bool mpmc_queue<T>::is_full(
unsigned long long current)
const noexcept{
171 if(current >= (pread.load()+size))
return true;
178 template <
typename T>
181 template <
typename T>
186 template <
typename T>
189 template <
typename T>
Definition: mpmc_queue.h:33
mpmc_queue & operator=(const mpmc_queue &)=delete
mpmc_queue(const mpmc_queue &)=delete
T value_type
Definition: mpmc_queue.h:36
mpmc_queue(mpmc_queue &&q)
Definition: mpmc_queue.h:48
bool is_empty() const noexcept
Definition: mpmc_queue.h:90
bool push(T item)
Definition: mpmc_queue.h:130
T pop()
Definition: mpmc_queue.h:95
Definition: callable_traits.h:21
constexpr bool is_queue
Definition: mpmc_queue.h:187
queue_mode
Definition: mpmc_queue.h:30
std::enable_if_t< is_queue< T >, int > requires_queue
Definition: mpmc_queue.h:190
Definition: mpmc_queue.h:179