import time
import logging
from datetime import datetime
from croniter import croniter
from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.exceptions import DontCloseSpider
from utils.redisdb import redis_cli
from config import env
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Extension')
# 原始配置参考https://scrapy-chs.readthedocs.io/zh_CN/0.24/topics/extensions.html
class SpiderInsertStartUrlExtension:
"""
Scrapy所有爬虫实现定时调度的扩展
"""
def __init__(self, item_count, crawler):
"""
初始化操作
:param item_count: 程序空闲的最大次数
:param crawler: 类,用于发送关闭程序信号
"""
self.crawler = crawler
self.count = 0 # 统计空闲次数
self.conn = redis_cli()
@classmethod
def from_crawler(cls, crawler):
"""
必须方法
"""
# 判断是否启用扩展
if not crawler.settings.getbool('MYEXT_ENABLED'):
raise NotConfigured
# 每隔5个item 输出当前采集的数量
item_count = crawler.settings.getint('MYEXT_ITEMCOUNT', 5)
ext = cls(item_count, crawler)
crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
crawler.signals.connect(ext.spider_idle, signal=signals.spider_idle) # 加载空闲信号
return ext
def cron_judgement(self, spider):
"""
定时调度,比如在每日的8:00整调度,支持在spider中定义crontab的语法给变量
比如:每日0点1分启动:cron_job = "1 0 * * *"
比如:每天的 8:30、10:30、18:30、20:30 和 22:30 触发任务 cron_job = "30 8,10,18,20,22 * * *"
:return: True or False True则执行
目前最细颗粒度只支持分钟级别的
"""
# crontab的语法规则,该语法只约束启动时间
# 不存在该类属性说明不需要cron定时调度
cron_job_exist = hasattr(spider,'cron_job')
if not cron_job_exist:
return
# 拿到crontab的语法字符串
cron_expr = spider.cron_job
base_time = datetime.now()
# 解析成为datatime对象
cron = croniter(cron_expr, base_time)
# 校验当前时间是否匹配上了定时的时间规则
now = datetime.now()
# 获取最近的执行时间
# (当时间刚好到达预期的调度时间的时候,
# previous_run值将会刷新为当前的调度时间;
# 如果没有到达目标的执行时间的时候,
# previous_run始终得到的是上一次调度的时间)
# base_time 刚刚到达目标调度时间的时候,那么previous_run也会刷新为该时间,此时时间上吻合,即可激活调度
previous_run = cron.get_prev(datetime)
# 只能处理分钟级别的精确度(忽略秒和微秒)
if now.replace(second=0, microsecond=0) == previous_run.replace(second=0, microsecond=0):
# 当cronjob定义的最小小单位为分钟级别时;爬虫可能在5秒跑完,那么1分钟内爬虫将有12次满足条件的情况,爬虫将被重复拉起12次
# 为避免次情况的发生,通过redis key 在短时间内做去重
if not self.conn.get(f"{spider.name}:spider_opened"):
self.conn.setex(f"{spider.name}:spider_opened",5*60,1)
return True
else:
logger.info(f'{cron_expr} 已经开始执行,由于爬虫速度过快,导致条件再次满足,故做去重处理')
return False
else:
logger.info(f'等待开始调度的时间 {cron_expr} 上一次调度:{previous_run}...')
return False
def interval_time(self,spider):
"""
根据间隔时间调度 比如每30分钟调度一次: schedule_time = 12 * 30
:return: True or False 真则执行
"""
# 存在cron调度则不需要此间隔调度
cron_job = hasattr(spider,'cron_job')
schedule_time = hasattr(spider,'schedule_time')
if cron_job:
return False
if not schedule_time:
return False
# 空闲超过指定时间
if self.count > spider.schedule_time:
# 每次开始执行任务后,都初始化计数器,重新统计下一轮的空闲时长
self.count = 0
return True
else:
return False
def spider_opened(self, spider):
"""
必须方法
只执行一次,爬虫启动的时候执行
支持指定时间启动
"""
# 爬虫首次判断是否需要定时调度
# 判断是否需要定时调度
# 如果需要定时调度,则执行if语句
cron_job = hasattr(spider,'cron_job')
logger.info(f'环境: {env}')
logger.info(f'cronjob: {cron_job}')
# 线上环境才执行cron coinsadjust
if env == 'prod':
if cron_job:
while True:
run = self.cron_judgement(spider)
if run:
break
else:
time.sleep(59)
logger.info("opened spider %s" % spider.name)
# 该变量为了让程序idle的时候输出上次调度的时间,以此判断上次调度是否正常
self.started_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def insert_start_url(self, spider):
"""
从start_requests类方法中 拿到爬虫任务,启动每一个任务
"""
logger.info('激活调度, 从 start_request 类方法 直接生成任务...')
for request in spider.start_requests():
self.crawler.engine.slot.scheduler.enqueue_request(request)
def spider_closed(self, spider):
"""
必须方法
"""
logger.info("closed spider %s" % spider.name)
self.crawler.engine.close_spider(spider, '爬虫已经采集完毕')
def spider_idle(self, spider):
"""
记录信息,作出关闭选择
框架默认5秒执行一次spider_idle
"""
logger.info(f'{spider.name} Idle 爬虫上次启动时间为 {self.started_time}')
self.count += 1
self.spider_run(spider)
raise DontCloseSpider
def spider_run(self,spider):
"""
激活调度爬虫
需要根据条件判断是否激活
"""
# 常规来讲,调度方式二选一
cron = self.cron_judgement(spider)
interval = self.interval_time(spider)
if cron or interval:
# 清空内存中的去重缓存,防止对下一次调度造成干扰
logger.info("清空内存去重缓存...")
fingerprints = spider.crawler.engine.slot.scheduler.df.fingerprints
fingerprints.clear()
# 启动爬虫
self.insert_start_url(spider)
self.started_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
Get n8n VPS hosting 3x cheaper than a cloud solution
Get fast, easy, secure n8n VPS hosting from $4.99/mo at Hostinger. Automate any workflow using a pre-installed n8n application and no-code customization.
For further actions, you may consider blocking this person and/or reporting abuse
Top comments (0)