Caller-Runs: Handling Backpressure by Rolling Up Your Sleeves
In high-concurrency system design, batching tasks is a standard optimization. We usually spawn a pool of workers, toss a batch of tasks into a queue, and wait for results.
But what happens when the load spikes? The worker pool saturates, the queue fills up, and your system starts to groan.
At this point, the thread submitting the tasks (the Caller) has a choice. It can block and wait (helplessly), or it can reject the tasks (fail fast). But there's a third, more proactive option: Caller-Runs.
If you want something done and the pool is busy, why not do it yourself?
The Philosophy of "Helping"
Caller-Runs is a form of backpressure handling. When producers (Callers) generate work faster than consumers (Workers) can process it, the system forces the producer to slow down by making it execute some of the work itself.
This isn't just about throttling; it's about maximizing resource utilization.
The Scenario
Imagine a web server where each incoming request triggers 10 sub-tasks (e.g., querying 10 different microservices). You submit these 10 tasks to a global worker pool.
- Happy Path: The worker pool has capacity. It picks up the tasks immediately. The Caller waits briefly and collects the results.
- Sad Path (Congestion): The worker pool is swamped.
  - Strategy A (Blocking): The Caller thread blocks. It sits idle, consuming memory (stack space) but contributing zero CPU cycles. It's a zombie thread waiting on a bottleneck.
  - Strategy B (Caller-Runs): The Caller realizes the pool is busy. Instead of idling, it starts picking tasks from its own batch and executing them.
Strategy B turns "waiting time" into "working time." The Caller effectively becomes a temporary Worker.
Core Design: Atomic State Transitions
Implementing Caller-Runs requires careful coordination. A task might be picked up by a Worker thread or the Caller thread. We must ensure:
- Each task runs exactly once.
- Completion is signaled to the waiter regardless of who ran it.
This is a classic use case for CAS (Compare-And-Swap). We use an atomic flag (like `started`) for each task. Both the Worker and the Caller race to flip this flag from 0 to 1. The winner executes the task; the loser waits.
Implementation in Go
Go's concurrency primitives (the `sync/atomic` package and channels) make this pattern elegant to implement.
Here is a simplified Batcher that demonstrates the concept. It allows you to submit tasks and then "Wait and Help."
```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Task represents a unit of work.
type Task func() error

// TaskWrapper adds state control to the raw task.
type TaskWrapper struct {
	fn      Task
	started int32 // 0: pending, 1: started
	done    chan struct{}
	err     error
}

func (t *TaskWrapper) Run() {
	// CAS operation: try to swap 'started' from 0 to 1.
	// Only the goroutine that succeeds gets to run the task.
	if atomic.CompareAndSwapInt32(&t.started, 0, 1) {
		defer close(t.done)
		t.err = t.fn()
	} else {
		// CAS failed: another goroutine (Worker or Caller) beat us to it.
		// Just wait for it to finish.
		<-t.done
	}
}

// Batcher manages a group of tasks.
type Batcher struct {
	pool  chan func() // a simple buffered channel as a worker pool
	tasks []*TaskWrapper
}

func NewBatcher(poolSize int) *Batcher {
	b := &Batcher{
		pool:  make(chan func(), poolSize),
		tasks: make([]*TaskWrapper, 0),
	}
	// Start a fixed number of workers.
	for i := 0; i < poolSize; i++ {
		go func() {
			for f := range b.pool {
				f()
			}
		}()
	}
	return b
}

// Add submits a task to the batch.
// It tries a non-blocking push to the pool. If the pool is full,
// the task is left for WaitAndHelp.
func (b *Batcher) Add(fn Task) {
	wrapper := &TaskWrapper{
		fn:   fn,
		done: make(chan struct{}),
	}
	b.tasks = append(b.tasks, wrapper)

	// Try to push to the worker pool without blocking.
	select {
	case b.pool <- wrapper.Run:
		// Successfully queued for a worker.
	default:
		// Pool is full. We'll run this ourselves later.
	}
}

// WaitAndHelp is the heart of the Caller-Runs strategy.
// The caller helps execute pending tasks while waiting for completion.
func (b *Batcher) WaitAndHelp() {
	// Phase 1: aggressively help.
	// The Caller iterates through its own tasks, trying to claim
	// each one via the CAS inside Run(). Losing the race means
	// Run() waits for the winner, so this loop also waits.
	for _, task := range b.tasks {
		task.Run()
	}
	// Phase 2: ensure completion.
	// Run() already waited on tasks it lost, but this sweep makes
	// the invariant explicit: every task is done before we return.
	for _, task := range b.tasks {
		<-task.done
	}
}

// Usage simulation.
func main() {
	// A tiny pool with only 2 workers.
	batcher := NewBatcher(2)

	// Submit 10 tasks. The 2 workers can't handle this instantly,
	// triggering Caller-Runs.
	for i := 0; i < 10; i++ {
		id := i
		batcher.Add(func() error {
			fmt.Printf("Task %d running\n", id)
			time.Sleep(100 * time.Millisecond)
			return nil
		})
	}

	fmt.Println("Caller starts helping...")
	start := time.Now()
	// The Caller thread is both waiting AND working here.
	batcher.WaitAndHelp()
	fmt.Printf("All tasks done in %v\n", time.Since(start))
}
```
Key Takeaways from the Code
- Atomic Coordination: The `TaskWrapper` holds the state, and the `started` flag is the source of truth. Whether it's a Worker goroutine popping from `b.pool` or the Caller thread iterating in `WaitAndHelp`, both execute the exact same `Run()` method, which guards execution with `atomic.CompareAndSwapInt32`.
- Non-Blocking Submission: In `Add`, we use `select` + `default` to push to the channel. If the channel is full, we don't block; we skip the queue. These "skipped" tasks are destined to be picked up by the Caller in `WaitAndHelp` (or potentially by workers if we added work-stealing, but let's keep it simple).
- Dual Role: `WaitAndHelp` is where the magic happens. The Caller doesn't just Wait; it Helps.
Trade-offs
Caller-Runs is powerful, but it requires caution.
The Good
- Natural Backpressure: If the workers are overwhelmed, the Caller has to do the work. This naturally slows down the rate at which new batches are submitted.
- Latency Reduction: For the specific batch being processed, having the Caller help is usually faster than waiting in a long queue.
- CPU Efficiency: Converts idle wait time into productive execution time.
The Bad
- Caller Thread Hygiene: This is the big one. If your Caller thread is special (e.g., a UI thread, an event loop, or a high-priority control thread), you do not want it executing arbitrary background tasks. A slow task could freeze your UI or cause heartbeats to time out.
  - Rule of Thumb: Only use Caller-Runs if the Caller thread is functionally equivalent to a Worker thread (e.g., a generic request handler).
- Context Leaks: In languages with ThreadLocals, the Caller might accidentally leak context into the task, or vice versa.
Conclusion
The Caller-Runs strategy embodies a pragmatic approach to concurrency: If you want it done right (now), do it yourself.
It builds robustness into high-throughput systems by acknowledging that worker pools are finite. Instead of assuming infinite capacity, it uses collaboration to degrade gracefully under load.
Next time you see a saturated thread pool, ask yourself: could the waiters be working?