对c++ 并发编程实战-无锁队列的总结

RuiXin LinRuiXin Lin
8 min read

引言

最近在看 C++ 并发编程实战(第2版)这本书。第七章的内容中给出了无锁栈及无锁队列的实现,一番阅读下来发现不少问题。

  1. 官方翻译版存在很多不准确的地方,后来找到了英文原版+ChatGPT翻译,好了很多

  2. 原书中存在一些错误以及疏忽的问题

  3. 一些概念晦涩难以理解

故借此机会记录一下自己的理解,也欢迎讨论及补充

单生产者单消费者(SPSC)无锁队列

现从一个简单的 SPSC 队列开始,首先我们需要一个结构体 node 作为链表的节点

template <typename T>
struct node {
    std::shared_ptr<T> data;
    node *next;
    node() : next(nullptr) {}
};

十分简单,使用 std::shared_ptr 代替裸指针,next 成员指向下一个节点

我们使用 std::atomic<node*> 来记录队首与队尾

template <typename T>
class lock_free_queue {
private:
    std::atomic<node*> head;
    std::atomic<node*> tail;

public:
    lock_free_queue() : head(new node), tail(head.load()) {}

}

可以看到构造队列时创建了一个占位节点,它还没有任何数据,head 指针与 tail 指针都指向这个占位节点,由此可以推断出判断队列为空的表达式大概是下面这样

head.load() == tail.load();

然后是入队函数 push()

template <typename T>
void lock_free_queue<T>::push(T new_value) {
    std::shared_ptr<T> new_data(std::make_shared<T>(new_value));
    node* p = new node;
    node* const old_tail = tail.load();
    old_tail->data.swap(new_data); // 1
    old_tail->next = p;            // 2
    tail.store(p);                 // 3
}

主要有这三个步骤:

  1. 在占位节点中存下入队的元素

  2. 让当前节点的 next 指向一个新的占位节点

  3. 让 tail 指针指向新的占位节点,所以 tail 是始终指向占位节点的

再看看出队函数 pop()

template <typename T> 
node* lock_free_queue<T>::pop_head() {
    node *const old_head = head.load();
    if (old_head == tail.load()) {
        return nullptr;
    }
    head.store(old_head->next); // 1
    return old_head;            // 2
}

template <typename T> 
std::shared_ptr<T> lock_free_queue<T>::pop() {
    node* old_head = pop_head();
    if (!old_head) {
        return std::shared_ptr<T>();
    }
    std::shared_ptr<T> const res(old_head->data); // 3
    delete old_head;                              // 4
    return res;                                   // 5
}

pop_head() 让 head 指针指向下一个节点(1),然后返回出队的节点指针(2)

pop 函数调用 pop_head() 之后构造一个 std::shared_ptr 保存元素(3),然后释放出队的节点(4),返回元素(5)

以下是 SPSC 队列的完整实现

template <typename T>
class lock_free_queue {
private:
    struct node {
        std::shared_ptr<T> data;
        node* next;
        node(): next(nullptr) {}
    };

    std::atomic<node*> head;
    std::atomic<node*> tail;

    node* pop_head() {
        node* const old_head = head.load();
        if (old_head == tail.load()) { // 1
            return nullptr;
        }
        head.store(old_head->next);
        return old_head;
    }

public:
    lock_free_queue() : head(new node), tail(head.load()) {}

    lock_free_queue(const lock_free_queue& other) = delete;
    lock_free_queue& operator=(const lock_free_queue& other) = delete;

    ~lock_free_queue() {
        while (node* const old_head = head.load()) {
            head.store(old_head->next);
            delete old_head;
        }
    }

    std::shared_ptr<T> pop() {
        node* old_head = pop_head();
        if (!old_head) {
            return std::shared_ptr<T>();
        }
        std::shared_ptr<T> const res(old_head->data); // 2
        delete old_head;
        return res;
    }

    void push(T new_value) {
        std::shared_ptr<T> new_data(std::make_shared<T>(new_value));
        node* p = new node;            // 3
        node* const old_tail = tail.load();
        old_tail->data.swap(new_data); // 5
        old_tail->next = p;            // 6
        tail.store(p);                 // 7
    }

};

