DEV Community

drake
drake

Posted on

Scrapy 2.12.0 通过 extension 来实现定时调度 (Implementing scheduled crawls in Scrapy 2.12.0 via an extension)

import time
import logging
from datetime import datetime
from croniter import croniter
from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.exceptions import DontCloseSpider
from utils.redisdb import redis_cli
from config import env

# Module-level logging setup.
# NOTE(review): logging.basicConfig configures the root logger as an import
# side effect; Scrapy installs its own logging handlers, so this may be
# redundant or may clobber project-wide settings — confirm it is intended.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Extension')

# Original configuration reference: https://scrapy-chs.readthedocs.io/zh_CN/0.24/topics/extensions.html
# Original configuration reference:
# https://scrapy-chs.readthedocs.io/zh_CN/0.24/topics/extensions.html
class SpiderInsertStartUrlExtension:
    """
    Scrapy extension that turns any spider into a periodically scheduled job.

    Two mutually exclusive trigger modes are read from attributes on the
    spider instance:

    * ``cron_job``      -- a crontab expression (minute granularity at best),
                           e.g. ``"1 0 * * *"`` for 00:01 every day.
    * ``schedule_time`` -- an idle-tick threshold; the spider is re-launched
                           after that many idle signals (idle fires roughly
                           every 5 seconds, so ``12 * 30`` is ~30 minutes).

    When a trigger fires, the requests produced by ``spider.start_requests()``
    are pushed straight into the scheduler, and the dupe-filter fingerprints
    are cleared first so the new round is not filtered out.
    """

    def __init__(self, item_count, crawler):
        """
        :param item_count: value of the ``MYEXT_ITEMCOUNT`` setting.
            Fix: it used to be accepted but silently discarded; it is now
            stored for use/inspection.
        :param crawler: the Crawler object, used to reach the engine and its
            scheduler when re-launching the spider and when closing it.
        """
        self.crawler = crawler
        self.item_count = item_count
        self.count = 0      # consecutive idle-signal counter
        self.conn = redis_cli()  # Redis client, de-duplicates cron launches

    @classmethod
    def from_crawler(cls, crawler):
        """
        Required extension entry point: build the instance and hook signals.

        :raises NotConfigured: when the ``MYEXT_ENABLED`` setting is falsy,
            which tells Scrapy to skip this extension entirely.
        """
        if not crawler.settings.getbool('MYEXT_ENABLED'):
            raise NotConfigured
        # Report progress every N items (default 5).
        item_count = crawler.settings.getint('MYEXT_ITEMCOUNT', 5)
        ext = cls(item_count, crawler)
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        # The idle signal is what drives the periodic re-scheduling below.
        crawler.signals.connect(ext.spider_idle, signal=signals.spider_idle)
        return ext

    def cron_judgement(self, spider):
        """
        Decide whether a crontab-style trigger should fire right now.

        The spider opts in by defining a ``cron_job`` attribute holding a
        crontab expression, e.g. ``cron_job = "1 0 * * *"`` (00:01 daily) or
        ``cron_job = "30 8,10,18,20,22 * * *"``. Only minute-level precision
        is supported: seconds and microseconds are ignored.

        :return: True when the current minute matches the schedule and this
            launch has not already been claimed in Redis; False otherwise.
            Fix: previously returned a bare ``None`` when the spider defined
            no ``cron_job``; now returns ``False`` for consistency.
        """
        if not hasattr(spider, 'cron_job'):
            # No cron attribute -> cron scheduling is not requested.
            return False
        cron_expr = spider.cron_job
        cron = croniter(cron_expr, datetime.now())
        now = datetime.now()
        # get_prev() yields the most recent scheduled time <= base time.
        # The moment "now" reaches a scheduled slot, previous_run snaps to
        # that slot, so comparing the two at minute precision detects a hit.
        previous_run = cron.get_prev(datetime)
        if now.replace(second=0, microsecond=0) != previous_run.replace(second=0, microsecond=0):
            logger.info(f'等待开始调度的时间 {cron_expr} 上一次调度:{previous_run}...')
            return False
        # A fast spider can finish well inside the matching minute, in which
        # case this check would fire repeatedly (idle runs ~every 5 s, so up
        # to 12 launches per minute). A short-lived Redis key de-duplicates.
        if not self.conn.get(f"{spider.name}:spider_opened"):
            self.conn.setex(f"{spider.name}:spider_opened", 5 * 60, 1)
            return True
        logger.info(f'{cron_expr} 已经开始执行,由于爬虫速度过快,导致条件再次满足,故做去重处理')
        return False

    def interval_time(self, spider):
        """
        Decide whether an idle-count interval trigger should fire.

        The spider opts in with a ``schedule_time`` attribute: the number of
        idle ticks to wait between launches (e.g. ``12 * 30`` for ~30 minutes
        at one tick per ~5 seconds). A spider that also defines ``cron_job``
        uses cron scheduling instead, so this method then returns False.

        :return: True when the idle counter has exceeded the threshold (the
            counter is reset for the next round); False otherwise.
        """
        # Cron scheduling takes precedence over interval scheduling.
        if hasattr(spider, 'cron_job'):
            return False
        if not hasattr(spider, 'schedule_time'):
            return False
        if self.count > spider.schedule_time:
            # Reset so the next round starts counting from zero again.
            self.count = 0
            return True
        return False

    def spider_opened(self, spider):
        """
        Required signal handler; runs once when the spider opens.

        In the production environment, a spider with a ``cron_job`` attribute
        is gated here: we poll the cron check about once a minute and only
        let the crawl proceed when the schedule matches.

        NOTE(review): this busy-wait blocks the process until the first cron
        hit — kept as-is because the design relies on the spider not crawling
        before its scheduled time.
        """
        cron_job = hasattr(spider, 'cron_job')
        logger.info(f'环境: {env}')
        logger.info(f'cronjob: {cron_job}')
        # Only honour cron gating in the production environment.
        if env == 'prod' and cron_job:
            while not self.cron_judgement(spider):
                # Sleep just under a minute so no scheduled minute is skipped.
                time.sleep(59)
        logger.info("opened spider %s" % spider.name)
        # Remembered so idle logging can show when the last round started.
        self.started_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def insert_start_url(self, spider):
        """
        Launch a new round: feed every request produced by
        ``spider.start_requests()`` directly into the engine's scheduler.
        """
        logger.info('激活调度, 从 start_request 类方法 直接生成任务...')
        for request in spider.start_requests():
            self.crawler.engine.slot.scheduler.enqueue_request(request)

    def spider_closed(self, spider):
        """
        Required signal handler: log the shutdown and ask the engine to
        close the spider.

        NOTE(review): calling ``close_spider`` from inside the spider_closed
        handler looks redundant (the spider is already closing) — confirm it
        is still needed.
        """
        logger.info("closed spider %s" % spider.name)
        self.crawler.engine.close_spider(spider, '爬虫已经采集完毕')

    def spider_idle(self, spider):
        """
        Fired roughly every 5 seconds while the spider has nothing to do.
        Counts idle ticks, gives the schedulers a chance to re-launch, and
        keeps the spider alive by raising DontCloseSpider.
        """
        logger.info(f'{spider.name} Idle 爬虫上次启动时间为 {self.started_time}')
        self.count += 1
        self.spider_run(spider)
        raise DontCloseSpider

    def spider_run(self, spider):
        """
        Fire a new round when either scheduling mode says it is time.

        The two modes are mutually exclusive per spider: ``cron_judgement``
        applies when ``cron_job`` is defined, otherwise ``interval_time``.
        """
        if self.cron_judgement(spider) or self.interval_time(spider):
            # Clear the in-memory dupe-filter fingerprints so the new round's
            # (identical) requests are not filtered out.
            logger.info("清空内存去重缓存...")
            # Consistency fix: reach the scheduler via self.crawler, like
            # insert_start_url does (spider.crawler is the same object).
            self.crawler.engine.slot.scheduler.df.fingerprints.clear()
            # Re-launch the spider.
            self.insert_start_url(spider)
            self.started_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

Enter fullscreen mode Exit fullscreen mode

Hostinger image

Get n8n VPS hosting 3x cheaper than a cloud solution

Get fast, easy, secure n8n VPS hosting from $4.99/mo at Hostinger. Automate any workflow using a pre-installed n8n application and no-code customization.

Start now

Top comments (0)

A Workflow Copilot. Tailored to You.

Pieces.app image

Our desktop app, with its intelligent copilot, streamlines coding by generating snippets, extracting code from screenshots, and accelerating problem-solving.

Read the docs

👋 Kindness is contagious

If this article connected with you, consider tapping ❤️ or leaving a brief comment to share your thoughts!

Okay