Daniel Baptista Dias for Kubeshop · Originally published at tracetest.io

OpenTelemetry Trace Context Propagation for gRPC Streams

gRPC is a modern, open-source remote procedure call (RPC) framework developed by Google and broadly adopted across enterprise systems today. Built on the HTTP/2 protocol, it is commonly used in microservice architectures because of its performance and its support for communication between services written in different programming languages.

One interesting feature of gRPC is its support for streaming. Consumers can listen to a stream and receive data as it becomes available, instead of repeatedly polling external systems for new data, which avoids overloading those systems with requests.

https://res.cloudinary.com/djwdcmwdz/image/upload/v1718801061/Blogposts/Trace-Context-Propagation-for-gRPC-Streams/grpc-diagram-1_ox9pcr.png

In this article, you will see an example of a system written in Go that uses gRPC streams to send data to consumers and learn:

  1. how to instrument it with traces using OpenTelemetry;
  2. how to set the context propagation to track the processing of each data item;
  3. how to test it and guarantee that the data item is properly processed.

The code sample for this article is available here, and you can run it with:

git clone git@github.com:kubeshop/tracetest.git
cd ./examples/quick-start-grpc-stream-propagation

TRACETEST_API_KEY=your-api-key docker compose up -d

Using gRPC Streams to Communicate Between Systems

Suppose you have a system written in Go that receives user payments and notifies a worker when a payment arrives, identifying whether it is a high-value payment so the worker can do further processing:

https://res.cloudinary.com/djwdcmwdz/image/upload/v1718801214/Blogposts/Trace-Context-Propagation-for-gRPC-Streams/grpc-diagram-2_gp5c6t.png

To avoid polling the PaymentReceiverAPI for notifications, you model two endpoints: ReceivePayment to receive payments and NotifyPayment to emit notifications. You can specify them with the following protobuf file (full example here):

syntax = "proto3";

package proto;
option go_package = "your.module.path/proto";

service PaymentReceiver {
    rpc ReceivePayment(Payment) returns (ReceivePaymentResponse) {}

    rpc NotifyPayment(Empty) returns (stream PaymentNotification) {}
}

message Empty {}

message Payment {
    string customerId = 1;
    float amount = 2;
}

message ReceivePaymentResponse {
    bool received = 1;
}

message PaymentNotification {
    Payment payment = 1;
    bool highValuePayment = 2;
}

In a simple implementation of the PaymentReceiverAPI (full implementation here), ReceivePayment enqueues the request for further processing and immediately tells the caller that it was received, while the item is processed asynchronously:

package main

import (
    // ...
    pb "your.module.path/proto"
)

type serverImpl struct { // Implement the PaymentReceiverServer interface
    pb.PaymentReceiverServer
}

var paymentChannel = make(chan *pb.Payment) // act as an "in-memory" queue

func (s *serverImpl) ReceivePayment(ctx context.Context, payment *pb.Payment) (*pb.ReceivePaymentResponse, error) {
    go func() { // enqueue payment
        paymentChannel <- payment
    }()
    return &pb.ReceivePaymentResponse{Received: true}, nil
}
// to continue

While NotifyPayment will read from this queue, detect if the payment has a high value, and publish it into a stream:

func (s *serverImpl) NotifyPayment(_ *pb.Empty, stream pb.PaymentReceiver_NotifyPaymentServer) error {
    for {
        payment, ok := <-paymentChannel // dequeue
        if !ok {
            return nil
        }

        highValuePayment := payment.Amount > 10_000

        notification := &pb.PaymentNotification{
            Payment:          payment,
            HighValuePayment: highValuePayment,
        }

        if err := stream.Send(notification); err != nil {
            return err
        }
    }
}

As a consumer, you can create a simple worker that calls the NotifyPayment endpoint on the gRPC API, opening a stream connection and receiving notifications as they become available through the stream via stream.Recv() (full example here):

package main

import (
    // ...
    pb "your.module.path/proto"
)

func main() {
    ctx := context.Background()

    grpcClient, err := grpc.NewClient(/* ... */)
    if err != nil {
        log.Fatalf("could not connect to producer API: %v", err)
    }

    log.Printf("Connected to producer API at %s", producerAPIAddress)

    client := pb.NewPaymentReceiverClient(grpcClient)

    stream, err := client.NotifyPayment(ctx, &pb.Empty{}, grpc.WaitForReady(true))
    if err != nil {
        log.Fatalf("could not receive payment notifications: %v", err)
    }

    log.Printf("Listening for payment notifications...")

    for {
        notification, err := stream.Recv()
        if err == io.EOF {
            log.Printf("No more payment notifications")
            return
        }
        if err != nil {
            log.Fatalf("could not receive payment notification: %v", err)
        }

        // process notifications
        processPaymentNotification(notification)
    }
}

func processPaymentNotification(notification *pb.PaymentNotification) {
    log.Printf("Received payment notification: %v", notification)
}

Using grpcurl, you can simulate a customer adding a payment of $50,000 by calling your service with the following command:

grpcurl -plaintext -proto ./proto/paymentreceiver.proto -d '{ "customerId": "1234", "amount": 50000 }' localhost:8080 proto.PaymentReceiver/ReceivePayment

# It should output:
# {
#   "received": true
# }

Also, you should see the following output from the consumer:

Received payment notification: payment:{customerId:"1234" amount:50000} highValuePayment:true

Adding OpenTelemetry to the System

OpenTelemetry is an open-source observability framework for generating, capturing, and collecting telemetry data such as logs, metrics, and traces from software services and applications. For this article, we will focus on configuring traces in the system, so you can see the entire distributed operation of processing a payment.

First, you need to set up a basic infrastructure, with an OpenTelemetry (OTel) Collector to receive traces and Jaeger to store them, structuring the system like this:

https://res.cloudinary.com/djwdcmwdz/image/upload/v1718801271/Blogposts/Trace-Context-Propagation-for-gRPC-Streams/grpc-diagram-3_mbjcze.png

To simplify the setup, you will define both in a docker-compose.yaml file (full example here), like this:

services:
  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.101.0
    command:
      - "--config"
      - "/otel-local-config.yaml"
    volumes:
      - ./collector.config.yaml:/otel-local-config.yaml
    ports:
      - 4317:4317
    depends_on:
      jaeger:
        condition: service_started

  jaeger:
    image: jaegertracing/all-in-one:latest
    restart: unless-stopped
    ports:
      - 16686:16686
      - 16685:16685
    environment:
      - COLLECTOR_OTLP_ENABLED=true
    healthcheck:
      test: ["CMD", "wget", "--spider", "localhost:16686"]
      interval: 1s
      timeout: 3s
      retries: 60


A local collector.config.yaml will be used to configure the OTel Collector to receive traces and send them to Jaeger:

receivers:
  otlp:
    protocols:
      grpc:
      http:

processors:
  batch:
    timeout: 100ms

exporters:
  logging:
    loglevel: debug
  otlp/jaeger:
    endpoint: jaeger:4317
    tls:
      insecure: true

service:
  pipelines:
    traces/1:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp/jaeger]

You can run both locally on your machine by executing docker compose up in the folder where you set up the files, and access the Jaeger UI through http://localhost:16686/.

After configuring the infrastructure, you will instrument your code to send data to it. Since both the PaymentReceiverAPI and the Worker are written in Go, you will use OpenTelemetry Go for the basic instrumentation and OpenTelemetry Go Contrib to instrument the gRPC server and client.

Add the following functions to your code to configure basic instrumentation:

// ...
const spanExporterTimeout = 1 * time.Minute

func setupOpenTelemetry(ctx context.Context, otelExporterEndpoint, serviceName string) (trace.Tracer, error) {
    log.Printf("Setting up OpenTelemetry with exporter endpoint %s and service name %s", otelExporterEndpoint, serviceName)

    spanExporter, err := getSpanExporter(ctx, otelExporterEndpoint)
    if err != nil {
        return nil, fmt.Errorf("failed to setup span exporter: %w", err)
    }

    traceProvider, err := getTraceProvider(spanExporter, serviceName)
    if err != nil {
        return nil, fmt.Errorf("failed to setup trace provider: %w", err)
    }

    return traceProvider.Tracer(serviceName), nil
}

func getSpanExporter(ctx context.Context, otelExporterEndpoint string) (sdkTrace.SpanExporter, error) {
    ctx, cancel := context.WithTimeout(ctx, spanExporterTimeout)
    defer cancel()

    conn, err := grpc.NewClient(
        otelExporterEndpoint,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
    }

    traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
    if err != nil {
        return nil, fmt.Errorf("failed to create trace exporter: %w", err)
    }

    return traceExporter, nil
}

func getTraceProvider(spanExporter sdkTrace.SpanExporter, serviceName string) (*sdkTrace.TracerProvider, error) {
    defaultResource := resource.Default()

    mergedResource, err := resource.Merge(
        defaultResource,
        resource.NewWithAttributes(
            defaultResource.SchemaURL(),
            semconv.ServiceNameKey.String(serviceName),
        ),
    )
    if err != nil {
        return nil, fmt.Errorf("failed to create otel resource: %w", err)
    }

    tp := sdkTrace.NewTracerProvider(
        sdkTrace.WithBatcher(spanExporter),
        sdkTrace.WithResource(mergedResource),
    )

    otel.SetTracerProvider(tp)

    otel.SetTextMapPropagator(
        propagation.NewCompositeTextMapPropagator(
            propagation.TraceContext{},
            propagation.Baggage{},
        ),
    )

    return tp, nil
}

This setupOpenTelemetry function configures a spanExporter that sends telemetry data to the OTel Collector via gRPC, using the address specified in otelExporterEndpoint (localhost:4317 if you are running your API locally, or otel-collector:4317 if you are running it inside Docker). It then registers a global traceProvider to start capturing traces.

You can call this function from your entrypoint in main with code like this (PaymentReceiver API example and Worker example):

func main() {
    otelExporterEndpoint := // ...
    otelServiceName := // ...

    tracer, err := setupOpenTelemetry(context.Background(), otelExporterEndpoint, otelServiceName)
    if err != nil {
        log.Fatalf("failed to initialize OpenTelemetry: %v", err)
        return
    }

    // ...
}
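One detail worth handling in a real service: the batch span processor buffers spans, so flush it before the process exits. Below is a minimal sketch, assuming a hypothetical variant of setupOpenTelemetry that also returns the *sdkTrace.TracerProvider (the sample itself returns only the tracer):

// Hypothetical variant that also returns the provider so main can flush it.
tracer, tp, err := setupOpenTelemetry(context.Background(), otelExporterEndpoint, otelServiceName)
if err != nil {
    log.Fatalf("failed to initialize OpenTelemetry: %v", err)
}
defer func() {
    // Give the exporter a few seconds to flush buffered spans on shutdown.
    shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := tp.Shutdown(shutdownCtx); err != nil {
        log.Printf("failed to shut down tracer provider: %v", err)
    }
}()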

Finally, you need to configure both the gRPC server and client to create spans for each operation by setting up the OTel Contrib stats handlers:

// in server
grpcServer := grpc.NewServer(
    grpc.StatsHandler(otelgrpc.NewServerHandler()),
)

// in client
grpcClient, err := grpc.NewClient(
    /* ... */,
    grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
)
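For reference, here is roughly how the server-side handler fits into a typical gRPC bootstrap. This is a sketch: the listener address is illustrative, and RegisterPaymentReceiverServer is the registration function that protoc-gen-go-grpc generates for this service:

// Listen on the port the service exposes (address is illustrative).
lis, err := net.Listen("tcp", ":8080")
if err != nil {
    log.Fatalf("failed to listen: %v", err)
}

// Attach the OTel stats handler so every RPC produces spans.
grpcServer := grpc.NewServer(
    grpc.StatsHandler(otelgrpc.NewServerHandler()),
)

// Register the implementation and start serving.
pb.RegisterPaymentReceiverServer(grpcServer, &serverImpl{})

if err := grpcServer.Serve(lis); err != nil {
    log.Fatalf("failed to serve: %v", err)
}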

Now you can run the system again with OpenTelemetry enabled. To see that it is working, execute another grpcurl call and check Jaeger to see the trace registered:

grpcurl -plaintext -proto ./proto/paymentreceiver.proto -d '{ "customerId": "1234", "amount": 50000 }' localhost:8080 proto.PaymentReceiver/ReceivePayment

# after result, go to http://localhost:16686/search

https://res.cloudinary.com/djwdcmwdz/image/upload/v1718801343/Blogposts/Trace-Context-Propagation-for-gRPC-Streams/grpc-jaeger-1_nlf78a.png

However, there is a problem.

You can see two traces, one for the PaymentReceiverAPI and one for the Worker, covering each part of the process, but you cannot see them together as a single timeline. This happens due to the lack of trace context propagation.

The OpenTelemetry library does not have the trace metadata needed to identify the parent trace. Newly created spans therefore start a new trace instead of being added to the parent one.

Fixing Context Propagation for Producer and Consumer

To propagate context through HTTP systems, OpenTelemetry libraries use HTTP headers to tell downstream APIs that a trace was started by an upstream API, usually via the traceparent header, which carries the TraceID of the current transaction (in the W3C format traceparent: 00-<trace-id>-<parent-span-id>-<trace-flags>).

Since the Worker uses a streaming client to receive multiple messages (a single HTTP call that continuously receives data), you cannot rely on HTTP headers to track each piece of data as it arrives. To solve that, you need to set up context propagation manually, attaching the traceparent metadata to each payment notification.

We will also add manual instrumentation for internal operations, so we can trace the following steps:

https://res.cloudinary.com/djwdcmwdz/image/upload/v1718801397/Blogposts/Trace-Context-Propagation-for-gRPC-Streams/grpc-diagram-4_ntzdse.png

First, you will change the Protobuf definition of the PaymentNotification to include a metadata field, like this:

// ...

message PaymentNotification {
    Payment payment = 1;
    bool highValuePayment = 2;
    map<string, string> metadata = 3; // new field
}

Then regenerate the server and client code so they include the new field (usually done with the protoc command, or in the code sample, with make build-proto).

In Go, OTel libraries rely on the context.Context object to carry tracing metadata across operations. In the next step, we will capture the trace propagation metadata from the context and inject it into the notification. To do that, create the following helper functions:

// injectMetadataIntoContext rebuilds a context from serialized trace
// metadata (used on the consumer side).
func injectMetadataIntoContext(ctx context.Context, metadata map[string]string) context.Context {
    propagator := otel.GetTextMapPropagator()

    // Extract reads the trace data from the map carrier into the context.
    return propagator.Extract(
        ctx,
        propagation.MapCarrier(metadata),
    )
}

// extractMetadataFromContext serializes the current trace context into a
// plain map that can travel inside a message (used on the producer side).
func extractMetadataFromContext(ctx context.Context) map[string]string {
    propagator := otel.GetTextMapPropagator()

    metadata := map[string]string{}
    // Inject writes the context's trace data into the map carrier.
    propagator.Inject(
        ctx,
        propagation.MapCarrier(metadata),
    )

    return metadata
}
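To make the direction of these helpers concrete, here is a hedged round-trip sketch (the demoRoundTrip function is illustrative, not part of the sample): start a span, serialize its context into a map on the producer side, rebuild a context from that map on the consumer side, and confirm the trace ID survives:

func demoRoundTrip(tracer trace.Tracer) {
    // Start a span so the context carries trace metadata.
    ctx, span := tracer.Start(context.Background(), "OriginalOperation")
    defer span.End()

    // Producer side: serialize the trace context into a plain map.
    metadata := extractMetadataFromContext(ctx)
    log.Printf("carrier: %v", metadata) // e.g. map[traceparent:00-<trace-id>-<span-id>-01]

    // Consumer side: rebuild a context from the map.
    restoredCtx := injectMetadataIntoContext(context.Background(), metadata)

    // Spans started from the restored context share the original trace ID.
    _, child := tracer.Start(restoredCtx, "RestoredOperation")
    defer child.End()
    log.Printf("same trace: %v", span.SpanContext().TraceID() == child.SpanContext().TraceID())
}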

Then, change the server to handle this metadata and add manual instrumentation for receiving and enqueuing payments:

type paymentWithMetadata struct {
    payment  *pb.Payment
    metadata map[string]string
}

// Guarantee that the serverImpl implements the PaymentReceiverServer interface
var _ pb.PaymentReceiverServer = &serverImpl{}

// Channel to store payments, used as an "in-memory" queue
var paymentChannel = make(chan *paymentWithMetadata)

func (s *serverImpl) ReceivePayment(ctx context.Context, payment *pb.Payment) (*pb.ReceivePaymentResponse, error) {
    go func() {
        ctx, span := s.tracer.Start(ctx, "EnqueuePayment")
        defer span.End()

        message := &paymentWithMetadata{
            payment:  payment,
            metadata: extractMetadataFromContext(ctx),
        }

        // handle channel as in-memory queue
        paymentChannel <- message
    }()

    return &pb.ReceivePaymentResponse{Received: true}, nil
}

Then, change NotifyPayment to send it through the stream:

func (s *serverImpl) NotifyPayment(_ *pb.Empty, stream pb.PaymentReceiver_NotifyPaymentServer) error {
    for {
        message, ok := <-paymentChannel
        if !ok {
            return nil
        }

        ctx := injectMetadataIntoContext(context.Background(), message.metadata)
        ctx, span := s.tracer.Start(ctx, "SendPaymentNotification")

        payment := message.payment
        highValuePayment := payment.Amount > 10_000

        notification := &pb.PaymentNotification{
            Payment:          payment,
            HighValuePayment: highValuePayment,
        }

        // extract OTel data from context and add it to the notification
        notification.Metadata = extractMetadataFromContext(ctx)

        if err := stream.Send(notification); err != nil {
            return err
        }

        span.End()
    }
}

With the PaymentReceiverAPI instrumented, the last step is to change the Worker to get the tracing metadata and start registering spans linked to the current operation:

func processPaymentNotification(tracer trace.Tracer, notification *pb.PaymentNotification) {
    messageProcessingCtx := injectMetadataIntoContext(context.Background(), notification.Metadata)
    _, span := tracer.Start(messageProcessingCtx, "ProcessPaymentNotification")
    defer span.End()

    log.Printf("Received payment notification: %v", notification)
}
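Since processPaymentNotification now takes the tracer as a parameter, the worker's receive loop needs a matching change. A sketch, reusing the tracer returned by setupOpenTelemetry in main:

for {
    notification, err := stream.Recv()
    if err == io.EOF {
        log.Printf("No more payment notifications")
        return
    }
    if err != nil {
        log.Fatalf("could not receive payment notification: %v", err)
    }

    // Pass the tracer so each notification is processed in its own span,
    // parented via the metadata attached by the producer.
    processPaymentNotification(tracer, notification)
}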

Now, use grpcurl again and check Jaeger. You should see a single trace spanning both the PaymentReceiverAPI and the Worker, with the entire operation in one timeline:

grpcurl -plaintext -proto ./proto/paymentreceiver.proto -d '{ "customerId": "1234", "amount": 50000 }' localhost:8080 proto.PaymentReceiver/ReceivePayment

# after result, go to http://localhost:16686/search

https://res.cloudinary.com/djwdcmwdz/image/upload/v1718801483/Blogposts/Trace-Context-Propagation-for-gRPC-Streams/grpc-jaeger-2_vroshy.png

https://res.cloudinary.com/djwdcmwdz/image/upload/v1718801529/Blogposts/Trace-Context-Propagation-for-gRPC-Streams/grpc-jaeger-3_vv5bja.png

Testing a Payment Being Processed

To evaluate and guarantee that everything is working properly, you can create a trace-based test that triggers a gRPC call against the API and validates that the trace is emitted as intended and that the payment is correctly processed in each part of the system.

To do that, we will use Tracetest, which triggers service calls (in our case, gRPC calls like the grpcurl ones above) and validates the emitted traces to ensure that our observability stack works as intended.

First, create a new account on Tracetest, then create a new organization and a new environment. This will give you an API key for your agent, which you will use to start the local stack with a new container running the Tracetest Agent:

TRACETEST_API_KEY=your-api-key docker compose up -d

Then, install the Tracetest CLI and configure it to access your environment with the command below. It will guide you through connecting to your personal-org and environment.

tracetest configure

# This command will print some instructions interactively to help to connect to your env:

# What tracetest server do you want to use? (default: https://app.tracetest.io/)

# What Organization do you want to use?:
#  > personal-org (ttorg_000000000000000)
# What Environment do you want to use?:
#  > OTel (ttenv_000000000000000)
# SUCCESS  Successfully configured Tracetest CLI

Now, configure the Tracetest Agent to connect to the local Jaeger instance, using the following command:

tracetest apply datastore -f ./tracetest/tracetest-tracing-backend.yaml 

# It will print the following output, which means the environment was correctly configured:

# type: DataStore
# spec:
#  id: current
#  name: Jaeger
#  type: jaeger
#  default: true
#  createdAt: 2023-10-31T00:30:47.137194Z
#  jaeger:
#    endpoint: jaeger:16685
#    tls:
#      insecure: true

Next, write a test that calls the ReceivePayment gRPC endpoint, checks the generated trace, and validates:

  1. if the ReceivePayment gRPC endpoint is properly called.
  2. if a payment is enqueued to be sent.
  3. if a payment notification is sent through a gRPC stream.
  4. if the payment notification is received and processed.

To do that, we will create a test file called ./trace-based-test.yaml with the following contents:

type: Test
spec:
  id: pprDfSUSg
  name: Test gRPC Stream Propagation
  trigger:
    type: grpc
    grpc:
      address: producer-api:8080
      method: proto.PaymentReceiver.ReceivePayment
      protobufFile: ./proto/paymentreceiver.proto
      request: |
        {
          "customerId": "1234",
          "amount": 50000
        }
  specs:
  - selector: span[name="proto.PaymentReceiver/ReceivePayment"]
    name: It should call ReceivePayment gRPC endpoint
    assertions:
    - attr:tracetest.selected_spans.count = 1
  - selector: span[name="EnqueuePayment"]
    name: It should enqueue a payment to send it in a stream
    assertions:
    - attr:tracetest.selected_spans.count = 1
  - selector: span[name="SendPaymentNotification"]
    name: It should send a payment notification through a gRPC stream
    assertions:
    - attr:tracetest.selected_spans.count = 1
  - selector: span[name="ProcessPaymentNotification"]
    name: It should receive a PaymentNotification through a stream and process it
    assertions:
    - attr:tracetest.selected_spans.count = 1
  - selector: span[name="proto.PaymentReceiver/ReceivePayment"] span[name="EnqueuePayment"] span[name="SendPaymentNotification"] span[name="ProcessPaymentNotification"]
    name: The trace shape is correct
    assertions:
    - attr:tracetest.selected_spans.count = 1

Note that under the spec.specs section, you have one assertion for each span emitted by the system, checking every step of the process. The last spec checks that the steps are executed in the correct order, by asserting that the spans are nested as expected.

To run it, you can execute the following command:

tracetest run test --file ./trace-based-test.yaml

# It will output:
# ✔ RunGroup: #b_CBSFUIg (https://app.tracetest.io/organizations/your-organization-id/environments/your-environment-id/run/b_CBSFUIg)
#  Summary: 1 passed, 0 failed, 0 pending
#   ✔ Test gRPC Stream Propagation (https://app.tracetest.io/organizations/your-organization-id/environments/your-environment-id/test/pprDfSUSg/run/1/test) - trace id: 4ec68b1a3aaa57aecf0098dd7b4a9916
#         ✔ It should call ReceivePayment gRPC endpoint
#         ✔ It should enqueue a payment to send it in a stream
#         ✔ It should send a payment notification through a gRPC stream
#         ✔ It should receive a PaymentNotification through a stream and process it
#         ✔ The trace shape is correct

You can also see this output in the Tracetest UI through the links printed in the CLI output:

https://res.cloudinary.com/djwdcmwdz/image/upload/v1718801588/Blogposts/Trace-Context-Propagation-for-gRPC-Streams/grpc-tracetest_ubpi45.png

Conclusion

gRPC streams are a great way to integrate APIs that need to exchange a continuous flow of data. However, tracking one operation as a single trace can be tricky because of how OpenTelemetry context propagation works over HTTP.

This article provides a guide on how to implement trace context propagation for gRPC streams using OpenTelemetry, fix context propagation issues for producers and consumers, and test them.

The example sources used in this article and setup instructions are available in the Tracetest GitHub repository.

Would you like to learn more about Tracetest and what it brings to the table? Visit the Tracetest docs and try it out by signing up today!

Also, please feel free to join our Slack Community, give Tracetest a star on GitHub, or schedule a time to chat 1:1.
