Very often, third-party API clients are designed to return a generic "API exception", with the details contained inside some property. One notorious example of this is boto3: most exceptions come as a `ClientError` with a `response` attribute.
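For example, telling a throttling error apart from any other failure means digging into that payload. A minimal sketch of what that inspection usually looks like (the bucket and key here are just placeholders):

```python
import boto3
from botocore.exceptions import ClientError

s3 = boto3.resource("s3")
try:
    s3.Bucket("my-bucket").put_object(Key="Hamlet.txt", Body=b"lorem ipsum")
except ClientError as e:
    # The actual error type is buried inside the response payload.
    code = e.response.get("Error", {}).get("Code")
    print(code, e.operation_name)
```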
Usually this is fine, but occasionally I need finer-grained exceptions than what the client is giving me.
One common scenario is configuring a Celery task to retry a while after being rate limited: I want to get error reports for all exceptions, but for rate limiting I just want to retry the task later. Celery offers a convenient `autoretry_for` argument for that, but we can't use it just for throttling because boto3 does not raise a specific enough exception.
I could wrap the logic in a `try ... except` clause and inspect the exception, but that gets repetitive pretty quickly, especially as I add more and more tasks.
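Without a better tool, every task that touches the API would need roughly this boilerplate (a sketch; `do_work` is a placeholder for the actual boto3 call):

```python
from botocore.exceptions import ClientError

from myproject.celery import app


@app.task(bind=True)
def my_task(self, content):
    try:
        do_work(content)  # placeholder for the actual boto3 call
    except ClientError as e:
        # Only retry when the API is throttling us; re-raise everything else.
        if e.response.get("Error", {}).get("Code") == "Throttling":
            raise self.retry(exc=e)
        raise
```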
For these kinds of situations, I create a context decorator. The decorator inspects the exception for me and, if it's the one I'm looking for, raises a custom exception that I can then catch:
```python
# myproject/exceptions.py
import contextlib

from botocore.exceptions import ClientError


class ThrottleException(ClientError):
    pass


class raise_throttle(contextlib.ContextDecorator):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        if exc_type is ClientError:
            if exc_value.response.get("Error", {}).get("Code") == "Throttling":
                raise ThrottleException(
                    exc_value.response, exc_value.operation_name
                ) from exc_value
        # Returning `False` makes the decorator
        # re-raise the original exception, if any.
        return False
```
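Because it subclasses `contextlib.ContextDecorator`, the same object also works as a plain context manager, which is handy when only part of a function talks to the API. A quick sketch (`prepare` and `push_to_s3` are hypothetical helpers):

```python
def sync_document(doc):
    prepare(doc)  # hypothetical local work, no API calls
    with raise_throttle():
        push_to_s3(doc)  # hypothetical helper that calls boto3
```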
This decorator can be used on any function or method that calls the API:
```python
# myproject/myapp/aws.py
import boto3

from myproject.exceptions import raise_throttle


@raise_throttle()
def upload(content):
    s3 = boto3.resource("s3")
    bucket = s3.Bucket("my-bucket")
    bucket.put_object(
        Body=content,
        Key="Hamlet.txt",
    )
```
And the exception can be caught by Celery's `autoretry_for`:
```python
# myproject/myapp/tasks.py
from myproject.celery import app
from myproject.exceptions import ThrottleException
from myproject.myapp.aws import upload


@app.task(autoretry_for=(ThrottleException,))
def my_task(content):
    upload(content)
```
But I don't want to retry the task immediately, since the throttling takes a while to be lifted. Therefore I create a Task base class that retries with exponential backoff:
```python
# myproject/tasks.py
import random

from celery import Task


def jitter(jitter_max=1.4):
    return random.uniform(1, jitter_max)


def exponential(retries, factor=3):
    return factor ** (retries + 1)


def exp_jitter(retries, exp_factor=3, jitter_max=1.4):
    return exponential(retries, exp_factor) * jitter(jitter_max)


class ExpBackoffTask(Task):
    abstract = True
    max_retries = 10

    def retry(self, *args, **kwargs):
        countdown = kwargs.get("countdown", None)
        if countdown is None:
            # If no explicit countdown is given, use an exponential backoff
            # with some random jitter, giving us a top-end under 24 hours at
            # which the last retry will be attempted.
            kwargs["countdown"] = int(
                exp_jitter(self.request.retries)
            )
        return super().retry(*args, **kwargs)
```
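To get a feel for the schedule, these are the base countdowns (before the 1.0 to 1.4x jitter) produced for the first few retries; the loop is only illustrative:

```python
# Illustrative only: base countdowns in seconds, before jitter is applied.
for retries in range(5):
    print(retries, exponential(retries))
# 0 3
# 1 9
# 2 27
# 3 81
# 4 243
```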
I can then use the custom class as the base:
```python
# myproject/myapp/tasks.py
from myproject.celery import app
from myproject.exceptions import ThrottleException
from myproject.tasks import ExpBackoffTask
from myproject.myapp.aws import upload


@app.task(base=ExpBackoffTask, autoretry_for=(ThrottleException,))
def my_task(content):
    upload(content)
```
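Calling the task doesn't change; callers stay unaware of the retry machinery (the payload here is just an example):

```python
# Callers enqueue the task as usual; throttling retries happen behind the scenes.
my_task.delay(b"lorem ipsum")
```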