Hello Community,
While most humans would take a break, I decided to skip rest and spend the next few hours (on a Saturday) walking through how to implement the different cases of Celery work-flows.
I find the documentation a little hard to follow, hence this blog.
When to use a Celery work-flow?
A work-flow is useful when Celery tasks have some dependency on one another. The dependency, and hence the order of execution, can take different forms:
- A task needs to be run before another
- A task needs to return some data critical for the execution of another.
- Tasks need to be executed in parallel to optimize performance.
- Conditional execution of a task based on the result of the previous one.
Celery is a powerful task queue that enables more than executing single, isolated tasks: work-flows let us orchestrate various tasks together.
Back to Basics ⚡️
Let's take a refresher course on Celery. The first thing we are going to discuss is the terminology relevant in the context of this blog.
Task vs Subtask vs Sharedtask
Task vs Subtask
Task vs Sharedtask
Signature
Let us consider an example where we have to create a Celery work-flow in which two tasks, say Task_A and Task_B, are executed one after another, and Task_A returns a result that is consumed or required by Task_B.
While defining the work-flow we won't yet have result_from_task_a, the result/return value of the previous task (Task_A). So here we will use signatures, which allow us to provide a placeholder for that argument.
Signatures can be written as
task.signature((arg1, arg2, ...))
# or the shortcut
task.s(arg1, arg2, ...)
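As a minimal sketch (the arguments here are just placeholders, and task_b stands for the illustrative shared task from the example above), a partial signature leaves the first argument open so the previous task's result can fill it at run time:
from task import task_b

partial_sig = task_b.s(arg4)                   # first argument left open; a chain fills it
                                               # with Task_A's result at run time
full_sig = task_b.s(result_from_task_a, arg4)  # only possible once the result is already known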
Cases of Celery Work-flow Creation 💁
1. Task execution in Series (Chain)
It's a simple case where we want to execute one task after another: either the previous task returns some value required by the current task, or we simply need the tasks to run in a sequence.
For this, we use chain. The chain function takes a list of signatures.
Suppose we have to create a workflow similar to the above diagram. Task A returns a result which is then passed to Task B. The workflow will be as follows:
from celery import chain
from task import task_a, task_b, task_c
work_flow = chain(task_a.s(arg1, arg2), task_b.s(arg4, arg5), task_c.s(arg6, arg7))
# another way to express a chain using pipes
work_flow = task_a.s(arg1, arg2) | task_b.s(arg4) | task_c.s(arg5, arg6)
Notice that we never pass the result of task_a explicitly; the chain injects it as the first argument of task_b at run time.
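To make the result passing concrete, here is a hedged sketch of how task_a and task_b might be defined; the bodies and argument names are illustrative, not from the original post:
from celery import shared_task

@shared_task
def task_a(arg1, arg2):
    # whatever task_a returns becomes the first argument of the next task in the chain
    return arg1 + arg2

@shared_task
def task_b(result_from_task_a, arg4, arg5):
    # result_from_task_a is injected by the chain; arg4 and arg5 come from the signature
    return result_from_task_a * arg4 * arg5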
2. Task execution in Parallel (Group)
There can be a case where we need to execute tasks in parallel. We may run different tasks, or the same task with distinct arguments, in parallel.
We use group to execute tasks in parallel. The group function takes a list of signatures.
The scenarios where we can use group are:
- Waiting for responses from I/O bound tasks like reading from a DB, calling an external API, or downloading a file from external storage.
- Running heavy computations on different nodes.
from celery import group
from task import task_a, task_b, task_c
work_flow = group(task_a.s(arg1, arg2), task_b.s(arg4, arg5), task_c.s(arg6, arg7))
# execute the same task with different arguments in a group
work_flow = group(task_a.s(arg1), task_a.s(arg2), task_a.s(arg3))
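A minimal sketch of running such a group and collecting its results, assuming a result backend is configured:
# apply_async on a group returns a GroupResult
group_result = work_flow.apply_async()
# get() blocks until every task in the group finishes and returns a list of results,
# one entry per task, in the order the signatures were passed to group()
results = group_result.get()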
3. Task execution both in Series and Group (Combination of Chain, Group and Chord)
Now let's talk about a case where we have a combination of tasks that need to be executed both in series and in parallel. Considering the above workflow diagram as an example, we can discuss the sub-scenarios below.
3.1 Task execution before Group Tasks
It's a case where we want to run group tasks after a task's execution. In other words, we need to chain a task with group tasks.
In this case, we use both group and chain. Looking at the workflow diagram, we have to execute Task 0 first, and after its execution we need to run Task A, Task B, and Task C.
The work-flow can be written as,
from celery import group, chain
from task import task_zero, task_a, task_b, task_c
work_flow = chain(task_zero.s(), group(task_a.s(arg1), task_b.s(arg2), task_c.s(arg3)))
# the same work-flow expressed using pipes
work_flow = task_zero.s() | group(task_a.s(arg1), task_b.s(arg2), task_c.s(arg3))
Notice that the result of task_zero is passed to every task in the group as its first argument.
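A hedged sketch of what the task definitions could look like for this fan-out; the return values below are placeholders, not from the original post:
from celery import shared_task

@shared_task
def task_zero():
    # this single return value is handed to every task in the following group
    return "shared-input"

@shared_task
def task_a(result_from_task_zero, arg1):
    # each group task receives task_zero's result as its first argument
    return f"task_a processed {result_from_task_zero} with {arg1}"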
3.2 Task execution after Group Tasks
After the group tasks get executed, another task aggregates the results they return. This task is called the callback (or callback task).
We use chord to define both the group and the callback task. A chord has two parts: the header and the callback. The header is a group of tasks, whereas the callback is a task that gets executed once all the header tasks complete. The results from the group tasks are aggregated into a list and passed to the callback as its first argument.
The workflow diagram requires us to run the Callback Task after Task A, Task B, and Task C.
The complete work-flow can be written as,
from celery import group, chain, chord
from task import task_zero, task_a, task_b, task_c, callback_task
# header: the tasks that run in parallel; each receives the result of task_zero
header = group(task_a.s(arg1), task_b.s(arg2), task_c.s(arg3))
# callback: receives the list of header results as its first argument
callback = callback_task.s(argz)
work_flow = chain(task_zero.s(), chord(header, callback))
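As an illustration of the callback side, here is a hypothetical callback_task; the aggregation (a simple sum) is only an example:
from celery import shared_task

@shared_task
def callback_task(results, argz):
    # results is the aggregated list, e.g. [result_of_task_a, result_of_task_b, result_of_task_c]
    return sum(results)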
4. Generating Group tasks from a Chained task (Map)
Here is an interesting case where we want to create a group from a list returned by a task, such that for each item in the task's result, a task is added to the group.
Let me explain with the help of an example. Suppose we need to iterate through the files in an S3 bucket (at the point of task execution) and download them to the server. For this, we define two tasks. The first task, get_files_from_bucket, will generate and return a list of file paths in S3:
[
    "s3://foo-bucket/path/to/file1",
    "s3://foo-bucket/path/to/file2",
    "s3://foo-bucket/path/to/file3",
]
Now, for every path in the list, we will add a task download_file_from_s3 (another task responsible for downloading a file from S3 to the server) to a group. These group tasks can then run in parallel and download the files to the server.
This is different from 3.1: there we know, while planning or defining the workflow, exactly which tasks will be executed in the group; here the group tasks to be executed are decided by the result of the prior task.
In this case, we will use a map. Unlike chord, chain, and group, a map is a task itself. The map task is given the result of the previous task along with a task signature, and it applies each item of the result as an argument to a clone of that signature. In other words, map creates temporary tasks, one per item in the result, and runs them as a group.
from celery import subtask, group, shared_task
@shared_task
def dmap(result, some_task):
    # Map a task over a result and return it as a group.
    # A subtask is a task with parameters passed, but not yet started.
    some_task = subtask(some_task)
    # iterate over the result and clone the task for each item
    group_task = group(
        some_task.clone([arg]) for arg in result
    )
    return group_task()
from celery import chain
from task import get_files_from_bucket, download_file_from_s3, dmap
work_flow = chain(get_files_from_bucket.s(), dmap.s(download_file_from_s3.s()))
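For completeness, a hedged sketch of the two tasks used above; the bucket listing and download logic are stubbed out and would normally use an S3 client such as boto3:
from celery import shared_task

@shared_task
def get_files_from_bucket():
    # in a real implementation this would list the objects in the bucket
    return [
        "s3://foo-bucket/path/to/file1",
        "s3://foo-bucket/path/to/file2",
        "s3://foo-bucket/path/to/file3",
    ]

@shared_task
def download_file_from_s3(s3_path):
    local_path = "/tmp/" + s3_path.rsplit("/", 1)[-1]
    # the actual download (e.g. boto3's download_file) would go here
    return local_path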
5. Conditional Task execution (Map)
It's a case where, based on the result of a task, we decide which task to execute next.
In the example of the previous section we are only downloading files from S3; now we need to expand to other stores as well. We have 3 types of tasks:
- Task get_paths_from_db, which fetches file paths from the database. This task returns a list of paths.
- Task download_file_from_xxxx, which downloads a file from the given path to the server from the respective storage. This task returns the path where the file is saved on the server.
- Task write_paths_to_file, which aggregates the paths of the downloaded files and writes them to a txt file. This task acts as the chord callback.
from celery import subtask, group, shared_task
from task import download_file_from_s3, download_file_from_gcp, download_file_from_azure

@shared_task
def dmap(result, end_task):
    task_list = []
    for arg in result:
        # get_storage_from_path is a small helper (not shown here) that infers
        # the storage backend ("s3", "gcp", "azure") from the path
        storage = get_storage_from_path(arg)
        if storage == "s3":
            task = subtask(download_file_from_s3.s(arg))
        elif storage == "gcp":
            task = subtask(download_file_from_gcp.s(arg))
        elif storage == "azure":
            task = subtask(download_file_from_azure.s(arg))
        else:
            # unknown storage: skip this path
            continue
        task_list.append(task)
    group_task = group(task_list)
    # chain the group with the callback task, so it behaves like a chord
    pipeline = group_task | end_task
    return pipeline()
from celery import chain
from task import dmap, get_paths_from_db, write_paths_to_file
work_flow = chain(get_paths_from_db.s(), dmap.s(write_paths_to_file.s()))
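A minimal sketch of triggering this workflow, assuming a broker and result backend are configured. Note that because dmap launches the group and callback from inside a task, the outer result may resolve to the ids of those inner tasks rather than the final aggregated value:
async_result = work_flow.apply_async()
# async_result belongs to the outer chain (get_paths_from_db -> dmap);
# the downloads and write_paths_to_file run as separate tasks started by dmap
print(async_result.get())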
Thank you for reading. I am happy to take up suggestions in the comment section.
Cheers 🍻
Top comments (8)
Thank you for the wonderful tutorial. It was helpful for me.
Hello! Great examples, really appreciate the level of detail.
Building off of example 5, I have a use case where the subtasks need to be run as soon as the args are ready VS when all tasks are defined (as shown in the example).
Is there a way to call apply_async on the tasks and then create a group from the list of AsyncResults? Thank you!
Hi,
have you any idea on this workflow?
assume that all of t, v, x, p, q are tasks
first_workflow = group(t1, chain(t2 --> v1 --> x1), t3, t4, chain(t5 --> v2 --> x2))
also
second_workflow = chain(group(t1, t2, t3, t4, t5), p1, q1)
I want to run first_workflow and second_workflow together.
thanks a lot for your clear explanation!
Great work, thanks, but in the last example, how do you fetch the result of the end_task function?
When I do something like workflow.get(), I get a list of IDs, not the final results.
I tried number 4 but it didn't work for me.
The call I had to use to achieve the desired result was:
work_flow = chain(get_files_from_bucket.s(), dmap.s(download_file_from_s3.s()))
Hi,
thanks for pointing out my mistake. I have corrected it now. We need to chain get_files_from_bucket with dmap. The result of get_files_from_bucket will be iterated over and the corresponding group tasks will be created.
really nice article.