1. 源码

cyber/base/atomic_rw_lock.h中实现了一个读写锁,代码如下:

class AtomicRWLock {
friend class ReadLockGuard<AtomicRWLock>;
friend class WriteLockGuard<AtomicRWLock>;

public:
static const int32_t RW_LOCK_FREE = 0;
static const int32_t WRITE_EXCLUSIVE = -1;
static const uint32_t MAX_RETRY_TIMES = 5;
AtomicRWLock() {}
explicit AtomicRWLock(bool write_first) : write_first_(write_first) {}

private:
// all these function only can used by ReadLockGuard/WriteLockGuard;
void ReadLock();
void WriteLock();

void ReadUnlock();
void WriteUnlock();

AtomicRWLock(const AtomicRWLock&) = delete;
AtomicRWLock& operator=(const AtomicRWLock&) = delete;
std::atomic<uint32_t> write_lock_wait_num_ = {0};
std::atomic<int32_t> lock_num_ = {0};
bool write_first_ = true;
};

inline void AtomicRWLock::ReadLock() {
uint32_t retry_times = 0;
int32_t lock_num = lock_num_.load();
if (write_first_) {
do {
while (lock_num < RW_LOCK_FREE || write_lock_wait_num_.load() > 0) {
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
lock_num = lock_num_.load();
}
} while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
std::memory_order_acq_rel,
std::memory_order_relaxed));
} else {
do {
while (lock_num < RW_LOCK_FREE) {
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
lock_num = lock_num_.load();
}
} while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
std::memory_order_acq_rel,
std::memory_order_relaxed));
}
}

