DEV Community

Flavio Curella

Refining exceptions with Context Decorators

Very often, third-party API clients are designed to raise a generic "API exception", with the details stored in one of its properties. One notorious example of this is boto3: most errors surface as a ClientError carrying a response dictionary.

Usually this is fine, but occasionally I need finer-grained exceptions than what the client is giving me.

One common scenario is configuring a Celery task to retry some time after being rate-limited: I want every other exception to surface as an error, but for rate limiting I just want to retry the task later. Celery offers a convenient autoretry_for argument for that, but we can't use it for throttling alone because boto3 does not raise an exception specific enough.

I could wrap the logic in a try ... except clause and inspect the exception, but that gets repetitive pretty quickly, especially as I add more and more tasks.
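
For comparison, here is a minimal sketch of that inline approach. To keep it runnable without botocore or AWS credentials, `ClientError` below is a stand-in that mirrors only the `response` and `operation_name` attributes of `botocore.exceptions.ClientError`; `call_api` and this version of `upload` are hypothetical names used for illustration.

```python
class ClientError(Exception):
    """Stand-in for botocore.exceptions.ClientError (assumption: only the
    two attributes used below are mirrored)."""

    def __init__(self, error_response, operation_name):
        super().__init__(f"{operation_name} failed: {error_response}")
        self.response = error_response
        self.operation_name = operation_name


class ThrottleException(ClientError):
    pass


def call_api(code):
    # Hypothetical API call that always fails with the given error code.
    raise ClientError({"Error": {"Code": code}}, "PutObject")


def upload(code):
    # Every function that talks to the API would need its own copy
    # of this try ... except block.
    try:
        call_api(code)
    except ClientError as exc:
        if exc.response.get("Error", {}).get("Code") == "Throttling":
            raise ThrottleException(exc.response, exc.operation_name) from exc
        # Anything that is not throttling is re-raised unchanged.
        raise
```

The inspection logic is only a few lines, but it has to be pasted into every function that calls the client.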

For these kinds of situations, I create a context decorator. The decorator inspects the exception for me and, if it's the one I'm looking for, raises a custom exception that I can then catch:

# myproject/exceptions.py

import contextlib

from botocore.exceptions import ClientError


class ThrottleException(ClientError):
    pass


class raise_throttle(contextlib.ContextDecorator):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
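        # `issubclass` also matches the service-specific subclasses of
        # ClientError that boto3 generates; `exc_type` is None on a clean exit.
        if exc_type is not None and issubclass(exc_type, ClientError):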
            if exc_value.response.get("Error", {}).get("Code") == "Throttling":
                raise ThrottleException(
                    exc_value.response, exc_value.operation_name
                ) from exc_value
        # returning `False` makes the decorator
        # raise the original exception, if any.
        return False
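
Because the class derives from `contextlib.ContextDecorator`, it works both as a decorator and in a `with` statement. The sketch below exercises both forms. It is self-contained, so `ClientError` is again a stand-in for `botocore.exceptions.ClientError`, and `fail_with`, `decorated`, `with_block`, and `outcome` are hypothetical helpers. The type check here uses `issubclass` so that subclasses of `ClientError` are inspected too.

```python
import contextlib


class ClientError(Exception):
    """Stand-in for botocore.exceptions.ClientError."""

    def __init__(self, error_response, operation_name):
        super().__init__(operation_name)
        self.response = error_response
        self.operation_name = operation_name


class ThrottleException(ClientError):
    pass


class raise_throttle(contextlib.ContextDecorator):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        if exc_type is not None and issubclass(exc_type, ClientError):
            if exc_value.response.get("Error", {}).get("Code") == "Throttling":
                raise ThrottleException(
                    exc_value.response, exc_value.operation_name
                ) from exc_value
        return False


def fail_with(code):
    # Hypothetical API call that always fails with the given error code.
    raise ClientError({"Error": {"Code": code}}, "PutObject")


@raise_throttle()
def decorated(code):
    fail_with(code)


def with_block(code):
    with raise_throttle():
        fail_with(code)


def outcome(func, code):
    """Return the class name of the exception raised by func(code)."""
    try:
        func(code)
    except ClientError as exc:
        return type(exc).__name__
    return None


print(outcome(decorated, "Throttling"))
print(outcome(decorated, "AccessDenied"))
print(outcome(with_block, "Throttling"))
```

Throttling errors come out as `ThrottleException` in both forms, while any other error code leaves the original `ClientError` untouched.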

This decorator can be used on any function or method that calls the API:

# myproject/myapp/aws.py
import boto3

from myproject.exceptions import raise_throttle


@raise_throttle()
def upload(content):
    s3 = boto3.resource("s3")
    bucket = s3.Bucket("my-bucket")
    bucket.put_object(
        Body=content,
        Key="Hamlet.txt",
    )

And the exception can be caught by Celery's autoretry_for:

# myproject/myapp/tasks.py

from myproject.celery import app
from myproject.exceptions import ThrottleException
from myproject.myapp.aws import upload


@app.task(autoretry_for=(ThrottleException,))
def my_task(content):
    upload(content)

But I don't want to retry the task immediately, since the throttling takes a while to be lifted. Therefore I create a Task base class that retries with exponential backoff:

# myproject/tasks.py

import random

from celery import Task


def jitter(jitter_max=1.4):
    return random.uniform(1, jitter_max)


def exponential(retries, factor=3):
    return (factor ** (retries + 1))


def exp_jitter(retries, exp_factor=3, jitter_max=1.4):
    return exponential(retries, exp_factor) * jitter(jitter_max)


class ExpBackoffTask(Task):
    abstract = True
    max_retries = 10

    def retry(self, *args, **kwargs):
        countdown = kwargs.get('countdown', None)
        if countdown is None:
            # if no explicit countdown is given, use an exponential backoff
            # with some random jitter, giving us a top-end under 24 hours at
            # which the last retry will be attempted.
            kwargs['countdown'] = int(
                exp_jitter(self.request.retries)
            )
        return super().retry(*args, **kwargs)
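
As a quick check of the "under 24 hours" figure in the comment above, the worst-case countdown for each retry can be computed by fixing the jitter at its maximum. This sketch repeats `exponential` from the listing and adds a hypothetical `worst_case_delays` helper; it assumes the defaults shown above (`factor=3`, `jitter_max=1.4`, `max_retries=10`).

```python
def exponential(retries, factor=3):
    return factor ** (retries + 1)


def worst_case_delays(max_retries=10, factor=3, jitter_max=1.4):
    """Upper bound, in seconds, of the countdown before each retry."""
    return [
        int(exponential(n, factor) * jitter_max)
        for n in range(max_retries)
    ]


delays = worst_case_delays()
# The first retry waits at most 4 seconds (3 * 1.4, truncated); the tenth
# waits at most 82668 seconds, which is just under 23 hours.
print(delays[0], delays[-1], round(delays[-1] / 3600, 2))
```

This bounds each individual wait. The waits add up across retries, so the total time until the last attempt is longer than any single countdown.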

I can then use the custom class as the base for my task:

# myproject/myapp/tasks.py

from myproject.celery import app
from myproject.exceptions import ThrottleException
from myproject.tasks import ExpBackoffTask
from myproject.myapp.aws import upload


@app.task(base=ExpBackoffTask, autoretry_for=(ThrottleException,))
def my_task(content):
    upload(content)
