← Back to Blog
EN中文

同步的艺术:工业级文件队列的冲突解决策略

在分布式文件同步系统中,如何确保文件可靠地从 A 同步到 B?当同步失败时,是直接丢弃还是重试?如果重试,如何避免重复处理?这些问题构成了文件同步队列的核心挑战。今天我们深入分析一个工业级文件同步队列的实现,看看它如何用简洁的设计解决复杂的分布式问题。

问题的本质:可靠投递与冲突处理

文件同步面临的核心挑战:

  1. 可靠性:确保文件最终能同步成功
  2. 重复检测:避免重复处理导致的副作用
  3. 冲突解决:当多个节点同时处理时如何决策

工业级系统的解决方案是基于数据库的队列 + 尝试次数计数

  • 使用数据库作为持久化存储,保证不丢失
  • 使用尝试次数(Attempt)计数器追踪重试次数
  • 使用"返回"机制处理失败而非直接丢弃

工业级实现的核心设计

在某工业级云盘系统中,我找到了一个经过多年迭代的文件同步队列模块。它的设计选择非常务实:

设计一:数据库-backed 队列

class TDatasyncQueueClient: public TDBEntities<TDatasyncQueueEntry> {
    TMap<TString, TDatasyncQueueEntry> Get(ui64 amount, ...) const;
    bool Delete(const TString& assignmentId, ...) const;
    bool Push(const TString& assignmentId, ...) const;
    bool Return(TDatasyncQueueEntry assignment, ...) const;
};

选择:使用数据库作为队列的持久化层

权衡考量

  • 优点:数据不丢失,支持事务,跨进程共享
  • 缺点:相比内存队列延迟更高,存在数据库瓶颈
  • 适用场景:需要高可靠性的生产环境

设计二:尝试次数计数器

bool TDatasyncQueueClient::Return(TDatasyncQueueEntry assignment, ...) {
    assignment.SetProcessAt(TInstant::Now());
    assignment.SetAttempt(assignment.GetAttempt() + 1);
    return Upsert(assignment, session);
}

选择:失败时增加 Attempt 计数并重新入队

权衡考量

  • 优点:实现简单,提供重试机制,可追踪失败次数
  • 缺点:只能提供"至少一次"投递,无法保证"恰好一次"
  • 适用场景:幂等操作或可重复执行的任务

设计三:会话机制

NDrive::TEntitySession TDatasyncQueueClient::BuildSession(bool readonly) const {
    return NDrive::TEntitySession(Database->CreateTransaction(readonly));
}

选择:使用会话封装数据库事务

权衡考量

  • 优点:统一的事务管理,简化错误处理
  • 缺点:增加调用复杂度,需要正确管理生命周期

净室重构:Go 实现

为了展示设计思想,我用 Go 重新实现了核心逻辑:

package main

import (
	"fmt"
	"sync"
	"time"
)

type QueueEntry struct {
	AssignmentID string
	CreatedAt   time.Time
	ProcessAt   time.Time
	Attempt     int
}

type QueueClient struct {
	mu   sync.Mutex
	data map[string]*QueueEntry
}

func NewQueueClient() *QueueClient {
	return &QueueClient{data: make(map[string]*QueueEntry)}
}

func (c *QueueClient) Get(amount int) map[string]*QueueEntry {
	c.mu.Lock()
	defer c.mu.Unlock()

	result := make(map[string]*QueueEntry)
	now := time.Now()

	for id, entry := range c.data {
		if len(result) >= amount {
			break
		}
		if !entry.ProcessAt.After(now) {
			result[id] = entry
		}
	}
	return result
}

func (c *QueueClient) Delete(assignmentID string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	if _, ok := c.data[assignmentID]; ok {
		delete(c.data, assignmentID)
		return true
	}
	return false
}

func (c *QueueClient) Push(assignmentID string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	now := time.Now()
	c.data[assignmentID] = &QueueEntry{
		AssignmentID: assignmentID,
		CreatedAt:   now,
		ProcessAt:   now,
		Attempt:     1,
	}
	return true
}

// Return 是冲突解决的核心:失败后重新入队
func (c *QueueClient) Return(entry QueueEntry) bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	now := time.Now()
	entry.ProcessAt = now
	entry.Attempt++  // 关键:增加尝试次数

	c.data[entry.AssignmentID] = &entry
	return true
}

func (c *QueueClient) Size() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.data)
}

func main() {
	client := NewQueueClient()

	// Push entries
	client.Push("file-001")
	client.Push("file-002")
	fmt.Printf("Queue size: %d\n", client.Size())

	// Get and process
	entries := client.Get(2)
	for id, entry := range entries {
		fmt.Printf("Processing: %s (attempt %d)\n", id, entry.Attempt)
		client.Delete(id)
	}

	// Simulate failure - use Return
	failedEntry := QueueEntry{AssignmentID: "file-002", Attempt: 1}
	client.Return(failedEntry)
	fmt.Printf("After Return, queue size: %d\n", client.Size())

	// Verify attempt counter increased
	entries = client.Get(10)
	for id, entry := range entries {
		fmt.Printf("Entry: %s, Attempt: %d\n", id, entry.Attempt)
	}
}

运行结果:

Queue size: 2
Processing: file-001 (attempt 1)
Processing: file-002 (attempt 1)
After Return, queue size: 1
Entry: file-002, Attempt: 2

何时使用数据库-backed 队列

适合场景

  • 需要高可靠性,不能丢失数据
  • 跨多个进程或服务共享队列
  • 需要持久化队列状态用于恢复

不适合场景

  • 超低延迟要求的场景
  • 纯内存队列足够的小型系统
  • 已有消息队列基础设施

总结

工业级文件同步队列的设计充满权衡:

  • 数据库-backed vs 内存队列:可靠性 vs 性能
  • 至少一次 vs 恰好一次:简单性 vs 准确性
  • 尝试计数器 vs 去重表:轻量 vs 精确

在 Go 中,我们可以更简洁地实现类似设计,但核心权衡是相同的——没有完美的方案,只有适合场景的选择。