import asyncio
import re
import logging
from enum import Enum
from retry import retry
from datetime import datetime
from typing import Optional
from patchright.async_api import async_playwright, Frame
from patchright.async_api import Error as PlaywrightError
from utils.redisdb import redis_cli
from config import env, config
from other_spider.scheduler import scheduled_task
from utils.spider_failed_alert import ErrorMonitor
from other_spider.spug.gmgn.frankie.config import target_address
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Fuck CF')
"""
基于https://github.com/Xewdy444/CF-Clearance-Scraper改造
"""
class ChallengePlatform(Enum):
"""Cloudflare challenge platform types."""
JAVASCRIPT = "non-interactive"
MANAGED = "managed"
INTERACTIVE = "interactive"
class FuckCF:
"""
从GMGN获取热门代币的前100holders,以及其标签,以及其历史战绩
"""
spider_name = 'Fuck CF Base Class'
author = 'drake shi'
def __init__(self):
self.redis_cli = redis_cli()
self.proxy=config.PROXY_FOR_PLAYWRIGHT
self._timeout = 30
# 数据是否采集成功
self.task_holders_status = True
async def on_response(self, response):
"""
拦截响应
数据结构 gmgn.json
"""
if not response.ok:
return
chain = ''
if 'bsc' in response.url:
chain = 'bsc'
elif 'sol' in response.url:
chain = 'sol'
elif 'base' in response.url:
chain = 'base'
else:
pass
def _get_turnstile_frame(self, page) -> Optional[Frame]:
"""
Get the Cloudflare turnstile frame.
Returns
-------
Optional[Frame]
The Cloudflare turnstile frame.
"""
frame = page.frame(
url=re.compile(
"https://challenges.cloudflare.com/cdn-cgi/challenge-platform/h/[bg]/turnstile"
),
)
return frame
async def cookies(self, page) -> Optional[str]:
"""The cookies from the current page."""
cookies = await page.context.cookies()
if not cookies:
return None
for cookie in cookies:
if cookie["name"] == "cf_clearance":
return cookie["value"]
return None
async def detect_challenge(self, page) -> Optional[str]:
"""
Detect the Cloudflare challenge platform on the current page.
Returns
-------
Optional[ChallengePlatform]
The Cloudflare challenge platform.
"""
html = await page.content()
for platform in ChallengePlatform:
if f"cType: '{platform.value}'" in html:
return platform.value
return None
async def solve_challenge(self, page) -> None:
"""Solve the Cloudflare challenge on the current page."""
verify_button_pattern = re.compile(
"Verify (I am|you are) (not a bot|(a )?human)"
)
verify_button = page.get_by_role("button", name=verify_button_pattern)
challenge_spinner = page.locator("#challenge-spinner")
challenge_stage = page.locator("#challenge-stage")
start_timestamp = datetime.now()
cookies = await self.cookies(page)
challenge_type = await self.detect_challenge(page)
while (
cookies is None
and challenge_type is not None
and (datetime.now() - start_timestamp).seconds < self._timeout
):
if await challenge_spinner.is_visible():
await challenge_spinner.wait_for(state="hidden")
turnstile_frame = self._get_turnstile_frame(page)
if await verify_button.is_visible():
await verify_button.click()
await challenge_stage.wait_for(state="hidden")
elif turnstile_frame is not None:
await page.mouse.click(210, 290)
await challenge_stage.wait_for(state="hidden")
await page.wait_for_timeout(250)
async def detect(self, page):
"""
破解CloudFlare
"""
clearance_cookie = await self.cookies(page)
if clearance_cookie is None:
challenge_platform = await self.detect_challenge(page)
if challenge_platform is None:
logging.error("No Cloudflare challenge detected.")
return
logging.info(f"Solving Cloudflare challenge [{challenge_platform}]...")
try:
await self.solve_challenge(page)
except PlaywrightError as err:
logging.error(err)
async def run_local(self, proxy=None):
async with async_playwright() as p:
# 必须得是有头浏览器,否则过不了Cloudflare
launch_data = {
"headless": False,
"proxy": proxy,
"args": [
'--disable-blink-features=AutomationControlled',
'--disable-dev-shm-usage',
'--no-first-run',
'--no-default-browser-check',
'--disable-infobars',
'--disable-extensions',
'--disable-features=VizDisplayCompositor'
]
}
# user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
browser = await p.chromium.launch(**launch_data)
for chain in self.address_list:
addresses = self.address_list[chain]
for address in addresses:
# 每个代币地址都是一个无恒模式,结束则销毁新建
context = await browser.new_context()
context.set_default_timeout(self._timeout * 1000)
page = await context.new_page()
# 监听请求流
page.on('response', self.on_response)
# 每个代币地址最少会再次发起150个左右的请求
logger.info(f'准备处理代币地址 {address} ...')
# 对每一个目标链接初始化该状态
self.task_holders_status = False
# url = f'https://www.gmgn.ai/{chain}/token/{address}?tab=holders'
url = f'https://www.gmgn.ai/vas/api/v1/token_holders/{chain}/{address}?from_app=gmgn&tz_name=Asia%2FShanghai&app_lang=en-US&os=web&limit=100&cost=20&orderby=amount_percentage&direction=desc'
# 访问目标地址
await page.goto(url)
# 过反爬,如果不加就是被block的状态
await page.reload()
await asyncio.sleep(3)
await self.detect(page)
# 初始化 单个代币的任务结束则清空
self.wallet_token_info_jobs = []
await context.close()
# 等待页面加载完成
# await page.wait_for_load_state('networkidle')
logger.info('关闭浏览器')
await browser.close()
async def run_aws(self):
"""
在AWS服务器启动
"""
proxy = self.proxy
from pyvirtualdisplay import Display
with Display():
await self.run_local(proxy)
def check_success(self):
"""
校验爬虫是否拿到数据
"""
if not self.task_holders_status:
logger.error('采集失败')
raise Exception('爬虫没有采集到数据')
@ErrorMonitor(spider_name, author)
@retry(tries=3, delay=3)
def task(self):
if env == 'local':
asyncio.run(self.run_local())
else:
asyncio.run(self.run_aws())
# 10分钟执行一次
@scheduled_task(start_time=None, duration=10*60)
def run(self):
"""
通过采集代币详情页采集top holders标签信息和钱包详情页信息
线上加代理
"""
self.address_list = target_address()
self.task()
For further actions, you may consider blocking this person and/or reporting abuse
Top comments (0)