Hello everyone,
This article is the third part of the series Youtube GIF Maker Using Next.js, Node and RabbitMQ.
In this article we will dive into building the worker service of our YouTube to GIF converter. It contains some code snippets, but the full source code of the project is available on GitHub. You can also view the app demo. The following topics will be covered:
- Functionalities
- Flow Chart
- Implementation
- RabbitMQ Service
- Conversion Service
- Closing Thoughts
Functionalities
The service worker is responsible for:
- Consuming tasks from the task queue
- Converting part of a YouTube video to a GIF
- Uploading the GIF to cloud storage
- Updating the job's gifUrl and status in the database
Flow Chart
This flow chart shows how the service worker works.
Implementation
RabbitMQ Service
Consuming Tasks From the Queue
The RabbitMQ Service in the service worker is almost identical to the one in the backend server from the previous part of this series. The only difference is a single function, startConsuming().
//rabbitmq.service.ts
import amqp, { Channel, Connection, ConsumeMessage } from 'amqplib';
import Container, { Service } from 'typedi';
import { Job } from '../entities/jobs.entity';
import ConversionService from './conversion.service';
@Service()
export default class RabbitMQService {
private connection: Connection;
private channel: Channel;
private queueName = 'ytgif-jobs';
constructor() {
this.initializeService();
}
private async initializeService() {
try {
await this.initializeConnection();
await this.initializeChannel();
await this.initializeQueues();
await this.startConsuming();
} catch (err) {
console.error(err);
}
}
private async initializeConnection() {
try {
this.connection = await amqp.connect(process.env.NODE_ENV === 'production' ? process.env.RABBITMQ_PROD : process.env.RABBITMQ_DEV);
console.info('Connected to RabbitMQ Server');
} catch (err) {
throw err;
}
}
private async initializeChannel() {
try {
this.channel = await this.connection.createChannel();
console.info('Created RabbitMQ Channel');
} catch (err) {
throw err;
}
}
private async initializeQueues() {
try {
await this.channel.assertQueue(this.queueName, {
durable: true,
});
console.info('Initialized RabbitMQ Queues');
} catch (err) {
throw err;
}
}
public async startConsuming() {
const conversionService = Container.get(ConversionService);
this.channel.prefetch(1);
console.info(' 🚀 Waiting for messages in %s. To exit press CTRL+C', this.queueName);
this.channel.consume(
this.queueName,
async (msg: ConsumeMessage | null) => {
if (msg) {
const job: Job = JSON.parse(msg.content.toString());
console.info(`Received new job 📩 `, job.id);
try {
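await conversionService.beginConversion(job, {
// Positive acknowledgement: the job was processed successfully
onSuccess: () => {
this.channel.ack(msg);
},
// Negative acknowledgement without re-queueing: the job failed
onError: () => {
this.channel.reject(msg, false);
},
});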
} catch (err) {
console.error('Failed to process job', job.id, err);
}
}
},
{
noAck: false,
},
);
}
}
startConsuming() consumes a message from the queue, parses its JSON body and then delegates the conversion process to the ConversionService.
All the ConversionService needs for the conversion is the Job object and two callbacks, used to either acknowledge or reject the message from the queue (discussed below).
Also notice that in this example we call
this.channel.prefetch(1);
We will come back to what this means at the end of this part of the series.
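One detail worth guarding against in the consumer is a message whose body is not valid JSON: in startConsuming() the JSON.parse call sits outside the try block, so a malformed message would throw before it is ever acknowledged or rejected. The following is a minimal sketch of a defensive parser, not code from the project. parseJob and the JobPayload shape are hypothetical names, and the field types are assumptions based on how the job is used in this article.

```typescript
// Hypothetical payload shape: field names follow the Job usage in this article,
// the types are assumptions.
interface JobPayload {
  id: string | number;
  youtubeId: string;
  youtubeUrl: string;
  startTime: number;
  endTime: number;
}

// Returns the parsed job, or null when the message body is not a usable job.
function parseJob(content: Buffer): JobPayload | null {
  let data: unknown;
  try {
    data = JSON.parse(content.toString());
  } catch {
    return null; // body is not valid JSON
  }
  if (typeof data !== 'object' || data === null) return null;

  const job = data as Partial<JobPayload>;
  const valid =
    (typeof job.id === 'string' || typeof job.id === 'number') &&
    typeof job.youtubeId === 'string' &&
    typeof job.youtubeUrl === 'string' &&
    typeof job.startTime === 'number' &&
    typeof job.endTime === 'number';
  return valid ? (job as JobPayload) : null;
}
```

In the consumer, a null result could be answered right away with channel.reject(msg, false), so that a broken message does not stay unacknowledged.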
Message Acknowledgment
To remove a task from the queue (indicating that the service has finished processing it, whether successfully or not), we need to acknowledge the message manually.
In amqplib this can be done with either
channel.ack(msg);
to send a positive acknowledgement, or
// Second parameter specifies whether to re-queue the message or not
channel.reject(msg, false);
to send a negative acknowledgement.
Notice that on error we do not re-queue the message; we simply treat it as a 'failed conversion'. How to handle failures is ultimately up to the programmer.
See more on RabbitMQ Message Acknowledgement
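Since the re-queue decision is left to the programmer, here is one possible policy as a hedged sketch: give a failed message a single second chance by re-queueing it only if it has not been redelivered before. The settle helper and the AckChannel interface are hypothetical names introduced for illustration. The only amqplib details assumed are the ack/reject methods shown above and the fields.redelivered flag carried by consumed messages.

```typescript
// Minimal shape of the channel methods used here; an amqplib Channel should satisfy it.
interface AckChannel<M> {
  ack(msg: M): void;
  reject(msg: M, requeue?: boolean): void;
}

// Hypothetical helper: ack on success; on failure, re-queue once and then drop.
// "Once" is approximated with the redelivered flag of the consumed message.
function settle<M extends { fields: { redelivered: boolean } }>(
  channel: AckChannel<M>,
  msg: M,
  succeeded: boolean,
): 'ack' | 'requeue' | 'drop' {
  if (succeeded) {
    channel.ack(msg);
    return 'ack';
  }
  const requeue = !msg.fields.redelivered; // first failure: try again
  channel.reject(msg, requeue);
  return requeue ? 'requeue' : 'drop';
}
```

The article's own worker corresponds to the simpler policy of always calling reject with requeue set to false.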
Conversion Service
This service contains the core logic of our service worker.
It exposes a function beginConversion() that is called from the RabbitMQ Service when consuming a message
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
...
}
This function performs all the steps necessary for the conversion, then calls either onSuccess() or onError() depending on whether it succeeded or failed.
These are the steps necessary for converting a YouTube video to a GIF:
- Downloading Youtube Video
  - The youtube video is downloaded locally
- Converting downloaded video to GIF
  - The video is converted into a GIF (only the selected range by start/end times is converted)
- Uploading GIF to Google Cloud Storage
- Updating the database
- Call onSuccess() or onError() accordingly
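Before looking at each step in detail, the overall flow of the list above can be sketched in a few lines. This is only an outline under stated assumptions: runConversion and the Steps interface are hypothetical names, and each step is injected as a function so the sketch stays independent of ytdl-core, FFmpeg and cloud storage.

```typescript
type JobStatus = 'processing' | 'done' | 'error';

// Hypothetical step functions; the real service implements these as private methods.
interface Steps {
  download(): Promise<string>; // resolves to the local video path
  convert(videoPath: string): Promise<string>; // resolves to the local GIF path
  upload(gifPath: string): Promise<string>; // resolves to the public GIF URL
  updateJob(patch: { status: JobStatus; gifUrl?: string }): Promise<void>;
}

async function runConversion(
  steps: Steps,
  onSuccess: () => void,
  onError: () => void,
): Promise<void> {
  try {
    const videoPath = await steps.download();
    const gifPath = await steps.convert(videoPath);
    const gifUrl = await steps.upload(gifPath);
    await steps.updateJob({ status: 'done', gifUrl });
  } catch (err) {
    // Any failing step marks the job as failed and rejects the message
    await steps.updateJob({ status: 'error' });
    onError();
    return;
  }
  onSuccess(); // acknowledge only after every step has completed
}
```

Keeping every step awaited inside one try block means a failure anywhere ends in exactly one of the two callbacks being called.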
Let's start by downloading the YouTube video locally.
Downloading Youtube Video
To download the YouTube video locally, we use the go-to package for that task, ytdl-core.
A function downloadVideo() is responsible for this. It takes the YouTube video URL/ID and returns a Readable stream that we can use to save the video file locally, along with the file's extension (for example, mp4 or webm).
//conversion.service.ts
import { Readable } from 'stream';
import ytdl from 'ytdl-core';
import YoutubeDownload from '../common/interfaces/YoutubeDownload';
private async downloadVideo({ youtubeId, youtubeUrl }: YoutubeDownload): Promise<{ video: Readable ; formatExtension: string }> {
const info = await ytdl.getInfo(youtubeId);
const format: ytdl.videoFormat = info.formats[0];
if (!format) throw new Error('No matching format found');
const video = ytdl(youtubeUrl, {
format,
});
return { video, formatExtension: format.container };
}
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
console.info('Started Processing Job :', job.id);
const { video, formatExtension } = await this.downloadVideo({
youtubeId: job.youtubeId,
youtubeUrl: job.youtubeUrl,
});
const srcFileName = `./src/media/temp.${formatExtension}`;
video.on('progress', (chunkLength, downloaded, total) => {
//... Logic for showing progress to the user..i.e progress bar
});
video.pipe(
fs
.createWriteStream(srcFileName)
.on('open', () => {
//Video download started
console.log('Downloading Video');
})
.on('finish', async () => {
//Video finished downloading locally in srcFileName
console.info('Downloaded video for job ', job.id);
//...Logic for converting the locally downloaded video to GIF
})
.on('error', async () => {
//...handle failure logic
}),
);
} catch (err) {
//...handle failure logic
}
}
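As an alternative to wiring up 'finish' and 'error' handlers by hand, the download could be awaited as a promise, so that any stream failure ends up in the surrounding try/catch. The sketch below assumes Node 15 or later (for stream/promises). saveStream and removeTemp are hypothetical helper names, not part of the project.

```typescript
import { createWriteStream, promises as fsp } from 'fs';
import { Readable } from 'stream';
import { pipeline } from 'stream/promises';

// Writes a readable stream to destPath; rejects if either the source
// or the file stream fails, and cleans up both streams in that case.
async function saveStream(source: Readable, destPath: string): Promise<void> {
  await pipeline(source, createWriteStream(destPath));
}

// Deletes a temporary file; does nothing if the file is already gone.
async function removeTemp(path: string): Promise<void> {
  await fsp.rm(path, { force: true });
}
```

With a helper like this, beginConversion could await saveStream(video, srcFileName) and then continue with the conversion on the next line, while 'progress' events can still be observed on the video stream.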
Converting Video to GIF
To convert local videos to GIFs we will use ffmpeg.wasm, which is essentially a WebAssembly port of FFmpeg. You can think of this process as using FFmpeg inside Node asynchronously to do the conversion: no spawning of external processes and no dependent tools, which is very powerful and simple.
//conversion.service.ts
import { createFFmpeg, fetchFile, FFmpeg } from '@ffmpeg/ffmpeg';
import GifConversion from '../common/interfaces/GifConversion';
//...somewhere in our code
const ffmpeg = createFFmpeg({
log: false,
progress: p => {
progressBar.update(Math.floor(p.ratio * 100));
},
});
await ffmpeg.load();
//Converts a video range to GIF from srcFileName to destFileName
private async convertToGIF({ startTime, endTime, srcFileName, destFileName, formatExtension }: GifConversion) {
try {
console.info('Converting Video to GIF');
this.ffmpeg.FS('writeFile', `temp.${formatExtension}`, await fetchFile(srcFileName));
await this.ffmpeg.run(
'-i',
`temp.${formatExtension}`,
'-vcodec',
'gif',
'-ss',
`${startTime}`,
'-t',
`${endTime - startTime}`,
'-vf',
'fps=10',
`temp.gif`,
);
await fs.promises.writeFile(destFileName, this.ffmpeg.FS('readFile', 'temp.gif'));
console.info('Converted video to gif');
} catch (err) {
throw err;
}
}
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
console.info('Started Processing Job :', job.id);
const srcFileName = `./src/media/temp.${formatExtension}`;
const destFileName = `./src/media/temp.gif`;
//... Video download logic
// GIF Conversion
await this.convertToGIF({
startTime: job.startTime,
endTime: job.endTime,
srcFileName,
destFileName,
formatExtension,
});
} catch (err) {
//...handle failure logic
}
}
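The argument list passed to ffmpeg.run() is the part most likely to go wrong, for example when endTime is not after startTime. As a hedged sketch, the arguments can be built and validated by a small pure function. buildGifArgs and GifArgsInput are hypothetical names; the flags themselves are the ones used in convertToGIF above.

```typescript
interface GifArgsInput {
  inputName: string; // file name inside the FFmpeg virtual file system
  outputName: string;
  startTime: number; // seconds
  endTime: number; // seconds
  fps?: number; // defaults to 10, as in the article
}

// Builds the FFmpeg arguments for cutting [startTime, endTime] into a GIF.
function buildGifArgs({ inputName, outputName, startTime, endTime, fps = 10 }: GifArgsInput): string[] {
  if (startTime < 0) throw new RangeError('startTime must not be negative');
  if (!(endTime > startTime)) throw new RangeError('endTime must be greater than startTime');
  return [
    '-i', inputName,
    '-vcodec', 'gif',
    '-ss', String(startTime), // where the clip starts
    '-t', String(endTime - startTime), // clip duration, not end time
    '-vf', `fps=${fps}`,
    outputName,
  ];
}
```

The result could then be spread into the call, as in ffmpeg.run(...buildGifArgs({ ... })).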
Uploading GIF to Google Cloud Storage
After the local video file is converted to a GIF, we can finally upload it to Google Cloud Storage.
First we will create a CloudStorageService that is responsible for just that. In our case it is backed by Google Cloud Storage.
import { Storage } from '@google-cloud/storage';
import * as _ from 'lodash';
import { Service } from 'typedi';
@Service()
class CloudStorageService {
private storage;
private BUCKET_NAME;
constructor() {
const privateKey = _.replace(process.env.GCS_PRIVATE_KEY, new RegExp('\\\\n', 'g'), '\n');
this.BUCKET_NAME = 'yourbucketname';
this.storage = new Storage({
projectId: process.env.GCS_PROJECT_ID,
credentials: {
private_key: privateKey,
client_email: process.env.GCS_CLIENT_EMAIL,
},
});
}
async uploadGif(gifImage: Buffer, uploadName: string) {
try {
const bucket = await this.storage.bucket(this.BUCKET_NAME);
uploadName = `ytgif/${uploadName}`;
const file = bucket.file(uploadName);
await file.save(gifImage, {
metadata: { contentType: 'image/gif' },
public: true,
validation: 'md5',
});
return `https://storage.googleapis.com/${this.BUCKET_NAME}/${uploadName}`;
} catch (err) {
throw new Error('Something went wrong while uploading image');
}
}
}
export default CloudStorageService;
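The private key handling in the constructor deserves a note: when a PEM key is stored in an environment variable, its line breaks are often written as the two characters backslash and n, and they have to be turned back into real newlines before the key is usable. Here is a small sketch of the same idea without lodash. normalizePrivateKey is a hypothetical helper name, and failing fast on a missing variable is an added assumption, not something the original service does.

```typescript
// Converts escaped "\n" sequences in an env-provided key into real newlines.
function normalizePrivateKey(raw: string | undefined): string {
  if (!raw) {
    // Assumption: a missing key is treated as a configuration error
    throw new Error('GCS_PRIVATE_KEY is not set');
  }
  return raw.replace(/\\n/g, '\n');
}
```

In the constructor this would be used as normalizePrivateKey(process.env.GCS_PRIVATE_KEY).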
We can now use it like this to upload the generated GIF:
//conversion.service.ts
import Container from 'typedi';
import CloudStorageService from './cloudStorage.service';
private async uploadGifToCloudStorage(destFileName, uploadName): Promise<string> {
try {
console.info('Uploading gif to cloud storage');
const gifImage = await fs.promises.readFile(destFileName);
const cloudStorageInstance = Container.get(CloudStorageService);
const gifUrl = await cloudStorageInstance.uploadGif(gifImage, `gifs/${uploadName}`);
return gifUrl;
} catch (err) {
throw err;
}
}
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
const destFileName = `./src/media/temp.gif`;
//... Video download logic
//... Video conversion logic
const gifUrl = await this.uploadGifToCloudStorage(destFileName, job.id);
} catch (err) {
//...handle failure logic
}
}
Handling success/failure
Handling success and failure is pretty simple. First, we have to update the job in the database.
In case of success:
Set the job status to 'done' and set gifUrl to the URL of the GIF uploaded to Google Cloud Storage.
In case of failure:
Set the job status to 'error'.
After that we call onSuccess() or onError(), which handles the positive or negative RabbitMQ message acknowledgment.
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
const destFileName = `./src/media/temp.gif`;
//... Video download logic
//... Video conversion logic
const gifUrl = await this.uploadGifToCloudStorage(destFileName, job.id);
//Success scenario
await this.jobService.updateJobById(job.id as any, { status: 'done', gifUrl });
console.info(`Finished job ${job.id}, gif at ${gifUrl}`);
onSuccess();
} catch (err) {
//Failure scenario
console.error('Failed to process job', job.id);
await this.jobService.updateJobById(job.id as any, { status: 'error' });
onError();
}
}
Putting it all together
Putting it all together, and adding a CLI progress bar with cli-progress, the ConversionService looks like this:
import Container, { Service } from 'typedi';
import JobsService from './jobs.service';
import ytdl from 'ytdl-core';
import { Readable } from 'stream';
import { Job } from '../entities/jobs.entity';
import { createFFmpeg, fetchFile, FFmpeg } from '@ffmpeg/ffmpeg';
import fs from 'fs';
import cliProgress from 'cli-progress';
import CloudStorageService from './cloudStorage.service';
import GifConversion from '../common/interfaces/GifConversion';
import YoutubeDownload from '../common/interfaces/YoutubeDownload';
const progressBar = new cliProgress.SingleBar({}, cliProgress.Presets.shades_classic);
@Service()
export default class ConversionService {
private ffmpeg: FFmpeg = null;
constructor(private jobService = new JobsService()) {}
public async initializeService() {
try {
this.ffmpeg = createFFmpeg({
log: false,
progress: p => {
progressBar.update(Math.floor(p.ratio * 100));
},
});
await this.ffmpeg.load();
} catch (err) {
console.error(err);
}
}
private async downloadVideo({ youtubeId, youtubeUrl }: YoutubeDownload): Promise<{ video: Readable; formatExtension: string }> {
const info = await ytdl.getInfo(youtubeId);
const format: ytdl.videoFormat = info.formats[0];
if (!format) throw new Error('No matching format found');
const video = ytdl(youtubeUrl, {
format,
});
return { video, formatExtension: format.container };
}
private async convertToGIF({ startTime, endTime, srcFileName, destFileName, formatExtension }: GifConversion) {
try {
console.info('Converting Video to GIF');
this.ffmpeg.FS('writeFile', `temp.${formatExtension}`, await fetchFile(srcFileName));
progressBar.start(100, 0);
await this.ffmpeg.run(
'-i',
`temp.${formatExtension}`,
'-vcodec',
'gif',
'-ss',
`${startTime}`,
'-t',
`${endTime - startTime}`,
'-vf',
'fps=10',
`temp.gif`,
);
progressBar.stop();
await fs.promises.writeFile(destFileName, this.ffmpeg.FS('readFile', 'temp.gif'));
console.info('Converted video to gif');
} catch (err) {
throw err;
}
}
private async uploadGifToCloudStorage(destFileName, uploadName): Promise<string> {
try {
console.info('Uploading gif to cloud storage');
const gifImage = await fs.promises.readFile(destFileName);
const cloudStorageInstance = Container.get(CloudStorageService);
const gifUrl = await cloudStorageInstance.uploadGif(gifImage, `gifs/${uploadName}`);
return gifUrl;
} catch (err) {
throw err;
}
}
public async beginConversion(job: Job, { onSuccess, onError }: { onSuccess: () => void; onError: () => void }) {
try {
await this.jobService.updateJobById(job.id as any, { status: 'processing' });
console.info('Started Processing Job :', job.id);
const { video, formatExtension } = await this.downloadVideo({
youtubeId: job.youtubeId,
youtubeUrl: job.youtubeUrl,
});
const srcFileName = `./src/media/temp.${formatExtension}`;
const destFileName = `./src/media/temp.gif`;
video.on('progress', (chunkLength, downloaded, total) => {
const percent = (downloaded / total) * 100;
progressBar.update(percent);
});
video.pipe(
fs
.createWriteStream(srcFileName)
.on('open', () => {
console.log('Downloading Video');
progressBar.start(100, 0);
})
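.on('finish', async () => {
progressBar.stop();
console.info('Downloaded video for job ', job.id);
try {
await this.convertToGIF({
startTime: job.startTime,
endTime: job.endTime,
srcFileName,
destFileName,
formatExtension,
});
const gifUrl = await this.uploadGifToCloudStorage(destFileName, job.id);
await this.jobService.updateJobById(job.id as any, { status: 'done', gifUrl });
console.info(`Finished job ${job.id}, gif at ${gifUrl}`);
onSuccess();
} catch (err) {
// Errors thrown inside this event handler never reach the outer try/catch,
// so they are handled here; otherwise the message would never be settled
console.error('Failed to process job', job.id, err);
await this.jobService.updateJobById(job.id as any, { status: 'error' });
onError();
}
})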
.on('error', async () => {
progressBar.stop();
console.error('Failed to process job', job.id);
await this.jobService.updateJobById(job.id as any, { status: 'error' });
onError();
}),
);
} catch (err) {
await this.jobService.updateJobById(job.id as any, { status: 'error' });
onError();
throw err;
}
}
}
Closing Thoughts
Remember how we called channel.prefetch(1) when we started consuming from the queue:
this.channel.prefetch(1);
This makes sure that each queue consumer receives only one message at a time: RabbitMQ will not deliver a new message to a consumer until it has acknowledged the previous one. As a result, the load is distributed evenly among our consumers, and whenever a consumer is free it is ready to process more tasks.
Read more about this from RabbitMQ Docs.
This also means that if we want to scale our conversion process, we can simply add more replicas of this worker service.
Read more about this pattern under Competing Consumers.
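To make the effect of prefetch(1) a bit more concrete, here is a toy simulation, not a model of RabbitMQ internals. It compares blind round-robin assignment (every n-th message goes to the n-th consumer) with handing each task to whichever worker becomes free first, which approximates what a prefetch of 1 achieves. The function names and the task durations are made up for illustration.

```typescript
// Total work per worker when tasks are assigned blindly in turn.
function roundRobin(durations: number[], workers: number): number[] {
  const load = new Array<number>(workers).fill(0);
  durations.forEach((d, i) => {
    load[i % workers] += d;
  });
  return load;
}

// Time at which each worker finishes when every task goes to the worker
// that becomes free earliest (ties go to the lowest index).
function fairDispatch(durations: number[], workers: number): number[] {
  const freeAt = new Array<number>(workers).fill(0);
  for (const d of durations) {
    let idx = 0;
    for (let w = 1; w < workers; w++) {
      if (freeAt[w] < freeAt[idx]) idx = w;
    }
    freeAt[idx] += d;
  }
  return freeAt;
}

// Alternating long and short conversions, two workers
const tasks = [10, 1, 10, 1, 10, 1];
console.log(roundRobin(tasks, 2)); // [ 30, 3 ]
console.log(fairDispatch(tasks, 2)); // [ 21, 12 ]
```

With round-robin one worker ends up with all the long conversions, while fair dispatch finishes the same six tasks after 21 time units instead of 30.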
That's it for our service worker! Now we can start digging into the client side of the app.
Remember that the full source code can be viewed on the GitHub repository.
In the next part of the series we will see how to implement the Next.js client, which will send GIF conversion requests and display the converted GIFs.