← Back to Blog
EN中文

锁的颗粒度艺术:并发哈希集合的分段分片与预哈希优化

在构建高并发系统时,标准库提供的 Mutex<HashMap<K, V>> 往往是性能杀手。当数十个线程同时争抢一把全局大锁时,CPU 的时间大多浪费在了上下文切换和锁等待上,而非真正的业务逻辑。

今天我们探讨一种在工业界被广泛验证的模式:分段锁(Lock Sharding)预哈希(Pre-hashing) 的组合拳。我们将使用 Rust 来演示如何构建一个 ShardedHashSet,在保证线程安全的同时,将锁的争用降至最低。

核心设计:分而治之

分段锁的核心思想极其朴素:既然一把大锁太慢,那就把它拆成 N 把小锁。

我们将整个哈希空间划分为 $N$ 个分片(Shard)。每个分片是一个独立的 RwLock<HashSet<T>>。当我们需要插入或查找一个元素时:

  1. 计算元素的哈希值。
  2. 根据 hash % shard_count 确定它属于哪个分片。
  3. 仅对该特定分片加锁。

只要数据分布均匀,理论上并发度可以提升 $N$ 倍。

预哈希(Pre-hashing):计算一次,到处使用

在分段设计中,哈希值的计算至关重要。一个常见的陷阱是:在定位分片时计算一次哈希,在分片内部的 HashSet 插入时又计算一次哈希。如果 Key 是长字符串或复杂对象,这种重复计算极其昂贵。

为了极致性能,我们引入 PreHashed 结构,将哈希值缓存起来,随 Key 一起传递。

Rust 实现

我们将使用 parking_lot 库,因为它的 RwLock 比标准库更轻量且支持更好的等待策略。

use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use parking_lot::RwLock;

// 预哈希包装器:携带预计算的哈希值
#[derive(Debug, Clone)]
struct PreHashed<T> {
    value: T,
    hash: u64,
}

impl<T: Hash> PreHashed<T> {
    fn new(value: T) -> Self {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        let hash = hasher.finish();
        Self { value, hash }
    }
}

// 为了让 HashSet 使用我们预计算的哈希值,我们需要自定义 Hash 实现
// 注意:这里我们假设 HashSet 的内部 Hasher 是透传的或者我们仅用它做分片路由
// 在生产环境中,更严谨的做法是实现一个不重新哈希的 Hasher,
// 或者仅在分片路由层使用这个 hash 值。
// 
// 下面的代码演示主要关注分片逻辑。

struct ShardedHashSet<T> {
    shards: Vec<RwLock<HashSet<T>>>,
    shard_mask: usize,
}

impl<T: Hash + Eq + Clone> ShardedHashSet<T> {
    pub fn new(concurrency: usize) -> Self {
        // 确保分片数是 2 的幂,方便位运算
        let shard_count = concurrency.next_power_of_two();
        let mut shards = Vec::with_capacity(shard_count);
        for _ in 0..shard_count {
            shards.push(RwLock::new(HashSet::new()));
        }

        Self {
            shards,
            shard_mask: shard_count - 1,
        }
    }

    fn get_shard_index(&self, hash: u64) -> usize {
        (hash as usize) & self.shard_mask
    }

    pub fn insert(&self, value: T) -> bool {
        // 1. 预计算哈希(只计算一次)
        let pre_hashed = PreHashed::new(value);
        
        // 2. 定位分片
        let idx = self.get_shard_index(pre_hashed.hash);
        let shard = &self.shards[idx];

        // 3. 锁定特定分片并写入
        // 注意:标准库 HashSet 还会再次哈希,但锁的粒度已经细化了
        let mut guard = shard.write();
        guard.insert(pre_hashed.value)
    }

    pub fn contains(&self, value: &T) -> bool {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        let hash = hasher.finish();

        let idx = self.get_shard_index(hash);
        let shard = &self.shards[idx];

        let guard = shard.read();
        guard.contains(value)
    }
}

// 模拟高并发场景
fn main() {
    let set = Arc::new(ShardedHashSet::new(64));
    let mut handles = vec![];

    for i in 0..10 {
        let set_ref = set.clone();
        handles.push(std::thread::spawn(move || {
            for j in 0..1000 {
                set_ref.insert(format!("key-{}-{}", i, j));
            }
        }));
    }

    for h in handles {
        h.join().unwrap();
    }
    
    println!("Done. Check logical size requires iterating all shards.");
}

深度解析

1. 锁的粒度与开销

在上述代码中,如果我们将 concurrency 设为 64,理论上就将锁的冲突概率降低了 64 倍。这对于读取密集型(Read-Heavy)或混合负载极其有利。相比于 ConcurrentHashMap(如 Java 或某些 C++ 实现)复杂的无锁(Lock-Free)CAS 操作,分段锁在实现难度和性能之间取得了一个极佳的平衡点。它避免了复杂的内存序问题,同时提供了比全局锁好得多的扩展性。

2. 扩容的挑战

分段设计最大的痛点在于全量扩容。当单个分片内的 HashSet 需要扩容时,它只锁住自己,这很好。但如果整个 ShardedHashSet 需要增加分片数量(例如从 64 增加到 128),这通常需要“停顿世界”(Stop-the-World),锁住所有分片进行重新哈希。 因此,在工业实践中(如某些 C++ 的 TConcurrentHashSet),通常会预先分配足够多的分片(BucketCount),或者接受单个分片内部哈希表无限增长,以此来避免全局重哈希的昂贵代价。

3. 适用场景

  • 缓存系统:如内存中的对象缓存,读多写少,且 Key 分布随机。
  • 去重服务:高并发写入流式数据进行实时去重。

通过这种“空间换时间”和“化整为零”的策略,我们能够用最简单的同步原语构建出高性能的并发数据结构。这正是系统编程中“简单即是美”的体现。