DEV Community

Brad
Brad

Posted on

Python Task Scheduler: Run Any Script Automatically (No Cron Needed)

Python Task Scheduler: Run Any Script Automatically (No Cron Needed)

Cron is powerful but cryptic. Here's how to build a Python-based task scheduler that's easier to configure and more powerful than cron.

Simple Task Scheduler with schedule Library

# pip install schedule
import schedule
import time
import threading
from datetime import datetime
from typing import Callable

class TaskScheduler:
    """Plain-English front end for the ``schedule`` library.

    Jobs are registered through the module-level ``schedule.every()`` API and
    executed by ``schedule.run_pending()``, i.e. on ``schedule``'s shared
    default scheduler. ``self.jobs`` only records the jobs that were added
    through this instance.
    """

    # Singular time units accepted after 'every'.
    _UNITS = frozenset({'second', 'minute', 'hour', 'day', 'week'})
    # Weekday names accepted after 'every'.
    _WEEKDAYS = frozenset({
        'monday', 'tuesday', 'wednesday', 'thursday',
        'friday', 'saturday', 'sunday',
    })

    def __init__(self):
        self.jobs = []        # jobs created through add_job()
        self.running = False  # polled by _run_loop(); cleared by stop()

    @classmethod
    def _parse_interval(cls, interval: str) -> tuple:
        """Translate an interval phrase into ``(amount, unit_attr, at_time)``.

        ``unit_attr`` is the attribute to read from ``schedule.every(amount)``
        and ``at_time`` is the string for ``.at()`` (``None`` when the phrase
        has no time of day). Matching is case-insensitive.

        Supported phrases:
            'every day' / 'every hour' / 'every monday'   -> every <unit>
            'every 10 minutes' / 'every 1 hour'           -> every <n> <unit(s)>
            'every monday at 08:00' / 'every day at 10:30'
            'daily at 09:30'

        Raises:
            ValueError: if the phrase does not match any supported form.
        """
        parts = interval.lower().split()
        error = ValueError(
            f"Unsupported interval {interval!r}; expected e.g. "
            "'every 10 minutes', 'daily at 09:30' or 'every monday at 08:00'"
        )

        if len(parts) == 3 and parts[:2] == ['daily', 'at']:
            return 1, 'day', parts[2]

        if not parts or parts[0] != 'every':
            raise error

        named = cls._UNITS | cls._WEEKDAYS
        if len(parts) == 2 and parts[1] in named:
            return 1, parts[1], None
        if len(parts) == 4 and parts[2] == 'at' and parts[1] in named:
            return 1, parts[1], parts[3]

        if len(parts) == 3:
            try:
                amount = int(parts[1])
            except ValueError:
                raise error from None
            word = parts[2]
            unit = word[:-1] if word.endswith('s') else word
            if amount >= 1 and unit in cls._UNITS:
                # Always use the plural attribute: schedule's singular
                # properties (.minute, .hour, ...) raise IntervalError for
                # any amount other than 1, while the plural ones accept
                # every amount including 1.
                return amount, unit + 's', None

        raise error

    def add_job(
        self,
        func: Callable,
        interval: str,
        *args,
        **kwargs
    ):
        """Add a job to the scheduler.

        interval examples: 'every 10 minutes', 'daily at 09:30', 'every monday at 08:00'

        Extra positional and keyword arguments are passed to ``func`` each
        time the job runs.

        Returns:
            The job object created by ``schedule`` (also appended to
            ``self.jobs``).

        Raises:
            ValueError: if ``interval`` is not a supported phrase. Errors
                raised by ``schedule`` itself (for example for a malformed
                time string) propagate unchanged.
        """
        amount, unit, at_time = self._parse_interval(interval)

        pending = getattr(schedule.every(amount), unit)
        if at_time is not None:
            pending = pending.at(at_time)
        job = pending.do(func, *args, **kwargs)

        self.jobs.append(job)
        return job

    def start(self, blocking: bool = True):
        """Start the scheduler.

        With ``blocking=True`` the polling loop runs in the calling thread and
        this call only returns after ``stop()`` has been called from elsewhere.
        With ``blocking=False`` the loop runs in a daemon thread and this call
        returns immediately.
        """
        self.running = True

        if blocking:
            self._run_loop()
        else:
            thread = threading.Thread(target=self._run_loop, daemon=True)
            thread.start()

    def _run_loop(self):
        """Run due jobs roughly once per second until ``running`` is cleared."""
        while self.running:
            schedule.run_pending()
            time.sleep(1)

    def stop(self):
        """Ask the polling loop to exit; it notices on its next iteration."""
        self.running = False


