Rate limiting is the most common obstacle in web scraping. Here's how to detect it and handle it gracefully.
Detection
import requests, time, random
from collections import deque

class RateLimitDetector:
    def __init__(self):
        self.times = deque(maxlen=100)   # rolling window of response times
        self.baseline = None             # established from the first samples

    def check(self, resp):
        sig = {"limited": False, "reasons": [], "delay": 0}
        if resp.status_code == 429:
            # Retry-After may be seconds or an HTTP date; fall back to 60s
            ra = resp.headers.get("Retry-After", "60")
            sig.update(limited=True, delay=int(ra) if ra.isdigit() else 60)
            sig["reasons"].append("HTTP 429")
        if resp.status_code == 503:
            sig.update(limited=True, delay=30)
            sig["reasons"].append("HTTP 503")
        if resp.status_code == 200:
            # Soft blocks return 200 but serve a challenge page instead of content
            for w in ["captcha", "recaptcha", "unusual traffic", "automated requests"]:
                if w in resp.text.lower():
                    sig.update(limited=True, delay=60)
                    sig["reasons"].append(f"Soft block: {w}")
                    break
        # Response-time degradation is an early warning before hard blocks
        self.times.append(resp.elapsed.total_seconds())
        if len(self.times) > 10:
            if self.baseline is None:
                self.baseline = sum(list(self.times)[:5]) / 5
            if sum(list(self.times)[-5:]) / 5 > self.baseline * 3:
                # Treat sustained slowdown as a limit signal so callers back off
                sig.update(limited=True, delay=max(sig["delay"], 10))
                sig["reasons"].append("Responses 3x slower than baseline")
        return sig
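To use the detector, feed it every response as it arrives. A quick sketch (the URL is a placeholder):

det = RateLimitDetector()
resp = requests.get("https://example.com")  # placeholder target
sig = det.check(resp)
if sig["limited"]:
    print(f"Backing off {sig['delay']}s: {', '.join(sig['reasons'])}")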
Adaptive Throttling
class Throttler:
    def __init__(self, min_d=0.5, max_d=30, start=1.0):
        self.min_d, self.max_d, self.delay = min_d, max_d, start
        self.streak = 0                  # consecutive successful responses
        self.det = RateLimitDetector()

    def wait(self):
        # Jitter the sleep (0.5x-1.5x) so requests don't land in regular bursts
        time.sleep(self.delay * random.uniform(0.5, 1.5))

    def update(self, resp):
        sig = self.det.check(resp)
        if sig["limited"]:
            # Exponential backoff, but never below a server-supplied Retry-After
            self.delay = min(self.delay * 2, self.max_d)
            if sig["delay"]:
                self.delay = max(self.delay, sig["delay"])
            self.streak = 0
            return False
        if resp.status_code == 200:
            self.streak += 1
            if self.streak > 10:
                # Speed back up gradually after a sustained run of successes
                self.delay = max(self.delay * 0.9, self.min_d)
        return True
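In practice you wrap every request in a wait/update pair. A minimal loop, with placeholder URLs, that retries once when the throttler reports a limit:

t = Throttler()
for url in ["https://example.com/a", "https://example.com/b"]:  # placeholders
    t.wait()
    resp = requests.get(url, timeout=30)
    if not t.update(resp):
        # Limited: the delay has already been doubled; retry after it elapses
        time.sleep(t.delay)
        t.update(requests.get(url, timeout=30))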
Token Bucket
class TokenBucket:
    def __init__(self, rate, cap):
        # rate: tokens added per second; cap: maximum burst size
        self.rate, self.cap, self.tokens, self.last = rate, cap, cap, time.time()

    def consume(self):
        # Refill in proportion to elapsed time, capped at capacity
        now = time.time()
        self.tokens = min(self.cap, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
        else:
            # Block until one full token accrues, then mark it consumed
            time.sleep((1 - self.tokens) / self.rate)
            self.last = time.time()  # reset clock so the sleep isn't refilled twice
            self.tokens = 0
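A quick way to see the burst-then-steady behavior: with rate=2 and cap=5, the first five calls pass immediately and the rest settle at roughly one every 0.5 s.

bucket = TokenBucket(rate=2, cap=5)
start = time.time()
for i in range(8):
    bucket.consume()
    print(f"request {i} at t={time.time() - start:.2f}s")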
Production Scraper
class Scraper:
    def __init__(self, rps=1):
        self.s = requests.Session()        # reuse connections across requests
        self.t = Throttler(start=1 / rps)
        self.b = TokenBucket(rps, 5)       # allow bursts of up to 5 requests

    def fetch(self, url, retries=3):
        for i in range(retries):
            self.b.consume()
            self.t.wait()
            try:
                r = self.s.get(url, timeout=30)
                if self.t.update(r):
                    return r
                # Rate limited: honor the backed-off delay before retrying
                time.sleep(self.t.delay)
            except requests.RequestException:
                # Timeouts and connection errors: linear backoff per attempt
                time.sleep(5 * (i + 1))
        return None

sc = Scraper(rps=2)
for u in ["https://example.com/1", "https://example.com/2"]:
    r = sc.fetch(u)
    if r:
        print(f"OK: {u}")
Pro Solutions
If you'd rather not maintain this yourself, managed services cover it: ScraperAPI handles rate limits automatically, ThorData distributes requests across IP pools, and ScrapeOps alerts on performance degradation.
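Most of these services are consumed through a simple HTTP endpoint that you hand the target URL. A minimal sketch following ScraperAPI's documented request pattern; the API key is a placeholder, and you should confirm the exact parameters against the provider's docs:

# Assumes ScraperAPI's GET endpoint; api_key and url are its documented params
params = {"api_key": "YOUR_API_KEY", "url": "https://example.com"}  # placeholders
r = requests.get("http://api.scraperapi.com/", params=params, timeout=60)
print(r.status_code, len(r.text))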
Rules
- Respect Retry-After headers when the server provides them
- Back off exponentially once a limit is detected
- Add jitter so requests don't arrive in synchronized bursts
- Monitor response times; degradation is an early warning
- Check robots.txt for a Crawl-delay directive
- Track limits per domain, not globally (the sketch below combines this with Crawl-delay)
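The last two rules combine naturally. A minimal sketch, assuming the Throttler class above and only the standard library: keep one throttler per domain and seed its starting delay from robots.txt's Crawl-delay where one exists.

from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

throttlers = {}  # one Throttler per domain, so one slow site can't stall the rest

def throttler_for(url, ua="*"):
    domain = urlparse(url).netloc
    if domain not in throttlers:
        rp = RobotFileParser()
        rp.set_url(f"https://{domain}/robots.txt")
        try:
            rp.read()
            crawl_delay = rp.crawl_delay(ua) or 1.0
        except OSError:
            crawl_delay = 1.0  # robots.txt unreachable; fall back to a safe default
        throttlers[domain] = Throttler(start=crawl_delay)
    return throttlers[domain]

throttler_for("https://example.com/page").wait()  # placeholder URL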