2026年,爬虫技术已经发展到了一个新阶段。AI驱动的反爬系统越来越智能,法律法规也在不断完善。在这个背景下,写爬虫不再只是技术问题,更是法律和道德问题。这篇文章从法律边界、道德规范、技术实践三个维度,帮你建立完整的"道德爬虫"知识体系。
一、法律边界:什么能爬,什么不能爬
中国法律框架
在中国,与爬虫相关的主要法律包括:
- 《网络安全法》:禁止非法获取计算机信息系统数据
- 《数据安全法》:对数据收集、存储、使用提出了合规要求
- 《个人信息保护法》:爬取涉及个人信息的数据需要获得授权
- 《刑法》第285条:非法获取计算机信息系统数据罪,情节严重的可处三年以上七年以下有期徒刑
关键判断标准
合法爬取的特征:
- 爬取公开数据(不需要登录即可访问的数据)
- 遵守网站的robots.txt规则
- 请求频率合理,不对目标服务器造成负担
- 不爬取个人信息(姓名、电话、身份证号等)
- 不用于商业竞争目的
可能违法的特征:
- 绕过技术保护措施(验证码、加密参数、登录墙)
- 爬取非公开数据(需要登录才能看到的内容)
- 大量爬取个人信息
- 对目标服务器造成损害(CPU、带宽过载)
- 将爬取数据用于商业牟利,损害原网站利益
robots.txt的正确理解
robots.txt是网站的"门牌",告诉爬虫哪些页面可以访问,哪些不可以:
# robots.txt示例
User-agent: *
Allow: /public/
Disallow: /private/
Disallow: /api/
Disallow: /user/
# 限制爬取频率
Crawl-delay: 5
import urllib.robotparser
def check_robots_txt(url):
"""检查URL是否允许爬取"""
rp = urllib.robotparser.RobotFileParser()
base_url = f"{url.scheme}://{url.netloc}/robots.txt"
rp.set_url(base_url)
rp.read()
can_fetch = rp.can_fetch("*", url.geturl())
crawl_delay = rp.crawl_delay("*")
return can_fetch, crawl_delay
重要提醒:robots.txt是道德约束,不是法律强制。但它代表了网站所有者的意愿,遵守它是道德爬虫的第一步。
二、道德规范:做一个好"邻居"
请求频率控制
import asyncio
import aiohttp
import time
class EthicalCrawler:
def __init__(self, base_delay=1.0, max_concurrent=5):
self.base_delay = base_delay
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
self.request_times = []
async def fetch(self, session, url):
"""带频率控制的请求"""
async with self.semaphore:
# 自适应延迟:根据最近请求时间动态调整
if self.request_times:
elapsed = time.time() - self.request_times[-1]
if elapsed < self.base_delay:
await asyncio.sleep(self.base_delay - elapsed)
self.request_times.append(time.time())
# 只保留最近100次请求时间
self.request_times = self.request_times[-100:]
try:
async with session.get(url) as response:
if response.status == 429:
# 被限流,等待后重试
retry_after = int(response.headers.get("Retry-After", 60))
print(f"触发限流,等待{retry_after}秒")
await asyncio.sleep(retry_after)
return await self.fetch(session, url)
response.raise_for_status()
return await response.text()
except aiohttp.ClientError as e:
print(f"请求失败: {url} - {e}")
return None
User-Agent标识
# 好的User-Agent:清楚表明你的身份
HEADERS = {
"User-Agent": "MyResearchBot/1.0 (学术研究项目; contact@example.com)",
"Accept": "text/html,application/xhtml+xml",
"Accept-Language": "zh-CN,zh;q=0.9",
}
# 不好的User-Agent:伪装成浏览器
BAD_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...",
}
原则:诚实表明你的身份。如果网站管理员有问题,他们可以通过联系方式找到你。
三、技术实践:道德爬虫的实现
完整的道德爬虫框架
python
import asyncio
import aiohttp
import urllib.robotparser
from urllib.parse import urlparse
import logging
from dataclasses import dataclass
from typing import Optional
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class CrawlConfig:
base_url: str
max_concurrent: int = 5
base_delay: float = 2.0
timeout: int = 30
max_retries: int = 3
respect_robots: bool = True
user_agent: str = "EthicalCrawler/1.0 (research; contact@example.com)"
class EthicalCrawler:
def __init__(self, config: CrawlConfig):
self.config = config
self.semaphore = asyncio.Semaphore(config.max_concurrent)
self.visited = set()
self.robot_parser = urllib.robotparser.RobotFileParser()
if config.respect_robots:
self.robot_parser.set_url(f"{config.base_url}/robots.txt")
try:
self.robot_parser.read()
logger.info("已加载robots.txt规则")
except Exception as e:
logger.warning(f"无法读取robots.txt: {e}")
def is_allowed(self, url: str) -> bool:
"""检查URL是否允许爬取"""
if not self.config.respect_robots:
return True
parsed = urlparse(url)
if parsed.netloc != urlparse(self.config.base_url).netloc:
logger.warning(f"跨域请求: {url}")
return False
return self.robot_parser.can_fetch(self.config.user_agent, url)
async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> Optional[str]:
"""获取页面内容"""
if url in self.visited:
return None
if not self.is_allowed(url):
logger.info(f"robots.txt禁止访问: {url}")
return None
async with self.semaphore:
self.visited.add(url)
logger.info(f"正在爬取: {url}")
for attempt in range(self.config.max_retries):
try:
async with session.get(
url,
headers={"User-Agent": self.config.user_agent},
timeout=aiohttp.ClientTimeout(total=self.config.timeout)
) as response:
if response.status == 429:
retry_after = int(
response.headers.get("Retry-After", 60)
)
logger.warning(f"限流,等待{retry_after}秒")
await asyncio.sleep(retry_after)
continue
if response.status >= 400:
logger.warning(f"HTTP {response.status}: {url}")
return None
# 遵守延迟
await asyncio.sleep(self.config.base_delay)
return await response.text()
except Exception as e:
logger.error(f"请求异常(第{attempt+1}次): {e}")
if attempt < self.config.max_retries - 1:
await asyncio.sleep(2 ** attempt) # 指数退避
return None
async def crawl(self, urls: list[str]) -> list[tuple[str, str]]:
"""批量爬取URL"""
results = []
async with aiohttp.ClientSession() as session:
tasks = [self.fetch_page(session, url) for url in urls]
pages = await asyncio.gather(*tasks, return_exceptions=True)
---
*本文首发于[我的技术博客](https://wdsega.github.io),欢迎访问获取更多技术文章。*
*如果你是内容创作者或自由职业者,推荐看看我整理的[Creator Pro Bundle](https://segauser.gumroad.com/l/rrhmbb)工具包,包含AI提示词系统、内容创作工具、副业指南和自动化脚本,源码全开放。*
Top comments (0)