← Back to Blog
EN中文

周期调度器的动态调整策略

在需要定期刷新配置、统计数据或缓存的系统中,如何设计一个既能动态调整更新周期、又能优雅处理错误的调度器?今天我们来分析一个工业级**周期更新调度器(Periodically Updated)**实现。

核心问题

传统的定时刷新机制通常有固定的时间间隔,但在生产环境中:

  • 正常运行时需要较长的刷新周期(减少开销)
  • 错误发生时需要更短的周期(快速恢复)
  • 需要支持运行时动态调整刷新间隔

设计方案:双周期 + 错误恢复

class TPeriodicallyUpdated {
protected:
    virtual bool UpdateImpl() = 0;  // 子类实现具体更新逻辑
    
    void SetReloadDuration(TDuration reloadDuration);  // 动态调整
    
private:
    TDuration mReloadDuration;        // 正常刷新周期
    TDuration mErrorReloadDuration;   // 错误刷新周期
    TDuration mCheckPeriod;           // 检查间隔
};

核心机制

  1. 双周期策略

    • 正常情况:使用较长周期 mReloadDuration
    • 错误情况:使用较短周期 mErrorReloadDuration
  2. 首次更新等待

    bool WaitForFirstUpdate(unsigned timeoutSeconds);

    等待首次更新完成,确保系统初始化完毕

  3. 错误恢复

    catch (yexception &ex) {
        mNextReloadTime = TInstant::Now() + mErrorReloadDuration;
    }

权衡分析

优势

  • 动态调整:运行时可修改刷新周期
  • 错误感知:自动切换到快速恢复模式
  • 线程安全:使用 mutex + condition variable

代价

  • 线程开销:独立的后台线程
  • 复杂性:需要处理首次更新等待
  • 资源管理:需要显式调用 Stop()

净室重构:Rust 实现

use std::sync::{Arc, Mutex, Condvar};
use std::time::{Duration, Instant};
use std::thread;
use std::sync::atomic::{AtomicBool, Ordering};

pub struct PeriodicallyUpdated<F>
where
    F: FnMut() -> bool + Send + 'static,
{
    reload_duration: Duration,
    error_reload_duration: Duration,
    check_period: Duration,
    
    stopping: Arc<AtomicBool>,
    updated: Arc<AtomicBool>,
    updated_cond: Condvar,
    
    next_reload_time: Mutex<Instant>,
    
    updater: Option<thread::JoinHandle<()>>,
    update_fn: Arc<Mutex<F>>,
}

impl<F> PeriodicallyUpdated<F>
where
    F: FnMut() -> bool + Send + 'static,
{
    pub fn new(
        reload_duration: Duration,
        error_reload_duration: Duration,
        update_fn: F,
    ) -> Self {
        PeriodicallyUpdated {
            reload_duration,
            error_reload_duration,
            check_period: Duration::from_secs(2),
            stopping: Arc::new(AtomicBool::new(false)),
            updated: Arc::new(AtomicBool::new(false)),
            updated_cond: Condvar::new(),
            next_reload_time: Mutex::new(Instant::now()),
            updater: None,
            update_fn: Arc::new(Mutex::new(update_fn)),
        }
    }

    pub fn start(&mut self) {
        let stopping = self.stopping.clone();
        let updated = self.updated.clone();
        let updated_cond = self.updated_cond.clone();
        let next_reload = Arc::new(Mutex::new(Instant::now()));
        let next_reload_clone = next_reload.clone();
        
        let reload_duration = self.reload_duration;
        let error_reload_duration = self.error_reload_duration;
        let check_period = self.check_period;
        let update_fn = self.update_fn.clone();

        self.updater = Some(thread::spawn(move || {
            while !stopping.load(Ordering::SeqCst) {
                let should_update = {
                    let next = *next_reload_clone.lock().unwrap();
                    Instant::now() > next
                };
                
                if should_update {
                    let success = {
                        let mut fn_lock = update_fn.lock().unwrap();
                        fn_lock()
                    };
                    
                    let mut next_time = next_reload_clone.lock().unwrap();
                    if success {
                        *next_time = Instant::now() + reload_duration;
                    } else {
                        *next_time = Instant::now() + error_reload_duration;
                    }
                    
                    if !updated.load(Ordering::SeqCst) {
                        updated.store(true, Ordering::SeqCst);
                        updated_cond.notify_all();
                    }
                }
                
                thread::sleep(check_period);
            }
        }));
    }

    pub fn wait_for_first_update(&self, timeout: Duration) -> bool {
        let mut updated = self.updated.lock().unwrap();
        while !*updated && !self.stopping.load(Ordering::SeqCst) {
            updated = self.updated_cond.wait_timeout(updated, timeout).unwrap().0;
        }
        *updated
    }

    pub fn stop(&mut self) {
        self.stopping.store(true, Ordering::SeqCst);
        if let Some(handle) = self.updater.take() {
            let _ = handle.join();
        }
    }

    pub fn set_reload_duration(&mut self, duration: Duration) {
        self.reload_duration = duration;
    }
}

总结

周期更新调度器的动态调整策略体现了自适应系统的设计思想:

  1. 双周期模式:正常与错误使用不同周期
  2. 首次更新等待:确保初始化完成
  3. 错误恢复:异常后自动切换到快速模式

这种设计在配置热更新、缓存刷新等场景中非常有效。