对c++ 并发编程实战-无锁队列的总结

引言
最近在看 C++ 并发编程实战(第2版)这本书。第七章的内容中给出了无锁栈及无锁队列的实现,一番阅读下来发现不少问题。
官方翻译版存在很多不准确的地方,后来找到了英文原版+ChatGPT翻译,好了很多
原书中存在一些错误以及疏忽的问题
一些概念晦涩难以理解
故借此机会记录一下自己的理解,也欢迎讨论及补充
单生产者单消费者(SPSC)无锁队列
现从一个简单的 SPSC 队列开始,首先我们需要一个结构体 node 作为链表的节点
template <typename T>
struct node {
std::shared_ptr<T> data;
node *next;
node() : next(nullptr) {}
};
十分简单,使用 std::shared_ptr
代替裸指针,next 成员指向下一个节点
我们使用 std::atomic<node*>
来记录队首与队尾
template <typename T>
class lock_free_queue {
private:
std::atomic<node*> head;
std::atomic<node*> tail;
public:
lock_free_queue() : head(new node), tail(head.load()) {}
}
可以看到构造队列时创建了一个占位节点,它还没有任何数据,head 指针与 tail 指针都指向这个占位节点,由此可以推断出判断队列为空的表达式大概是下面这样
head.load() == tail.load();
然后是入队函数 push()
template <typename T>
void lock_free_queue<T>::push(T new_value) {
std::shared_ptr<T> new_data(std::make_shared<T>(new_value));
node* p = new node;
node* const old_tail = tail.load();
old_tail->data.swap(new_data); // 1
old_tail->next = p; // 2
tail.store(p); // 3
}
主要有这三个步骤:
在占位节点中存下入队的元素
让当前节点的 next 指向一个新的占位节点
让 tail 指针指向新的占位节点,所以 tail 是始终指向占位节点的
再看看出队函数 pop()
template <typename T>
node* lock_free_queue<T>::pop_head() {
node *const old_head = head.load();
if (old_head == tail.load()) {
return nullptr;
}
head.store(old_head->next); // 1
return old_head; // 2
}
template <typename T>
std::shared_ptr<T> lock_free_queue<T>::pop() {
node* old_head = pop_head();
if (!old_head) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> const res(old_head->data); // 3
delete old_head; // 4
return res; // 5
}
pop_head()
让 head 指针指向下一个节点(1),然后返回出队的节点指针(2)
pop 函数
调用 pop_head()
之后构造一个 std::shared_ptr
保存元素(3),然后释放出队的节点(4),返回元素(5)
以下是 SPSC 队列的完整实现
template <typename T>
class lock_free_queue {
private:
struct node {
std::shared_ptr<T> data;
node* next;
node(): next(nullptr) {}
};
std::atomic<node*> head;
std::atomic<node*> tail;
node* pop_head() {
node* const old_head = head.load();
if (old_head == tail.load()) { // 1
return nullptr;
}
head.store(old_head->next);
return old_head;
}
public:
lock_free_queue() : head(new node), tail(head.load()) {}
lock_free_queue(const lock_free_queue& other) = delete;
lock_free_queue& operator=(const lock_free_queue& other) = delete;
~lock_free_queue() {
while (node* const old_head = head.load()) {
head.store(old_head->next);
delete old_head;
}
}
std::shared_ptr<T> pop() {
node* old_head = pop_head();
if (!old_head) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> const res(old_head->data); // 2
delete old_head;
return res;
}
void push(T new_value) {
std::shared_ptr<T> new_data(std::make_shared<T>(new_value));
node* p = new node; // 3
node* const old_tail = tail.load();
old_tail->data.swap(new_data); // 5
old_tail->next = p; // 6
tail.store(p); // 7
}
};
在单生产者单消费者的情况下不需要额外的同步手段,但要确保满足以下 happens-before
关系
tail 的存储(7)与 tail 的加载(1)之间是同步的
前一个节点的数据指针存储(5)在 tail 的存储之前发生
tail 的加载在数据指针的加载之前发生(2)
然而,当存在多个生产者与多个消费者时将产生数据竞争。
设想第一种情况:两个线程同时调用push
函数,它们可能会将不同的元素数据设置到同一个占位节点中(5);pop_head()
中也存在类似的问题。如果两个线程同时调用,它们都会读取相同的 head
值,然后都用相同的 next 指针覆盖旧值。两个线程现在都会认为它们获取了相同的节点——这就会导致灾难(node 节点将被重复释放)。你不仅要确保每个项只能被一个线程调用 pop()
,还需要确保其他线程可以安全地访问它们从 head 中读取的节点的下一个成员。
多生产者多消费者(MPMC)无锁队列
在改动第一个版本的 lock_free_queue
之前,我们先要知道 C++ 标准没有规定 std::shared_ptr<>
的实现必须是原子的,在某些编译器的实现中可能会在 std::shared_ptr<>
中加锁,如果你的平台支持 std::atomic_is_lock_free(&some_shared_ptr)
实现返回true,那么所有内存回收问题就都迎刃而解了。
对 std::shared_ptr<>
使用无锁原子操作的实现不仅很少见,而且能想起为其使用一致性的原子操作也很难。并发技术规范扩展提供了工具,可以来解决这个问题,扩展中在头文件 <experimental/atomic>
中提供了 std::experimental::atomic_shared_ptr<T>
。 因为其具有特殊的复制语义,可以正确的处理引用计数。也就是 std::experimental::atomic_shared_ptr<T>
能在确保原子操作的同时,正确的处理引用计数。
如果我们使用的平台不支持原子实现的 shared_ptr
,我们就需要手动管理计数,以确保在多个线程中操作同一个 node
时不会过早地释放它。
一种有效的方式是结合 std::atomic<>
实现一个 counted_node_ptr
,并且在操作 node
指针前增加计数, 操作后释放计数。
带有内外部计数的指针
一个实现了计数的指针看起来像这样
template<typename T>
class lock_free_queue {
private:
struct node;
struct counted_node_ptr {
int external_count;
node* ptr;
};
struct node_counter {
unsigned internal_count : 30;
unsigned external_counters : 2;
};
struct node {
std::atomic<T*> data;
std::atomic<node_counter> count;
counted_node_ptr next;
};
std::atomic<counted_node_ptr> head;
std::atomic<counted_node_ptr> tail;
};
此处有一些在原书中作者没有提到的小问题,但不影响讲解,将在文末统一探讨。
首先看 counted_node_ptr
struct counted_node_ptr {
int external_count;
node* ptr;
};
其中 external_counter
记录在某处
对 node
的引用次数,引用一次,计数加一。ptr 则是一个指向 node
的指针
一个 节点(node)
可能会在多处被引用,考虑四种情况:
队列为空时,占位节点同时被
head
处及tail
处引用队列不为空时队列的首个元素(下一个出队的元素)被
tail
处引用处于队列中间的元素被它的前一个元素的
next
指针引用push
函数中新入队的元素同时被tail
及旧的占位节点的next
引用
我们可以发现,单单一个 external_count
无法记录 node
完整引用次数,因为 head
和 tail
在不同的位置操作,我们来看看 node_counter
和 node
struct node_counter {
unsigned internal_count : 30;
unsigned external_counters : 2;
};
struct node {
std::atomic<T*> data;
std::atomic<node_counter> count;
counted_node_ptr next;
};
注意 node_counter
中的 external_counters
变量, 它记录 节点(node)
在几处地方被引用,上文举例的四种情况最多是 2 处,所以我们只需要 2 位就可以记录下来。现在我们可以计算一个 node
总共被引用多少次了:就是每一处引用的地方中 external_count
的总和 。比方说目前队列为空
有两个线程同时想 push
,那他们同时通过 tail
操作占位节点,算上 tail
本身对 node
的一次引用,此时 tail
对占位节点的引用就是 3,head
也一样,引用次数是 1,总和为 4。
再来看 node_counter
中的 internal_count
变量, 线程操作完后要释放引用,但我们不在 external_count
中自减,一是避免 ABA 问题
;二是在更新时如果另一个线程更新成功,当前线程使用 compare&exchange
操作会失败,是会把 counted_node_ptr
替换为新值的,导致我们无法再对原本引用的节点释放引用了。所以我们在 compare&exchange
之前一般会存一个 node
的裸指针 ,在 node
的内部计数 internal_count
中释放引用。
我们在 increase_external_count()
中使用比较交换增加外部计数(1),函数参数 counter
指在哪处进行自增,old_counter
是观测到的旧值。这个函数在某处引用中增加计数。
template <typename T>
class lock_free_queue {
//...
static void increase_external_count(std::atomic<counted_node_ptr>& counter,
counted_node_ptr& old_counter) {
counted_node_ptr new_counter;
do {
new_counter = old_counter;
++new_counter.external_count; // 1
} while (!counter.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
}
};
某个线程操作 push/pop
失败后,需要调用 node.release_ref()
释放引用(1),并且判断当前还有没别的线程在操作,没有则 delete
释放内存
template <typename T>
class lock_free_queue {
private:
struct node {
//...
void release_ref() {
node_counter old_counter = count.load(std::memory_order_relaxed);
node_counter new_counter;
do {
new_counter = old_counter;
--new_counter.internal_count; // 1
} while (!count.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
if (!new_counter.internal_count && !new_counter.external_counters) { // 3
delete this;
}
}
};
};
某个线程操作 push/pop
成功后,不仅需要释放引用次数(1),还要将 external_counter
减 1(2),因为 head/tail/上一个节点的next
不再引用它了,然后判断当前还有没别的线程在操作,没有则 delete
释放内存(3)
注意这里(1)减了 2,因为在某处
引用它,本身也算一次引用
template <typename T>
class lock_free_queue {
//...
static void free_external_counter(counted_node_ptr& old_node_ptr) {
node* const ptr = old_node_ptr.ptr;
int const count_increase = old_node_ptr.external_count - 2; // 1
node_counter old_counter = ptr->count.load(std::memory_order_relaxed);
node_counter new_counter;
do {
new_counter = old_counter;
--new_counter.external_counters; // 2
new_counter.internal_count += count_increase;
} while (!ptr->count.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
if (!new_counter.internal_count && !new_counter.external_counters) { // 3
delete ptr;
}
}
};
现在我们可以将 push()
改为能够处理多个线程的版本了
处理 push() 中的多个线程
void push(T new_value) {
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next; // 1
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail.load();
for (;;) {
increase_external_count(tail, old_tail); // 2
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get())) { // 3
old_tail.ptr->next = new_next;
old_tail = tail.exchange(new_next);
free_external_counter(old_tail); // 4
new_data.release();
break;
}
old_tail.ptr->release_ref(); // 5
}
}
此处有一些在原书中作者没有提到的问题,但不影响讲解,将在文末统一探讨。
与原来单生产者的版本有许多不同的地方,对节点操作之前,先增加引用计数(2),同一时刻只有一个线程能更新到占位节点的 data 指针(3),其他的线程只能释放引用然后重试(5)。其中 new_next
代表新的占位节点,成功更新到数据(3)后,将旧的占位节点的 next
更新为 new_next
,最后释放引用(4)
处理 pop() 中的多个线程
std::unique_ptr<T> pop() {
counted_node_ptr old_head = head.load(std::memory_order_relaxed);
for (;;) {
increase_external_count(head, old_head); // 1
node* const ptr = old_head.ptr;
if (ptr == tail.load().ptr) { // 2
ptr->release_ref(); // 3
return std::unique_ptr<T>();
}
if (head.compare_exchange_strong(old_head, ptr->next)) { // 4
T* const res = ptr->data.exchange(nullptr);
free_external_counter(old_head); // 5
return std::unique_ptr<T>(res);
}
ptr->release_ref(); //6
}
}
这里(3)在原书中漏写了
pop
函数就比较简单,首先增加对 head
的引用计数(1),然后判断队列是否为空(2),队列为空时释放引用计数(3),否则尝试将 head
更新为当前 head
指向的下一个节点(4),成功则调用 free_external_counter
释放引用(5)然后返回,更新失败的线程释放引用(6)后重试。
到这里,lock_free_queue
的实现就基本完成了,但是还有优化的空间,我们再回头看一下 push
函数
void push(T new_value) {
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail.load();
for (;;) {
increase_external_count(tail, old_tail);
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get())) { // 1
old_tail.ptr->next = new_next; // 2
old_tail = tail.exchange(new_next); // 3
free_external_counter(old_tail);
new_data.release();
break;
}
old_tail.ptr->release_ref();
}
}
某个线程(假设它是线程A)成功将占位节点的 data
更新后,设置占位节点的 next
成员(2),更新 tail
为新的占位节点(3)。在此期间,其它未能更新 data
的线程在不断重试,而对(2)和(3)的更新其实不是非要线程 A 来完成的。我们下一步要做的,就是在某个线程成功更新 data
后,其余的线程能协助它完成更新。
带有协助机制的 push/pop
为支持这种机制,需要对 pop()
进行小幅修改,以便在读取 next
指针时保持线程安全。以下是修改后的代码。
template<typename T>
class lock_free_queue {
private:
struct node {
std::atomic<T*> data;
std::atomic<node_counter> count;
std::atomic<counted_node_ptr> next; // 1
};
public:
std::unique_ptr<T> pop() {
counted_node_ptr old_head = head.load(std::memory_order_relaxed);
for (;;) {
increase_external_count(head, old_head);
node* const ptr = old_head.ptr;
if (ptr == tail.load().ptr) {
return std::unique_ptr<T>();
}
counted_node_ptr next = ptr->next.load(); // 2
if (head.compare_exchange_strong(old_head, next)) {
T* const res = ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr<T>(res);
}
ptr->release_ref();
}
}
};
next
指针变为原子类型:将节点的next
指针声明为std::atomic<counted_node_ptr>
类型(1),确保在多线程环境下对其访问是线程安全的。原子加载
next
指针:在pop()
的实现中,通过原子操作加载next
指针(2),确保读取的值是一致的。
通过以上修改,等待线程可以主动帮助被阻塞的线程完成 push()
操作,从而减少等待线程的忙等待时间,避免性能浪费,使队列实现真正的无锁特性。
template<typename T>
class lock_free_queue {
private:
void set_new_tail(counted_node_ptr &old_tail, counted_node_ptr const &new_tail) { // 1
node* const current_tail_ptr = old_tail.ptr;
while (!tail.compare_exchange_weak(old_tail, new_tail) && old_tail.ptr == current_tail_ptr); // 2
if (old_tail.ptr == current_tail_ptr) // 3
free_external_counter(old_tail); // 4
else
current_tail_ptr->release_ref(); // 5
}
public:
void push(T new_value) {
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail.load();
for (;;) {
increase_external_count(tail, old_tail);
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get())) { // 6
counted_node_ptr old_next = {0};
if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) { // 7
delete new_next.ptr; // 8
new_next = old_next; // 9
}
set_new_tail(old_tail, new_next);
new_data.release();
break;
} else { // 10
counted_node_ptr old_next = {0};
if (old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) { // 11
old_next = new_next; // 12
new_next.ptr = new node; // 13
}
set_new_tail(old_tail, old_next); // 14
}
}
}
};
与原始版本的 push()
相比,这一版本有几个关键的不同之处:
设置数据指针(6):
在设置节点的数据指针后,需要考虑其他线程已经协助完成的情况。
如果当前线程成功设置了数据指针,则进入下一步处理;否则进入
else
子句协助其他线程完成操作。
更新
next
指针(7):使用
compare_exchange_strong()
更新节点的next
指针。如果更新失败,说明另一个线程已经设置了
next
指针,因此释放新分配的节点(8),并使用其他线程设置的next
值(9)来更新tail
。
更新
tail
指针:set_new_tail()
方法提取了tail
指针的更新逻辑。
set_new_tail()
的逻辑:
compare_exchange_weak()
循环(2):使用弱的 CAS 循环尝试更新
tail
指针。如果有其他线程修改了
tail
的external_count
部分,操作将失败,但循环会继续重试。
确保安全性(3):
如果循环退出时
ptr
部分未变,说明当前线程成功更新了tail
,需要释放旧的外部引用计数(4)如果
ptr
部分已变,说明其他线程已释放计数,此时只需释放当前线程持有的单个引用(5)。
这一版本的 push()
实现引入了协助机制,使得即使主线程阻塞,其他线程也可以协助完成节点更新操作,从而保证无锁队列的高效性和无锁特性。
以上。我们已经实现了一个支持多生产者多消费者
的队列,文中没有对内存序
进行讨论, 各位在对 lock_free_queue
进行优化时请确保操作始终满足 Happens-Before
原则。
在阅读完本章内容后,我尝试实现书中 lock_free_queue
类,可以说是不太顺利。其中有些是可能是笔误,还有一些可能和平台或者编译器相关,另外还有一些作者没提到的大大小小的问题。不过最终都解决了,排查的过程中也有不少收获 :) 因此我在这里补充说明
对原书中的一些遗漏的补充及错误修正
counted_node_ptr
的内存对齐问题
原书中 counted_node_ptr
的实现是这样
struct counted_node_ptr {
int external_count;
node* ptr;
};
然而,在对 std::atomic<counted_node_ptr>
调用比较交换时总是会失败,最终发现在我的平台(macOS 10.15.x; i5处理器; gcc 编译器)指针占 8 字节,int
4 字节,比较交换只能进行 16 字节的操作,每次都额外操作了不属于 counted_node_ptr
的 4 个字节导致失败,通过内存对齐解决
struct counted_node_ptr {
int external_count;
node* ptr;
uint8_t padding[sizeof(void *) - sizeof(int)];
};
别忘了在构造时初始化它counted_node_ptr new_cnp{0, nullptr, {0}}
pop
函数的遗漏
这个前文有提到,pop 的最终版本中,判断队列为空后,返回空指针之前,没有释放引用(1)
std::unique_ptr<T> pop() {
counted_node_ptr old_head = head.load(std::memory_order_relaxed);
for (;;) {
increase_external_count(head, old_head);
node* const ptr = old_head.ptr;
if (ptr == tail.load().ptr) {
ptr->release_ref(); // 1
return std::unique_ptr<T>();
}
counted_node_ptr next = ptr->next.load();
if (head.compare_exchange_strong(old_head, next)) {
T* const res = ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr<T>(res);
}
ptr->release_ref();
}
}
潜在的 ABA
问题
这个是书中最大的错误了!我对 lock_free_queue
进行测试时,发现在多生产者单消费者(或多消费者)并发操作时,偶有出现元素丢失(复现率挺高的,尤其是队列由空-非空时出现几率大一些)。
书中 push
和 pop
函数的实现
void push(T new_value) {
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail.load();
for (;;) {
increase_external_count(tail, old_tail);
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get())) { // 1
counted_node_ptr old_next = {0, nullptr, {0}};
if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) { // 2
delete new_next.ptr;
new_next = old_next;
}
set_new_tail(old_tail, new_next);
new_data.release();
break;
} else {
counted_node_ptr old_next = {0, nullptr, {0}};
if (old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) {
old_next = new_next;
new_next.ptr = new node;
}
set_new_tail(old_tail, old_next);
}
}
}
std::unique_ptr<T> pop() {
counted_node_ptr old_head = head.load(std::memory_order_relaxed);
for (;;) {
increase_external_count(head, old_head);
node* const ptr = old_head.ptr;
if (ptr == tail.load().ptr) {
return std::unique_ptr<T>();
}
counted_node_ptr next = ptr->next.load();
if (head.compare_exchange_strong(old_head, next)) {
T* const res = ptr->data.exchange(nullptr); // 3
free_external_counter(old_head);
return std::unique_ptr<T>(res);
}
ptr->release_ref();
}
}
考虑一种情况:当前队列为空,head
和 tail
都指向占位节点,线程 A, B
执行 push
操作,线程 C
执行 pop
操作。
线程
A
执行到(1)(还未执行)线程
Bpush
成功,此时tail
已经指向新的占位节点,A
还卡着线程
C
尝试pop
,将head
更新后,且将出队node
中的数据用nullptr
换出来了(3)线程
A
终于恢复执行,old_tail
还指着已经出队的node
,并且pop
线程已经将出队的node
中data
换为nullptr
了,所以(1)还是成功了,A 以为自己成功更新占位节点,实际上早就出队的node
都准备回收了,所以悲剧就这么发生了。
看起来问题很明显,对吧?这个问题苦苦折磨了我两周有余,百思不得其解,好在最后找到问题了。
对node
构造函数的补充
作者在对 node
修改后,忘记了补充它的构造函数实现。
node(): data({nullptr, false, {0}}),
count({0, 2}),
next({0, nullptr, {0}}) {}
其中 data
和 next
初始化为空;count
的内部引用计数为 0, 因为还没有任何执行释放引用的操作执行,并且持有者数量为2(被尾节点引用,被前一个节点引用);
对 lock_free_queue 析构函数的补充
析构函数当然要释放所有节点。
~lock_free_queue() {
while(pop());
}
以上就是所有内容,感谢你看到这里,有任何问题欢迎讨论指正!
Subscribe to my newsletter
Read articles from RuiXin Lin directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
