DEV Community

Гимаев Наиль
Гимаев Наиль

Posted on

2. Celery

Используйте transaction.on_commit

В одном из файлов встречается использование sleep.

@celery.task(name="users.rebuild_tags")
def rebuild_tags(student_id: str | int) -> None:
    # preventing race condition when task looks for user which isn't saved into db yet
    time.sleep(1)  # ❌
    user = apps.get_model("users.User").objects.get(pk=student_id)

    generate_tags(user)
Enter fullscreen mode Exit fullscreen mode

Нормально использовать sleep после операции, которая требует ожидания, но здесь такого нет, а это уже плохо пахнет.

Причина появления sleep указана в комментарии. Как же так получается, что student_id уже есть, а пользователя связанного с этим id в БД ещё нет? Всё дело в транзакциях. Пока транзакция не завершится, созданные ею изменения не видны.

Рассмотрим следующий пример

class UserView(APIView):
  def post(request):
    user = create_user(request)
    rebuild_tags.delay(student_id=user.id)  # ❌
    long_time_db_operation()
    return Response({"id": user.id}, status=201)
Enter fullscreen mode Exit fullscreen mode

По умолчанию в django все http-запросы заворачиваются в транзакции. Это значит, что созданный пользователь виден только в представлении (view), которое его создала. Код за пределами представления не видит этого пользователя, пока не завершится транзакция. Для простоты, будем считать, что транзакция завершается, когда выполниться return.
Задача rebuild_tags ставится в очередь раньше, чем будет выполнен return, а значит возможна ситуация, когда задача выполнится раньше завершения транзакции. sleep должен был решить эту проблему, но представим себе, что long_time_db_operation может иметь переменную длительность в зависимости от загруженности базы данных, тогда в некоторых случаях длительность может быть больше одной секунды и тогда sleep не поможет.

Есть надёжный вариант

class UserView(APIView):
  def post(request):
    user = create_user(request)
    task = rebuild_tags.s(student_id=user.id).delay
    transaction.on_commit(task)  # ✅
    long_time_db_operation()
    return Response({"id": user.id}, status=201)
Enter fullscreen mode Exit fullscreen mode

transaction.on_commit - принимает на вход функцию, которую нужно выполнить после завершения транзакции, поэтому у .delay нет скобок
Параметры задачи переезжают в метод .s, это позволяет задаче запуститься с нужными параметрами.
Таким образом, не важно сколько времени занимает транзакция, фоновая задача будет запущена, только после успешного завершения транзакции.

Offtop. С точки зрения TDD вариант со sleep правильный, т.к. TDD требует писать минимальный код, который будет проходить тесты. Но если написать тест, который проверяет, что задача не ставится в очередь при падении транзакции, то тогда без on_commit не обойтись.

Используйте именованные аргументы

Ещё посмотреть на события celery во flower, то информация о задаче может выглядеть rebuild_tags args=(1) kwargs={}. Когда параметров много, это могло выглядеть так args=(10, 1000, true, 1, 100). Неизвестно что означают эти числа. Когда перед глазами сотни событий celery, это не очень хорошо. Исправить это очень легко, достаточно добавить *, перед аргументами задачи. В нашем примере это будет так:

@celery.task(name="users.rebuild_tags")
def rebuild_tags(*, student_id: str | int) -> None:  # ✅
  ...
Enter fullscreen mode Exit fullscreen mode

Во flower это будет отображаться так rebuild_tags args=() kwargs={"student_id": 1} - так намного понятней.

Рассмотрим ещё один пример. Пусть в celery beat добавлена задача, которая должна отправить письма тем пользователям у которых сейчас 8 утра по их местному времени.

@celery.task(name="Рассылка")
def send_mails() -> None:  # ❌
  users = find_users(8) # Найти пользователей, у сейчас которых подходящее локальное время
  # Странная, но необходимая проверка, которая мешает тестированию
  if now().minute = 0: 
    ...
Enter fullscreen mode Exit fullscreen mode

Такую задачу можно тестировать вручную лишь одну минуту в сутки, в остальное время она не срабатывает. Перепишем задачу так, чтобы сохранить её функционал.

@celery.task(name="Рассылка")
def send_mails(*, hour: int = 8, minute: int = 0) -> None:  # ✅
  users = find_users(hour) # Найти пользователей, у сейчас которых подходящее локальное время
  if now().minute = minute: # Странная проверка, которая мешает тестированию
    ...
Enter fullscreen mode Exit fullscreen mode

Теперь, можно задавать именованные аргументы через админку celery beat в блоке Arguments конкретной задачи в формате json: {"hour":17, "minute":47}. Так можно тестировать задачу в любое время.

Top comments (0)