DEV Community

Alin Climente
Alin Climente

Posted on

Fun, simple, NOT scalable background worker based on Django Cache

I felt like procrastinating a bit, so I created a simple background worker based on the Django Cache Framework.

Here is how it works:
In a Django management command, start CacheWorker and provide the list of functions you want to use as background tasks:

CacheWorker.start_cache_worker(worker_functions)
Enter fullscreen mode Exit fullscreen mode

Anywhere in your app you can send those long-running functions to the background. Collect them in a list (worker_functions) and pass it to CacheWorker.

Example of such mock functions:

import time

from webapp.logger import log


def func1(a: int, b: int):
    """Mock long-running task: blocks for a few seconds, then logs a + b."""
    delay_seconds = 5
    time.sleep(delay_seconds)
    log.debug(f"func1 result a + b = {a + b}")


def func2(a: int, b: dict):
    """Mock failing task: sleeps, logs both arguments, then always raises."""
    nap_seconds = 15
    time.sleep(nap_seconds)
    log.debug(f"func2 a:{a}, b: {b}")

    raise Exception("some error")
Enter fullscreen mode Exit fullscreen mode

Here is how you can send to CacheWorker those functions for execution:


# Queue the two mock functions for background execution.  set_task takes the
# registered function NAME plus its positional arguments, and returns a task
# id that can later be used to query the task's status/message.
t1 = CacheWorker.set_task("func1", 1, 2)
t2 = CacheWorker.set_task("func2", 1, {"some": 1, "dict": [1, "heloo"]})

log.debug(f"t1: {t1}, t2: {t2}")

Enter fullscreen mode Exit fullscreen mode

Here is what you need to do if you want to try it: add this to your settings.py file.

CACHES = {
    "default": {
        # File-based backend: entries live on disk, so tasks queued by one
        # process (the web app) are visible to another (the worker command).
        "BACKEND": "django.core.cache.backends.filebased.FileBasedCache",
        "LOCATION": CACHE_DIR,  # assumes CACHE_DIR is defined earlier in settings — TODO confirm
        "OPTIONS": {
            "MAX_ENTRIES": 10_000,
            # NOTE(review): Django culls MAX_ENTRIES / CULL_FREQUENCY entries
            # when the cache is full, so 5000 here removes only 2 entries per
            # cull.  Confirm this is intended — the typical value is ~3.
            "CULL_FREQUENCY": 5000,
        },
        # Default per-entry timeout in seconds: cache.set calls that do not
        # pass an explicit timeout expire after one hour.
        "TIMEOUT": 3600,
    }
}
Enter fullscreen mode Exit fullscreen mode

Create a new file next to settings.py file and paste this code (add your own logger).

import json
import threading
import time
import uuid
from typing import Callable, Optional

from django.core.cache import cache

from webapp.logger import log