# Usage
def send_report():
    """Print a timestamped notice that the daily report is being sent."""
    message = f"[{datetime.now()}] Sending daily report..."
    print(message)
    # Placeholder: the real report logic goes here.

def cleanup_files():
    """Print a timestamped notice that temp-file cleanup is running."""
    message = f"[{datetime.now()}] Cleaning up temp files..."
    print(message)
    # Placeholder: the real cleanup logic goes here.

def check_api_status():
    """Print a timestamped notice that the API health check is running."""
    message = f"[{datetime.now()}] Checking API health..."
    print(message)
    # Placeholder: the real health-check logic goes here.

scheduler = TaskScheduler()
# Register each task together with its plain-English schedule.
scheduler.add_job(func=send_report, interval='daily at 08:00')
scheduler.add_job(func=cleanup_files, interval='every monday at 02:00')
scheduler.add_job(func=check_api_status, interval='every 15 minutes')
# Run the polling loop in this thread; the call does not return while the
# scheduler is running.
scheduler.start(blocking=True)
Enter fullscreen mode Exit fullscreen mode

Job Queue with Retry Logic

import queue
import traceback
from dataclasses import dataclass, field
from typing import Any, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class Job:
    """A callable plus its call arguments and retry bookkeeping."""

    # The callable to invoke.
    func: Callable
    # Positional arguments passed to ``func``.
    args: tuple = field(default_factory=tuple)
    # Keyword arguments passed to ``func``.
    kwargs: dict = field(default_factory=dict)
    # Retry budget for this job.
    max_retries: int = 3
    # Number of failed attempts recorded so far.
    retry_count: int = 0
    # Label for this job; filled in from ``func`` when left empty.
    name: str = ""

    def __post_init__(self):
        """Default ``name`` to the callable's ``__name__`` (or 'unknown')."""
        if self.name:
            return
        self.name = getattr(self.func, '__name__', 'unknown')

class RetryJobQueue:
    """Thread-backed job queue that retries failing jobs with exponential backoff.

    Outcomes are stored in ``self.results`` keyed by job name, so two jobs
    that share a name overwrite each other's entry.

    Args:
        workers: number of worker threads created by ``start()``.
        backoff_base: base of the exponential delay; a job waits
            ``backoff_base ** retry_count`` seconds before it is re-queued.
            The default of 2 gives 2s, 4s, 8s, ...
    """

    def __init__(self, workers: int = 4, backoff_base: float = 2):
        self.queue = queue.Queue()
        self.workers = workers
        self.backoff_base = backoff_base
        self.results = {}
        self._threads = []  # worker threads created by start()

    def submit(self, func: Callable, *args, max_retries: int = 3, **kwargs):
        """Add a job to the queue.

        ``args`` and ``kwargs`` are passed to ``func`` when the job runs.
        ``max_retries`` is the number of additional attempts allowed after the
        first failure. Returns the queued ``Job``.
        """
        job = Job(func=func, args=args, kwargs=kwargs, max_retries=max_retries)
        self.queue.put(job)
        logger.info(f"Job queued: {job.name}")
        return job

    def _run_job(self, job):
        """Run one job and record its result or schedule a retry."""
        try:
            result = job.func(*job.args, **job.kwargs)
        except Exception as e:
            job.retry_count += 1

            # '<=' so that max_retries really is the number of retries: the
            # first attempt plus up to max_retries further attempts.
            if job.retry_count <= job.max_retries:
                logger.warning(f"Job failed, retrying ({job.retry_count}/{job.max_retries}): {job.name}")
                time.sleep(self.backoff_base ** job.retry_count)  # Exponential backoff
                self.queue.put(job)
            else:
                logger.error(f"Job permanently failed after {job.max_retries} retries: {job.name}")
                self.results[job.name] = {"status": "failed", "error": str(e)}
        else:
            logger.info(f"Job completed: {job.name}")
            self.results[job.name] = {"status": "success", "result": result}

    def _worker(self):
        """Worker loop: process jobs until a ``None`` sentinel is received."""
        while True:
            job = self.queue.get()

            try:
                if job is None:
                    break
                # A retried job is re-queued inside _run_job, i.e. before the
                # task_done() below, so join() never sees the queue as
                # finished while a retry is pending.
                self._run_job(job)
            finally:
                # Runs for the sentinel as well; otherwise queue.join() would
                # wait forever on the sentinel's unfinished task.
                self.queue.task_done()

    def start(self):
        """Start ``self.workers`` daemon worker threads and return them."""
        threads = [
            threading.Thread(target=self._worker, daemon=True)
            for _ in range(self.workers)
        ]
        for t in threads:
            t.start()
        self._threads.extend(threads)
        return threads

    def wait_complete(self):
        """Block until every queued item has been processed."""
        self.queue.join()

    def stop(self, timeout: Optional[float] = None):
        """Shut down the workers created by ``start()``.

        One ``None`` sentinel is queued per worker and each worker is then
        joined (for at most ``timeout`` seconds each when given). Sentinels
        are processed in queue order, so call ``wait_complete()`` first if
        pending or retried jobs must finish before the workers exit.
        """
        for _ in self._threads:
            self.queue.put(None)
        for t in self._threads:
            t.join(timeout)
        self._threads = [t for t in self._threads if t.is_alive()]

