Evans Tjabadi

Supercharge Celery Beat with a custom scheduler - rdbbeat

Have you ever wanted to dynamically add schedules for a large user base in your service, only to find that celery-beat is limited to static, pre-defined schedules? And then you stumble onto django-celery-beat, but you are using FastAPI or Flask? This article is made for you!

TLDR: Use the rdbbeat library to persist dynamic schedules in your relational database (RDB).

  • pip install rdbbeat
  • Run celery-beat with the custom scheduler:
    • python -m celery --app=server.tasks beat --loglevel=info --scheduler=rdbbeat.schedulers:DatabaseScheduler
  • Run a celery worker and use rdbbeat models and controller to add/get schedules.

Introduction

celery-beat runs periodic tasks from schedules that are defined before run time. We want a tool that can add schedules dynamically to our systems during operation.

rdbbeat extends the celery-beat scheduling system to solve this problem in a generic way. The library is built with SQLAlchemy models and persists the schedules in your RDB, and it implements listeners on the database models to capture the addition, modification, and deletion of schedules at run time.
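The listener idea can be sketched with plain SQLAlchemy mapper events. This is a conceptual illustration only, not rdbbeat's actual internals; the model name here is made up:

```python
from sqlalchemy import Column, Integer, String, create_engine, event
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class CrontabSchedule(Base):
    # Hypothetical stand-in for a schedule model; not rdbbeat's real model.
    __tablename__ = "crontab_schedule"
    id = Column(Integer, primary_key=True)
    minute = Column(String, default="*")


changes = []


@event.listens_for(CrontabSchedule, "after_insert")
def schedule_added(mapper, connection, target):
    # Fires during the flush whenever a schedule row is inserted; a scheduler
    # can use hooks like this to refresh its in-memory schedule at run time.
    changes.append(("insert", target.id))


engine = create_engine("sqlite://")  # in-memory DB for the sketch
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(CrontabSchedule(minute="5"))
    session.commit()  # the listener has now recorded the insert
```

The same pattern works for "after_update" and "after_delete" events, which is how schedule modifications and removals can be noticed without restarting the beat process.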

Usage Example

We can look at this example:

Build a company's admin tool that sends "Happy Birthday" messages to employees on their birthdays via email.

We can use rdbbeat to add a schedule for each employee's birthday dynamically during their onboarding. The schedules are persisted in the database and can be modified or deleted at any time.
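To make the mapping concrete, here is a small plain-Python sketch (the helper name is made up) of how a date of birth translates into the cron-style fields that the Schedule object uses later in this article:

```python
from datetime import date


def birthday_cron_fields(dob: date) -> dict:
    """Map a date of birth to cron-style schedule fields: fire once a year,
    at 09:00 UTC on the employee's birthday. The field names mirror the
    Schedule arguments used further down in this article."""
    return {
        "minute": "0",
        "hour": "9",
        "day_of_week": "*",
        "day_of_month": str(dob.day),
        "month_of_year": str(dob.month),
        "timezone": "UTC",  # simplification: a real service would use the employee's timezone
    }


print(birthday_cron_fields(date(1990, 1, 1)))
```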

Complete code for this example can be found on GitHub.

1. Basic service setup with flask and celery

mkdir rdbbeat-flask-example
cd rdbbeat-flask-example
python -m venv venv
source venv/bin/activate
pip install flask celery # view requirements.txt for other dependencies

2. Basic model and DB setup

  • Create and run a database server (I used postgres)
  • Create an employee model in SQLAlchemy
# server/models.py
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()


class Employee(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80), nullable=False)
    surname = db.Column(db.String(80), nullable=False)
    date_of_birth = db.Column(db.Date, nullable=False)

    def to_dict(self):
        return {
            "id": self.id,
            "name": self.name,
            "surname": self.surname,
            "date_of_birth": self.date_of_birth.strftime("%Y-%m-%d"),
        }

3. Basic Flask app setup

  • A simple flask app with a blueprint for employee routes
# server/app.py
import os

from dotenv import load_dotenv
from flask import Flask
from flask_cors import CORS
from flask_migrate import Migrate

from server.db_connection import DATABASE_URL
from server.models import db

load_dotenv()


app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = DATABASE_URL
app.config["SECRET_KEY"] = os.getenv("SECRET_KEY")
# Celery configuration
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379/0"
app.config["CELERY_RESULT_BACKEND"] = "database"
app.config["CELERY_RESULT_DBURI"] = DATABASE_URL
app.config["CELERY_TRACK_STARTED"] = True
app.config["CELERY_SEND_EVENTS"] = True
app.config["BROKER_TRANSPORT_OPTIONS"] = {"visibility_timeout": 3600}
app.config["CELERY_DEFAULT_QUEUE"] = "default"

migrate = Migrate(app, db, directory="server/migrations", compare_type=True)


CORS(app)
db.init_app(app)

from server.views import employee_router  # noqa isort:skip

app.register_blueprint(employee_router)


@app.route("/")
def index():
    return "Learn to use the celery-rdbbeat scheduler!"

NB: We're just adding the Celery config here that we will use later.

  • The db_connection.py file holds the DB connection setup and session management

  • The views.py file holds the employee routes

from dateutil.parser import parse
from flask import Blueprint, jsonify, request
from rdbbeat.controller import Schedule, ScheduledTask, schedule_task

from server.db_connection import session_scope
from server.models import Employee

employee_router = Blueprint("employee_router", __name__, url_prefix="/api/v1")


@employee_router.post("/employees/")
def create_employee():
    employee_data = request.get_json()
    date_of_birth = parse(employee_data["date_of_birth"]).date()

    employee = Employee(
        name=employee_data["name"],
        surname=employee_data["surname"],
        date_of_birth=date_of_birth,
    )
    with session_scope() as session:
        session.add(employee)
        session.commit()
        # Capture the response payload before the session closes, so we don't
        # touch a detached instance afterwards
        employee_dict = employee.to_dict()

    return jsonify(employee_dict), 201

@employee_router.get("/employees/<int:employee_id>")
def get_employee(employee_id):
    with session_scope() as session:
        employee = session.query(Employee).get(employee_id)
        if not employee:
            return jsonify({"error": "Employee not found"}), 404

        return jsonify(employee.to_dict())
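The db_connection.py file itself is not shown in the article; a minimal sketch could look like the following (the names DATABASE_URL and session_scope match how the file is imported above, everything else is an assumption — it falls back to in-memory SQLite here, whereas you would point it at your Postgres URL):

```python
# server/db_connection.py (sketch, not the article's actual file)
import os
from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Fall back to in-memory SQLite for illustration; set DATABASE_URL in practice.
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite://")

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)


@contextmanager
def session_scope():
    """Provide a transactional scope: commit on success, roll back on error,
    always close the session."""
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
```

This is also the callable that gets handed to rdbbeat later via celery_app.conf.session_scope, so beat and the worker can open their own DB sessions.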

Test Point

  • Use Flask-Migrate or pure alembic to create migrations and run them
# with flask-migrate
flask db init
flask db migrate -m "create employee table" # create migrations
flask db upgrade # create the table in the DB
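If you prefer pure alembic, a roughly equivalent flow is sketched below (standard alembic CLI commands; --autogenerate assumes you have pointed target_metadata at your models in env.py):

```shell
# with pure alembic (roughly equivalent to the flask-migrate flow above)
alembic init migrations                                    # one-time setup
alembic revision --autogenerate -m "create employee table" # create the migration
alembic upgrade head                                       # create the table in the DB
```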
  • Check that your table is created in the DB (I used TablePlus)
  • Run the flask app
export FLASK_APP=server/app.py
flask run # (or python -m flask run)
  • At this point, you should be able to create employees and fetch them from the DB using the routes we created above. You can use Postman or Insomnia; I just used curl.
# Create an employee
curl -X POST -H "Content-Type: application/json" -d '{"name": "John", "surname": "Doe", "date_of_birth": "1990-01-01"}' http://localhost:5000/api/v1/employees/
# {"date_of_birth":"1990-01-01","id":1,"name":"John","surname":"Doe"}
# Get an employee with id=1
curl http://localhost:5000/api/v1/employees/1
# {"date_of_birth":"1990-01-01","id":1,"name":"John","surname":"Doe"}

Now, let the scheduling fun begin!

4. Running celery-beat with rdbbeat as a custom scheduler

  • Run the migrations to create rdbbeat tables (note that they live in the scheduler schema, not the public schema)
python -m alembic -n scheduler upgrade head
  • Then run the celery-beat (in a separate terminal)
python -m celery --app=server.tasks beat --loglevel=info --scheduler=rdbbeat.schedulers:DatabaseScheduler

5. Create a simple celery app and link it to the flask app

# server/celery_worker.py
from celery import Celery

from server.app import app as flask_app
from server.db_connection import session_scope

REDIS_URL = "redis://localhost:6379/0"


def create_celery(flask_app=flask_app):
    celery_app = Celery("flask", broker=REDIS_URL)

    celery_app.conf.task_default_queue = "default"

    celery_app.conf.broker_transport_options = {
        "max_retries": 3,
        "interval_start": 0,
        "interval_step": 0.2,
        "interval_max": 0.2,
    }

    # Provide session scope to `rdbbeat`
    celery_app.conf.session_scope = session_scope
    celery_app.conf.update(flask_app.config)

    TaskBase = celery_app.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with flask_app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery_app.Task = ContextTask

    return celery_app


app = create_celery(flask_app=flask_app)

NB: Be sure to provide the session scope to rdbbeat so that it can access the DB.

  • Create a simple task
# server/tasks.py
from server.celery_worker import app
from server.db_connection import session_scope
from server.models import Employee


@app.task(name="birthday_greeting")
def birthday_greeting(employee_id):
    with session_scope() as session:
        employee = session.query(Employee).get(employee_id)
        print(f"Happy birthday, {employee.name} {employee.surname}!")

        # Send email to employee
        # email_service.send_email(template="birthday_greeting", to=employee.email, context={"employee": employee.to_dict()})
        # Remind their manager too, in case they forgot :)
  • Now, modify the views.py file to add a schedule for each employee's birthday message
# server/views.py
...
from rdbbeat.controller import Schedule, ScheduledTask, schedule_task

@employee_router.post("/employees/")
def create_employee():
    employee_data = request.get_json()
    date_of_birth = parse(employee_data["date_of_birth"]).date()

    employee = Employee(
        name=employee_data["name"],
        surname=employee_data["surname"],
        date_of_birth=date_of_birth,
    )
    with session_scope() as session:
        session.add(employee)
        session.commit()

        # Create birthday greeting task
        db_employee = session.query(Employee).get(employee.id)
        schedule = Schedule(
            minute="*",
            hour="*",
            day_of_week="*",
            day_of_month=str(date_of_birth.day),
            month_of_year=str(date_of_birth.month),
            timezone="UTC" # FIX-ME: get timezone from employee
        )

        task_to_schedule = ScheduledTask(
            name=f"{db_employee.id}_birthday_greeting",  # All tasks must have a unique name
            task="birthday_greeting",
            schedule=schedule,
        )
        # Provide task kwargs for when the task is executed
        task_kwargs = {"employee_id": db_employee.id}
        schedule_task(session=session, scheduled_task=task_to_schedule, **task_kwargs)

    return jsonify(db_employee.to_dict()), 201
...

6. Run everything and test!

  • For sanity while testing, let's modify the schedule to run every minute
# server/views.py
...
schedule = Schedule(
    minute="*",
    hour="*",
    day_of_week="*",
    day_of_month="*",
    month_of_year="*",
    timezone="UTC"
)
...
  • Check that the Flask app is running in one terminal
  • Check that celery-beat is running in another terminal
  • Then run the celery worker in a third terminal
python -m celery --app=server.tasks worker --loglevel=info
  • Run those curl commands again to create an employee.

7. It works!!

  • You should see logs like these in terminal 2:
[2021-03-01 16:00:00,000: INFO/MainProcess] Scheduler: Sending due task birthday_greeting (birthday_greeting)
[2021-03-01 16:00:00,000: INFO/MainProcess] Scheduler: Sending due task birthday_greeting (birthday_greeting)

  • And the celery worker logs should be happy like:
[2021-03-01 16:00:00,000: INFO/MainProcess] Received task: birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b]  
[2021-03-01 16:00:00,000: INFO/MainProcess] Received task: birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b]  
[2021-03-01 16:00:00,000: INFO/MainProcess] Task birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b] succeeded in 0.000s: None
[2021-03-01 16:00:00,000: INFO/MainProcess] Task birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b] succeeded in 0.000s: None

8. Advanced stuff

8.1. Single-beat

You can use single-beat to ensure that only a single celery-beat process runs at a time. Its main benefit is that it safeguards against multiple celery-beat instances running simultaneously (e.g. after a deploy or failover) and dispatching duplicate tasks.
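A sketch of what that looks like — single-beat wraps the celery command and coordinates a lock through Redis (check single-beat's README for the exact environment variables and options):

```shell
pip install single-beat

# Only one of these will actively run beat at a time, even across hosts;
# the others wait and take over if the active one dies.
SINGLE_BEAT_REDIS_SERVER="redis://localhost:6379/0" \
  single-beat celery --app=server.tasks beat --loglevel=info --scheduler=rdbbeat.schedulers:DatabaseScheduler
```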

8.2. Deploy with k8s

If you are running your system on Kubernetes, you can run celery-beat in one pod, the celery worker in another, and your server application in a third. As long as they all have access to the same DB and message broker, they will work together.

9. Alternatives

9.1. Redbeat

According to its GitHub docs, RedBeat is a Celery Beat Scheduler that stores the scheduled tasks and runtime metadata in Redis. The trade-off is that your task schedules are stored in Redis, so you have to keep your Redis instance highly available to avoid losing your schedules.

9.2. Django Celery Beat

rdbbeat does for SQLAlchemy what Django Celery Beat does for Django. If you are using the Django framework for your server application, you can use Django Celery Beat to schedule your tasks.
