1.源码

cyber/base/bounded_queue.h中实现了一个有界无锁队列的模板类,通过c++提供的原子操作来确保线程安全,代码如下:

template <typename T>
class BoundedQueue {
public:
using value_type = T;
using size_type = uint64_t;

public:
BoundedQueue() {}
BoundedQueue& operator=(const BoundedQueue& other) = delete;
BoundedQueue(const BoundedQueue& other) = delete;
~BoundedQueue();
bool Init(uint64_t size);
bool Init(uint64_t size, WaitStrategy* strategy);
bool Enqueue(const T& element);
bool Enqueue(T&& element);
bool WaitEnqueue(const T& element);
bool WaitEnqueue(T&& element);
bool Dequeue(T* element);
bool WaitDequeue(T* element);
uint64_t Size();
bool Empty();
void SetWaitStrategy(WaitStrategy* WaitStrategy);
void BreakAllWait();
uint64_t Head() { return head_.load(); }
uint64_t Tail() { return tail_.load(); }
uint64_t Commit() { return commit_.load(); }

private:
uint64_t GetIndex(uint64_t num);

alignas(CACHELINE_SIZE) std::atomic<uint64_t> head_ = {0};
alignas(CACHELINE_SIZE) std::atomic<uint64_t> tail_ = {1};
alignas(CACHELINE_SIZE) std::atomic<uint64_t> commit_ = {1};
// alignas(CACHELINE_SIZE) std::atomic<uint64_t> size_ = {0};
uint64_t pool_size_ = 0;
T* pool_ = nullptr;
std::unique_ptr<WaitStrategy> wait_strategy_ = nullptr;
volatile bool break_all_wait_ = false;
};

template <typename T>
BoundedQueue<T>::~BoundedQueue() {
if (wait_strategy_) {
BreakAllWait();
}
if (pool_) {
for (uint64_t i = 0; i < pool_size_; ++i) {
pool_[i].~T();
}
std::free(pool_);
}
}

template <typename T>
inline bool BoundedQueue<T>::Init(uint64_t size) {
return Init(size, new SleepWaitStrategy());
}

template <typename T>
bool BoundedQueue<T>::Init(uint64_t size, WaitStrategy* strategy) {
// Head and tail each occupy a space
pool_size_ = size + 2;
pool_ = reinterpret_cast<T*>(std::calloc(pool_size_, sizeof(T)));
if (pool_ == nullptr) {
return false;
}
for (uint64_t i = 0; i < pool_size_; ++i) {
new (&(pool_[i])) T();
}
wait_strategy_.reset(strategy);
return true;
}

template <typename T>
bool BoundedQueue<T>::Enqueue(const T& element) {
uint64_t new_tail = 0;
uint64_t old_commit = 0;
uint64_t old_tail = tail_.load(std::memory_order_acquire);
do {
new_tail = old_tail + 1;
if (GetIndex(new_tail) == GetIndex(head_.load(std::memory_order_acquire))) {
return false;
}
} while (!tail_.compare_exchange_weak(old_tail, new_tail,
std::memory_order_acq_rel,
std::memory_order_relaxed));
pool_[GetIndex(old_tail)] = element;
do {
old_commit = old_tail;
} while (cyber_unlikely(!commit_.compare_exchange_weak(
old_commit, new_tail, std::memory_order_acq_rel,
std::memory_order_relaxed)));
//唤醒一个等待的线程,告诉它队列中已经有新的元素可以被取出或处理了。
wait_strategy_->NotifyOne();
return true;
}

