← Back to Blog
EN中文

异步爬虫中的自适应频率控制与自稳态设计

编写一个每秒能发出 1000 个请求的爬虫很简单,但编写一个能长期稳定运行、不被目标网站封禁且不拖垮对方服务的爬虫却很难。

传统的静态限流(如每秒固定 10 次)往往缺乏弹性:要么在对方服务器空闲时浪费了吞吐能力,要么在对方负载过高时因请求过快而被 429 封禁。本文将探讨如何在 Python 的 asyncio 环境下,设计一个基于反馈回路的自适应频率控制系统。

静态限流的局限

在 Python 中,我们可以使用 asyncio.Semaphore 或各种 Token Bucket 库来限制并发。

import asyncio

async def worker(sem):
    async with sem:
        # 发送请求
        pass

# 限制最大并发数为 10
sem = asyncio.Semaphore(10)

这种代码的问题在于它不知道“疼”。当服务器返回 503 Service Unavailable429 Too Many Requests 时,它依然会尝试以最大并发数去撞墙,直到 IP 被封禁。

自稳态设计:引入反馈回路

我们需要一个能够感知环境变化的系统。TCP 协议的拥塞控制(Congestion Control)给了我们要给很好的思路:AIMD(Additive Increase Multiplicative Decrease,和式增加,积式减少)

  1. 冷启动:以极低的速率开始。
  2. 成功时:如果请求成功(200 OK),缓慢增加速率(线性增加)。
  3. 失败时:如果遇到限流(429/503),迅速降低速率(指数级减少)。

核心组件实现

我们将实现一个 AdaptiveLimiter。它不再简单的阻塞请求,而是动态调整 sleep 的间隔。

import asyncio
import time
from dataclasses import dataclass

@dataclass
class RateState:
    interval: float  # 当前请求间隔(秒)
    min_interval: float = 0.1
    max_interval: float = 10.0
    backoff_factor: float = 2.0 # 遇到错误时,间隔扩大倍数
    recovery_step: float = 0.05 # 成功时,间隔减少量

class AdaptiveLimiter:
    def __init__(self, state: RateState):
        self.state = state
        self.lock = asyncio.Lock()
        self.last_request_time = 0

    async def wait(self):
        """在发送请求前调用,控制频率"""
        async with self.lock:
            now = time.time()
            elapsed = now - self.last_request_time
            wait_time = self.state.interval - elapsed
            
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            
            self.last_request_time = time.time()

    async def feedback(self, success: bool):
        """根据请求结果调整频率"""
        if success:
            # 和式增加(即间隔减少),提升速度
            new_interval = max(
                self.state.min_interval, 
                self.state.interval - self.state.recovery_step
            )
            if new_interval != self.state.interval:
                self.state.interval = new_interval
                # print(f"Speeding up: interval={self.state.interval:.2f}s")
        else:
            # 积式减少(即间隔增大),降低速度
            new_interval = min(
                self.state.max_interval, 
                self.state.interval * self.state.backoff_factor
            )
            # 加上一点随机抖动,防止多个爬虫同时重试(Thundering Herd)
            self.state.interval = new_interval
            print(f"Backing off: interval={self.state.interval:.2f}s")

整合到 Crawler 中

在实际的爬虫循环中,我们需要将 Limiter 注入到请求流程中。

import aiohttp
import random

async def fetch_url(session, url, limiter):
    # 1. 等待限流许可
    await limiter.wait()
    
    try:
        async with session.get(url) as response:
            status = response.status
            
            if status == 200:
                # 2. 成功反馈
                await limiter.feedback(success=True)
                return await response.text()
            elif status in [429, 503]:
                # 3. 拥塞反馈
                await limiter.feedback(success=False)
                return None
            else:
                # 其他错误通常不影响限流策略
                return None
    except Exception as e:
        # 网络异常也视为需要退避的信号
        await limiter.feedback(success=False)
        return None

async def main():
    state = RateState(interval=1.0) # 初始间隔 1秒
    limiter = AdaptiveLimiter(state)
    
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i in range(20):
            url = f"http://httpbin.org/status/{200 if random.random() > 0.3 else 429}"
            tasks.append(fetch_url(session, url, limiter))
        
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

异步环境的挑战

在 Python asyncio 中实现这种逻辑有一个隐蔽的陷阱:并发滞后性

当你有 100 个并发任务时,如果第 1 个任务触发了 429 错误并调用了 backoff,其他的 99 个任务可能已经处于 await limiter.wait() 之后的执行阶段,或者正在网络传输中。这意味着这 99 个请求依然会以旧的(过快的)速率打向服务器,导致后续大量的报错。

改进策略:

  1. 全局熔断器:一旦触发 backoff,设置一个短暂的全局标志,暂停所有新请求的发出,直到队列排空。
  2. 令牌桶结合AdaptiveLimiter 不仅控制间隔,还可以动态调整令牌桶的容量。在发生 429 时,不仅增加间隔,还应瞬间清空桶内剩余令牌。

结语

构建自适应系统通过模仿生物的“痛感”反馈机制,使得爬虫能够寻找目标服务器的性能边界(Equilibrium Point)。这不仅保护了你的 IP 资产,也是对目标站点的基本尊重——只取其有余,不竭泽而渔。