Introduction
We often need to speed up our tasks without consuming a lot of resources, and that is now possible on Cloud Run. Jobs is a serverless feature of Cloud Run that has been generally available since March 23rd, 2023. In this article, I will first compare Cloud Run and Cloud Functions (1st gen and 2nd gen), then explain how Cloud Run jobs work. Next, I will show some use cases where you could use Cloud Run jobs instead of other serverless options. Finally, a basic demo will apply what is explained in the previous parts.
Comparison between serverless options
The diagram below shows which product is more suitable depending on the job to perform.
How does it work?
Cloud Run jobs can execute a single task or a group of tasks. When you create a job, you set the number of tasks that your job contains. This number is saved as an environment variable that you can use directly in your code without defining it yourself. Each task is identified by its index, starting from 0, which is also saved as an environment variable directly usable in the code. So, after the creation of your job, Cloud Run creates 2 environment variables:
- CLOUD_RUN_TASK_COUNT: which is the total number of tasks of the job
- CLOUD_RUN_TASK_INDEX: which is the index of the current task
These environment variables are not visible on the job's configuration page. Their names are fixed by convention.
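As a minimal sketch (only the two Cloud Run variables come from the platform; everything else is mine), a task can read them like this:
import os

# Cloud Run jobs sets these two variables automatically for every task
task_index = int(os.environ.get("CLOUD_RUN_TASK_INDEX", 0))
task_count = int(os.environ.get("CLOUD_RUN_TASK_COUNT", 1))

print(f"Running task {task_index} of {task_count}")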
When creating a job, you must select an image to use. This image can be stored on Artifact Registry or Docker Hub. For other container registries, follow the steps described on this page. Google recommends using Artifact Registry. If you face an issue about a violated constraint (low carbon), follow the steps described in my previous article.
A job can be split into up to 10,000 tasks. Each task creates a new instance of the image and runs independently of the others. If a task fails, the job fails too, even if all the others ended successfully.
Use case examples of Cloud Run jobs
1- Large dataset
Let's suppose a situation where we have to process a large dataset of 1 million lines. Cloud Run jobs could help us split the dataset into several smaller datasets and process them separately. For instance, we can split the job into 100 tasks and process 10,000 lines per task.
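As a hedged sketch of that idea (the dataset size and the print statement are placeholders of my own), each task could compute its own slice like this:
import os

TOTAL_LINES = 1_000_000  # assumed dataset size, for illustration only

task_index = int(os.environ["CLOUD_RUN_TASK_INDEX"])
task_count = int(os.environ["CLOUD_RUN_TASK_COUNT"])

# each task owns one contiguous chunk; the last task also takes the remainder
chunk_size = TOTAL_LINES // task_count
start = task_index * chunk_size
end = TOTAL_LINES if task_index == task_count - 1 else start + chunk_size

print(f"Task {task_index} processes lines {start} to {end - 1}")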
2- Replications
Imagine that we want to replicate data from 3 external databases to Cloud Storage. You can do it with a single Cloud Run job and assign one task per database. Depending on the index of the task, the corresponding database credentials will be used, without duplicating code.
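Here is a minimal sketch of that pattern, assuming the credentials are exposed through environment variables (the hosts and variable names below are made up for illustration):
import os

# one configuration entry per task; hosts and secret names are placeholders
DATABASES = [
    {"host": "db-a.example.com", "password_env": "DB_A_PASSWORD"},
    {"host": "db-b.example.com", "password_env": "DB_B_PASSWORD"},
    {"host": "db-c.example.com", "password_env": "DB_C_PASSWORD"},
]

task_index = int(os.environ["CLOUD_RUN_TASK_INDEX"])
config = DATABASES[task_index]  # task 0 -> db-a, task 1 -> db-b, task 2 -> db-c
password = os.environ.get(config["password_env"])

print(f"Task {task_index} replicates {config['host']} to Cloud Storage")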
3- Unsupported language
Cloud Functions supports only 7 languages (Node.js, Python, Go, Java, C#, Ruby and PHP). So, you won't be able to use Cloud Functions with bash code. One of the advantages of Cloud Run jobs is that the language does not matter, because it runs container images. You just have to build your image and set an entry point.
We can imagine many use cases for Cloud Run jobs. Now let's jump into an example to show you directly how to use it from the console.
Example
In this example, we will create a Cloud Run job with 5 tasks. The Python code writes the result of a BigQuery query to Cloud Storage. The BigQuery dataframe result will be split into 5 parts and each part will be written to a separate file in CSV format.
1- Let's write the code
If you use the same code to test, do not forget to set your environment variables when creating the job. Here is the Python code:
# library imports
import os
import pandas as pd
import numpy as np
from google.cloud import bigquery, storage
from dotenv import load_dotenv


def run_query(project_id, dataset, table):
    # create a BigQuery client
    client = bigquery.Client()
    query = f"""SELECT *
        FROM `{project_id}.{dataset}.{table}`
        LIMIT 1000
    """
    # Notes:
    # avoid SELECT * in real projects; we use it here just to illustrate
    # LIMIT 1000 has no impact on the cost: the same amount of data is scanned, only the result is truncated
    # run the SQL query
    query_job = client.query(query)
    # convert the row iterator into a pandas dataframe
    rows = []
    for row in query_job.result():
        rows.append(dict(row.items()))
    df = pd.DataFrame(rows)
    return df, len(df)


if __name__ == '__main__':
    # load the environment variables from a .env file if present
    load_dotenv()
    # get all the environment variables
    project_id = os.environ.get("PROJECT_ID")
    bucket_name = os.environ.get("BUCKET_NAME")
    dataset = os.environ.get("DATASET")
    table = os.environ.get("TABLE")
    index = int(os.environ.get("CLOUD_RUN_TASK_INDEX"))
    nb_task = int(os.environ.get("CLOUD_RUN_TASK_COUNT"))
    # the filename root
    filename = "test-parallel-task"
    # run the query and get the result as a dataframe together with its length
    data, n = run_query(project_id, dataset, table)
    # the length of each task's slice of the dataframe
    len_task_df = n // nb_task
    begin = index * len_task_df
    # the last task takes the remainder, to avoid data loss in case of imperfect division
    end = begin + len_task_df if index != nb_task - 1 else n
    # write the corresponding file to Cloud Storage
    data[begin:end].to_csv(f'gs://{bucket_name}/{filename}_{index}.csv')
As you can see in the last line of the code, I'm writing directly to Cloud Storage using pandas. This is only possible if you add the gcsfs library to your requirements.txt. Your requirements.txt should look like this:
gcsfs==2023.6.0
google-cloud-bigquery==3.11.1
google-cloud-storage==2.9.0
numpy==1.24.3
pandas==2.0.2
python-dotenv==1.0.0
Note: you can use any other language to do this, but in that case you can only use languages supported by the GCP client libraries.
2- Image creation
To create the image to use, let's write the Dockerfile first:
# we use version 3.10 of the python image
FROM python:3.10
# we define a work directory
WORKDIR /app
# we copy the requirements file into the work directory
COPY requirements.txt /app
# we install the dependencies
RUN pip install --no-cache-dir -r requirements.txt
# we copy the code dir into the work directory
COPY . /app
# we execute the code with the following command
CMD [ "python", "main.py" ]
Then, build your image. If you use GCP Artifact Registry, follow parts 1 and 2 of my previous article to build your image.
3- Job creation
From the GCP console, search for Cloud Run, select the JOBS tab and click on CREATE JOB.
Then, fill in the first part of the form. If you use GCP Artifact Registry, use the SELECT button to browse and find your image. In the Number of tasks field, enter 5.
Now, click on the arrow to reveal the configuration part. Switch between the tabs to configure your job as you want and click on CREATE.
Once created, your job should appear in the job list when you select the JOBS tab on the Cloud Run homepage.
Click on the job and switch between the tabs to see the job details. The History tab is empty because there is no execution yet. To set up a trigger, click on the trigger tab and schedule your job.
Click on EXECUTE to start the job and return to the History tab to see the changes. You should see an execution in progress. If you click on the execution, you will see the progress of each task. To check the parallelism, you can click on each task and compare the start times. You can also check the logs of each task separately for debugging purposes.
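If you prefer to start executions from code instead of the console, here is a minimal, hedged sketch using the google-cloud-run client library (this library is not used elsewhere in this article, and the project, region and job names below are placeholders):
from google.cloud import run_v2  # pip install google-cloud-run

client = run_v2.JobsClient()
# fully qualified job name: projects/<project>/locations/<region>/jobs/<job>
job_name = "projects/my-project/locations/europe-west1/jobs/my-job"  # placeholder values

operation = client.run_job(name=job_name)  # starts a new execution of the job
execution = operation.result()  # waits for the long-running operation and returns the Execution
print(f"Execution {execution.name} finished")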
Once the job is completed, we can check the result on Cloud Storage to verify that the files have been created as expected.
We can see that the suffix of each file name is the index of the corresponding task. We can also see the creation dates of the files: three of them were created at the same time thanks to parallelism. Now, open the files and verify that their contents are what is expected, depending on the index of the task.
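As an optional check from a notebook or a local script (a sketch of my own that reuses the BUCKET_NAME environment variable and the gcsfs setup from the example, and assumes your local credentials can read the bucket):
import os
import pandas as pd

bucket_name = os.environ.get("BUCKET_NAME")
filename = "test-parallel-task"

# read back each of the 5 output files and print its number of rows
for index in range(5):
    part = pd.read_csv(f"gs://{bucket_name}/{filename}_{index}.csv")
    print(f"{filename}_{index}.csv: {len(part)} rows")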
This example is just a basic one to help you understand how it works. We can perform more complex tasks with it, as described in the use cases part.
Hope this article will help!