← Back to Blog

Dynamic Backpressure and Adaptability in Elastic Queues

In high-concurrency systems, task queues are the core bridge between producers and consumers. However, elegantly handling new tasks when the queue is full is a tricky problem. Should we reject tasks? Block the producer? Or dynamically adjust? Today we dive into an industrial-grade Elastic Queue implementation, exploring its dynamic backpressure design.

The Problem: Static vs Dynamic

Traditional task queues use static capacity strategy: set a fixed maximum queue length, reject new tasks or block producers when full. Simple, but has an issue:

Scenario: Queue max size = 100, 4 worker threads.

  • When queue has 100 tasks and all 4 threads are busy, new tasks are rejected
  • But if only 1 thread is busy, there are actually 3 threads of "processing capacity" idle

Core contradiction: Static capacity doesn't account for worker thread busy state, leading to poor resource utilization.

Solution: Dynamic Capacity

Industrial elastic queues use dynamic capacity strategy:

// Actual queue limit = maxQueueSize - numBusyThreads
// Key insight: Queue capacity should dynamically adjust with busy thread count

Core Implementation

class TElasticQueue: public IThreadPool {
private:
    TAtomic ObjectCount_ = 0;  // Current task count
    TAtomic GuardCount_ = 0;  // Guard count
    
    bool TryIncCounter() {
        // Actual limit = maxQueueSize - busy threads
        if ((size_t)AtomicIncrement(GuardCount_) > MaxQueueSize_) {
            AtomicDecrement(GuardCount_);
            return false;
        }
        return true;
    }
};

Key design points:

  1. GuardCount atomic: Uses atomic operations to control enqueue, preventing concurrency issues
  2. Dynamic capacity calculation: Actual allowed enqueue count = maxSize - busyThreads
  3. Task wrapper: TDecrementingWrapper automatically decrements count on task completion

Task Wrapper

class TDecrementingWrapper: public IObjectInQueue {
    void Process(void* tsr) override {
        RealObject_->Process(tsr);
        // Auto-decrement count on task completion
        AtomicDecrement(Queue_->ObjectCount_);
        AtomicDecrement(Queue_->GuardCount_);
    }
};

This design ensures:

  • GuardCount++ on task enqueue
  • GuardCount-- and ObjectCount-- on task completion
  • Count always reflects true queue load state

Trade-off Analysis

Advantages

  1. High resource utilization: When queue is full, new tasks can enqueue if any thread is idle
  2. Adaptive backpressure: Dynamically adjusts rejection policy based on actual load
  3. Elegant implementation: Returns failure instead of blocking producers indefinitely

Costs

  1. Memory overhead: Each task requires additional wrapper object
  2. Atomic operation overhead: TryIncCounter requires atomic increment/decrement
  3. Implementation complexity: Requires careful handling of concurrent boundary cases

Use Cases

  • CPU-intensive tasks: Fixed thread pool size, need dynamic adjustment based on load
  • Service mesh: Need adaptive rate limiting
  • Batch processing: Uncertain task processing time, need dynamic capacity

Clean Room Reimplementation: Go Implementation

Here's the same design philosophy demonstrated in Go (compilable code):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Task wrapper - auto-decrements count on completion
type decrementingWrapper struct {
	realObject IObjectInQueue
	queue      *ElasticQueue
}

func (w *decrementingWrapper) Process() {
	w.realObject.Process()
	atomic.AddInt64(&w.queue.objCount, -1)
	atomic.AddInt64(&w.queue.guardCount, -1)
}

// ElasticQueue
// Core design: actual capacity = maxQueueSize - busyThreads
type ElasticQueue struct {
	slaveQueue chan IObjectInQueue
	maxSize    int64
	objCount   int64
	guardCount int64
}

func (q *ElasticQueue) TryIncCounter() bool {
	busyThreads := atomic.LoadInt64(&q.objCount)
	maxAllowed := q.maxSize - busyThreads

	if atomic.AddInt64(&q.guardCount, 1) > maxAllowed {
		atomic.AddInt64(&q.guardCount, -1)
		return false
	}
	return true
}

func (q *ElasticQueue) Add(obj IObjectInQueue) bool {
	if !q.TryIncCounter() {
		return false
	}

	wrapper := &decrementingWrapper{
		realObject: obj,
		queue:      q,
	}

	atomic.AddInt64(&q.objCount, 1)

	select {
	case q.slaveQueue <- wrapper:
		return true
	default:
		atomic.AddInt64(&q.objCount, -1)
		atomic.AddInt64(&q.guardCount, -1)
		return false
	}
}

The output validates the design:

=== Elastic Queue Demo ===
Task 0 added successfully
Task 1 added successfully
...
Task 0 processed
Task 3 processed
...

=== Backpressure Test ===
Task 125 rejected - backpressure active
Task 126 rejected - backpressure active
...

Summary

Elastic queue design embodies dynamic adaptability engineering wisdom:

  1. Dynamic capacity: capacity = maxSize - busyThreads, fully utilizing idle threads
  2. Atomic guard: Uses atomic operations for concurrent enqueue control, ensuring thread safety
  3. Auto-release: Wrapper automatically decrements count on task completion, no manual management needed
  4. Graceful degradation: Returns failure when queue is full instead of infinite blocking

This design isn't a silver bullet, but for high-concurrency systems requiring high resource utilization, it's a worthwhile trade-off to consider.