在单生产者单消费者的情况下不需要额外的同步手段,但要确保满足以下 happens-before 关系

  1. tail 的存储(7)与 tail 的加载(1)之间是同步的

  2. 前一个节点的数据指针存储(5)在 tail 的存储之前发生

  3. tail 的加载在数据指针的加载之前发生(2)

然而,当存在多个生产者与多个消费者时将产生数据竞争。

设想第一种情况:两个线程同时调用push函数,它们可能会将不同的元素数据设置到同一个占位节点中(5);pop_head()中也存在类似的问题。如果两个线程同时调用,它们都会读取相同的 head 值,然后都用相同的 next 指针覆盖旧值。两个线程现在都会认为它们获取了相同的节点——这就会导致灾难(node 节点将被重复释放)。你不仅要确保每个项只能被一个线程调用 pop() ,还需要确保其他线程可以安全地访问它们从 head 中读取的节点的下一个成员。

多生产者多消费者(MPMC)无锁队列

在改动第一个版本的 lock_free_queue 之前,我们先要知道 C++ 标准没有规定 std::shared_ptr<> 的实现必须是原子的,在某些编译器的实现中可能会在 std::shared_ptr<> 中加锁,如果你的平台支持 std::atomic_is_lock_free(&some_shared_ptr) 实现返回true,那么所有内存回收问题就都迎刃而解了。

std::shared_ptr<> 使用无锁原子操作的实现不仅很少见,而且能想起为其使用一致性的原子操作也很难。并发技术规范扩展提供了工具,可以来解决这个问题,扩展中在头文件 <experimental/atomic> 中提供了 std::experimental::atomic_shared_ptr<T> 。 因为其具有特殊的复制语义,可以正确的处理引用计数。也就是 std::experimental::atomic_shared_ptr<T> 能在确保原子操作的同时,正确的处理引用计数。

如果我们使用的平台不支持原子实现的 shared_ptr ,我们就需要手动管理计数,以确保在多个线程中操作同一个 node 时不会过早地释放它。

一种有效的方式是结合 std::atomic<> 实现一个 counted_node_ptr,并且在操作 node 指针前增加计数, 操作后释放计数。

带有内外部计数的指针

一个实现了计数的指针看起来像这样

template<typename T>
class lock_free_queue {
private:

    struct node;

    struct counted_node_ptr {
        int   external_count;
        node* ptr;
    };

    struct node_counter {
        unsigned internal_count    : 30;
        unsigned external_counters : 2;
    };

    struct node {
        std::atomic<T*> data;
        std::atomic<node_counter> count;
        counted_node_ptr next;
    };

    std::atomic<counted_node_ptr> head;
    std::atomic<counted_node_ptr> tail;

};

此处有一些在原书中作者没有提到的小问题,但不影响讲解,将在文末统一探讨。

首先看 counted_node_ptr

struct counted_node_ptr {
    int   external_count;
    node* ptr;
};

其中 external_counter 记录在某处node 的引用次数,引用一次,计数加一。ptr 则是一个指向 node 的指针

一个 节点(node) 可能会在多处被引用,考虑四种情况:

  1. 队列为空时,占位节点同时被 head 处及 tail 处引用

  2. 队列不为空时队列的首个元素(下一个出队的元素)被 tail 处引用

  3. 处于队列中间的元素被它的前一个元素的 next 指针引用

  4. push 函数中新入队的元素同时被 tail 及旧的占位节点的 next 引用

我们可以发现,单单一个 external_count 无法记录 node 完整引用次数,因为 headtail 在不同的位置操作,我们来看看 node_counternode

struct node_counter {
    unsigned internal_count    : 30;
    unsigned external_counters : 2;
};

struct node {
    std::atomic<T*> data;
    std::atomic<node_counter> count;
    counted_node_ptr next;
};

注意 node_counter 中的 external_counters 变量, 它记录 节点(node) 在几处地方被引用,上文举例的四种情况最多是 2 处,所以我们只需要 2 位就可以记录下来。现在我们可以计算一个 node 总共被引用多少次了:就是每一处引用的地方中 external_count 的总和 。比方说目前队列为空

