Используйте transaction.on_commit
В одном из файлов встречается использование sleep
.
@celery.task(name="users.rebuild_tags")
def rebuild_tags(student_id: str | int) -> None:
# preventing race condition when task looks for user which isn't saved into db yet
time.sleep(1) # ❌
user = apps.get_model("users.User").objects.get(pk=student_id)
generate_tags(user)
Нормально использовать sleep
после операции, которая требует ожидания, но здесь такого нет, а это уже плохо пахнет.
Причина появления sleep
указана в комментарии. Как же так получается, что student_id
уже есть, а пользователя связанного с этим id в БД ещё нет? Всё дело в транзакциях. Пока транзакция не завершится, созданные ею изменения не видны.
Рассмотрим следующий пример
class UserView(APIView):
def post(request):
user = create_user(request)
rebuild_tags.delay(student_id=user.id) # ❌
long_time_db_operation()
return Response({"id": user.id}, status=201)
По умолчанию в django все http-запросы заворачиваются в транзакции. Это значит, что созданный пользователь виден только в представлении (view), которое его создала. Код за пределами представления не видит этого пользователя, пока не завершится транзакция. Для простоты, будем считать, что транзакция завершается, когда выполниться return
.
Задача rebuild_tags
ставится в очередь раньше, чем будет выполнен return
, а значит возможна ситуация, когда задача выполнится раньше завершения транзакции. sleep
должен был решить эту проблему, но представим себе, что long_time_db_operation
может иметь переменную длительность в зависимости от загруженности базы данных, тогда в некоторых случаях длительность может быть больше одной секунды и тогда sleep
не поможет.
Есть надёжный вариант
class UserView(APIView):
def post(request):
user = create_user(request)
task = rebuild_tags.s(student_id=user.id).delay
transaction.on_commit(task) # ✅
long_time_db_operation()
return Response({"id": user.id}, status=201)
transaction.on_commit
- принимает на вход функцию, которую нужно выполнить после завершения транзакции, поэтому у .delay
нет скобок
Параметры задачи переезжают в метод .s
, это позволяет задаче запуститься с нужными параметрами.
Таким образом, не важно сколько времени занимает транзакция, фоновая задача будет запущена, только после успешного завершения транзакции.
Offtop. С точки зрения TDD вариант со sleep правильный, т.к. TDD требует писать минимальный код, который будет проходить тесты. Но если написать тест, который проверяет, что задача не ставится в очередь при падении транзакции, то тогда без on_commit
не обойтись.
Используйте именованные аргументы
Ещё посмотреть на события celery
во flower
, то информация о задаче может выглядеть rebuild_tags args=(1) kwargs={}
. Когда параметров много, это могло выглядеть так args=(10, 1000, true, 1, 100)
. Неизвестно что означают эти числа. Когда перед глазами сотни событий celery, это не очень хорошо. Исправить это очень легко, достаточно добавить *,
перед аргументами задачи. В нашем примере это будет так:
@celery.task(name="users.rebuild_tags")
def rebuild_tags(*, student_id: str | int) -> None: # ✅
...
Во flower это будет отображаться так rebuild_tags args=() kwargs={"student_id": 1}
- так намного понятней.
Рассмотрим ещё один пример. Пусть в celery beat добавлена задача, которая должна отправить письма тем пользователям у которых сейчас 8 утра по их местному времени.
@celery.task(name="Рассылка")
def send_mails() -> None: # ❌
users = find_users(8) # Найти пользователей, у сейчас которых подходящее локальное время
# Странная, но необходимая проверка, которая мешает тестированию
if now().minute = 0:
...
Такую задачу можно тестировать вручную лишь одну минуту в сутки, в остальное время она не срабатывает. Перепишем задачу так, чтобы сохранить её функционал.
@celery.task(name="Рассылка")
def send_mails(*, hour: int = 8, minute: int = 0) -> None: # ✅
users = find_users(hour) # Найти пользователей, у сейчас которых подходящее локальное время
if now().minute = minute: # Странная проверка, которая мешает тестированию
...
Теперь, можно задавать именованные аргументы через админку celery beat
в блоке Arguments конкретной задачи в формате json: {"hour":17, "minute":47}
. Так можно тестировать задачу в любое время.
Top comments (0)