I was once tasked with improving the database and general application operations of backend services that use MongoDB as their main database.
These services were part of a huge infrastructure where millions of messages came through queues and had to be processed according to their message actions. That meant tons of DB operations every second, plus additional checks during processing.
This post is cross-published with OnePublish.
Real Case Scenario
The processing layer of the service used pymongo to interact with MongoDB, and the service itself ran in a synchronous environment. Even though the database operations were handled in bulk, performance still could not keep up with the incoming data.
Synchronous code made things even worse. Execution waits for the result of the current operation before moving forward, which is a serious bottleneck in scalable systems.
This regularly caused queue overflows and potential data loss.
Asynchronous Environment
The solution I implemented was a combination of:
- Motor
- Asyncio
- Uvloop
Let's quickly go through the definitions of these items.
PyMongo is the official MongoDB driver for Python, providing a simple and intuitive way to interact with MongoDB databases. It's synchronous, meaning each database operation blocks the execution of your program until it completes, which can be a bottleneck in I/O-bound tasks.
Motor is the asynchronous driver for MongoDB, built on top of PyMongo and designed to take advantage of Python's asyncio library. Motor allows you to perform non-blocking database operations, making it suitable for high-performance applications that require concurrency.
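Uvloop, the third item in the list, is a drop-in replacement for the default asyncio event loop. The Motor script shown in this post runs on the default loop, so the following is only a minimal sketch of how uvloop could be enabled, assuming it is installed (`pip install uvloop`). The helper names `run` and `current_loop_module` are hypothetical, and the code falls back to the standard loop when uvloop is missing (for example on Windows).

```python
import asyncio

try:
    import uvloop  # optional dependency; not available on every platform
except ImportError:
    uvloop = None


async def current_loop_module():
    # Report which package provides the running event loop.
    return type(asyncio.get_running_loop()).__module__


def run(coro):
    """Run a coroutine on uvloop when possible, else on the default loop."""
    if uvloop is None:
        return asyncio.run(coro)
    if hasattr(uvloop, "run"):
        # Newer uvloop releases ship a run() helper mirroring asyncio.run().
        return uvloop.run(coro)
    # Older releases: install the uvloop policy, then use asyncio.run().
    uvloop.install()
    return asyncio.run(coro)


if __name__ == "__main__":
    print(run(current_loop_module()))
```

With this in place, the Motor script's `asyncio.run(main())` call could be swapped for `run(main())` without touching the rest of the code.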
To illustrate the performance difference, I prepared a stress test using two scripts: one using Motor (asynchronous) and the other using PyMongo (synchronous). Both scripts perform the same task of reading documents from MongoDB and writing them back in batches.
Both scripts read 300k documents from the source collection and migrate them to a new target collection.
Asynchronous Script (Motor)
import asyncio
import logging
import time

from bson import ObjectId
from motor.motor_asyncio import AsyncIOMotorClient

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# MongoDB setup
MONGO_URI = 'mongodb://root:root@localhost:27019'
DB_NAME = 'products'
COLLECTION_NAME = 'gmc_products'

client = AsyncIOMotorClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
target_collection = db["new_collection"]


async def fetch_products(batch_size, last_id=None):
    query = {'_id': {'$gt': last_id}} if last_id else {}
    cursor = collection.find(query).sort('_id').limit(batch_size)
    products = await cursor.to_list(length=batch_size)
    return products


async def bulk_write_to_mongo(products):
    for product in products:
        product['_id'] = ObjectId()  # Generate a new ObjectId for each product

    try:
        result = await target_collection.insert_many(products, ordered=False)
        logger.info(f'Inserted {len(result.inserted_ids)} products into MongoDB.')
    except Exception as e:
        logger.error(f'Error inserting products into MongoDB: {e}')


async def process_batches(batch_size, concurrency_limit):
    tasks = []
    last_id = None
    while True:
        products = await fetch_products(batch_size, last_id)
        if not products:
            break
        last_id = products[-1]['_id']
        tasks.append(bulk_write_to_mongo(products))
        if len(tasks) >= concurrency_limit:
            await asyncio.gather(*tasks)
            tasks = []

    # Process remaining tasks if any
    if tasks:
        await asyncio.gather(*tasks)


async def main():
    batch_size = 1000
    concurrency_limit = 10
    start_time = time.time()
    await process_batches(batch_size, concurrency_limit)
    end_time = time.time()
    logger.info(f'Total time: {end_time - start_time:.2f} seconds.')


if __name__ == '__main__':
    asyncio.run(main())
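In the script above, `process_batches` fetches a group of batches one after another and then gathers their writes, so each group waits for its slowest insert before more data is read. One possible variation, which is not what the original script does, keeps up to `concurrency_limit` writes in flight continuously with `asyncio.Semaphore`. The sketch below uses a hypothetical `fake_insert_many` stand-in for `target_collection.insert_many` so that it runs without a database; `migrate` is likewise an illustrative name.

```python
import asyncio


async def fake_insert_many(batch):
    # Hypothetical stand-in for:
    #   await target_collection.insert_many(batch, ordered=False)
    await asyncio.sleep(0.01)  # simulate network and database latency
    return len(batch)


async def migrate(batches, concurrency_limit=10):
    """Write batches with at most `concurrency_limit` inserts in flight."""
    semaphore = asyncio.Semaphore(concurrency_limit)
    tasks = []

    async def write(batch):
        try:
            return await fake_insert_many(batch)
        finally:
            semaphore.release()  # free the slot for the next batch

    for batch in batches:
        # Block here until a slot is free, then schedule the write at once.
        await semaphore.acquire()
        tasks.append(asyncio.create_task(write(batch)))

    inserted_counts = await asyncio.gather(*tasks)
    return sum(inserted_counts)


if __name__ == "__main__":
    demo_batches = [[{"n": i}] * 3 for i in range(25)]
    print(asyncio.run(migrate(demo_batches, concurrency_limit=5)))
```

Because a slot is acquired before each task is created, a real fetch loop placed where `for batch in batches` sits would keep reading while earlier writes are still running, without holding more than `concurrency_limit` unwritten batches in memory.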
Synchronous Script (PyMongo)
import logging
import time

from bson import ObjectId
from pymongo import MongoClient

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# MongoDB setup
MONGO_URI = 'mongodb://root:root@localhost:27019'
DB_NAME = 'products'
COLLECTION_NAME = 'gmc_products'
TARGET_COLLECTION_NAME = 'new_collection'

client = MongoClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
target_collection = db[TARGET_COLLECTION_NAME]


def fetch_products(batch_size, last_id=None):
    query = {'_id': {'$gt': last_id}} if last_id else {}
    cursor = collection.find(query).sort('_id').limit(batch_size)
    products = list(cursor)
    return products


def bulk_write_to_mongo(products):
    for product in products:
        product['_id'] = ObjectId()  # Generate a new ObjectId for each product

    try:
        result = target_collection.insert_many(products, ordered=False)
        logger.info(f'Inserted {len(result.inserted_ids)} products into MongoDB.')
    except Exception as e:
        logger.error(f'Error inserting products into MongoDB: {e}')


def process_batches(batch_size):
    last_id = None
    while True:
        products = fetch_products(batch_size, last_id)
        if not products:
            break
        last_id = products[-1]['_id']
        bulk_write_to_mongo(products)


def main():
    batch_size = 1000
    start_time = time.time()
    process_batches(batch_size)
    end_time = time.time()
    logger.info(f'Total time: {end_time - start_time:.2f} seconds.')


if __name__ == '__main__':
    main()
Results and Analysis
Execution Time of Migrating 300k documents:
- Asynchronous script: 17.15 seconds
- Synchronous script: 23.26 seconds
The asynchronous script completed the task 6.11 seconds faster than the synchronous script. While this might not seem like a significant difference for a single run, it becomes more pronounced in high-load scenarios or when processing large datasets continuously.
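To put the two timings in relative terms, the arithmetic can be worked out directly from the numbers reported above. This is only a small calculation over those two measurements and the stated 300k documents, not a new benchmark; the variable names are illustrative.

```python
# Timings reported in this post for migrating 300k documents.
ASYNC_SECONDS = 17.15
SYNC_SECONDS = 23.26
DOCUMENTS = 300_000

saved_seconds = SYNC_SECONDS - ASYNC_SECONDS
speedup = SYNC_SECONDS / ASYNC_SECONDS            # how many times faster
reduction_pct = saved_seconds / SYNC_SECONDS * 100  # share of time saved

async_docs_per_sec = DOCUMENTS / ASYNC_SECONDS
sync_docs_per_sec = DOCUMENTS / SYNC_SECONDS

if __name__ == "__main__":
    print(f"Time saved: {saved_seconds:.2f} s")
    print(f"Speedup: {speedup:.2f}x ({reduction_pct:.1f}% less wall-clock time)")
    print(f"Throughput: {async_docs_per_sec:,.0f} vs {sync_docs_per_sec:,.0f} docs/s")
```

That works out to roughly a 1.36x speedup, or about 26% less wall-clock time for the asynchronous run.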
Benefits of Using Motor and Asynchronous Environment
- Improved Throughput: Asynchronous operations can handle more tasks concurrently, increasing overall throughput. This is especially beneficial in applications with high I/O operations, such as web servers handling multiple database queries simultaneously.
- Non-Blocking I/O: Asynchronous operations do not block the main thread, allowing other tasks to continue running. This results in better CPU utilization and smoother application performance, particularly under load.
- Scalability: Asynchronous code scales better with the number of concurrent operations. For example, a web application using Motor can handle more simultaneous requests compared to one using PyMongo.
- Resource Efficiency: Asynchronous operations can lead to more efficient use of system resources. For instance, the event loop in asyncio allows the application to switch between tasks, reducing idle times and improving overall efficiency.
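The throughput and non-blocking points above can be seen in a small, database-free sketch. It assumes a hypothetical `fake_db_call` coroutine that stands in for a Motor operation by sleeping for a fixed delay, and compares awaiting five such calls one by one against running them together with `asyncio.gather`.

```python
import asyncio
import time


async def fake_db_call(i, delay=0.1):
    # Hypothetical stand-in for an awaited Motor operation.
    await asyncio.sleep(delay)
    return i


async def run_sequential(n):
    # Each call finishes before the next one starts.
    return [await fake_db_call(i) for i in range(n)]


async def run_concurrent(n):
    # All calls wait on their simulated I/O at the same time.
    return await asyncio.gather(*(fake_db_call(i) for i in range(n)))


def timed(coro_fn, n):
    start = time.perf_counter()
    result = asyncio.run(coro_fn(n))
    return result, time.perf_counter() - start


if __name__ == "__main__":
    _, sequential_time = timed(run_sequential, 5)
    _, concurrent_time = timed(run_concurrent, 5)
    print(f"sequential: {sequential_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

The sequential version takes about the sum of the delays, while the concurrent one takes about as long as a single call, because the event loop switches to another task whenever one is waiting on I/O.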
Source Code and Video Explanation
You can find the source code in the GitHub repository below:
PylotStuff / motor-asyncio-performance
Boost DB ops performance with AsyncIOMotorClient & Asyncio & Uvloop
Getting Started
- Run MongoDB in Docker
docker-compose up -d
- Set up Virtual Environment & Install Requirements
python3 -m venv venv
source venv/bin/activate
pip3 install -r requirements.txt
- Populate product data as a source collection
python3 populate.py
Now you should have 1.8M documents in the source collection. What's next?
Benchmark Write Operations
python3 db-migrate-slow.py
python3 db-migrate-fast.py
Benchmark Read Operations (Existence Checking)
Checking for field existence vs. assigning a default value
python3 db-bad-document-structure.py
python3 db-good-document-structure.py
YouTube - Boost MongoDB Performance: Motor Client vs PyMongo - Which is Faster?
Conclusion
The choice between Motor and PyMongo depends on the specific needs of your application. For applications that require high concurrency and efficient I/O handling, Motor and the asynchronous approach offer significant advantages. However, for simpler applications or scripts where ease of implementation is a priority, PyMongo's synchronous approach might be sufficient.
By leveraging asynchronous operations with Motor, you can build more scalable and performant applications, making it a worthwhile consideration for modern web development.