DEV Community

Sourab Pramanik
Sourab Pramanik

Posted on • Updated on

Event Driven services using Kafka, SurrealDB, Rust, and Go.

Kafka?

To understand the purpose of Kafka you need to experiment with it in various contexts and reason with it. This small project is a very slight overview of one of the use cases of Kafka: streaming messages to drive multiple service-agnostic applications. Streaming messages using Kafka is not required to specify the destination where these messages should land. Similarly, it is not relevant for the receiver to be aware of the sender in any context unless done manually.

Kafka assures the integrity of the messages in the stream in the face of system failure saving us from inconsistent data, data loss, or redundancy. It is much more powerful when used across systems and applications built upon different tech stacks, architecture, and purposes and is distributed over the network. To read more about it check out this eBook.

So to get a basic understanding of what Kafka is and how it operates, we will be creating two different applications and we will see how they can communicate with each other without making any HTTP request internally.

Event-driven services?

A service is a logical unit within a larger application, responsible for executing specific tasks based on given inputs. Each service can communicate with other services using either synchronous communication methods (such as HTTP/REST, gRPC, and GraphQL) or asynchronous communication methods (such as webhooks, message queues, and event streaming). In this article, we will focus on using an event-streaming communication pattern to coordinate various services and perform the necessary operations to achieve the desired outcomes.

Event-driven services are those that communicate with each other in response to specific events being triggered. This communication is independent of the sources generating the events and the entities reacting to them.

Overview

To simulate a real-life example we will be building an inventory service using Rust and Actix that should receive an HTTP request to check if the product with the required units is available in the inventory, if it exists then we take that many products from the whole stock of it and when the application has successfully finished its job then we will produce a message using Kafka.

Also to keep store the data of the products and their available units we will be using a database called SurrealDB. I have chosen SurrealDB because of a specific reason which we will explore later in this article.
Now that we have produced a message from the inventory we need a consumer to consume this message by connecting to the Kafka broker. So for this, we will create a shipment service using Go to simulate the shipping process when the products are released from the inventory but to keep this project short and concise we are not going to build the whole shipment system.

Prerequisites

Basic understanding of programming and a huge amount of curiosity to know the whys and hows. Nothing else.

If you want to refer to the source code while reading this article then here is the repository

Get started

Docker compose

We will be pulling the docker images of Kafka, Zookeeper, and SurrealDB to keep the setup process light and easy.

version: "3.8"

services:
  surrealdb:
    image: surrealdb/surrealdb:latest
    container_name: surrealdb
    command: start --auth --user root --pass root file:/container-dir/dev.db
    ports:
      - "8000:8000"
    volumes:
      - surrealdb-data:/container-dir
    user: root
    environment:
      - SURREALDB_ENV_USER=root
      - SURREALDB_ENV_PASS=root
    networks:
      - net

  zookeeper-1:
    container_name: zookeeper-1
    image: zookeeper
    restart: always
    ports:
      - 2181:2181
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
    volumes:
      - ./config/zookeeper-1/zookeeper.properties:/kafka/config/zookeeper.properties

  kafka-1:
    container_name: kafka-1
    image: bitnami/kafka
    restart: always
    depends_on:
      - zookeeper-1
    ports:
      - 9092:9092
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper-1:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CREATE_TOPICS=stock_update:1:3
    healthcheck:
      test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server kafka:9092 --list || exit 1"]
      interval: 5s
      timeout: 10s
      retries: 5

networks:
  net:
    name: "net"
    driver: bridge

volumes:
  surrealdb-data:
Enter fullscreen mode Exit fullscreen mode

Upon execution of the compose file using this command the same directory:

docker compose up -d
Enter fullscreen mode Exit fullscreen mode

the docker engine will start three different containers meant for each image and now you will have Kafka at port 9092 and SurrealDB at port 8000 running in your local machine.

Inventory service

Create a new Rust binary application and you can name it inventory_service. Add these dependencies to your Cargo.toml file:

