Python Task Scheduler: Run Any Script Automatically (No Cron Needed)
Cron is powerful but cryptic. Here's how to build a Python-based task scheduler that's easier to configure and more powerful than cron.
Simple Task Scheduler with schedule Library
# pip install schedule
import schedule
import time
import threading
from datetime import datetime
from typing import Callable
class TaskScheduler:
    """Thin wrapper around the third-party ``schedule`` library.

    Translates human-readable interval strings into ``schedule`` jobs and
    runs the polling loop either on the calling thread or in a daemon
    thread.
    """

    def __init__(self):
        self.jobs = []        # schedule.Job handles created by add_job()
        self.running = False  # loop flag polled by _run_loop()

    def add_job(
        self,
        func: Callable,
        interval: str,
        *args,
        **kwargs
    ):
        """Add a job to the scheduler.

        interval examples: 'every 10 minutes', 'daily at 09:30',
        'every monday at 08:00', 'every day', 'every hour'

        Extra ``*args``/``**kwargs`` are forwarded to ``func`` on each run.

        Returns the underlying ``schedule`` job handle.
        Raises ValueError if ``interval`` does not match a known pattern
        (the original code left ``job`` unbound and crashed with
        UnboundLocalError instead).
        """
        parts = interval.lower().split()
        if parts and parts[0] == 'every':
            if len(parts) == 2:
                # 'every day' / 'every hour' / 'every minute' — generalized
                # from the original day-only branch.
                unit = parts[1].rstrip('s')  # Remove plural
                job = getattr(schedule.every(), unit).do(func, *args, **kwargs)
            elif len(parts) == 3 and parts[1].isdigit():
                # 'every 10 minutes'
                amount = int(parts[1])
                unit = parts[2].rstrip('s')  # Remove plural
                job = getattr(schedule.every(amount), unit).do(func, *args, **kwargs)
            elif len(parts) == 4 and parts[2] == 'at':
                # 'every monday at 08:00' (also 'every day at 09:30')
                day = parts[1]
                time_str = parts[3]
                job = getattr(schedule.every(), day).at(time_str).do(func, *args, **kwargs)
            else:
                raise ValueError(f"Unrecognized interval: {interval!r}")
        elif parts and parts[0] == 'daily' and len(parts) == 3:
            # 'daily at 08:00'
            time_str = parts[2]
            job = schedule.every().day.at(time_str).do(func, *args, **kwargs)
        else:
            raise ValueError(f"Unrecognized interval: {interval!r}")
        self.jobs.append(job)
        return job

    def start(self, blocking: bool = True):
        """Start the scheduler.

        With ``blocking=True`` this call does not return until stop() flips
        the flag from another thread; otherwise the loop runs in a daemon
        thread and this returns immediately.
        """
        self.running = True
        if blocking:
            self._run_loop()
        else:
            thread = threading.Thread(target=self._run_loop, daemon=True)
            thread.start()

    def _run_loop(self):
        # Poll once a second; schedule.run_pending() fires any due jobs.
        while self.running:
            schedule.run_pending()
            time.sleep(1)

    def stop(self):
        """Ask the polling loop to exit (takes effect within ~1 second)."""
        self.running = False
# Usage
def send_report():
    """Demo task: placeholder for real report delivery."""
    print(f"[{datetime.now()}] Sending daily report...")
    # Your report logic here

def cleanup_files():
    """Demo task: placeholder for temp-file cleanup."""
    print(f"[{datetime.now()}] Cleaning up temp files...")
    # Your cleanup logic here

def check_api_status():
    """Demo task: placeholder for an API health probe."""
    print(f"[{datetime.now()}] Checking API health...")
    # Your health check logic here

scheduler = TaskScheduler()
scheduler.add_job(send_report, 'daily at 08:00')
scheduler.add_job(cleanup_files, 'every monday at 02:00')
scheduler.add_job(check_api_status, 'every 15 minutes')
# blocking=True runs the polling loop on this thread and does not return,
# so nothing after this line executes in a single script.
scheduler.start(blocking=True)
Job Queue with Retry Logic
import queue
import traceback
from dataclasses import dataclass, field
from typing import Any, Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class Job:
    """One unit of work for the retry queue: a callable plus its call
    arguments, retry budget, and a human-readable name for logging."""
    func: Callable
    args: tuple = field(default_factory=tuple)
    kwargs: dict = field(default_factory=dict)
    max_retries: int = 3
    retry_count: int = 0
    name: str = ""

    def __post_init__(self):
        # Default the display name to the callable's __name__ when the
        # caller didn't supply one.
        self.name = self.name or getattr(self.func, '__name__', 'unknown')