template <typename T>
bool BoundedQueue<T>::Enqueue(T&& element) {
uint64_t new_tail = 0;
uint64_t old_commit = 0;
uint64_t old_tail = tail_.load(std::memory_order_acquire);
do {
new_tail = old_tail + 1;
if (GetIndex(new_tail) == GetIndex(head_.load(std::memory_order_acquire))) {
return false;
}
} while (!tail_.compare_exchange_weak(old_tail, new_tail,
std::memory_order_acq_rel,
std::memory_order_relaxed));
pool_[GetIndex(old_tail)] = std::move(element);
do {
old_commit = old_tail;
} while (cyber_unlikely(!commit_.compare_exchange_weak(
old_commit, new_tail, std::memory_order_acq_rel,
std::memory_order_relaxed)));
//唤醒一个等待的线程,告诉它队列中已经有新的元素可以被取出或处理了。
wait_strategy_->NotifyOne();
return true;
}

template <typename T>
bool BoundedQueue<T>::Dequeue(T* element) {
uint64_t new_head = 0;
uint64_t old_head = head_.load(std::memory_order_acquire);
do {
new_head = old_head + 1;
if (new_head == commit_.load(std::memory_order_acquire)) {
return false;
}
*element = pool_[GetIndex(new_head)];
} while (!head_.compare_exchange_weak(old_head, new_head,
std::memory_order_acq_rel,
std::memory_order_relaxed));
return true;
}

template <typename T>
bool BoundedQueue<T>::WaitEnqueue(const T& element) {
while (!break_all_wait_) {
if (Enqueue(element)) {
return true;
}
if (wait_strategy_->EmptyWait()) {
continue;
}
// wait timeout
break;
}

return false;
}

template <typename T>
bool BoundedQueue<T>::WaitEnqueue(T&& element) {
while (!break_all_wait_) {
if (Enqueue(std::move(element))) {
return true;
}
if (wait_strategy_->EmptyWait()) {
continue;
}
// wait timeout
break;
}

return false;
}

template <typename T>
bool BoundedQueue<T>::WaitDequeue(T* element) {
while (!break_all_wait_) {
if (Dequeue(element)) {
return true;
}
if (wait_strategy_->EmptyWait()) {
continue;
}
// wait timeout
break;
}

return false;
}

template <typename T>
inline uint64_t BoundedQueue<T>::Size() {
return tail_ - head_ - 1;
}

template <typename T>
inline bool BoundedQueue<T>::Empty() {
return Size() == 0;
}

template <typename T>
inline uint64_t BoundedQueue<T>::GetIndex(uint64_t num) {
return num - (num / pool_size_) * pool_size_; // faster than %
}

template <typename T>
inline void BoundedQueue<T>::SetWaitStrategy(WaitStrategy* strategy) {
wait_strategy_.reset(strategy);
}

template <typename T>
inline void BoundedQueue<T>::BreakAllWait() {
break_all_wait_ = true;
wait_strategy_->BreakAllWait();
}

2.有界无锁队列的实现

首先来看BoundedQueue 这个模板类拥有的数据成员:

alignas(CACHELINE_SIZE) std::atomic<uint64_t> head_ = {0};
alignas(CACHELINE_SIZE) std::atomic<uint64_t> tail_ = {1};
alignas(CACHELINE_SIZE) std::atomic<uint64_t> commit_ = {1};
// alignas(CACHELINE_SIZE) std::atomic<uint64_t> size_ = {0};
uint64_t pool_size_ = 0;
T* pool_ = nullptr;
std::unique_ptr<WaitStrategy> wait_strategy_ = nullptr;
volatile bool break_all_wait_ = false;

有界的无锁队列是采用顺序存储结构来实现的,可以理解为一个数组,所以头部和尾部的描述就不是使用的链式结构的指针来维护的,直接定义为u64格式的索引,pool_size_就是这个数组的大小,wait_strategy_是一个unique_ptr,因此wait_strategy_指向的内容不允许被其他指针共享,T* pool是指针,用来指向队列数组的头,在初始化时需要为此指针分配内存大小

还定义了一个私有函数:

/* 由于是无符号整数,所以返回的是索引,类似于取余*/
template <typename T>
inline uint64_t BoundedQueue<T>::GetIndex(uint64_t num) {
return num - (num / pool_size_) * pool_size_; // faster than %
}

