Thought about using Kafka? This blog post will walk you through the
steps necessary to bootstrap your Rust producer and consumer application
with a batteries-included local dev setup featuring VSCode Dev
Containers, Kafka, and Zookeeper.
In this tutorial, we will build a simple Rust application consisting of
a producer and consumer working with search data from Hacker News.
Step 0: Prerequisites
- Rust 1.66 or later
- VSCode
- Docker
Step 1: Setup a new Rust Project
We will leverage Cargo's workspace feature in this example. A workspace contains more than one application or library, all of which can be compiled from the top-level directory.
Let's get started by creating the project directory that will contain all of our source code.
$ mkdir kafka_hn_processing
$ cd kafka_hn_processing
Once we're in the new directory, let's set up our workspace:
$ cat > Cargo.toml
[workspace]
members = [
"producer"
]
For now, this Cargo.toml only references a producer application, which we will create in the next step.
$ cargo new producer
$ cd producer
To make this application work, we need to install a few dependencies. We use cargo add for this (note: cargo add requires at least Rust 1.62). cargo add downloads the dependency and adds it to the project's Cargo.toml.
$ cargo add reqwest --features=json -p producer
$ cargo add tokio -p producer --features=full
$ cargo add serde --features=derive -p producer
$ cargo add serde_json -p producer
$ cargo add urlencoding -p producer
We're installing:
- reqwest - to perform HTTP requests
- tokio - for async support
- serde - serialization/deserialization
- serde_json - to serialize and deserialize JSON
- urlencoding - to URL-encode parameters
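After running these commands, the [dependencies] section of producer/Cargo.toml should look roughly like the sketch below; the exact version numbers depend on when you run cargo add, so treat them as placeholders:

[dependencies]
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
urlencoding = "2"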
Step 2: Setup a Dev Container
Before we start writing code, we need to set up our Docker environment.
In the project's root directory, create a docker-compose.yml:
---
version: '3.8'
services:
  rust-log-processing:
    image: mcr.microsoft.com/devcontainers/rust:0-1-bullseye
    volumes:
      - ..:/workspaces:cached
    cap_add:
      - SYS_PTRACE
    security_opt:
      - seccomp:unconfined
    command: /bin/sh -c "while sleep 1000; do :; done"
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    ports:
      # To learn about configuring Kafka for access across networks see
      # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
(Find a detailed explanation of this docker-compose.yml file here.)
We won't run Rust code directly on the local machine but in a Docker container instead. VSCode provides us with the devcontainer feature, which allows us to run our workspace code in a Docker container.
To get started with a Dev Container, we need a .devcontainer folder for our VSCode settings. Create .devcontainer at the root level.
Folder structure:
$ mkdir .devcontainer
$ ls -l
drwxr-xr-x - user 7 Feb 13:45 .devcontainer
drwxr-xr-x - user 7 Feb 13:51 .git
.rw-r--r-- 8 user 7 Feb 10:52 .gitignore
.rw-r--r-- 9.6k user 7 Feb 13:53 Cargo.lock
.rw-r--r-- 199 user 7 Feb 13:53 Cargo.toml
.rw-r--r-- 1.2k user 7 Feb 13:46 docker-compose.yml
drwxr-xr-x - user 7 Feb 10:52 src
drwxr-xr-x@ - user 7 Feb 10:52 target
Then, inside .devcontainer/, create the devcontainer.json config file:
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/rust
{
  "name": "Rust",
  "service": "rust-log-processing",
  "dockerComposeFile": "../docker-compose.yml",
  "features": {
    "ghcr.io/devcontainers/features/rust:1": {}
  },
  "workspaceFolder": "/workspaces/${localWorkspaceFolderBasename}",
  "shutdownAction": "stopCompose"
}
Our configuration is more advanced than the default because we also leverage Docker Compose. By default, VSCode runs everything in a single container. Since we need Kafka to test our code, we tell VSCode to take our docker-compose.yml into account.
With these files in place, we're ready to start writing code. Open the
project in VSCode:
$ code .
Important: Make sure you open the project in the container. VSCode usually prompts you for this, but if not, click the little icon in the bottom left corner and select Reopen in Container from the command menu.
Step 3: Add HN Search Code
To kick everything off, we add some code to allow us to communicate with
the Hacker News API. We're choosing Hacker News because its API does not
require authentication and offers a vast amount of data we can process.
We will only cover this code briefly and keep it in a separate module. Create a new file, producer/src/hn.rs:
use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct HackerNewsResponse {
    pub hits: Vec<HNSearchResult>,
}

#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct HNSearchResult {
    pub author: String,
    #[serde(alias = "objectID")]
    pub id: String,
    pub title: String,
    url: Option<String>,
    pub story_text: Option<String>,
    #[serde(alias = "_tags")]
    pub tags: Option<Vec<String>>,
    pub points: u32,
}

pub async fn fetch_hn_stories(search_term: String, search_result_limit: u32) -> Result<HackerNewsResponse, reqwest::Error> {
    let url_encoded_search_term = urlencoding::encode(&search_term);
    let url_str = format!(
        "https://hn.algolia.com/api/v1/search_by_date?query={}&tags=story&hitsPerPage={}",
        url_encoded_search_term, search_result_limit
    );
    let client = reqwest::Client::new();
    let request = client.get(url_str).build().unwrap();
    let json_response = client.execute(request)
        .await?
        .json::<HackerNewsResponse>()
        .await?;
    Ok(json_response)
}
We'll use this code to fetch stories from the Hacker News Search API to generate content for Kafka. From our main function, we'll call fetch_hn_stories with a search term and a limit indicating the maximum number of results we want.
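If you want to convince yourself that the serde aliases map Algolia's objectID and _tags fields onto id and tags correctly, you can drop a small test into hn.rs and run it with cargo test -p producer. The field values below are made up purely for illustration:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn deserializes_a_search_result() {
        // Minimal, made-up payload mimicking the shape of an Algolia search hit.
        let json = r#"{
            "author": "jdoe",
            "objectID": "123",
            "title": "A story about Rust",
            "url": null,
            "story_text": null,
            "_tags": ["story"],
            "points": 42
        }"#;
        let result: HNSearchResult = serde_json::from_str(json).unwrap();
        assert_eq!(result.id, "123");
        assert_eq!(result.points, 42);
    }
}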
Step 4: Add Producer Code
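The producer code below depends on the kafka crate, which we haven't added yet. Install it with the same cargo add workflow as before:

$ cargo add kafka -p producer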
Then, in producer/src/main.rs, add a new function:
use kafka::producer::{Producer, Record, RequiredAcks};
use std::time::Duration;

use crate::hn::HNSearchResult;

mod hn;

fn send_to_kafka(host: &str, topic: &str, payload: Vec<HNSearchResult>) {
    // Connect to a single broker and wait for one acknowledgment per message.
    let mut producer = Producer::from_hosts(vec![host.to_owned()])
        .with_ack_timeout(Duration::from_secs(1))
        .with_required_acks(RequiredAcks::One)
        .create()
        .unwrap();

    for search_result in payload {
        // Kafka stores payloads as bytes, so serialize each result to JSON first.
        let buffer = serde_json::to_string(&search_result).unwrap();
        producer
            .send(&Record::from_value(topic, buffer.as_bytes()))
            .unwrap();
    }
}

fn main() {
    //...
}
send_to_kafka contains the minimum setup needed to talk to a Kafka broker. We configure a timeout (.with_ack_timeout) and how many acknowledgments (Acks) we need at minimum before moving on (.with_required_acks). Since our dev setup only uses a single broker, we set this to One (this setting might differ in production depending on your use case and the number of available brokers).
Within topics, Kafka stores payloads as bytes. Therefore we need to serialize each search result to a byte array (buffer.as_bytes()).
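As a side note: if you ever want messages for the same story to land on the same partition, the kafka crate also lets you attach a key to a record via Record::from_key_value. A minimal sketch of the send call, assuming we key each record by the story id:

producer
    .send(&Record::from_key_value(
        topic,
        search_result.id.as_bytes(),
        buffer.as_bytes(),
    ))
    .unwrap();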
With our send_to_kafka function in place, let's call it in main():
//producer/src/main.rs
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let stories = hn::fetch_hn_stories("Ruby".into(), 100).await?;
    println!("Fetched {} stories", stories.hits.len());
    send_to_kafka("broker:9092", "hnstories", stories.hits);
    Ok(())
}
Also notice how we added #[tokio::main] and async fn main() -> Result<(), Box<dyn std::error::Error>>. This addition is necessary to allow async code to run.
We fetch a hundred stories with an arbitrary search term (such as "Ruby") and then send them to Kafka.
Step 5: Create Kafka Topic
Before we can run our code, we need to create a Kafka topic. By default,
a Kafka installation ships with command-line utilities to help with such
maintenance tasks. In a terminal on your local machine, run the
following command:
$ docker exec broker \
kafka-topics --bootstrap-server broker:9092 \
--create \
--topic hnstories
Created topic hnstories.
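To double-check that the topic exists, the same command-line utility can describe it:

$ docker exec broker \
    kafka-topics --bootstrap-server broker:9092 \
    --describe \
    --topic hnstories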
Step 6: The Consumer
With the producer in place, let's create the consumer, which reads Hacker News search results from the topic. Before we run cargo new, open Cargo.toml in the project root directory to add a new project:
[workspace]
members = [
"producer",
"consumer"
]
Add "consumer"
to the member list. Save and close the file. In the dev
container, run the following command to create a new project:
$ cargo new consumer
Created binary (application) `consumer` package
Add the following dependencies for the consumer:
$ cargo add serde --features=derive -p consumer
$ cargo add serde_json -p consumer
$ cargo add kafka -p consumer
In consumer/src/main.rs, add the following code:
use kafka::consumer::{Consumer, FetchOffset};

fn main() {
    let mut consumer =
        Consumer::from_hosts(vec!["broker:9092".to_owned()])
            .with_topic("hnstories".to_owned())
            .with_fallback_offset(FetchOffset::Earliest)
            .create()
            .unwrap();

    loop {
        // Poll the broker and print every message in the returned message sets.
        for ms in consumer.poll().unwrap().iter() {
            for m in ms.messages() {
                let str = String::from_utf8_lossy(m.value);
                println!("{:?}", str);
            }
            let _ = consumer.consume_messageset(ms);
        }
        // Mark the consumed offsets as committed.
        consumer.commit_consumed().unwrap();
    }
}
Let's go through this code step by step.
First of all, we create a new consumer:
let mut consumer =
    Consumer::from_hosts(vec!["broker:9092".to_owned()])
        .with_topic("hnstories".to_owned())
        .with_fallback_offset(FetchOffset::Earliest)
        .create()
        .unwrap();
We connect to a single broker (broker:9092). broker, in this case, is the hostname managed by Docker Compose. We're listening to a single topic (hnstories) and configuring a fallback offset.
The fallback offset allows the consumer to start reading messages from the beginning of the topic. If we omit this configuration, the consumer will ignore any events that were published before it started.
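Note that this consumer does not join a consumer group, so committed offsets are not persisted between runs and every restart reads the topic from the beginning again. If you want the consumer to pick up where it left off, a minimal sketch is to add a group (the group name here is made up; depending on the kafka crate version you may also need to configure Kafka-based offset storage):

let mut consumer =
    Consumer::from_hosts(vec!["broker:9092".to_owned()])
        .with_topic("hnstories".to_owned())
        // Offsets are committed per consumer group; without a group,
        // commit_consumed() has nothing to commit against.
        .with_group("hn-consumer-group".to_owned())
        .with_fallback_offset(FetchOffset::Earliest)
        .create()
        .unwrap();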
It's time to run the code. In VSCode, open the integrated terminal so that commands run within the context of the dev container. Open two terminal sessions; in the first, start the producer:
$ cargo run -p producer
And in the second session, run:
$ cargo run -p consumer
This command starts the consumer application. The consumer polls the
topic for content and prints out messages whenever it receives new ones.
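To sanity-check the pipeline without the Rust producer, you can also publish a test message by hand using the console producer that ships with the Confluent broker image (type a line, press Enter, then exit with Ctrl-C):

$ docker exec -it broker \
    kafka-console-producer --bootstrap-server broker:9092 \
    --topic hnstories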
Final Thoughts
We only need a few steps to start building applications with Kafka. To
get us kicked off, we rely on a Docker Compose configuration that stands
up a single instance of Kafka and Zookeeper. With that out of the way,
we need a producer and a consumer. The producer writes new data to a topic while the consumer reads it.
To keep everything self-contained, we use VSCode's Dev Containers.
Once you are ready to move your application into production, make sure to check out Calisti. Calisti allows you to stand up a production-ready Kafka cluster in your Kubernetes cluster in minutes. Additionally, it supports you in operating Kafka, even if you're still working on becoming a Kafka expert.
Find the source code here.
Top comments (1)
Hi Jan! Thanks for the great write-up!
But I have one question. Every time I start the consumer, it receives all of the messages sent by the producer (including the ones it already received last time). The producer was only run once and then shut down.
I was thinking that consumer.commit_consumed().unwrap(); should tell the broker to commit the offsets, but that doesn't seem to happen. Did you expect this behavior? Or am I missing some broker setting?