class RetryJobQueue:
    """Thread-pool job queue with per-job retry and exponential backoff.

    Results land in ``self.results`` keyed by job name, so two jobs with
    the same name overwrite each other's entry.
    """

    def __init__(self, workers: int = 4):
        self.queue = queue.Queue()
        self.workers = workers
        self.results = {}  # job name -> {"status": ..., "result"/"error": ...}

    def submit(self, func: Callable, *args, max_retries: int = 3, **kwargs):
        """Add a job to the queue.

        ``max_retries`` is the number of retries *after* the first attempt.
        Returns the queued Job.
        """
        job = Job(func=func, args=args, kwargs=kwargs, max_retries=max_retries)
        self.queue.put(job)
        logger.info(f"Job queued: {job.name}")
        return job

    def _worker(self):
        """Worker loop: process jobs until a ``None`` sentinel arrives."""
        while True:
            job = self.queue.get()
            if job is None:
                # Balance the get() so queue.join() can still complete.
                self.queue.task_done()
                break
            try:
                result = job.func(*job.args, **job.kwargs)
                logger.info(f"Job completed: {job.name}")
                self.results[job.name] = {"status": "success", "result": result}
            except Exception as e:
                job.retry_count += 1
                # '<=' so max_retries means the number of retries, matching
                # the log messages below (the original '<' gave only
                # max_retries - 1 retries yet logged "after N retries").
                if job.retry_count <= job.max_retries:
                    logger.warning(f"Job failed, retrying ({job.retry_count}/{job.max_retries}): {job.name}")
                    time.sleep(2 ** job.retry_count)  # Exponential backoff
                    self.queue.put(job)
                else:
                    logger.error(f"Job permanently failed after {job.max_retries} retries: {job.name}")
                    self.results[job.name] = {"status": "failed", "error": str(e)}
            finally:
                # A re-queued job was re-put above, so the unfinished-task
                # count stays balanced for wait_complete().
                self.queue.task_done()

    def start(self):
        """Launch the daemon worker threads; returns the Thread objects."""
        threads = []
        for _ in range(self.workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            threads.append(t)
        return threads

    def stop(self):
        """Signal every worker to exit once the queued jobs are drained."""
        for _ in range(self.workers):
            self.queue.put(None)

    def wait_complete(self):
        """Block until every submitted job has succeeded or exhausted retries."""
        self.queue.join()
# Usage
job_queue = RetryJobQueue(workers=2)
job_queue.start()

# Submit jobs
# NOTE(review): if this runs in the same script as the earlier
# scheduler.start(blocking=True), it is never reached — that call blocks.
job_queue.submit(send_report, max_retries=3)
job_queue.submit(cleanup_files, max_retries=2)

# Block until every job has either succeeded or exhausted its retries.
job_queue.wait_complete()
print("All jobs complete:", job_queue.results)
APScheduler for Production Use
For production systems, use APScheduler:
# pip install apscheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
scheduler = BackgroundScheduler()

# Register jobs table-style: (callable, trigger, job id, display name).
# Cron-style trigger fires daily at 8 AM; interval trigger every 15 minutes.
_jobs = [
    (send_report, CronTrigger(hour=8, minute=0), 'daily_report', 'Daily Report Sender'),
    (check_api_status, IntervalTrigger(minutes=15), 'health_check', 'API Health Check'),
]
for _func, _trigger, _job_id, _label in _jobs:
    scheduler.add_job(_func, _trigger, id=_job_id, name=_label)

scheduler.start()

# BackgroundScheduler runs in its own thread, so keep the main thread
# alive until Ctrl-C, then shut down cleanly.
try:
    while True:
        time.sleep(60)
except KeyboardInterrupt:
    scheduler.shutdown()
Windows Task Scheduler vs. This
| Feature | Windows Task Scheduler | Python Scheduler |
|---|---|---|
| Cross-platform | ❌ | ✅ |
| Programmatic config | Complex | Simple |
| Retry logic | Manual | Built-in |
| Logging | Limited | Full control |
| Dynamic scheduling | Hard | Easy |
| Monitoring | Basic | Custom |
Deploy with Docker
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so this layer stays cached until
# requirements.txt changes; --no-cache-dir keeps pip's download
# cache out of the image.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Application code changes most often — copy it last.
COPY scheduler.py .
COPY tasks/ ./tasks/

CMD ["python", "scheduler.py"]
Run it as a self-restarting service: build the image with `docker build -t my-scheduler .`, then start it with `docker run -d --restart always my-scheduler`.
Want Pre-Built Automation Scripts?
This scheduler is part of my Python automation toolkit used across multiple production systems.
👉 Get 50+ Python automation scripts — task schedulers, file organizers, email automators, database tools, and more.
Stop manually running scripts. Automate everything once, run forever.
Top comments (0)