← Back to Blog
EN中文

滑动窗口平滑算法的工业实现

在机器人和物联网设备中,传感器数据(如电压、温度、CPU负载)往往伴随着高频噪声。为了获得稳定的读数,我们通常需要对数据进行平滑处理。最直观的方法是记录过去 N 秒的所有数据点并计算平均值,但这种方法在数据量大时会消耗大量内存和计算资源(O(N))。

本文将介绍一种在工业级机器人库中广泛使用的 O(1) 增量平滑算法,并使用 Python 进行演示。

核心思想:指数衰减与增量更新

该算法的核心在于不存储历史窗口内的所有原始数据,而是维护当前平滑值(Value)、累积变化量(Delta)以及上一次更新的时间戳(LastTimestamp)。

当新的数据到来或需要读取当前值时,算法根据经过的时间 dt 计算衰减系数。

公式简述: NewValue = OldValue * DecayFactor + Input * (1 - DecayFactor)

但在更复杂的实现中(如某大型搜索公司的 TSmoothingValue),它不仅平滑值本身,还平滑“变化率”,使得输出对趋势的响应更加灵敏且平滑。

净室实现(Python)

我们将实现一个简化版的 SmoothingValue 类,重点展示如何处理时间窗口衰减。

1. 类定义与初始化

import time
import math

class SmoothingValue:
    def __init__(self, window_size_seconds=10.0):
        self.window = window_size_seconds
        self.value = 0.0
        self.last_ts = time.time()
        self.initialized = False

    def update(self, new_sample):
        now = time.time()
        
        if not self.initialized:
            self.value = new_sample
            self.last_ts = now
            self.initialized = True
            return

        dt = now - self.last_ts
        if dt < 0: dt = 0  # 防止时间回拨

        # 计算衰减系数 alpha
        # 当 dt 趋近于 window 时,alpha 趋近于 0(旧值被遗忘)
        # 当 dt 趋近于 0 时,alpha 趋近于 1(保持旧值)
        # 典型的指数移动平均 (EMA) 公式: alpha = exp(-dt / window)
        alpha = math.exp(-dt / self.window)

        # 更新平滑值
        self.value = self.value * alpha + new_sample * (1 - alpha)
        self.last_ts = now

    def get(self):
        # 获取当前值时,也可以根据时间进行衰减(如果没有新数据)
        # 这里简化为直接返回最后更新的值
        return self.value

2. 为什么比 sum(list) / len(list) 好?

假设机器人以 100Hz 的频率运行,10秒的窗口意味着我们需要存储 1000 个浮点数。

  • 普通平均:每次计算都需要遍历 1000 个数求和,复杂度 O(N)。
  • 增量平滑:只需要存储 valuelast_ts 两个变量,每次更新仅需几次浮点运算,复杂度 O(1)。

在资源受限的嵌入式控制器上,O(1) 的算法是刚需。

3. 应用示例:平滑 CPU 负载

import random
import time

def simulate_cpu_load():
    # 模拟一个有噪声的 CPU 负载信号
    base_load = 50.0
    noise = random.uniform(-10, 10)
    return base_load + noise

smoother = SmoothingValue(window_size_seconds=5.0)

print("Time\tRaw\tSmoothed")
for _ in range(20):
    raw = simulate_cpu_load()
    smoother.update(raw)
    print(f"{time.time():.2f}\t{raw:.1f}\t{smoother.get():.1f}")
    time.sleep(0.5)

工业级细节

在实际的 C++ 库实现中(如 TSmoothingValue),还会考虑:

  1. 非线性时间流逝:处理系统时间跳变或暂停的情况。
  2. 加权平均:对于某些传感器,最新的数据权重可能需要更高。
  3. 线程安全:在多线程读写时加锁(如 std::atomicstd::mutex)。

通过这种方式,我们可以在极低的开销下,获得高质量的平滑信号,为机器人的 PID 控制或状态监测提供可靠输入。