说明一下上面的操作相当于取余操作,因为无符号整型的数相除,得到的值也是一个无符号数,如果相除的值小于1,则得到的值是0,所以实现了类似取余的操作。

然后看init函数:

template <typename T>
bool BoundedQueue<T>::Init(uint64_t size, WaitStrategy* strategy) {
// Head and tail each occupy a space
pool_size_ = size + 2;
pool_ = reinterpret_cast<T*>(std::calloc(pool_size_, sizeof(T)));
if (pool_ == nullptr) {
return false;
}
for (uint64_t i = 0; i < pool_size_; ++i) {
new (&(pool_[i])) T();
}
wait_strategy_.reset(strategy);
return true;
}
  • std::calloc(pool_size_, sizeof(T)): 这部分使用了 C 标准库函数 calloc,该函数用于分配指定数量的元素并将它们的内存初始化为零。在这里,它分配了 pool_size_ 个元素,每个元素的大小为 sizeof(T) 字节。callocmalloc 不同之处在于它会将分配的内存初始化为零。

  • reinterpret_cast<T*>(...): 这是 C++ 中的类型转换语法,用于将 calloc 返回的指针从 void* 类型转换为 T* 类型。reinterpret_cast 表示底层的二进制表示不发生改变,这种转换通常用于进行低级别的类型转换。

  • new (&(pool_[i])) T(); 这个用法是在指定的内存地址上调用 T 类型的构造函数。这是一个称为 “placement new” 的 C++ 特性。它允许我们在给定的内存地址上构造对象,而不是在默认的堆上分配内存。这通常用于需要对内存进行更精细控制的场景。

在调用init函数后,队列的内存模型如下:

image-20231124152948409

这里其实我没想明白为啥要pool_size_ = size + 2;看写的注释说是head_tail_会占空间。

接着是向队列中添加元素:

template <typename T>
bool BoundedQueue<T>::Enqueue(const T& element) {
uint64_t new_tail = 0;
uint64_t old_commit = 0;
uint64_t old_tail = tail_.load(std::memory_order_acquire);
do {
new_tail = old_tail + 1;
if (GetIndex(new_tail) == GetIndex(head_.load(std::memory_order_acquire))) {
return false;
}
} while (!tail_.compare_exchange_weak(old_tail, new_tail,
std::memory_order_acq_rel,
std::memory_order_relaxed));
pool_[GetIndex(old_tail)] = element;
do {
old_commit = old_tail;
} while (cyber_unlikely(!commit_.compare_exchange_weak(
old_commit, new_tail, std::memory_order_acq_rel,
std::memory_order_relaxed)));
wait_strategy_->NotifyOne();
return true;
}

首先是将new_tail+1,然后判断队列是否越界,然后调用tail_.compare_exchange_weak 来原子的更新tail_的值。

  //tail_为原子变量,将当前的tail_的值和old_tail进行比较,如果相等,则tail_更新为new_tail
//返回true,!操作返回fasle,使得跳出循环,开始下面的入队操作
//否则,如果tail_的值和old_tail不相等(将old_tail更新为当前的tail_值),
//说明其他线程已经做了do里边的操作并且tail_值已经更新,已经抢先入队
//这时返回false,!操作返回true,继续下一次执行do里面的操作,等待入队的时机(或队列已满返回false)
//在old_tail的位置入队,old_tail可能在上面的循环了进行了多次的累加
//和程序入口的old_tail可能已经不同了

image-20231124165704805

上面的逻辑是当两个线程操作时,假设其中一个线程执行完毕new_tail = old_tail + 1;后,另外一个线程已经更新了tail_的值,此时就会出现第二张图的情况,tail_不等去当前线程获得的old_tail_,因此将当前线程的old_tail跟新为新的tail_,然后再返回while循环,将new_tail_+1,然后再进行比较发现此时的tail_=old_tail_了,然后再次更新tail_的值就成了第三幅图的样子。

