loading...

Publish Vonage Events to Kafka with Spring Boot

cr0wst profile image Steve Crow ・7 min read

Event-Driven Architecture is the asynchronous, dynamic processing of events to communicate between decoupled services.

For example, we might have a unified communication platform that manages millions of users and conversations. We might bring in these conversations from a multitude of sources and process them in many different ways.

Event-Driven Architecture enables us to distribute processing of these events between various services which might:

  • Transfer information to long-term storage like a data lake
  • Perform administrative tasks like stopping spammers in real-time.

Events can come from many different channels. In this guide, you will build a Spring Boot application, using Kotlin, that can receive events from the Vonage Communication APIs and store them in Apache Kafka.

This guide assumes that you have some experience with Spring Boot and the Vonage Communication APIs.

An example of the code can be found on GitHub: cr0wst/vonage-events-kafka

What are the Vonage Communication APIs?

The Vonage Communication APIs enable developers to create things like in-app messaging, voice chat, and integration with third-party chat services.

Many of these actions generate Events, which can be handled by your application via a pre-configured Webhook.

I have created a Demo Application that can generate a random conversation to produce events.

What is Kafka?

Apache Kafka is a horizontally scalable, fault-tolerant tool for building real-time data pipelines and streaming applications.

Kafka allows you to publish and subscribe to topics, which serve as a category for records to be published. A topic can have multiple publishers and consumers. Unlike traditional message brokers, it can store historical data indefinitely.

You can read more about Kafka on the Confluent Docs.

This guide will cover the following concepts:

  • Connecting a Spring Boot Application to Kafka.
  • Creating a controller to serve as a Webhook for incoming Vonage Communication events.
  • Pipelining these events into a Kafka topic for later processing.

Download the Demo Application

The Pre-Setup instructions will guide you through creating a Vonage Application. You can also follow the official documentation for Creating an Application.

Most of this guide assumes that you are already receiving webhook events from Vonage, but you aren't currently sending them to Kafka.

If you don't have an application, I have created a Demo Application, which simulates multiple conversations using the Vonage Conversation API.

Initialize a New Spring Boot Application

Use your IDE or the Spring Initializr to create a new Spring Boot Application with Spring For Apache Kafka and Spring Reactive Web:

The Spring Initializr with Kafka and Reactive Web Selected

Create a Webhook

The Vonage Communication APIs send event messages to a pre-configured webhook.

Configure the Event URL

The Event URL can be defined using the Vonage Dashboard

The Vonage Dashboard showing an application being edited to have its RTC event URL updated

This has to be a publicly-accessible URL. You can use a tool like ngrok to safely tunnel traffic to your application. Check out this post on Working with Webhooks using Ngrok.

I'm going to start a tunnel using Ngrok to listen on port 8080:

ngrok http 8080 -subdomain=cr0wst

Create a Controller

Create a new class called EventWebhookController and add the following code:

package com.smcrow.demo

import org.slf4j.LoggerFactory
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController

@RestController
class EventWebhookController {
    @PostMapping("/webhooks/events")
    fun incomingEvents(@RequestBody body: String) {
        // Handle the Event Here
        log.info(body)
    }

    companion object {
        private val log = LoggerFactory.getLogger(this::class.java)
    }
}

This code registers a route at /webhooks/events that accepts POST requests and logs the request body.

If you were to start both applications, you would see a stream of Vonage events:

Start the Webhook Application:

./mvnw spring-boot:run

Now, start the demo application:

cd ~/Code/vonage-conversation-demo
./mvnw spring-boot:run

The top terminal shows the events from Vonage, and the bottom is the application generating random conversations and messages:

A horizontally split terminal showing the webhook application on top and the automatic conversation app running on the bottom

Now that your application is receiving events from Vonage, it's time to set up Kafka.

Setting up Kafka

If you don't already have access to a Kafka cluster, there are a few solutions.

The simplest way is to sign up for a service like Confluent Cloud. I will be using Confluent for the remainder of this guide and the Confluent Cloud CLI

Create a Cluster

Create a new cluster on Confluent Cloud with the following command:

ccloud kafka cluster create demo --cloud aws --region us-east-2

The output has an ID number and an Endpoint, record these for later. For this guide, I will be using the ID lkc-03wkq and endpoint pkc-ep9mm.us-east-2.aws.confluent.cloud:9092

