DEV Community

Kamalesh-Seervi

Posted on • Originally published at kamaleshseervi.Medium on

Real-Time Trading App: Golang, Kafka, Websockets — Setting up Kafka in Golang (PART-2)


Configuring Kafka in a Golang environment and creating a build file for the producer.


This marks Part 2 of our real-time trade app series in Golang. In this installment, we’ll dive into coding for Kafka, focusing on establishing a connection and creating a producer build file. This step is crucial for testing the API connection to Binance’s web sockets. Stay tuned for hands-on coding and insights into optimizing the integration!

Establishing a connection with the data source.

As previously noted, we’ll utilize Binance’s WebSocket API as our data source. Within your app folder, create a trades subfolder and include the following files: listener.go, publish.go, and ticker.go.


Ticker.go

  • Within ticker.go, you’ll find the model for the ticker data sourced from Binance. This model serves a dual purpose, as it will be employed for both receiving and publishing the data.
package trades

type Ticker struct {
 Symbol string `json:"s"`
 Price string `json:"p"`
 Quantity string `json:"q"` 
 Time int64 `json:"T"`
}
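To see how the struct tags line up with Binance’s payload, here is a minimal sketch that decodes one aggregate-trade message. The sample values are made up for illustration, decodeTicker is a hypothetical helper, and the Ticker type is repeated only so the snippet compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Ticker mirrors the struct in ticker.go; repeated here so the sketch is self-contained.
type Ticker struct {
	Symbol   string `json:"s"`
	Price    string `json:"p"`
	Quantity string `json:"q"`
	Time     int64  `json:"T"`
}

// decodeTicker parses one aggTrade payload into a Ticker (hypothetical helper).
func decodeTicker(payload []byte) (Ticker, error) {
	var t Ticker
	err := json.Unmarshal(payload, &t)
	return t, err
}

func main() {
	// Illustrative payload in the shape of an aggTrade event; the values are invented.
	sample := []byte(`{"e":"aggTrade","E":1700000000100,"s":"BTCUSDT","p":"42000.10","q":"0.005","T":1700000000000}`)

	t, err := decodeTicker(sample)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(t.Symbol, t.Price, t.Quantity, t.Time)
}
```

Price and quantity stay strings because that is how they arrive on the wire; parse them into a numeric type only where arithmetic is actually needed.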

Listener.go

  • As the name implies, listener.go houses the code where we actively listen for specific tickers through a WebSocket connection. Let’s delve into the details, starting with the establishment of the WebSocket connection.
package trades

import (
 "encoding/json"
 "log"
 "net/url"

 "github.com/gorilla/websocket"
)

type RequestParams struct {
 Id int `json:"id"`
 Method string `json:"method"`
 Params []string `json:"params"`
}

var conn *websocket.Conn

const (
 subscribeId = 1
 unSubscribeId = 2
)

func getConnection() (*websocket.Conn, error) {
 if conn != nil {
  return conn, nil
 }

 u := url.URL{Scheme: "wss", Host: "stream.binance.com:443", Path: "/ws"}
 log.Printf("connecting to %s", u.String())
 c, resp, err := websocket.DefaultDialer.Dial(u.String(), nil)
 if err != nil {
  // resp is nil when the dial fails before the HTTP handshake completes.
  if resp != nil {
   log.Printf("handshake failed with status %d", resp.StatusCode)
  }
  return nil, err
 }
 conn = c

 return conn, nil
}

func CloseConnections() {
 // conn stays nil until getConnection succeeds, so guard the call.
 if conn != nil {
  conn.Close()
 }
}

func EstablishConnection() (*websocket.Conn, error) {
 newConnection, err := getConnection()
 if err != nil {
  log.Printf("Failed to get connection: %s", err.Error())
  return nil, err
 }
 return newConnection, nil
}

func AddOnConnectionClose(h func(code int, text string) error) {
 conn.SetCloseHandler(h)
}

func unsubscirbeOnClose(conn *websocket.Conn, tradeTopics []string) error {
 message := struct {
  Id int `json:"id"`
  Method string `json:"method"`
  Params []string `json:"params"`
 }{
  Id: unSubscribeId,
  Method: "UNSUBSCRIBE",
  Params: tradeTopics,
 }

 b, err := json.Marshal(message)
 if err != nil {
  log.Println("Failed to JSON-encode trade topics:", err)
  return err
 }

 // Return the write error instead of silently discarding it.
 return conn.WriteMessage(websocket.TextMessage, b)
}

func SubScribeAndListen(topics []string) error {
 conn, err := getConnection()
 if err != nil {
  log.Printf("Failed to get connection: %s", err.Error())
  return err
 }

 conn.SetPongHandler(func(appData string) error {
  log.Println("Received pong:", appData)
  pingFrame := []byte{1, 2, 3, 4, 5}
  err := conn.WriteMessage(websocket.PingMessage, pingFrame)
  if err != nil {
   log.Println(err)
   // no need to fail
  }
  return nil
 })

 tradeTopics := make([]string, 0, len(topics))
 for _, topic := range topics {
  tradeTopics = append(tradeTopics, topic+"@"+"aggTrade")
 }
 log.Println("Listening to trades for ", tradeTopics)
 message := RequestParams{
  Id: subscribeId,
  Method: "SUBSCRIBE",
  Params: tradeTopics,
 }
 log.Println(message)
 b, err := json.Marshal(message)
 if err != nil {
  log.Println("Failed to JSON-encode trade topics:", err)
  return err
 }

 err = conn.WriteMessage(websocket.TextMessage, b)
 if err != nil {
  log.Println("Failed to subscribe to topics:", err)
  return err
 }

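 // Deferred calls run in LIFO order: register Close first so the
 // UNSUBSCRIBE message is sent while the connection is still open.
 defer conn.Close()
 defer unsubscirbeOnClose(conn, tradeTopics)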

 for {
  _, payload, err := conn.ReadMessage()
  if err != nil {
   log.Println(err)
   return err
  }

  trade := Ticker{}

  err = json.Unmarshal(payload, &trade)
  if err != nil {
   log.Println(err)
   return err
  }

  log.Println(trade.Symbol, trade.Price, trade.Quantity)
 }
}


This code defines the listener functionality for the real-time trading app, focusing on establishing a WebSocket connection and handling incoming ticker data from Binance. Let’s break it down:

  1. Imports:
  • The listener imports encoding/json, log, and net/url from the standard library, plus “github.com/gorilla/websocket” for WebSocket communication. “github.com/segmentio/kafka-go” is only needed once Kafka publishing is added to this file later in the article.

2. Data Structures:

  • RequestParams: Struct to represent the parameters for the WebSocket request.
  • conn: Variable to store the WebSocket connection.

3. Constants:

  • subscribeId and unSubscribeId: Constants used as the request IDs for the subscribe and unsubscribe messages.

4. Functions:

  • getConnection(): Establishes a WebSocket connection to Binance’s streaming service.
  • CloseConnections(): Closes the WebSocket connection.
  • EstablishConnection(): Ensures a connection is established; if not, initiates a new one.
  • AddOnConnectionClose(h func(code int, text string) error): Adds a handler for WebSocket connection closure.
  • unsubscirbeOnClose(conn *websocket.Conn, tradeTopics []string) error: Unsubscribes from the specified trade topics when the listener shuts down.
  • SubScribeAndListen(topics []string) error: Subscribes to the specified trade topics and listens for incoming data. In this first version it only logs each trade; converting and publishing to Kafka is added in the Publish.go step.

5. WebSocket Operations:

  • Connection handling, ping-pong setup, subscription to trade topics, and handling of incoming messages are managed here.

6. Data Processing:

  • Upon receiving trade data, it’s deserialized into a Ticker struct and logged. Once Publish.go is integrated, a goroutine is also spawned to convert and publish each trade to Kafka.
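One detail worth handling in the data-processing step: the connection does not carry trades only. Binance acknowledges a SUBSCRIBE request with a small JSON reply such as {"result":null,"id":1}, which unmarshals into an empty Ticker without any error. A minimal sketch of filtering on the event-type field follows; the envelope type and the isAggTrade helper are hypothetical names, not part of the original code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope reads only the routing fields of a message (hypothetical type).
// Both "e" and "E" are declared on purpose: encoding/json falls back to
// case-insensitive key matching when no exact match exists, so without
// EventTime the numeric "E" value would be matched against the string field.
type envelope struct {
	EventType string `json:"e"`
	EventTime int64  `json:"E"`
}

// isAggTrade reports whether payload is an aggregate-trade event.
func isAggTrade(payload []byte) bool {
	var env envelope
	if err := json.Unmarshal(payload, &env); err != nil {
		return false
	}
	return env.EventType == "aggTrade"
}

func main() {
	ack := []byte(`{"result":null,"id":1}`)
	// Illustrative trade payload; the values are invented.
	trade := []byte(`{"e":"aggTrade","E":1700000000100,"s":"BTCUSDT","p":"42000.10","q":"0.005","T":1700000000000}`)

	fmt.Println("ack is a trade:", isAggTrade(ack))
	fmt.Println("trade is a trade:", isAggTrade(trade))
}
```

In the read loop, a check like this before unmarshalling into Ticker would let the listener skip acknowledgements instead of logging blank trades.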

This comprehensive listener code sets the foundation for actively retrieving and processing real-time trading data from Binance through WebSockets, preparing it for further integration and analysis in the application.
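To make the subscription step concrete, the sketch below builds the same SUBSCRIBE frame that SubScribeAndListen writes to the socket and prints it. buildSubscribe is a hypothetical helper. Lower-casing the symbols is an addition here, on the assumption that the tickers may be configured in upper case, since Binance stream names use lowercase symbols.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// RequestParams mirrors the struct in listener.go.
type RequestParams struct {
	Id     int      `json:"id"`
	Method string   `json:"method"`
	Params []string `json:"params"`
}

// buildSubscribe returns the JSON frame that subscribes to the aggTrade
// stream of every symbol (hypothetical helper).
func buildSubscribe(id int, symbols []string) ([]byte, error) {
	streams := make([]string, 0, len(symbols))
	for _, s := range symbols {
		// Stream names look like "btcusdt@aggTrade".
		streams = append(streams, strings.ToLower(s)+"@aggTrade")
	}
	return json.Marshal(RequestParams{Id: id, Method: "SUBSCRIBE", Params: streams})
}

func main() {
	frame, err := buildSubscribe(1, []string{"BTCUSDT", "ethusdt"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(frame))
}
```

Keeping the frame construction in its own function makes it easy to unit-test without opening a WebSocket connection.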

Testing

  • Execute this code by invoking the SubScribeAndListen function in your main.go file.
package main

import (
 "fmt"
 "os"
 "strings"
 "github.com/joho/godotenv"
 "github.com/kamalesh-seervi/trade-app/producer/trades" // add this import (your editor may insert it automatically)
)

func main() {
 err := godotenv.Load("../.env")
 if err != nil {
  fmt.Println("Failed to load environment:", err)
 }
 t := os.Getenv("TICKERS")
 topics := strings.Split(t, ",")
 for i, topic := range topics {
  // Strip stray backslashes and quotes around each ticker.
  topics[i] = strings.Trim(strings.Trim(topic, "\\"), "\"")
 }

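 // SubScribeAndListen blocks while reading from the WebSocket and
 // returns only when the read loop fails.
 if err := trades.SubScribeAndListen(topics); err != nil {
  fmt.Println("Listener stopped:", err)
 }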

}
  • Build and run the app.
go build . && ./trade-app   # adjust the binary name to match your module
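The ticker clean-up in main.go depends on how TICKERS is written in the .env file, which the article does not show. As a hedged alternative, the sketch below trims whitespace, quotes, and backslashes in one pass, lower-cases each symbol, and drops empty entries. parseTickers is a hypothetical helper, and the quoted, comma-separated input format is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTickers turns a raw TICKERS value into clean, lowercase symbols
// (hypothetical helper; the input format is assumed).
func parseTickers(raw string) []string {
	var out []string
	for _, part := range strings.Split(raw, ",") {
		// Remove surrounding spaces, tabs, quotes and backslashes in one pass.
		s := strings.ToLower(strings.Trim(part, " \t\"\\"))
		if s != "" {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	fmt.Println(parseTickers(`"BTCUSDT", "ethusdt",`))
}
```

Dropping empty entries matters because strings.Split returns a single empty string for an empty input, which would otherwise turn into a subscription to the stream "@aggTrade".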


Realtime Data from the Binance API

Publish.go

package trades

import (
 "context"
 "log"
 "github.com/segmentio/kafka-go"
)

var (
 HOST string
 PORT string
)

// LoadHostAndPort sets the Kafka broker host and port used by Publish.
func LoadHostAndPort(host string, port string) {
 HOST = host
 PORT = port
}

func Publish(t string, message kafka.Message, topic string) error {

 messages := []kafka.Message{
  message,
 }

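 // A new Writer is created and closed on every call. That keeps the demo
 // simple; a long-lived, shared Writer would avoid reconnecting per message.
 w := kafka.Writer{
  Addr: kafka.TCP(HOST + ":" + PORT), // 127.0.0.1:9092, or kafka:9092 in Docker
  Topic: topic,
  AllowAutoTopicCreation: true,
 }
 defer w.Close()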

 err := w.WriteMessages(context.Background(), messages...)
 if err != nil {
  log.Println("Error writing msg to Kafka: ", err.Error())
  return err
 }

 log.Println("Publish msg to Kafka on topic: ", topic)

 return nil
}
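Publish reads the package-level HOST and PORT, so LoadHostAndPort has to be called before the first message is written; otherwise the broker address is just ":". The article does not show where those values come from, so the sketch below is only one possible arrangement: KAFKA_HOST and KAFKA_PORT are assumed variable names, and envOr and brokerAddr are hypothetical helpers.

```go
package main

import (
	"fmt"
	"net"
	"os"
)

// envOr returns the value of key, or fallback when the variable is unset or empty.
func envOr(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

// brokerAddr joins host and port. Unlike plain string concatenation,
// net.JoinHostPort also adds the brackets an IPv6 host needs.
func brokerAddr(host, port string) string {
	return net.JoinHostPort(host, port)
}

func main() {
	// KAFKA_HOST and KAFKA_PORT are assumed names, not taken from the project.
	host := envOr("KAFKA_HOST", "127.0.0.1")
	port := envOr("KAFKA_PORT", "9092")

	// In the producer, these values would be handed to trades.LoadHostAndPort(host, port).
	fmt.Println("Kafka broker:", brokerAddr(host, port))
}
```

Falling back to 127.0.0.1:9092 matches the local address mentioned in the Publish comment; inside Docker the host would be the Kafka service name instead.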
  • Integrate this function into the listener.go file to facilitate Kafka publishing.
func SubScribeAndListen(topics []string) error {

  ...
  ...
  ...

 for {
  _, payload, err := conn.ReadMessage()
  if err != nil {
   log.Println(err)
   return err
  }

  trade := Ticker{}

  err = json.Unmarshal(payload, &trade)
  if err != nil {
   log.Println(err)
   return err
  }

  log.Println(trade.Symbol, trade.Price, trade.Quantity)
  go func() { // <=== here
   convertAndPublishToKafka(trade)
  }() 
 }
}

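// Add this function. listener.go now also needs "strconv", "strings", and
// "github.com/segmentio/kafka-go" in its import block.
func convertAndPublishToKafka(t Ticker) {
 bytes, err := json.Marshal(t)
 if err != nil {
  log.Println("Error marshalling Ticker data", err.Error())
  return // nothing valid to publish
 }

 Publish(t.Symbol, kafka.Message{
  // FormatInt keeps the full int64 timestamp, even on 32-bit platforms.
  Key: []byte(t.Symbol + "-" + strconv.FormatInt(t.Time, 10)),
  Value: bytes,
 }, "trades-"+strings.ToLower(t.Symbol))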
}
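The message key and topic name that convertAndPublishToKafka derives from each trade can be checked in isolation. The sketch below pulls both into small functions; messageKey and topicFor are hypothetical helper names, and the sample symbol and timestamp are invented.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// messageKey builds the Kafka message key: "<SYMBOL>-<trade time>".
func messageKey(symbol string, tradeTime int64) string {
	return symbol + "-" + strconv.FormatInt(tradeTime, 10)
}

// topicFor builds the per-symbol topic name: "trades-<lowercase symbol>".
func topicFor(symbol string) string {
	return "trades-" + strings.ToLower(symbol)
}

func main() {
	fmt.Println(messageKey("BTCUSDT", 1700000000000))
	fmt.Println(topicFor("BTCUSDT"))
}
```

With this scheme every symbol gets its own topic, so a consumer can subscribe to exactly the pairs it cares about.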

In the updated read loop, a goroutine is started with the “go” keyword so that convertAndPublishToKafka runs concurrently with the loop. Here is what each part does:

  1. Logging Trade Data:
log.Println(trade.Symbol, trade.Price, trade.Quantity)

This line logs essential details of the incoming trade data, providing visibility into the traded symbol, price, and quantity.

2. Goroutine for Kafka Publishing:

go func() { convertAndPublishToKafka(trade) }()
  • Running convertAndPublishToKafka in a goroutine keeps the read loop from blocking: the listener can pick up the next trade without waiting for the Kafka write to finish. The trade-off is that one goroutine is started per trade, so messages may reach Kafka in a different order than they arrived, and a burst of trades starts an equally large burst of goroutines.

3. convertAndPublishToKafka Function:

func convertAndPublishToKafka(t Ticker) { /* ... (existing code) */ }
  • This function takes the received trade (a Ticker), marshals it to JSON, and publishes it to Kafka. The message key combines the trading symbol and the trade timestamp, and the topic name is “trades-” followed by the lowercase symbol. Errors during marshaling are logged.

We’ve wrapped up the implementation of our producer service. If you have any questions, feel free to leave a comment below. I’ll be sharing the complete code shortly as we progress through the series, so stay tuned for that.

What’s coming up next?

Our next focus will be on the consumer service. In this part, we’ll be exposing a WebSocket API for the frontend. Data will be pushed to the frontend through a Kafka listener. Stay tuned for the upcoming content!

GitHub - Kamalesh-Seervi/Real-time-trade-app: Kafka, WebSockets