class CacheWorker:
    """Poll-based background task runner built on top of the Django cache.

    Producers enqueue tasks with ``set_task``; a single worker process
    (started via ``start_cache_worker``, e.g. from a management command)
    polls the cache every 2 seconds and runs each pending task in a
    daemon thread.

    Limitations (by design — this is a toy): the ``cw:tasks`` id list is
    read-modified-written without any locking, so concurrent producers
    can lose appends, and only ONE worker process must ever run.
    """

    # Cache-key prefixes for the per-task payload, status and message entries.
    PREFIX_TASK = "cw:task"
    PREFIX_STATUS = "cw:status"
    PREFIX_MESSAGE = "cw:message"

    # Task lifecycle states.
    STATUS_PENDING = "PENDING"
    STATUS_RUNNING = "RUNNING"
    STATUS_SUCCESS = "SUCCESS"
    STATUS_FAILED = "FAILED"

    @staticmethod
    def set_task(name: str, *args, **kwargs) -> str:
        """Enqueue a task by registered function name; return its task id.

        args/kwargs must be JSON-serializable (they are round-tripped
        through ``json.dumps``; tuples come back as lists).

        NOTE: entries are stored with the cache's default TIMEOUT, so a
        task that stays queued longer than that silently disappears.
        """
        task_id = uuid.uuid4().hex
        cache.set(
            f"{CacheWorker.PREFIX_TASK}:{task_id}",
            json.dumps({"name": name, "args": args, "kwargs": kwargs}),
        )
        cache.set(f"{CacheWorker.PREFIX_STATUS}:{task_id}", CacheWorker.STATUS_PENDING)

        # Non-atomic read-modify-write of the shared id list: concurrent
        # producers can race and lose appends.  Acceptable for this toy.
        task_ids = cache.get("cw:tasks") or []
        task_ids.append(task_id)
        cache.set("cw:tasks", task_ids)

        return task_id

    @staticmethod
    def get_task(task_id: str) -> Optional[dict]:
        """Return the stored task payload, or None when missing/expired."""
        cached = cache.get(f"{CacheWorker.PREFIX_TASK}:{task_id}")
        if not cached:
            # Fixed annotation + explicit return: the original fell through
            # with a bare ``return`` while annotated ``-> dict``.
            return None
        return json.loads(cached)

    @staticmethod
    def remove_task(task_id: str):
        """Delete all cache entries (payload, status, message) for a task."""
        task_keys = [
            f"{CacheWorker.PREFIX_TASK}:{task_id}",
            f"{CacheWorker.PREFIX_STATUS}:{task_id}",
            f"{CacheWorker.PREFIX_MESSAGE}:{task_id}",
        ]
        cache.delete_many(task_keys)

    @staticmethod
    def get_task_status(task_id: str) -> Optional[str]:
        """Return the task's status string, or None when unknown/expired."""
        status = cache.get(f"{CacheWorker.PREFIX_STATUS}:{task_id}")
        return status

    @staticmethod
    def get_task_message(task_id: str) -> dict:
        """Return ``{"message": ...}``; the message is None when unset."""
        message = json.loads(
            cache.get(f"{CacheWorker.PREFIX_MESSAGE}:{task_id}")
            or json.dumps({"message": None})
        )
        return message

    @staticmethod
    def set_task_status(task_id: str, status: str):
        """Store the task's status string (one of the STATUS_* constants)."""
        return cache.set(f"{CacheWorker.PREFIX_STATUS}:{task_id}", status)

    @staticmethod
    def set_task_message(task_id: str, message: dict):
        """Store a JSON-serializable message payload for the task."""
        return cache.set(
            f"{CacheWorker.PREFIX_MESSAGE}:{task_id}", json.dumps({"message": message})
        )

    @staticmethod
    def start_cache_worker(worker_functions: list[Callable]):
        """Run the polling loop forever, executing pending tasks in threads.

        WARNING: ``cache.clear()`` wipes the ENTIRE default cache at
        startup, including any entries unrelated to this worker.
        """
        cache.clear()

        # Separate name so we don't rebind the parameter to a different type.
        registry = {func.__name__: func for func in worker_functions}
        log.info(
            f"Cache worker started with {len(registry)} functions. Registered functions: {', '.join(list(registry.keys()))}"
        )

        def execute_task(task_id: str, func: Callable, args: tuple, kwargs: dict):
            """Run one task in the current thread, tracking status in the cache."""
            try:
                log.info(f"Triggered task_id: {func.__name__}_{task_id}")
                CacheWorker.set_task_status(task_id, CacheWorker.STATUS_RUNNING)
                func(*args, **kwargs)
                CacheWorker.set_task_status(task_id, CacheWorker.STATUS_SUCCESS)
                # NOTE(review): ``log.success`` is non-standard (loguru-style);
                # confirm webapp.logger provides it.
                log.success(
                    f"Finished task_id: {func.__name__}_{task_id} successfully!"
                )
            except Exception as ex:
                log.exception(ex)
                CacheWorker.set_task_status(task_id, CacheWorker.STATUS_FAILED)
                # Store the error so callers can retrieve it via
                # get_task_message (the message API was never populated before).
                CacheWorker.set_task_message(task_id, {"error": str(ex)})
                log.error(
                    f"Failed to execute task_id: {func.__name__}_{task_id} because of error: {ex}"
                )

        running_threads = set()

        while True:
            task_ids = cache.get("cw:tasks") or []
            for task_id in task_ids:
                # Only PENDING tasks are eligible; RUNNING/SUCCESS/FAILED and
                # expired (None) statuses are skipped.
                if CacheWorker.get_task_status(task_id) != CacheWorker.STATUS_PENDING:
                    continue

                task = CacheWorker.get_task(task_id)
                if not task:
                    continue

                if task["name"] not in registry:
                    log.warning(
                        f"Worker function {task['name']} was not found in provided worker functions."
                    )
                    # Mark as FAILED so the task is not re-warned every poll
                    # cycle forever (it previously stayed PENDING).
                    CacheWorker.set_task_status(task_id, CacheWorker.STATUS_FAILED)
                    CacheWorker.set_task_message(
                        task_id, {"error": f"Unknown worker function: {task['name']}"}
                    )
                    continue

                thread = threading.Thread(
                    target=execute_task,
                    args=(
                        task_id,
                        registry[task["name"]],
                        task["args"],
                        task["kwargs"],
                    ),
                    daemon=True,
                )
                thread.start()
                running_threads.add(thread)

            # Prune finished threads once per poll cycle (previously pruned
            # only when a new task started, so the set could grow during
            # bursts followed by idle periods).
            running_threads = {t for t in running_threads if t.is_alive()}

            time.sleep(2)

Enter fullscreen mode Exit fullscreen mode

Some proof that it works:

django cache background worker

Use it at your own risk. It's not scalable.

Top comments (0)