actix-web = "4"
dotenv = "0.15.0"
futures-util = "0.3"
rdkafka = { version = "0.36", features = ["ssl-vendored"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
surrealdb = "1.5.1"
tokio = { version = "1", features = ["full", "macros", "rt-multi-thread"] }
Enter fullscreen mode Exit fullscreen mode

To install them run this command:

cargo install
Enter fullscreen mode Exit fullscreen mode

Create a new .env file in the same path, and add these variables:

SURREAL_URL=127.0.0.1:8000
SURREAL_DB=ecommerce
SURREAL_NS=foo
SURREAL_USER=root
SURREAL_PW=root

KAFKA_BROKER=localhost:9092
KAFKA_TOPIC=stock_update
Enter fullscreen mode Exit fullscreen mode

If you have changed any of these values then make sure to change them in your .env file.

Now being in the root path create a file init.surql which will have all the queries to set up the inventory table, inventory_stock_events table, and seed some dummy products. Check this file here to get the queries you need to add. And once that is done run this command to execute all the statements by importing them using SurrealDB CLI

surreal import --conn http://localhost:8000 --user root --pass root --ns foo --db ecommerce ./ini
t.surql
Enter fullscreen mode Exit fullscreen mode

One important note, we have created inventory_stock_events table that will record the logs for the events whenever there is a change in units for any of the products. This logging happens autonomously so that we don’t have to trigger the logging process manually.

Now let's create the schema of the tables inside the src/ directory. Ad this schema to your schema.rs file:

use serde::{Deserialize, Serialize};
use surrealdb::sql::{Datetime, Id};

#[allow(dead_code)]
#[derive(Debug, Deserialize, Serialize)]
pub struct ProductThing {
    id: Id,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Product {
    pub id: ProductThing,
    pub name: String,
    pub price: u16,
    pub units: u16,
}

#[derive(Debug, Deserialize)]
pub struct UpdateProductStock {
    pub units: u16,
}

#[allow(dead_code)]
#[derive(Debug, Deserialize, Serialize)]
pub struct EventThing {
    id: Id,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct StockEvent {
    pub id: EventThing,
    pub time: Datetime,
    pub action: String,
    pub product: ProductThing,
    pub before_update: u16,
    pub after_update: u16,
}
Enter fullscreen mode Exit fullscreen mode

Create a kafka.rs file and add the Kafka producer function. This can be used with different topics and brokers:

use rdkafka::{producer::FutureProducer, ClientConfig};

pub async fn producer(brokers: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .set("allow.auto.create.topics", "true")
        .create()
        .expect("Producer creation error")
}
Enter fullscreen mode Exit fullscreen mode

Now that we have the base setup let's create handler functions in main.rs :

  • Get inventory products handler

This handler function will return us the list of products and their related attributes.

async fn get_inventory_products(state: web::Data<State>) -> impl Responder {
    let db = &state.db;

    let products: Vec<Product> = match db.select("inventory").await {
        Ok(val) => val,
        Err(e) => {
            dbg!(e);
            return HttpResponse::InternalServerError().body("Server problems!!");
        }
    };
    HttpResponse::Ok().json(products)
}
Enter fullscreen mode Exit fullscreen mode
  • Update stock handler:

This handler function takes the product_id and units in the payload. It will check whether there is a product with this product_id, and units exist or not. And updates the stock units accordingly responding with appropriate messages and status

async fn update_stock(
    product_id: web::Path<String>,
    state: web::Data<State>,
    payload: web::Json<UpdateProductStock>,
) -> impl Responder {
    if product_id.is_empty() {
        return HttpResponse::BadRequest().body("Invalid Product Id");
    }

    let db = &state.db;
    let mut available_units: u16 = 0;

    if let Ok(mut query_product) = db
        .query(format!(
            "SELECT units FROM inventory:{} WHERE units>={}",
            product_id, payload.units
        ))
        .await
    {
        if let Ok(Value::Array(arr)) = query_product.take(0) {
            if !arr.is_empty() {
                if let Value::Object(obj) = &arr[0] {
                    if let Some(Value::Number(units)) = obj.get("units") {
                        available_units = units.to_usize() as u16;
                    }
                }
            } else {
                return HttpResponse::NotFound().body("Product not found or insufficient units");
            }
        } else {
            return HttpResponse::InternalServerError().body("Unexpected query response format");
        }
    } else {
        return HttpResponse::InternalServerError().body("Server Error");
    }

    if let Ok(mut update_product) = db
        .query(format!(
            "UPDATE inventory:{} SET units={}",
            product_id,
            available_units - payload.units,
        ))
        .await
    {
        if let Ok(Value::Array(arr)) = update_product.take(0) {
            if !arr.is_empty() {
                HttpResponse::Ok().body("Product stock updated")
            } else {
                HttpResponse::NotFound().body("Product not found or insufficient units")
            }
        } else {
            HttpResponse::InternalServerError().body("Unexpected query response format")
        }
    } else {
        HttpResponse::InternalServerError().body("Server Error")
    }
}
Enter fullscreen mode Exit fullscreen mode
  • Stream stock change event handler
async fn stream_stock_changes(
    db: &Surreal<Client>,
    stock_producer: &FutureProducer,
) -> surrealdb::Result<()> {
    let kafka_topic = env::var("KAFKA_TOPIC").expect("KAFKA_TOPIC must be set.");

    if let Ok(mut stream) = db.select("inventory_stock_events").live().await {
        while let Some(result) = stream.next().await {
            let res: Result<Notification<StockEvent>> = result;
            let data = &res.unwrap().data;

            stock_producer
                .send(
                    FutureRecord::to(&kafka_topic)
                        .payload(&format!(
                            "Message {}",
                            &serde_json::to_string(data).unwrap()
                        ))
                        .key(&format!("Key {}", 1))
                        .headers(OwnedHeaders::new().insert(Header {
                            key: "header_key",
                            value: Some("header_value"),
                        })),
                    Duration::from_secs(0),
                )
                .await
                .expect("FAILED TO PRODUCE THE MESSAGE");
        }
    } else {
        println!("Failed to stream")
    }

    Ok(())
}
Enter fullscreen mode Exit fullscreen mode

In this handler function, we have leveraged the power of the live query select statement of SurrealDB. Using live query we can capture real-time data changes in the inventory_stock_events table without any fail or manual trigger.

Here’s how it operates:

Event flow

When the user makes a change to stock units of any valid product in the inventory table it gets logged by the inventory event log table and when the logging of the record change is successful the application will automatically trigger an event, and when that happens a message with specific changes will be produced and streamed by Kafka broker.

Using all the handler functions we can update the main function to this:

async fn main() -> std::io::Result<()> {
    dotenv().ok();

    // LOAD ENV VARS
    let surreal_url = env::var("SURREAL_URL").expect("SURREAL_URL must be set.");
    let surreal_ns = env::var("SURREAL_NS").expect("SURREAL_NS must be set.");
    let surreal_db = env::var("SURREAL_DB").expect("SURREAL_DB must be set.");
    let surreal_user = env::var("SURREAL_USER").expect("SURREAL_USER must be set.");
    let surreal_password = env::var("SURREAL_PW").expect("SURREAL_PW must be set.");
    let kafka_broker = env::var("KAFKA_BROKER").expect("KAFKA_BROKER must be set.");

    // INIT DATABASE
    let db = Surreal::new::<Ws>(&surreal_url)
        .await
        .expect("Failed to connect to the Surreal client");
    db.signin(Root {
        username: &surreal_user,
        password: &surreal_password,
    })
    .await
    .expect("Failed to authenticate");
    db.use_ns(surreal_ns)
        .use_db(surreal_db)
        .await
        .expect("Failed to access the Database");

    let db_clone = db.clone();

    // CREATE KAFKA PRODUCER
    let stock_producer = producer(&kafka_broker).await;

    // SPAWN A NEW THREAD TO EXECUTE KAFKA PRODUCER
    task::spawn(async move {
        stream_stock_changes(&db_clone, &stock_producer)
            .await
            .expect("failed to stream");
    });

    // EXECUTE SERVER
    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(State { db: db.to_owned() }))
            .service(
                web::scope("/inventory")
                    .service(web::resource("").route(web::get().to(get_inventory_products)))
                    .service(
                        web::scope("/{product_id}")
                            .service(web::resource("").route(web::patch().to(update_stock))),
                    ),
            )
    })
    .bind(("127.0.0.1", 3000))?
    .run()
    .await
}

Enter fullscreen mode Exit fullscreen mode

Finally, your main.rs file should look like this here.

Shipping service

This service is not that functional it just consumes the messages by connecting to the Kafka broker. I have kept this application as simple as possible for this article but in real projects, you would be doing much more logical operations with the messages that this application receives.

Create a directory shipment_service , inside the directory and create a new Go binary:

go mod init shipment_service
Enter fullscreen mode Exit fullscreen mode

Install packages:

go get -u github.com/segmentio/kafka-go
go get -u github.com/joho/godotenv
Enter fullscreen mode Exit fullscreen mode

Create .env File

KAFKA_BROKER=localhost:9092
KAFKA_TOPIC=stock_update
KAFKA_GROUP_ID=shipment_service_group
POSTGRES_USER=yourusername
POSTGRES_PASSWORD=yourpassword
POSTGRES_DB=yourdatabase
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
Enter fullscreen mode Exit fullscreen mode

If you have changed any of these values then make sure to change them in your .env file.

Create a main.go file and add the following code:

package main

import (
    "context"
    "encoding/json"
    "log"
    "os"
    "os/signal"
    "strings"
    "syscall"

    "github.com/joho/godotenv"
    "github.com/segmentio/kafka-go"
)

type OriginalMessage struct {
    ID           IDWrapper `json:"id"`
    Time         string    `json:"time"`
    Action       string    `json:"action"`
    Product      IDWrapper `json:"product"`
    BeforeUpdate int       `json:"before_update"`
    AfterUpdate  int       `json:"after_update"`
}

type IDWrapper struct {
    ID IDStringWrapper `json:"id"`
}

type IDStringWrapper struct {
    String string `json:"String"`
}

type TransformedMessage struct {
    ID           string `json:"id"`
    ProductID    string `json:"productId"`
    BeforeUpdate int    `json:"before_update"`
    AfterUpdate  int    `json:"after_update"`
    Action       string `json:"action"`
    Time         string `json:"time"`
}

func main() {
    // Load environment variables
    err := godotenv.Load()
    if err != nil {
        log.Fatalf("Error loading .env file: %v", err)
    }

    // Kafka configuration
    kafkaBroker := os.Getenv("KAFKA_BROKER")
    kafkaTopic := os.Getenv("KAFKA_TOPIC")
    kafkaGroupID := os.Getenv("KAFKA_GROUP_ID")

    // Set up Kafka reader
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{kafkaBroker},
        GroupID:  kafkaGroupID,
        Topic:    kafkaTopic,
        MinBytes: 10e3, // 10KB
        MaxBytes: 10e6, // 10MB
    })

    // Create a channel to handle OS signals
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, os.Interrupt, syscall.SIGTERM)

    // Start consuming messages
    go func() {
        for {
            m, err := reader.FetchMessage(context.Background())
            if err != nil {
                log.Printf("Error fetching message: %v", err)
                continue
            }

            rawMessage := string(m.Value)

            jsonMessage := strings.TrimPrefix(rawMessage, "Message ")

            var originalMessage OriginalMessage
            if err := json.Unmarshal([]byte(jsonMessage), &originalMessage); err != nil {
                log.Printf("Error unmarshaling message: %v", err)
                continue
            }

            transformedMessage := transformMessage(originalMessage)
            transformedMessageJSON, err := json.Marshal(transformedMessage)
            if err != nil {
                log.Printf("Error marshaling transformed message: %v", err)
                continue
            }

            log.Printf("Incoming message: %s", transformedMessageJSON)

            // Commit the message to mark it as processed
            if err := reader.CommitMessages(context.Background(), m); err != nil {
                log.Printf("Error committing message: %v", err)
            }
        }
    }()

    // Wait for a termination signal
    <-sigchan
    log.Println("Shutting down...")
    reader.Close()
}

