Industrial Implementation of Sliding Window Smoothing
In robotics and IoT devices, sensor data (such as voltage, temperature, or CPU load) is often accompanied by high-frequency noise. To obtain stable readings, we typically need to smooth the data. The most intuitive method is to record all data points from the past N seconds and calculate their average, but the memory use and per-read cost of this approach both grow as O(N) with the number of samples in the window.
This article introduces an O(1) incremental smoothing algorithm widely used in industrial robotics libraries and demonstrates it using Python.
Core Idea: Exponential Decay and Incremental Updates
The core of this algorithm is not to store all raw data within the history window, but to maintain the current smoothed value (Value), cumulative change (Delta), and the timestamp of the last update (LastTimestamp).
When new data arrives or the current value needs to be read, the algorithm calculates a decay factor based on the elapsed time dt.
Simplified formula:
NewValue = OldValue * DecayFactor + Input * (1 - DecayFactor)
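As a quick numeric check of the formula, the sketch below applies a single update step. The helper name ema_step and its parameters are illustrative and not taken from any library. The decay factor is assumed to be exp(-dt / window), the form used in the implementation later in this article.

```python
import math


def ema_step(old_value, new_input, dt, window):
    """Apply one exponential-smoothing step (hypothetical helper for illustration)."""
    # Assumption: the decay factor follows exp(-dt / window).
    # Negative dt is clamped to 0 so the factor never exceeds 1.
    decay = math.exp(-max(dt, 0.0) / window)
    return old_value * decay + new_input * (1.0 - decay)


# A sample arriving shortly after the previous one barely moves the estimate,
# while a sample arriving after a long gap mostly replaces it.
print(ema_step(50.0, 60.0, dt=0.1, window=10.0))
print(ema_step(50.0, 60.0, dt=30.0, window=10.0))
```

The first call stays close to the old value of 50, and the second lands close to the new input of 60, which is the behavior the formula is meant to produce.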
However, more complex implementations (such as a large search company's TSmoothingValue) smooth not only the value itself but also its rate of change, so the output follows trends more responsively while staying smooth.
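One way to picture smoothing both the value and its rate of change is a Holt-style double exponential smoother adapted to irregular time steps. The sketch below is a stand-in technique and not the TSmoothingValue code, which this article does not reproduce. The class name TrendSmoother, its parameters, and the choice of two separate time constants are all assumptions made for illustration.

```python
import math


class TrendSmoother:
    """Holt-style smoother tracking a level and a rate of change (illustrative only)."""

    def __init__(self, window=10.0, trend_window=10.0):
        self.window = window              # time constant for the level, in seconds
        self.trend_window = trend_window  # time constant for the rate, in seconds
        self.level = None                 # smoothed value
        self.rate = 0.0                   # smoothed change per second
        self.last_ts = None

    def update(self, sample, ts):
        if self.level is None:
            # The first sample seeds the level; no rate is known yet.
            self.level, self.last_ts = sample, ts
            return self.level
        dt = ts - self.last_ts
        if dt <= 0:
            return self.level  # ignore out-of-order or duplicate timestamps
        a = math.exp(-dt / self.window)
        b = math.exp(-dt / self.trend_window)
        predicted = self.level + self.rate * dt        # extrapolate along the trend
        new_level = predicted * a + sample * (1.0 - a)
        observed_rate = (new_level - self.level) / dt  # change actually seen this step
        self.rate = self.rate * b + observed_rate * (1.0 - b)
        self.level, self.last_ts = new_level, ts
        return self.level
```

On a steadily rising signal, a plain exponential average lags behind the input, whereas this variant extrapolates along the estimated rate before blending in the new sample, so the lag shrinks as the rate estimate settles.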
Clean Room Implementation (Python)
We will implement a simplified version of the SmoothingValue class, focusing on handling time window decay.
1. Class Definition and Initialization
import time
import math


class SmoothingValue:
    def __init__(self, window_size_seconds=10.0):
        self.window = window_size_seconds
        self.value = 0.0
        self.last_ts = time.time()
        self.initialized = False

    def update(self, new_sample):
        now = time.time()
        if not self.initialized:
            # The first sample seeds the smoothed value directly
            self.value = new_sample
            self.last_ts = now
            self.initialized = True
            return
        dt = now - self.last_ts
        if dt < 0:
            dt = 0  # Prevent time regression
        # Calculate decay factor alpha
        # Typical Exponential Moving Average (EMA) formula: alpha = exp(-dt / window)
        # As dt approaches 0, alpha approaches 1 (keep old value)
        # At dt == window, alpha = exp(-1), about 0.37
        # As dt grows well beyond window, alpha approaches 0 (old value forgotten)
        alpha = math.exp(-dt / self.window)
        # Update smoothed value
        self.value = self.value * alpha + new_sample * (1 - alpha)
        self.last_ts = now

    def get(self):
        # When getting the value, one could also decay based on time elapsed
        # (if no new data). Here simplified to just return last updated value.
        return self.value
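The comment in get() mentions decaying the reading when no new data arrives. A minimal sketch of that idea follows, assuming the signal has a known resting value (called idle_value here) that a stale reading should drift toward. The class name DecayingValue, the idle_value parameter, and the injectable clock argument are hypothetical and added only for illustration.

```python
import math
import time


class DecayingValue:
    """EMA whose reading fades toward idle_value while no samples arrive (illustrative)."""

    def __init__(self, window_size_seconds=10.0, idle_value=0.0, clock=time.monotonic):
        self.window = window_size_seconds
        self.idle_value = idle_value  # assumption: where the signal rests when silent
        self.clock = clock            # injectable so tests can control time
        self.value = idle_value
        self.last_ts = clock()
        self.initialized = False

    def _alpha(self, now):
        # Same decay factor as before, with negative intervals clamped to 0.
        return math.exp(-max(now - self.last_ts, 0.0) / self.window)

    def update(self, new_sample):
        now = self.clock()
        if not self.initialized:
            self.value = new_sample
            self.initialized = True
        else:
            alpha = self._alpha(now)
            self.value = self.value * alpha + new_sample * (1.0 - alpha)
        self.last_ts = now

    def get(self):
        # Read-only: the decay for the elapsed time is applied to the returned
        # number but not stored, so repeated reads do not compound.
        alpha = self._alpha(self.clock())
        return self.value * alpha + self.idle_value * (1.0 - alpha)
```

Whether a stale reading should fade at all depends on the signal. It suits quantities such as request rates that naturally fall to zero, and it is usually wrong for a temperature sensor that has merely stopped reporting.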
2. Why Is This Better Than sum(list) / len(list)?
Suppose a robot runs at 100Hz. A 10-second window means we need to store 1000 floating-point numbers.
- Simple Average: A naive implementation stores all 1000 numbers and traverses them on every calculation to sum them, O(N) complexity in both memory and time.
- Incremental Smoothing: Only the value and last_ts variables need to be stored. Each update takes a few floating-point operations, O(1) complexity.
On resource-constrained embedded controllers, constant memory use and constant per-update cost often decide whether smoothing is feasible at all.
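For comparison, here is a minimal sketch of the naive windowed average described above. The class name WindowAverage is illustrative. It keeps a fixed number of samples rather than a time span, which is equivalent only when the sample rate is constant.

```python
from collections import deque


class WindowAverage:
    """Naive sliding-window mean over the last max_samples readings (for comparison)."""

    def __init__(self, max_samples):
        # Every sample in the window is kept: O(N) memory.
        self.samples = deque(maxlen=max_samples)

    def update(self, sample):
        # With maxlen set, appending to a full deque drops the oldest entry.
        self.samples.append(sample)

    def get(self):
        # Summing the whole window on each read: O(N) time.
        return sum(self.samples) / len(self.samples)


avg = WindowAverage(max_samples=1000)  # 100 Hz * 10 s
for i in range(5000):
    avg.update(float(i % 7))
print(len(avg.samples))  # 1000
```

Keeping a running sum would bring each read down to O(1) time, but the buffer of 1000 samples would still be needed to know which value leaves the window, so memory stays O(N).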
3. Application Example: Smoothing CPU Load
import random
import time


def simulate_cpu_load():
    # Simulate a noisy CPU load signal
    base_load = 50.0
    noise = random.uniform(-10, 10)
    return base_load + noise


smoother = SmoothingValue(window_size_seconds=5.0)
print("Time\tRaw\tSmoothed")
for _ in range(20):
    raw = simulate_cpu_load()
    smoother.update(raw)
    print(f"{time.time():.2f}\t{raw:.1f}\t{smoother.get():.1f}")
    time.sleep(0.5)
Industrial-Grade Details
In actual C++ library implementations (like TSmoothingValue), additional considerations include:
- Non-linear Time: Handling system time jumps or pauses.
- Weighted Average: For some sensors, newer data might need higher weight.
- Thread Safety: Locking during multi-threaded read/write (e.g., std::atomic or std::mutex).
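Two of these points, time jumps and thread safety, can be sketched in Python as well. The version below is an illustration and not a port of any C++ library. It assumes time.monotonic as the clock, which is not affected by system clock adjustments, and uses threading.Lock as a rough analogue of std::mutex. The class name ThreadSafeSmoothingValue and the clock parameter are hypothetical.

```python
import math
import threading
import time


class ThreadSafeSmoothingValue:
    """EMA guarded by a lock and driven by a monotonic clock (illustrative sketch)."""

    def __init__(self, window_size_seconds=10.0, clock=time.monotonic):
        self.window = window_size_seconds
        self._clock = clock            # monotonic by default, so it never runs backwards
        self._lock = threading.Lock()
        self._value = None             # None until the first sample arrives
        self._last_ts = None

    def update(self, new_sample):
        # The lock covers the whole read-modify-write, so concurrent updates
        # cannot interleave between reading and writing the state.
        with self._lock:
            now = self._clock()
            if self._value is None:
                self._value = float(new_sample)
            else:
                dt = max(now - self._last_ts, 0.0)
                alpha = math.exp(-dt / self.window)
                self._value = self._value * alpha + new_sample * (1.0 - alpha)
            self._last_ts = now

    def get(self):
        with self._lock:
            return self._value
```

A single lock keeps the sketch simple. At very high update rates, contention on that lock can become the bottleneck, which is one reason a C++ implementation might prefer atomics.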
Through this approach, we can obtain high-quality smoothed signals with extremely low overhead, providing reliable input for robot PID control or status monitoring.