周期调度器的动态调整策略
在需要定期刷新配置、统计数据或缓存的系统中,如何设计一个既能动态调整更新周期、又能优雅处理错误的调度器?今天我们来分析一个工业级**周期更新调度器(Periodically Updated)**实现。
核心问题
传统的定时刷新机制通常有固定的时间间隔,但在生产环境中:
- 正常运行时需要较长的刷新周期(减少开销)
- 错误发生时需要更短的周期(快速恢复)
- 需要支持运行时动态调整刷新间隔
设计方案:双周期 + 错误恢复
class TPeriodicallyUpdated {
protected:
virtual bool UpdateImpl() = 0; // 子类实现具体更新逻辑
void SetReloadDuration(TDuration reloadDuration); // 动态调整
private:
TDuration mReloadDuration; // 正常刷新周期
TDuration mErrorReloadDuration; // 错误刷新周期
TDuration mCheckPeriod; // 检查间隔
};
核心机制
双周期策略:
- 正常情况:使用较长周期
mReloadDuration - 错误情况:使用较短周期
mErrorReloadDuration
- 正常情况:使用较长周期
首次更新等待:
bool WaitForFirstUpdate(unsigned timeoutSeconds);等待首次更新完成,确保系统初始化完毕
错误恢复:
catch (yexception &ex) { mNextReloadTime = TInstant::Now() + mErrorReloadDuration; }
权衡分析
优势
- 动态调整:运行时可修改刷新周期
- 错误感知:自动切换到快速恢复模式
- 线程安全:使用 mutex + condition variable
代价
- 线程开销:独立的后台线程
- 复杂性:需要处理首次更新等待
- 资源管理:需要显式调用 Stop()
净室重构:Rust 实现
use std::sync::{Arc, Mutex, Condvar};
use std::time::{Duration, Instant};
use std::thread;
use std::sync::atomic::{AtomicBool, Ordering};
pub struct PeriodicallyUpdated<F>
where
F: FnMut() -> bool + Send + 'static,
{
reload_duration: Duration,
error_reload_duration: Duration,
check_period: Duration,
stopping: Arc<AtomicBool>,
updated: Arc<AtomicBool>,
updated_cond: Condvar,
next_reload_time: Mutex<Instant>,
updater: Option<thread::JoinHandle<()>>,
update_fn: Arc<Mutex<F>>,
}
impl<F> PeriodicallyUpdated<F>
where
F: FnMut() -> bool + Send + 'static,
{
pub fn new(
reload_duration: Duration,
error_reload_duration: Duration,
update_fn: F,
) -> Self {
PeriodicallyUpdated {
reload_duration,
error_reload_duration,
check_period: Duration::from_secs(2),
stopping: Arc::new(AtomicBool::new(false)),
updated: Arc::new(AtomicBool::new(false)),
updated_cond: Condvar::new(),
next_reload_time: Mutex::new(Instant::now()),
updater: None,
update_fn: Arc::new(Mutex::new(update_fn)),
}
}
pub fn start(&mut self) {
let stopping = self.stopping.clone();
let updated = self.updated.clone();
let updated_cond = self.updated_cond.clone();
let next_reload = Arc::new(Mutex::new(Instant::now()));
let next_reload_clone = next_reload.clone();
let reload_duration = self.reload_duration;
let error_reload_duration = self.error_reload_duration;
let check_period = self.check_period;
let update_fn = self.update_fn.clone();
self.updater = Some(thread::spawn(move || {
while !stopping.load(Ordering::SeqCst) {
let should_update = {
let next = *next_reload_clone.lock().unwrap();
Instant::now() > next
};
if should_update {
let success = {
let mut fn_lock = update_fn.lock().unwrap();
fn_lock()
};
let mut next_time = next_reload_clone.lock().unwrap();
if success {
*next_time = Instant::now() + reload_duration;
} else {
*next_time = Instant::now() + error_reload_duration;
}
if !updated.load(Ordering::SeqCst) {
updated.store(true, Ordering::SeqCst);
updated_cond.notify_all();
}
}
thread::sleep(check_period);
}
}));
}
pub fn wait_for_first_update(&self, timeout: Duration) -> bool {
let mut updated = self.updated.lock().unwrap();
while !*updated && !self.stopping.load(Ordering::SeqCst) {
updated = self.updated_cond.wait_timeout(updated, timeout).unwrap().0;
}
*updated
}
pub fn stop(&mut self) {
self.stopping.store(true, Ordering::SeqCst);
if let Some(handle) = self.updater.take() {
let _ = handle.join();
}
}
pub fn set_reload_duration(&mut self, duration: Duration) {
self.reload_duration = duration;
}
}
总结
周期更新调度器的动态调整策略体现了自适应系统的设计思想:
- 双周期模式:正常与错误使用不同周期
- 首次更新等待:确保初始化完成
- 错误恢复:异常后自动切换到快速模式
这种设计在配置热更新、缓存刷新等场景中非常有效。
系列: Arch (89/90)
系列页
▼