有两个线程同时想 push ,那他们同时通过 tail 操作占位节点,算上 tail 本身对 node 的一次引用,此时 tail 对占位节点的引用就是 3,head 也一样,引用次数是 1,总和为 4。

再来看 node_counter 中的 internal_count 变量, 线程操作完后要释放引用,但我们不在 external_count 中自减,一是避免 ABA 问题;二是在更新时如果另一个线程更新成功,当前线程使用 compare&exchange 操作会失败,是会把 counted_node_ptr 替换为新值的,导致我们无法再对原本引用的节点释放引用了。所以我们在 compare&exchange 之前一般会存一个 node 的裸指针 ,在 node 的内部计数 internal_count 中释放引用。

我们在 increase_external_count()中使用比较交换增加外部计数(1),函数参数 counter 指在哪处进行自增,old_counter 是观测到的旧值。这个函数在某处引用中增加计数。

template <typename T>
class lock_free_queue {
    //...
    static void increase_external_count(std::atomic<counted_node_ptr>& counter,
                                        counted_node_ptr& old_counter) {
        counted_node_ptr new_counter;
        do {
            new_counter = old_counter;
            ++new_counter.external_count; // 1
        } while (!counter.compare_exchange_strong(old_counter, new_counter,
                                                  std::memory_order_acquire,
                                                  std::memory_order_relaxed));
        old_counter.external_count = new_counter.external_count;
    }
};

某个线程操作 push/pop 失败后,需要调用 node.release_ref() 释放引用(1),并且判断当前还有没别的线程在操作,没有则 delete 释放内存

template <typename T>
class lock_free_queue {
private:
    struct node {
        //...
        void release_ref() {
            node_counter old_counter = count.load(std::memory_order_relaxed);
            node_counter new_counter;
            do {
                new_counter = old_counter;
                --new_counter.internal_count; // 1
            } while (!count.compare_exchange_strong(old_counter, new_counter,
                                                    std::memory_order_acquire, 
                                                    std::memory_order_relaxed)); 
            if (!new_counter.internal_count && !new_counter.external_counters) { // 3
                delete this;
            }
        }
    }; 
};

某个线程操作 push/pop 成功后,不仅需要释放引用次数(1),还要将 external_counter 减 1(2),因为 head/tail/上一个节点的next 不再引用它了,然后判断当前还有没别的线程在操作,没有则 delete 释放内存(3)

注意这里(1)减了 2,因为在某处引用它,本身也算一次引用


template <typename T>
class lock_free_queue {
    //...
    static void free_external_counter(counted_node_ptr& old_node_ptr) {
        node* const ptr = old_node_ptr.ptr;
        int const count_increase = old_node_ptr.external_count - 2; // 1
        node_counter old_counter = ptr->count.load(std::memory_order_relaxed);
        node_counter new_counter;
        do {
            new_counter = old_counter;
            --new_counter.external_counters; // 2
            new_counter.internal_count += count_increase;
        } while (!ptr->count.compare_exchange_strong(old_counter, new_counter,
                                                        std::memory_order_acquire,
                                                        std::memory_order_relaxed));
        if (!new_counter.internal_count && !new_counter.external_counters) { // 3
            delete ptr;
        }
    }
};

现在我们可以将 push() 改为能够处理多个线程的版本了

处理 push() 中的多个线程

void push(T new_value) {
    std::unique_ptr<T> new_data(new T(new_value));
    counted_node_ptr new_next;   // 1
    new_next.ptr = new node;
    new_next.external_count = 1;
    counted_node_ptr old_tail = tail.load();
    for (;;) {
        increase_external_count(tail, old_tail); // 2
        T* old_data = nullptr; 
        if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get())) { // 3
            old_tail.ptr->next = new_next;
            old_tail = tail.exchange(new_next);
            free_external_counter(old_tail); // 4
            new_data.release();
            break;
        }
        old_tail.ptr->release_ref(); // 5
    }
}

此处有一些在原书中作者没有提到的问题,但不影响讲解,将在文末统一探讨。

