DEV Community

drake
drake

Posted on

定时调度装饰器

import time
import logging
from datetime import datetime, timedelta
from config import env

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('Timer Scheduler')

def scheduled_task(start_time=None, duration=None, weekdays=None):
    """
    定时调度装饰器
    (以下三种调度方式任选其一,其他参数按需配置)

    :param start_time: 启动时间,格式为 'HH:MM'
    :param duration: 多长时间调度一次(秒)
    :param weekdays: 指定周几执行,格式为整数列表 [0,1,2,3,4,5,6],0表示周一,6表示周日
                     如 [1,3,5] 表示周二、周四、周六执行
                     如果不指定,则每天都执行

    调度方式说明:
    1. 周几的几点执行:提供 start_time 和 weekdays 参数
    2. 每天的几点执行:只提供 start_time 参数 
    3. 每隔 N 秒执行一次:只提供 duration 参数
    """
    def decorator(func):
        def wrapper(*args, **kwargs):
            if env == 'local':
                logger.info('开发环境,直接启动(放弃定时调度)')
                func(*args, **kwargs)
                return
            logger.info('程序启动,等待调度中...')
            while True:
                # 定时调度
                if start_time:
                    # 获取当前时间和调度时间范围
                    today_now = datetime.now()

                    # 检查是否指定了周几,如果指定了,则检查当前是否是指定的周几
                    if weekdays is not None:
                        # 获取当前是周几 (0表示周一,6表示周日)
                        current_weekday = today_now.weekday()  # 0-6 对应周一至周日
                        if current_weekday not in weekdays:
                            # 不是指定的周几,等待一段时间再检查
                            time.sleep(60)  # 等待 1 分钟再检查
                            continue
                        else:
                            logger.info(f'今天是周{current_weekday+1},符合调度计划 {weekdays}')    

                    # 解析调度时间
                    start_hour, start_minute = map(int, start_time.split(':'))
                    start = today_now.replace(hour=start_hour, minute=start_minute, second=0, microsecond=0)
                    end = start + timedelta(minutes=10)

                    # 如果当前时间在调度时间范围内,执行任务
                    if start <= today_now < end:
                        logger.info('激活调度')
                        func(*args, **kwargs)
                        logger.info('调度结束')

                        # 等待下一个周期
                        time.sleep(60 * 60)  # 等待 1 小时,避免重复执行
                    else:
                        # 不在执行时间范围内,等待一段时间再检查
                        time.sleep(10)
                else:
                    logger.info('激活调度')
                    func(*args, **kwargs)
                    logger.info('调度结束')
                    time.sleep(duration)
        return wrapper
    return decorator
Enter fullscreen mode Exit fullscreen mode

Top comments (0)