1. Introduction & Proof-Of-Concept
When there is a long-running task, there are usually the following 2 requirements:
- As a user, I want to know the progress of the task
- As a user, I want to get the output of the task if it is finished
We will use out-of-the-box features, threading in Python and the cache framework in Django, to achieve this. Let's see the demo below:
The logic is as follows:
- Call API to start a task and get task id
- Use task id to continue to get the task's status
- Get the output if the task is finished
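In client terms, the flow looks roughly like the sketch below, using the requests library; the host, port and URL paths are assumptions that match the API built later in this article.

```python
import time
import requests

BASE_URL = 'http://localhost:8000/bgTaskExAPI'  # assumed dev-server address and route prefix

# 1. Call the API to start a task and get its task id
task_id = requests.get(
    f'{BASE_URL}/start_long_running_task/', params={'input': 'hello'}
).json()['task_id']

# 2. Use the task id to keep polling the task's status
while True:
    progress = requests.get(
        f'{BASE_URL}/get_task_progress/', params={'task_id': task_id}
    ).json()
    if progress['status'] == 'SUCCESS':
        break
    print(progress['progress_message'])
    time.sleep(1)

# 3. Get the output once the task is finished
print(progress['output'])
```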
2. Steps
2.1. Basic Setup
- Create a Django site called `bgTaskEx`
- Create an app called `bgTaskExApi`
- Install Django REST framework (`pip install djangorestframework`)
- Set up the API
- Set up the Django cache
If you are not sure how to do this, you may check the link below:
If you have set everything up correctly, you should see the file structure below:
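For this tutorial, Django's local-memory cache backend is enough. A minimal configuration in `settings.py` could look like the sketch below; the app and project names are the ones created above, everything else is an assumption about your settings.

```python
# bgTaskEx/settings.py (sketch)
INSTALLED_APPS = [
    # ...Django's default apps...
    'rest_framework',
    'bgTaskExApi',
]

# Local-memory cache: fine here because the worker thread and the
# polling requests run in the same process
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
    }
}
```

Note that the local-memory backend only works here because the worker thread and the web requests run in the same process; if you deploy with multiple worker processes, switch to a shared cache backend (for example the database or Redis backend) so every process sees the same task progress.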
2.2. Create a Task Handler class
- Create a file named `taskHandler.py` and copy the following code into the file:
```python
from enum import Enum
from django.core.cache import cache
import threading
from typing import Union
from uuid import uuid1


class TaskHandler:

    def start_task(self, method, args):
        task_progress = TaskProgress()
        t = threading.Thread(target=method, args=[*args, task_progress])
        t.daemon = True  # daemon thread, so it will not block the server from exiting
        t.start()
        return task_progress.get_task_id()

    @staticmethod
    def get_task_progress(task_id: str):
        return cache.get(task_id)


class TaskProgress:

    task_id: str

    ## default constructor
    def __init__(self):
        self.task_id = str(uuid1())
        cache.set(self.task_id, self, 3600)

    def set(self,
            status: Enum,
            progress_message: Union[str, None] = None,
            output: Union[str, None] = None) -> None:
        self.status = status.value
        self.progress_message = progress_message
        self.output = output
        cache.set(self.task_id, self, 3600)

    def get_task_id(self):
        return self.task_id


class Status(Enum):
    STARTED = 'STARTED'
    RUNNING = 'RUNNING'
    SUCCESS = 'SUCCESS'
```
`taskHandler.py` contains 3 classes:
- TaskHandler: starts a task and retrieves its progress
- TaskProgress: stores the task info in the cache
- Status: an enum that defines the possible statuses
The start_task method basically opens a new thread to run the task and passes task_progress into the thread so the task can log its progress.
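If you want to sanity-check the handler before wiring up the API, here is a minimal sketch you can run from `python manage.py shell`; the import path `bgTaskExApi.taskHandler` is an assumption based on where the file was created above.

```python
# python manage.py shell
import time
from bgTaskExApi.taskHandler import TaskHandler, Status

def my_task(name, task_progress):
    # TaskHandler.start_task appends task_progress as the last argument
    task_progress.set(Status.STARTED)
    time.sleep(2)
    task_progress.set(Status.SUCCESS, output=f"hello {name}")

task_id = TaskHandler().start_task(my_task, ["world"])
time.sleep(3)
print(vars(TaskHandler.get_task_progress(task_id)))
# {'task_id': '...', 'status': 'SUCCESS', 'progress_message': None, 'output': 'hello world'}
```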
2.3. Create a time-consuming method
- Create a file named `longRunningMethod.py` and copy the following code into the file:
```python
from datetime import datetime
import time

from .taskHandler import Status


def long_running_method(input: str, task_progress):
    task_progress.set(Status.STARTED, progress_message="The process has been started")
    for i in range(20):
        time.sleep(0.5)
        task_progress.set(Status.RUNNING, progress_message=f"{5 * (i + 1)}% has been processed")
    output = f"[{datetime.now()}] input={input}, value from Django"
    task_progress.set(Status.SUCCESS, output=output)
```
This is just an example of a time-consuming method; you can replace it with any method you like, as long as you keep the following points in mind (a skeleton is sketched after this list):
- The parameter list MUST include `task_progress`
- During task execution, make sure you log the task progress with `task_progress.set(Status.RUNNING, progress_message=f"{5 * (i + 1)}% has been processed")`
- When the execution is finished, log the output with `task_progress.set(Status.SUCCESS, output=output)`
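Putting those three points together, a custom task might follow a skeleton like the one below; `my_custom_task`, `some_arg` and the step loop are illustrative placeholders for your own logic.

```python
import time

from .taskHandler import Status


def my_custom_task(some_arg, task_progress):
    # task_progress is appended automatically by TaskHandler.start_task
    task_progress.set(Status.STARTED, progress_message="The process has been started")

    total_steps = 10
    for step in range(total_steps):
        time.sleep(1)  # replace with your own work for this step
        task_progress.set(
            Status.RUNNING,
            progress_message=f"{int((step + 1) / total_steps * 100)}% has been processed",
        )

    # log the final result so the client can pick it up later
    task_progress.set(Status.SUCCESS, output=f"finished processing {some_arg}")
```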
2.4. Create API methods
Create the following 2 methods in views.py; since they are decorated with @action, they should live inside a Django REST framework ViewSet:
```python
from django.http import JsonResponse
from rest_framework.decorators import action
from .longRunningMethod import long_running_method
from .taskHandler import TaskHandler, TaskProgress

# These two actions belong inside the DRF ViewSet that is registered in urls.py
@action(methods=['GET'], detail=False, name='Start the task to background')
def start_long_running_task(self, request):
    input = request.GET['input']
    task_id = TaskHandler().start_task(long_running_method, [input])
    return JsonResponse({'task_id': task_id})

@action(methods=['GET'], detail=False, name='Get Task Progress')
def get_task_progress(self, request):
    task_id = request.GET['task_id']
    task_progress: TaskProgress = TaskHandler.get_task_progress(task_id)
    return JsonResponse(vars(task_progress))
```
These 2 methods are the API endpoints for the start_task and get_task_progress methods in TaskHandler respectively.
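For the test below to find these endpoints, the ViewSet has to be registered with a router under the bgTaskExAPI prefix. The following urls.py sketch shows one way to do it; TaskViewSet is a hypothetical name for the ViewSet that holds the two actions above.

```python
# bgTaskEx/urls.py (sketch; TaskViewSet is an illustrative name)
from django.urls import include, path
from rest_framework.routers import DefaultRouter

from bgTaskExApi.views import TaskViewSet

router = DefaultRouter()
router.register('bgTaskExAPI', TaskViewSet, basename='bgTaskExAPI')

urlpatterns = [
    path('', include(router.urls)),
]
```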
That's it.🎉👏
Additionally, you may add a test for those API methods in tests.py.
```python
import re
import time

from django.test import SimpleTestCase


class MyTests(SimpleTestCase):

    def test_long_running_task(self):
        input = 'aaaaaa'
        print(f'{input=}')
        task_id = self.__start_task(input)
        print(f'{task_id=}')
        while True:
            time.sleep(1)
            result_dict = self.__get_task_progress_response(task_id)
            if result_dict['status'] == "SUCCESS":
                self.print_output(result_dict)
                break
            self.print_progress_message(result_dict)

    def print_output(self, result_dict):
        status = result_dict['status']
        output = result_dict["output"]
        print(f'{status=}, {output=}')

    def print_progress_message(self, result_dict):
        status = result_dict['status']
        progress_message = result_dict['progress_message']
        print(f'{status=}, {progress_message=}')

    def __start_task(self, input):
        res = self.client.get(f'/bgTaskExAPI/start_long_running_task/?input={input}')
        self.assertEqual(res.status_code, 200)
        task_id = res.json()['task_id']
        UUID_V1_PATTERN = re.compile(
            '[a-f0-9]{8}-[a-f0-9]{4}-1[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$',
            re.IGNORECASE,
        )
        self.assertEqual(UUID_V1_PATTERN.match(task_id) is not None, True)
        return task_id

    def __get_task_progress_response(self, task_id: str):
        res = self.client.get(f'/bgTaskExAPI/get_task_progress/?task_id={task_id}')
        self.assertEqual(res.status_code, 200)
        return res.json()
```
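You can run this test with Django's test runner, for example `python manage.py test bgTaskExApi`; the progress messages and the final output are printed to the console while the test polls the endpoint.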
3. Source code
ivanyu199012/5.-Django_background_task
4. Reasons for this approach
During development, you may face the following problem:
You would like to run a task in Django, but it takes a long time, and while it is running the user needs to check the task's status continuously.
The simple request-response approach cannot solve this problem: once the request is sent to Django, there is no feedback from the server until the task finishes or, unfortunately, an error is thrown.
After doing some Google searching, I found that the logic of the existing solutions is about the same (as described in the logic section above), but there are tons of different libraries for it.
Celery is one of those libraries. It is a popular framework with rich features that can help you handle many different long-running tasks in a large-scale project.
However, if you have only one or two long-running tasks and the project is small, it may not be a good approach because:
- Celery + Django + Redis / RabbitMQ is not easy to set up
- Celery mostly needs Redis or RabbitMQ as a broker, but if you are on Windows you can only use RabbitMQ (which needs installation 😢). (If you insist on using Redis on Windows, you can, but you have to run it through WSL or Docker, which is again more complicated 😢.)
In this case, using Celery is like hitting a fly with a cannon: a little too much. And the other libraries also need to be installed before you can use them.
I just wondered whether we could use what already exists in Django, with a small amount of code, to solve this problem.
In the Stack Overflow question python - Simple approach to launching background task in Django, nbwoodward proposed an answer to this problem.
In his example, he saved the task progress in the database rather than the cache.
- If you need the task progress to be stored persistently, saving it to the database is a good choice (a minimal sketch of that variant follows this list).
- Otherwise, using the Django cache is easier, as you do not need to set up a model and a migration.
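For completeness, the database-backed variant implies something like the following model (plus a migration); this is only a sketch of that idea, with illustrative field names, not code taken from the Stack Overflow answer.

```python
# models.py (sketch of the database-backed alternative; field names are illustrative)
from django.db import models


class TaskProgress(models.Model):
    task_id = models.UUIDField(primary_key=True)
    status = models.CharField(max_length=20)
    progress_message = models.TextField(null=True, blank=True)
    output = models.TextField(null=True, blank=True)
    updated_at = models.DateTimeField(auto_now=True)
```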