I felt like procrastinating a bit, so I created a simple background worker based on Django's cache framework.
Here is how it works:
In a Django management command, start the CacheWorker and provide a list of the functions you want to run as background tasks:
CacheWorker.start_cache_worker(worker_functions)
Anywhere in your app you can then send those long-running functions to the background. Collect them in a list (worker_functions) and pass it to the CacheWorker (a management command sketch follows the mock functions below).
Example of such mock functions:
import time

from webapp.logger import log


def func1(a: int, b: int):
    time.sleep(5)
    log.debug(f"func1 result a + b = {a + b}")


def func2(a: int, b: dict):
    time.sleep(15)
    log.debug(f"func2 a: {a}, b: {b}")
    raise Exception("some error")
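To actually run the worker you need a management command that registers these functions and calls start_cache_worker. A minimal sketch, assuming the CacheWorker class lives in webapp/cache_worker.py (the file you create below) and the mock functions live in webapp/tasks.py:

# webapp/management/commands/cache_worker.py
from django.core.management.base import BaseCommand

from webapp.cache_worker import CacheWorker  # the class shown further below (assumed path)
from webapp.tasks import func1, func2  # the mock functions above (assumed path)


class Command(BaseCommand):
    help = "Run the cache-based background worker"

    def handle(self, *args, **options):
        # Blocks forever, polling the cache for new tasks every two seconds.
        CacheWorker.start_cache_worker([func1, func2])

With that file in place, python manage.py cache_worker starts the polling loop in its own process, separate from your web server.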
Here is how you send those functions to the CacheWorker for execution:
t1 = CacheWorker.set_task("func1", 1, 2)
t2 = CacheWorker.set_task("func2", 1, {"some": 1, "dict": [1, "heloo"]})
log.debug(f"t1: {t1}, t2: {t2}")
What you need to do if you want to try it: add this to your settings.py. The worker runs in a separate process from your web server, so the cache backend has to be shared between processes; the default per-process LocMemCache will not work, which is why a file-based cache is used here.
CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.filebased.FileBasedCache",
        "LOCATION": CACHE_DIR,
        "OPTIONS": {
            "MAX_ENTRIES": 10_000,
            "CULL_FREQUENCY": 5000,
        },
        "TIMEOUT": 3600,
    }
}
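One note: CACHE_DIR is not something Django defines for you; it has to exist in settings.py. A possible definition (the directory name is just an example, any writable path works):

from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent  # the usual Django settings boilerplate
CACHE_DIR = str(BASE_DIR / "django_cache")  # any writable directory works; the name is arbitrary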
Create a new file next to your settings.py and paste this code (plug in your own logger):
import json
import threading
import time
import uuid
from typing import Callable

from django.core.cache import cache

from webapp.logger import log


class CacheWorker:
    PREFIX_TASK = "cw:task"
    PREFIX_STATUS = "cw:status"
    PREFIX_MESSAGE = "cw:message"

    STATUS_PENDING = "PENDING"
    STATUS_RUNNING = "RUNNING"
    STATUS_SUCCESS = "SUCCESS"
    STATUS_FAILED = "FAILED"

    @staticmethod
    def set_task(name: str, *args, **kwargs):
        # Store the task payload, mark it PENDING and register its id
        # in the shared "cw:tasks" list the worker loop polls.
        task_id = uuid.uuid4().hex
        cache.set(
            f"{CacheWorker.PREFIX_TASK}:{task_id}",
            json.dumps({"name": name, "args": args, "kwargs": kwargs}),
        )
        cache.set(f"{CacheWorker.PREFIX_STATUS}:{task_id}", CacheWorker.STATUS_PENDING)
        task_ids = cache.get("cw:tasks") or []
        task_ids.append(task_id)
        cache.set("cw:tasks", task_ids)
        return task_id

    @staticmethod
    def get_task(task_id: str) -> dict | None:
        cached = cache.get(f"{CacheWorker.PREFIX_TASK}:{task_id}")
        if not cached:
            return None
        return json.loads(cached)

    @staticmethod
    def remove_task(task_id: str):
        task_keys = [
            f"{CacheWorker.PREFIX_TASK}:{task_id}",
            f"{CacheWorker.PREFIX_STATUS}:{task_id}",
            f"{CacheWorker.PREFIX_MESSAGE}:{task_id}",
        ]
        cache.delete_many(task_keys)

    @staticmethod
    def get_task_status(task_id: str):
        status = cache.get(f"{CacheWorker.PREFIX_STATUS}:{task_id}")
        return status

    @staticmethod
    def get_task_message(task_id: str):
        message = json.loads(
            cache.get(f"{CacheWorker.PREFIX_MESSAGE}:{task_id}")
            or json.dumps({"message": None})
        )
        return message

    @staticmethod
    def set_task_status(task_id: str, status: str):
        return cache.set(f"{CacheWorker.PREFIX_STATUS}:{task_id}", status)

    @staticmethod
    def set_task_message(task_id: str, message: dict):
        return cache.set(
            f"{CacheWorker.PREFIX_MESSAGE}:{task_id}", json.dumps({"message": message})
        )

    @staticmethod
    def start_cache_worker(worker_functions: list[Callable]):
        # Called from a management command; blocks and polls the cache forever.
        cache.clear()
        worker_functions = {func.__name__: func for func in worker_functions}
        log.info(
            f"Cache worker started with {len(worker_functions)} functions. "
            f"Registered functions: {', '.join(worker_functions.keys())}"
        )

        def execute_task(task_id: str, func: Callable, args: tuple, kwargs: dict):
            try:
                log.info(f"Triggered task_id: {func.__name__}_{task_id}")
                CacheWorker.set_task_status(task_id, CacheWorker.STATUS_RUNNING)
                func(*args, **kwargs)
                CacheWorker.set_task_status(task_id, CacheWorker.STATUS_SUCCESS)
                log.success(
                    f"Finished task_id: {func.__name__}_{task_id} successfully!"
                )
            except Exception as ex:
                log.exception(ex)
                CacheWorker.set_task_status(task_id, CacheWorker.STATUS_FAILED)
                log.error(
                    f"Failed to execute task_id: {func.__name__}_{task_id} because of error: {ex}"
                )

        running_threads = set()
        while True:
            task_ids = cache.get("cw:tasks") or []
            for task_id in task_ids:
                if CacheWorker.get_task_status(task_id) != CacheWorker.STATUS_PENDING:
                    continue
                task = CacheWorker.get_task(task_id)
                if not task:
                    continue
                if task["name"] not in worker_functions:
                    log.warning(
                        f"Worker function {task['name']} was not found in provided worker functions."
                    )
                    continue
                # Each pending task runs in its own daemon thread.
                thread = threading.Thread(
                    target=execute_task,
                    args=(
                        task_id,
                        worker_functions[task["name"]],
                        task["args"],
                        task["kwargs"],
                    ),
                    daemon=True,
                )
                thread.start()
                running_threads.add(thread)
            # Drop references to finished threads, then poll again in two seconds.
            running_threads = {t for t in running_threads if t.is_alive()}
            time.sleep(2)
Some proof that it works:
Use it at your own risk. It's not scalable.