inline void AtomicRWLock::WriteLock() {
int32_t rw_lock_free = RW_LOCK_FREE;
uint32_t retry_times = 0;
write_lock_wait_num_.fetch_add(1);
while (!lock_num_.compare_exchange_weak(rw_lock_free, WRITE_EXCLUSIVE,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
// rw_lock_free will change after CAS fail, so init agin
rw_lock_free = RW_LOCK_FREE;
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
}
write_lock_wait_num_.fetch_sub(1);
}

inline void AtomicRWLock::ReadUnlock() { lock_num_.fetch_sub(1); }

inline void AtomicRWLock::WriteUnlock() { lock_num_.fetch_add(1); }

2.原子读写锁的实现

读写锁的特性:

  • 当读写锁被加了写锁时,其他线程对该锁加读锁或者写锁都会阻塞(不是失败)。
  • 当读写锁被加了读锁时,其他线程对该锁加写锁会阻塞,加读锁会成功。

简单说就是运行多个线程同时读,但是同时只能允许一个线程去写,非常适合读多少写的场景

首先AtomicRWLock 中声明了两个友元类:

friend class ReadLockGuard<AtomicRWLock>;
friend class WriteLockGuard<AtomicRWLock>;

这两个友元类定义在cyber/base/rw_lock_guard.h

template <typename RWLock>
class ReadLockGuard {
public:
explicit ReadLockGuard(RWLock& lock) : rw_lock_(lock) { rw_lock_.ReadLock(); }

~ReadLockGuard() { rw_lock_.ReadUnlock(); }

private:
ReadLockGuard(const ReadLockGuard& other) = delete;
ReadLockGuard& operator=(const ReadLockGuard& other) = delete;
RWLock& rw_lock_;
};

template <typename RWLock>
class WriteLockGuard {
public:
explicit WriteLockGuard(RWLock& lock) : rw_lock_(lock) {
rw_lock_.WriteLock();
}

~WriteLockGuard() { rw_lock_.WriteUnlock(); }

private:
WriteLockGuard(const WriteLockGuard& other) = delete;
WriteLockGuard& operator=(const WriteLockGuard& other) = delete;
RWLock& rw_lock_;
};

可以看见这两个类在创建的时候构造函数会去调用读写锁的读或者写操作去持有锁,在析构的时候会去解锁,这就是利用c++的RAII机制实现对加锁和开锁的封装,这样就不用用户手动去加锁和释放锁了,和c++提供的std::lock_guard用法类似

读写锁的操作是靠几个变量来控制的:

std::atomic<uint32_t> write_lock_wait_num_ = {0};
std::atomic<int32_t> lock_num_ = {0};
bool write_first_ = true;
static const int32_t RW_LOCK_FREE = 0;
static const int32_t WRITE_EXCLUSIVE = -1;
static const uint32_t MAX_RETRY_TIMES = 5;
  • write_lock_wait_num_ 代表了有多少个线程等着写就是多少个线程等着持有读锁
  • lock_num_代表持有锁的线程的数量,由于读锁是允许多个线程同时读的,所以
  • RW_LOCK_FREE:标志位,代表此时没有线程持有锁,可读可写
  • WRITE_EXCLUSIVE:标志位,代表此时锁被一个写的线程暂用
  • MAX_RETRY_TIMES:尝试获取锁的时候连续尝试次数,就像自旋锁那样,连续失败MAX_RETRY_TIMES次则会让出线程的执行权
  • write_first_:默认写的操作优先

我们先来看读写锁的写锁:

inline void AtomicRWLock::WriteLock() {
int32_t rw_lock_free = RW_LOCK_FREE;
uint32_t retry_times = 0;
write_lock_wait_num_.fetch_add(1);
while (!lock_num_.compare_exchange_weak(rw_lock_free, WRITE_EXCLUSIVE,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
// rw_lock_free will change after CAS fail, so init agin
rw_lock_free = RW_LOCK_FREE;
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
}
write_lock_wait_num_.fetch_sub(1);
}
  • write_lock_wait_num_记录了想要持有读锁的线程的数量,因此每次WriteLock()操作需要+1
  • 接着循环会去比较lock_num_rw_lock_free的值,会出现以下两种情况:
    • 1.:lock_num_ = rw_lock_free== 0,代表此时没人用锁,则将lock_num_的值置为WRITE_EXCLUSIVE=-1,直接跳出循环,此时假设另外一个线程也来写,而lock_num_=-1,所以必须等待前一个持有写锁的人解锁才行,所以解锁操作也显而易见了,就是将lock_num_+1恢复到RW_LOCK_FREE
    • 2:lock_num_>=1代表其他线程持有了读锁,或lock_num_>=WRITE_EXCLUSIVE这种情况就是上面提到的另外一个线程持有了读锁,但是还没解锁,那么此时该线程就会去执行while循环里的操作,在while循环里会重置rw_lock_free的值,然后循环判断几次如果还在等待就执行yield()操作让当前线程释放cpu所有权。

再来看读写锁的读锁:

inline void AtomicRWLock::ReadLock() {
uint32_t retry_times = 0;
int32_t lock_num = lock_num_.load();
if (write_first_) {
do {
while (lock_num < RW_LOCK_FREE || write_lock_wait_num_.load() > 0) {
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
lock_num = lock_num_.load();
}
} while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
std::memory_order_acq_rel,
std::memory_order_relaxed));
} else {
do {
while (lock_num < RW_LOCK_FREE) {
if (++retry_times == MAX_RETRY_TIMES) {
// saving cpu
std::this_thread::yield();
retry_times = 0;
}
lock_num = lock_num_.load();
}
} while (!lock_num_.compare_exchange_weak(lock_num, lock_num + 1,
std::memory_order_acq_rel,
std::memory_order_relaxed));
}
}

由于默认写优先就是说我在写的时候是不允许读的,先进入一个分支执行do操作:然后去进行判断是否有线程在执行写操作,即lock_num < RW_LOCK_FREE || write_lock_wait_num_.load() > 0,则想要读的这个线程就让出cpu等着写的线程写完先,如果写完了重新加载一下lock_num的值,然后去执行lock_num_.compare_exchange_weak这个操作,这里有两种情况:

  • 1.lock_num_==lock_num(即大于等于0),可能你会说这不是肯定的吗?这个还真不一定,虽然刚执行过lock_num = lock_num_.load();但是lock_num是个多线程控制的值,随时在变。然后将lock_num_的值加一
  • 2.lock_num_!=lock_num lock_num赋值为lock_num_(无用) 重新循环 简单点来讲就是:刚刚的判断好好的,正当我要办事的时候,突然有线程偷偷改变了值,一切判断作废,重新来过

最后是解锁操作:

inline void AtomicRWLock::ReadUnlock() { lock_num_.fetch_sub(1); }

inline void AtomicRWLock::WriteUnlock() { lock_num_.fetch_add(1); }

读锁解锁就是将大于0的lock_num_做减1操作,写锁的解锁就是将-1转到0

参考链接