DEV Community

Cover image for Trace Through a Kafka Cluster with Rust and OpenTelemetry
Jan Schulte for Outshift By Cisco

Posted on

Trace Through a Kafka Cluster with Rust and OpenTelemetry

OpenTelemetry is a fantastic approach to observability, providing additional insight into your applications. While generating spans within a single application is straightforward, it gets more challenging once you add more applications and services, jettisoning you into the realm of distributed tracing.

Distributed tracing includes use cases such as tracking an HTTP request from a client to a server. OpenTelemetry provides tools and techniques to build trace graphs spanning across several systems. While it’s a great solution for applications, this approach can also work well for services like Apache Kafka®.

In this blog post, I’ll explore how to "trace through" a Kafka cluster with your Rust application by leveraging standard OpenTelemetry tools.

The Scenario

In our hypothetical scenario, we have a producer, which is sending messages to a Kafka cluster, with a consumer on the other end. Both applications are instrumented with OpenTelemetry, although spans are shown separately in a backend, such as Jaeger. With OpenTelemetry instrumentation in place, we already have great insight into production behavior. But we lack context for transactions that include Apache Kafka.

The goal is to use OpenTelemetry to connect these disparate spans so we can see a trace graph in Jaeger.

Jaeger Tracing Details

How Do We "Connect" Spans?

Connecting OpenTelemetry spans is a common use case. It is so common, in fact, that OpenTelemetry comes with built-in mechanisms to help us implement distributed tracing.

Trace context propagation in a HTTP client/server architecture

A common scenario in distributed tracing is connecting HTTP client and server spans. In fact, this scenario offers many similarities to what we want to achieve with Kafka. The objective is clear: When we look at a trace in Jaeger, we want to see the full transaction: the span from the HTTP client sending the HTTP request and the server span handling the request.

Here’s a real-world use case: an e-commerce application consisting of various microservices. The user adds items to the shopping cart and is ready to check out. The checkout process requires several services to work together (payment, shopping cart, shipping, and so on). All services involved emit OpenTelemetry spans. These spans only provide value if we can see the full transaction (as in, "service A called B called C") to detect irregularities.

An OpenTelemetry backend (such as Jaeger) can assemble this graph for us if we provide the necessary information. In practice, the HTTP client includes an additional identifier in a custom header—the so-called trace-parent—in its request to the server. The server receiving the request creates spans on its own. If a trace-parent is present, the server includes this ID in all spans related to this specific request. When we look at a trace in Jaeger, we see the client span and all related server spans. Most OpenTelemetry libraries already have mechanisms built in to automate this process. Depending on your programming language and used libraries/frameworks, after some initial configuration, this process might already work out of the box.

Trace context propagation with Kafka

Implementing distributed tracing across our own applications is straightforward, compared to third-party services, such as databases, message queues, or event brokers. Our applications frequently interact with these services. But all metadata we usually pass around (i.e. in HTTP headers) won't get propagated (at least not out of the box). Luckily, with Kafka, there are ways to preserve trace context.

Let's explore the options:

Option 1: We could embed the trace-parent attribute as part of the payload the producer sends to Kafka. In a greenfield project, this could work out. But it also requires the consumer to understand and implement this format. If we already have existing applications producing and consuming messages, this approach won't be feasible, because we would have to touch every single client to implement this change.

Option 2: Leverage Kafka headers. Similar to HTTP requests, Kafka messages can include headers, which contain additional metadata. We can leverage this metadata to add our trace information. The advantage is that we won’t need to change message formats, and clients that don't use OpenTelemetry won’t be affected by this change.

How OpenTelemetry Rust Connects Spans

Let's look at some code. We have different ways to create spans in Rust. In this article, we focus on creating spans manually, not covering any abstractions such as tracing.

In a rudimentary scenario, we create a span as the following:

    let mut span = global::tracer("producer").start("send_to_kafka");
    span.set_attribute(KeyValue { key: Key::new("title"), value: opentelemetry::Value::String(StringValue::from(hn_search_result.title.to_string())) });
    span.end();

