Distributed Task Dispatching and Lease Management with Anti-Thrashing Design
Task dispatching is a core challenge in distributed crawlers and large-scale state synchronization systems. When the task queue is massive and processing nodes (Workers) can crash or hit network partitions at any time, how do we guarantee "at-least-once processing" while minimizing duplicate work?
This article explores a Push Queue Manager design based on the Lease mechanism. This pattern is commonly used to coordinate exclusive access by distributed workers to shared resources (such as Tablets or Partitions).
The Core Challenge: Node Loss and Task Drifting
In a decentralized or weakly centralized cluster, if a Worker takes Task A but then goes silent (Crash or GC pause), how does the cluster know to redistribute Task A?
A common solution is Heartbeats, but that's not enough. We need to introduce the concept of a Lease: Workers not only "take" a task but must also periodically "renew" it. Once a lease expires, other Workers can take over the task via "preemption."
Furthermore, to prevent frequent task drifting (Thrashing) caused by network jitter, we need to introduce Anti-Thrashing (Debouncing) mechanisms. This involves setting a grace period after lease expiration or using a hold threshold to smooth out state transitions.
Architectural Design: Lease Lifecycle
When designing a DistributedQueueManager, the core lease lifecycle looks like this:
- Acquire (Preempt): Worker attempts to lock a specific shard.
- Renew: The Worker holding the lock sends periodic heartbeats to extend the expiration time.
- Revoke: If a Worker fails to renew for long enough to exceed the RevokeThreshold, the lock is automatically released or forcibly preempted by others.
- Debounce: In some implementations, even if a lease is detected as expired, the system waits a few heartbeat cycles to confirm the node is truly dead, preventing misjudgment due to transient network latency.
Clean Room Implementation (Go)
The following code uses Go to simulate a lease-based task dispatcher. For simplicity, we use an in-memory Map to simulate distributed storage (like Etcd or ZooKeeper).
package main

import (
	"fmt"
	"sync"
	"time"
)

// Lease represents a lease on a single resource.
type Lease struct {
	OwnerID    string
	Expiration time.Time
}

// SimulatedStore mimics distributed storage (like Etcd/Redis).
type SimulatedStore struct {
	mu     sync.Mutex
	leases map[string]Lease
}

func NewStore() *SimulatedStore {
	return &SimulatedStore{leases: make(map[string]Lease)}
}

// TryAcquire attempts to acquire or renew a lease (CAS logic).
func (s *SimulatedStore) TryAcquire(resourceID, ownerID string, duration time.Duration) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	lease, exists := s.leases[resourceID]
	now := time.Now()

	// Lease doesn't exist, or has expired -> preemption succeeds.
	if !exists || now.After(lease.Expiration) {
		s.leases[resourceID] = Lease{OwnerID: ownerID, Expiration: now.Add(duration)}
		return true
	}

	// Owned by self -> renewal succeeds.
	if lease.OwnerID == ownerID {
		s.leases[resourceID] = Lease{OwnerID: ownerID, Expiration: now.Add(duration)}
		return true
	}

	// Owned by someone else and not expired -> failure.
	return false
}

// Worker simulates a distributed node.
type Worker struct {
	ID     string
	Store  *SimulatedStore
	StopCh chan struct{}
}

func (w *Worker) Start(resourceID string) {
	ticker := time.NewTicker(500 * time.Millisecond) // heartbeat interval
	leaseDuration := 2 * time.Second                 // lease duration
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-w.StopCh:
				fmt.Printf("[%s] Stopping...\n", w.ID)
				return
			case <-ticker.C:
				// Attempt to acquire or renew on every heartbeat tick.
				if w.Store.TryAcquire(resourceID, w.ID, leaseDuration) {
					fmt.Printf("[%s] Holds lease on %s (processing task...)\n", w.ID, resourceID)
					// Execute actual task logic here...
				} else {
					fmt.Printf("[%s] Failed to acquire lease on %s\n", w.ID, resourceID)
				}
			}
		}
	}()
}

func main() {
	store := NewStore()
	resource := "partition-0"

	// Worker 1 starts.
	w1 := &Worker{ID: "worker-1", Store: store, StopCh: make(chan struct{})}
	w1.Start(resource)

	// Let Worker 1 run for a while.
	time.Sleep(3 * time.Second)

	// Worker 2 starts and tries to preempt. Worker 1 is still
	// renewing, so Worker 2 should fail.
	w2 := &Worker{ID: "worker-2", Store: store, StopCh: make(chan struct{})}
	w2.Start(resource)
	time.Sleep(2 * time.Second)

	// Simulate a Worker 1 crash (it simply stops renewing).
	fmt.Println("!!! Worker 1 Crashed !!!")
	close(w1.StopCh)

	// Wait for the lease to expire (lease duration is 2s); Worker 2
	// should then preempt successfully.
	time.Sleep(3 * time.Second)
	time.Sleep(2 * time.Second)
	close(w2.StopCh)
}
Key Design Trade-offs
1. Lease TTL (Time-To-Live)
This is a classic trade-off point:
- TTL too short: Renewal requests (RPCs) become frequent, putting pressure on central storage (like Etcd); minor network jitters can cause lease loss, leading to unnecessary task rescheduling.
- TTL too long: If a node crashes, the Mean Time To Recovery (MTTR) increases. The system must wait for the TTL to expire before other nodes can take over, causing task processing stagnation during this period.
2. Split-Brain Problem
In distributed systems, it is hard to guarantee a truly single master. In the code above, if Worker 1 merely pauses for 3 seconds (a long GC pause or a network stall), its lease expires and Worker 2 acquires the lock; when Worker 1 resumes, it may still believe it holds the lock and keep executing tasks. The standard defense is a Fencing Token, typically a monotonically increasing version number: every write operation must carry the lease version, and the storage layer rejects writes carrying an old version.
3. Revoke Threshold
Besides TTL, some systems introduce a RevokeThreshold. For example, even if the lease is 10 seconds, if a Worker fails 3 consecutive heartbeats (e.g., 3 seconds), the Manager might mark the node as "suspected offline" and begin preparing task migration, rather than rigidly waiting for the full 10 seconds. This is a proactive anti-thrashing optimization.
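One way to sketch that proactive marking on the manager side (the HealthTracker name and the threshold of 3 are illustrative choices, not part of the implementation above): count consecutive missed heartbeats per node and flip it to "suspect" once the count crosses the threshold, without waiting out the full TTL.

```go
package main

import "fmt"

// HealthTracker marks a node "suspect" after N consecutive missed
// heartbeats, so migration prep can start before the full TTL elapses.
type HealthTracker struct {
	misses    map[string]int
	threshold int // e.g. 3 missed heartbeats out of a 10s lease
}

func NewHealthTracker(threshold int) *HealthTracker {
	return &HealthTracker{misses: make(map[string]int), threshold: threshold}
}

// Observe records one heartbeat cycle; ok=false means the heartbeat was
// missed. It returns true once the node crosses the suspect threshold.
func (h *HealthTracker) Observe(nodeID string, ok bool) (suspect bool) {
	if ok {
		h.misses[nodeID] = 0 // any successful heartbeat resets the count
		return false
	}
	h.misses[nodeID]++
	return h.misses[nodeID] >= h.threshold
}

func main() {
	h := NewHealthTracker(3)
	fmt.Println(h.Observe("worker-1", false)) // false: 1 miss
	fmt.Println(h.Observe("worker-1", false)) // false: 2 misses
	fmt.Println(h.Observe("worker-1", true))  // false: reset by a heartbeat
	fmt.Println(h.Observe("worker-1", false)) // false
	fmt.Println(h.Observe("worker-1", false)) // false
	fmt.Println(h.Observe("worker-1", false)) // true: 3 consecutive misses
}
```

Because any single successful heartbeat resets the counter, a node that is merely slow oscillates below the threshold instead of being repeatedly marked and unmarked, which is exactly the anti-thrashing behavior the paragraph describes.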
Summary
Lease-based task dispatching is one of the most fundamental patterns in distributed systems. It solves the problem of dynamic node availability through simple "preempt-renew" logic. However, in production environments, TTL and heartbeat frequency must be carefully tuned, and combined with versioning (Fencing) mechanisms to find the balance between availability and consistency.