import asyncio
import time
import json
import re
import logging
from hashlib import md5
from enum import Enum
from retry import retry
from datetime import datetime
from typing import Optional
from traceback import format_exc
# Use patchright as a drop-in replacement for playwright
from patchright.async_api import async_playwright, Frame
from patchright.async_api import Error as PlaywrightError
from utils.redisdb import redis_cli
from config import env, config
from other_spider.scheduler import scheduled_task
from utils.spider_failed_alert import ErrorMonitor
# from other_spider.spug.gmgn.utils.config import target_address
from other_spider.spug.gmgn.frankie.config import target_address
# Configure the root logging format once at import time; 'Fuck CF' appears as
# the logger name in every record emitted by this module.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Fuck CF')
"""
基于https://github.com/Xewdy444/CF-Clearance-Scraper改造
"""
class ChallengePlatform(Enum):
    """Cloudflare challenge platform types.

    The member values are matched verbatim against the ``cType: '...'``
    marker embedded in Cloudflare's challenge page HTML (see
    ``FuckCF.detect_challenge``).
    """
    JAVASCRIPT = "non-interactive"  # JS-only challenge, solved automatically by the browser
    MANAGED = "managed"             # managed challenge, may escalate to a turnstile widget
    INTERACTIVE = "interactive"     # requires a click on the verify button / turnstile