This snippet creates a single span and configures a `title` attribute. We can even create nested spans:

    tracer.in_span("operation", |cx| {
        let span = cx.span();
        span.set_attribute(ANOTHER_KEY.string("yes"));
        tracer.in_span("Sub operation...", |cx| {
            let span = cx.span();
            span.add_event("Sub span event", vec![]);
        });
    });
Enter fullscreen mode Exit fullscreen mode

These spans are automatically connected through the cx or context. This all works well—as long we only consider a single application.

Parent and Child span
Screenshot from: https://github.com/open-telemetry/opentelemetry-rust/tree/main/examples/basic

If we want to connect spans across application boundaries, we need to inject context. OpenTelemetry provides us with a built-in propagation mechanism. As described earlier, in HTTP scenarios, this approach relies on custom headers.

Implementing context propagation in Rust, our code looks like this:

    let client = Client::new();
    let span = global::tracer("example/client").start("say hello");
    let cx = Context::current_with_span(span);

    let mut req = hyper::Request::builder().uri("http://127.0.0.1:3000");
    //(1)
    global::get_text_map_propagator(|propagator| {
       //(2)
       propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut().unwrap()))
    });
    let res = client.request(req.body(Body::from("Hallo!"))?).await?;
Enter fullscreen mode Exit fullscreen mode

