DEV Community

drake
drake

Posted on

用Patchright绕过CloudFlare的风控

  • 安装依赖
pip install patchright
Enter fullscreen mode Exit fullscreen mode
  • 示例代码

Patchright 是对 Playwright 的反检测改造版本,用法与 Playwright 完全一致。


import asyncio
import json
import re
import logging
from enum import Enum
from datetime import datetime
from typing import Optional
from traceback import format_exc
from patchright.async_api import async_playwright, Frame
from patchright.async_api import Error as PlaywrightError

# Adapted from https://github.com/Xewdy444/CF-Clearance-Scraper

# Configure root logging once for the script: INFO level, with timestamp,
# logger name and level in front of every message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-wide logger used by everything below.
logger = logging.getLogger('Patchright')

class ChallengePlatform(Enum):
    """
    Kinds of Cloudflare challenge page.

    Each member's value is the string that is searched for in the page HTML
    as ``cType: '<value>'``. Member order is significant to callers that
    iterate the enum and stop at the first match.
    """

    # Non-interactive JavaScript challenge.
    JAVASCRIPT = "non-interactive"
    # Managed challenge.
    MANAGED = "managed"
    # Interactive challenge.
    INTERACTIVE = "interactive"

class FuckCloudFlare:
    """
    Drive a headed Chromium browser (via Patchright) through a Cloudflare
    challenge page.

    The instance opens ``self.url``, detects which challenge type the page
    serves, and interacts with the page until a ``cf_clearance`` cookie is
    present, the challenge disappears, or ``self._timeout`` seconds elapse.
    """

    def __init__(self):
        # Page that is opened and expected to serve a Cloudflare challenge.
        self.url = 'https://sergiodemo.com/security/challenge/legacy-challenge'
        # Timeout in seconds: bounds the solve loop and is also used as the
        # browser context's default timeout.
        self._timeout = 30

    async def on_response(self, response):
        """
        Response hook: log the most recently listed coin from the coin-list API.

        Only responses whose URL contains ``spot/public/getCoinChainList`` are
        processed. The body is parsed as JSON and ``data.item`` is iterated;
        the entry with the largest ``coinId`` is logged together with its name.

        Parameters
        ----------
        response
            The Playwright response object passed to the ``response`` event.
        """
        if 'spot/public/getCoinChainList' not in response.url:
            return

        logger.info(f'捕获数据接口: {response.url}')
        try:
            payload = json.loads(await response.body())
            items = payload['data']['item']
        except (PlaywrightError, ValueError, KeyError, TypeError):
            # The body may be unavailable or not have the expected shape;
            # a logging hook should not raise out of the event handler.
            logger.exception('Failed to parse response from %s', response.url)
            return

        seen_names = set()
        id_name_map = {}
        for item in items:
            try:
                symbol = item['coinName']
                # fullName may be missing or an empty string; fall back to
                # the symbol in both cases.
                name = item.get('fullName') or symbol
                if name in seen_names:
                    logger.info(f'name重复:{name}')
                seen_names.add(name)
                # Per the original author's note, coinId increases over
                # time: the later a coin is listed, the larger its coinId.
                coin_id = item['coinId']
                id_name_map[coin_id] = name
            except (KeyError, TypeError, AttributeError):
                # Malformed entry: log it and keep going.
                logger.info(item)

        if not id_name_map:
            # Nothing parsed, so there is no "latest" coin to report.
            return

        # Report the most recently listed coin (largest coinId).
        max_id = max(id_name_map)
        max_name = id_name_map[max_id]
        logger.info(f'max id: {max_id} {max_name}')

    def _get_turnstile_frame(self, page) -> Optional[Frame]:
        """
        Get the Cloudflare turnstile frame.

        Parameters
        ----------
        page
            The Playwright page to search.

        Returns
        -------
        Optional[Frame]
            The frame whose URL matches the Cloudflare turnstile pattern,
            or ``None`` when the page has no such frame.
        """
        return page.frame(
            url=re.compile(
                "https://challenges.cloudflare.com/cdn-cgi/challenge-platform/h/[bg]/turnstile"
            ),
        )

    async def cookies(self, page) -> Optional[str]:
        """
        Return the value of the ``cf_clearance`` cookie, if present.

        Parameters
        ----------
        page
            The Playwright page whose browser context is inspected.

        Returns
        -------
        Optional[str]
            The ``cf_clearance`` cookie value, or ``None`` when the context
            has no such cookie.
        """
        for cookie in await page.context.cookies():
            if cookie["name"] == "cf_clearance":
                return cookie["value"]
        return None

    async def detect_challenge(self, page) -> Optional[str]:
        """
        Detect the Cloudflare challenge platform on the current page.

        The page HTML is searched for ``cType: '<value>'`` for each
        ``ChallengePlatform`` member, in declaration order.

        Returns
        -------
        Optional[str]
            The ``value`` of the first matching ``ChallengePlatform`` member,
            or ``None`` when no challenge marker is found.
        """
        html = await page.content()
        for platform in ChallengePlatform:
            if f"cType: '{platform.value}'" in html:
                return platform.value
        return None

    async def solve_challenge(self, page) -> None:
        """
        Solve the Cloudflare challenge on the current page.

        Repeatedly interacts with the page until a ``cf_clearance`` cookie
        exists, no challenge is detected any more, or ``self._timeout``
        seconds have elapsed.

        Parameters
        ----------
        page
            The Playwright page currently showing the challenge.
        """
        verify_button_pattern = re.compile(
            "Verify (I am|you are) (not a bot|(a )?human)"
        )

        verify_button = page.get_by_role("button", name=verify_button_pattern)
        challenge_spinner = page.locator("#challenge-spinner")
        challenge_stage = page.locator("#challenge-stage")
        start_timestamp = datetime.now()

        # The cookie and the challenge state are re-read on every iteration
        # so the loop stops as soon as the challenge is solved instead of
        # always running until the timeout.
        while (
            (await self.cookies(page)) is None
            and (await self.detect_challenge(page)) is not None
            and (datetime.now() - start_timestamp).total_seconds() < self._timeout
        ):
            if await challenge_spinner.is_visible():
                await challenge_spinner.wait_for(state="hidden")

            turnstile_frame = self._get_turnstile_frame(page)

            if await verify_button.is_visible():
                await verify_button.click()
                await challenge_stage.wait_for(state="hidden")
            elif turnstile_frame is not None:
                # Fixed page coordinates; presumably where the turnstile
                # checkbox is rendered -- TODO confirm for other viewports.
                await page.mouse.click(210, 290)
                await challenge_stage.wait_for(state="hidden")

            # Short pause before polling the page again.
            await page.wait_for_timeout(250)

    async def detect(self, page):
        """
        Detect and, if needed, solve the Cloudflare challenge on ``page``.

        Does nothing when a ``cf_clearance`` cookie already exists. Logs an
        error and returns when no challenge marker is found. Playwright
        errors raised while solving are logged, not propagated.

        Parameters
        ----------
        page
            The Playwright page that has already navigated to the target URL.
        """
        clearance_cookie = await self.cookies(page)
        if clearance_cookie is not None:
            return

        challenge_platform = await self.detect_challenge(page)
        if challenge_platform is None:
            logger.error("No Cloudflare challenge detected.")
            return
        logger.info(f"Solving Cloudflare challenge [{challenge_platform}]...")

        try:
            await self.solve_challenge(page)
        except PlaywrightError as err:
            logger.error(err)

    async def run_local(self, proxy=None):
        """
        Launch a headed Chromium browser, open ``self.url`` and run ``detect``.

        The browser is closed even when navigation or solving raises.

        Parameters
        ----------
        proxy
            Passed unchanged as the ``proxy`` launch option; ``None`` by
            default.
        """
        async with async_playwright() as p:
            # Must be a headed browser; per the original author, headless
            # does not get past Cloudflare.
            launch_data = {
                "headless": False,
                "proxy": proxy,
            }
            user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
            # Only Chromium is supported at the moment.
            browser = await p.chromium.launch(**launch_data)
            try:
                context = await browser.new_context(user_agent=user_agent)
                # Playwright timeouts are in milliseconds.
                context.set_default_timeout(self._timeout * 1000)
                page = await context.new_page()
                page.on('response', self.on_response)
                await page.goto(self.url)
                await self.detect(page)
            finally:
                logger.info('关闭浏览器')
                await browser.close()
            # NOTE: the message mentions a restart, but nothing in this
            # method schedules one.
            logger.info('浏览器已关闭!,1H后重新启动...')

    async def run_aws(self):
        """
        Run on an AWS server (no graphical environment).

        Wraps ``run_local`` in a ``pyvirtualdisplay.Display`` so the headed
        browser can start without a graphical environment. Any exception is
        logged with its traceback instead of being propagated.
        """
        proxy = None
        # Imported here so the dependency is only needed on this code path.
        from pyvirtualdisplay import Display
        with Display():
            try:
                await self.run_local(proxy)
            except Exception:
                logger.error(f'浏览器异常:{format_exc()}')

    def run(self):
        """Synchronous entry point: run ``run_local`` in a new event loop."""
        asyncio.run(self.run_local())

if __name__ == "__main__":
    # Script entry point: build the bypasser and run it locally.
    bypasser = FuckCloudFlare()
    bypasser.run()

Enter fullscreen mode Exit fullscreen mode

Postmark Image

Speedy emails, satisfied customers

Are delayed transactional emails costing you user satisfaction? Postmark delivers your emails almost instantly, keeping your customers happy and connected.

Sign up

Top comments (0)

Sentry image

See why 4M developers consider Sentry, “not bad.”

Fixing code doesn’t have to be the worst part of your day. Learn how Sentry can help.

Learn more