DEV Community

WDSEGA
WDSEGA

Posted on

Python Web Scraping Ethics: A Complete Guide for 2026

2026年,爬虫技术已经发展到了一个新阶段。AI驱动的反爬系统越来越智能,法律法规也在不断完善。在这个背景下,写爬虫不再只是技术问题,更是法律和道德问题。这篇文章从法律边界、道德规范、技术实践三个维度,帮你建立完整的"道德爬虫"知识体系。

一、法律边界:什么能爬,什么不能爬

中国法律框架

在中国,与爬虫相关的主要法律包括:

  • 《网络安全法》:禁止非法获取计算机信息系统数据
  • 《数据安全法》:对数据收集、存储、使用提出了合规要求
  • 《个人信息保护法》:爬取涉及个人信息的数据需要获得授权
  • 《刑法》第285条:非法获取计算机信息系统数据罪,情节严重的可处三年以上七年以下有期徒刑

关键判断标准

合法爬取的特征

  • 爬取公开数据(不需要登录即可访问的数据)
  • 遵守网站的robots.txt规则
  • 请求频率合理,不对目标服务器造成负担
  • 不爬取个人信息(姓名、电话、身份证号等)
  • 不用于商业竞争目的

可能违法的特征

  • 绕过技术保护措施(验证码、加密参数、登录墙)
  • 爬取非公开数据(需要登录才能看到的内容)
  • 大量爬取个人信息
  • 对目标服务器造成损害(CPU、带宽过载)
  • 将爬取数据用于商业牟利,损害原网站利益

robots.txt的正确理解

robots.txt是网站的"门牌",告诉爬虫哪些页面可以访问,哪些不可以:

# robots.txt示例
User-agent: *
Allow: /public/
Disallow: /private/
Disallow: /api/
Disallow: /user/

# 限制爬取频率
Crawl-delay: 5
Enter fullscreen mode Exit fullscreen mode
import urllib.robotparser

def check_robots_txt(url):
    """检查URL是否允许爬取"""
    rp = urllib.robotparser.RobotFileParser()
    base_url = f"{url.scheme}://{url.netloc}/robots.txt"
    rp.set_url(base_url)
    rp.read()

    can_fetch = rp.can_fetch("*", url.geturl())
    crawl_delay = rp.crawl_delay("*")

    return can_fetch, crawl_delay
Enter fullscreen mode Exit fullscreen mode

重要提醒:robots.txt是道德约束,不是法律强制。但它代表了网站所有者的意愿,遵守它是道德爬虫的第一步。

二、道德规范:做一个好"邻居"

请求频率控制

import asyncio
import aiohttp
import time

class EthicalCrawler:
    def __init__(self, base_delay=1.0, max_concurrent=5):
        self.base_delay = base_delay
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.request_times = []

    async def fetch(self, session, url):
        """带频率控制的请求"""
        async with self.semaphore:
            # 自适应延迟:根据最近请求时间动态调整
            if self.request_times:
                elapsed = time.time() - self.request_times[-1]
                if elapsed < self.base_delay:
                    await asyncio.sleep(self.base_delay - elapsed)

            self.request_times.append(time.time())
            # 只保留最近100次请求时间
            self.request_times = self.request_times[-100:]

            try:
                async with session.get(url) as response:
                    if response.status == 429:
                        # 被限流,等待后重试
                        retry_after = int(response.headers.get("Retry-After", 60))
                        print(f"触发限流,等待{retry_after}")
                        await asyncio.sleep(retry_after)
                        return await self.fetch(session, url)

                    response.raise_for_status()
                    return await response.text()
            except aiohttp.ClientError as e:
                print(f"请求失败: {url} - {e}")
                return None
Enter fullscreen mode Exit fullscreen mode

User-Agent标识

# 好的User-Agent:清楚表明你的身份
HEADERS = {
    "User-Agent": "MyResearchBot/1.0 (学术研究项目; contact@example.com)",
    "Accept": "text/html,application/xhtml+xml",
    "Accept-Language": "zh-CN,zh;q=0.9",
}

# 不好的User-Agent:伪装成浏览器
BAD_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...",
}
Enter fullscreen mode Exit fullscreen mode

原则:诚实表明你的身份。如果网站管理员有问题,他们可以通过联系方式找到你。

三、技术实践:道德爬虫的实现

完整的道德爬虫框架


python
import asyncio
import aiohttp
import urllib.robotparser
from urllib.parse import urlparse
import logging
from dataclasses import dataclass
from typing import Optional
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class CrawlConfig:
    base_url: str
    max_concurrent: int = 5
    base_delay: float = 2.0
    timeout: int = 30
    max_retries: int = 3
    respect_robots: bool = True
    user_agent: str = "EthicalCrawler/1.0 (research; contact@example.com)"


class EthicalCrawler:
    def __init__(self, config: CrawlConfig):
        self.config = config
        self.semaphore = asyncio.Semaphore(config.max_concurrent)
        self.visited = set()
        self.robot_parser = urllib.robotparser.RobotFileParser()

        if config.respect_robots:
            self.robot_parser.set_url(f"{config.base_url}/robots.txt")
            try:
                self.robot_parser.read()
                logger.info("已加载robots.txt规则")
            except Exception as e:
                logger.warning(f"无法读取robots.txt: {e}")

    def is_allowed(self, url: str) -> bool:
        """检查URL是否允许爬取"""
        if not self.config.respect_robots:
            return True

        parsed = urlparse(url)
        if parsed.netloc != urlparse(self.config.base_url).netloc:
            logger.warning(f"跨域请求: {url}")
            return False

        return self.robot_parser.can_fetch(self.config.user_agent, url)

    async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> Optional[str]:
        """获取页面内容"""
        if url in self.visited:
            return None

        if not self.is_allowed(url):
            logger.info(f"robots.txt禁止访问: {url}")
            return None

        async with self.semaphore:
            self.visited.add(url)
            logger.info(f"正在爬取: {url}")

            for attempt in range(self.config.max_retries):
                try:
                    async with session.get(
                        url,
                        headers={"User-Agent": self.config.user_agent},
                        timeout=aiohttp.ClientTimeout(total=self.config.timeout)
                    ) as response:
                        if response.status == 429:
                            retry_after = int(
                                response.headers.get("Retry-After", 60)
                            )
                            logger.warning(f"限流,等待{retry_after}秒")
                            await asyncio.sleep(retry_after)
                            continue

                        if response.status >= 400:
                            logger.warning(f"HTTP {response.status}: {url}")
                            return None

                        # 遵守延迟
                        await asyncio.sleep(self.config.base_delay)
                        return await response.text()

                except Exception as e:
                    logger.error(f"请求异常(第{attempt+1}次): {e}")
                    if attempt < self.config.max_retries - 1:
                        await asyncio.sleep(2 ** attempt)  # 指数退避

        return None

    async def crawl(self, urls: list[str]) -> list[tuple[str, str]]:
        """批量爬取URL"""
        results = []
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_page(session, url) for url in urls]
            pages = await asyncio.gather(*tasks, return_exceptions=True)


---
*本文首发于[我的技术博客](https://wdsega.github.io),欢迎访问获取更多技术文章。*
*如果你是内容创作者或自由职业者,推荐看看我整理的[Creator Pro Bundle](https://segauser.gumroad.com/l/rrhmbb)工具包,包含AI提示词系统、内容创作工具、副业指南和自动化脚本,源码全开放。*
Enter fullscreen mode Exit fullscreen mode

Top comments (0)