(Source: https://github.com/open-telemetry/opentelemetry-rust/blob/main/examples/http/src/client.rs)

In this example, we're injecting context into a HTTP request.

In step (1), we obtain the propagator (which has been configured at application start) and then use it to inject context (2). In this case, the Rust OpenTelemetry library already has a HeaderInjector that knows how to inject HTTP Headers.

Now, let's explore how to extract the context and use it to create spans on the receiving end:

    //(1)
    let parent_cx = global::get_text_map_propagator(|propagator| {
       propagator.extract(&HeaderExtractor(req.headers()))
    });

    let mut span = global::tracer("example/server").start_with_context("hello", &parent_cx);
    span.add_event("handling this...", Vec::new());

    Ok(Response::new("Hello, World!".into()))
Enter fullscreen mode Exit fullscreen mode

(Source: https://github.com/open-telemetry/opentelemetry-rust/blob/main/examples/http/src/server.rs)

As we've seen in the previous example, we start by retrieving the global propagator. In this case, we extract our context from the provided HTTP headers. the parent_cx variable now holds all information we need to create spans (1).

In contrast to the previous code example, we now use the start_with_context("hello", &parent_cx) method to create a new span. This method creates the span and injects the trace parent we've received from the client.

Building Kafka Producer and Consumers

We can use the almost exact same approach for Kafka producers and consumers.

Kafka producer

For this example, we're using rdkafka to build producers and consumers, because it allows us to specify custom headers for each record.

     async fn send_to_kafka(host: &str, topic: &str, payload: Vec<HNSearchResult>) {
      let producer: &FutureProducer = &ClientConfig::new()
          .set("bootstrap.servers", host)
          .set("message.timeout.ms", "5000")
          .create()
          .expect("Producer creation error");
      for hn_search_result in payload.iter() {
          let mut span = global::tracer("producer").start("send_to_kafka");
          span.set_attribute(KeyValue { key: Key::new("title"), value: opentelemetry::Value::String(StringValue::from(hn_search_result.title.to_string())) });
          let serialized = serde_json::to_string(&hn_search_result).unwrap();

          let mut headers = OwnedHeaders::new().insert(Header { key: "key", value: Some("value") });

          let delivery_status = producer
              .send(
                  FutureRecord::to(&topic.to_string())
                          .key(&format!("Key {}", -1))
                      .headers(headers)
                      .payload(&serialized),
                  Duration::from_secs(0)
              )
              .await;
          println!("Delivery Status: {:?}", delivery_status);
      }
    }
Enter fullscreen mode Exit fullscreen mode

In this code example, you see a standard Kafka producer working through a list of payloads and sending them to the configured topic. For each processed payload, we also create a new span.

You might notice how we're already specifying headers for each payload we send. For now, these headers only contain an example entry. In the next step, we want to inject our trace context, as well.

    async fn send_to_kafka(host: &str, topic: &str, payload: Vec<HNSearchResult>) {
        let producer: &FutureProducer = &ClientConfig::new()
            .set("bootstrap.servers", host)
            .set("message.timeout.ms", "5000")
            .create()
            .expect("Producer creation error");


        for hn_search_result in payload.iter() {
            let mut span = global::tracer("producer").start("send_to_kafka");
            span.set_attribute(KeyValue { key: Key::new("title"), value: opentelemetry::Value::String(StringValue::from(hn_search_result.title.to_string())) });
            let context = Context::current_with_span(span);
            let serialized = serde_json::to_string(&hn_search_result).unwrap();

            let mut headers = OwnedHeaders::new().insert(Header { key: "key", value: Some("value") });
            //(1)
            global::get_text_map_propagator(|propagator| {
                //(2)
                propagator.inject_context(&context, &mut shared::HeaderInjector(&mut headers))
            });

            let delivery_status = producer
                .send(
                    FutureRecord::to(&topic.to_string())
                            .key(&format!("Key {}", -1))
                        .headers(headers)
                        .payload(&serialized),
                    Duration::from_secs(0)
                )
                .await;
            println!("Delivery Status: {:?}", delivery_status);
        }
    }
Enter fullscreen mode Exit fullscreen mode

Just as in the HTTP example before, we use global::get_text_map_propagator to retrieve a propagator. In (2), we inject our trace context into our Kafka headers.

But where does HeaderInjector come from? This struct is a custom implementation. propagator.inject_context accepts any struct that implements Injector.

Let's implement it:

    use opentelemetry_api::propagation::{Injector};
    use rdkafka::message::{Headers, OwnedHeaders};

    //(1)
    pub struct HeaderInjector<'a>(pub &'a mut OwnedHeaders);

    impl <'a>Injector for HeaderInjector<'a> {
        fn set(&mut self, key: &str, value: String) {
            let mut new = OwnedHeaders::new().insert(rdkafka::message::Header {
                key,
                value: Some(&value),
            });

            for header in self.0.iter() {
                let s = String::from_utf8(header.value.unwrap().to_vec()).unwrap();
                new = new.insert(rdkafka::message::Header { key: header.key, value: Some(&s) });
            }

            self.0.clone_from(&new);
        }
    }
Enter fullscreen mode Exit fullscreen mode

We define our own new struct, HeaderInjector (1). This struct contains a reference to our Kafka header object. The Injector trait requires us to implement set to insert a new key-value pair.

How does this look on the receiver side?

Kafka consumer

We start with the consumer implementation, without context propagation:

    async fn consume_topic(broker: &str, topic: &str) {
        let context = CustomContext;

        let consumer: LoggingConsumer = ClientConfig::new()
            .set("group.id", "-1")
            .set("bootstrap.servers", broker)
            .set_log_level(RDKafkaLogLevel::Debug)
            .create_with_context(context)
            .expect("failed to create consumer");

        consumer
            .subscribe(&vec![topic])
            .expect("Can't subscribe to topics");

        println!("Subscribed");
        loop {
            match consumer.recv().await {
                Err(e) => eprintln!("Kafka error: {}", e),
                Ok(m) => {
                    let payload = match m.payload_view::<str>() {
                        None => "",
                        Some(Ok(s)) => s,
                        Some(Err(e)) => {
                            eprintln!("Error while deserializing message payload: {:?}", e);
                            ""
                        }
                    };
                    println!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
                          m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
                    //(1)
                    if let Some(headers) = m.headers() {
                        for header in headers.iter() {
                            if let Some(val) = header.value {
                                println!(
                                    "  Header {:#?}: {:?}",
                                    header.key,
                                    String::from_utf8(val.to_vec())
                                );
                            }
                        }
                    }
                    consumer.commit_message(&m, CommitMode::Async).unwrap();
                }
            };
        }
    }
Enter fullscreen mode Exit fullscreen mode

This implementation is mostly based on the example code from rdkafka. We have a consumer, which is listening to a single topic and working off new incoming messages. In (1), you see how we extract headers from the incoming message. For now, we only print out what we find.

With the consumer code in place, we can now extract the trace context (I omitted some code for brevity in the upcoming example).

    //...
    println!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
          m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
    if let Some(headers) = m.headers() {
        for header in headers.iter() {
            if let Some(val) = header.value {
                println!(
                    "  Header {:#?}: {:?}",
                    header.key,
                    String::from_utf8(val.to_vec())
                );
            }
        }
        //(1)
        let context = global::get_text_map_propagator(|propagator| {
            propagator.extract(&HeaderExtractor(&headers))
        });

        //(2)
        let mut span =
            global::tracer("consumer").start_with_context("consume_payload", &context);
        span.set_attribute(KeyValue { key: Key::new("payload"), value: opentelemetry::Value::String(StringValue::from(payload.to_string())) });
        span.end();
    }
    consumer.commit_message(&m, CommitMode::Async).unwrap();
    //...
Enter fullscreen mode Exit fullscreen mode

Most of the consumer code stayed the same. In (1), we use the OpenTelemetry TextMapPropagator (1) to extract our context. All spans, in this case just one, will be created within this context (2).

Let's look at the implementation of HeaderExtractor.

    use opentelemetry_api::propagation::{Extractor, Injector};
    use rdkafka::message::{Headers, OwnedHeaders, BorrowedHeaders};

    //...

    pub struct HeaderExtractor<'a>(pub &'a BorrowedHeaders);

    impl<'a> Extractor for HeaderExtractor<'a> {
        fn get(&self, key: &str) -> Option<&str> {
            for i in 0..self.0.count() {
                if let Ok(val) = self.0.get_as::<str>(i) {
                    if val.key == key {
                        return val.value
                    }
                }
            }
            None
        }

        fn keys(&self) -> Vec<&str> {
            self.0.iter().map(|kv| kv.key).collect::<Vec<_>>()
        }
    }
