In SIP3 we deal with hundreds of thousands of SIP messages per second. Each message has to be aggregated into a SIP session and also saved to MongoDB.
In this blog post I won't explain the rationale behind choosing MongoDB but will concentrate on how to work with this database (and any other database) as efficiently as possible.
✍️ Writing to MongoDB
If you want to increase write throughput while working with your database, follow these three rules:
- Use a connection pool to the database.
- Use bulk writes.
- Introduce data partitioning.
The first two are covered in the official MongoDB documentation.
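To make the bulk-write rule more concrete, here is a minimal sketch of a buffer that groups documents per collection and flushes them in batches. `BulkBuffer`, `bulkSize`, and the `flush` callback are hypothetical names introduced for illustration, not SIP3 code. In a real service the callback would typically wrap each document in an `InsertOneModel` and pass the batch to `MongoCollection.bulkWrite`, using a client that is created once and shared, since the driver's client manages the connection pool.

```kotlin
// Hypothetical helper: buffers documents per collection and flushes them in batches.
// Not thread-safe; assumed to be used from a single thread (or a single event loop).
class BulkBuffer<T>(
    private val bulkSize: Int,
    private val flush: (collection: String, batch: List<T>) -> Unit
) {
    private val buffers = mutableMapOf<String, MutableList<T>>()

    // Adds a document and flushes that collection's buffer once it reaches `bulkSize`.
    fun add(collection: String, document: T) {
        val buffer = buffers.getOrPut(collection) { mutableListOf() }
        buffer.add(document)
        if (buffer.size >= bulkSize) {
            flush(collection, buffer.toList())
            buffer.clear()
        }
    }

    // Flushes whatever is left, e.g. on a timer or at shutdown.
    fun flushAll() {
        buffers.forEach { (collection, buffer) ->
            if (buffer.isNotEmpty()) {
                flush(collection, buffer.toList())
                buffer.clear()
            }
        }
    }
}
```

Calling `flushAll()` periodically keeps documents from sitting in a half-full buffer when traffic is low.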
However, I would love to say a few words about partitioning. Partitioning is a set of rules and practices you can use to divide your data into distinct, independent parts. Often you may want to have a few levels of partitioning, as we do in SIP3:
Level 1: Logical - Splits SIP traffic into calls and registrations. It lets us configure separately how long to keep each kind of data.
Level 2: Vertical - Splits a SIP session into CDRs (Call Detail Records) and SIP message content. It helps us save on data indexing.
Level 3: Horizontal - Splits our data by time. This split kills two birds with one stone: it helps to manage historical depth (by simply dropping old collections) and to save on data indexing.
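The three levels can be sketched as a collection naming scheme. The example below is an illustration under stated assumptions, not the SIP3 implementation: daily partitions with a `yyyyMMdd` suffix in UTC, and the kind names `index` and `raw`, are assumed here, and `collectionName` and `expiredCollections` are hypothetical helpers.

```kotlin
import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

// Assumption: daily partitions with a `yyyyMMdd` suffix in UTC.
val timeSuffix: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC)

// Level 1 (logical): prefix, e.g. "sip_call" or "sip_register".
// Level 2 (vertical): kind, e.g. "index" for CDRs or "raw" for message content.
// Level 3 (horizontal): the time suffix derived from the timestamp.
fun collectionName(prefix: String, kind: String, timestampMillis: Long): String =
    "${prefix}_${kind}_${timeSuffix.format(Instant.ofEpochMilli(timestampMillis))}"

// Returns the collections of one logical/vertical partition that are older than the cutoff.
fun expiredCollections(names: List<String>, prefix: String, kind: String, cutoffMillis: Long): List<String> {
    val partition = "${prefix}_${kind}_"
    val oldestToKeep = collectionName(prefix, kind, cutoffMillis)
    // A fixed-width date suffix sorts lexicographically in chronological order.
    return names.filter { it.startsWith(partition) && it < oldestToKeep }
}
```

Because each logical partition has its own prefix, the cutoff can differ between calls and registrations, and every expired collection can be dropped as a whole instead of deleting documents one by one.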
The implementation of data partitioning may vary and depends entirely on the application's specifics. However, at the code level, writing data to partitions is never a problem. Here is how we do it in the SIP3 Salto code:
open fun writeToDatabase(prefix: String, packet: Packet, message: SIPMessage) {
    // Horizontal partitioning: the collection name carries a time suffix
    val collection = prefix + "_raw_" + timeSuffix.format(packet.timestamp)

    val operation = JsonObject().apply {
        put("document", JsonObject().apply {
            val timestamp = packet.timestamp
            put("created_at", timestamp.time)
            put("nanos", timestamp.nanos)

            val src = packet.srcAddr
            put("src_addr", src.addr)
            put("src_port", src.port)
            src.host?.let { put("src_host", it) }

            val dst = packet.dstAddr
            put("dst_addr", dst.addr)
            put("dst_port", dst.port)
            dst.host?.let { put("dst_host", it) }

            put("call_id", message.callId())
            put("raw_data", String(packet.payload, Charsets.ISO_8859_1))
        })
    }

    // Hand the operation over to the bulk writer via the event bus
    vertx.eventBus().localRequest<Any>(RoutesCE.mongo_bulk_writer, Pair(collection, operation))
}
Now that we know how to write data, let's see how to read it.
📚 Reading from MongoDB
No doubt, reading data from partitions is a tough cookie 🍪.
In our projects we prefer to work with the MongoDB Java Driver directly, avoiding all existing ORM implementations.
The MongoDB Java Driver provides MongoCursor<Document>, which is basically a very efficient iterator because it pulls data from the database in batches.
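To illustrate what "pulls data in batches" means, here is a small stand-in written with the Kotlin standard library only. `BatchingIterator` and `fetchBatch` are hypothetical names, and the offset-based paging is a simplification: the real driver tracks a server-side cursor rather than an offset. The point of the sketch is that the caller sees a plain `Iterator` while round trips happen only once per batch.

```kotlin
// Hypothetical stand-in for a cursor: `fetchBatch(offset, size)` plays the role
// of one round trip to the database.
class BatchingIterator<T>(
    private val batchSize: Int,
    private val fetchBatch: (offset: Int, size: Int) -> List<T>
) : Iterator<T> {
    private var batch: Iterator<T> = emptyList<T>().iterator()
    private var offset = 0
    private var exhausted = false

    override fun hasNext(): Boolean {
        // Fetch the next batch only when the current one has been consumed.
        while (!batch.hasNext() && !exhausted) {
            val next = fetchBatch(offset, batchSize)
            offset += next.size
            // A short batch means the source has nothing more to return.
            if (next.size < batchSize) exhausted = true
            batch = next.iterator()
        }
        return batch.hasNext()
    }

    override fun next(): T {
        if (!hasNext()) throw NoSuchElementException()
        return batch.next()
    }
}
```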
Iterator is a very old pattern, but with modern Kotlin and extension functions it has gotten a second wind. In the next blog post I will show how we introduced the Iterator.merge and Iterator.map Kotlin extension functions (do you like this teaser 😈?).
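Until that post is out, here is one way such extensions could look. This is a hypothetical sketch, not the SIP3 implementation: the signatures below are assumptions, and `merge` assumes both iterators are already sorted by the given comparator.

```kotlin
import java.util.PriorityQueue

// Lazily transforms each element; nothing is read from the source until it is requested.
fun <T, R> Iterator<T>.map(transform: (T) -> R): Iterator<R> {
    val source = this
    return object : Iterator<R> {
        override fun hasNext(): Boolean = source.hasNext()
        override fun next(): R = transform(source.next())
    }
}

// Merges two iterators, each already sorted by `comparator`, into one sorted iterator.
fun <T> Iterator<T>.merge(other: Iterator<T>, comparator: Comparator<T>): Iterator<T> {
    // The queue holds one (head element, source) pair per non-empty iterator.
    val byHead = Comparator<Pair<T, Iterator<T>>> { a, b -> comparator.compare(a.first, b.first) }
    val heads = PriorityQueue(byHead)
    listOf(this, other).filter { it.hasNext() }.forEach { heads.add(it.next() to it) }

    return object : Iterator<T> {
        override fun hasNext(): Boolean = heads.isNotEmpty()

        override fun next(): T {
            val (value, source) = heads.poll() ?: throw NoSuchElementException()
            // Refill the queue from the iterator that just gave up its head.
            if (source.hasNext()) heads.add(source.next() to source)
            return value
        }
    }
}
```

Both functions read at most one element ahead per source, so they keep the batch-by-batch behaviour of the underlying cursors.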
But let's get back to reading data partitioned by time from multiple MongoDB collections. Here is our simple recipe:
1 - Retrieve and cache a list of collections by prefix:
@Cacheable(value = ["listCollectionNames"], key = "#prefix")
open fun listCollectionNames(prefix: String): List<String> {
    return client.getDatabase(db).listCollectionNames().asSequence()
            .filter { name -> name.startsWith(prefix) }
            .sorted()
            .toList()
}
2 - Introduce a find() method (optionally make it an extension method of the existing MongoClient API):
open fun find(prefix: String, timeRange: Pair<Long, Long>,
              filter: Bson, sort: Bson? = null, limit: Int? = null): Iterator<Document> {

    // Retrieve list of collections by prefix and time range
    val collectionNames = listCollectionNames(prefix).asSequence()
            .filter { name -> "${prefix}_${suffix.format(timeRange.first)}" <= name }
            .filter { name -> "${prefix}_${suffix.format(timeRange.second)}" >= name }
            .iterator()

    // Create `Iterator<Document>` which will imitate working with a simple `MongoCursor<Document>`
    return object : Iterator<Document> {

        var cursor: MongoCursor<Document>? = null

        override fun hasNext(): Boolean {
            if (cursor?.hasNext() == true) return true

            if (collectionNames.hasNext()) {
                cursor = client.getDatabase(db)
                        .getCollection(collectionNames.next())
                        .run {
                            filter?.let { find(filter) } ?: find()
                        }
                        .apply {
                            maxTime(maxExecutionTime, TimeUnit.MILLISECONDS)
                            batchSize(limit ?: batchSize)
                            sort?.let { sort(it) }
                        }
                        .iterator()

                return hasNext()
            }

            return false
        }

        override fun next(): Document {
            if (!hasNext()) throw NoSuchElementException()
            return cursor!!.next()
        }
    }
}
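The core of this recipe does not depend on MongoDB, so it can be tried out with the standard library alone. Below is a minimal sketch under stated assumptions: `selectPartitions` and `chain` are hypothetical helpers, partition names are assumed to end with a fixed-width, sortable time suffix, and the `open` callback stands in for creating a cursor on one collection.

```kotlin
// Keeps only the partitions whose names fall between the two bounds (inclusive).
// Assumption: names end with a fixed-width, sortable time suffix such as `yyyyMMdd`.
fun selectPartitions(names: List<String>, from: String, to: String): List<String> =
    names.sorted().filter { it in from..to }

// Opens partitions one at a time, only when the previous one has been fully read.
fun <T> chain(partitions: Iterator<String>, open: (String) -> Iterator<T>): Iterator<T> =
    object : Iterator<T> {
        private var current: Iterator<T>? = null

        override fun hasNext(): Boolean {
            // Skip over empty partitions until an element or the end is found.
            while (current?.hasNext() != true) {
                if (!partitions.hasNext()) return false
                current = open(partitions.next())
            }
            return true
        }

        override fun next(): T {
            if (!hasNext()) throw NoSuchElementException()
            return current!!.next()
        }
    }
```

This version uses a loop where the snippet above recurses, which is a design choice of the sketch: the behaviour is the same, but a long run of empty partitions does not grow the call stack. A global limit across partitions can be applied by the caller, for example with `asSequence().take(n)`.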
3 - Work with your documents as you are used to doing with MongoCursor<Document>:
fun findInRawBySessionRequest(req: SessionRequest): Iterator<Document> {
    val filters = mutableListOf<Bson>().apply {
        add(gte("created_at", req.createdAt))
        add(lte("created_at", req.terminatedAt))
        add(`in`("call_id", req.callId))
    }
    return mongoClient.find("sip_call_raw", Pair(req.createdAt, req.terminatedAt), and(filters))
}
4 - Profit 👌
As you can see, we can now work with partitioned data the same way we usually work with a single collection.
Please leave us a comment if you liked the idea, and share with us how you implement data partitioning in your projects!
👨‍💻 Happy coding,
Your SIP3 team.