← Back to Blog
EN中文

分布式系统中的租约式任务分发与防抖设计

在分布式爬虫或大规模状态同步系统中,任务分发(Task Dispatching)是一个核心难题。特别是当任务队列非常庞大,且处理节点(Worker)随时可能宕机或网络分区时,如何保证**“每个任务至少被处理一次(At-Least-Once)”“尽量不被重复处理”**?

本文将探讨一种基于**租约(Lease)**机制的 Push Queue Manager 设计。这种模式常用于协调分布式 Worker 对共享资源(如 Tablet、Partition)的独占访问。

核心挑战:节点失联与任务漂移

在一个无中心或弱中心的集群中,如果一个 Worker 领走了任务 A 但随后失联了(Crash 或 GC 停顿),集群怎么知道该重新分发任务 A?

常见的解法是心跳(Heartbeat),但这还不够。我们需要引入**租约(Lease)**的概念:Worker 不仅要“拿”任务,还要定期“续租”。一旦租约过期,其他 Worker 就可以通过“抢占”来接管该任务。

此外,为了防止因网络抖动导致的频繁任务漂移(Thrashing),还需要引入**防抖动(Debouncing)**机制,即在租约过期后设定一个缓冲期(Grace Period),或者通过持有阈值(Hold Threshold)来平滑状态变更。

架构设计:租约生命周期

设计一个 DistributedQueueManager,其核心逻辑如下:

  1. Acquire (抢占):Worker 尝试对某个分片(Shard)加锁。
  2. Renew (续租):持有锁的 Worker 定期发送心跳,延长过期时间。
  3. Revoke (撤销):如果 Worker 长时间未续租,超过 RevokeThreshold,锁自动释放或被他人强制抢占。
  4. Debounce (防抖):在某些实现中,即使检测到租约失效,也可能等待几个心跳周期以确认节点真的死亡,防止因瞬时网络延迟导致的误判。

净室实现(Go)

下面的代码使用 Go 模拟了一个基于租约的任务分发器。为了简化,我们用内存 Map 模拟分布式存储(如 Etcd 或 ZooKeeper)。

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Lease 代表一个租约
type Lease struct {
	OwnerID    string
	Expiration time.Time
}

// SimulatedStore 模拟分布式存储 (如 Etcd/Redis)
type SimulatedStore struct {
	mu     sync.Mutex
	leases map[string]Lease
}

func NewStore() *SimulatedStore {
	return &SimulatedStore{leases: make(map[string]Lease)}
}

// TryAcquire 尝试获取租约 (CAS 逻辑)
func (s *SimulatedStore) TryAcquire(resourceID, ownerID string, duration time.Duration) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	lease, exists := s.leases[resourceID]
	now := time.Now()

	// 如果租约不存在,或已过期 -> 抢占成功
	if !exists || now.After(lease.Expiration) {
		s.leases[resourceID] = Lease{
			OwnerID:    ownerID,
			Expiration: now.Add(duration),
		}
		return true
	}

	// 如果是自己持有的 -> 续租成功
	if lease.OwnerID == ownerID {
		s.leases[resourceID] = Lease{
			OwnerID:    ownerID,
			Expiration: now.Add(duration),
		}
		return true
	}

	// 别人持有且未过期 -> 失败
	return false
}

// Worker 模拟分布式节点
type Worker struct {
	ID        string
	Store     *SimulatedStore
	StopCh    chan struct{}
}

func (w *Worker) Start(resourceID string) {
	ticker := time.NewTicker(500 * time.Millisecond) // 心跳间隔
	leaseDuration := 2 * time.Second                 // 租约时长

	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-w.StopCh:
				fmt.Printf("[%s] Stopping...\n", w.ID)
				return
			case <-ticker.C:
				// 尝试获取或续租
				success := w.Store.TryAcquire(resourceID, w.ID, leaseDuration)
				if success {
					fmt.Printf("[%s] Holds lease on %s (Processing task...)\n", w.ID, resourceID)
					// 这里执行实际的任务逻辑...
				} else {
					fmt.Printf("[%s] Failed to acquire lease on %s\n", w.ID, resourceID)
				}
			}
		}
	}()
}

func main() {
	store := NewStore()
	resource := "partition-0"

	// Worker 1 启动
	w1 := &Worker{ID: "worker-1", Store: store, StopCh: make(chan struct{})}
	w1.Start(resource)

	// 让 Worker 1 运行一会儿
	time.Sleep(3 * time.Second)

	// Worker 2 启动,尝试抢占
	w2 := &Worker{ID: "worker-2", Store: store, StopCh: make(chan struct{})}
	w2.Start(resource)

	// 此时 Worker 1 还在,Worker 2 应该抢不到
	time.Sleep(2 * time.Second)

	// 模拟 Worker 1 宕机 (停止续租)
	fmt.Println("!!! Worker 1 Crashed !!!")
	close(w1.StopCh)

	// 等待租约过期 (Lease Duration 是 2s)
	time.Sleep(3 * time.Second)

	// Worker 2 应该成功抢占
	time.Sleep(2 * time.Second)
	close(w2.StopCh)
}

关键设计权衡 (Trade-offs)

1. 租约时长 (Lease TTL)

这是一个经典的权衡点:

  • TTL 太短:续租请求(RPC)频繁,对中心存储(如 Etcd)压力大;网络稍有抖动就会丢失租约,导致任务被不必要地重新调度。
  • TTL 太长:节点宕机后,故障恢复时间(MTTR)变长。系统必须等待 TTL 过期后,其他节点才能接管任务,导致这段时间内任务处理停滞。

2. 双主问题 (Split-Brain)

在分布式系统中,很难做到绝对的“唯一主”。 在上述代码中,如果 Worker 1 只是网络卡顿(GC Pause)了 3 秒,它的租约过期了,Worker 2 抢到了锁。此时 Worker 1 恢复,它可能还“自认为”持有锁,继续执行任务。 防抖设计:通常需要引入 Fencing Token(通常是版本号)。每次写操作都要带上租约的版本号,存储层拒绝旧版本的写入。

3. 撤销阈值 (Revoke Threshold)

除了 TTL,有些系统会引入 RevokeThreshold。例如,租约虽然是 10秒,但如果 Worker 连续 3 次心跳失败(比如 3秒),Manager 可能就会标记该节点为“疑似下线”,开始准备迁移任务,而不是死板地等到 10秒结束。这是一种主动的防抖优化

总结

基于租约的任务分发是分布式系统中最基础的模式之一。它通过简单的“抢占-续租”逻辑,解决了节点动态上下线的问题。但在生产环境中,必须仔细调整 TTL 和心跳频率,并配合版本号(Fencing)机制,才能在可用性和一致性之间找到平衡点。