高性能 MPSC 队列的阻塞交换设计
在并发编程中,MPSC(多生产者单消费者)队列是实现任务分发的核心数据结构。与 MPMC(多生产者多消费者)相比,MPSC 可以做更激进的优化。今天我们深入分析一个工业级 MPSC 阻塞交换队列的实现,探索其 Atomic CAS 的设计智慧。
MPSC 的场景与约束
MPSC 队列的应用场景:
- 线程池任务分发:多个线程提交任务,单个工作线程消费
- 日志收集:多个线程写日志,单个写盘线程
- 事件处理:多个生产者产生事件,单个消费者处理
关键约束:
- 多生产者:需要确保并发入队安全
- 单消费者:出队只需要一个线程负责,不需要复杂的同步
解决方案:Atomic CAS + 哨兵节点
工业级实现采用了 CAS(Compare-And-Swap) 循环来实现无锁入队:
// 使用 atomic swap 而不是 CAS
void Enqueue(TTunedNode* node) {
// 我们的目标是避免昂贵的 CAS 操作
// 用 atomic swap 代替 CAS
TTunedNode* prev = AtomicSwap(&this->Tail, node);
AtomicSet(prev->Next, node);
}
核心设计
哨兵节点(Sentinel Node)
- 队首始终是一个哑节点,不存储实际数据
- 简化了空队列的边界处理
Atomic Swap
- 用 AtomicSwap 将新节点置为队尾
- 然后将旧队尾的 Next 指向新节点
无需复杂同步
- 因为只有一个消费者,出队不需要同步
- 生产者之间用 CAS 保证安全
权衡分析
优点
- 高性能:入队只需要一次 atomic swap 操作
- 无锁:生产者之间不阻塞
- 实现简洁:相比 MPMC 队列简化很多
代价
- 内存分配:每次入队需要 new Node
- 消费者潜在阻塞:如果生产者sleep,消费者可能停止
- 无等待通知:消费者获取空队列时返回 nullptr
适用场景
- 高吞吐场景
- 任务分发
- 日志收集
净室重构:Go 实现
下面用 Go 展示同样的设计思想(代码可编译运行):
package main
import (
"fmt"
"sync"
"sync/atomic"
"unsafe"
)
// 节点结构
type Node struct {
value any
next unsafe.Pointer
}
// Atomic CAS MPSC 队列
type AtomicMPSCQueue struct {
head unsafe.Pointer // 队首指针
tail unsafe.Pointer // 队尾指针
}
func NewAtomicMPSCQueue() *AtomicMPSCQueue {
sentinel := &Node{value: nil, next: nil}
return &AtomicMPSCQueue{
head: unsafe.Pointer(sentinel),
tail: unsafe.Pointer(sentinel),
}
}
// Enqueue 入队(多生产者安全)
func (q *AtomicMPSCQueue) Enqueue(value any) {
newNode := &Node{value: value, next: nil}
oldTail := (*Node)(atomic.LoadPointer(&q.tail))
for {
// CAS 循环
swapped := atomic.CompareAndSwapPointer(
&q.tail,
unsafe.Pointer(oldTail),
unsafe.Pointer(newNode),
)
if swapped {
atomic.StorePointer(&oldTail.next, unsafe.Pointer(newNode))
break
}
oldTail = (*Node)(atomic.LoadPointer(&q.tail))
}
}
// Dequeue 出队(单消费者)
func (q *AtomicMPSCQueue) Dequeue() any {
head := (*Node)(atomic.LoadPointer(&q.head))
next := (*Node)(atomic.LoadPointer(&head.next))
if next == nil {
return nil
}
value := next.value
atomic.StorePointer(&q.head, unsafe.Pointer(next))
return value
}
运行结果验证了设计:
=== MPSC Queue Demo (Go) ===
--- Channel-based MPSC Queue ---
Received 15 messages
--- Atomic CAS-based MPSC Queue ---
Is empty: false
Dequeued: msg-1
Dequeued: msg-2
Dequeued: msg-3
Is empty: true
总结
MPSC 阻塞交换队列的设计体现了 简洁高性能 的工程智慧:
- Atomic Swap:用原子操作代替锁,避免阻塞
- 哨兵节点:简化空队列处理
- 单消费者优化:出队无需同步,大大简化实现
- CAS 循环:生产者之间无锁并发的核心
这种设计不是"银弹",但对于 MPSC 场景来说,是一个非常优雅的权衡选择。
系列: Arch (88/90)
系列页
▼