谁主张,谁执行:通过 Caller-Runs 策略缓解并发背压
在高并发系统的设计中,任务批处理(Batching)是一个常见的优化手段。我们通常会创建一个线程池,将一批任务丢进去,然后等待结果。
但在极端高负载下,线程池的队列可能会瞬间被打满。这时候,提交任务的线程(Caller)该怎么办?
传统的做法是阻塞等待(Blocking)或者直接拒绝(Rejecting)。但还有一种更积极的策略:Caller-Runs(调用者运行)。既然你(Caller)急着要结果,而线程池又忙不过来,那你为什么不自己动手干呢?
本文将探讨这种“主动协作”的并发模式,并用 Go 语言实现一个基于此策略的批处理器。
什么是 Caller-Runs?
Caller-Runs 是一种背压(Backpressure)处理策略。当生产者(Caller)生成任务的速度超过了消费者(Worker Pool)的处理速度时,为了防止内存溢出或系统崩溃,系统强迫生产者通过执行部分任务来“减速”。
这不仅仅是减速,更是一种资源利用的最大化。
场景推演
假设你有一个 web 服务器,每个请求需要处理 10 个独立的子任务(例如查询 10 个不同的下游服务)。你将这 10 个任务提交给一个全局的 Worker 线程池。
- 正常情况:Worker 线程池有空闲,迅速吃掉这 10 个任务。Caller 线程只需稍作等待即可收集结果。
- 拥堵情况:Worker 线程池忙于处理其他请求,队列积压。
- 策略 A (Blocking): Caller 线程挂起,不做任何事,纯粹等待。通过 channel 或 WaitGroup 阻塞。此时,Caller 线程占用了内存(栈空间),但对 CPU 贡献为零。
- 策略 B (Caller-Runs): Caller 发现提交任务后需要等待,于是它不闲着,主动从自己提交的任务列表中拉取未开始的任务来执行。
策略 B 的优势在于:它把等待的时间转化为了生产力。Caller 线程变成了临时的 Worker。
核心设计:状态竞争
要实现 Caller-Runs,最关键的一点是任务状态的原子性管理。
因为一个任务既可能被 Worker 线程抢走执行,也可能被 Caller 线程自己执行,我们需要确保:
- 每个任务只被执行一次。
- 任务完成后,无论谁执行的,都能通知到等待方。
这通常通过 CAS(Compare-And-Swap)操作来实现。每个任务有一个标志位(如 started),Worker 和 Caller 都会尝试将这个标志位从 0 修改为 1。谁修改成功,谁就负责执行该任务。
Go 语言实现
在 Go 中,我们可以利用 atomic 包和 channel 来优雅地实现这一模式。
以下是一个简化版的 Batcher 实现。它允许你添加任务,并在等待结果时主动参与执行。
package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
)
// Task 是我们要在批处理中执行的工作单位
type Task func() error
// TaskWrapper 包装了原始任务,增加了状态控制
type TaskWrapper struct {
fn Task
started int32 // 0: pending, 1: started
done chan struct{}
err error
}
func (t *TaskWrapper) Run() {
// CAS 操作:尝试将 started 从 0 置为 1
// 只有成功的那个线程才有资格执行任务
if atomic.CompareAndSwapInt32(&t.started, 0, 1) {
defer close(t.done)
t.err = t.fn()
} else {
// 如果 CAS 失败,说明其他线程(Worker 或 Caller)已经抢到了任务
// 我们只需要等待它完成
<-t.done
}
}
// Batcher 管理一批任务的执行
type Batcher struct {
pool chan func() // 模拟一个简单的 worker pool
tasks []*TaskWrapper
}
func NewBatcher(poolSize int) *Batcher {
b := &Batcher{
pool: make(chan func(), poolSize), // 带缓冲的 channel 作为任务队列
tasks: make([]*TaskWrapper, 0),
}
// 启动固定数量的 worker
for i := 0; i < poolSize; i++ {
go func(id int) {
for f := range b.pool {
f()
}
}(i)
}
return b
}
// Add 提交任务到批处理中
// 它会尝试将任务扔给 pool,如果 pool 满了或忙,这里不阻塞,留给 WaitAndHelp 处理
func (b *Batcher) Add(fn Task) {
wrapper := &TaskWrapper{
fn: fn,
done: make(chan struct{}),
}
b.tasks = append(b.tasks, wrapper)
// 尝试非阻塞地将任务推入 pool
select {
case b.pool <- wrapper.Run:
// 成功推入 worker 队列
default:
// Pool 满了,没关系,我们稍后自己跑
}
}
// WaitAndHelp 是 Caller-Runs 策略的核心
// 调用者在等待所有任务完成的同时,主动执行那些还没被 Worker 抢到的任务
func (b *Batcher) WaitAndHelp() {
// 第一轮:主动帮助执行
// Caller 遍历自己提交的所有任务,尝试通过 Run() 中的 CAS 抢占执行权
for _, task := range b.tasks {
task.Run()
}
// 第二轮:确保所有任务都已完成
// 对于那些被 Worker 抢走执行的任务,Run() 会阻塞等待其 done channel 关闭
for _, task := range b.tasks {
<-task.done
}
}
// 模拟使用场景
func main() {
// 创建一个只有 2 个 worker 的小池子
batcher := NewBatcher(2)
// 提交 10 个任务
// 显然 2 个 worker 来不及处理所有任务,这会触发 Caller-Runs
for i := 0; i < 10; i++ {
id := i
batcher.Add(func() error {
fmt.Printf("Task %d running\n", id)
time.Sleep(100 * time.Millisecond)
return nil
})
}
fmt.Println("Caller starts helping...")
start := time.Now()
// Caller 线程在此处既是等待者,也是执行者
batcher.WaitAndHelp()
fmt.Printf("All tasks done in %v\n", time.Since(start))
}
代码解析
- Atomic CAS:
TaskWrapper中的started字段是核心。无论是 Worker 线程(通过b.pool获取)还是 Caller 线程(在WaitAndHelp中遍历),执行前都必须通过atomic.CompareAndSwapInt32竞争执行权。 - 非阻塞提交: 在
Add方法中,我们使用select + default向 channel 发送任务。如果 worker channel 满了,我们不会在此处阻塞,而是直接通过。这意味着那些没进 channel 的任务,注定要由 Caller 或者后续的空闲 Worker 来处理(如果实现了更复杂的窃取逻辑)。但在本例简化版中,它们主要依赖 Caller 在WaitAndHelp中执行。 - 双重兜底:
WaitAndHelp中的task.Run()既包含了“执行逻辑”(如果 CAS 成功),也包含了“等待逻辑”(如果 CAS 失败,else 分支会<-t.done)。
权衡与代价
Caller-Runs 并不是银弹,它也有适用的边界。
优点
- 天然的背压: 当 Worker 处理不过来时,Caller 被迫自己干活。这自然地降低了 Caller 提交新 Batch 的速度,防止了系统过载。
- 利用空闲算力: Caller 在等待结果时往往是空转的(Blocked),让它干活能提高 CPU 利用率。
- 降低延迟: 对于本次 Batch 的任务,Caller 自己动手通常比等待排队要快。
缺点
- Caller 隔离性破坏: 这是最大的风险。如果 Caller 是一个非常关键的线程(比如 UI 主线程,或者处理高优先级心跳的线程),让它去执行可能耗时较长的普通任务,会导致 UI 卡顿或心跳超时。
- Best Practice: 仅在 Caller 线程也是即插即用的 Worker 线程(如 RPC 处理线程)时使用此策略。
- 上下文污染: Caller 线程执行任务时,其 ThreadLocal 变量(在 Go 中虽然少用但概念类似)可能会影响任务执行,或者任务的执行污染 Caller 的上下文。
总结
Caller-Runs 策略体现了一种“实用主义”的并发哲学:如果没人干活,那就自己动手。
在构建高吞吐的微服务或数据处理管道时,这种模式能显著提升系统的鲁棒性。它不再假设 Worker 资源是无限的,而是承认资源的有限性,并通过协作来优雅降级。
下次当你发现线程池队列总是满载时,不妨考虑一下:是不是可以让那些焦急等待的 Caller 也下来搭把手?