DEV Community

Oleg Agafonov for SIP3


Yet another ode to Vert.x, or how to write a performance-wise expiring map in less than 100 lines of code.

I've been working with the Vert.x framework for more than 4 years, and I still get excited by how simple, lightweight, and elegant it is (especially the event loop thread model). In this blog post I will show how we implemented the PeriodicallyExpiringHashMap data structure in less than 100 lines of code. But first, let me give you a bit of context about why we need it.

Problem

SIP3 is a very advanced VoIP monitoring and troubleshooting platform. To provide detailed information about call quality we need to:

  1. Aggregate RTP packets into RTP streams in real time.
  2. Periodically walk through all the RTP streams and terminate the ones that haven't been updated for a certain period of time.

Let's stay away from telecom specifics and take a look at a simplified code example:

import io.vertx.core.AbstractVerticle

class RtpStreamHandler : AbstractVerticle() {

    var expirationDelay: Long = 1000
    var aggregationTimeout: Long = 30000

    private val rtpStreams = mutableMapOf<String, RtpStream>()

    override fun start() {
        vertx.setPeriodic(expirationDelay) {
            val now = System.currentTimeMillis()

            rtpStreams.filterValues { rtpStream -> rtpStream.updatedAt + aggregationTimeout < now }
                .forEach { (rtpStreamId, rtpStream) ->
                    terminateRtpStream(rtpStream)
                    rtpStreams.remove(rtpStreamId)
                }
        }

        vertx.eventBus().localConsumer<RtpPacket>("on_rtp_packet") { event ->
            val rtpPacket = event.body()
            handleRtpPacket(rtpPacket)
        }
    }

    fun handleRtpPacket(rtpPacket: RtpPacket) {
        val rtpStream = rtpStreams.getOrPut(rtpPacket.rtpStreamId) { RtpStream() }
        rtpStream.addPacket(rtpPacket)
    }

    fun terminateRtpStream(rtpStream: RtpStream) {
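        // NOTE: localSend is not a core Vert.x EventBus method; it is assumed to be a project-level extension function.
        vertx.eventBus().localSend("on_rtp_stream", rtpStream)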
    }
}

Now let's imagine that we constantly have 30K active RTP streams. Every second we terminate approximately a thousand old streams and get a thousand new ones instead. In these circumstances the code above is inefficient: every second it checks all 30K streams to find the roughly 1K that have expired. We certainly need a better solution.
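To put a number on that cost, here is a minimal sketch that runs the same filterValues scan over 30K entries and counts how many of them it has to look at. The Stream class and the buildStreams and naiveScan helpers are made up for this illustration and are not part of the SIP3 code.

```kotlin
// Hypothetical stand-in for RtpStream: only the last-update timestamp matters here.
class Stream(val updatedAt: Long)

// Builds `total` streams, `stale` of which were last updated more than `timeout` ms before `now`.
fun buildStreams(total: Int, stale: Int, now: Long, timeout: Long): Map<String, Stream> =
    (0 until total).associate { i ->
        "stream-$i" to Stream(if (i < stale) now - timeout - 1 else now)
    }

// Runs the same filter as the verticle and returns (entries inspected, entries expired).
fun naiveScan(streams: Map<String, Stream>, now: Long, timeout: Long): Pair<Int, Int> {
    var inspected = 0
    val expired = streams.filterValues { stream ->
        inspected++
        stream.updatedAt + timeout < now
    }
    return inspected to expired.size
}
```

With 30,000 streams of which 1,000 are stale, naiveScan inspects all 30,000 entries to find the 1,000 expired ones, and it repeats that work on every timer tick.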

Solution

As you can see from the first code snippet, once an RTP stream has been updated it won't be terminated for at least the next aggregationTimeout milliseconds. This means that we can simply ignore it for some time.

And this is the key idea behind the SIP3 PeriodicallyExpiringHashMap implementation:

import io.vertx.core.Vertx

class PeriodicallyExpiringHashMap<K, V> private constructor(
    vertx: Vertx,
    private val delay: Long,
    private val period: Int,
    private val expireAt: (K, V) -> Long,
    private val onExpire: (K, V) -> Unit
) {

    private val objects = mutableMapOf<K, V>()
    private val expiringSlots = (0 until period).map { mutableMapOf<K, V>() }.toList()
    private var expiringSlotIdx = 0

    init {
        vertx.setPeriodic(delay) {
            terminateExpiringSlot()
            expiringSlotIdx += 1
            if (expiringSlotIdx >= period) {
                expiringSlotIdx = 0
            }
        }
    }

    fun getOrPut(key: K, defaultValue: () -> V): V {
        return objects.getOrPut(key) {
            defaultValue.invoke().also { expiringSlots[expiringSlotIdx][key] = it }
        }
    }

    private fun terminateExpiringSlot() {
        val now = System.currentTimeMillis()

        expiringSlots[expiringSlotIdx].apply {
            forEach { (k, v) ->
                val expireAt = expireAt(k, v)

                when {
                    expireAt <= now -> {
                        objects.remove(k)?.let { onExpire(k, it) }
                    }
                    else -> {
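                        // Not expired yet: schedule the next check for the slot closest to the
                        // expiration time, but at most period - 1 slots ahead of the current one.
                        var shift = ((expireAt - now) / delay).toInt() + 1
                        if (shift >= period) {
                            shift = period - 1
                        }
                        val nextExpiringSlotIdx = (expiringSlotIdx + shift) % period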

                        expiringSlots[nextExpiringSlotIdx][k] = v
                    }
                }
            }
            clear()
        }
    }

    data class Builder<K, V>(
        var delay: Long = 1000,
        var period: Int = 60,
        var expireAt: (K, V) -> Long = { _: K, _: V -> Long.MAX_VALUE },
        var onExpire: (K, V) -> Unit = { _: K, _: V -> }
    ) {
        fun delay(delay: Long) = apply { this.delay = delay }
        fun period(period: Int) = apply { this.period = period }
        fun expireAt(expireAt: (K, V) -> Long) = apply { this.expireAt = expireAt }
        fun onExpire(onExpire: (K, V) -> Unit) = apply { this.onExpire = onExpire }

        fun build(vertx: Vertx) = PeriodicallyExpiringHashMap(vertx, delay, period, expireAt, onExpire)
    }
}
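To make the slot arithmetic in terminateExpiringSlot concrete, the sketch below pulls the same computation out into a standalone function. The function name nextSlot and its parameter list are made up for illustration; only the formula comes from the class above.

```kotlin
// Returns the index of the slot in which a not-yet-expired entry should be checked again.
// Hypothetical helper: it mirrors the shift computation in terminateExpiringSlot.
fun nextSlot(currentSlot: Int, expireAt: Long, now: Long, delay: Long, period: Int): Int {
    // Number of timer ticks until the entry can expire, rounded up to the next tick.
    var shift = ((expireAt - now) / delay).toInt() + 1
    // Never move further than period - 1 slots ahead, so we don't land on the current slot.
    if (shift >= period) {
        shift = period - 1
    }
    // The slots form a ring, so wrap around at the end.
    return (currentSlot + shift) % period
}
```

For example, with delay = 1000, period = 30 and the timer currently at slot 5, an entry that expires in 2.5 seconds is moved 3 slots ahead to slot 8. An entry that expires in 45 seconds is clamped to 29 slots ahead and lands in slot 4, where it will be checked and moved again.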

Here are the benefits of this data structure:

  1. Now we just have a bunch of time slots. Instead of walking through all the objects in our map every expirationDelay, we walk through a single slot. With 30 slots, instead of checking 30K objects every second we check roughly 1K.
  2. We don't need to allocate a new map every time we decide to walk through the streams. In the previous example this was also an issue, because rtpStreams.filterValues builds a new map holding the matching entries on every run.
  3. Last and most important: our implementation stays consistent within a particular verticle context, because the timer callback and the event bus handlers run on the same event loop thread and no locking is needed. That means you can simply extend it and implement the rest of the methods (including tricky ones, like size()).
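The points above can be tried out without Vert.x. The following is a simplified sketch, not the SIP3 class: SlottedExpiringMap is a hypothetical name, the timer is replaced by an explicit tick(now) call that returns how many entries it inspected, a size property is added as one example of "the rest of the methods", and period is assumed to be at least 2.

```kotlin
// Hypothetical, Vert.x-free variant of the slotted map, driven by explicit tick() calls.
class SlottedExpiringMap<K, V>(
    private val delay: Long,
    private val period: Int,
    private val expireAt: (K, V) -> Long,
    private val onExpire: (K, V) -> Unit
) {
    init {
        // Assumption of this sketch: with a single slot an entry would be re-added to the slot being cleared.
        require(period >= 2) { "period must be at least 2" }
    }

    private val objects = mutableMapOf<K, V>()
    private val slots = List(period) { mutableMapOf<K, V>() }
    private var slotIdx = 0

    // All live entries are tracked in `objects`, so its size is the size of the map.
    val size: Int get() = objects.size

    fun getOrPut(key: K, defaultValue: () -> V): V =
        objects.getOrPut(key) { defaultValue().also { slots[slotIdx][key] = it } }

    // Processes the current slot only and returns the number of entries it inspected.
    fun tick(now: Long): Int {
        val slot = slots[slotIdx]
        val inspected = slot.size
        for ((k, v) in slot) {
            val deadline = expireAt(k, v)
            if (deadline <= now) {
                objects.remove(k)?.let { onExpire(k, it) }
            } else {
                // Move the entry to the slot closest to its deadline, at most period - 1 ahead.
                val shift = (((deadline - now) / delay).toInt() + 1).coerceAtMost(period - 1)
                slots[(slotIdx + shift) % period][k] = v
            }
        }
        slot.clear()
        slotIdx = (slotIdx + 1) % period
        return inspected
    }
}
```

With delay = 1000, period = 3 and an entry that expires at t = 3000, the first tick moves the entry two slots ahead, the second tick inspects nothing, and the third tick expires it. Each tick touches only the entries parked in one slot, and no intermediate map is allocated.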

Conclusions

Finally, let's see what our verticle looks like with the new PeriodicallyExpiringHashMap data structure:

import io.vertx.core.AbstractVerticle

class RtpStreamHandler : AbstractVerticle() {

    var expirationDelay: Long = 1000
    var aggregationTimeout: Long = 30000

    private lateinit var rtpStreams: PeriodicallyExpiringHashMap<String, RtpStream>

    override fun start() {
        rtpStreams = PeriodicallyExpiringHashMap.Builder<String, RtpStream>()
            .delay(expirationDelay)
            .period((aggregationTimeout / expirationDelay).toInt())
            .expireAt { _, rtpStream -> rtpStream.updatedAt + aggregationTimeout }
            .onExpire { _, rtpStream -> terminateRtpStream(rtpStream) }
            .build(vertx)

        vertx.eventBus().localConsumer<RtpPacket>("on_rtp_packet") { event ->
            val rtpPacket = event.body()
            handleRtpPacket(rtpPacket)
        }
    }

    fun handleRtpPacket(rtpPacket: RtpPacket) {
        val rtpStream = rtpStreams.getOrPut(rtpPacket.rtpStreamId) { RtpStream() }
        rtpStream.addPacket(rtpPacket)
    }

    fun terminateRtpStream(rtpStream: RtpStream) {
        vertx.eventBus().localSend("on_rtp_stream", rtpStream)
    }
}

And here are the load test results (purple is our new implementation):

[Figure: Load Tests]

The tests look great and the code looks clean and simple thanks to the Vert.x event loop thread model.

👨‍💻 Happy coding,
Your SIP3 team.

Top comments (2)

Misha

Folks, try LinkedHashMap - all your entries will be iterated in the order of insertion. Which coincides with the order of expiration.
Which means you only need to iterate and remove expired until you reach the first non-expired entry. Which makes checking for expired entries amortized constant. And it also simplifies the code

Oleg Agafonov

Thank you for a good hint. Unfortunately it won't work in our case because our entries are not ordered. Also there are more complicated scenarios I haven't shown in this post.

For example, when we aggregate SIP transactions, the expiration time differs depending on the transaction's state: github.com/sip3io/sip3-salto-ce/bl...

That's why we use a method called touch to update the entry expiration slot. And it's not really possible to implement with LinkedHashMap only: github.com/sip3io/sip3-commons/blo...