与原来单生产者的版本有许多不同的地方,对节点操作之前,先增加引用计数(2),同一时刻只有一个线程能更新到占位节点的 data 指针(3),其他的线程只能释放引用然后重试(5)。其中 new_next 代表新的占位节点,成功更新到数据(3)后,将旧的占位节点的 next 更新为 new_next,最后释放引用(4)

处理 pop() 中的多个线程

std::unique_ptr<T> pop() {
    counted_node_ptr old_head = head.load(std::memory_order_relaxed); 
    for (;;) {
        increase_external_count(head, old_head); // 1
        node* const ptr = old_head.ptr;
        if (ptr == tail.load().ptr) { // 2
            ptr->release_ref(); // 3
            return std::unique_ptr<T>();
        }
        if (head.compare_exchange_strong(old_head, ptr->next)) { // 4
            T* const res = ptr->data.exchange(nullptr);
            free_external_counter(old_head); // 5
            return std::unique_ptr<T>(res);
        }
        ptr->release_ref(); //6
    }
}

这里(3)在原书中漏写了

pop 函数就比较简单,首先增加对 head 的引用计数(1),然后判断队列是否为空(2),队列为空时释放引用计数(3),否则尝试将 head 更新为当前 head 指向的下一个节点(4),成功则调用 free_external_counter 释放引用(5)然后返回,更新失败的线程释放引用(6)后重试。

到这里,lock_free_queue 的实现就基本完成了,但是还有优化的空间,我们再回头看一下 push 函数

void push(T new_value) {
    std::unique_ptr<T> new_data(new T(new_value));
    counted_node_ptr new_next; 
    new_next.ptr = new node;
    new_next.external_count = 1;
    counted_node_ptr old_tail = tail.load();
    for (;;) {
        increase_external_count(tail, old_tail); 
        T* old_data = nullptr; 
        if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get())) { // 1
            old_tail.ptr->next = new_next;      // 2 
            old_tail = tail.exchange(new_next); // 3
            free_external_counter(old_tail); 
            new_data.release();
            break;
        }
        old_tail.ptr->release_ref();
    }
}

某个线程(假设它是线程A)成功将占位节点的 data 更新后,设置占位节点的 next 成员(2),更新 tail 为新的占位节点(3)。在此期间,其它未能更新 data 的线程在不断重试,而对(2)和(3)的更新其实不是非要线程 A 来完成的。我们下一步要做的,就是在某个线程成功更新 data 后,其余的线程能协助它完成更新。

带有协助机制的 push/pop

为支持这种机制,需要对 pop() 进行小幅修改,以便在读取 next 指针时保持线程安全。以下是修改后的代码。

template<typename T>
class lock_free_queue {
private:
    struct node {
        std::atomic<T*>               data;
        std::atomic<node_counter>     count;
        std::atomic<counted_node_ptr> next; // 1
    };
public:
    std::unique_ptr<T> pop() {
        counted_node_ptr old_head = head.load(std::memory_order_relaxed);
        for (;;) {
            increase_external_count(head, old_head);
            node* const ptr = old_head.ptr;
            if (ptr == tail.load().ptr) {
                return std::unique_ptr<T>();
            }
            counted_node_ptr next = ptr->next.load(); // 2
            if (head.compare_exchange_strong(old_head, next)) {
                T* const res = ptr->data.exchange(nullptr);
                free_external_counter(old_head);
                return std::unique_ptr<T>(res);
            }
            ptr->release_ref();
        }
    }
};
  1. next 指针变为原子类型:将节点的 next 指针声明为 std::atomic<counted_node_ptr> 类型(1),确保在多线程环境下对其访问是线程安全的。

  2. 原子加载 next 指针:在 pop() 的实现中,通过原子操作加载 next 指针(2),确保读取的值是一致的。

通过以上修改,等待线程可以主动帮助被阻塞的线程完成 push() 操作,从而减少等待线程的忙等待时间,避免性能浪费,使队列实现真正的无锁特性。

