DEV Community

Akarshan Gandotra
Akarshan Gandotra

Posted on • Updated on

The Curious Case of Celery Work-flows

Hello Community,
While most humans would take a break, I decided to skip rest and spend the next few hours (on a Saturday) discussing the implementation of the different cases in Celery Work-flows.
I find documentation a little hard to follow, hence decided to write this blog.

When Celery work-flow?

If celery tasks have some dependency on one another based on the order of execution. The order of execution can be decided based on:

  • A task needs to be run before another
  • A task needs to return some data critical for the execution of another.
  • Tasks need to be executed in parallel to optimize performance.
  • Conditional execution of task based on the result of previous one.

Celery is a powerful task queue that enables a more complex workflow than executing a task. Work-flow enables us to orchestrate various tasks.

Back to Basics ⚡️

Let's take a refresher course on Celery. The first thing we are going to discuss is terms relevant in the context of this blog.

Task vs Subtask vs Sharedtask

Task vs Subtask
Task vs Sharedtask

Signature

Let us consider an example where we have to create a Celery Work-flow where we want two tasks say Task_A and Task_B executed one after another. Task_A returns a result to be consumed or required by Task_B.

Image description

We won't be having result_from_task_a as the result/return value of the previous task (Task A) while defining the workflow. So, here we will use the signatures that allow us to provide a placeholder for the argument.
Signatures can be written as



task.signature(arg1, arg2...)
# or
task.s(arg1, arg2...)


Enter fullscreen mode Exit fullscreen mode

Cases of Celery Work-flow Creation 💁

1. Tasks execution in Series (Chain)

Image description

It's a simple case where we want to execute one task after another. Either the previous task returns some value required by the current task or we need to create a workflow where tasks are executed in a sequence.

For this, we use chain. The chain function takes in a list of signatures.

Suppose we have to create a workflow similar to the above diagram. Task A returns a result which is then passed to Task B. The workflow will be as follows:



from celery import chain
from task import task_a, task_b, task_c

work_flow = chain(task_a(arg1, arg2), task_b(result, arg4, arg5), task_c(arg6, arg7))

# another way to express a chain using pipes

work_flow = task_a(arg1, arg2) | task_b(result, arg4) | task_c(arg5, arg6)


Enter fullscreen mode Exit fullscreen mode

Notice the first argument of task_b is the result of task_a.

2. Tasks execution in Parallel (Group)

Image description

There can be a case where we require to execute tasks in parallel. We may run different tasks or the same task with distinct arguments in parallel.
Image description

We use Group to execute tasks in parallel. The group function takes in a list of signatures.

The scenarios where we can use Group are

  • Waiting for responses from I/0 bound tasks like reading from DB or calling external API or downloading a file from external storage.
  • Running heavy computations on different nodes.


from celery import group
from task import task_a, task_b, task_c

work_flow = group(task_a(arg1, arg2), task_b(arg4, arg5), task_c(arg6, arg7))

# execute same task with different arguments in group
work_flow = group(task_a(arg1), task_a(arg2), task_a(arg3))


Enter fullscreen mode Exit fullscreen mode

3. Tasks execution both in Series and Group (Combination of Chain, Group and Cord)

Image description

Now let's talk about a case where we have a combination of tasks that need to be executed both in series and parallel. Considering the above workflow diagram as an example, we can further discuss the below sub-scenarios.

3.1 Task execution before Group Tasks

It's a case where we want to run group tasks after a task's execution. In other words, we need to chain a task with group tasks.
In this case, we use both group and chain. Observing the workflow diagram we have to execute Task 0 and after its execution, we need to run Task A, Task B, and Task C.

The work-flow can be written as,



from celery import group, chain
from task import task_zero, task_a, task_b, task_c

work_flow = chain(task_zero, group(task_a(result, arg1), task_b(result, arg2), task_c(result, arg3)))

# if we want to express a chain using pipes
work_flow = task_zero | group(task_a(result, arg1), task_b(result, arg2), task_c(result, arg3))


Enter fullscreen mode Exit fullscreen mode

Notice how task_zero passed the result to tasks in the group.

3.2 Task execution after Group Tasks

After the group tasks get executed, another task aggregates the results they return. This task is callback (or callback task).
We use chord to define both group and callback tasks. It has two parts (header and callback). Header is a group of tasks whereas callback is a task that gets executed once the group task completes its execution. The results from group tasks are aggregated and passed to callback task as the first argument as a list.
The workflow diagram wants us to run Callback Task after Task A, Task B, and Task C.

Complete work-flow can be written as,



from celery import group, chain, chord
from task import task_zero, task_a, task_b, task_c

header = group(task_a(result, arg1), task_b(result, arg2), task_c(result, arg3))

callback = callback(result, argz)

work_flow = chain(task_zero, chord(header)(callback))


Enter fullscreen mode Exit fullscreen mode

4. Generating Group tasks from Chain task (Map)

Image description