然后将新加的值放入pool_[GetIndex(old_tail)]

这里使用了一个很有意思的宏#define cyber_unlikely(x) (__builtin_expect((x), 0))

宏定义使用了 GCC 内建函数 __builtin_expect,它是 GCC 编译器提供的一个特殊内建函数,用于提供条件概率的提示,以帮助编译器进行更好的代码优化。

具体而言,这个宏的作用是提示编译器表达式 (x) 的结果是不太可能的。__builtin_expect 函数的参数是两个值,第一个是表达式,第二个是期望的结果。在这里,0 表示不太可能的分支。

do {
old_commit = old_tail;
} while (cyber_unlikely(!commit_.compare_exchange_weak(
old_commit, new_tail, std::memory_order_acq_rel,
std::memory_order_relaxed)));

这里有点绕,假设只有一个线程执行入队操作,那么old_commitcommit的值是相等的,所以此时会将commit_的值更新到新的tail_的位置

image-20231124163223274

假设有两个线程操作了此队列:后修改tail_的那个线程就会出现如下的情况:old_commit!=commit,因此此线程的old_commit值会被更新成commit的值,然后在do里面old_commit又会被更新成old_tail的值,相当于它一直阻塞在这里了

image-20231124170412439

而对于先修改tail_的那个线程来说:old_commit=commit,因此会跳出do循环,同时将commit_的值更新成此线程的new_tail的值,即上面那个线程的old_tail的值。

image-20231124170550072

因此commit_的值是完全根据入队的顺序进行递增的,不同线程根据入队的循序依次跳出该循环,哪个线程先完成入队操作,哪个线程先跳出该while循环。

但我感觉这个commit的值没啥屌用阿,md还很难理解

然后是从队列中取出元素:

template <typename T>
bool BoundedQueue<T>::Dequeue(T* element) {
uint64_t new_head = 0;
uint64_t old_head = head_.load(std::memory_order_acquire);
do {
new_head = old_head + 1;
if (new_head == commit_.load(std::memory_order_acquire)) {
return false;
}
*element = pool_[GetIndex(new_head)];
} while (!head_.compare_exchange_weak(old_head, new_head,
std::memory_order_acq_rel,
std::memory_order_relaxed));
return true;
}

当队列里有数据的时候,head_old_head比较,如果相等,则更新为new_head并返回true,退出循环。如果不等,则说明其他线程已经取走了当前的head元素,将old_head更新为head_值并进入下一次do里面的操作,这里说明一下为什么去元素是从new_head处取,因为插入元素时,head_tail_处是不会放元素的。

当队列里没数据的时候,第一次do的操作,会对new_head的值和commit_的值进行判断,如果这两个值相等,说明队列里没数据,则返回false

image-20231124172803708

然后是实现了等待策略的插入函数:

template <typename T>
bool BoundedQueue<T>::WaitEnqueue(T&& element) {
while (!break_all_wait_) {
if (Enqueue(std::move(element))) {
return true;
}
if (wait_strategy_->EmptyWait()) {
continue;
}
// wait timeout
break;
}

return false;
}

这里实现了等待机制,如果队列未满,则立马插入返回,否则进入空等状态

实现了等待策略的取出函数:如果队列中没数据则空等

template <typename T>
bool BoundedQueue<T>::WaitDequeue(T* element) {
while (!break_all_wait_) {
if (Dequeue(element)) {
return true;
}
if (wait_strategy_->EmptyWait()) {
continue;
}
// wait timeout
break;
}

return false;
}

队列的入队操作就是往尾部添加元素,然后移动尾部指针。出队操作就是从头部取出一个元素,然后移动头部指针,虽然移动了头部指针,但是之前申请的内存是还在的,并不会释放。

参考链接