Kamalesh-Seervi

Posted on • Originally published at kamaleshseervi.Medium on

Real-Time Trading App: Golang, Kafka, Websockets — Setting up Consumer & Websockets(PART-3)

Step-by-step guide on configuring a Kafka consumer in Golang for real-time data processing.

Creating Consumer Service & Websockets

So far in this series we have introduced the tech stack, walked through the high-level architecture, and built a producer service with Kafka and Golang. This part covers the consumer service: it listens for Kafka events and pushes real-time ticker data to the frontend over websockets.

Why did we choose to employ websockets instead of directly integrating Kafka on the frontend?

  • As noted in the first post, Kafka is a poor fit for web browsers. Kafka clients speak Kafka’s own binary protocol over long-lived TCP connections, which browser JavaScript cannot open directly. In addition, Kafka spreads a topic’s messages across the consumers in a group, so if every browser tab (or every other device) joined as a separate consumer, each one would receive only part of the stream.
  • Kafka consumers also need a fair amount of resources to track offsets, message reads, and overall state. Websockets are a lightweight alternative for delivering real-time data to the frontend. With that settled, let’s move on to the consumer implementation.
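
To make the distribution point concrete, here is a minimal Go sketch that mimics how a consumer group gives each partition to exactly one member. It is not Kafka client code: the function name assignPartitions and the round-robin strategy are illustrative assumptions, and real Kafka assignors differ in detail, but the one-owner-per-partition rule is the same.

```go
package main

import "fmt"

// assignPartitions mimics a consumer group: every partition is owned by
// exactly one member, handed out round-robin. This is a simplified,
// hypothetical model, not the real Kafka assignor.
func assignPartitions(partitions int, members []string) map[string][]int {
	owned := make(map[string][]int, len(members))
	for _, m := range members {
		owned[m] = nil
	}
	if len(members) == 0 {
		return owned
	}
	for p := 0; p < partitions; p++ {
		m := members[p%len(members)]
		owned[m] = append(owned[m], p)
	}
	return owned
}

func main() {
	// Three browser tabs joining the same group on a single-partition topic.
	tabs := []string{"tab-1", "tab-2", "tab-3"}
	owned := assignPartitions(1, tabs)
	for _, tab := range tabs {
		fmt.Printf("%s reads partitions %v\n", tab, owned[tab])
	}
}
```

With one partition and three tabs in the same group, only tab-1 is assigned the partition, so the other two tabs would see no trades. Giving every tab its own group would avoid that, but at the cost of one full Kafka consumer per tab.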

Folder Structure:

core/settings.go

package core

import (
 "fmt"
 "log"
 "os"
 "strings"

 "github.com/joho/godotenv"
)

var TICKERS []string
var KAFKA_HOST string
var KAFKA_PORT string

func Load() {

 err := godotenv.Load("../.env")
 if err != nil {
  log.Fatal("Failed to load environment file")
 }
 t := os.Getenv("TICKERS")
 // Assign to the package-level TICKERS; using := here would shadow it.
 TICKERS = strings.Split(t, ",")
 LoadTikers(TICKERS)

 KAFKA_HOST = "127.0.0.1"
 KAFKA_PORT = "9092"
 fmt.Println(TICKERS)
}

core/ticker.go

package core

import "strings"

var tickerSet map[string]struct{}

func GetAllTickers() []string {
 tickerList := []string{}
 for key := range tickerSet {
  tickerList = append(tickerList, key)
 }
 return tickerList
}

func IsTickerAllowed(ticker string) bool {
 _, ok := tickerSet[strings.ToLower(ticker)]
 return ok
}

func LoadTikers(tickers []string) {
 if tickerSet == nil {
  tickerSet = make(map[string]struct{})
 }
 for _, t := range tickers {
  tickerSet[strings.ToLower(strings.Trim(strings.Trim(t, "\\"), "\""))] = struct{}{}
 }
}

settings.go loads the environment variables and is called from main.go, while ticker.go holds the set of allowed tickers and the helper functions used in the next steps.

Now, let’s integrate and configure these components within our main.go.

Initial code for main.go

package main

import (
 // Only core is needed for now; Go refuses to compile unused imports.
 "github.com/kamalesh-seervi/consumer/core"
)

func main() {
 core.Load()
}

To keep things simple, main.go only calls core.Load() so that the environment file is loaded. As a quick check, print the tickers by adding the line below to main() (this also requires importing "fmt"):

fmt.Println(core.GetAllTickers())

Now let’s build the API and the WebSocket connection:

api/ticker.go

package api

import (
 "context"
 "fmt"
 "log"
 "strings"

 "github.com/gin-gonic/gin"
 "github.com/gorilla/websocket"
 "github.com/segmentio/kafka-go"

 "github.com/kamalesh-seervi/consumer/core"
)

func GetAllTickers(c *gin.Context) {
 c.JSON(200, core.GetAllTickers())
}

func ListenTicker(c *gin.Context) {
 conn, err := websocket.Upgrade(c.Writer, c.Request, nil, 1024, 1024)
 if err != nil {
  log.Println("WebSocket Upgrade Error: ", err)
  return
 }
 defer conn.Close()

 currTicker := c.Param("ticker")
 log.Println("Current ticker: ", currTicker)

 if !core.IsTickerAllowed(currTicker) {
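  // CloseUnsupportedData is a close code, not a message type, so wrap it in a close frame.
  conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseUnsupportedData, "Ticker is not allowed"))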
  log.Println("Ticker not allowed ticker: ", currTicker)
  return
 }

 topic := "trades-" + strings.ToLower(currTicker)
 reader := kafka.NewReader(kafka.ReaderConfig{
  Brokers: []string{core.KAFKA_HOST + ":" + core.KAFKA_PORT},
  Topic: topic,
 })
 reader.SetOffset(kafka.LastOffset) // -1: start from the newest message
 defer reader.Close()

 conn.SetCloseHandler(func(code int, text string) error {
  reader.Close()
  log.Printf("Received connection close request. Closing connection .....")
  return nil
 })

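 go func() {
  // Reading is what lets gorilla/websocket process control frames such as close.
  // ReadMessage returns the payload as bytes, so it can be printed with %s.
  code, wsMessage, err := conn.ReadMessage()
  if err != nil {
   log.Println("Error reading last message from WS connection. Exiting ...")
   return
  }
  fmt.Printf("CODE : %d MESSAGE : %s\n", code, wsMessage)
 }()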

 for {
  message, err := reader.ReadMessage(context.Background())
  if err != nil {
   log.Println("Error: ", err)
   return
  }
  fmt.Println("Reading..... ", string(message.Value))

  err = conn.WriteMessage(websocket.TextMessage, message.Value)
  if err != nil {
   log.Println("Error writing message to WS connection: ", err)
   return
  }
 }
}

Let’s break down the functionality of the ListenTicker function:

1. WebSocket Upgrade:

  • The function first tries to upgrade the HTTP connection to a WebSocket connection using websocket.Upgrade.
  • If the upgrade fails, the error is logged and the function returns.

2. Parameters and Ticker Validation:

  • The current ticker is read from the ticker route parameter.
  • core.IsTickerAllowed checks whether that ticker is in the allowed set.
  • If it is not, the client is told that the ticker is not allowed and the function returns, which closes the connection through the deferred conn.Close().

3. Kafka Topic Configuration:

  • The topic name is "trades-" followed by the lowercase ticker.
  • A Kafka reader is created with kafka.NewReader, configured with the broker address and the topic.
  • The reader’s offset is set to -1 so that it starts from the latest available message rather than replaying history.

4. WebSocket Connection Configuration:

  • A close handler is set on the WebSocket connection. It closes the Kafka reader when the client sends a close request.

5. Goroutine for Handling WebSocket Messages:

  • A goroutine reads one message from the WebSocket connection, logs it, and exits. Reading from the connection is also what allows gorilla/websocket to process an incoming close frame and call the close handler.

6. Reading Kafka Messages and Broadcasting to WebSocket:

  • The function loops, reading messages from the Kafka topic with reader.ReadMessage.
  • Each Kafka message is written to the WebSocket connection with conn.WriteMessage.

7. Error Handling:

  • A Kafka read error or a WebSocket write error is logged and ends the loop; the deferred calls then close the reader and the connection.

Now let’s link all this and expose ws and api connections.

api/routing.go

package api

import (
 "github.com/gin-contrib/cors"
 "github.com/gin-gonic/gin"
 "github.com/gorilla/websocket"
)

func AddRoutes(router *gin.Engine) {
 router.Use(cors.Default())

 apiV1 := router.Group("/api/v1")
 {
  apiV1.GET("/tickers", GetAllTickers)
 }

 // WebSocket route
 router.GET("/ws/trades/:ticker", func(c *gin.Context) {
  // IsWebSocketUpgrade checks the Connection and Upgrade headers case-insensitively.
  if !websocket.IsWebSocketUpgrade(c.Request) {
   c.JSON(400, gin.H{"error": "WebSocket upgrade required"})
   return
  }

  // Upgrade the connection and stream trades for the requested ticker
  ListenTicker(c)
 })
}

Final main.go

package main

import (
 "github.com/gin-contrib/cors"
 "github.com/gin-gonic/gin"
 "github.com/kamalesh-seervi/consumer/api"
 "github.com/kamalesh-seervi/consumer/core"
)

func main() {
 core.Load()
 router := gin.Default()

 // CORS middleware
 config := cors.DefaultConfig()
 config.AllowOrigins = []string{"*"}
 config.AllowMethods = []string{"GET", "POST", "HEAD", "PUT", "DELETE", "PATCH", "OPTIONS"}
 config.AllowHeaders = []string{"Origin", "Content-Type", "Accept", "Content-Length", "Accept-Language", "Accept-Encoding", "Connection", "Access-Control-Allow-Origin"}
 config.AllowCredentials = true
 router.Use(cors.New(config))

 // Add routes

 api.AddRoutes(router)

 // Start server
 router.Run(":8000")
}
  • Open your web browser and navigate to the URL 127.0.0.1:8000/api/v1/tickers. You will receive a response similar to the following.


Tickers

Before proceeding, ensure that you have built the producer service and confirmed that it is actively running to facilitate data pushing to Kafka. If you are running the service with Docker, remember to update the .env file, replacing 127.0.0.1 with kafka. If you are running it locally, you can disregard this step.

To start everything quickly, bring up the containers with the command below, then run both the producer and consumer builds.

sudo docker-compose up -d

Now let’s test the web socket stream.


Websocket stream


Consumer Live read data

  • If you successfully receive the data, your backend is now seamlessly linked to Kafka.

With this, we are approaching the final stages. Data now flows from the backend to the frontend, and in the next article we will visualize it with charts to better understand its dynamics.

Conclusion

This series has so far covered the tech stack, the high-level architecture, and a producer service built with Kafka and Golang. We chose websockets for the frontend because browsers cannot use Kafka directly and because Kafka consumers are comparatively heavy on resources.

With the consumer connected to Kafka, data now flows from the producer service to the frontend over websockets. The consumer validates the requested ticker and streams the matching real-time trades to each connected client.

GitHub - Kamalesh-Seervi/Real-time-trade-app: Kafka, WebSockets