Create a Topic

Create the vonage.webhook.events topic inside of your Kafka cluster for storing the Vonage webhook events:

ccloud kafka topic create --cluster lkc-03wkq --partitions 1 vonage.webhook.events

Connecting a Spring Boot Application to Kafka

To connect your Spring Boot Application to Confluent Cloud, you'll need to create an API Key and Secret. Run the following command, replacing the resource with your ID from a previous step.

ccloud api-key create --resource=lsrc-7qz91

You will also need your bootstrap server address, which is the endpoint from a previous step.

Using the bootstrap server address, API Key and Secret, add the following properties to application.properties, replacing the appropriate variables.

# Kafka
spring.kafka.properties.ssl.endpoint.identification.algorithm=https
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.bootstrap-servers={{ BOOTSTRAP_SERVER_ADDRESS }}
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username="{{ CLUSTER_API_KEY }}"   password="{{ CLUSTER_API_SECRET }}";
spring.kafka.properties.security.protocol=SASL_SSL

# Producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

For example:

# Kafka
spring.kafka.properties.ssl.endpoint.identification.algorithm=https
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.bootstrap-servers=pkc-ep9mm.us-east-2.aws.confluent.cloud:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username="my-api-key"   password="my-api-secret";
spring.kafka.properties.security.protocol=SASL_SSL

# Producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

Produce Messages from Spring Boot

The Spring Kafka project uses a KafkaTemplate object to produce messages. KafkaTemplate is generic with two generic types: one for the Kafka key, and one for the value.

Update the EventWebhookController to inject a KafkaTemplate<String, String> as a dependency:

package com.smcrow.demo

import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController

@RestController
class EventWebhookController(private val kafkaTemplate: KafkaTemplate<String, String>) {
    @PostMapping("/webhooks/events")
    fun incomingEvents(@RequestBody body: String) {
        // Handle the Event Here
        log.info(body)
    }

    companion object {
        private val log = LoggerFactory.getLogger(this::class.java)
    }
}

Now, update the incomingEvents method to send the event to Kafka:

package com.smcrow.demo

import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController

@RestController
class EventWebhookController(private val kafkaTemplate: KafkaTemplate<String, String>) {
    @PostMapping("/webhooks/events")
    fun incomingEvents(@RequestBody body: String) {
        // Handle the Event Here
        kafkaTemplate.send("vonage.webhook.events", body)
    }

    companion object {
        private val log = LoggerFactory.getLogger(this::class.java)
    }
}

Consuming Kafka Messages Using the CLI

You can use the Confluent Cloud CLI to consume events on the vonage.webhook.events topic.

First, configure the CLI to use your API key from before, replacing my-api-key with your API key and lkc-03wkq with the ID from a previous step:

ccloud api-key use my-api-key --resource lkc-03wkq

Now, start consuming messages:

ccloud kafka topic consume --cluster lkc-03wkq -b vonage.webhook.events

Remember that Kafka is a commit log. The -b flag starts consumption from the beginning. Using this flag means you'll get a back-fill of messages that you missed while you weren't connected.

Start Everything

First, start the Kafka Consumer if you haven't already:

ccloud kafka topic consume --cluster lkc-03wkq -b vonage.webhook.events

Now start the Webhook Application:

cd ~/Code/vonage-events-kafka-demo
./mvnw spring-boot:start

Finally, start the conversation simulator application:

cd ~/Code/vonage-conversation-demo
./mvnw spring-boot:run

The following shows all three running:

A horizontally split terminal. The top is vertically split showing the two applications running. The bottom is showing the kafka consumer output

Conclusion

In this guide, you created a Spring Boot Application that can listen to Vonage Events and publish them to Kafka.

An example of the code can be found on GitHub: cr0wst/vonage-events-kafka

In another guide, I'll show you how to consume these events and store them in a relational database like Snowflake.

Here are some links you might find useful:

Posted on by:

cr0wst profile

Steve Crow

@cr0wst

Steve is a lover of Greyhounds, twisty puzzles, and European Board Games. When not talking math to non-math people, and Java to non-Java people, he can be found sipping coffee and hacking on code.

Discussion

markdown guide