How to zip Large Set of Data from MongoDB and upload to S3

Starting this new year, the project we were working on received a new requirement: allow our users to download one month's worth of data as a CSV file. The minimum amount of data for one month was 30 to 50 lakh (3 to 5 million) transaction records.

Until then, our users were only able to download one day's worth of data, which amounted to about 30k to 40k records. If they attempted to download more than that, they would run into the following error:

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory

At first, we thought it was simply a memory limit issue, so we attempted to solve it by raising the heap size with the script below in our package.json.

"scripts": {
    "start": "cross-env NODE_OPTIONS=--max_old_space_size=4096 webpack"
}

However, this didn't resolve the problem. So we decided to try streaming the data, compressing it, and then sending it to the user.

const zlib = require('zlib');
const { Parser } = require('json2csv'); // needed for the Parser used below

exports.downloadResource = (res, fileName, fields, data) => {
  const json2csv = new Parser({ fields });
  const csv = json2csv.parse(data);

  const gzip = zlib.createGzip();

  res.header("Content-Type", "text/csv");
  res.header("Content-Encoding", "gzip");
  res.header("Content-Disposition", `attachment; filename=${fileName}.gz`);

  const buffer = [];

  // Collect compressed chunks as they become available
  gzip.on('data', (chunk) => {
    buffer.push(chunk);
  });

  // Once compression finishes, flush everything to the response
  gzip.on('end', () => {
    for (const chunk of buffer) {
      res.write(chunk);
    }
    res.end();
  });

  gzip.write(csv);
  gzip.end();
};

Here, we used zlib [https://www.npmjs.com/package/zlib] to compress the CSV. Basically, we pushed the compressed chunks into a buffer array for as long as the stream produced data. We were able to stream up to 1 million rows of CSV data this way. However, we encountered the same error again:

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory

This happened because we were still holding all the data in memory at once, which consumes a significant amount of memory.

If you have any suggestions, please comment below, as I couldn't figure out a solution at that time.

Since we couldn't solve the problem that way, we considered an alternative solution. It's simple: we will write the files to our server using the fs module, zip them, upload them to S3, and then send the S3 link via email to our client.

This way, we avoid the heap out of memory issue. The flow of our solution: write the data to CSV files on the server, zip them, upload the zip to S3, email the link to the user, and finally clean up the temporary files.

Step 1:

Create the files with the fs module. First, we set up the aggregation pipeline through which our data will flow.

    const pipeline = [
      { $match: condition },
      { $sort: { modified_at: -1 } },
      {
        // Your Aggregation pipeline
      },
    ];
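To actually stream the data instead of loading it all at once, we run this pipeline as a cursor. Here is a minimal sketch, assuming the native MongoDB driver (client, dbName and collectionName are placeholders, not the exact names from our code):

    // Run the aggregation and get back an async-iterable cursor,
    // so documents are pulled from MongoDB as we consume them.
    // allowDiskUse lets large sorts spill to disk on the server side.
    const data = client
      .db(dbName)
      .collection(collectionName)
      .aggregate(pipeline, { allowDiskUse: true });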

We will write that data into temporary files. However, we encountered a problem: we cannot write all the data into a single CSV file. Excel can only display about 1 million (10 lakh) rows per sheet, so we could add at most 1,000,000 rows of data per file.

Therefore, we came up with the idea to write 1 million rows in each file. If there's more data, we will write it to a new file, but each file will have a maximum of 1 million rows.

      let batchIndex = 1;
      let currentBatch = [];
      let length = 0;
      const Rows = 1000000; // maximum number of rows per CSV file

      // Builds the file path for a given batch index inside the output directory
      const createCsvFile = (batchIndex, headers) => {
        const csvFilePath = path.join(
          outputDirectory,
          `some_transaction_batch_${batchIndex}.csv`
        );
        return csvFilePath;
      };

      // Writes one batch of rows, prefixed with the header row, to its own CSV file
      const writeCsvBatch = async (batch, index) => {
        const filePath = createCsvFile(index);
        const csvBody =
          filteredHeaders?.map((header) => header.label).join(",") +
          "\n" +
          batch.join("\n");

        await fsPromise.writeFile(filePath, csvBody);
      };

It might be hard to understand at first glance, so let me explain what's going on here.

Firstly, const outputDirectory = "Operation" + randomUUID(); generates a unique output directory name using the randomUUID() method from the Crypto interface.
This method generates a v4 UUID (Universally Unique Identifier) using a cryptographically secure random number generator.
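For reference, here is a small sketch of how that directory could be prepared before any batch is written; the mkdir call and the fs/promises import are our assumptions, since the article only shows the name generation:

    const { randomUUID } = require("crypto");
    const fsPromise = require("fs/promises");

    // Unique, per-request folder for the temporary CSV batches
    const outputDirectory = "Operation" + randomUUID();
    await fsPromise.mkdir(outputDirectory, { recursive: true });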

Next, we have the createCsvFile function. This function takes an index for a batch and possibly a set of headers. It constructs a file path for a CSV file specific to that batch index within the given output directory. However, it doesn't actually create the CSV file itself; it only returns the path where it would be created.

Then we have writeCsvBatch(), which uses createCsvFile(). It writes each batch of rows to its own file, with every file holding at most 1,000,000 rows; once that limit is reached, the next batch goes into a new file.

      for await (const doc of data) {
        const row = writeCSVRow(doc, filteredHeaders);
        currentBatch.push(row);
        length++;
        // Flush a batch to disk once it reaches the per-file row limit
        if (currentBatch.length >= Rows) {
          await writeCsvBatch(currentBatch, batchIndex++);
          currentBatch = [];
        }
      }
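The writeCSVRow() helper isn't shown above. Here's a minimal sketch of what it could look like, assuming each header object carries a key pointing at the corresponding field on the document (the key property is our assumption; the article only shows label):

    // Hypothetical sketch: turn one MongoDB document into a single CSV row,
    // escaping quotes so the row stays valid CSV.
    const writeCSVRow = (doc, headers) =>
      headers
        .map((header) => {
          const value = doc[header.key] ?? "";
          const text = String(value).replace(/"/g, '""');
          return /[",\n]/.test(text) ? `"${text}"` : text;
        })
        .join(",");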

What if the data is less than 1 million rows, or only a little over a million but not a full 2 million, or there is no data in the DB at all? For such cases, we added some small validation:

     if (length === 0) {
        return createErrorResponse(
          STATUS_CODES.forbidden,
          "No data to download"
        );
      }

      // Write the remaining records to the last CSV file
      if (currentBatch.length > 0) {
        await writeCsvBatch(currentBatch, batchIndex);
      }

Step 2:

Now that we've obtained the files and written our data to them, we need to zip them using archiver [https://www.npmjs.com/package/archiver].

The reason for this is that the files alone would be very large. For example, a single CSV file with 1 million rows had a file size of over 500 MB. This size could increase depending on the type of data being written. When we tested it with 3 million rows, the file size was almost 2 GB. Given that the server cannot handle such large files, we need to compress them. After zipping, our file size was reduced to 15 MB.

const archive = archiver("zip", { zlib: { level: -1 } });

const outputZipStream = fs.createWriteStream(outputZipPath);

// Everything the archiver produces is written to the zip file
archive.pipe(outputZipStream);

// Append every generated CSV file to the archive.
// The file name must match what createCsvFile() produced earlier.
for (let i = 1; i <= batchIndex; i++) {
  const csvFilePath = path.join(
    outputDirectory,
    `some_transaction_batch_${i}.csv`
  );

  archive.append(fs.createReadStream(csvFilePath), {
    name: `some_transaction_batch_${i}.csv`,
  });
}

// Resolve once the zip stream is fully written, reject on archiver errors
const finalizePromise = new Promise((resolve, reject) => {
  outputZipStream.on("close", () => {
    resolve();
  });
  archive.on("error", (err) => {
    reject(err);
  });
  archive.finalize();
});

await finalizePromise;


archive: first, we initialise the archiver object that will create the zip file.

outputZipStream: this creates a writable stream to the output zip file specified by outputZipPath. Once all the CSV files have been created and appended, a single zip file ends up on our server.

const outputZipPath = new Date().toISOString() + ".zip";
We use toISOString() so that every generated zip file gets a unique, timestamped name.

archive.pipe(outputZipStream): this pipes the output of the archiver to the output zip stream, which means that data produced by the archiver will be written to the output zip file.

The for loop iterates over the batch indices, reading each corresponding CSV file from the file system and appending it to the zip archive under its own name.

const finalizePromise = new Promise((resolve, reject) => { ... })
This block wraps the finalisation of the zip archive in a promise. It listens for the "close" event of the output zip stream, which indicates that the archiver has finished writing to the zip file. If any error occurs during archiving, the promise is rejected with it.

await finalizePromise: This line waits for the promise to resolve, indicating that the zip archive has been successfully created and finalised.

Altogether, this produces a single zip archive containing all of the generated CSV files.

Step 3:

Upload to the S3 bucket. For this step, we have a simple function:

const AWS = require("aws-sdk");
const fs = require("fs");

const credentials = {
  accessKey: process.env.AWS_ACCESS_KEY,
  secret: process.env.AWS_SECRET_ACCESS_KEY,
  bucketName: process.env.DOCS_BUCKET_NAME,
};

const s3 = new AWS.S3({
  accessKeyId: credentials.accessKey,
  secretAccessKey: credentials.secret,
});

const uploadFileYt = async (fileName) => {
  // Read the finished zip file from disk
  const fileContent = fs.readFileSync(fileName);

  const params = {
    Bucket: credentials.bucketName,
    Key: fileName,
    Body: fileContent,
  };

  // Upload to S3 and return the URL of the uploaded object
  const data = await s3.upload(params).promise();
  return data?.Location;
};

module.exports = {
  uploadFileYt,
};

The function reads file content, constructs upload parameters, calls the S3 upload method, and returns the URL of the uploaded file.
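For context, here is a sketch of how this upload could be wired in once the zip has been finalised; the require path and variable names are assumptions based on the earlier snippets:

    const { uploadFileYt } = require("./uploadFile"); // hypothetical path

    // outputZipPath is the timestamped zip name from Step 2
    const s3Link = await uploadFileYt(outputZipPath);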

Step 4:

Send the link to the client. The link we get from the S3 bucket can be sent to the client via email, or you could let the user download the zip directly when they click download; as per our requirements, we had to send it to our users via email.

const aws = require("aws-sdk");
const nodemailer = require("nodemailer");

exports.sendEmail = (
  toAddresses = ["example@example.com"],
  subject = "Example subject",
  html = `Example body`,
  ccAddresses = [],
  replyToAddress = ["example@example.com"]
) => {
  // Your logic for sending mail
};

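The mail-sending logic itself is left out above. A minimal sketch of what it could look like with nodemailer's SES transport, assuming SES credentials and a verified from address are configured via environment variables (all of this is our assumption, not the exact implementation):

    const transporter = nodemailer.createTransport({
      // Reuse the aws-sdk client; region and credentials come from the environment
      SES: new aws.SES({ apiVersion: "2010-12-01" }),
    });

    return transporter.sendMail({
      from: process.env.SES_FROM_ADDRESS, // hypothetical env variable
      to: toAddresses,
      cc: ccAddresses,
      replyTo: replyToAddress,
      subject,
      html, // the email body carries the S3 download link
    });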

Step 5:

Remove them after completion. By this I mean the folder and the zip file we created on our server; once everything is done, we finally remove them.

finally {
    // Remove the temporary directory of CSV batches
    fs.rm(outputDirectory, { recursive: true, force: true }, (err) => {
      if (err) {
        throw err;
      }
    });
    // Remove the zip file if it was created
    if (fs.existsSync(outputZipPath)) {
      await fs.promises.unlink(outputZipPath);
      console.log(`${outputZipPath} successfully removed.`);
    }
    // Close the MongoDB connection
    if (client) client.close();
  }


fs.rm(outputDirectory, { recursive: true, force: true }): This removes the outputDirectory and all its contents recursively; force: true means no error is thrown if the path no longer exists.

await fs.promises.unlink(outputZipPath): This line deletes the outputZipPath file asynchronously using promises.
if (client) client.close(): This line closes the MongoDB client if it exists.

In our case, we read the data directly from the MongoDB collection using the native driver, but your setup might be different; you can do the same with Mongoose. We found this approach much faster, and it fulfilled our need.

In case anything fails, those temporary files will still be deleted, because the finally block executes no matter what.

What if multiple requests hit at the same time? If you're interested, you can check out my other blog post, where I've implemented a solution to that problem.

Thank you for reading and Follow me on LinkedIn if you’re interested in Web Development. 😊

Top comments (2)

Friday candour

Batching to disk saves memory usage. Great.

〽️ 𝙍𝙀𝙨𝙝𝙖𝙣

Yeah that's indeed true...
Thanks