template<typename T>
class lock_free_queue {
private:
    void set_new_tail(counted_node_ptr &old_tail, counted_node_ptr const &new_tail) { // 1
        node* const current_tail_ptr = old_tail.ptr;
        while (!tail.compare_exchange_weak(old_tail, new_tail) && old_tail.ptr == current_tail_ptr); // 2
        if (old_tail.ptr == current_tail_ptr) // 3
            free_external_counter(old_tail); // 4
        else
            current_tail_ptr->release_ref(); // 5
    }

public:
    void push(T new_value) {
        std::unique_ptr<T> new_data(new T(new_value));
        counted_node_ptr new_next;
        new_next.ptr = new node;
        new_next.external_count = 1;
        counted_node_ptr old_tail = tail.load();
        for (;;) {
            increase_external_count(tail, old_tail);
            T* old_data = nullptr;
            if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get())) { // 6
                counted_node_ptr old_next = {0};
                if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) { // 7
                    delete new_next.ptr; // 8
                    new_next = old_next; // 9
                }
                set_new_tail(old_tail, new_next);
                new_data.release();
                break;
            } else { // 10
                counted_node_ptr old_next = {0};
                if (old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) { // 11
                    old_next = new_next; // 12
                    new_next.ptr = new node; // 13
                }
                set_new_tail(old_tail, old_next); // 14
            }
        }
    }

};

与原始版本的 push() 相比,这一版本有几个关键的不同之处:

  1. 设置数据指针(6):

    1. 在设置节点的数据指针后,需要考虑其他线程已经协助完成的情况。

    2. 如果当前线程成功设置了数据指针,则进入下一步处理;否则进入 else 子句协助其他线程完成操作。

  2. 更新 next 指针(7):

    1. 使用 compare_exchange_strong() 更新节点的 next 指针。

    2. 如果更新失败,说明另一个线程已经设置了 next 指针,因此释放新分配的节点(8),并使用其他线程设置的 next 值(9)来更新 tail

  3. 更新 tail 指针:

    1. set_new_tail() 方法提取了 tail 指针的更新逻辑。

set_new_tail() 的逻辑:

  1. compare_exchange_weak() 循环(2):

    1. 使用弱的 CAS 循环尝试更新 tail 指针。

    2. 如果有其他线程修改了 tailexternal_count 部分,操作将失败,但循环会继续重试。

  2. 确保安全性(3):

    1. 如果循环退出时 ptr 部分未变,说明当前线程成功更新了 tail,需要释放旧的外部引用计数(4)

    2. 如果 ptr 部分已变,说明其他线程已释放计数,此时只需释放当前线程持有的单个引用(5)。

这一版本的 push() 实现引入了协助机制,使得即使主线程阻塞,其他线程也可以协助完成节点更新操作,从而保证无锁队列的高效性和无锁特性。


以上。我们已经实现了一个支持多生产者多消费者 的队列,文中没有对内存序 进行讨论, 各位在对 lock_free_queue 进行优化时请确保操作始终满足 Happens-Before 原则。

在阅读完本章内容后,我尝试实现书中 lock_free_queue 类,可以说是不太顺利。其中有些是可能是笔误,还有一些可能和平台或者编译器相关,另外还有一些作者没提到的大大小小的问题。不过最终都解决了,排查的过程中也有不少收获 :) 因此我在这里补充说明

对原书中的一些遗漏的补充及错误修正

counted_node_ptr 的内存对齐问题

原书中 counted_node_ptr 的实现是这样

struct counted_node_ptr {
    int     external_count;
    node*   ptr;
};

然而,在对 std::atomic<counted_node_ptr> 调用比较交换时总是会失败,最终发现在我的平台(macOS 10.15.x; i5处理器; gcc 编译器)指针占 8 字节,int 4 字节,比较交换只能进行 16 字节的操作,每次都额外操作了不属于 counted_node_ptr 的 4 个字节导致失败,通过内存对齐解决

struct counted_node_ptr {
    int     external_count;
    node*   ptr;
    uint8_t padding[sizeof(void *) - sizeof(int)];
};

别忘了在构造时初始化它counted_node_ptr new_cnp{0, nullptr, {0}}

