import asyncio
import re
from enum import Enum
from typing import Optional
# 用patchright替换playwright
from patchright.async_api import async_playwright, Frame
from patchright.async_api import Error as PlaywrightError
from utils.redisdb import redis_cli
from config import env
import logging
import json
from datetime import datetime
from retry import retry
from utils.lark_bot import sender
from other_spider.scheduler import scheduled_task
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Boss Anouncement')
class ChallengePlatform(Enum):
    """
    Cloudflare challenge variants.

    Each member's value is the string that appears as ``cType: '<value>'``
    in the HTML of a Cloudflare challenge page.
    """

    # cType: 'non-interactive'
    JAVASCRIPT = "non-interactive"
    # cType: 'managed'
    MANAGED = "managed"
    # cType: 'interactive'
    INTERACTIVE = "interactive"
class BossAlert:
    """
    Alert a Lark group about new smart-contract engineer openings on BOSS Zhipin.

    For each city code in ``self.codes``, a headed Chromium (patchright) opens
    the zhipin.com job search page. The site's ``joblist.json`` responses are
    intercepted by :meth:`on_response` and filtered by :meth:`parse`.
    Matching jobs are sent through ``sender`` to ``self.lark_hook``. Each
    job's ``encryptJobId`` is then stored in a Redis set, so every posting is
    alerted only once.
    """

    # NOTE(review): these names still say "Binance" although the class scrapes
    # zhipin.com; left unchanged in case the scheduler or dashboards key on them.
    spider_name = 'BinanceAlert To Research'
    author = 'drake.shi'

    # A job title must contain at least one of these keywords (case-sensitive).
    NAME_KEYWORDS = ('智能合约', 'Solidity', '区块链合约')
    # Minimum accepted upper bound of an "xx-yyK" salary range (the number before 'K').
    MIN_SALARY_UPPER_K = 20

    # Accessible name of the Cloudflare "verify you are human" button.
    _VERIFY_BUTTON_RE = re.compile(r"Verify (I am|you are) (not a bot|(a )?human)")
    # URL of the iframe that hosts the Cloudflare Turnstile widget.
    _TURNSTILE_URL_RE = re.compile(
        r"https://challenges\.cloudflare\.com/cdn-cgi/challenge-platform/h/[bg]/turnstile"
    )

    def __init__(self):
        self.redis_cli = redis_cli()
        # Redis set of encryptJobId values that have already been alerted.
        # NOTE(review): the key says "binance:listing" but stores zhipin job ids;
        # renaming it would re-alert every previously seen job.
        self.black_list_key = 'binance:listing:black'
        # Proxy used by run_aws(); any form accepted by _parse_proxy_url.
        # NOTE(review): 'h5' looks like a redacted placeholder — confirm.
        self.PROXY_URL = 'h5'
        self.proxy = self._parse_proxy_url(self.PROXY_URL)
        # Seconds. Used as the Playwright default timeout and as the
        # time budget for solving a Cloudflare challenge.
        self._timeout = 30
        # Search URL template: `{}` is the city code; `query` is the
        # URL-encoded "合约工程师" (contract engineer).
        self.api = 'https://www.zhipin.com/web/geek/jobs?city={}&query=%E5%90%88%E7%BA%A6%E5%B7%A5%E7%A8%8B%E5%B8%88'
        self.lark_hook = 'https://open.larksuite.com/open-apis/bot/v2/hook/141f'
        # zhipin.com city codes substituted into self.api, visited in order.
        self.codes = [100010000, 101010100, 101020100, 101280100, 101280600, 101210100, 101030100, 101110100, 101190400, 101200100, 101230200, 101250100, 101270100, 101180100, 101040100]

    def _parse_proxy_url(self, proxy_url):
        """
        Convert a proxy URL into the ``proxy`` dict accepted by Playwright's launch().

        Accepts ``[scheme://][username[:password]@]host[:port]``. When no
        scheme is given, ``http`` is assumed.

        Returns
        -------
        dict or None
            None for an empty/None URL. Otherwise a dict with ``server``,
            plus ``username`` and ``password`` when credentials are present.

        Raises
        ------
        ValueError
            If no host can be found in the URL.
        """
        from urllib.parse import urlsplit

        if not proxy_url:
            return None
        if '://' not in proxy_url:
            proxy_url = f'http://{proxy_url}'
        parts = urlsplit(proxy_url)
        # Everything after the LAST '@' is host[:port], so passwords that
        # contain '@' are not split in the wrong place.
        host_port = parts.netloc.rpartition('@')[2]
        if not host_port:
            # Don't echo the URL: it may contain credentials.
            raise ValueError('Invalid proxy URL: missing host')
        proxy = {"server": f"{parts.scheme}://{host_port}"}
        if parts.username is not None:
            proxy["username"] = parts.username
            proxy["password"] = parts.password or ""
        return proxy

    def _get_turnstile_frame(self, page) -> Optional[Frame]:
        """
        Return the Cloudflare Turnstile iframe of ``page``, or None if it has none.
        """
        return page.frame(url=self._TURNSTILE_URL_RE)

    async def cookies(self, page) -> Optional[str]:
        """Return the ``cf_clearance`` cookie value from the page's browser context, or None."""
        for cookie in await page.context.cookies() or []:
            if cookie["name"] == "cf_clearance":
                return cookie["value"]
        return None

    async def detect_challenge(self, page) -> Optional[str]:
        """
        Detect the Cloudflare challenge type on the current page.

        Returns
        -------
        Optional[str]
            The ``value`` of the matching ChallengePlatform member (found as
            ``cType: '<value>'`` in the page HTML), or None if there is no match.
        """
        html = await page.content()
        for platform in ChallengePlatform:
            if f"cType: '{platform.value}'" in html:
                return platform.value
        return None

    async def solve_challenge(self, page) -> None:
        """
        Try to get past the Cloudflare challenge on the current page.

        Loops until one of these happens: a ``cf_clearance`` cookie appears,
        the challenge markup is gone, or ``self._timeout`` seconds elapse.
        Playwright errors (e.g. wait timeouts) propagate to the caller.
        """
        verify_button = page.get_by_role("button", name=self._VERIFY_BUTTON_RE)
        challenge_spinner = page.locator("#challenge-spinner")
        challenge_stage = page.locator("#challenge-stage")
        start_timestamp = datetime.now()
        while (datetime.now() - start_timestamp).total_seconds() < self._timeout:
            # Re-check both exit conditions every round. Otherwise the loop
            # keeps clicking until the timeout even after the challenge passed.
            if await self.cookies(page) is not None:
                return
            if await self.detect_challenge(page) is None:
                return
            if await challenge_spinner.is_visible():
                await challenge_spinner.wait_for(state="hidden")
            turnstile_frame = self._get_turnstile_frame(page)
            if await verify_button.is_visible():
                await verify_button.click()
                await challenge_stage.wait_for(state="hidden")
            elif turnstile_frame is not None:
                # Hard-coded viewport coordinates, presumably where the
                # Turnstile checkbox renders — TODO confirm for current layout.
                await page.mouse.click(210, 290)
                await challenge_stage.wait_for(state="hidden")
            await page.wait_for_timeout(250)

    async def detect(self, page):
        """
        Detect a Cloudflare challenge on ``page`` and try to solve it.

        Does nothing if a ``cf_clearance`` cookie is already present. Playwright
        errors raised while detecting or solving are logged, not raised.
        """
        try:
            if await self.cookies(page) is not None:
                return
            challenge_platform = await self.detect_challenge(page)
            if challenge_platform is None:
                # Normal outcome for most page loads, so not logged as an error.
                logger.info("No Cloudflare challenge detected.")
                return
            logger.info(f"Solving Cloudflare challenge [{challenge_platform}]...")
            await self.solve_challenge(page)
        except PlaywrightError as err:
            logger.error(err)

    def _passes_filters(self, job_name, salary_desc):
        """Return True if the job title matches NAME_KEYWORDS and the salary is high enough; log why otherwise."""
        if not any(keyword in job_name for keyword in self.NAME_KEYWORDS):
            logger.info(f'不匹配:{job_name}')
            return False
        if 'K' not in salary_desc:
            # No 'K' means this is not an "xx-yyK" range; reject it here
            # rather than letting it fall through to the numeric parse.
            logger.info(f'薪资太低:{salary_desc} {job_name}')
            return False
        try:
            # "20-40K·14薪" -> "40K·14薪" -> "40"
            salary_upper = int(salary_desc.split('-')[-1].split('K')[0])
        except ValueError:
            logger.info(f'薪资解析异常:{salary_desc} {job_name}')
            return False
        if salary_upper < self.MIN_SALARY_UPPER_K:
            logger.info(f'薪资太低:{salary_desc} {job_name}')
            return False
        return True

    def parse(self, data):
        """
        Filter the jobs in one decoded ``joblist.json`` payload and alert new matches.

        Parameters
        ----------
        data : dict
            Decoded response body. Jobs are read from ``data['zpData']['jobList']``.
            Each job must provide salaryDesc, jobName, encryptJobId, brandName,
            brandScaleName and cityName; a job missing any of these is logged
            and skipped.
        """
        zp_data = data.get('zpData')
        if not zp_data:
            logger.warning(f'joblist payload without zpData: {str(data)[:200]}')
            return
        for item in zp_data.get('jobList') or []:
            try:
                salary_desc = item['salaryDesc']
                job_name = item['jobName']
                if not self._passes_filters(job_name, salary_desc):
                    continue
                encrypt_job_id = item['encryptJobId']
                url = f'https://www.zhipin.com/job_detail/{encrypt_job_id}.html'
                content = (
                    f"{salary_desc}\n{job_name}\n{item['brandName']}\n"
                    f"{item['brandScaleName']}\n{item['cityName']}\n{url}"
                )
            except KeyError as err:
                logger.warning(f'Job entry missing field {err}: {item}')
                continue
            if self.redis_cli.sismember(self.black_list_key, encrypt_job_id):
                logger.info(f'已告警,过滤:{content}')
                continue
            sender(content, self.lark_hook)
            # Record the id only after sender() returns. If sender raises, the
            # job is not marked and will be retried on a later run.
            self.redis_cli.sadd(self.black_list_key, encrypt_job_id)
            logger.info(content)

    async def on_response(self, response):
        """
        Response listener: pass each successful ``joblist.json`` response to parse().

        Registered via ``page.on('response', ...)``. Failures are logged with a
        traceback and swallowed, so one bad response cannot break the listener.
        """
        if not response.ok or 'joblist.json' not in response.url:
            return
        try:
            format_data = json.loads(await response.body())
            logger.debug(format_data)
            self.parse(format_data)
        except Exception:
            # Event-handler boundary: log instead of silently discarding.
            logger.exception(f'Failed to handle response {response.url}')

    async def _visit_city(self, page, code):
        """Load the search page for one city code and handle any Cloudflare challenge; errors are logged, not raised."""
        url = self.api.format(code)
        logger.info(f'GET {url}')
        try:
            await page.goto(url)
            await asyncio.sleep(3)
            await self.detect(page)
        except PlaywrightError:
            # Let the remaining cities run even if this one fails to load.
            logger.exception(f'Failed to load {url}')

    async def run_local(self, proxy=None):
        """
        Launch a headed Chromium and visit the search page for every city code.

        Job data is handled by on_response() while the pages load. The
        browser is always closed, even if an exception escapes.

        Parameters
        ----------
        proxy : dict, optional
            Playwright proxy dict (as built by _parse_proxy_url); None means
            a direct connection.
        """
        user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
        # Must be a headed browser; headless does not get past Cloudflare.
        launch_data = {"headless": False}
        if proxy:
            launch_data["proxy"] = proxy
            logger.info(f"使用代理: {proxy['server']}")
        async with async_playwright() as p:
            browser = await p.chromium.launch(**launch_data)
            try:
                context = await browser.new_context(user_agent=user_agent)
                context.set_default_timeout(self._timeout * 1000)
                page = await context.new_page()
                # Hide navigator.webdriver from the site's bot detection.
                await page.add_init_script("Object.defineProperties(navigator, {webdriver:{get:()=>undefined}});")
                page.on('response', self.on_response)
                for code in self.codes:
                    await self._visit_city(page, code)
                # Presumably gives late joblist.json responses / handlers
                # time to finish before shutdown — TODO confirm.
                await asyncio.sleep(60)
                await context.close()
            finally:
                logger.info('关闭浏览器')
                # Closing the browser also closes any context still open.
                await browser.close()

    async def run_aws(self):
        """
        Run on the AWS server.

        Starts a virtual display so the headed browser can run without a
        screen, and routes traffic through the configured proxy.
        """
        # Imported lazily: only the server path needs pyvirtualdisplay.
        from pyvirtualdisplay import Display

        with Display():
            await self.run_local(self.proxy)

    @retry(tries=3, delay=3)
    def task(self):
        """Run one scrape.

        When ``env == 'local'`` it runs locally with no proxy; otherwise it
        goes through run_aws(). If it raises, it is retried up to 3 times,
        3 s apart.
        """
        if env == 'local':
            asyncio.run(self.run_local())
        else:
            asyncio.run(self.run_aws())

    @scheduled_task(start_time=None, duration=20*60)
    def run(self):
        """
        Scheduled entry point.

        Collects zhipin.com job listings with a real browser and alerts
        new matches.
        """
        # NOTE(review): scheduled_task is project-defined; duration=20*60
        # presumably means a 20-minute period — confirm.
        self.task()
# For further actions, you may consider blocking this person and/or reporting abuse
# Top comments (0)