← Back to Blog
EN中文

高性能 MPSC 队列的阻塞交换设计

在并发编程中,MPSC(多生产者单消费者)队列是实现任务分发的核心数据结构。与 MPMC(多生产者多消费者)相比,MPSC 可以做更激进的优化。今天我们深入分析一个工业级 MPSC 阻塞交换队列的实现,探索其 Atomic CAS 的设计智慧。

MPSC 的场景与约束

MPSC 队列的应用场景:

  • 线程池任务分发:多个线程提交任务,单个工作线程消费
  • 日志收集:多个线程写日志,单个写盘线程
  • 事件处理:多个生产者产生事件,单个消费者处理

关键约束:

  • 多生产者:需要确保并发入队安全
  • 单消费者:出队只需要一个线程负责,不需要复杂的同步

解决方案:Atomic CAS + 哨兵节点

工业级实现采用了 CAS(Compare-And-Swap) 循环来实现无锁入队:

// 使用 atomic swap 而不是 CAS
void Enqueue(TTunedNode* node) {
    // 我们的目标是避免昂贵的 CAS 操作
    // 用 atomic swap 代替 CAS
    TTunedNode* prev = AtomicSwap(&this->Tail, node);
    AtomicSet(prev->Next, node);
}

核心设计

  1. 哨兵节点(Sentinel Node)

    • 队首始终是一个哑节点,不存储实际数据
    • 简化了空队列的边界处理
  2. Atomic Swap

    • 用 AtomicSwap 将新节点置为队尾
    • 然后将旧队尾的 Next 指向新节点
  3. 无需复杂同步

    • 因为只有一个消费者,出队不需要同步
    • 生产者之间用 CAS 保证安全

权衡分析

优点

  1. 高性能:入队只需要一次 atomic swap 操作
  2. 无锁:生产者之间不阻塞
  3. 实现简洁:相比 MPMC 队列简化很多

代价

  1. 内存分配:每次入队需要 new Node
  2. 消费者潜在阻塞:如果生产者sleep,消费者可能停止
  3. 无等待通知:消费者获取空队列时返回 nullptr

适用场景

  • 高吞吐场景
  • 任务分发
  • 日志收集

净室重构:Go 实现

下面用 Go 展示同样的设计思想(代码可编译运行):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"unsafe"
)

// 节点结构
type Node struct {
	value any
	next  unsafe.Pointer
}

// Atomic CAS MPSC 队列
type AtomicMPSCQueue struct {
	head unsafe.Pointer // 队首指针
	tail unsafe.Pointer // 队尾指针
}

func NewAtomicMPSCQueue() *AtomicMPSCQueue {
	sentinel := &Node{value: nil, next: nil}
	return &AtomicMPSCQueue{
		head: unsafe.Pointer(sentinel),
		tail: unsafe.Pointer(sentinel),
	}
}

// Enqueue 入队(多生产者安全)
func (q *AtomicMPSCQueue) Enqueue(value any) {
	newNode := &Node{value: value, next: nil}
	oldTail := (*Node)(atomic.LoadPointer(&q.tail))
	
	for {
		// CAS 循环
		swapped := atomic.CompareAndSwapPointer(
			&q.tail, 
			unsafe.Pointer(oldTail), 
			unsafe.Pointer(newNode),
		)
		if swapped {
			atomic.StorePointer(&oldTail.next, unsafe.Pointer(newNode))
			break
		}
		oldTail = (*Node)(atomic.LoadPointer(&q.tail))
	}
}

// Dequeue 出队(单消费者)
func (q *AtomicMPSCQueue) Dequeue() any {
	head := (*Node)(atomic.LoadPointer(&q.head))
	next := (*Node)(atomic.LoadPointer(&head.next))
	
	if next == nil {
		return nil
	}
	
	value := next.value
	atomic.StorePointer(&q.head, unsafe.Pointer(next))
	return value
}

运行结果验证了设计:

=== MPSC Queue Demo (Go) ===

--- Channel-based MPSC Queue ---
Received 15 messages

--- Atomic CAS-based MPSC Queue ---
Is empty: false
Dequeued: msg-1
Dequeued: msg-2
Dequeued: msg-3
Is empty: true

总结

MPSC 阻塞交换队列的设计体现了 简洁高性能 的工程智慧:

  1. Atomic Swap:用原子操作代替锁,避免阻塞
  2. 哨兵节点:简化空队列处理
  3. 单消费者优化:出队无需同步,大大简化实现
  4. CAS 循环:生产者之间无锁并发的核心

这种设计不是"银弹",但对于 MPSC 场景来说,是一个非常优雅的权衡选择。