Adaptive Rate Control and Self-Stabilizing Design in Asynchronous Crawlers
Writing a crawler that can send 1,000 requests per second is easy. Writing a crawler that runs stably for a long time, doesn't get banned, and doesn't crash the target service is hard.
Traditional static rate limiting (e.g., fixed 10 requests per second) often lacks elasticity: it either wastes throughput when the target server is idle or gets banned with 429 errors when the target is under heavy load. This article explores how to design a feedback-based adaptive rate control system in Python's asyncio environment.
The Limitations of Static Limiting
In Python, we can use asyncio.Semaphore or various Token Bucket libraries to limit concurrency.
```python
import asyncio

async def worker(sem):
    async with sem:
        # Send request
        pass

# Limit max concurrency to 10
sem = asyncio.Semaphore(10)
```
The problem with this code is that it doesn't feel "pain." When the server returns 503 Service Unavailable or 429 Too Many Requests, it continues to hit the wall at maximum concurrency until the IP is banned.
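The same blindness applies to the token-bucket approach: a static bucket refills at a fixed rate no matter what the server says. A minimal sketch for comparison (the class name and parameters are illustrative, not from any particular library):

```python
import time

class TokenBucket:
    """Static token bucket: refills at `rate` tokens/sec, bursts up to `capacity`."""
    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def try_acquire(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        # Note: nothing here reacts to 429/503 -- the refill rate never changes
        return False
```

Like the semaphore, the refill rate is a constant; a burst of 429s changes nothing about how fast the crawler keeps hitting the server.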
Self-Stabilizing Design: The Feedback Loop
We need a system that can perceive environmental changes. The TCP protocol's Congestion Control gives us a great idea: AIMD (Additive Increase Multiplicative Decrease).
- Cold Start: Start at a very low rate.
- On Success: If the request succeeds (200 OK), slowly increase the rate (Additive Increase).
- On Failure: If throttling occurs (429/503), rapidly decrease the rate (Multiplicative Decrease).
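The update rule itself is only a few lines of arithmetic. A self-contained sketch of one AIMD step over the request interval (the step sizes and bounds are illustrative defaults):

```python
def aimd_update(interval: float, success: bool,
                min_interval: float = 0.1, max_interval: float = 10.0,
                recovery_step: float = 0.05, backoff_factor: float = 2.0) -> float:
    """One AIMD step over the request interval (smaller interval = faster)."""
    if success:
        # Additive Increase: shave a fixed step off the interval
        return max(min_interval, interval - recovery_step)
    # Multiplicative Decrease: multiply the interval on throttling
    return min(max_interval, interval * backoff_factor)

# Three successes then one 429: a slow creep down, then a sharp jump up
interval = 1.0
for ok in (True, True, True, False):
    interval = aimd_update(interval, ok)
# interval ends near 1.7 (0.95 -> 0.90 -> 0.85 -> 1.70)
```

Note the asymmetry: recovery is linear and cautious, while backoff is geometric and immediate, which is exactly what keeps the loop stable.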
Core Component Implementation
We will implement an AdaptiveLimiter. It doesn't just block requests; it dynamically adjusts the sleep interval.
```python
import asyncio
import random
import time
from dataclasses import dataclass

@dataclass
class RateState:
    interval: float                # Current request interval (seconds)
    min_interval: float = 0.1
    max_interval: float = 10.0
    backoff_factor: float = 2.0    # Factor to multiply the interval by on error
    recovery_step: float = 0.05    # Amount to shave off the interval on success

class AdaptiveLimiter:
    def __init__(self, state: RateState):
        self.state = state
        self.lock = asyncio.Lock()
        self.last_request_time = 0.0

    async def wait(self):
        """Called before sending a request to control frequency."""
        async with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_request_time
            wait_time = self.state.interval - elapsed
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            self.last_request_time = time.monotonic()

    async def feedback(self, success: bool):
        """Adjust frequency based on the request result."""
        if success:
            # Additive Increase (decrease interval): speed up gently
            self.state.interval = max(
                self.state.min_interval,
                self.state.interval - self.state.recovery_step,
            )
        else:
            # Multiplicative Decrease (increase interval): slow down sharply
            new_interval = min(
                self.state.max_interval,
                self.state.interval * self.state.backoff_factor,
            )
            # Add some jitter to prevent a thundering herd
            self.state.interval = min(
                self.state.max_interval,
                new_interval * random.uniform(1.0, 1.1),
            )
            print(f"Backing off: interval={self.state.interval:.2f}s")
```
Integration into Crawler
In the actual crawler loop, we need to inject the Limiter into the request flow.
```python
import asyncio
import random

import aiohttp

async def fetch_url(session, url, limiter):
    # 1. Wait for a rate-limit permit
    await limiter.wait()
    try:
        async with session.get(url) as response:
            status = response.status
            if status == 200:
                # 2. Success feedback
                await limiter.feedback(success=True)
                return await response.text()
            elif status in (429, 503):
                # 3. Congestion feedback
                await limiter.feedback(success=False)
                return None
            else:
                # Other errors usually don't affect the rate strategy
                return None
    except (aiohttp.ClientError, asyncio.TimeoutError):
        # Network exceptions are also signals to back off
        await limiter.feedback(success=False)
        return None

async def main():
    state = RateState(interval=1.0)  # Initial interval: 1s
    limiter = AdaptiveLimiter(state)
    async with aiohttp.ClientSession() as session:
        tasks = []
        for _ in range(20):
            # Simulate random 429s
            status = 200 if random.random() > 0.3 else 429
            url = f"http://httpbin.org/status/{status}"
            tasks.append(fetch_url(session, url, limiter))
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
```
Challenges in Asynchronous Environments
Implementing this logic in Python asyncio has a hidden trap: Concurrency Lag.
When you have 100 concurrent tasks, if the 1st task triggers a 429 error and calls backoff, the other 99 tasks might already be past the await limiter.wait() stage or in network transit. This means those 99 requests will still hit the server at the old (too fast) rate, causing a flood of subsequent errors.
Improvement Strategies:
- Global Circuit Breaker: Once backoff is triggered, set a brief global flag that pauses the emission of all new requests until the queue clears.
- Token Bucket Combination: The AdaptiveLimiter shouldn't just control the interval but also dynamically adjust the capacity of a token bucket. When a 429 occurs, it should not only increase the interval but also instantly drain the remaining tokens in the bucket.
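The first strategy can be sketched with an asyncio.Event acting as a global gate. The class name, the `pause` parameter, and the wiring below are illustrative assumptions, not part of the limiter above:

```python
import asyncio

class CircuitBreaker:
    """Global gate (illustrative sketch): trip() closes it for `pause` seconds,
    and every worker awaits allow() before emitting a request."""
    def __init__(self, pause: float = 5.0):
        self.pause = pause
        self._gate = asyncio.Event()
        self._gate.set()  # gate starts open

    async def allow(self):
        # Blocks while the gate is closed; returns immediately otherwise
        await self._gate.wait()

    def trip(self):
        if self._gate.is_set():
            self._gate.clear()
            # Reopen the gate after the pause; the is_set() guard avoids
            # re-arming the timer when the breaker is already tripped
            asyncio.get_running_loop().call_later(self.pause, self._gate.set)

# Quick self-check: tripping the breaker stalls allow() for ~pause seconds
async def _demo() -> float:
    breaker = CircuitBreaker(pause=0.05)
    start = asyncio.get_running_loop().time()
    breaker.trip()
    await breaker.allow()
    return asyncio.get_running_loop().time() - start

elapsed = asyncio.run(_demo())
```

In fetch_url, each worker would call `await breaker.allow()` just before `limiter.wait()`, and the congestion branch would call `breaker.trip()` alongside `feedback(success=False)`, so the 99 in-flight stragglers stop emitting instead of flooding the server at the stale rate.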
Conclusion
Building an adaptive system mimics a biological "pain" feedback mechanism, allowing the crawler to find the equilibrium point of the target server's capacity. This not only protects your IP assets but also shows basic respect for the target site: take only what is spare, and don't drain the pond to catch the fish.