Bartosz Gajda

Posted on • Originally published at bartoszgajda.com

Apache Kafka WebSocket data ingestion using Spring Cloud Stream

In this tutorial I want to show you how to connect to a WebSocket data source and pass its events straight to Apache Kafka. For this, I will use the Spring Cloud Stream framework. Enjoy!

For this tutorial, I use:

  • IntelliJ IDEA
  • Meetup.com RSVP API

Creating Spring Cloud Stream project

The first step is to create a Spring project with the necessary dependencies. I highly suggest using Spring Initializr to generate a clean Spring project. It is also integrated into IntelliJ IDEA. I will be using Java 8 for this project. The pom.xml file should include the following:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.RC1</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

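For this project we only want the WebSocket starter, the Spring Cloud Stream package and the corresponding binder for Apache Kafka. You can use different binders (like RabbitMQ), but this tutorial covers Apache Kafka only. Make sure that Maven resolves these dependencies before going any further. Since Hoxton.RC1 is a release candidate, the build may also need the Spring milestone repository declared, which Spring Initializr normally adds for you.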

Adding application properties

Next we add some basic configuration to our Spring app. Note: I have converted the application.properties file into an application.yml file. This is totally optional, as Spring accepts both formats. If you use a .properties file, you will need a different structure, which you should be able to infer from my configuration.

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            producer:
              partition-count: 1
      bindings:
        rsvp-in:
          destination: rsvp
          contentType: application/json
        rsvp-out:
          destination: rsvp
          contentType: application/json
server:
  port: 8081

In this file, we set the address (brokers) of the Apache Kafka broker (localhost:9092 in this case) and the bindings to topics. Spring uses them to figure out where to push events and where to fetch them from. I have named my topic (destination) rsvp, although you can change it if needed. The rsvp-in and rsvp-out bindings are explained in the next chapter. The WebSocket source I am using sends events in JSON format, hence I set the content type to application/json. If your use case requires something else, please look up the Spring Cloud Stream docs for more information.

In my case, I am using a local Apache Kafka broker. This tutorial doesn’t cover how to set one up. The official Apache Kafka docs explain how to do it.
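
For readers who keep application.properties, the same settings flatten into dotted keys. The following is a minimal sketch rather than part of the original project: the class name PropertiesEquivalent is made up for illustration, and it parses the flattened keys with the JDK's java.util.Properties only to show that they read back as expected. In a real project the quoted lines would simply live in src/main/resources/application.properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the original project.
class PropertiesEquivalent {
    // Each nested YAML key from application.yml becomes one dotted key.
    static final String APPLICATION_PROPERTIES = String.join("\n",
        "spring.cloud.stream.kafka.binder.brokers=localhost:9092",
        "spring.cloud.stream.kafka.binder.transaction.producer.partition-count=1",
        "spring.cloud.stream.bindings.rsvp-in.destination=rsvp",
        "spring.cloud.stream.bindings.rsvp-in.contentType=application/json",
        "spring.cloud.stream.bindings.rsvp-out.destination=rsvp",
        "spring.cloud.stream.bindings.rsvp-out.contentType=application/json",
        "server.port=8081");

    // Parses the text above the same way a .properties file would be read.
    static Properties load() {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(APPLICATION_PROPERTIES));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = load();
        System.out.println(properties.getProperty("spring.cloud.stream.kafka.binder.brokers"));
        System.out.println(properties.getProperty("spring.cloud.stream.bindings.rsvp-out.destination"));
    }
}
```

Both bindings point at the same rsvp destination, exactly as in the YAML version, so switching formats should not change the application's behaviour.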

Configuring Apache Kafka output and input channels

First of all, it’s worth showing how this tutorial’s project is structured. The project includes two inner packages: config and service. In config we set up Spring according to what has been added to the application.yml file. The service package includes the classes that connect to the WebSocket and push the events to Apache Kafka.

Let’s start with the configuration part. The RsvpStreams interface declares our bindings to Apache Kafka's input and output. The binding names are the same as those specified in the application.yml file. They must match exactly, so check your spelling if any error occurs.

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface RsvpStreams {
  String INPUT = "rsvp-in";
  String OUTPUT = "rsvp-out";

  @Input(INPUT)
  SubscribableChannel inboundRsvp();

  @Output(OUTPUT)
  MessageChannel outboundRsvp();
}

I will be using only OUTPUT for this project. INPUT may come in handy if you want to ingest some data from Apache Kafka for testing/debugging purposes.

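The final piece of configuration is the StreamsConfig class. Here the @EnableBinding annotation tells Spring Cloud Stream to create the channels declared in the interface above and make it available for injection, which we will rely on in later steps.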

import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(RsvpStreams.class)
public class StreamsConfig {
}

Creating Apache Kafka service

Next we create a service, RsvpService, that takes a single event and pushes it straight to the Kafka topic. You are free to change its name to anything that makes sense for you. The service itself is quite straightforward and should be easy to understand. Note that the Logger is totally optional; I have included it for debugging purposes.

import com.bartoszgajda.honors.messaging.config.RsvpStreams;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

import java.util.logging.Level;
import java.util.logging.Logger;

@Service
public class RsvpService {
  private static final Logger logger = Logger.getLogger(RsvpService.class.getName());
  private final RsvpStreams rsvpStreams;

  public RsvpService(RsvpStreams rsvpStreams) {
    this.rsvpStreams = rsvpStreams;
  }

  public void sendRsvp(final String rsvp) {
    logger.log(Level.INFO, "New RSVP");
    MessageChannel messageChannel = rsvpStreams.outboundRsvp();
    // Wrap the raw JSON string in a message and tag it with a JSON content type
    messageChannel.send(MessageBuilder
        .withPayload(rsvp)
        .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
        .build());
  }
}

In the class above, we first inject the RsvpStreams binding interface, which provides the message channel that routes our WebSocket data. The sendRsvp method takes a String parameter, the raw event received from the WebSocket, and obtains the outbound channel (outbound because we want to push data to Apache Kafka, not consume from it). It then wraps the payload in a message using MessageBuilder, adds a Content-Type header of application/json, and submits the message to Apache Kafka through the channel's send method.

One thing you may notice: I said I would be using JSON data, which is closer to a Java Object than to a String, yet the payload here is a String. That is completely true. The reason is that I don’t parse the events from JSON into a concrete class, because other components in my system do that. If you need to parse the JSON, you can do so and still send the result to Apache Kafka. MessageBuilder.withPayload is generic, so you are free to model the data as needed.

Adding WebSocket service

The last big part of this tutorial is a service that connects to the WebSocket data source and uses RsvpService to send the data to Apache Kafka. In my case, this service is called WebsocketService.

import org.springframework.stereotype.Service;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;

@Service
public class WebsocketService extends AbstractWebSocketHandler {
  private final RsvpService rsvpService;

  public WebsocketService(RsvpService rsvpService) {
    this.rsvpService = rsvpService;
  }

  @Override
  public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
    this.rsvpService.sendRsvp((String)message.getPayload());
  }
}

In this service, we extend the AbstractWebSocketHandler class and override its handleMessage method. First we inject the RsvpService created in the previous step. The overridden method receives two parameters: WebSocketSession and WebSocketMessage. Here we are only interested in the message. Its payload is cast to String and passed to the sendRsvp method, which takes care of pushing it to Apache Kafka. The cast assumes the source sends only text frames. It is as simple as that.
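
The String cast in handleMessage works for text frames, whose payload is a String, but a binary or pong frame carries a ByteBuffer and would fail with a ClassCastException. If your source might send such frames, a conversion step like the one below could be placed before the call to sendRsvp. This is only a sketch under that assumption: PayloadText and toText are hypothetical names, not part of the original project or of Spring, and treating binary data as UTF-8 text is a choice that may not suit every feed.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original project or of Spring.
class PayloadText {
    // Converts a WebSocket payload to text, or fails with a descriptive error.
    static String toText(Object payload) {
        if (payload instanceof String) {
            return (String) payload;
        }
        if (payload instanceof ByteBuffer) {
            // duplicate() leaves the caller's buffer position untouched
            ByteBuffer copy = ((ByteBuffer) payload).duplicate();
            // Assumption: binary frames contain UTF-8 encoded text
            return StandardCharsets.UTF_8.decode(copy).toString();
        }
        String type = payload == null ? "null" : payload.getClass().getName();
        throw new IllegalArgumentException("Unsupported payload type: " + type);
    }

    public static void main(String[] args) {
        ByteBuffer binary = ByteBuffer.wrap("{\"response\":\"yes\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(toText("{\"response\":\"no\"}"));
        System.out.println(toText(binary));
    }
}
```

With this in place, the handler line would read this.rsvpService.sendRsvp(PayloadText.toText(message.getPayload())).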

Hooking up services into Spring App

The last bit is to hook up the services to an ApplicationRunner, so that the connection is opened when the Spring app starts. The first thing needed, though, is the WebSocket endpoint. In my case it's Meetup.com's RSVP stream, but make sure to change it to your desired endpoint. In the ApplicationRunner we create a new WebSocketClient and pass it the WebsocketService as the handler, along with the endpoint.

import com.bartoszgajda.honors.messaging.service.WebsocketService;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;

@SpringBootApplication
public class MessagingApplication {
    private static final String MEETUP_RSVP_ENDPOINT = "ws://stream.meetup.com/2/rsvps";

    public static void main(String[] args) {
        SpringApplication.run(MessagingApplication.class, args);
    }

    @Bean
    public ApplicationRunner initializeConnection(WebsocketService websocketService) {
        return args -> {
            WebSocketClient socketClient = new StandardWebSocketClient();
            socketClient.doHandshake(websocketService, MEETUP_RSVP_ENDPOINT);
        };
    }
}

And that’s basically it. You should now be able to run the application, ingest data from the WebSocket, and push it to a local instance of Apache Kafka.
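
Since the endpoint is the one value you are most likely to change, it can help to check it before the handshake. Here is a small sketch, assuming a hypothetical helper named EndpointCheck that is not part of the original project and relies only on java.net.URI:

```java
import java.net.URI;

// Hypothetical helper, not part of the original project.
class EndpointCheck {
    // Returns the parsed URI, or throws if it is not a usable WebSocket address.
    static URI requireWebSocketUri(String endpoint) {
        // URI.create throws IllegalArgumentException on malformed input
        URI uri = URI.create(endpoint);
        String scheme = uri.getScheme();
        if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
            throw new IllegalArgumentException("Expected ws:// or wss:// but got: " + endpoint);
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("Missing host in: " + endpoint);
        }
        return uri;
    }

    public static void main(String[] args) {
        URI uri = requireWebSocketUri("ws://stream.meetup.com/2/rsvps");
        System.out.println(uri.getHost() + uri.getPath());
    }
}
```

Calling it at the top of the ApplicationRunner lambda would surface a mistyped endpoint as a clear startup error instead of a failed handshake.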

Summary

I hope you have found this post useful. If so, don’t hesitate to like or share this post. Additionally you can follow me on my social media if you fancy so :)