Here is an interesting case where we want to create a group from a list returned by a task, such that for each item in the task result set, a task is to the group.

Let me explain with the help of an example. Suppose we need to iterate through files in the S3 bucket (at the point of task execution) and download them to the server. For this, we define two tasks. The first task, get_files_from_bucket will generate and return a list of file paths in S3.



[
    "s3://foo-bucket/path/to/file1",
    "s3://foo-bucket/path/to/file2",
    "s3://foo-bucket/path/to/file3",
]


Enter fullscreen mode Exit fullscreen mode

Now for every path in the list, we will add a task download_file_from_s3 (another task responsible to download the file from S3 to the server) in a group. These group tasks can then run in parallel and download files to the server.
This is different from 3.1 since there we have clarity on tasks to be executed in the group while planning or defining the workflow. But here the group tasks to be executed are decided by the prior task.

In this case, we will use a Map. Map, unlike chord, chain, and group is a task itself. Map task is provided with result and task to which a list of arguments retrieved from the result is applied. In other words, map creates a temporary task where a list of arguments is applied to the task.



from celery import subtask, group, shared_task

@shared_task
def dmap(result, some_task):
    # Map a task over an result and return as a group

    # subtask is a task with parameters passed, but not yet started.
    some_task = subtask(some_task)
    # iterating over result and cloning task

    group_task = group(
        some_task.clone(
            [
                arg
            ]
        )
        for arg in result
    )
    return group_task()


Enter fullscreen mode Exit fullscreen mode


from task import get_files_from_bucket, download_file_from_s3, dmap

work_flow = chain(get_files_from_bucket.s(), dmap.s(download_file_from_s3.s())


Enter fullscreen mode Exit fullscreen mode

5. Conditional Tasks execution (Map)

It's a case where based on the result of a task we decide which task to execute next.

In the example of the previous section, we are only downloading a file from S3, we need to expand to other stores as well. We have 3 types of tasks,

  1. Task get_paths_from_db which will fetch file paths from the database. This task will return a list of paths.
  2. Task download_file_from_xxxx which downloads file from the given path to the server from respective storage. This task will return the path of the file where it is downloaded to the server.
  3. Task write_paths_to_file which will aggregate and print the paths of the downloaded file in a txt file. This task in turn acts as a chord.

Image description



from celery import subtask, group, shared_task

@shared_task
def dmap(result, end_task):
    task_list = []
    for arg in result:
        storage = get_storage_from_path(arg)
        if storage == "s3":
            task = sub_task(download_file_from_s3.s(arg))
        elif storage == "gcp":
            task = sub_task(download_file_from_gcp.s(arg))
        elif storage == "azure":
            task = sub_task(download_file_from_azure.s(arg))
        else:
            pass
        task_list.append(task)

    group_task = group(task_list)
    pipeline = group_task | end_task
    return pipeline()


Enter fullscreen mode Exit fullscreen mode


from task import dmap, get_paths_from_db, callback

work_flow = dmap.s(get_paths_from_db.s(), write_paths_to_file.s())


Enter fullscreen mode Exit fullscreen mode

Thank you for reading. I am happy to take up suggestions in the comment section.

Cheers 🍻

Top comments (8)

Collapse
 
princessmiracle profile image
Nadiia Heckman

Thank you for wonderful tutorial. It was helpful for me.

Collapse
 
claytonsmith profile image
Clayton Smith

Hello! Great examples, really appreciate the level of detail.

Building off of example 5, I have a use case where the subtasks need to be run as soon as the args are ready VS when all tasks are defined (as shown in the example).

Is there a way to call apply_async on the tasks and then create a group from the list of AsyncResults?

Thank you!

Collapse
 
adelminayi profile image
adelminayi

Hi,
have you any idea on this workflow?
assume that all t,v,x, p, q are a task

first_workflow = group(t1,chain(t2 --> v1 --> x1), t3, t4, chain(t5 --> v2 --> x2))
also
second_workflow = chain(group (t1, t2, t3, t4, t5) , p1, q1)

I want to run first_workflow and second_workflow together.

Collapse
 
alfmateos profile image
alfmateos

thanks a lot for your clear explanation!

Collapse
 
christiansicari profile image
Christian Sicari

Great work, thanks, but in the last example, how do you fetch the result of the end_task function?
When I do something worflow.get() I get a list of IDS, not the final results

Collapse
 
adaschevici profile image
Artur Daschevici • Edited

I tried number 4 but it didn't work for me.

The call i had to use to achieve the desired result was:

work_flow = chain(get_files_from_bucket.s(), dmap.s(download_file_from_s3.s()))

Collapse
 
akarshan profile image
Akarshan Gandotra

Hi,
thanks for pointing my mistake. I have corrected it now. We need to chain get_files_from_bucket with dmap. The result of get_files_from_bucket will be iterated and corresponding group tasks will be created.

Collapse
 
redliu312 profile image
redliu312

really nice article.