1.源码 在cyber/base/bounded_queue.h
中实现了一个有界无锁队列的模板类,通过c++提供的原子操作来确保线程安全,代码如下:
template <typename T>class BoundedQueue { public : using value_type = T; using size_type = uint64_t ; public : BoundedQueue () {} BoundedQueue& operator =(const BoundedQueue& other) = delete ; BoundedQueue (const BoundedQueue& other) = delete ; ~BoundedQueue (); bool Init (uint64_t size) ; bool Init (uint64_t size, WaitStrategy* strategy) ; bool Enqueue (const T& element) ; bool Enqueue (T&& element) ; bool WaitEnqueue (const T& element) ; bool WaitEnqueue (T&& element) ; bool Dequeue (T* element) ; bool WaitDequeue (T* element) ; uint64_t Size () ; bool Empty () ; void SetWaitStrategy (WaitStrategy* WaitStrategy) ; void BreakAllWait () ; uint64_t Head () { return head_.load (); } uint64_t Tail () { return tail_.load (); } uint64_t Commit () { return commit_.load (); } private : uint64_t GetIndex (uint64_t num) ; alignas (CACHELINE_SIZE) std::atomic<uint64_t > head_ = {0 }; alignas (CACHELINE_SIZE) std::atomic<uint64_t > tail_ = {1 }; alignas (CACHELINE_SIZE) std::atomic<uint64_t > commit_ = {1 }; uint64_t pool_size_ = 0 ; T* pool_ = nullptr ; std::unique_ptr<WaitStrategy> wait_strategy_ = nullptr ; volatile bool break_all_wait_ = false ; }; template <typename T>BoundedQueue<T>::~BoundedQueue () { if (wait_strategy_) { BreakAllWait (); } if (pool_) { for (uint64_t i = 0 ; i < pool_size_; ++i) { pool_[i].~T (); } std::free (pool_); } } template <typename T>inline bool BoundedQueue<T>::Init (uint64_t size) { return Init (size, new SleepWaitStrategy ()); } template <typename T>bool BoundedQueue<T>::Init (uint64_t size, WaitStrategy* strategy) { pool_size_ = size + 2 ; pool_ = reinterpret_cast <T*>(std::calloc (pool_size_, sizeof (T))); if (pool_ == nullptr ) { return false ; } for (uint64_t i = 0 ; i < pool_size_; ++i) { new (&(pool_[i])) T (); } wait_strategy_.reset (strategy); return true ; } template <typename T>bool BoundedQueue<T>::Enqueue (const T& element) { uint64_t new_tail = 0 ; uint64_t old_commit = 0 ; uint64_t old_tail = tail_.load (std::memory_order_acquire); do { new_tail = old_tail + 1 ; if (GetIndex (new_tail) == GetIndex (head_.load (std::memory_order_acquire))) { return false ; } } while (!tail_.compare_exchange_weak (old_tail, new_tail, std::memory_order_acq_rel, std::memory_order_relaxed)); pool_[GetIndex (old_tail)] = element; do { old_commit = old_tail; } while (cyber_unlikely (!commit_.compare_exchange_weak ( old_commit, new_tail, std::memory_order_acq_rel, std::memory_order_relaxed))); wait_strategy_->NotifyOne (); return true ; } template <typename T>bool BoundedQueue<T>::Enqueue (T&& element) { uint64_t new_tail = 0 ; uint64_t old_commit = 0 ; uint64_t old_tail = tail_.load (std::memory_order_acquire); do { new_tail = old_tail + 1 ; if (GetIndex (new_tail) == GetIndex (head_.load (std::memory_order_acquire))) { return false ; } } while (!tail_.compare_exchange_weak (old_tail, new_tail, std::memory_order_acq_rel, std::memory_order_relaxed)); pool_[GetIndex (old_tail)] = std::move (element); do { old_commit = old_tail; } while (cyber_unlikely (!commit_.compare_exchange_weak ( old_commit, new_tail, std::memory_order_acq_rel, std::memory_order_relaxed))); wait_strategy_->NotifyOne (); return true ; } template <typename T>bool BoundedQueue<T>::Dequeue (T* element) { uint64_t new_head = 0 ; uint64_t old_head = head_.load (std::memory_order_acquire); do { new_head = old_head + 1 ; if (new_head == commit_.load (std::memory_order_acquire)) { return false ; } *element = pool_[GetIndex (new_head)]; } while (!head_.compare_exchange_weak (old_head, new_head, std::memory_order_acq_rel, std::memory_order_relaxed)); return true ; } template <typename T>bool BoundedQueue<T>::WaitEnqueue (const T& element) { while (!break_all_wait_) { if (Enqueue (element)) { return true ; } if (wait_strategy_->EmptyWait ()) { continue ; } break ; } return false ; } template <typename T>bool BoundedQueue<T>::WaitEnqueue (T&& element) { while (!break_all_wait_) { if (Enqueue (std::move (element))) { return true ; } if (wait_strategy_->EmptyWait ()) { continue ; } break ; } return false ; } template <typename T>bool BoundedQueue<T>::WaitDequeue (T* element) { while (!break_all_wait_) { if (Dequeue (element)) { return true ; } if (wait_strategy_->EmptyWait ()) { continue ; } break ; } return false ; } template <typename T>inline uint64_t BoundedQueue<T>::Size () { return tail_ - head_ - 1 ; } template <typename T>inline bool BoundedQueue<T>::Empty () { return Size () == 0 ; } template <typename T>inline uint64_t BoundedQueue<T>::GetIndex (uint64_t num) { return num - (num / pool_size_) * pool_size_; } template <typename T>inline void BoundedQueue<T>::SetWaitStrategy (WaitStrategy* strategy) { wait_strategy_.reset (strategy); } template <typename T>inline void BoundedQueue<T>::BreakAllWait () { break_all_wait_ = true ; wait_strategy_->BreakAllWait (); }
2.有界无锁队列的实现 首先来看BoundedQueue
这个模板类拥有的数据成员:
alignas (CACHELINE_SIZE) std::atomic<uint64_t > head_ = {0 };alignas (CACHELINE_SIZE) std::atomic<uint64_t > tail_ = {1 };alignas (CACHELINE_SIZE) std::atomic<uint64_t > commit_ = {1 };uint64_t pool_size_ = 0 ;T* pool_ = nullptr ; std::unique_ptr<WaitStrategy> wait_strategy_ = nullptr ; volatile bool break_all_wait_ = false ;
有界的无锁队列是采用顺序存储结构来实现的,可以理解为一个数组,所以头部和尾部的描述就不是使用的链式结构的指针来维护的,直接定义为u64
格式的索引,pool_size_
就是这个数组的大小,wait_strategy_
是一个unique_ptr
,因此wait_strategy_
指向的内容不允许被其他指针共享,T* pool
是指针,用来指向队列数组的头,在初始化时需要为此指针分配内存大小
还定义了一个私有函数:
template <typename T> inline uint64_t BoundedQueue<T>::GetIndex(uint64_t num) { return num - (num / pool_size_) * pool_size_; }
说明一下上面的操作相当于取余操作,因为无符号整型的数相除,得到的值也是一个无符号数,如果相除的值小于1,则得到的值是0,所以实现了类似取余的操作。
然后看init
函数:
template <typename T>bool BoundedQueue<T>::Init (uint64_t size, WaitStrategy* strategy) { pool_size_ = size + 2 ; pool_ = reinterpret_cast <T*>(std::calloc (pool_size_, sizeof (T))); if (pool_ == nullptr ) { return false ; } for (uint64_t i = 0 ; i < pool_size_; ++i) { new (&(pool_[i])) T (); } wait_strategy_.reset (strategy); return true ; }
std::calloc(pool_size_, sizeof(T))
: 这部分使用了 C 标准库函数 calloc
,该函数用于分配指定数量的元素并将它们的内存初始化为零。在这里,它分配了 pool_size_
个元素,每个元素的大小为 sizeof(T)
字节。calloc
与 malloc
不同之处在于它会将分配的内存初始化为零。
reinterpret_cast<T*>(...)
: 这是 C++ 中的类型转换语法,用于将 calloc
返回的指针从 void*
类型转换为 T*
类型。reinterpret_cast
表示底层的二进制表示不发生改变,这种转换通常用于进行低级别的类型转换。
new (&(pool_[i])) T();
这个用法是在指定的内存地址上调用 T 类型的构造函数。这是一个称为 “placement new” 的 C++ 特性。它允许我们在给定的内存地址上构造对象,而不是在默认的堆上分配内存。这通常用于需要对内存进行更精细控制的场景。
在调用init
函数后,队列的内存模型如下:
这里其实我没想明白为啥要pool_size_ = size + 2;
看写的注释说是head_
和tail_
会占空间。
接着是向队列中添加元素:
template <typename T>bool BoundedQueue<T>::Enqueue (const T& element) { uint64_t new_tail = 0 ; uint64_t old_commit = 0 ; uint64_t old_tail = tail_.load (std::memory_order_acquire); do { new_tail = old_tail + 1 ; if (GetIndex (new_tail) == GetIndex (head_.load (std::memory_order_acquire))) { return false ; } } while (!tail_.compare_exchange_weak (old_tail, new_tail, std::memory_order_acq_rel, std::memory_order_relaxed)); pool_[GetIndex (old_tail)] = element; do { old_commit = old_tail; } while (cyber_unlikely (!commit_.compare_exchange_weak ( old_commit, new_tail, std::memory_order_acq_rel, std::memory_order_relaxed))); wait_strategy_->NotifyOne (); return true ; }
首先是将new_tail+1
,然后判断队列是否越界,然后调用tail_.compare_exchange_weak
来原子的更新tail
_的值。
//tail_为原子变量,将当前的tail_的值和old_tail进行比较,如果相等,则tail_更新为new_tail //返回true,!操作返回fasle,使得跳出循环,开始下面的入队操作 //否则,如果tail_的值和old_tail不相等(将old_tail更新为当前的tail_值), //说明其他线程已经做了do里边的操作并且tail_值已经更新,已经抢先入队 //这时返回false,!操作返回true,继续下一次执行do里面的操作,等待入队的时机(或队列已满返回false) //在old_tail的位置入队,old_tail可能在上面的循环了进行了多次的累加 //和程序入口的old_tail可能已经不同了
上面的逻辑是当两个线程操作时,假设其中一个线程执行完毕new_tail = old_tail + 1;
后,另外一个线程已经更新了tail_
的值,此时就会出现第二张图的情况,tail_
不等去当前线程获得的old_tail_
,因此将当前线程的old_tail
跟新为新的tail_
,然后再返回while循环,将new_tail_+1
,然后再进行比较发现此时的tail_=old_tail_
了,然后再次更新tail_
的值就成了第三幅图的样子。
然后将新加的值放入pool_[GetIndex(old_tail)]
中
这里使用了一个很有意思的宏#define cyber_unlikely(x) (__builtin_expect((x), 0))
宏定义使用了 GCC 内建函数 __builtin_expect
,它是 GCC 编译器提供的一个特殊内建函数,用于提供条件概率的提示,以帮助编译器进行更好的代码优化。
具体而言,这个宏的作用是提示编译器表达式 (x)
的结果是不太可能的。__builtin_expect
函数的参数是两个值,第一个是表达式,第二个是期望的结果。在这里,0
表示不太可能的分支。
do { old_commit = old_tail; } while (cyber_unlikely (!commit_.compare_exchange_weak ( old_commit, new_tail, std::memory_order_acq_rel, std::memory_order_relaxed)));
这里有点绕,假设只有一个线程执行入队操作,那么old_commit
和commit
的值是相等的,所以此时会将commit_
的值更新到新的tail_
的位置
假设有两个线程操作了此队列:后修改tail_
的那个线程就会出现如下的情况:old_commit!=commit
,因此此线程的old_commit
值会被更新成commit
的值,然后在do里面old_commit
又会被更新成old_tail
的值,相当于它一直阻塞在这里了
而对于先修改tail_
的那个线程来说:old_commit=commit
,因此会跳出do循环,同时将commit_
的值更新成此线程的new_tail
的值,即上面那个线程的old_tail
的值。
因此commit_的值是完全根据入队的顺序进行递增的,不同线程根据入队的循序依次跳出该循环,哪个线程先完成入队操作,哪个线程先跳出该while循环。
但我感觉这个commit的值没啥屌用阿,md还很难理解
然后是从队列中取出元素:
template <typename T>bool BoundedQueue<T>::Dequeue (T* element) { uint64_t new_head = 0 ; uint64_t old_head = head_.load (std::memory_order_acquire); do { new_head = old_head + 1 ; if (new_head == commit_.load (std::memory_order_acquire)) { return false ; } *element = pool_[GetIndex (new_head)]; } while (!head_.compare_exchange_weak (old_head, new_head, std::memory_order_acq_rel, std::memory_order_relaxed)); return true ; }
当队列里有数据的时候,head_
和old_head
比较,如果相等,则更新为new_head
并返回true
,退出循环。如果不等,则说明其他线程已经取走了当前的head
元素,将old_head
更新为head_
值并进入下一次do里面的操作,这里说明一下为什么去元素是从new_head
处取,因为插入元素时,head_
和tail_
处是不会放元素的。
当队列里没数据的时候,第一次do
的操作,会对new_head
的值和commit_
的值进行判断,如果这两个值相等,说明队列里没数据,则返回false
然后是实现了等待策略的插入函数:
template <typename T>bool BoundedQueue<T>::WaitEnqueue (T&& element) { while (!break_all_wait_) { if (Enqueue (std::move (element))) { return true ; } if (wait_strategy_->EmptyWait ()) { continue ; } break ; } return false ; }
这里实现了等待机制,如果队列未满,则立马插入返回,否则进入空等状态
实现了等待策略的取出函数:如果队列中没数据则空等
template <typename T>bool BoundedQueue<T>::WaitDequeue (T* element) { while (!break_all_wait_) { if (Dequeue (element)) { return true ; } if (wait_strategy_->EmptyWait ()) { continue ; } break ; } return false ; }
队列的入队操作就是往尾部添加元素,然后移动尾部指针。出队操作就是从头部取出一个元素,然后移动头部指针,虽然移动了头部指针,但是之前申请的内存是还在的,并不会释放。
参考链接