异步爬虫中的自适应频率控制与自稳态设计
编写一个每秒能发出 1000 个请求的爬虫很简单,但编写一个能长期稳定运行、不被目标网站封禁且不拖垮对方服务的爬虫却很难。
传统的静态限流(如每秒固定 10 次)往往缺乏弹性:要么在对方服务器空闲时浪费了吞吐能力,要么在对方负载过高时因请求过快而被 429 封禁。本文将探讨如何在 Python 的 asyncio 环境下,设计一个基于反馈回路的自适应频率控制系统。
静态限流的局限
在 Python 中,我们可以使用 asyncio.Semaphore 或各种 Token Bucket 库来限制并发。
import asyncio
async def worker(sem):
async with sem:
# 发送请求
pass
# 限制最大并发数为 10
sem = asyncio.Semaphore(10)
这种代码的问题在于它不知道“疼”。当服务器返回 503 Service Unavailable 或 429 Too Many Requests 时,它依然会尝试以最大并发数去撞墙,直到 IP 被封禁。
自稳态设计:引入反馈回路
我们需要一个能够感知环境变化的系统。TCP 协议的拥塞控制(Congestion Control)给了我们要给很好的思路:AIMD(Additive Increase Multiplicative Decrease,和式增加,积式减少)。
- 冷启动:以极低的速率开始。
- 成功时:如果请求成功(200 OK),缓慢增加速率(线性增加)。
- 失败时:如果遇到限流(429/503),迅速降低速率(指数级减少)。
核心组件实现
我们将实现一个 AdaptiveLimiter。它不再简单的阻塞请求,而是动态调整 sleep 的间隔。
import asyncio
import time
from dataclasses import dataclass
@dataclass
class RateState:
interval: float # 当前请求间隔(秒)
min_interval: float = 0.1
max_interval: float = 10.0
backoff_factor: float = 2.0 # 遇到错误时,间隔扩大倍数
recovery_step: float = 0.05 # 成功时,间隔减少量
class AdaptiveLimiter:
def __init__(self, state: RateState):
self.state = state
self.lock = asyncio.Lock()
self.last_request_time = 0
async def wait(self):
"""在发送请求前调用,控制频率"""
async with self.lock:
now = time.time()
elapsed = now - self.last_request_time
wait_time = self.state.interval - elapsed
if wait_time > 0:
await asyncio.sleep(wait_time)
self.last_request_time = time.time()
async def feedback(self, success: bool):
"""根据请求结果调整频率"""
if success:
# 和式增加(即间隔减少),提升速度
new_interval = max(
self.state.min_interval,
self.state.interval - self.state.recovery_step
)
if new_interval != self.state.interval:
self.state.interval = new_interval
# print(f"Speeding up: interval={self.state.interval:.2f}s")
else:
# 积式减少(即间隔增大),降低速度
new_interval = min(
self.state.max_interval,
self.state.interval * self.state.backoff_factor
)
# 加上一点随机抖动,防止多个爬虫同时重试(Thundering Herd)
self.state.interval = new_interval
print(f"Backing off: interval={self.state.interval:.2f}s")
整合到 Crawler 中
在实际的爬虫循环中,我们需要将 Limiter 注入到请求流程中。
import aiohttp
import random
async def fetch_url(session, url, limiter):
# 1. 等待限流许可
await limiter.wait()
try:
async with session.get(url) as response:
status = response.status
if status == 200:
# 2. 成功反馈
await limiter.feedback(success=True)
return await response.text()
elif status in [429, 503]:
# 3. 拥塞反馈
await limiter.feedback(success=False)
return None
else:
# 其他错误通常不影响限流策略
return None
except Exception as e:
# 网络异常也视为需要退避的信号
await limiter.feedback(success=False)
return None
async def main():
state = RateState(interval=1.0) # 初始间隔 1秒
limiter = AdaptiveLimiter(state)
async with aiohttp.ClientSession() as session:
tasks = []
for i in range(20):
url = f"http://httpbin.org/status/{200 if random.random() > 0.3 else 429}"
tasks.append(fetch_url(session, url, limiter))
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
异步环境的挑战
在 Python asyncio 中实现这种逻辑有一个隐蔽的陷阱:并发滞后性。
当你有 100 个并发任务时,如果第 1 个任务触发了 429 错误并调用了 backoff,其他的 99 个任务可能已经处于 await limiter.wait() 之后的执行阶段,或者正在网络传输中。这意味着这 99 个请求依然会以旧的(过快的)速率打向服务器,导致后续大量的报错。
改进策略:
- 全局熔断器:一旦触发
backoff,设置一个短暂的全局标志,暂停所有新请求的发出,直到队列排空。 - 令牌桶结合:
AdaptiveLimiter不仅控制间隔,还可以动态调整令牌桶的容量。在发生 429 时,不仅增加间隔,还应瞬间清空桶内剩余令牌。
结语
构建自适应系统通过模仿生物的“痛感”反馈机制,使得爬虫能够寻找目标服务器的性能边界(Equilibrium Point)。这不仅保护了你的 IP 资产,也是对目标站点的基本尊重——只取其有余,不竭泽而渔。