DEV Community

Oleg Agafonov for SIP3

Posted on • Updated on


MongoDB best practices from the SIP3 team.

In SIP3 we deal with hundreds of thousands SIP messages per second. Each message has to be aggregated into a SIP session and also saved to MongoDB.

In this blog post I won't explain a rational behind choosing MongoDB but will concentrate on how to work with this database (and any other databases) as efficiently as possible.

✍️ Writing to MongoDB

If you want to increase write throughput while working with your database follow these three rules:

  • Use a connection pool to the database.
  • Use bulk writes.
  • Introduce data partitioning.

You can read about the first two in official MongoDB documentation here and here.

However, I would love to say a few words about partitioning. As you can read here, partitioning is a set of rules/practices you can use to divide your data into distinct independent parts. Often case you may want to have a few levels of partitioning as we do it in SIP3:
SIP3 partitioning levels

Level 1: Logical - Splits SIP traffic into calls and registrations. It let's us to configure separately for how long to keep each of the data.

Level 2: Vertical - Splits SIP session into CDRs (Call Details Records) and SIP messages content. It helps us to save on data indexing.

Level 3: Horizontal - Splits our data by time. This split literally kills two birds with one stone: helps to manage historical depth (by easily dropping old collections) and to save on data indexing.

An implementation of data partitioning may vary and fully depends on the application's specific. However, on a code level writing data to partitions is never a problem. That's how we do it in the SIP3 Salto code:

open fun writeToDatabase(prefix: String, packet: Packet, message: SIPMessage) {
    val collection = prefix + "_raw_" + timeSuffix.format(packet.timestamp)

    val operation = JsonObject().apply {
        put("document", JsonObject().apply {
            val timestamp = packet.timestamp
            put("created_at", timestamp.time)
            put("nanos", timestamp.nanos)

            val src = packet.srcAddr
            put("src_addr", src.addr)
            put("src_port", src.port)
   { put("src_host", it) }

            val dst = packet.dstAddr
            put("dst_addr", dst.addr)
            put("dst_port", dst.port)
   { put("dst_host", it) }

            put("call_id", message.callId())
            put("raw_data", String(packet.payload, Charsets.ISO_8859_1))

    vertx.eventBus().localRequest<Any>(RoutesCE.mongo_bulk_writer, Pair(collection, operation))
Enter fullscreen mode Exit fullscreen mode

Now, when we know how to write data let's check how to read it.

📚 Reading from MongoDB

No doubts, reading data from partitions is a tough cookie 🍪.

In our projects we prefer to work with MongoDB Java Driver explicitly avoiding all existing ORM implementations.

MongoDB Java Driver introduces MongoCursor<Document> which is basically a very efficient iterator because it pulls data from the database in batches.

Iterator is a very old pattern but with modern Kotlin and method extensions it indeed got the second wind. In the next blog post I will show how we introduced Iterator.merge and Kotlin extension functions (do you like this teaser 😈).

But let's get back to reading data partitioned by time from multiple MongoDB collections. Here is our simple recipe:

1 - Retrieve and cache a list of collections by prefix:

@Cacheable(value = ["listCollectionNames"], key = "#prefix")
open fun listCollectionNames(prefix: String): List<String> {
    return client.getDatabase(db).listCollectionNames().asSequence()
            .filter { name -> name.startsWith(prefix) }

Enter fullscreen mode Exit fullscreen mode

2 - Introduce find() method (optionally make it an extension method of existing MongoClient API):

open fun find(prefix: String, timeRange: Pair<Long, Long>,
              filter: Bson, sort: Bson? = null, limit: Int? = null): Iterator<Document> {
    // Retrieve list of collections by prefix and time range
    val collectionNames = listCollectionNames(prefix).asSequence()
            .filter { name -> "${prefix}_${suffix.format(timeRange.first)}" <= name }
            .filter { name -> "${prefix}_${suffix.format(timeRange.second)}" >= name }

    // Create `Iterator<Document>` which will imitate working with a simple `MongoCursor<Document>` 
    return object : Iterator<Document> {

        var cursor: MongoCursor<Document>? = null

        override fun hasNext(): Boolean {
            if (cursor?.hasNext() == true) return true

            if (collectionNames.hasNext()) {
                cursor = client.getDatabase(db)
                        .run {
                            filter?.let { find(filter) } ?: find()
                        .apply {
                            maxTime(maxExecutionTime, TimeUnit.MILLISECONDS)
                            batchSize(limit ?: batchSize)
                            sort?.let { sort(it) }

                return hasNext()

            return false

        override fun next(): Document {
            if (!hasNext()) throw NoSuchElementException()
            return cursor!!.next()
Enter fullscreen mode Exit fullscreen mode

3 - Work with your documents as you are used to do with MongoCursor<Document>:

fun findInRawBySessionRequest(req: SessionRequest): Iterator<Document> {
    val filters = mutableListOf<Bson>().apply {
        add(gte("created_at", req.createdAt))
        add(lte("created_at", req.terminatedAt))
        add(`in`("call_id", req.callId))

    return mongoClient.find("sip_call_raw", Pair(req.createdAt, req.terminatedAt), and(filters))
Enter fullscreen mode Exit fullscreen mode

4 - Profit 👌

As you can see now, we managed to work with partitioned data in the way we usually work with just a single collection.

Please, leave us a comment if you liked the idea and share with us how you implement data partitioning in your projects!

👨‍💻 Happy coding,
Your SIP3 team.

Top comments (0)