func transformMessage(orig OriginalMessage) TransformedMessage {
    return TransformedMessage{
        ID:           orig.ID.ID.String,
        ProductID:    orig.Product.ID.String,
        BeforeUpdate: orig.BeforeUpdate,
        AfterUpdate:  orig.AfterUpdate,
        Action:       orig.Action,
        Time:         orig.Time,
    }
}
Enter fullscreen mode Exit fullscreen mode

Running the services

Go inside the inventory_service directory and run the binary:

cargo run
Enter fullscreen mode Exit fullscreen mode

Open another terminal window, go inside the shipment_service directory and run the binary:

go run main.go
Enter fullscreen mode Exit fullscreen mode

Execution

Now on a new terminal window execute this CURL request:

curl --location --globoff --request PATCH 'http://localhost:3000/inventory/{product_id}' \
--header 'Content-Type: application/json' \
--data '{
    "units":2
}'
Enter fullscreen mode Exit fullscreen mode

You will notice that messages are getting printed on the terminal window where you are running your Go binary(shipment service). It is consuming the changes in stock in real time without any HTTP communication between the services.

Final Output

Output GIF

Conclusion

I hope you enjoyed reading this article and learned something new. Though I have tried to explain one of the use cases of Kafka and the features of SurrealDB, there is so much more that can be achieved and a highly efficient application can built using tools and technologies like this that cannot be covered in one single article.

Signing Off!!

Top comments (2)

Collapse
 
nigel447 profile image
nigel447

thanks for taking the time to write this polygot streaming setup

Collapse
 
sourabpramanik profile image
Sourab Pramanik

It's a pleasure. Glad that you liked it