← Back to Blog
EN中文

谁主张,谁执行:通过 Caller-Runs 策略缓解并发背压

在高并发系统的设计中,任务批处理(Batching)是一个常见的优化手段。我们通常会创建一个线程池,将一批任务丢进去,然后等待结果。

但在极端高负载下,线程池的队列可能会瞬间被打满。这时候,提交任务的线程(Caller)该怎么办?

传统的做法是阻塞等待(Blocking)或者直接拒绝(Rejecting)。但还有一种更积极的策略:Caller-Runs(调用者运行)。既然你(Caller)急着要结果,而线程池又忙不过来,那你为什么不自己动手干呢?

本文将探讨这种“主动协作”的并发模式,并用 Go 语言实现一个基于此策略的批处理器。

什么是 Caller-Runs?

Caller-Runs 是一种背压(Backpressure)处理策略。当生产者(Caller)生成任务的速度超过了消费者(Worker Pool)的处理速度时,为了防止内存溢出或系统崩溃,系统强迫生产者通过执行部分任务来“减速”。

这不仅仅是减速,更是一种资源利用的最大化。

场景推演

假设你有一个 web 服务器,每个请求需要处理 10 个独立的子任务(例如查询 10 个不同的下游服务)。你将这 10 个任务提交给一个全局的 Worker 线程池。

  1. 正常情况:Worker 线程池有空闲,迅速吃掉这 10 个任务。Caller 线程只需稍作等待即可收集结果。
  2. 拥堵情况:Worker 线程池忙于处理其他请求,队列积压。
    • 策略 A (Blocking): Caller 线程挂起,不做任何事,纯粹等待。通过 channel 或 WaitGroup 阻塞。此时,Caller 线程占用了内存(栈空间),但对 CPU 贡献为零。
    • 策略 B (Caller-Runs): Caller 发现提交任务后需要等待,于是它不闲着,主动从自己提交的任务列表中拉取未开始的任务来执行。

策略 B 的优势在于:它把等待的时间转化为了生产力。Caller 线程变成了临时的 Worker。

核心设计:状态竞争

要实现 Caller-Runs,最关键的一点是任务状态的原子性管理

因为一个任务既可能被 Worker 线程抢走执行,也可能被 Caller 线程自己执行,我们需要确保:

  1. 每个任务只被执行一次。
  2. 任务完成后,无论谁执行的,都能通知到等待方。

这通常通过 CAS(Compare-And-Swap)操作来实现。每个任务有一个标志位(如 started),Worker 和 Caller 都会尝试将这个标志位从 0 修改为 1。谁修改成功,谁就负责执行该任务。

Go 语言实现

在 Go 中,我们可以利用 atomic 包和 channel 来优雅地实现这一模式。

以下是一个简化版的 Batcher 实现。它允许你添加任务,并在等待结果时主动参与执行。

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Task 是我们要在批处理中执行的工作单位
type Task func() error

// TaskWrapper 包装了原始任务,增加了状态控制
type TaskWrapper struct {
	fn      Task
	started int32 // 0: pending, 1: started
	done    chan struct{}
	err     error
}

func (t *TaskWrapper) Run() {
	// CAS 操作:尝试将 started 从 0 置为 1
	// 只有成功的那个线程才有资格执行任务
	if atomic.CompareAndSwapInt32(&t.started, 0, 1) {
		defer close(t.done)
		t.err = t.fn()
	} else {
		// 如果 CAS 失败,说明其他线程(Worker 或 Caller)已经抢到了任务
		// 我们只需要等待它完成
		<-t.done
	}
}

// Batcher 管理一批任务的执行
type Batcher struct {
	pool  chan func() // 模拟一个简单的 worker pool
	tasks []*TaskWrapper
}

func NewBatcher(poolSize int) *Batcher {
	b := &Batcher{
		pool:  make(chan func(), poolSize), // 带缓冲的 channel 作为任务队列
		tasks: make([]*TaskWrapper, 0),
	}

	// 启动固定数量的 worker
	for i := 0; i < poolSize; i++ {
		go func(id int) {
			for f := range b.pool {
				f()
			}
		}(i)
	}
	return b
}