# Usage
# Create the queue with two workers and start them.
job_queue = RetryJobQueue(2)
job_queue.start()

# Enqueue the work; each job carries its own max_retries setting.
job_queue.submit(func=send_report, max_retries=3)
job_queue.submit(func=cleanup_files, max_retries=2)

# Block until the queue has been fully processed, then show the outcomes.
job_queue.wait_complete()
print("All jobs complete:", job_queue.results)
Enter fullscreen mode Exit fullscreen mode

APScheduler for Production Use

For production systems, use APScheduler:

# pip install apscheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger

scheduler = BackgroundScheduler()

# Cron-style trigger: send the report at 08:00.
scheduler.add_job(
    send_report,
    trigger=CronTrigger(hour=8, minute=0),
    id='daily_report',
    name='Daily Report Sender',
)

# Interval trigger: run the health check every 15 minutes.
scheduler.add_job(
    check_api_status,
    trigger=IntervalTrigger(minutes=15),
    id='health_check',
    name='API Health Check',
)

scheduler.start()

# Keep the main thread alive; shut the scheduler down on Ctrl+C.
try:
    while True:
        time.sleep(60)
except KeyboardInterrupt:
    scheduler.shutdown()
Enter fullscreen mode Exit fullscreen mode

Windows Task Scheduler vs. This

| Feature | Windows Task Scheduler | Python Scheduler |
| --- | --- | --- |
| Cross-platform | No (Windows only) | Yes |
| Programmatic config | Complex | Simple |
| Retry logic | Manual | Built-in |
| Logging | Limited | Full control |
| Dynamic scheduling | Hard | Easy |
| Monitoring | Basic | Custom |

Deploy with Docker

FROM python:3.11-slim

# Write print()/logging output straight through so it shows up in
# `docker logs` immediately instead of waiting in Python's stdout buffer.
ENV PYTHONUNBUFFERED=1

WORKDIR /app

# Copy and install dependencies before the application code so this layer is
# reused from cache until requirements.txt changes.
COPY requirements.txt .
# --no-cache-dir keeps pip's download cache out of the image.
RUN pip install --no-cache-dir -r requirements.txt

COPY scheduler.py .
COPY tasks/ ./tasks/

CMD ["python", "scheduler.py"]
Enter fullscreen mode Exit fullscreen mode

Build the image with `docker build -t my-scheduler .`, then run it as a service that restarts automatically: `docker run -d --restart always my-scheduler`

Want Pre-Built Automation Scripts?

This scheduler is part of my Python automation toolkit used across multiple production systems.

👉 Get 50+ Python automation scripts — task schedulers, file organizers, email automators, database tools, and more.

Stop manually running scripts. Automate everything once, run forever.

Top comments (0)