A recent project needed to scale, and specifically needed a way to put long-running jobs in a queue. The jobs did some heavy lifting around transcoding, but the real problem was coming up with a plan for managing the job queue. We've had some experience with tools like RabbitMQ and Kue/Redis, but because we're a small team, we wanted to see if we could avoid having to manage all these moving pieces.
Since we use AWS for some of our infrastructure, we chose AWS Step Functions as our queue management tool. The core of our transcoding engine had to run in its own process on a Windows machine on EC2, so we decided to wrap the process in an API that would then hook into the Step Functions state machine.
There are plenty of resources describing what AWS Step Functions are, so I won't go into depth on that. In short, for our purposes, a state machine execution waits at an activity step until a consuming application (a worker) picks it up, which lets the state machine act as a queue.
This tutorial focuses on demonstrating how to use AWS Step Functions to queue long-running processes wrapped in a web API. For our API, we're very comfortable with AdonisJS, a fantastic Node.js web framework that has been heavily influenced by the Laravel world.
I'll dive right in and assume that you were either able to download the source code for this tutorial or were able to set up a basic AdonisJS project.
Create an AWS Activity
Our example is just a demonstration that will require a single activity step. The goal here is to demonstrate sending data from a web form and adding it to the AWS State Machine that will then act as a queue. A consuming application will then grab the information sitting in the queue and then output what it finds to our console.
Activities are the small tasks that are the "workers" in the state machine.
Let's call our Activity:
DemoWorkerActivity
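If you would rather create the activity from code than from the AWS console, a minimal sketch could look like the following. It assumes `client` is an `AWS.StepFunctions` instance from the aws-sdk (v2); the helper name `ensureActivity` is hypothetical and not part of the demo project.

```javascript
// Sketch: create the activity and return its ARN.
// `client` is assumed to be an AWS.StepFunctions instance (aws-sdk v2).
async function ensureActivity (client, name) {
  // CreateActivity is idempotent: calling it again with the same name
  // does not create a duplicate activity.
  const result = await client.createActivity({ name }).promise()
  return result.activityArn
}
```

In the project this would be called with something like `ensureActivity(new AWS.StepFunctions(awsConfig), 'DemoWorkerActivity')`, and the returned ARN is what goes into the state machine definition.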
Create the State Machine
Once the Activity is created, AWS assigns it an ARN. With that ARN, we can create a State Machine document like the one below. There is only one step, so we'll call that step "DemoWorkerRun". Inside the step is a single activity, "DemoWorkerActivity".
{
  "Comment": "Demo State Machine - to show how my application interacts with the queue",
  "StartAt": "DemoWorkerRun",
  "States": {
    "DemoWorkerRun": {
      "Type": "Task",
      "Resource": "arn:aws:states:us-east-1:XXXXXXXXXX:activity:DemoWorkerActivity",
      "End": true
    }
  }
}
Once the state machine is created, AWS assigns it a resource ARN as well.
In this example the state machine identifier is:
"arn:aws:states:us-east-1:XXXXXXXXXX:stateMachine:DemoWorkerStateMachine"
Core Node.js components:
QueueService - this service is responsible for kicking off a state machine "execution" with an initial payload.
DemoWorker - the worker acts as the consumer of the activity that lives within the state machine. It is the point of contact between the payload and your underlying application.
QueueService
In our Adonis project, we'll create a Services folder and add the app/Services/QueueService.js class. This class starts a state machine execution with an initial custom payload. It requires the aws-sdk to be installed. You can install it using the adonis tools:
adonis install aws-sdk
'use strict'

const AWS = require('aws-sdk')
const Env = use('Env')

class QueueService {
  /**
   * Returns result of StepFunctions.startExecution
   *
   * @see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/StepFunctions.html#startExecution-property
   */
  async enqueue (name, payload) {
    const awsConfig = new AWS.Config({
      accessKeyId: Env.get('AWS_ACCESS_KEY'),
      secretAccessKey: Env.get('AWS_SECRET_KEY'),
      region: Env.get('AWS_REGION')
    })

    // Initialize the AWS SDK
    const StepFunctions = new AWS.StepFunctions(awsConfig)

    // Executing the State Machine requires a custom name, some custom input
    // and the ID of the State Machine from AWS
    const params = {
      name: name,
      input: JSON.stringify(payload),
      stateMachineArn: Env.get('AWS_STATEMACHINE_ARN')
    }

    // Execute StepFunction
    // @see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/StepFunctions.html#startExecution-property
    let result = null
    try {
      result = await StepFunctions.startExecution(params).promise()
    } catch (e) {
      // On failure the error is logged and null is returned to the caller
      console.log('Error:', e)
    }
    return result
  }
}

module.exports = QueueService
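To give an idea of how the service might be called from the web side, here is a hedged sketch of a controller-style function. The name `submitJob` and the form fields `name` and `message` are assumptions for illustration only; `queue` is assumed to expose `enqueue(name, payload)` like the QueueService above, which returns null when the execution could not be started.

```javascript
// Hypothetical controller action. `queue` is assumed to behave like QueueService.
async function submitJob (queue, form) {
  const name = String(form.name || '').trim()
  if (!name) {
    return { ok: false, message: 'An execution name is required' }
  }

  // The payload shape is up to you; `message` is a made-up field
  const result = await queue.enqueue(name, { message: form.message })

  // enqueue returns null when startExecution failed
  if (!result) {
    return { ok: false, message: 'Could not start the execution' }
  }
  return { ok: true, executionArn: result.executionArn }
}
```

Passing the service in as an argument keeps the function easy to try out with a stub before wiring it to a real route.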
DemoWorker.js
Let's create a new folder, app/Workers, and add DemoWorker.js. This class is responsible for consuming the job that is pending inside the state machine. It requires the npm module 'step-function-worker'.
Install it into your adonis project:
adonis install step-function-worker
'use strict'

const StepFunctionWorker = require('step-function-worker')
const AWS = require('aws-sdk')
const Env = use('Env')
const Logger = use('Logger')

/*
|--------------------------------------------------------------------------
| DemoWorker
|--------------------------------------------------------------------------
| @see https://github.com/piercus/step-function-worker
|
| For this class to work, a StepFunction activity must first be configured on AWS
*/
class DemoWorker extends StepFunctionWorker {
  constructor () {
    const options = {}

    // Configure the connection to AWS
    options.awsConfig = new AWS.Config({
      accessKeyId: Env.get('AWS_ACCESS_KEY'),
      secretAccessKey: Env.get('AWS_SECRET_KEY'),
      region: Env.get('AWS_REGION')
    })

    // Function that interacts directly with the AWS StepFunction; it must be defined.
    // The callback is responsible for letting the State Machine know it can either
    // continue to the next step in execution or fail
    options.fn = async (input, cb, heartbeat) => {
      // Respond to StepFunction state machine
      Logger.info('Custom Worker function:', input)
      cb(null, input)
    }

    // The ID of the Step Function Activity, e.g.
    // arn:aws:states:us-east-1:XXXXXXXXXXXXX:activity:DemoWorkerActivity
    options.activityArn = Env.get('AWS_ACTIVITY_ARN_DEMOWORKERACTIVITY')

    super(options)
    this._initCallbacks()
  }

  _initCallbacks () {
    this.on('task', this.task)
    this.on('ready', this.ready)
    this.on('error', this.error)
    this.on('failure', this.failure)
    this.on('success', this.success)
  }

  /**
   * Called when the worker "wakes up"
   * The StepFunctionWorker parent class will pass in the payload
   * @param {*} task
   */
  task (task) {
    // task.input contains the payload from the web
    Logger.info('DemoWorker task:', task.input)
  }

  ready () {
    Logger.info('DemoWorker is ready')
  }

  failure (failure) {
    Logger.info('DemoWorker failure:', failure)
  }

  success (output) {
    // output.input will contain the payload from the web
    Logger.info('DemoWorker success:', output.input)
  }

  error (err) {
    Logger.error('DemoWorker error:', err)
  }
}

module.exports = DemoWorker
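The demo's `options.fn` simply echoes its input. For a real long-running job (such as our transcoding), the function would typically also report failures and send heartbeats. The following is a sketch under assumptions, not the library's prescribed pattern: `makeTaskFn` and `heartbeatMs` are hypothetical names, and it assumes the worker treats an error passed to the callback as a task failure.

```javascript
// Sketch: wrap an async job in the (input, cb, heartbeat) signature
// used for options.fn in the worker above.
function makeTaskFn (job, heartbeatMs = 30000) {
  return async (input, cb, heartbeat) => {
    // Send periodic heartbeats while the long-running job is in progress
    const timer = setInterval(heartbeat, heartbeatMs)
    try {
      const output = await job(input)
      cb(null, output) // report success; output must be JSON-serializable
    } catch (err) {
      cb(err) // report the error instead of leaving the task hanging
    } finally {
      clearInterval(timer)
    }
  }
}
```

In the worker this could be used as `options.fn = makeTaskFn(async (input) => runTranscode(input))`, where `runTranscode` stands in for your own job.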
Demo Time
I'll skip ahead to our demo. We'll create a form where we enter a name for the state machine execution. You can customize the name given to the execution. In our project we used the ObjectID of a MongoDB record that was responsible for maintaining the state of the queued job. Using a naming convention that let us trace the execution back to the object in our system that initialized the payload made it easier to follow our system flow.
Let's fire up our Adonis project:
adonis serve --dev
I can now access http://localhost:3333
There I have the following form:
After submitting the form, let's take a look at the Step Functions execution that has been queued up.
Now we have to fire up our DemoWorker to watch for our custom Activity to get executed.
adonis stepfunctionworker --name=DemoWorker
As soon as the DemoWorker starts, it pulls the pending execution off the state machine's queue.
Let's take another look at the State Machine and see that the execution completed successfully.
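To make it clearer what "pulling a job" means, here is a rough sketch of one iteration of an activity worker written directly against the SDK. It is an illustration of the general pattern, not step-function-worker's actual implementation; `pollOnce` and the worker name are hypothetical, and `client` is assumed to be an `AWS.StepFunctions` instance (aws-sdk v2).

```javascript
// Sketch: poll the activity once, run the handler, and report the outcome.
async function pollOnce (client, activityArn, handler) {
  // Long-polls for work; no task token comes back when nothing is pending
  const task = await client
    .getActivityTask({ activityArn, workerName: 'demo-worker' })
    .promise()
  if (!task || !task.taskToken) return false

  let output
  try {
    output = await handler(JSON.parse(task.input))
  } catch (err) {
    // Tell the state machine that this step failed
    await client.sendTaskFailure({
      taskToken: task.taskToken,
      error: err.name,
      cause: err.message
    }).promise()
    return true
  }

  // Tell the state machine that this step is done so it can move on
  await client.sendTaskSuccess({
    taskToken: task.taskToken,
    output: JSON.stringify(output)
  }).promise()
  return true
}
```

A worker process would call this in a loop, which is essentially the job the DemoWorker takes off our hands.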
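The same check can be done from code instead of the console. The sketch below polls the execution status until it leaves the RUNNING state; `waitForExecution` and its options are hypothetical names, and `client` is again assumed to be an `AWS.StepFunctions` instance (aws-sdk v2).

```javascript
// Sketch: poll describeExecution until the execution is no longer running.
async function waitForExecution (client, executionArn, options = {}) {
  const { intervalMs = 2000, maxAttempts = 30 } = options
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const { status } = await client.describeExecution({ executionArn }).promise()
    // Any status other than RUNNING (for example SUCCEEDED or FAILED) ends the wait
    if (status !== 'RUNNING') return status
    await new Promise((resolve) => setTimeout(resolve, intervalMs))
  }
  throw new Error('Execution still running after ' + maxAttempts + ' checks')
}
```

The `executionArn` to pass in is the one returned by `startExecution` when the job was queued.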
And done! We've accomplished our goal of creating an AWS State Machine and having our custom application initiate and complete a small, but complete job.
Source code is at: https://github.com/openstepmedia/adonisjs-stepfunction-demo