DEV Community

Emmanuel Umeh

Automating ETL Processes with Node.js

Extract, transform, load (ETL) is the process of extracting data from multiple sources, transforming it, and loading it into a centralized store known as a data warehouse. The data warehouse serves as the repository where the integrated data is kept for analysis.

Node.js, a scalable JavaScript runtime, lets developers automate these steps in a few lines of code and build data pipelines that streamline data integration and analysis.

In this tutorial, we'll explore how to automate ETL processes with Node.js.

Prerequisites:

To follow along, you need the following:

Node.js

  • Ensure Node.js is installed on your local machine.

MongoDB

  • Make sure MongoDB is installed on your local machine.

csv-parser

  • Install the csv-parser module, a library for parsing CSV data during the ETL process.

Initialize Node.js Application

Open a terminal, create a directory for the project, and initialize the Node.js project by running the following command:

npm init -y

Install the required dependencies:

npm install csv-parser mongodb dotenv

Create a data.csv file in the root directory and add the following text:

NAME,CAREER
REV ONUCHE, SOFTWARE DEVELOPER
Damilola Daramola, SOFTWARE DEVELOPER
David Herbert,TECHNICAL WRITER
WIsdom Nwokocha,TECHNICAL WRITER

Now, let's break down the ETL processes into phases:

Extract Phase

The extract phase is the first step: raw data is read from the data.csv file and parsed with the csv-parser library.

The code snippets below are broken down step by step to help you follow each part:

Setup File System Module (fs):

Import the fs module. fs (file system) is a built-in Node.js module that provides file-related operations.

const fs = require('fs');


MongoDB Setup:

Import the MongoClient class from the MongoDB Node.js driver.

const { MongoClient } = require('mongodb');

CSV Parser Setup:

Import the csv-parser module using the code snippet below.

const csvParser = require('csv-parser');

Set the CSV Data Path:

Make sure you have created the data.csv file in your project.

const csvDataPath = 'data.csv';

Setup Environment Configuration:

This loads environment variables from the .env file using the dotenv module.

require('dotenv').config();

Create an extractData function:

Create a function extractData() that extracts the data from the data.csv file:

const extractData = (csvDataPath) => {
  return new Promise((resolve, reject) => {
    const data = [];

    fs.createReadStream(csvDataPath)
      .pipe(csvParser())
      .on('data', (item) => {
        data.push(item);

      })
      .on('end', () => {
        resolve(data);
      })

      .on('error', (error) => {
        reject(error);
      });
  });
};


Here is what the code does:

const extractData = (csvDataPath) => { ... }: This declares a function named extractData using an arrow function. It takes one parameter, csvDataPath, which is the path to the CSV file.

return new Promise((resolve, reject) => { ... }): This returns a new Promise, which represents the result of the asynchronous read.

const data = []: This creates an empty array named data.

fs.createReadStream(csvDataPath): The createReadStream function opens a readable stream on the file at csvDataPath.

.pipe(csvParser()): The pipe method connects the readable stream to the csv-parser module, which parses the raw file contents into rows.

.on('data', (item) => { ... }): This event handler runs for each row parsed from the CSV file. data.push(item) appends the row to the data array.

.on('end', () => { ... }): This event handler is triggered when the end of the CSV file is reached. It resolves the promise with the populated data array.

.on('error', (error) => { ... }): This event handler rejects the promise if the parser stream emits an error.
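
One caveat with the chain above: pipe() does not forward errors from the source stream, so an error handler attached after .pipe(csvParser()) only sees parser errors, and a missing data.csv would surface as an unhandled error on the read stream. Below is a minimal sketch of one way around this, using Node's built-in stream.pipeline(). The names extractWithPipeline and makeParser are hypothetical, and a PassThrough stream stands in for csvParser() so the sketch runs without third-party packages:

```javascript
const fs = require('node:fs');
const { pipeline, PassThrough } = require('node:stream');

// Hypothetical variant of extractData. `makeParser` is a factory that returns
// the parsing stream; in the tutorial this would be () => csvParser().
// It defaults to a PassThrough (which emits raw chunks, not rows) so that
// this sketch has no third-party dependencies.
const extractWithPipeline = (path, makeParser = () => new PassThrough()) =>
  new Promise((resolve, reject) => {
    const items = [];
    const parser = makeParser();

    parser.on('data', (item) => items.push(item));
    parser.on('end', () => resolve(items));

    // pipeline() reports an error from any stage (file or parser) to this
    // callback and destroys the other streams.
    pipeline(fs.createReadStream(path), parser, (error) => {
      if (error) reject(error);
    });
  });
```

With this shape, a wrong file path rejects the promise (for example with an ENOENT error) instead of crashing the process.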

Transform Phase

In the transform phase, the extracted data is refined and structured.

// Transform Phase
const transformData = (csvData) => {
  return csvData.map((items) => ({
    name: items.NAME.trim(), // Trim whitespace from the name value
    career: items.CAREER.trim(),  // Trim whitespace from the career value
  }));
};

Here is what the code does:

const transformData = (csvData) => { ... }: This declares a function transformData that takes a parameter csvData.

return csvData.map((items) => ({ ... })): The map function iterates over every item in the csvData array and returns a new array of the transformed objects.

name: items.NAME.trim(): This reads the NAME property of each item (the NAME column in data.csv) and uses .trim() to strip any leading or trailing whitespace.

career: items.CAREER.trim(): This does the same for the CAREER property of each item.
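
Note that items.NAME.trim() throws a TypeError when a row lacks a NAME or CAREER value, for example on a short or malformed line. Below is a minimal sketch of a more defensive variant. The name transformDataSafely is hypothetical, and dropping rows with an empty name is an assumption made for this sketch, not something the tutorial's pipeline requires:

```javascript
// Hypothetical defensive variant of transformData.
// Missing fields become empty strings instead of throwing.
const cleanField = (value) => String(value ?? '').trim();

const transformDataSafely = (csvData) =>
  csvData
    .map((item) => ({
      name: cleanField(item.NAME),
      career: cleanField(item.CAREER),
    }))
    // Assumption for this sketch: a row without a name is not worth loading.
    .filter((item) => item.name !== '');

// Header-keyed rows shaped like the ones the tutorial's transform expects
const sample = [
  { NAME: 'REV ONUCHE', CAREER: ' SOFTWARE DEVELOPER' },
  { NAME: 'David Herbert' }, // CAREER value missing
  { NAME: '   ', CAREER: 'TECHNICAL WRITER' }, // blank name, dropped
];

console.log(transformDataSafely(sample));
```

Whether to drop, default, or reject incomplete rows is a design choice that depends on how the loaded data will be used.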

Load Phase

In the load phase, the transformed data is loaded into the MongoDB database.

Add a .env file:

Create a .env file in the root directory and add your MongoDB URL.

MONGODB_URL='YOUR_MONGODB_URL'

Set Up the MongoDB connection URL from environment variables:

Read the MONGODB_URL value through process.env, using the same variable name as in the .env file.

const dbUrl = process.env.MONGODB_URL;

Set the MongoDB connection string:

This code copies dbUrl into connectionString with a template literal, so process.env.MONGODB_URL does not have to be written out again.

const connectionString = `${dbUrl}`;

Set the database name:

const databaseName = process.env.MONGODB_DATABASE || 'etldb';

This code sets the databaseName variable by checking whether the MONGODB_DATABASE environment variable is defined. If it is, that value is used; otherwise, the name defaults to etldb.
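
If MONGODB_URL is missing from .env, process.env.MONGODB_URL is undefined and the problem only shows up later, when the client is created. Below is a minimal sketch of validating the settings up front. readConfig is a hypothetical helper; it takes the environment as a parameter (defaulting to process.env) purely so it can be exercised without touching real variables, and the URL in the usage line is a placeholder:

```javascript
// Hypothetical helper: reads and validates the MongoDB settings.
const readConfig = (env = process.env) => {
  if (!env.MONGODB_URL) {
    throw new Error('MONGODB_URL is not set. Add it to your .env file.');
  }
  return {
    connectionString: env.MONGODB_URL,
    // Same fallback as in the tutorial: use 'etldb' when no name is given.
    databaseName: env.MONGODB_DATABASE || 'etldb',
  };
};

// The URL below is a placeholder for a local MongoDB instance.
console.log(readConfig({ MONGODB_URL: 'mongodb://localhost:27017' }));
```

Failing early with a clear message tends to be easier to debug than a connection error raised deep inside the driver.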

Set the collection name:

This sets users as the name of the MongoDB collection that will hold the data.

const collectionName = 'users';

Create a loadData Function:

Create an async loadData() function with the following parameters: transformedData, databaseName, collectionName, connectionString.

const loadData = async (transformedData, databaseName, collectionName, connectionString) => {
  const client = new MongoClient(connectionString, {
    useNewUrlParser: true,
    useUnifiedTopology: true,
  });

  try {

    await client.connect();
    console.log('Connected to MongoDB successfully.');

    const db = client.db(databaseName);
    const collection = db.collection(collectionName);

    const response = await collection.insertMany(transformedData);
    // console.log("RESPONSE", response);
    console.log(`${response.insertedCount} CSV Data Successfully loaded to MongoDB.`);

  } catch (error) {
    console.error('Error loading data into MongoDB:', error);
  }

  await client.close();
};

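
In loadData above, the catch block logs the error and swallows it, so the caller cannot tell that the load failed. Below is a sketch of an alternative shape that closes the client in a finally block and lets errors propagate. loadWithClient is a hypothetical name, and the client is passed in as an argument (any object exposing connect(), db() and close(), such as a MongoClient instance) so the logic does not depend on how the client is built:

```javascript
// Hypothetical variant of loadData: the caller supplies the client, e.g.
// new MongoClient(connectionString).
const loadWithClient = async (client, transformedData, databaseName, collectionName) => {
  try {
    await client.connect();
    const collection = client.db(databaseName).collection(collectionName);
    const response = await collection.insertMany(transformedData);
    return response.insertedCount;
  } finally {
    // Runs on success and on failure; errors still reach the caller.
    await client.close();
  }
};
```

Because the error is not caught here, the .catch() at the end of the ETL chain becomes the single place where failures are reported.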

Call extractData() to start the ETL process:

extractData(csvDataPath)
  .then((rawData) => {
    const transformedData = transformData(rawData);
    return loadData(transformedData, databaseName, collectionName, connectionString);
  })
  .catch((error) => {
    console.error('Failed to extract or load data into MongoDB:', error);
  });

This code runs the full ETL process: it extracts the data, transforms it, and loads it into the MongoDB database.
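
The same chain can also be written with async/await. Below is a sketch that assumes the three phase functions are passed in as arguments. runEtl and the stand-in phases in the usage lines are hypothetical and exist only to keep the example self-contained:

```javascript
// Hypothetical orchestration helper: runs the three phases in order.
const runEtl = async ({ extract, transform, load }) => {
  const rawData = await extract();
  const transformedData = transform(rawData);
  return load(transformedData);
};

// Usage with stand-in phases. In the tutorial these would be
// () => extractData(csvDataPath), transformData, and a function that
// calls loadData with the database settings.
runEtl({
  extract: async () => [{ NAME: ' David Herbert ', CAREER: 'TECHNICAL WRITER' }],
  transform: (rows) =>
    rows.map((row) => ({ name: row.NAME.trim(), career: row.CAREER.trim() })),
  load: async (rows) => rows.length,
})
  .then((count) => console.log(`${count} row(s) processed`))
  .catch((error) => console.error('ETL run failed:', error));
```

Passing the phases in as functions also makes each one easy to replace with a stub when testing the pipeline.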

Entire Code:

const fs = require('fs');
const csvParser = require('csv-parser');
const { MongoClient } = require('mongodb');
const csvDataPath = 'data.csv'; 

require('dotenv').config();

// Extract Phase

const extractData = (csvDataPath) => {
  return new Promise((resolve, reject) => {
    const data = [];

    fs.createReadStream(csvDataPath)
      .pipe(csvParser())
      .on('data', (item) => {
        data.push(item);

      })
      .on('end', () => {
        resolve(data);
      })

      .on('error', (error) => {
        reject(error);
      });
  });
};


// Transform Phase
const transformData = (csvData) => {
  return csvData.map((items) => ({
    name: items.NAME.trim(), // Trim whitespace from the Name value
    career: items.CAREER.trim(),  // Trim whitespace from the career value
  }));
};


const dbUrl = process.env.MONGODB_URL;

const connectionString = `${dbUrl}`;
const databaseName = process.env.MONGODB_DATABASE || 'etldb'; 
const collectionName = 'users'; 

// Load Phase
const loadData = async (transformedData, databaseName, collectionName, connectionString) => {
  const client = new MongoClient(connectionString, {
    useNewUrlParser: true,
    useUnifiedTopology: true,
  });

  try {

    await client.connect();
    console.log('Connected to MongoDB successfully.');

    const db = client.db(databaseName);
    const collection = db.collection(collectionName);

    const response = await collection.insertMany(transformedData);
    // console.log("RESPONSE", response);
    console.log(`${response.insertedCount} CSV Data Successfully loaded to MongoDB.`);

  } catch (error) {
    console.error('Error loading data into MongoDB:', error);
  }

  await client.close();
};

extractData(csvDataPath)
  .then((rawData) => {
    const transformedData = transformData(rawData);
    return loadData(transformedData, databaseName, collectionName, connectionString);
  })
  .catch((error) => {
    console.error('Failed to extract or load data into MongoDB:', error);
  });


Run the Script:

Save the code above as etl.js, then open the terminal and run it with the command below. Make sure nodemon is installed on your local machine; running node etl.js works as well.

nodemon etl.js

The script now runs locally.

Response:

The script prints the number of records added to the MongoDB database.
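
Because the script calls insertMany on every run, running it twice stores every row twice, and nodemon re-runs the script each time a watched file changes. One possible way to make the load repeatable is to upsert instead, through the driver's collection.bulkWrite() method. Below is a sketch of building the operations. toUpsertOperations is a hypothetical helper, and treating name as the unique key is an assumption that only holds if no two people share a name:

```javascript
// Hypothetical helper: turns transformed rows into bulkWrite operations.
// Each operation updates the document with a matching name, or inserts
// one if none exists (upsert: true).
const toUpsertOperations = (transformedData) =>
  transformedData.map((item) => ({
    updateOne: {
      filter: { name: item.name }, // assumption: name identifies a person
      update: { $set: item },
      upsert: true,
    },
  }));

const operations = toUpsertOperations([
  { name: 'David Herbert', career: 'TECHNICAL WRITER' },
]);
console.log(JSON.stringify(operations, null, 2));

// Inside loadData, the insertMany call would then become:
//   const response = await collection.bulkWrite(operations);
```

With upserts, the logged count would need to come from the bulkWrite result rather than insertedCount.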

Check your MongoDB:

Check your MongoDB database to confirm that the data was loaded successfully.

Conclusion:

ETL has long been a building block for companies and enterprises, and it is a well-established way for them to transform raw data into more readable formats.

Furthermore, its most common use case is data warehousing, where data from various sources is combined into a single database for business analysis and customer insights.

Now that we have come to the end of this tutorial, what other patterns do you use to automate ETL processes in Node.js? Let us know in the comment section.

Top comments (1)

Sourabh Gupta

Great tutorial on using Node.js for ETL automation! Another approach to consider is using managed data pipeline solutions, especially for scaling ETL processes across various data sources. For example, Estuary offers real-time data integration and pipeline automation that eliminates the need for manual scripting, which can be beneficial when you’re dealing with diverse data sets and require low-latency transformations. It might complement the approach here if you're looking to streamline and simplify the ETL setup. Thanks for the tutorial!