// Add 提交任务到批处理中
// 它会尝试将任务扔给 pool,如果 pool 满了或忙,这里不阻塞,留给 WaitAndHelp 处理
func (b *Batcher) Add(fn Task) {
	wrapper := &TaskWrapper{
		fn:   fn,
		done: make(chan struct{}),
	}
	b.tasks = append(b.tasks, wrapper)

	// 尝试非阻塞地将任务推入 pool
	select {
	case b.pool <- wrapper.Run:
		// 成功推入 worker 队列
	default:
		// Pool 满了,没关系,我们稍后自己跑
	}
}

// WaitAndHelp 是 Caller-Runs 策略的核心
// 调用者在等待所有任务完成的同时,主动执行那些还没被 Worker 抢到的任务
func (b *Batcher) WaitAndHelp() {
	// 第一轮:主动帮助执行
	// Caller 遍历自己提交的所有任务,尝试通过 Run() 中的 CAS 抢占执行权
	for _, task := range b.tasks {
		task.Run()
	}

	// 第二轮:确保所有任务都已完成
	// 对于那些被 Worker 抢走执行的任务,Run() 会阻塞等待其 done channel 关闭
	for _, task := range b.tasks {
		<-task.done
	}
}

// 模拟使用场景
func main() {
	// 创建一个只有 2 个 worker 的小池子
	batcher := NewBatcher(2)

	// 提交 10 个任务
	// 显然 2 个 worker 来不及处理所有任务,这会触发 Caller-Runs
	for i := 0; i < 10; i++ {
		id := i
		batcher.Add(func() error {
			fmt.Printf("Task %d running\n", id)
			time.Sleep(100 * time.Millisecond)
			return nil
		})
	}

	fmt.Println("Caller starts helping...")
	start := time.Now()
	
	// Caller 线程在此处既是等待者,也是执行者
	batcher.WaitAndHelp()
	
	fmt.Printf("All tasks done in %v\n", time.Since(start))
}

代码解析

  1. Atomic CAS: TaskWrapper 中的 started 字段是核心。无论是 Worker 线程(通过 b.pool 获取)还是 Caller 线程(在 WaitAndHelp 中遍历),执行前都必须通过 atomic.CompareAndSwapInt32 竞争执行权。
  2. 非阻塞提交: 在 Add 方法中,我们使用 select + default 向 channel 发送任务。如果 worker channel 满了,我们不会在此处阻塞,而是直接通过。这意味着那些没进 channel 的任务,注定要由 Caller 或者后续的空闲 Worker 来处理(如果实现了更复杂的窃取逻辑)。但在本例简化版中,它们主要依赖 Caller 在 WaitAndHelp 中执行。
  3. 双重兜底: WaitAndHelp 中的 task.Run() 既包含了“执行逻辑”(如果 CAS 成功),也包含了“等待逻辑”(如果 CAS 失败,else 分支会 <-t.done)。

权衡与代价

Caller-Runs 并不是银弹,它也有适用的边界。

优点

  1. 天然的背压: 当 Worker 处理不过来时,Caller 被迫自己干活。这自然地降低了 Caller 提交新 Batch 的速度,防止了系统过载。
  2. 利用空闲算力: Caller 在等待结果时往往是空转的(Blocked),让它干活能提高 CPU 利用率。
  3. 降低延迟: 对于本次 Batch 的任务,Caller 自己动手通常比等待排队要快。

缺点

  1. Caller 隔离性破坏: 这是最大的风险。如果 Caller 是一个非常关键的线程(比如 UI 主线程,或者处理高优先级心跳的线程),让它去执行可能耗时较长的普通任务,会导致 UI 卡顿或心跳超时。
    • Best Practice: 仅在 Caller 线程也是即插即用的 Worker 线程(如 RPC 处理线程)时使用此策略。
  2. 上下文污染: Caller 线程执行任务时,其 ThreadLocal 变量(在 Go 中虽然少用但概念类似)可能会影响任务执行,或者任务的执行污染 Caller 的上下文。

总结

Caller-Runs 策略体现了一种“实用主义”的并发哲学:如果没人干活,那就自己动手

在构建高吞吐的微服务或数据处理管道时,这种模式能显著提升系统的鲁棒性。它不再假设 Worker 资源是无限的,而是承认资源的有限性,并通过协作来优雅降级。

下次当你发现线程池队列总是满载时,不妨考虑一下:是不是可以让那些焦急等待的 Caller 也下来搭把手?