This blog post demonstrates how to use Change Streams in MongoDB with the official Go driver. I will be using Azure Cosmos DB since it has wire protocol support for the MongoDB API (server version 3.6), which includes Change Streams as well.
Like some of my other blogs, I am going to split it into two parts, just to make the material easier to digest. Part 1 (this post) provides an introduction and an overview of the Change Streams processor service, and walks you through how to run the application so that you can witness Change Streams in action.
In part 2, I will go over the code and explain how things work behind the scenes.
The code is available on GitHub
Over the course of these blogs, with the help of a practical application, you will learn about:
- Change Streams and related concepts
- Understand the application and how it uses the Change Streams APIs
- Set up Azure Cosmos DB and work through an end-to-end example
- Get to know some of the corner cases/limitations/constraints you should be aware of
What are Change streams?
The MongoDB change streams feature gives applications instant access to data changes (creates, updates, deletes). Applications can react to these changes by subscribing to them at a global (deployment), database or collection scope. This can be used for a variety of solutions ranging from traditional ETL jobs to CQRS (Command and Query Responsibility Segregation) based architectures, real-time stream processing, cache invalidation and much more!
Change streams shield developers from the complexity of using the MongoDB oplog directly.
How are they useful?
One of the many ways you can leverage Change streams is to build a custom solution which listens to MongoDB database change events and pushes them to a scalable data ingestion platform such as Azure Event Hubs for Kafka. You can then build traditional Kafka client applications (Consumer, Streams API) or leverage serverless processing with Azure Functions (similar to this one).
I might blog about other scenarios and their respective solutions as follow-up posts.
At the time of writing, Azure Cosmos DB supports change stream integration with Azure Functions for its SQL API.
Quick overview of the Change Processor Service
The application is a change processor service that uses the Change Streams feature. It's a Go application that uses the official MongoDB Go driver, but the concepts should be applicable to any other language whose native driver supports Change Streams.
It uses the Watch API to subscribe to the change events feed in a specific Collection so that it is notified of documents being created, updated and deleted. It extracts the relevant information from the change event payload, i.e. the document which was affected, and saves it locally to a file. It also demonstrates how to use Resume Tokens to save processing progress.
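The "save it locally to a file" step can be pictured with a few lines of standard-library Go. This is only a sketch under my own assumptions, not the code from the repository: the helper names appendEvent and demoEventsPath, the demo file location and the one-document-per-line layout are all hypothetical, and the document is treated as already-serialized JSON bytes.

```go
package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"
)

// appendEvent appends one serialized document to the file at path,
// creating the file if it does not exist yet. (Hypothetical helper.)
func appendEvent(path string, doc []byte) error {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return err
	}
	// One document per line keeps the file easy to inspect.
	if _, err := fmt.Fprintf(f, "%s\n", doc); err != nil {
		f.Close()
		return err
	}
	return f.Close()
}

// demoEventsPath returns a scratch location for the demo file (an assumption,
// the real service writes to a file named change_events).
func demoEventsPath() string {
	return filepath.Join(os.TempDir(), "change_events_demo")
}

func main() {
	path := demoEventsPath()
	doc := []byte(`{"name":"foo55","email":"foo55@bar.com","status":"online"}`)
	if err := appendEvent(path, doc); err != nil {
		log.Fatal(err)
	}
	fmt.Println("saved change event to file:", path)
}
```

Opening the file in append mode means earlier events are never overwritten, which is what you want for a simple change log.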
Although this application simply saves the change events to a local file (for the sake of simplicity), as mentioned before, you can obviously do much more with this!
Resume tokens are really important...
A Change stream is a potentially infinite series of records. Change stream Resume tokens allow you to "continue processing from where you left off" - think of it as checkpointing. It is very similar to the offset concept in systems such as Apache Kafka.
If the processing application stops (or crashes), you probably don't want to miss the database changes which happened while it was down. You can use the token (details in the next section) to ensure that the application picks up from where it left off and is able to detect the change events that occurred while it was not running. In addition to this, if you keep a history/changelog of resume tokens, you have the flexibility of choosing any of them (it's like walking back in time) and (re)processing data from that point in time.
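To make the checkpointing idea concrete, here is a minimal sketch that stores a token in a file and reads it back on startup. It is an illustration under my own assumptions rather than the application's actual code: the token is treated as opaque bytes, and saveToken, loadToken and demoTokenPath are hypothetical helper names. With the Go driver, the bytes would typically come from the change stream's resume token and be handed back through the resume-after option when the stream is reopened.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"log"
	"os"
	"path/filepath"
)

// saveToken persists the latest resume token. The token is opaque:
// we never look inside it, we only store and return it.
func saveToken(path string, token []byte) error {
	return os.WriteFile(path, token, 0o600)
}

// loadToken returns the stored token, or ok=false when no checkpoint
// exists yet (for example on the very first run).
func loadToken(path string) (token []byte, ok bool, err error) {
	token, err = os.ReadFile(path)
	if errors.Is(err, fs.ErrNotExist) {
		return nil, false, nil
	}
	if err != nil {
		return nil, false, err
	}
	return token, true, nil
}

// demoTokenPath is a scratch location used only by this sketch.
func demoTokenPath() string {
	return filepath.Join(os.TempDir(), "token_demo")
}

func main() {
	path := demoTokenPath()
	if err := saveToken(path, []byte("opaque-resume-token")); err != nil {
		log.Fatal(err)
	}
	token, ok, err := loadToken(path)
	if err != nil {
		log.Fatal(err)
	}
	if ok {
		fmt.Printf("resuming after stored token (%d bytes)\n", len(token))
	} else {
		fmt.Println("no token found, starting from the current point in time")
	}
}
```

Keeping "no checkpoint yet" separate from a real read error matters: the first is a normal first run, the second should stop the program rather than silently skip changes.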
There are a few things you should know
Please note the following cases where the Change streams behaviour in Azure Cosmos DB differs from standard MongoDB:
- You need to pass specific options when you open a Change stream
- Delete operations are currently not supported, although there is a workaround for that
- (As of now) you can only watch Collections (not a database or deployment)
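For the first point, as far as I know the Azure Cosmos DB documentation asks for an aggregation pipeline with a $match stage on operationType and a $project stage, plus the fullDocument option set to updateLookup. The sketch below only builds that pipeline as plain Go maps and prints it as JSON so the shape is visible; it does not talk to a database. The function name cosmosPipeline is hypothetical, and with the Go driver you would normally express the same stages with the driver's BSON types and pass them to Watch.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// cosmosPipeline returns the two stages as plain maps. Stage order is
// preserved by the slice; each stage has a single key, so map ordering
// inside a stage does not matter. (Hypothetical helper.)
func cosmosPipeline() []map[string]any {
	return []map[string]any{
		{"$match": map[string]any{
			// Deletes are not listed because they are not supported.
			"operationType": map[string]any{
				"$in": []string{"insert", "update", "replace"},
			},
		}},
		{"$project": map[string]any{
			"_id":          1,
			"fullDocument": 1,
			"ns":           1,
			"documentKey":  1,
		}},
	}
}

func main() {
	out, err := json.MarshalIndent(cosmosPipeline(), "", "  ")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(out))
}
```

The projected fields (_id, fullDocument, ns, documentKey) are the same ones that show up in the change event payload later in this post.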
Let's try out the application to understand Change streams better!
Setup Azure Cosmos DB
Pre-requisites
- A Microsoft Azure account - go ahead and sign up for a free one!
- Azure CLI or Azure Cloud Shell - you can either choose to install the Azure CLI if you don't have it already (should be quick!) or just use the Azure Cloud Shell from your browser.
- Go installed
You need to create an Azure Cosmos DB account with the MongoDB API support enabled along with a Database and Collection. Follow these steps to set up Azure Cosmos DB using the Azure portal:
- Create an Azure Cosmos DB account
- Add a database and collection and get the connection string
Learn more about how to Work with databases, containers, and items in Azure Cosmos DB
If you want to use the Azure CLI or Cloud Shell, here is the sequence of commands which you need to execute:
Create an Azure Cosmos DB account (notice --kind MongoDB)
az cosmosdb create --resource-group <RESOURCE_GROUP> --name <COSMOS_DB_ACCOUNT> --kind MongoDB
Create the database
az cosmosdb mongodb database create --account-name <COSMOS_DB_ACCOUNT> --name <COSMOS_DB_NAME> --resource-group <RESOURCE_GROUP>
Finally, create a collection within the database
az cosmosdb mongodb collection create --account-name <COSMOS_DB_ACCOUNT> --database-name <COSMOS_DB_NAME> --name <COSMOS_COLLECTION_NAME> --resource-group <RESOURCE_GROUP> --shard <SHARDING_KEY_PATH>
Get the connection string and save it. You will be using it later
az cosmosdb list-connection-strings --name <COSMOS_DB_ACCOUNT> --resource-group <RESOURCE_GROUP> -o tsv --query "connectionStrings[0].connectionString"
Try out the Change processor service
All you need to do is clone the GitHub repo, build and run the service
Start the service
git clone https://github.com/abhirockzz/mongodb-changestreams-processor
cd mongodb-changestreams-processor
go build -o change-processor
Before you run the app, export environment variables:
export MONGODB_URI=[enter the Azure Cosmos DB connection string]
export MONGODB_DATABASE=[name of the Azure Cosmos DB database you created]
export MONGODB_COLLECTION=[name of the Azure Cosmos DB collection you created]
export WITH_RESUME=[use false if you don't want to use resume tokens. this is true by default]
e.g.
export MONGODB_URI="mongodb://my-mongodb:<primary access key for cosmosdb>@my-mongodb.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@my-mongodb@"
export MONGODB_DATABASE=test_db
export MONGODB_COLLECTION=test_collection
export WITH_RESUME=false
Please ensure that the MONGODB_URI value is wrapped in "" (double quotes).
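A service like this typically reads those variables once at startup. The following is a small sketch of how that could look, assuming (as stated above) that WITH_RESUME defaults to true when it is not set. The config type and loadConfig function are hypothetical names for illustration and are not taken from the repository.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// config holds the settings described by the environment variables.
type config struct {
	URI        string
	Database   string
	Collection string
	WithResume bool
}

// loadConfig reads settings through lookup (os.LookupEnv in production),
// which keeps the function easy to exercise without touching the real
// environment. (Hypothetical helper.)
func loadConfig(lookup func(string) (string, bool)) (config, error) {
	cfg := config{WithResume: true} // resume tokens are on by default
	required := []struct {
		name string
		dst  *string
	}{
		{"MONGODB_URI", &cfg.URI},
		{"MONGODB_DATABASE", &cfg.Database},
		{"MONGODB_COLLECTION", &cfg.Collection},
	}
	for _, r := range required {
		v, ok := lookup(r.name)
		if !ok || v == "" {
			return config{}, fmt.Errorf("missing environment variable %s", r.name)
		}
		*r.dst = v
	}
	if v, ok := lookup("WITH_RESUME"); ok && v != "" {
		b, err := strconv.ParseBool(v)
		if err != nil {
			return config{}, fmt.Errorf("invalid WITH_RESUME value %q: %w", v, err)
		}
		cfg.WithResume = b
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.LookupEnv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("watching %s.%s (resume tokens enabled: %t)\n",
		cfg.Database, cfg.Collection, cfg.WithResume)
}
```

Failing fast on a missing variable gives a clearer error than a connection failure later on.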
Start the program
./change-processor
The program should now be up and running, with the following output: started change stream...
Create a few records
There are many ways of connecting to Azure Cosmos DB to perform database operations. You can obviously do it programmatically using your favorite language's support for MongoDB, but for the purpose of testing this service, try one of the following options:
- The Azure Cosmos DB data explorer (web interface)
- Robo 3T with Azure Cosmos DB's API for MongoDB
- MongoDB Compass to connect to Azure Cosmos DB's API for MongoDB
- Connect to an Azure Cosmos account using Studio 3T
Once you create a record, you should see the following output in the program terminal:
saved change event to file: 'change_events'
You should see the change_events file, and it will contain the document which was created. For example, the following record created in Azure Cosmos DB....
{
  "name" : "foo55",
  "email" : "foo55@bar.com",
  "status" : "online"
}
.... will be saved with an _id (MongoDB Object ID) in the change_events file:
{
  "_id": {
    "$oid": "5e8eec7712f1891a100bd449"
  },
  "name": "foo55",
  "email": "foo55@bar.com",
  "status": "online"
}
This is just a part of the change event. The complete payload is shown below; note that the top-level _id field of the change stream event document acts as the resume token (more on this below):
{
  "_id": {
    "_data": {
      "$binary": {
        "base64": "W3sidG9rZW4iOiJcIjY5XCIiLCJyYW5nZSI6eyJtaW4iOiIiLCJtYXgiOiJGRiJ9fV0=",
        "subType": "00"
      }
    }
  },
  "fullDocument": {
    "_id": {
      "$oid": "5e8eec7712f1891a100bd449"
    },
    "name": "foo56",
    "email": "foo56@bar.com",
    "status": "online"
  },
  "ns": {
    "db": "test_db1",
    "coll": "test_coll1"
  },
  "documentKey": {
    "name": "foo56",
    "_id": {
      "$oid": "5e8eec7712f1891a100bd449"
    }
  }
}
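To get a feel for this structure, the sketch below decodes the JSON rendering shown above with the standard library and pulls out the namespace, the affected document and the raw resume token bytes. Treat it as an aid for reading the payload and not as how the service works: a real application receives BSON from the driver and not this JSON text, and the changeEvent type and parseEvent function are hypothetical names.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"log"
)

// samplePayload is the change event from this post, in compact form.
const samplePayload = `{
  "_id": {"_data": {"$binary": {"base64": "W3sidG9rZW4iOiJcIjY5XCIiLCJyYW5nZSI6eyJtaW4iOiIiLCJtYXgiOiJGRiJ9fV0=", "subType": "00"}}},
  "fullDocument": {"_id": {"$oid": "5e8eec7712f1891a100bd449"}, "name": "foo56", "email": "foo56@bar.com", "status": "online"},
  "ns": {"db": "test_db1", "coll": "test_coll1"},
  "documentKey": {"name": "foo56", "_id": {"$oid": "5e8eec7712f1891a100bd449"}}
}`

// changeEvent mirrors only the fields this sketch cares about.
type changeEvent struct {
	ID struct {
		Data struct {
			Binary struct {
				Base64 string `json:"base64"`
			} `json:"$binary"`
		} `json:"_data"`
	} `json:"_id"`
	FullDocument struct {
		Name   string `json:"name"`
		Email  string `json:"email"`
		Status string `json:"status"`
	} `json:"fullDocument"`
	NS struct {
		DB   string `json:"db"`
		Coll string `json:"coll"`
	} `json:"ns"`
}

// parseEvent decodes the payload and returns the event together with
// the decoded bytes of the top-level _id (the resume token).
func parseEvent(payload []byte) (changeEvent, []byte, error) {
	var ev changeEvent
	if err := json.Unmarshal(payload, &ev); err != nil {
		return ev, nil, err
	}
	token, err := base64.StdEncoding.DecodeString(ev.ID.Data.Binary.Base64)
	if err != nil {
		return ev, nil, err
	}
	return ev, token, nil
}

func main() {
	ev, token, err := parseEvent([]byte(samplePayload))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("change in %s.%s for %s (status %s)\n",
		ev.NS.DB, ev.NS.Coll, ev.FullDocument.Name, ev.FullDocument.Status)
	fmt.Printf("resume token is %d bytes long\n", len(token))
}
```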
Update records
For example, if I were to change the above record from "status":"online" to "status":"offline", this will be recorded in the change_events file along with the updated document.
Checkpointing with Resume token
To try this out, stop your application (press ctrl+c). You should see the following output before the program terminates:
^Cexit signalled. cancelling context
saved token to file
Check for a file named token. Its contents don't matter - just understand that it is there and that it contains a resume token in binary form.
Now, create a few records while the change stream processor is not running. Once you have done that, restart the processor and you should see that the records you added were detected, processed and saved in the change_events file.
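The stop-and-checkpoint behaviour can be sketched with a context that is cancelled on ctrl+c. This is a simplified model under my own assumptions, not the repository's code: events arrive on a channel instead of a real change stream, tokens are plain strings, and the event type and the process and demo functions are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
)

// event stands in for a change event and the resume token it carries.
type event struct {
	Token string
	Doc   string
}

// process consumes events until ctx is cancelled or the channel closes,
// then hands the token of the last processed event to save.
func process(ctx context.Context, events <-chan event, save func(token string) error) (int, error) {
	var last string
	processed := 0
	checkpoint := func() error {
		if last == "" {
			return nil // nothing processed, nothing to save
		}
		return save(last)
	}
	for {
		select {
		case <-ctx.Done():
			return processed, checkpoint()
		case ev, ok := <-events:
			if !ok {
				return processed, checkpoint()
			}
			last = ev.Token
			processed++
		}
	}
}

// demo feeds two in-memory events through process and reports the result.
func demo() (processed int, saved string, err error) {
	// The context is cancelled when the process receives an interrupt (ctrl+c).
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	events := make(chan event, 2)
	events <- event{Token: "t1", Doc: `{"name":"foo55"}`}
	events <- event{Token: "t2", Doc: `{"name":"foo56"}`}
	close(events)

	processed, err = process(ctx, events, func(token string) error {
		saved = token
		return nil
	})
	return processed, saved, err
}

func main() {
	n, saved, err := demo()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("processed %d events, checkpointed token %q\n", n, saved)
}
```

Saving the token of the last processed event, and not of the last received one, is the design choice that makes a restart safe: anything that was not fully handled will be delivered again.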
Delete records
The Delete operation is not yet supported. One suggested way to handle deletes is to treat a delete as an update operation: add an additional attribute (e.g. deleted, set to true) and set a TTL (time-to-live) for the specific document. This way, you still get the change event as a part of the change stream.
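On the consuming side, this workaround means the processor has to look at the marker attribute to tell a "delete" apart from a regular update. Here is a minimal sketch of that check, assuming the attribute is called deleted as in the example above; isSoftDeleted is a hypothetical helper and the documents are handled as JSON text for simplicity.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// isSoftDeleted reports whether the document carries deleted=true.
// A missing attribute is treated as "not deleted".
func isSoftDeleted(doc []byte) (bool, error) {
	var marker struct {
		Deleted bool `json:"deleted"`
	}
	if err := json.Unmarshal(doc, &marker); err != nil {
		return false, err
	}
	return marker.Deleted, nil
}

func main() {
	docs := []string{
		`{"name":"foo55","email":"foo55@bar.com","status":"offline"}`,
		`{"name":"foo55","email":"foo55@bar.com","status":"offline","deleted":true}`,
	}
	for _, d := range docs {
		deleted, err := isSoftDeleted([]byte(d))
		if err != nil {
			log.Fatal(err)
		}
		if deleted {
			fmt.Println("treat as delete:", d)
		} else {
			fmt.Println("treat as create/update:", d)
		}
	}
}
```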
If I were to try this with the sample record, there should be an entry in the change_events file like this:
{
  "_id": {
    "$oid": "5e8eec7712f1891a100bd449"
  },
  "name": "foo55",
  "email": "foo55@bar.com",
  "status": "offline",
  "deleted": true
}
Notice "deleted": true.
Alright! That's all for now. I will cover some of the implementation details in a follow up blog post (Part 2)