Async task queues @ Insurami 🤖
At Insurami, we provide landlords and their tenants with an alternative to traditional lease deposits. Landlords can refer new and existing tenants on our platform to obtain a deposit guarantee quote.
In our backend, this means a sequence of I/O operations and network requests to third-party services that can take longer than the user might expect. In addition, an HTTP request can fail temporarily, in which case the entire processing flow breaks and the data in the backend is left in an invalid state.
To tackle this problem, we wanted to break the backend processing code down into individual steps that run in sequence, can be retried if necessary, and allow the entire flow to be rolled back if needed. Following these principles, we can start looking at our backend in a transactional way, giving us confidence that our data is consistent.
Task queues provide a way to execute code asynchronously by sending messages to a queue, where they are picked up and processed by a process running in the background. There are many task queue frameworks available, but we chose Celery because it is a well-established solution and is compatible with AWS message queues, which we had already integrated into our platform. Amazon Simple Queue Service (SQS) works as our message broker, facilitating the communication between processes.
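As a rough illustration of the idea, and not of how Celery or SQS work internally, the sketch below uses only Python's standard library. An in-process queue stands in for the message broker and a background thread stands in for the worker process. The names send_task and worker are hypothetical.

```python
import queue
import threading

task_queue = queue.Queue()  # stands in for the message broker
results = []                # stands in for a result store


def worker():
    """Background consumer: pull messages off the queue and run them."""
    while True:
        func, args = task_queue.get()
        results.append(func(*args))
        task_queue.task_done()


def send_task(func, *args):
    """Producer side: enqueue the work and return immediately."""
    task_queue.put((func, args))


# A daemon thread plays the role of the background worker process.
threading.Thread(target=worker, daemon=True).start()

send_task(pow, 2, 10)
send_task(str.upper, "queued")

# Block until both messages are processed (only needed for this demo).
task_queue.join()
```

The caller of send_task never waits for the work itself, which is the property that keeps a request handler responsive.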
Celery provides the means to execute code blocks sequentially with the chain primitive. When the execution chain is started, every block of code runs as a task, and its result is passed as the input to the next block.
Every task can be configured to retry when an exception is raised inside it. It is possible to set a maximum number of retries, so tasks don't keep failing indefinitely when recovery is not possible.
It is also possible to set an interval between retries, to avoid repeating executions too close together in time. A well-known strategy for calculating this interval is exponential backoff, where the interval grows as more retries are fired. The algorithm to generate such intervals was introduced in this post. Celery version 4 introduces a native auto-retry option with exponential backoff.
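To make the retry logic concrete, here is a minimal standalone sketch of capped exponential backoff without any randomness. It is not Celery's implementation. The helper names backoff_intervals and run_with_retries are hypothetical, and the factor and cap are illustrative values.

```python
import time


def backoff_intervals(factor, max_retries, maximum):
    """Delay in seconds before each retry: factor * 2**n, capped at maximum."""
    return [min(maximum, factor * 2 ** attempt) for attempt in range(max_retries)]


def run_with_retries(func, max_retries, factor=4, maximum=600, sleep=time.sleep):
    """Call func, retrying on any exception with a growing delay in between."""
    for delay in backoff_intervals(factor, max_retries, maximum):
        try:
            return func()
        except Exception:
            sleep(delay)  # wait longer after every failed attempt
    # Final attempt: if it still fails, the exception reaches the caller.
    return func()


print(backoff_intervals(factor=4, max_retries=10, maximum=600))
```

The sleep parameter is injectable so the helper can be exercised in tests without actually waiting.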
We have briefly covered the main concepts behind the implementation of async task queues. Now it is time to see what all that looks like in terms of code:
To add the auto-retry configuration and a custom on_failure handler, we override the celery.Task class with our own subclass.
Abstracts a task in our execution chain.
- We decided on a maximum of 10 retries.
- The retry backoff will start at 4 seconds. So the interval sequence in seconds between retries will be 4, 8, 16, and so on.
- A maximum interval of 600 seconds.
- The exception class which triggers an auto-retry is Exception, so effectively every exception will trigger one.
- In order to make the exponential backoff sequence work you need to remove the randomness factor introduced with
To handle the failure after the maximum number of retries, we override the on_failure handler, which we use to send a custom issue to our Sentry account to track problems in async task queues. This would also be the place to handle roll-backs when a task fails. Since the method has access to the name and ID of the task, the traceback and other relevant info, it’s easy to infer the operations needed to leave the DB in a correct state.
To instantiate the celery object:
The make_celery method returns an instance of the task runner
The make_celery function takes your Flask app and returns a Celery instance, which we will call runner. Looking at it in more detail:
The Celery constructor needs:
- The backend parameter, which in our case is a Postgres URI used to store task execution metadata.
- The broker parameter, which is the SQS URI used for messaging between the tasks.
- You must include the list of modules with the tasks you want to execute.
- The default queue name.
- Additional config for transport options.
- The AWS region of the broker.
- You can specify a queue name prefix, to separate messages by environment, for instance.
- Do not forget to override the default celery.Task with the task class we defined earlier.
Next comes your code, broken down into individual methods:
These are all individual tasks executed in sequence
The @task decorator turns our methods into tasks. The last step is defining our chain. To do this we write yet another function.
This module starts the chain processing
When we initiate the chain in another task, we will send it to the queue, which will start the processing of the steps defined above.
In more detail, what is passed to the chain is the steps’ signatures (created with the .s() method), separated by commas. The chain itself is started by invoking apply_async(). If you want to know more about signatures and apply_async, check the celery.Task class docs.
Once all your code is in place, a request handler can call the start_new_processing method, which triggers the async task queue.
The UX in our referral frontend is now much faster since the client is not blocked by operations in the backend. In addition to that, we have been able to automatically recover from a temporary problem with one of the third-party services we use as part of our risk analysis on tenants.