pop 函数的遗漏

这个前文有提到,pop 的最终版本中,判断队列为空后,返回空指针之前,没有释放引用(1)

std::unique_ptr<T> pop() {
    counted_node_ptr old_head = head.load(std::memory_order_relaxed);
    for (;;) {
        increase_external_count(head, old_head);
        node* const ptr = old_head.ptr;
        if (ptr == tail.load().ptr) {
            ptr->release_ref(); // 1
            return std::unique_ptr<T>();
        }
        counted_node_ptr next = ptr->next.load(); 
        if (head.compare_exchange_strong(old_head, next)) {
            T* const res = ptr->data.exchange(nullptr);
            free_external_counter(old_head);
            return std::unique_ptr<T>(res);
        }
        ptr->release_ref();
    }
}

潜在的 ABA 问题

这个是书中最大的错误了!我对 lock_free_queue 进行测试时,发现在多生产者单消费者(或多消费者)并发操作时,偶有出现元素丢失(复现率挺高的,尤其是队列由空-非空时出现几率大一些)。

书中 pushpop 函数的实现

void push(T new_value) {
    std::unique_ptr<T> new_data(new T(new_value));
    counted_node_ptr new_next;
    new_next.ptr = new node;
    new_next.external_count = 1;
    counted_node_ptr old_tail = tail.load();
    for (;;) {
        increase_external_count(tail, old_tail);
        T* old_data = nullptr;
        if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get())) { // 1
            counted_node_ptr old_next = {0, nullptr, {0}};
            if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) { // 2
                delete new_next.ptr; 
                new_next = old_next;
            }
            set_new_tail(old_tail, new_next);
            new_data.release();
            break;
        } else {
            counted_node_ptr old_next = {0, nullptr, {0}};
            if (old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) {
                old_next = new_next; 
                new_next.ptr = new node; 
            }
            set_new_tail(old_tail, old_next); 
        }
    }
}

std::unique_ptr<T> pop() {
    counted_node_ptr old_head = head.load(std::memory_order_relaxed);
    for (;;) {
        increase_external_count(head, old_head);
        node* const ptr = old_head.ptr;
        if (ptr == tail.load().ptr) {
            return std::unique_ptr<T>();
        }
        counted_node_ptr next = ptr->next.load(); 
        if (head.compare_exchange_strong(old_head, next)) {
            T* const res = ptr->data.exchange(nullptr); // 3
            free_external_counter(old_head);
            return std::unique_ptr<T>(res);
        }
        ptr->release_ref();
    }
}

考虑一种情况:当前队列为空,headtail 都指向占位节点,线程 A, B 执行 push 操作,线程 C 执行 pop 操作。

  1. 线程 A 执行到(1)(还未执行)

  2. 线程 Bpush 成功,此时 tail 已经指向新的占位节点,A 还卡着

  3. 线程 C 尝试 pop,将 head 更新后,且将出队 node 中的数据用 nullptr 换出来了(3)

  4. 线程 A 终于恢复执行, old_tail 还指着已经出队的 node,并且 pop 线程已经将出队的 nodedata 换为 nullptr 了,所以(1)还是成功了,A 以为自己成功更新占位节点,实际上早就出队的 node 都准备回收了,所以悲剧就这么发生了。

看起来问题很明显,对吧?这个问题苦苦折磨了我两周有余,百思不得其解,好在最后找到问题了。

node 构造函数的补充

作者在对 node 修改后,忘记了补充它的构造函数实现。

node(): data({nullptr, false, {0}}),
        count({0, 2}),
        next({0, nullptr, {0}}) {}

其中 datanext 初始化为空;count 的内部引用计数为 0, 因为还没有任何执行释放引用的操作执行,并且持有者数量为2(被尾节点引用,被前一个节点引用);

对 lock_free_queue 析构函数的补充

析构函数当然要释放所有节点。

~lock_free_queue() {
    while(pop());
}

以上就是所有内容,感谢你看到这里,有任何问题欢迎讨论指正!

0
Subscribe to my newsletter

Read articles from RuiXin Lin directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

RuiXin Lin
RuiXin Lin