Enter fullscreen mode Exit fullscreen mode

Similar to our Injector implementation, we implement the Extractor trait provided by OpenTelemetry. Extractor requires us to implement two methods: get and keys. Also, note how we're using a different header struct here. Instead of OwnedHeaders, we now work with BorrowedHeaders. When we consume a message, headers are stored as BorrowedHeaders.

Connected Producer and Consumer Spans

Looking at Jaeger, we can verify that the producer and consumer spans are now connected.

The Bottom Line

In a nutshell, it is possible to "trace through" a Kafka cluster with OpenTelemetry. With a bit of custom code, we can leverage many of OpenTelemetry's built-in mechanisms to achieve this objective. Tracing through a Kafka cluster helps us understand better how our code runs in production. What if you could enhance that picture with even more insight, combining it with insight about your Kafka cluster, as well? That's where Cisco’s Calisti comes in.

Calisti provides you with a holistic view of both the application and Kafka cluster health. It also gives you at-a-glance health information. Need to see how well your brokers are doing? Done.

Need to manage topics and permissions but feel as though using the Kafka shell scripts is tedious? Use kubectl instead:

    $ cat << EOF | kubectl apply -n kafka -f -
    apiVersion: kafka.banzaicloud.io/v1alpha1
    kind: KafkaTopic
    metadata:
        name: my-topic
    spec:
        clusterRef:
            name: kafka
        name: my-topic
        partitions: 1
        replicationFactor: 1
        config:
            "retention.ms": "604800000"
            "cleanup.policy": "delete"
    EOF
Enter fullscreen mode Exit fullscreen mode

If you’d like to give this a try, you can try out Calisti’s free tier. It lets you test the product in your own Kubernetes environment—no credit card required.

Source code: https://github.com/schultyy/kafka-tracing-blog-post-example-code/tree/main

Top comments (1)

Collapse
 
fmassot profile image
François Massot

This is indeed a good example. I can't resist to tell you that I worked on a storage backend for Jaeger.. not elasticsearch nor cassandra but an OSS search engine written in Rust :) github.com/quickwit-oss/quickwit

And we are Kafka native and we eat our own traces!