Abhishek Gupta

Originally published at abhishek1987.Medium

Getting started with Kafka and Rust: Part 1

This is a two-part series to help you get started with Rust and Kafka. We will be using the rust-rdkafka crate, which is itself based on librdkafka (a C library).

In this post we will cover the Kafka Producer API.


Initial setup

Make sure you install a Kafka broker - a local setup should suffice. Of course, you will need Rust installed as well (version 1.45 or above).

Before you begin, clone the GitHub repo:

git clone https://github.com/abhirockzz/rust-kafka-101
cd part1

Check the Cargo.toml file:

...
[dependencies]
rdkafka = { version = "0.25", features = ["cmake-build","ssl"] }
...

Note on the cmake-build feature

rust-rdkafka provides a couple of ways to resolve the librdkafka dependency. I chose static linking, wherein librdkafka is compiled as part of the build. You could opt for dynamic linking to refer to a locally installed version instead.

For more, please refer to this link
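For example, to link against a locally installed librdkafka instead of building it, rust-rdkafka exposes a dynamic-linking feature (a sketch; it assumes librdkafka is already installed on your system):

[dependencies]
rdkafka = { version = "0.25", features = ["dynamic-linking"] }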

Ok, let's start off with the basics.

Simple producer

Here is a simple producer based on BaseProducer:

    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanisms", "PLAIN")
        .set("sasl.username", "<update>")
        .set("sasl.password", "<update>")
        .create()
        .expect("invalid producer config");
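The SASL_SSL settings above are placeholders for a secured broker (e.g. a managed Kafka service). If your local broker runs without authentication, a minimal config sketch looks like this:

// a minimal sketch, assuming a local broker with a PLAINTEXT listener
let producer: BaseProducer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .create()
    .expect("invalid producer config");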

We call the send method to start producing messages. It's done in a tight loop with a thread::sleep in between (not something you would do in production) to make it easier to track/follow the results. The key, value (payload), and the destination Kafka topic are represented in the form of a BaseRecord.

    for i in 1..100 {
        println!("sending message");
        producer
            .send(
                BaseRecord::to("rust")
                    .key(&format!("key-{}", i))
                    .payload(&format!("value-{}", i)),
            )
            .expect("failed to send message");
        thread::sleep(Duration::from_secs(3));
    }
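For reference, here are the imports the snippets above assume (module paths as of rdkafka 0.25):

use std::thread;
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::producer::{BaseProducer, BaseRecord};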

You can check the entire code in the file src/1_producer_simple.rs

To test if the producer is working ...

Run the program:

  • simply rename the file src/1_producer_simple.rs to main.rs
  • execute cargo run

You should see this output:

sending message
sending message
sending message
...

What's going on? To figure it out, connect to your Kafka topic (I have used rust as the topic name in the above example) using the Kafka CLI consumer (or any other consumer client, e.g. kafkacat). You should see the messages flowing in.

For example:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic rust --from-beginning
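Or, using kafkacat:

kafkacat -b localhost:9092 -t rust -C -o beginning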

Producer callback

We are flying blind right now! Unless we explicitly create a consumer to look at our messages, we have no clue whether they are being sent to Kafka. Let's fix that by implementing a ProducerContext (trait) to hook into the produce event - it's like a callback.

Start by creating a struct and an empty implementation for the ClientContext trait (this is mandatory).

struct ProduceCallbackLogger;

impl ClientContext for ProduceCallbackLogger {}

Now comes the main part where we implement the delivery function in the ProducerContext trait.

impl ProducerContext for ProduceCallbackLogger {
    type DeliveryOpaque = ();

    fn delivery(
        &self,
        delivery_result: &rdkafka::producer::DeliveryResult<'_>,
        _delivery_opaque: Self::DeliveryOpaque,
    ) {
        let dr = delivery_result.as_ref();
        match dr {
            Ok(msg) => {
                let key: &str = msg.key_view().unwrap().unwrap();
                println!(
                    "produced message with key {} in offset {} of partition {}",
                    key,
                    msg.offset(),
                    msg.partition()
                )
            }
            Err(producer_err) => {
                let key: &str = producer_err.1.key_view().unwrap().unwrap();

                println!(
                    "failed to produce message with key {} - {}",
                    key, producer_err.0,
                )
            }
        }
    }
}
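For reference, the additional imports this implementation relies on (note that the Message trait must be in scope for key_view to resolve):

use rdkafka::client::ClientContext;
use rdkafka::message::Message; // brings key_view() into scope
use rdkafka::producer::ProducerContext;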

We match against the DeliveryResult (which is a Result after all) to account for success (Ok) and failure (Err) scenarios. All we do is simply log the message in both cases, since this is just an example. You could do pretty much anything you wanted to here (don't go crazy though!)

We've ignored DeliveryOpaque, which is an associated type of the ProducerContext trait.

We need to make sure that we plug in our ProducerContext implementation. We do this by using the create_with_context method (instead of create), and by specifying the context type on the BaseProducer as well.

let producer: BaseProducer<ProduceCallbackLogger> = ClientConfig::new().set(....)
...
.create_with_context(ProduceCallbackLogger {})
...
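Put together, the full call could look like this (a sketch - only bootstrap.servers is shown for brevity; retain the SASL settings if your broker needs them):

let producer: BaseProducer<ProduceCallbackLogger> = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .create_with_context(ProduceCallbackLogger {})
    .expect("invalid producer config");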

How does the callback get called?

Ok, we have the implementation, but we need a way to trigger it! One of the ways is to call flush on the producer. So, we could write our producer as such:

  • add producer.flush(Duration::from_secs(3));, and
  • comment the sleep (just for now)
        producer
            .send(
                BaseRecord::to("rust")
                    .key(&format!("key-{}", i))
                    .payload(&format!("value-{}", i)),
            )
            .expect("failed to send message");

        producer.flush(Duration::from_secs(3));
        println!("flushed message");
        //thread::sleep(Duration::from_secs(3));

Hold on, we can do better!

The send method is non-blocking (by default), but by calling flush after each send, we have now converted this into a synchronous invocation - not recommended from a performance perspective.

We can improve the situation by using a ThreadedProducer. It takes care of invoking the poll method in a background thread to ensure that the delivery callback notifications are delivered. Doing this is very simple - just change the type from BaseProducer to ThreadedProducer!

// before: BaseProducer<ProduceCallbackLogger>
// after: ThreadedProducer<ProduceCallbackLogger>

Also, we don't need the call to flush anymore.

...
//producer.flush(Duration::from_secs(3));
//println!("flushed message");
thread::sleep(Duration::from_secs(3));
...

The code is available in src/2_threaded_producer.rs

Run the program again

  • Rename the file src/2_threaded_producer.rs to main.rs and
  • execute cargo run

Output:

sending message
sending message
produced message with key key-1 in offset 6 of partition 2
produced message with key key-2 in offset 3 of partition 0
sending message
produced message with key key-3 in offset 7 of partition 2

As expected, you should be able to see the producer event callback, denoting that the messages were indeed sent to the Kafka topic. Of course, you can connect to the topic directly and double-check, just like before:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic rust --from-beginning

To try a failure scenario, use an incorrect topic name and notice how the Err variant of the delivery implementation gets invoked.

Sending JSON messages

So far, we were just sending Strings as keys and values. JSON is a commonly used message format, so let's see how to use it.

Assume we want to send User info which will be represented using this struct:

struct User {
    id: i32,
    email: String,
}

We can then use the serde_json library to serialize this as JSON. All we need are the custom derives in serde - Serialize and Deserialize.

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct User {
    id: i32,
    email: String,
}

Change the producer loop:

  • Create a User instance
  • Serialize it to a JSON string using to_string_pretty
  • Include that in the payload
...

let user_json = serde_json::to_string_pretty(&user).expect("json serialization failed");

        producer
            .send(
                BaseRecord::to("rust")
                    .key(&format!("user-{}", i))
                    .payload(&user_json),
            )
            .expect("failed to send message");
...
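The User instance itself could be created like this (a sketch - the email format simply matches the consumer output shown below):

let user = User {
    id: i,
    email: format!("user-{}@foobar.com", i),
};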

You can also use to_vec (instead of to_string_pretty) to convert it into a Vec of bytes (Vec<u8>).

To run the program...

  • Rename the file src/3_JSON_payload.rs to main.rs, and
  • execute cargo run

Consume from the topic:

&KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic rust --from-beginning

You should see messages with a String key (e.g. user-34) and JSON value:

{
  "id": 34,
  "email": "user-34@foobar.com"
}

Is there a better way?

Yes! If you are used to the declarative serialization/de-serialization approach in the Kafka Java client (and probably others as well), you may not like this "explicit" approach. Just to put things in perspective, this is how you'd do it in Java:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");

....

ProducerRecord<String, User> record = new ProducerRecord<String, User>(topic, key, user);
producer.send(record);

Notice that you simply configure the Producer to use KafkaJsonSchemaSerializer, and the User class is serialized to JSON.

rust-rdkafka provides something similar with the ToBytes trait. Here is what it looks like:

pub trait ToBytes {
    /// Converts the provided data to bytes.
    fn to_bytes(&self) -> &[u8];
}

Self-explanatory, right? There are existing implementations for String, Vec<u8>, etc., so you can use these types as keys or values without any additional work - this is exactly what we just did. But the problem is that we did it "explicitly", i.e. we converted the User struct into a JSON String and passed it on.

What if we could implement ToBytes for User?

impl ToBytes for User {
    fn to_bytes(&self) -> &[u8] {
        let b = serde_json::to_vec_pretty(&self).expect("json serialization failed");
        b.as_slice()
    }
}

You will see a compiler error:

cannot return value referencing local variable `b`
returns a value referencing data owned by the current function

For additional background, please refer to this GitHub issue. I would be happy to see another example that works with ToBytes - please drop in a note if you have inputs on this!

TL;DR is that it's best to stick to the "explicit" way of doing things unless you have a ToBytes implementation that "does not involve an allocation and cannot fail".
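For illustration, here is a sketch of an implementation that does satisfy that constraint - a hypothetical wrapper type that is serialized up front, so to_bytes neither allocates nor fails:

use rdkafka::message::ToBytes;

// hypothetical wrapper: serialize once, up front, then hand out a slice
struct RawUser(Vec<u8>);

impl RawUser {
    fn from_user(user: &User) -> RawUser {
        RawUser(serde_json::to_vec_pretty(user).expect("json serialization failed"))
    }
}

impl ToBytes for RawUser {
    fn to_bytes(&self) -> &[u8] {
        &self.0 // just borrows the already-serialized bytes
    }
}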

Wrap up

That's it for the first part. Part 2 will cover topics around the Kafka consumer.

Top comments (1)

niilz • Edited

An idea about the impl ToBytes for User problem:

The issue is that in to_bytes we allocate a Vec<u8> on the heap and then want to return a pointer to the underlying byte array. But the Vec goes out of scope when to_bytes ends. This cleans up the Vec, and the pointer would be invalid if we were allowed to use it after the method returns.
Safe but sad.

One option would be to build a wrapper around the User that contains an inner: User plus an extra bytes field holding a Vec<u8>.

Or, which is what I will show here, we could store the Vec directly on the User, while excluding the field from serialization. Like so:

#[derive(Serialize, Deserialize, Debug)]
struct User {
    id: u32,
    email: String,
    #[serde(skip)]
    bytes: Option<Vec<u8>>,
}

Then there can be an extra method that initializes the bytes:

impl User {
    fn calc_bytes(&mut self) {
        let serde_vec = serde_json::to_vec_pretty(self)
            .expect("json serialization failed");
        self.bytes = Some(serde_vec);
    }
}

Which means that, as long as we initialize the bytes field before we call to_bytes, the compiler allows us to return a pointer to the underlying byte array. The compiler knows that this pointer is valid as long as the Vec is valid, which in turn will not be dropped until the User instance goes out of scope.
The impl of ToBytes could look like this:

impl ToBytes for User {
    fn to_bytes(&self) -> &[u8] {
        match self.bytes {
            Some(ref bytes) => bytes,
            None => &[],
        }
    }
}

The full example can be seen at this playground-link:
play.rust-lang.org/?version=stable...
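For completeness, here is a usage sketch of this approach (assuming the producer setup from the post):

let mut user = User {
    id: 1,
    email: "user-1@foobar.com".to_string(),
    bytes: None,
};
user.calc_bytes(); // must run before send, or to_bytes returns an empty slice

producer
    .send(BaseRecord::to("rust").key("user-1").payload(&user))
    .expect("failed to send message");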