DEV Community

drake
drake

Posted on • Edited on

boss职位监控

import asyncio
import re
from enum import Enum
from typing import Optional
# 用patchright替换playwright
from patchright.async_api import async_playwright, Frame
from patchright.async_api import Error as PlaywrightError
from utils.redisdb import redis_cli
from config import env
import logging
import json
from datetime import datetime
from retry import retry
from utils.lark_bot import sender
from other_spider.scheduler import scheduled_task

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Boss Anouncement')

class ChallengePlatform(Enum):
    """Cloudflare challenge platform types."""
    JAVASCRIPT = "non-interactive"
    MANAGED = "managed"
    INTERACTIVE = "interactive"

class BossAlert:
    """
    币安上币公告告警到研究院的群里
    """
    spider_name = 'BinanceAlert To Research'
    author = 'drake.shi'

    def __init__(self):
        self.redis_cli = redis_cli()
        self.black_list_key = 'binance:listing:black'
        # 添加代理配置
        self.PROXY_URL = 'h5'
        # 解析代理URL
        self.proxy = self._parse_proxy_url(self.PROXY_URL)
        self._timeout = 30
        self.api = 'https://www.zhipin.com/web/geek/jobs?city={}&query=%E5%90%88%E7%BA%A6%E5%B7%A5%E7%A8%8B%E5%B8%88'
        self.lark_hook = 'https://open.larksuite.com/open-apis/bot/v2/hook/141f'
        self.codes = [100010000, 101010100, 101020100, 101280100, 101280600, 101210100, 101030100, 101110100, 101190400, 101200100, 101230200, 101250100, 101270100, 101180100, 101040100]

    def _parse_proxy_url(self, proxy_url):
        """
        解析代理URL为playwright需要的格式
        """
        if not proxy_url:
            return None

        # 解析 http://username:password@host:port 格式
        if proxy_url.startswith('http://'):
            proxy_url = proxy_url[7:]  # 移除 http://

        if '@' in proxy_url:
            auth_part, server_part = proxy_url.split('@', 1)
            username, password = auth_part.split(':', 1)
            server, port = server_part.split(':', 1)

            return {
                "server": f"http://{server}:{port}",
                "username": username,
                "password": password
            }
        else:
            # 没有认证信息的代理
            server, port = proxy_url.split(':', 1)
            return {
                "server": f"http://{server}:{port}"
            }

    def _get_turnstile_frame(self, page) -> Optional[Frame]:
        """
        Get the Cloudflare turnstile frame.
        Returns
        -------
        Optional[Frame]
            The Cloudflare turnstile frame.
        """
        frame = page.frame(
            url=re.compile(
                "https://challenges.cloudflare.com/cdn-cgi/challenge-platform/h/[bg]/turnstile"
            ),
        )
        return frame

    async def cookies(self, page) -> Optional[str]:
        """The cookies from the current page."""
        cookies = await page.context.cookies()
        if not cookies:
            return None
        for cookie in cookies:
            if cookie["name"] == "cf_clearance":
                return cookie["value"]
        return None

    async def detect_challenge(self, page) -> Optional[str]:
        """
        Detect the Cloudflare challenge platform on the current page.
        Returns
        -------
        Optional[ChallengePlatform]
            The Cloudflare challenge platform.
        """
        html = await page.content()
        for platform in ChallengePlatform:
            if f"cType: '{platform.value}'" in html:
                return platform.value
        return None

    async def solve_challenge(self, page) -> None:
        """Solve the Cloudflare challenge on the current page."""
        verify_button_pattern = re.compile(
            "Verify (I am|you are) (not a bot|(a )?human)"
        )
        verify_button = page.get_by_role("button", name=verify_button_pattern)
        challenge_spinner = page.locator("#challenge-spinner")
        challenge_stage = page.locator("#challenge-stage")
        start_timestamp = datetime.now()
        cookies = await self.cookies(page)
        challenge_type = await self.detect_challenge(page)
        while (
            cookies is None
            and challenge_type is not None
            and (datetime.now() - start_timestamp).seconds < self._timeout
        ):
            if await challenge_spinner.is_visible():
                await challenge_spinner.wait_for(state="hidden")
            turnstile_frame = self._get_turnstile_frame(page)
            if await verify_button.is_visible():
                await verify_button.click()
                await challenge_stage.wait_for(state="hidden")
            elif turnstile_frame is not None:
                await page.mouse.click(210, 290)
                await challenge_stage.wait_for(state="hidden")
            await page.wait_for_timeout(250)

    async def detect(self, page):
        """
        破解CloudFlare
        """
        clearance_cookie = await self.cookies(page)
        if clearance_cookie is None:
            challenge_platform = await self.detect_challenge(page)
            if challenge_platform is None:
                logging.error("No Cloudflare challenge detected.")
                return
            logging.info(f"Solving Cloudflare challenge [{challenge_platform}]...")
            try:
                await self.solve_challenge(page)
            except PlaywrightError as err:
                logging.error(err)

    def parse(self, data):
        """
        解析职位数据并且告警
        """
        name_check_list = [
            '智能合约',
            'Solidity',
            '区块链合约'
        ]
        jobList = data['zpData']['jobList']
        for item in jobList:
            salaryDesc = item['salaryDesc']
            encryptJobId = item['encryptJobId']
            url = f'https://www.zhipin.com/job_detail/{encryptJobId}.html'
            jobName = item['jobName']
            # 判断名称是否符合标准
            name_ok = False
            for keyword in name_check_list:
                if keyword in jobName:
                    name_ok = True
            if not name_ok:
                logger.info(f'不匹配:{jobName}')
                continue
            if 'K' not in salaryDesc:
                logger.info(f'薪资太低:{salaryDesc} {jobName}')
            try:
                salaryDesc_up = salaryDesc.split('-')[-1].split('K')[0]
                salaryDesc_up_int = int(salaryDesc_up)
                if salaryDesc_up_int < 20:
                    logger.info(f'薪资太低:{salaryDesc} {jobName}')
                    continue
            except:
                logger.info(f'薪资解析异常:{salaryDesc} {jobName}')
                continue
            brandName = item['brandName']
            brandScaleName = item['brandScaleName']
            cityName = item['cityName']
            content = f"{salaryDesc}\n{jobName}\n{brandName}\n{brandScaleName}\n{cityName}\n{url}"
            alert_status = self.redis_cli.sismember(self.black_list_key, encryptJobId)
            if not alert_status:
                sender(content, self.lark_hook)
                self.redis_cli.sadd(self.black_list_key, encryptJobId)
                logger.info(content)
            else:
                logger.info(f'已告警,过滤:{content}')

    async def on_response(self, response):
        """
        监控数据流
        """
        if not response.ok:
            return
        if 'joblist.json' in response.url:
            try:
                oridata = await response.body()
                format_data = json.loads(oridata)
                print(format_data)
                self.parse(format_data)
            except:
                pass

    async def run_local(self, proxy=None):
        async with async_playwright() as p:
            # 使用代理配置
            proxy_config = proxy

            # 必须得是有头浏览器,否则过不了Cloudflare
            launch_data = {
                "headless": False,
            }

            # 如果有代理配置,添加到launch参数中
            if proxy_config:
                launch_data["proxy"] = proxy_config
                logger.info(f"使用代理: {proxy_config['server']}")

            user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
            browser = await p.chromium.launch(**launch_data)

            # 创建新的上下文
            context = await browser.new_context(user_agent=user_agent)
            context.set_default_timeout(self._timeout * 1000)
            page = await context.new_page()
            await page.add_init_script("Object.defineProperties(navigator, {webdriver:{get:()=>undefined}});")

            # 监听请求流
            page.on('response', self.on_response)

            for code in self.codes:
                url = self.api.format(code)
                logger.info(f'GET {url}')
                # 访问目标地址
                await page.goto(url)
                await asyncio.sleep(3)
                await self.detect(page)
                await asyncio.sleep(60)

            await context.close()
            logger.info('关闭浏览器')
            await browser.close()

    async def run_aws(self):
        """
        在AWS服务器启动
        """
        proxy = self.proxy
        from pyvirtualdisplay import Display
        with Display():
            await self.run_local(proxy)

    @retry(tries=3, delay=3)
    def task(self):
        if env == 'local':
            asyncio.run(self.run_local())
        else:
            asyncio.run(self.run_aws())

    @scheduled_task(start_time=None, duration=20*60)
    def run(self):
        """
        利用浏览器实现binance 上币公告的采集
        """
        self.task()

Enter fullscreen mode Exit fullscreen mode

Top comments (0)