
Cross-Process State Migration: Atomicity Checks and Lease Mechanisms

In distributed systems, the biggest headache isn't "how to transfer data," but "how to ensure data integrity." This is especially true for Cross-Process State Migration, such as transferring ownership of a task from Worker A to Global Storage or from Worker A to Worker B, where the central challenge is Atomicity.

What if Node A crashes halfway through a write? What if a network partition makes A think it succeeded, but the global state wasn't updated? Today, let's explore how to leverage Rust's type system and Lease Mechanisms to build a robust safety layer for atomic checks.

The Core Problem: Zombie Workers and Split-Brain

In a typical distributed task processing system, a Worker claims a task, processes it, and updates the global state. The biggest risk here is the Zombie Worker: a Worker that has lost ownership of a task due to network issues (Lease Expired), but doesn't know it, and still attempts to write stale state, potentially overwriting valid writes from a new Owner.

We need two mechanisms for defense:

  1. Lease: A lock with an automatic expiration time.
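  2. Atomic Commit: All or nothing. The lease check and the state write happen as one indivisible step, so either the whole new state lands or none of it does.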
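The first mechanism fits in a few lines. The sketch below is a hypothetical `Lease` type (not part of the store code that follows): it pairs an owner with a deadline after which the claim lapses on its own. It takes `now` as a parameter instead of reading the clock, an assumption that keeps the example deterministic.

```rust
use std::time::{Duration, Instant};

// Hypothetical lease type: an owner plus a deadline after which the claim lapses on its own.
#[derive(Debug, Clone)]
struct Lease {
    owner: String,
    expires_at: Instant,
}

impl Lease {
    fn grant(owner: &str, ttl: Duration, now: Instant) -> Self {
        Lease { owner: owner.to_string(), expires_at: now + ttl }
    }

    // Valid only for its owner, and only strictly before the deadline.
    fn is_held_by(&self, worker: &str, now: Instant) -> bool {
        self.owner == worker && now < self.expires_at
    }
}

fn main() {
    let t0 = Instant::now();
    let lease = Lease::grant("worker-a", Duration::from_millis(500), t0);
    println!("{}", lease.is_held_by("worker-a", t0 + Duration::from_millis(300))); // true
    println!("{}", lease.is_held_by("worker-a", t0 + Duration::from_millis(600))); // false
    println!("{}", lease.is_held_by("worker-b", t0)); // false
}
```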

Clean Room Implementation (Rust): Leveraging the Type System for Safety

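Rust's Ownership and Drop semantics suit this scenario well. We can design a LeaseGuard that mediates every write: once the guard is dropped or consumed by a commit, the compiler rejects any further writes through it. Expiry, however, is a runtime condition, so the store still has to re-check it at commit time.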

1. Defining State and Leases

First, we define a simulated global store and lease manager.

use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::collections::HashMap;

#[derive(Debug, Clone)]
struct TaskState {
    id: u64,
    data: String,
    version: u64, // For optimistic locking control
}

// Simulated Distributed Storage
struct GlobalStore {
    // Key: TaskID, Value: (OwnerID, LeaseExpiry, State)
    tasks: Mutex<HashMap<u64, (String, Instant, TaskState)>>,
}

impl GlobalStore {
    fn new() -> Self {
        GlobalStore {
            tasks: Mutex::new(HashMap::new()),
        }
    }

    // Try to acquire a task lease
    fn acquire_lease(&self, task_id: u64, worker_id: String, duration: Duration) -> Result<TaskState, String> {
        let mut store = self.tasks.lock().unwrap();
        let now = Instant::now();

        if let Some((owner, expiry, _)) = store.get(&task_id) {
            // Reject only if the lease is still live AND held by someone else
            if *expiry > now && owner != &worker_id {
                return Err(format!("Task locked by {} until {:?}", owner, expiry));
            }
        }

        // Grant (or take over) the lease, creating the entry on first use
        let entry = store.entry(task_id).or_insert((
            worker_id.clone(),
            now + duration,
            TaskState { id: task_id, data: String::new(), version: 0 },
        ));

        // Record the new owner and push the expiry forward
        entry.0 = worker_id;
        entry.1 = now + duration;

        // Hand the worker a private copy of the current state
        Ok(entry.2.clone())
    }

    // Atomic Commit: re-check the lease and the version, then write.
    // All three checks and the write happen while holding the same mutex,
    // so no other worker can change the entry between check and write.
    fn commit(&self, task_id: u64, worker_id: String, new_state: TaskState) -> Result<(), String> {
        let mut store = self.tasks.lock().unwrap();
        let now = Instant::now();

        if let Some((owner, expiry, current_state)) = store.get_mut(&task_id) {
            // Critical Check 1: Has the lease expired? (`<=` matches acquire_lease,
            // which treats a lease as live only while `expiry > now`)
            if *expiry <= now {
                return Err("Lease expired during processing".to_string());
            }
            // Critical Check 2: Am I still the owner? (Prevents Split-Brain)
            if owner != &worker_id {
                return Err("Lease stolen by another worker".to_string());
            }
            // Critical Check 3: Is this exactly the next version?
            if current_state.version + 1 != new_state.version {
                return Err("Optimistic lock failure: version mismatch".to_string());
            }

            // Safe Commit
            *current_state = new_state;
            println!("Commit success for task {}", task_id);
            Ok(())
        } else {
            Err("Task not found".to_string())
        }
    }
}

2. Encapsulating Safe Worker Logic

Now we write the Worker logic. Here we simulate a "processing delay" to see how the lease mechanism protects data.

struct Worker {
    id: String,
    store: Arc<GlobalStore>,
}

impl Worker {
    fn process_task(&self, task_id: u64) {
        // 1. Acquire Lease
        let lease_duration = Duration::from_millis(500);
        match self.store.acquire_lease(task_id, self.id.clone(), lease_duration) {
            Ok(mut state) => {
                println!("[{}] Acquired lease for task {}", self.id, task_id);

                // 2. Simulate Local Processing (State Migration / Computation)
                // 300ms fits inside the 500ms lease; raise it to 600ms and the commit is rejected
                std::thread::sleep(Duration::from_millis(300));

                // 3. Modify State
                state.data = format!("Processed by {}", self.id);
                state.version += 1;

                // 4. Attempt Atomic Commit
                match self.store.commit(task_id, self.id.clone(), state) {
                    Ok(_) => println!("[{}] Task {} migration complete.", self.id, task_id),
                    Err(e) => eprintln!("[{}] Commit failed: {}", self.id, e),
                }
            }
            Err(e) => eprintln!("[{}] Failed to acquire lease: {}", self.id, e),
        }
    }
}
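The Worker above sleeps only 300 ms, so it never outlives its lease. To see the Zombie Worker case actually caught, the sketch below replays it: A holds a 500 ms lease but is still working at 600 ms, B claims the task at 550 ms and commits at 580 ms, and A's late commit is then rejected. `Store` here is a hypothetical, compact stand-in for GlobalStore that tracks only the version number, and each method takes `now` as a parameter (an assumption, not in the original) so the timeline is deterministic rather than dependent on real sleeps.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};

// Hypothetical compact stand-in for GlobalStore: task id -> (owner, lease expiry, version).
// Unlike the original, every method takes `now`, so the timeline is deterministic.
struct Store {
    tasks: Mutex<HashMap<u64, (String, Instant, u64)>>,
}

impl Store {
    fn new() -> Self {
        Store { tasks: Mutex::new(HashMap::new()) }
    }

    // Grant the lease unless another worker holds a live one; returns the current version.
    fn acquire(&self, task: u64, worker: &str, ttl: Duration, now: Instant) -> Result<u64, String> {
        let mut tasks = self.tasks.lock().unwrap();
        let entry = tasks.entry(task).or_insert((worker.to_string(), now, 0));
        if entry.1 > now && entry.0 != worker {
            return Err(format!("locked by {}", entry.0));
        }
        entry.0 = worker.to_string();
        entry.1 = now + ttl;
        Ok(entry.2)
    }

    // Check-on-Write: expiry, owner, and version are validated under the same lock as the write.
    fn commit(&self, task: u64, worker: &str, new_version: u64, now: Instant) -> Result<(), String> {
        let mut tasks = self.tasks.lock().unwrap();
        let entry = tasks.get_mut(&task).ok_or("task not found")?;
        if entry.1 <= now {
            return Err("lease expired".to_string());
        }
        if entry.0 != worker {
            return Err("lease held by another worker".to_string());
        }
        if entry.2 + 1 != new_version {
            return Err("version mismatch".to_string());
        }
        entry.2 = new_version;
        Ok(())
    }
}

fn ms(n: u64) -> Duration {
    Duration::from_millis(n)
}

// A's 500 ms lease lapses during 600 ms of work; B takes over at 550 ms and commits at 580 ms.
fn zombie_timeline() -> (Result<(), String>, Result<(), String>) {
    let store = Store::new();
    let t0 = Instant::now();
    let v_a = store.acquire(1, "A", ms(500), t0).unwrap();
    let v_b = store.acquire(1, "B", ms(500), t0 + ms(550)).unwrap();
    let b = store.commit(1, "B", v_b + 1, t0 + ms(580));
    let a = store.commit(1, "A", v_a + 1, t0 + ms(600)); // the zombie wakes up
    (a, b)
}

// Even with no competitor, A's late commit is refused because its own lease expired.
fn solo_late_commit() -> Result<(), String> {
    let store = Store::new();
    let t0 = Instant::now();
    let v = store.acquire(1, "A", ms(500), t0).unwrap();
    store.commit(1, "A", v + 1, t0 + ms(600))
}

fn main() {
    let (a, b) = zombie_timeline();
    println!("B: {:?}", b); // B: Ok(())
    println!("A: {:?}", a); // A: Err("lease held by another worker")
    println!("solo A: {:?}", solo_late_commit()); // solo A: Err("lease expired")
}
```

Injecting the clock turns each step of the timeline into an ordinary function call, which is generally easier to test than racing threads against `sleep`.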

3. RAII and Commit-or-Revert

In a more advanced Rust implementation, we would wrap TaskState in a LeaseGuard<'a>. If a LeaseGuard is dropped before it has been committed, its Drop implementation can trigger rollback logic or write an audit entry.

// Pseudo-code sketch (acquire_guard is hypothetical):
// let mut guard = store.acquire_guard(task_id)?;
// guard.data = "new data".to_string();
// guard.commit()?; // Consumes the guard, so only an explicit commit takes effect
// If a panic or early return happens before commit(), the guard is dropped and the update is abandoned.

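This pattern leverages Rust's affine types: a value can be moved at most once. Because commit takes the guard by value, the compiler rejects any use of the guard after commit, and the guard's Drop runs on every exit path, so releasing the lease cannot be forgotten. Rust does not have true linear types (which would require exactly-once use), so it cannot force you to call commit; forgetting to commit simply drops the guard and abandons the update, which fails safe instead of leaving a half-applied migration.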
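A minimal sketch of the guard pattern, assuming a simplified store that holds only TaskState (no owner or expiry bookkeeping). `Store`, `acquire_guard`, `snapshot`, and this `LeaseGuard` are hypothetical names for illustration. The guard hands out a private draft through `Deref`/`DerefMut`, `commit(self)` consumes it and publishes the draft only if the stored version is unchanged, and `Drop` discards any uncommitted draft. A fuller version would also re-run the owner and expiry checks from GlobalStore::commit inside `commit`.

```rust
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::sync::Mutex;

#[derive(Debug, Clone, PartialEq)]
struct TaskState {
    id: u64,
    data: String,
    version: u64,
}

// Simplified store with no owner/expiry bookkeeping, so the RAII behaviour stands out.
struct Store {
    tasks: Mutex<HashMap<u64, TaskState>>,
}

// Hypothetical guard: edits go to a private draft that only `commit` publishes.
struct LeaseGuard<'a> {
    store: &'a Store,
    draft: TaskState,
    committed: bool,
}

impl Store {
    fn new() -> Self {
        Store { tasks: Mutex::new(HashMap::new()) }
    }

    fn acquire_guard(&self, task_id: u64) -> LeaseGuard<'_> {
        let mut tasks = self.tasks.lock().unwrap();
        let fresh = TaskState { id: task_id, data: String::new(), version: 0 };
        let draft = tasks.entry(task_id).or_insert(fresh).clone();
        LeaseGuard { store: self, draft, committed: false }
    }

    fn snapshot(&self, task_id: u64) -> Option<TaskState> {
        self.tasks.lock().unwrap().get(&task_id).cloned()
    }
}

// Deref/DerefMut let callers write `guard.data = ...` as in the pseudo-code.
impl Deref for LeaseGuard<'_> {
    type Target = TaskState;
    fn deref(&self) -> &TaskState {
        &self.draft
    }
}

impl DerefMut for LeaseGuard<'_> {
    fn deref_mut(&mut self) -> &mut TaskState {
        &mut self.draft
    }
}

impl LeaseGuard<'_> {
    // Takes `self` by value, so the guard cannot be touched after commit.
    // A full version would also re-check owner and expiry here (Check-on-Write).
    fn commit(mut self) -> Result<(), String> {
        let mut tasks = self.store.tasks.lock().unwrap();
        let current = tasks.get_mut(&self.draft.id).ok_or("task not found")?;
        if current.version != self.draft.version {
            return Err("version mismatch: task changed since acquire".to_string());
        }
        self.draft.version += 1;
        *current = self.draft.clone();
        self.committed = true;
        Ok(())
    }
}

impl Drop for LeaseGuard<'_> {
    fn drop(&mut self) {
        if !self.committed {
            // Nothing reached the store, so "rollback" just means discarding the draft.
            eprintln!("task {}: dropped without commit, draft discarded", self.draft.id);
        }
    }
}

fn demo() -> (Option<TaskState>, Option<TaskState>) {
    let store = Store::new();
    let mut guard = store.acquire_guard(1);
    guard.data = "new data".to_string();
    guard.commit().unwrap(); // `guard` is moved here; using it again would not compile
    let after_commit = store.snapshot(1);
    {
        // Early exit without commit: the guard is dropped at the closing brace.
        let mut guard = store.acquire_guard(1);
        guard.data = "half-finished".to_string();
    }
    (after_commit, store.snapshot(1))
}

fn main() {
    let (after_commit, after_drop) = demo();
    // Both print: Some(TaskState { id: 1, data: "new data", version: 1 })
    println!("{:?}\n{:?}", after_commit, after_drop);
}
```

Here "rollback" is trivial because the guard never writes to the store before commit; a design that mutated shared state in place would need real undo logic in Drop instead.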

Conclusion

The safety of cross-process state migration cannot be guaranteed merely by "careful coding"; it must rely on mechanisms:

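  1. Fencing Token / Epoch: a monotonically increasing number that storage checks before accepting a write. In our example, version (together with the owner check in commit) plays this role, ensuring a stale worker cannot overwrite the new owner's writes.
  2. Lease Duration: The lease TTL bounds how long a crashed worker can block its task, so it should not exceed the recovery time you can tolerate. It must also comfortably exceed normal processing time, or healthy workers will lose their leases mid-task (in the Worker example, 300ms of work inside a 500ms lease).
  3. Check-on-Write: Lease validity must be verified again at the moment of writing (Double Check), inside the same critical section as the write itself; checking first and writing later reopens the race.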
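In the GlobalStore example, version increments once per commit, so it behaves as an optimistic lock. A fencing token, in the usual sense, is issued once per lease grant, and storage refuses any write carrying a token older than the newest one it has accepted. The single-threaded sketch below shows that scheme with a hypothetical `LockService` and `FencedStorage`; the names and error text are illustrative.

```rust
use std::collections::HashMap;

// Hypothetical lock service: every lease grant returns a strictly larger token.
struct LockService {
    last_token: u64,
}

impl LockService {
    fn grant(&mut self) -> u64 {
        self.last_token += 1;
        self.last_token
    }
}

// Hypothetical storage: remembers the highest token it has accepted for each key.
struct FencedStorage {
    data: HashMap<u64, (u64, String)>,
}

impl FencedStorage {
    fn write(&mut self, key: u64, token: u64, value: &str) -> Result<(), String> {
        let slot = self.data.entry(key).or_insert((0, String::new()));
        if token < slot.0 {
            return Err(format!("stale token {} (already saw {})", token, slot.0));
        }
        *slot = (token, value.to_string());
        Ok(())
    }

    fn read(&self, key: u64) -> Option<&str> {
        self.data.get(&key).map(|(_, v)| v.as_str())
    }
}

// A is granted a lease and stalls past expiry; B gets a newer token and writes; then A wakes up.
fn zombie_with_tokens() -> (Result<(), String>, Result<(), String>, Option<String>) {
    let mut locks = LockService { last_token: 0 };
    let mut storage = FencedStorage { data: HashMap::new() };
    let token_a = locks.grant(); // 1
    let token_b = locks.grant(); // 2, issued after A's lease expired
    let b = storage.write(42, token_b, "written by B");
    let a = storage.write(42, token_a, "stale write from A");
    let value = storage.read(42).map(str::to_string);
    (a, b, value)
}

fn main() {
    let (a, b, value) = zombie_with_tokens();
    // Prints: Ok(()) Err("stale token 1 (already saw 2)") Some("written by B")
    println!("{:?} {:?} {:?}", b, a, value);
}
```

The storage only learns about token 2 when B first writes, so a stale write arriving before that would still be accepted and then superseded by B's write.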

In Rust, we can encode part of this logic into the type system: the compiler rejects code that touches a guard after it has been committed or dropped, while expiry and ownership remain runtime checks at commit time. That is the romance of systems programming.