class FuckCF:
    """Scrape the top-100 holders (labels + history) of trending GMGN tokens.

    Drives a headed Chromium via patchright, detects the Cloudflare challenge
    type on each page, and solves it before data collection. Subclasses are
    expected to override ``on_response`` to persist the intercepted payloads.
    ``run()`` is the scheduled entry point; it populates ``self.address_list``
    ({chain: [token_address, ...]}) and then launches the crawl.
    """
    spider_name = 'Fuck CF Base Class'
    author = 'drake shi'

    def __init__(self):
        self.redis_cli = redis_cli()
        self.proxy = config.PROXY_FOR_PLAYWRIGHT
        # Challenge-solve timeout in seconds; also reused (x1000) as the
        # Playwright default timeout for each browser context.
        self._timeout = 30
        # Whether the holder data was collected successfully for the
        # current target; checked by check_success().
        self.task_holders_status = True

    async def on_response(self, response):
        """Response-stream hook (payload layout documented in gmgn.json).

        Classifies the response URL by chain. NOTE(review): ``chain`` is
        computed but unused here — presumably subclasses override this hook
        to actually persist the data; confirm before relying on the base
        implementation.
        """
        if not response.ok:
            return
        chain = ''
        if 'bsc' in response.url:
            chain = 'bsc'
        elif 'sol' in response.url:
            chain = 'sol'
        elif 'base' in response.url:
            chain = 'base'

    def _get_turnstile_frame(self, page) -> Optional[Frame]:
        """Return the Cloudflare turnstile iframe on *page*, or ``None``.

        Returns
        -------
        Optional[Frame]
            The Cloudflare turnstile frame, if present.
        """
        return page.frame(
            url=re.compile(
                "https://challenges.cloudflare.com/cdn-cgi/challenge-platform/h/[bg]/turnstile"
            ),
        )

    async def cookies(self, page) -> Optional[str]:
        """Return the ``cf_clearance`` cookie value from *page*, or ``None``."""
        cookies = await page.context.cookies()
        if not cookies:
            return None
        for cookie in cookies:
            if cookie["name"] == "cf_clearance":
                return cookie["value"]
        return None

    async def detect_challenge(self, page) -> Optional[str]:
        """Detect which Cloudflare challenge platform guards the current page.

        Returns
        -------
        Optional[str]
            The matching ``ChallengePlatform`` value, or ``None`` when the
            page shows no recognizable challenge marker.
        """
        html = await page.content()
        for platform in ChallengePlatform:
            if f"cType: '{platform.value}'" in html:
                return platform.value
        return None

    async def solve_challenge(self, page) -> None:
        """Solve the Cloudflare challenge on the current page.

        Loops until the clearance cookie appears, the challenge disappears,
        or ``self._timeout`` seconds elapse; each pass clicks the verify
        button or the turnstile widget as appropriate.
        """
        verify_button_pattern = re.compile(
            "Verify (I am|you are) (not a bot|(a )?human)"
        )
        verify_button = page.get_by_role("button", name=verify_button_pattern)
        challenge_spinner = page.locator("#challenge-spinner")
        challenge_stage = page.locator("#challenge-stage")
        start_timestamp = datetime.now()
        # BUG FIX: the cookie/challenge state must be re-evaluated on every
        # iteration. The original sampled them once before the loop, so the
        # loop always ran until the timeout even after clearance was granted.
        # Also use total_seconds() — timedelta.seconds is only the
        # day-remainder component.
        while (
            await self.cookies(page) is None
            and await self.detect_challenge(page) is not None
            and (datetime.now() - start_timestamp).total_seconds() < self._timeout
        ):
            if await challenge_spinner.is_visible():
                await challenge_spinner.wait_for(state="hidden")
            turnstile_frame = self._get_turnstile_frame(page)
            if await verify_button.is_visible():
                await verify_button.click()
                await challenge_stage.wait_for(state="hidden")
            elif turnstile_frame is not None:
                # No visible button: click where the turnstile checkbox sits.
                await page.mouse.click(210, 290)
                await challenge_stage.wait_for(state="hidden")
            await page.wait_for_timeout(250)

    async def detect(self, page):
        """Detect and, if needed, solve the Cloudflare challenge on *page*."""
        clearance_cookie = await self.cookies(page)
        if clearance_cookie is not None:
            # Already cleared — nothing to do.
            return
        challenge_platform = await self.detect_challenge(page)
        if challenge_platform is None:
            # Use the module logger (not the root logger) for consistency.
            logger.error("No Cloudflare challenge detected.")
            return
        logger.info(f"Solving Cloudflare challenge [{challenge_platform}]...")
        try:
            await self.solve_challenge(page)
        except PlaywrightError as err:
            logger.error(err)

    async def run_local(self, proxy=None):
        """Crawl every configured token address with a headed Chromium.

        Expects ``self.address_list`` ({chain: [address, ...]}) to have been
        set (see ``run``) before this coroutine is awaited.

        Parameters
        ----------
        proxy : optional
            Playwright proxy settings dict; ``None`` for a direct connection.
        """
        async with async_playwright() as p:
            # Must be a headed browser — headless mode does not get past
            # Cloudflare.
            launch_data = {
                "headless": False,
                "proxy": proxy,
                "args": [
                    '--disable-blink-features=AutomationControlled',
                    '--disable-dev-shm-usage',
                    '--no-first-run',
                    '--no-default-browser-check',
                    '--disable-infobars',
                    '--disable-extensions',
                    '--disable-features=VizDisplayCompositor'
                ]
            }
            browser = await p.chromium.launch(**launch_data)
            for chain, addresses in self.address_list.items():
                for address in addresses:
                    # One throwaway context per token address: created fresh,
                    # destroyed when the address is done.
                    context = await browser.new_context()
                    context.set_default_timeout(self._timeout * 1000)
                    page = await context.new_page()
                    # Intercept the response stream; each token page fires
                    # roughly 150 follow-up requests.
                    page.on('response', self.on_response)
                    logger.info(f'准备处理代币地址 {address} ...')
                    # Reset the success flag for this target.
                    self.task_holders_status = False
                    url = f'https://www.gmgn.ai/vas/api/v1/token_holders/{chain}/{address}?from_app=gmgn&tz_name=Asia%2FShanghai&app_lang=en-US&os=web&limit=100&cost=20&orderby=amount_percentage&direction=desc'
                    await page.goto(url)
                    # Reload once to shake off the anti-bot block page, then
                    # give the challenge a moment before solving it.
                    await page.reload()
                    await asyncio.sleep(3)
                    await self.detect(page)
                    # Per-token job buffer; cleared once the token is done.
                    self.wallet_token_info_jobs = []
                    await context.close()
            logger.info('关闭浏览器')
            await browser.close()

    async def run_aws(self):
        """Run on an AWS server: wrap the headed browser in a virtual display."""
        proxy = self.proxy
        from pyvirtualdisplay import Display
        with Display():
            await self.run_local(proxy)

    def check_success(self):
        """Raise if the spider collected no data (drives the retry decorator)."""
        if not self.task_holders_status:
            logger.error('采集失败')
            raise Exception('爬虫没有采集到数据')

    @ErrorMonitor(spider_name, author)
    @retry(tries=3, delay=3)
    def task(self):
        """Entry point for one crawl: local run without proxy, AWS run with."""
        if env == 'local':
            asyncio.run(self.run_local())
        else:
            asyncio.run(self.run_aws())

    # Scheduled to execute every 10 minutes.
    @scheduled_task(start_time=None, duration=10*60)
    def run(self):
        """Collect top-holder labels and wallet details from token pages.

        A proxy is applied in the production (non-local) environment.
        """
        self.address_list = target_address()
        self.task()