The rdkafka crate, a Rust binding for the librdkafka C library, lets you use Kafka's messaging capabilities while benefiting from Rust's safety and performance.
First, you'll want to add the rdkafka dependency to your Cargo.toml file:
[dependencies]
rdkafka = { version = "0.36", features = ["cmake-build", "naive-runtime", "tracing", "tokio", "zstd"] }
tokio = { version = "1", features = ["full"] }
anyhow = "1.0"
thiserror = "1.0"
tracing = "0.1"
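With the cmake-build feature, rdkafka compiles a bundled copy of librdkafka, so you need CMake and a C/C++ toolchain at build time. A system-wide librdkafka is only required if you link against it dynamically (the dynamic-linking feature). If you want it installed on macOS anyway, you can use Homebrew: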
brew install librdkafka
For Ubuntu or other Debian-based systems, use:
sudo apt-get install librdkafka-dev
Setting Up the Kafka Client
Here’s a sample implementation:
use rdkafka::{
    message::OwnedHeaders,
    producer::{BaseRecord, Producer, ProducerContext, ThreadedProducer},
    ClientConfig, ClientContext, Message,
};
use std::{env, time::Duration};

/// Kafka Client
pub struct Client {}

impl Client {
    /// Publish a message to the given Kafka topic
    pub async fn publish(
        headers: OwnedHeaders,
        message: &str,
        key: &str,
        topic: &str,
    ) -> anyhow::Result<()> {
        tracing::debug!(event = "publish-start", key = ?key, topic = ?topic);
        let brokers = env::var("KAFKA_BROKERS").unwrap_or_else(|_| "localhost:9092".into());
        // Note: a new producer is created on every call; reuse one in production code
        let producer: ThreadedProducer<ProducerCallback> = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("acks", "all")
            .set("enable.idempotence", "true")
            .set("compression.type", "zstd") // Enable zstd compression
            .set("queue.buffering.max.ms", "0")
            .set("retries", "10")
            .set("request.timeout.ms", "10000")
            .create_with_context(ProducerCallback {})?;

        // Enqueue the message; return the error instead of discarding it,
        // otherwise callers can never detect a failure and retry
        producer
            .send(
                BaseRecord::to(topic)
                    .payload(message)
                    .key(key)
                    .headers(headers),
            )
            .map_err(|(err, _record)| err)?;

        // Flush the producer to ensure the message is sent (blocks up to 5 seconds)
        producer.flush(Duration::from_secs(5))?;
        Ok(())
    }
}
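The client above reads the broker list from the KAFKA_BROKERS environment variable and falls back to localhost:9092. As a minimal sketch of that lookup in isolation, the helper below also treats an empty or whitespace-only value as unset. The names `resolve_brokers`, `brokers_from_env`, and `DEFAULT_BROKERS` are hypothetical and not part of rdkafka, and the empty-value handling is an assumption that goes beyond the original code.

```rust
/// Fallback broker list, matching the default used in the client above.
const DEFAULT_BROKERS: &str = "localhost:9092";

/// Pick the broker list from an optional raw value (hypothetical helper).
/// Unset, empty, and whitespace-only values all fall back to the default.
pub fn resolve_brokers(raw: Option<&str>) -> String {
    match raw.map(str::trim) {
        Some(value) if !value.is_empty() => value.to_string(),
        _ => DEFAULT_BROKERS.to_string(),
    }
}

/// Read KAFKA_BROKERS from the process environment.
pub fn brokers_from_env() -> String {
    resolve_brokers(std::env::var("KAFKA_BROKERS").ok().as_deref())
}
```

Keeping the decision in a function that takes an `Option<&str>` makes the fallback easy to unit test without touching the process environment.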
Implementing Retry Logic
Network issues or broker unavailability can cause message publishing to fail. To handle this, we can add a function for publishing messages with an exponential backoff strategy:
/// Publish on Kafka with an exponential backoff
pub(crate) async fn publish_with_retry(
    headers: OwnedHeaders,
    message: &str,
    key: &str,
    topic: &str,
    interval: u64,
) -> anyhow::Result<()> {
    const MAX_ATTEMPTS: u32 = 10;
    let mut publish_attempts: u32 = 1;
    while let Err(err) = Client::publish(headers.clone(), message, key, topic).await {
        tracing::error!(event = "publish", msg = "failed", err = ?err);
        // Give up once MAX_ATTEMPTS publishes have failed
        if publish_attempts >= MAX_ATTEMPTS {
            tracing::error!(event = "publish", msg = "exceeded retries", err = ?err);
            return Err(KafkaError::RetryExceeded.into());
        }
        // Exponential backoff: interval, 2 * interval, 4 * interval, ... seconds
        let delay = interval.saturating_mul(2u64.saturating_pow(publish_attempts - 1));
        tracing::warn!(event = "publish", msg = "retrying", retry_count = publish_attempts, delay_secs = delay);
        tokio::time::sleep(Duration::from_secs(delay)).await;
        publish_attempts += 1;
    }
    Ok(())
}
This function retries sending the message, waiting longer between attempts, and gives up after a set number of retries.
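To make the backoff schedule concrete, here is a minimal, standard-library-only sketch of how exponentially growing delays can be computed. The function names and the `max` cap are illustrative assumptions and are not part of the code above. The delay before retry number `attempt` is `base * 2^(attempt - 1)`, limited to `max`.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (1-based): base * 2^(attempt - 1),
/// capped at `max`. Saturating arithmetic avoids overflow for large attempts.
pub fn backoff_delay(base: Duration, attempt: u32, max: Duration) -> Duration {
    let exponent = attempt.saturating_sub(1);
    let factor = 2u32.saturating_pow(exponent);
    base.saturating_mul(factor).min(max)
}

/// The full list of delays for `attempts` retries (hypothetical helper).
pub fn backoff_schedule(base: Duration, attempts: u32, max: Duration) -> Vec<Duration> {
    (1..=attempts)
        .map(|attempt| backoff_delay(base, attempt, max))
        .collect()
}
```

A cap like `max` is a common addition because uncapped doubling reaches very long waits after only a few retries. Many implementations also add random jitter so that several clients do not retry in lockstep. Jitter is left out here to keep the sketch deterministic.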
Error Handling
For better error management during publishing, defining a custom error type can be useful:
/// Kafka Error
#[derive(Debug, thiserror::Error)]
pub enum KafkaError {
    #[error("Retry exceeded on trying to publish message")]
    RetryExceeded,
}
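For readers unfamiliar with thiserror, the derive above generates `Display` and `std::error::Error` implementations. The sketch below writes out a roughly equivalent version by hand using only the standard library. The `give_up` function is a hypothetical stand-in for the point where the retry loop returns the error, and `Box<dyn Error>` stands in for `anyhow::Error`.

```rust
use std::error::Error;
use std::fmt;

/// Hand-written counterpart of the derived error type.
#[derive(Debug, PartialEq, Eq)]
pub enum KafkaError {
    RetryExceeded,
}

impl fmt::Display for KafkaError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            KafkaError::RetryExceeded => {
                write!(f, "Retry exceeded on trying to publish message")
            }
        }
    }
}

// Implementing Error lets the type convert into boxed or wrapped errors.
impl Error for KafkaError {}

/// Hypothetical caller that fails with the custom error.
pub fn give_up() -> Result<(), Box<dyn Error>> {
    Err(KafkaError::RetryExceeded.into())
}
```

Because the concrete type is preserved inside the boxed error, a caller can still recover it with `downcast_ref::<KafkaError>()` and branch on the variant.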
Producer Callback for Delivery Reports
To handle delivery reports for sent messages, implement the ProducerContext trait on a ProducerCallback struct. Its delivery method is called once per message with the result, so you can respond to both successful deliveries and failures:
/// Producer Callback for Kafka
struct ProducerCallback {}

impl ClientContext for ProducerCallback {}

impl ProducerContext for ProducerCallback {
    type DeliveryOpaque = ();

    /// Handle delivery report from the producer
    fn delivery(
        &self,
        delivery_result: &rdkafka::producer::DeliveryResult<'_>,
        _delivery_opaque: Self::DeliveryOpaque,
    ) {
        match delivery_result {
            Ok(msg) => {
                // The key may be absent or not valid UTF-8; never panic inside the callback
                let key = msg.key_view::<str>().and_then(Result::ok).unwrap_or("<none>");
                tracing::debug!(
                    event = "publish",
                    key = key,
                    offset = msg.offset(),
                    partition = msg.partition()
                );
            }
            Err((err, _msg)) => {
                tracing::warn!(event = "publish", state = "fail", content = ?err);
            }
        };
    }
}
This callback provides insights into the delivery status of your messages, allowing you to react appropriately to successes and failures.
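One way to react to delivery reports beyond logging is to count outcomes. The following is a minimal sketch using only the standard library. `DeliveryStats` is a hypothetical type, not part of rdkafka. It uses atomics because the delivery callback runs on a different thread from the code that sends messages.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical counters that a delivery callback could update.
#[derive(Debug, Default)]
pub struct DeliveryStats {
    delivered: AtomicU64,
    failed: AtomicU64,
}

impl DeliveryStats {
    /// Record one delivery report; only the Ok/Err outcome is inspected.
    pub fn record<T, E>(&self, result: &Result<T, E>) {
        let counter = if result.is_ok() {
            &self.delivered
        } else {
            &self.failed
        };
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns (delivered, failed) as observed at the time of the call.
    pub fn snapshot(&self) -> (u64, u64) {
        (
            self.delivered.load(Ordering::Relaxed),
            self.failed.load(Ordering::Relaxed),
        )
    }
}
```

Under this assumption, ProducerCallback could hold a `DeliveryStats` field and call `record(delivery_result)` at the top of its delivery method, since the delivery result is an ordinary `Result`.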
With this setup, you can effectively publish messages to Kafka using Rust’s rdkafka library while utilizing zstd compression.