High-Performance MPSC Queue: Blocking Swap Design
In concurrent programming, MPSC (Multiple Producer Single Consumer) queues are core data structures for task distribution. Because only one thread ever dequeues, MPSC allows more aggressive optimizations than MPMC. Today we dive into an industrial-grade MPSC blocking swap queue implementation and explore its atomic-swap design.
MPSC Scenarios and Constraints
MPSC queue application scenarios:
- Thread pool task distribution: Multiple threads submit tasks, single worker consumes
- Log collection: Multiple threads write logs, single disk-write thread
- Event handling: Multiple producers generate events, single consumer processes
Key constraints:
- Multiple producers: Enqueue must be safe under concurrent calls
- Single consumer: Dequeue runs on one thread, so it needs no lock or CAS, only atomic reads of the pointers that producers publish
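Before looking at the hand-rolled queue, it helps to see the MPSC shape in its simplest Go form. The sketch below uses a buffered channel as the queue: several goroutines send, one goroutine receives. The function name collect, the buffer size, and the producer and message counts are illustrative assumptions, not part of the implementation discussed in this article.

```go
package main

import (
	"fmt"
	"sync"
)

// collect starts `producers` goroutines that each send `perProducer`
// messages into one shared channel, then drains the channel on the
// calling goroutine, which plays the single consumer.
func collect(producers, perProducer int) []string {
	ch := make(chan string, 8) // buffer size is arbitrary
	var wg sync.WaitGroup
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < perProducer; i++ {
				ch <- fmt.Sprintf("producer-%d/msg-%d", id, i)
			}
		}(p)
	}
	// Close the channel once every producer is done so that the
	// consumer loop below terminates.
	go func() {
		wg.Wait()
		close(ch)
	}()

	var received []string
	for msg := range ch { // single consumer
		received = append(received, msg)
	}
	return received
}

func main() {
	msgs := collect(3, 5)
	fmt.Println("received", len(msgs), "messages")
}
```

A channel gives blocking and wake-up for free; the swap-based queue discussed next trades those conveniences for a cheaper enqueue path.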
Solution: Atomic Swap + Sentinel Node
The industrial implementation enqueues with a single atomic swap (exchange) instead of a CAS (Compare-And-Swap) retry loop:
// Using atomic swap instead of CAS
void Enqueue(TTunedNode* node) {
    // Our goal is to avoid expensive CAS here
    // but now consumer will be blocked until new tail linked.
    // Fortunately 'window of inconsistency' is extremely small.
    TTunedNode* prev = AtomicSwap(&this->Tail, node);
    AtomicSet(prev->Next, node);
}
Core Design
Sentinel Node
- Head always points to a dummy node that stores no actual data
- Simplifies empty queue boundary handling: Tail is never null, so Enqueue needs no special case for an empty queue
Atomic Swap
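- AtomicSwap installs the new node as Tail and returns the previous tail in one step
- The producer then sets the previous tail's Next to the new node, which is what makes the node reachable by the consumer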
No Complex Synchronization
- Since there is only one consumer, dequeue needs no locks or CAS
- Producers are ordered by the atomic swap on Tail, not by a CAS loop
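The three design points above can be sketched in Go as follows. This is a minimal illustration, not the original implementation: the names SwapQueue, NewSwapQueue and node are made up for the example, and it assumes Go 1.19 or later for the generic atomic.Pointer type.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// node is one link in the list. next is written by a producer and
// read by the consumer, so it is an atomic pointer.
type node[T any] struct {
	value T
	next  atomic.Pointer[node[T]]
}

// SwapQueue is a hypothetical name for this sketch.
type SwapQueue[T any] struct {
	head *node[T]                // current sentinel; touched only by the consumer
	tail atomic.Pointer[node[T]] // last claimed node; shared by all producers
}

func NewSwapQueue[T any]() *SwapQueue[T] {
	sentinel := &node[T]{} // dummy node, carries no data
	q := &SwapQueue[T]{head: sentinel}
	q.tail.Store(sentinel)
	return q
}

// Enqueue may be called from any number of goroutines.
func (q *SwapQueue[T]) Enqueue(v T) {
	n := &node[T]{value: v}
	prev := q.tail.Swap(n) // step 1: claim the tail slot, no retry loop
	prev.next.Store(n)     // step 2: link; the consumer can now reach n
}

// Dequeue must be called from a single goroutine only.
func (q *SwapQueue[T]) Dequeue() (T, bool) {
	var zero T
	next := q.head.next.Load()
	if next == nil {
		return zero, false // empty, or a producer has not linked yet
	}
	v := next.value
	next.value = zero // next becomes the sentinel; do not keep v alive
	q.head = next
	return v, true
}

func main() {
	q := NewSwapQueue[string]()
	for _, m := range []string{"msg-1", "msg-2", "msg-3"} {
		q.Enqueue(m)
	}
	for {
		m, ok := q.Dequeue()
		if !ok {
			break
		}
		fmt.Println("dequeued:", m)
	}
}
```

Because head is private to the consumer, it is a plain pointer here; only tail and next need atomic access.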
Trade-off Analysis
Advantages
- High performance: Enqueue is one atomic swap plus one atomic store, with no retry loop
- Producers never wait on each other: the swap always succeeds, so contention cannot force a retry
- Simple implementation: Much simpler compared to MPMC queue
Costs
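- Memory allocation: Each enqueued element needs its own Node
- Consumer potential blocking: If a producer stalls between the swap and the Next store, the consumer cannot get past the unlinked node, even when later nodes are already enqueued
- No wait-notify: Consumer returns nullptr when the queue is empty, so it must poll or add its own wake-up mechanism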
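The consumer-blocking cost is easier to see when the two halves of an enqueue are pulled apart. The sketch below is a single-threaded simulation, not production code: the helpers claim and link are hypothetical and exist only to freeze one producer between its swap and its store.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type node struct {
	value string
	next  atomic.Pointer[node]
}

type stepQueue struct {
	head *node
	tail atomic.Pointer[node]
}

func newStepQueue() *stepQueue {
	s := &node{} // sentinel
	q := &stepQueue{head: s}
	q.tail.Store(s)
	return q
}

// claim performs only the swap half of an enqueue and returns the
// previous tail, which is not yet linked to n.
func (q *stepQueue) claim(n *node) *node { return q.tail.Swap(n) }

// link performs the second half: publishing n through prev.next.
func link(prev, n *node) { prev.next.Store(n) }

func (q *stepQueue) dequeue() (string, bool) {
	next := q.head.next.Load()
	if next == nil {
		return "", false
	}
	q.head = next
	return next.value, true
}

func main() {
	q := newStepQueue()
	a, b := &node{value: "A"}, &node{value: "B"}

	prevA := q.claim(a) // producer A swaps, then "stalls" before linking
	prevB := q.claim(b) // producer B swaps; its previous tail is node A
	link(prevB, b)      // producer B finishes: A.next now points to B

	// B is fully enqueued, but the chain from the sentinel is broken at A.
	_, ok := q.dequeue()
	fmt.Println("before A links:", ok)

	link(prevA, a) // producer A resumes
	v1, _ := q.dequeue()
	v2, _ := q.dequeue()
	fmt.Println("after A links:", v1, v2)
}
```

The first dequeue reports an empty queue although B has completed its enqueue; both items become visible, in claim order, once A performs its store. In a real run this window lasts only as long as the gap between two adjacent instructions, unless the producer is descheduled exactly there.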
Use Cases
- High-throughput scenarios
- Task distribution
- Log collection
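As a rough illustration of these use cases, the following sketch runs several producer goroutines against one polling consumer and checks that nothing is lost and that each producer's items arrive in the order they were sent. The queue is a compact swap-based variant written for this example; the item type, the run function and the producer counts are assumptions, and the consumer polls with runtime.Gosched because the queue has no wait-notify of its own.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

type item struct{ producer, seq int }

type node struct {
	value item
	next  atomic.Pointer[node]
}

type queue struct {
	head *node                // consumer only
	tail atomic.Pointer[node] // shared by producers
}

func newQueue() *queue {
	s := &node{} // sentinel
	q := &queue{head: s}
	q.tail.Store(s)
	return q
}

func (q *queue) enqueue(v item) {
	n := &node{value: v}
	q.tail.Swap(n).next.Store(n) // claim the tail, then link
}

func (q *queue) dequeue() (item, bool) {
	next := q.head.next.Load()
	if next == nil {
		return item{}, false
	}
	q.head = next
	return next.value, true
}

// run starts `producers` goroutines that enqueue `perProducer` items
// each and drains them on the calling goroutine. It returns the number
// of items received and whether every producer's items kept their order.
func run(producers, perProducer int) (int, bool) {
	q := newQueue()
	var wg sync.WaitGroup
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < perProducer; i++ {
				q.enqueue(item{producer: id, seq: i})
			}
		}(p)
	}

	nextSeq := make([]int, producers) // next expected seq per producer
	received, ordered := 0, true
	for received < producers*perProducer {
		it, ok := q.dequeue()
		if !ok {
			runtime.Gosched() // nothing visible yet: yield and poll again
			continue
		}
		if it.seq != nextSeq[it.producer] {
			ordered = false
		}
		nextSeq[it.producer]++
		received++
	}
	wg.Wait()
	return received, ordered
}

func main() {
	n, ordered := run(4, 1000)
	fmt.Println("received:", n, "per-producer order kept:", ordered)
}
```

Items from different producers interleave arbitrarily, but the swap on the tail fixes a single global order, so each producer's own sequence is preserved.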
Clean Room Reimplementation: Go Implementation
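Here's the same design philosophy demonstrated in Go. Only the queue is shown; the driver that produced the output further down is omitted. Note that this port claims the tail with a CAS retry loop, which is functionally equivalent but is not the single swap the original uses; atomic.SwapPointer would be the direct counterpart of AtomicSwap.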
package main

import (
	"fmt"  // used by the demo driver (not shown)
	"sync" // used by the demo driver (not shown)
	"sync/atomic"
	"unsafe"
)

// Node structure
type Node struct {
	value any
	next  unsafe.Pointer // *Node, always accessed atomically
}

// Atomic CAS MPSC Queue
type AtomicMPSCQueue struct {
	head unsafe.Pointer // *Node: current sentinel, advanced only by the consumer
	tail unsafe.Pointer // *Node: last claimed node, shared by all producers
}

func NewAtomicMPSCQueue() *AtomicMPSCQueue {
	sentinel := &Node{value: nil, next: nil}
	return &AtomicMPSCQueue{
		head: unsafe.Pointer(sentinel),
		tail: unsafe.Pointer(sentinel),
	}
}

// Enqueue (multiple producers safe)
func (q *AtomicMPSCQueue) Enqueue(value any) {
	newNode := &Node{value: value, next: nil}
	oldTail := (*Node)(atomic.LoadPointer(&q.tail))
	for {
		// Try to claim the tail; this fails if another producer got there first.
		swapped := atomic.CompareAndSwapPointer(
			&q.tail,
			unsafe.Pointer(oldTail),
			unsafe.Pointer(newNode),
		)
		if swapped {
			// Link the previous tail to the new node; only now can the consumer reach it.
			atomic.StorePointer(&oldTail.next, unsafe.Pointer(newNode))
			break
		}
		// Lost the race: reload the tail and retry.
		oldTail = (*Node)(atomic.LoadPointer(&q.tail))
	}
}

// Dequeue (single consumer)
func (q *AtomicMPSCQueue) Dequeue() any {
	head := (*Node)(atomic.LoadPointer(&q.head))
	next := (*Node)(atomic.LoadPointer(&head.next))
	if next == nil {
		// Empty, or a producer has claimed the tail but not linked its node yet.
		return nil
	}
	value := next.value
	// next becomes the new sentinel.
	atomic.StorePointer(&q.head, unsafe.Pointer(next))
	return value
}
Output from the full demo, which also exercises a channel-based queue:
=== MPSC Queue Demo (Go) ===
--- Channel-based MPSC Queue ---
Received 15 messages
--- Atomic CAS-based MPSC Queue ---
Is empty: false
Dequeued: msg-1
Dequeued: msg-2
Dequeued: msg-3
Is empty: true
Summary
The MPSC blocking swap queue design embodies the engineering wisdom of simplicity plus high performance:
- Atomic Swap: One atomic exchange orders the producers, with no locks on the enqueue path
- Sentinel Node: Simplifies empty queue handling
- Single consumer optimization: Dequeue needs no locks or CAS, greatly simplifying implementation
- No CAS retry loop: The price is a brief window, between a producer's swap and its link, in which the consumer has to wait
This design isn't a silver bullet, but for MPSC scenarios, it's a very elegant trade-off choice.