
Dimitris Verraros

Log me Baby – Shipping Rust logs straight to Elasticsearch with OpenTelemetry

A couple of months ago I joined CloudRift, tagging along with my long-time peer Dmitry in his new venture. One of the first things that came up was setting up log aggregation — something many projects start without, but quickly realize they need after spinning up more than one server.

It should be a mundane task after having done it a few times, but yet again there is a twist and something new to discover. I pull out my usual cards and start with ELK: even though there are serious contenders out there, I still find Kibana unbeatable, and most people love it.

CloudRift deploys its agents on pools of nodes which in turn communicate with an orchestrator that acts as a centralized resource broker. While in most scenarios log forwarding is handled by a daemon, like Filebeat or Fluentd, with CloudRift we want to push the logs directly from every instance to simplify installation in various environments. And this is where the fun starts...

Who is this for?

This is going to be a detailed technical post on how to approach log aggregation. While it assumes a technical background, it should be fairly easy to follow as a beginner. It can also be a good read if you are revisiting this problem and you want to know how OpenTelemetry can play into it. Finally, it can be a high-protein meal for your AI agent tasked with implementing a logging pipeline.

If it is time to ship the logs of your Rust application to Elasticsearch, go ahead and read on.

The Spectacular Work of Log Forwarding

This seems trivial on the surface. With Elasticsearch as the backend, which has SDKs for most popular programming languages, we should be able to push the logs with a few lines of code. This would work but would disregard many of the caveats of log forwarding. Here are a few:

  • Buffering
  • Batching
  • Retries
  • Non-blocking

All these aspects are normally handled by the log forwarding daemon, but in this case we have to take care of them ourselves while making sure we don't drop any logs or crash the application. To my delight, the OpenTelemetry project has made great advances, and this feels like the right time to jump into it.
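To make these caveats concrete, here is a rough, purely illustrative sketch (not CloudRift code) of what handling them by hand could look like: a bounded channel so the application never blocks, a background thread that batches entries, and a simple retry loop around the actual send.

```rust
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender, TrySendError};
use std::thread;
use std::time::Duration;

// Background worker: drains the channel, batches entries, and retries on failure.
fn spawn_forwarder(rx: Receiver<String>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut batch: Vec<String> = Vec::new();
        loop {
            // Buffering + batching: collect entries until the batch is full
            // or roughly one second has passed.
            match rx.recv_timeout(Duration::from_secs(1)) {
                Ok(line) => {
                    batch.push(line);
                    if batch.len() < 100 {
                        continue;
                    }
                }
                Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => break,
            }
            if batch.is_empty() {
                continue;
            }
            // Retries: back off a few times before giving up on this batch.
            for attempt in 0u64..3 {
                if send_batch(&batch) {
                    break;
                }
                thread::sleep(Duration::from_millis(100 * (attempt + 1)));
            }
            batch.clear();
        }
        // Flush whatever is left when the application shuts down.
        if !batch.is_empty() {
            send_batch(&batch);
        }
    })
}

// Stand-in for the real HTTP call to the log backend.
fn send_batch(batch: &[String]) -> bool {
    println!("shipping {} log lines", batch.len());
    true
}

fn main() {
    // Non-blocking: the application only pushes into a bounded channel.
    let (tx, rx): (SyncSender<String>, Receiver<String>) = sync_channel(10_000);
    let worker = spawn_forwarder(rx);
    for i in 0..5 {
        // If the buffer is full, drop the entry instead of blocking the app.
        if let Err(TrySendError::Full(_)) = tx.try_send(format!("log line {i}")) {
            eprintln!("buffer full, dropping log line");
        }
    }
    drop(tx); // closing the channel lets the worker drain and exit
    worker.join().unwrap();
}
```

This is exactly the kind of plumbing we would rather get from a well-maintained library than write and babysit ourselves.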

OpenTelemetry meets Rust

CloudRift is written in Rust and already uses tracing. The OpenTelemetry project has a Rust SDK, which in turn integrates with the tracing framework, so this looks like a no-brainer.

The puzzle is completed with the addition of the OpenTelemetry Collector. It is the layer that connects the generic OpenTelemetry exporter to the specific backend, which in our case is Elasticsearch.

Here is a diagram outlining the desired flow:

OpenTelemetry Log Pipeline

Our application instances will be pushing the tracing logs to the OpenTelemetry collector, which in turn will format them and write them as JSON documents to Elasticsearch.

So let's go ahead and get this up and running.

Getting down with the setup

As usual, it is sensible to develop a prototype that runs locally. With some help from AI and a few minor edits we get our first docker-compose:

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:9.0.0
    container_name: elasticsearch
    environment:
      - node.name=elasticsearch
      - cluster.name=es-docker-cluster
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m -Djava.security.egd=file:/dev/./urandom"
      - xpack.security.enabled=false
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - elasticsearch-data:/usr/share/elasticsearch/data
    ports:
      - 127.0.0.1:9200:9200
    networks:
      - elk-network

  # Kibana: Web UI for visualizing and searching logs
  kibana:
    image: docker.elastic.co/kibana/kibana:9.0.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - 127.0.0.1:5601:5601
    depends_on:
      - elasticsearch
    networks:
      - elk-network

  # OpenTelemetry Collector: Receives OTLP logs and forwards to Elasticsearch
  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    container_name: otel-collector
    command: [ "--config=/etc/otel-collector-config.yaml" ]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro
    environment:
      - SERVICE_NAME=rift
      - SERVICE_ENVIRONMENT=production
    ports:
      - 127.0.0.1:4318:4318  # OTLP HTTP receiver
    depends_on:
      - elasticsearch
    networks:
      - elk-network

networks:
  elk-network:
    driver: bridge

volumes:
  elasticsearch-data:

And in turn our collector config:

receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 1s
  memory_limiter:
    check_interval: 1s
    limit_mib: 1000
  resource:
    attributes:
      - key: service.environment
        value: ${SERVICE_ENVIRONMENT}
        action: upsert

exporters:
  elasticsearch/logs:
    endpoints: [ "http://elasticsearch:9200" ]
    logstash_format:
      enabled: true

service:
  pipelines:
    logs:
      receivers: [ otlp ]
      processors: [ batch, memory_limiter, resource ]
      exporters: [ elasticsearch/logs ]

This is a lot to digest and probably the most important bit of the setup, so let's break it down:

Receivers

First we define the collector's OTLP receiver.

receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318

There is nothing complicated here. We listen for HTTP on port 4318, which is the default for OTLP over HTTP. We could additionally enable gRPC (default port 4317) if we wanted, but HTTP is sufficient for us at the moment.

Processors

Next we define the processors.

processors:
  batch:
    timeout: 1s
  memory_limiter:
    check_interval: 1s
    limit_mib: 1000
  resource:
    attributes:
      - key: service.environment
        value: ${SERVICE_ENVIRONMENT}
        action: upsert

This is one of the most important parts of the collector. The processors transform the data as it passes through, handling aspects like batching, rate limiting, and enriching with metadata, which keeps the collector from overloading the backend. In our case, we flush batches every second and cap the collector's memory usage at 1000 MiB. The batch timeout can easily be tuned: a lower value gives a more real-time view of the logs (and faster feedback while testing), while a higher value is more efficient since it reduces the number of requests to the backend.

While the batch and memory_limiter processors are part of the core collector repository, the resource processor comes from the contrib collection. It is particularly useful for adding common metadata to all telemetry passing through the collector; for example, we use it to amend our logs with the service name and environment before they are sent to the backend. You can read more about it here.

Exporters

The exporter is the part of the collector that actually sends the data to the backend. In our case we are using the Elasticsearch exporter.

exporters:
  elasticsearch/logs:
    endpoints: [ "http://elasticsearch:9200" ]
    logstash_format:
      enabled: true

Here you can tweak many settings about how the logs are shipped to Elasticsearch. In our case we simply set the endpoint to our local Elasticsearch instance, which resolves to the elasticsearch service defined in our docker-compose file. Additionally, we enable the Logstash format so that indices are created automatically on a daily basis. This is pretty useful when it comes to log retention, since it is very easy to delete old Elasticsearch indices, while purging individual logs from one big index is nearly impossible. There are further settings to tune the exporter to your needs, like flush interval, timeout, retries, etc. For now we rely on the defaults, but more details can be found here.

Service

Finally, we define the service section. This is where we put together the components defined above to create the final pipeline.

service:
  pipelines:
    logs:
      receivers: [ otlp ]
      processors: [ batch, memory_limiter, resource ]
      exporters: [ elasticsearch/logs ]

We collect logs from the OTLP receiver, process them with the defined processors, and export them with the Elasticsearch exporter. Later, this is also the section where we will add extensions like authentication.

OTEL Log Appender for tracing

This has been pretty straightforward so far. Now it is time to integrate the OpenTelemetry log appender with our application. We will use the OTEL tracing appender.

The README has no instructions on how to use it, but thankfully there is an example to start with. Since we already have tracing set up, we can simply create the otel_layer and just add it to the tracing registry:

use opentelemetry_appender_tracing::layer;
use opentelemetry_sdk::{logs::SdkLoggerProvider, Resource};
use tracing::error;
use tracing_subscriber::{prelude::*, EnvFilter};

fn main() {
    let exporter = opentelemetry_stdout::LogExporter::default();
    let provider: SdkLoggerProvider = SdkLoggerProvider::builder()
        .with_resource(
            Resource::builder()
                .with_service_name("log-appender-tracing-example")
                .build(),
        )
        .with_simple_exporter(exporter)
        .build();

    let filter_otel = EnvFilter::new("info")
        .add_directive("hyper=off".parse().unwrap())
        .add_directive("opentelemetry=off".parse().unwrap())
        .add_directive("tonic=off".parse().unwrap())
        .add_directive("h2=off".parse().unwrap())
        .add_directive("reqwest=off".parse().unwrap());
    let otel_layer = layer::OpenTelemetryTracingBridge::new(&provider).with_filter(filter_otel);

    tracing_subscriber::registry()
        .with(otel_layer)
        .init();

    error!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io", message = "This is an example message");
    let _ = provider.shutdown();
}

We put this together and try to build it, but we quickly stumble upon a compiler error:

error[E0277]: the trait bound `opentelemetry_stdout::LogExporter: opentelemetry_sdk::logs::LogExporter` is not satisfied
   --> src/main.rs:14:31

As a Rust newbie, I start sweating a bit. First I switch to opentelemetry_otlp::LogExporter to see if this gets me further. The default() constructor is not there anymore, so I go for the builder instead, per my IDE's suggestion.

diff --git a/src/main.rs b/src/main.rs
index 1af9407..67b10b1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,10 +1,11 @@
 use opentelemetry_appender_tracing::layer;
+use opentelemetry_otlp::LogExporter;
 use opentelemetry_sdk::{logs::SdkLoggerProvider, Resource};
 use tracing::error;
 use tracing_subscriber::{prelude::*, EnvFilter};

 fn main() {
-    let exporter = opentelemetry_stdout::LogExporter::default();
+    let exporter = LogExporter::builder().with_http().build().unwrap();
     let provider: SdkLoggerProvider = SdkLoggerProvider::builder()
         .with_resource(
             Resource::builder()


We go ahead and run the program, and it completes successfully. No error message in our stdout this time though, so let's check Kibana.

Kibana

And voilà! We have our first log entry! On the side we can see all the details of the event:

Log entry

It is interesting to note a few details here. First, the index is automatically created for us with today's date. Second, we can see the service environment attribute that we added with the resource processor. Finally, we can see that each parameter of our log entry is indexed separately, unlocking many search capabilities.
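The last point deserves a tiny illustration: every key-value field attached to a tracing event becomes its own attribute on the log record (the field names below are made up for illustration):

```rust
use tracing::info;

fn handle_allocation() {
    // Each field is captured as a separate attribute on the exported log record,
    // so it can be searched on its own (illustrative names, not CloudRift's schema).
    info!(node_id = 17, pool = "gpu-a100", duration_ms = 42.5, "allocation completed");
}

fn main() {
    // With the OTel layer from this post installed, this event would land in
    // Elasticsearch with node_id, pool and duration_ms as individual fields.
    handle_allocation();
}
```

In Kibana, these show up as separate fields that can be filtered and aggregated on individually.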

Moving to production

Our little POC is working locally, so it is time to make this fly. We will first test by pushing our logs to the local collector, which in turn pushes them to the remote Elasticsearch instance. Then we deploy our collector and let our application push its logs to it.

Deploying Elasticsearch

We went for a cloud instance of Elasticsearch offered on their website; however, as with the local instance, this setup would work with any Elasticsearch instance. First, we need to add the basicauth extension, provide our credentials to it, and point the exporter at the remote Elasticsearch endpoint.

diff --git a/otel-collector-config.yaml b/otel-collector-config.yaml
index b2a3411..9b45f97 100644
--- a/otel-collector-config.yaml
+++ b/otel-collector-config.yaml
@@ -18,11 +18,20 @@ processors:

 exporters:
   elasticsearch/logs:
-    endpoints: ["http://elasticsearch:9200"]
+    endpoint: "https://****.kb.****.es.io:443"
+    auth:
+      authenticator: basicauth
     logstash_format:
       enabled: true

+extensions:
+  basicauth:
+    client_auth:
+      username: *******
+      password: *******
+
 service:
+  extensions: [basicauth]
   pipelines:
     logs:
       receivers: [otlp]

We wait, hoping to see the first log appear in our remote instance, but instead we get the following error in the collector's logs:

2025-04-30T09:57:26.454836419Z 2025-04-30T09:57:26.454Z error   elasticsearchexporter@v0.121.0/bulkindexer.go:346   bulk indexer flush error    {"otelcol.component.id": "elasticsearch/logs", "otelcol.component.kind": "Exporter", "otelcol.signal": "logs", "error": "flush failed (404): "}

flush failed (404)? This can't be right. Taking a closer look though, I spot a little .kb. in the subdomain of the endpoint used in our configuration. Clearly this is not the Elasticsearch endpoint but the Kibana one. Looking into Elastic's cloud dashboard, I find a place to copy the Elasticsearch URL on the deployment's management page:

Elastic Management Console

Funnily enough, the only bit I had to change was .kb. to .es.:

diff --git a/otel-collector-config.yaml b/otel-collector-config.yaml
index 9b45f97..f8f9208 100644
--- a/otel-collector-config.yaml
+++ b/otel-collector-config.yaml
@@ -18,7 +18,7 @@ processors:

 exporters:
   elasticsearch/logs:
-    endpoint: "https://*****.kb.****.es.io:443"
+    endpoint: "https://*****.es.****.es.io:443"
     auth:
       authenticator: basicauth
     logstash_format:

We try another run of our service and successfully see our log entry in the remote Elasticsearch instance!

There is an interesting side note here. Instead of the basic auth used in the example above, one can also use API keys. At the time of writing, the documentation mentions the api_key option, which was my initial approach, but since there is no example of how to use it, nor a complete schema definition, the position of this setting quickly became very elusive. The unorthodox way I managed to get it to work was by following the Kibana onboarding, which provides you with a zip of pre-configured settings and a new API key to use. It turns out the setting is defined right next to the endpoint:

exporters:
  elasticsearch/logs:
    endpoint: "https://*****.es.****.es.io:443"
    api_key: "BASE64_ENCODED_API_KEY"
    ...

More API keys can be created from within Kibana, under the security section of the Stack Management page. This is important since, in the case of the cloud offering, these should not be confused with the key management settings provided in the cloud console.

Deploying OpenTelemetry Collector

Since the collector is a stateless container, we can easily deploy it anywhere by using the upstream Docker image and attaching the complete configuration file to it as a secret, given the sensitive values defined in it. We used Google Cloud Run, a serverless platform based on Knative, to deploy the collector. The following is a sample Knative service definition that deploys the collector, so make sure you adapt it to your needs.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: opentelemetry-collector
  namespace: your-namespace    # Replace with your target namespace
  labels:
    app: opentelemetry-collector
  annotations:
    # Add any relevant Knative or general annotations here
    # e.g., autoscaling.knative.dev/minScale: "1"
    autoscaling.knative.dev/maxScale: "5"
spec:
  template:
    metadata:
      labels:
        app: opentelemetry-collector
      annotations:
        # Add any relevant pod-level annotations here
        # e.g., sidecar.istio.io/inject: "true" if using Istio
    spec:
      containerConcurrency: 80
      timeoutSeconds: 300
      containers:
        - name: opentelemetry-collector-container
          image: otel/opentelemetry-collector-contrib:latest
          args:
            - --config=/etc/otel/otel-collector-config.yaml
          ports:
            - name: otlp-grpc
              containerPort: 4317
            - name: otlp-http
              containerPort: 4318
            - name: health-check
              containerPort: 13133
          env:
            - name: SERVICE_NAME
              value: your-application-name
            - name: SERVICE_ENVIRONMENT
              value: your-environment
          resources:
            limits:
              cpu: "1"
              memory: 512Mi
            requests:
              cpu: 250m
              memory: 256Mi
          volumeMounts:
            - name: otel-collector-config-vol
              mountPath: /etc/otel
          startupProbe:
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 10
            failureThreshold: 3
            tcpSocket:
              port: 13133
      volumes:
        - name: otel-collector-config-vol
          secret:
            secretName: opentelemetry-collector-config # Name of the K8s secret holding the config
            items:
              - key: otel-collector-config.yaml # The key in the secret data
                path: otel-collector-config.yaml # The filename to mount
  traffic:
    - percent: 100
      latestRevision: true

Our final OTEL collector configuration looks like this. Note that the startup probe above targets port 13133, the default port of the collector's health_check extension, so that extension needs to be enabled in the configuration if you keep the probe:

receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 1s
  memory_limiter:
    check_interval: 1s
    limit_mib: 1000
  resource:
    attributes:
      - key: service.environment
        value: ${SERVICE_ENVIRONMENT}
        action: upsert
      - key: service.name
        value: ${SERVICE_NAME}
        action: upsert

exporters:
  elasticsearch/logs:
    endpoint: "your-elasticsearch-endpoint:port"
    api_key: ${ES_API_KEY}
    logstash_format:
      enabled: true

service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [batch, memory_limiter, resource]
      exporters: [elasticsearch/logs]

With this, we deployed our collector remotely as well. Now we need to point our application to it.

Pushing logs to the remote Collector

Let's go back to our application and define the remote collector endpoint. Looking at the docs, this is not clearly stated, so after a bit of digging I offload the task to AI, and it comes back with the following:

  • use the environment variable OTEL_EXPORTER_OTLP_ENDPOINT
  export OTEL_EXPORTER_OTLP_ENDPOINT="http://your-collector-endpoint:4318"

  cargo run
  • or use the .with_endpoint method of the LogExporterBuilder
diff --git a/src/main.rs b/src/main.rs
index 67b10b1..f08ff26 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,11 +1,17 @@
 use opentelemetry_appender_tracing::layer;
-use opentelemetry_otlp::LogExporter;
+use opentelemetry_otlp::{LogExporter, Protocol, WithExportConfig};
 use opentelemetry_sdk::{logs::SdkLoggerProvider, Resource};
 use tracing::error;
 use tracing_subscriber::{prelude::*, EnvFilter};
 
 fn main() {
-    let exporter = LogExporter::builder().with_http().build().unwrap();
+    let exporter = LogExporter::builder()
+        .with_http()
+        .with_endpoint("http://your-collector-endpoint:4318/v1/logs")
+        .with_protocol(Protocol::HttpJson) // or Protocol::HttpBinary
+        .build()
+        .unwrap();
+
     let provider: SdkLoggerProvider = SdkLoggerProvider::builder()
         .with_resource(
             Resource::builder()


Note an important difference between the two: while the environment variable works without setting any path in the URL, the with_endpoint method requires the /v1/logs path to be appended.
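If you want to keep the endpoint configurable, a small helper along these lines can bridge the two approaches (this is a hypothetical convenience function, not part of the SDK):

```rust
use std::env;

// Hypothetical helper: prefer OTEL_EXPORTER_OTLP_ENDPOINT when it is set,
// fall back to a local default, and append the /v1/logs path required
// when the endpoint is passed explicitly via with_endpoint.
fn logs_endpoint() -> String {
    let base = env::var("OTEL_EXPORTER_OTLP_ENDPOINT")
        .unwrap_or_else(|_| "http://localhost:4318".to_string());
    format!("{}/v1/logs", base.trim_end_matches('/'))
}
```

The returned value can then be passed to .with_endpoint().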

Here is the complete Rust code:

use opentelemetry_appender_tracing::layer;
use opentelemetry_otlp::{LogExporter, Protocol, WithExportConfig};
use opentelemetry_sdk::{logs::SdkLoggerProvider, Resource};
use tracing::error;
use tracing_subscriber::{prelude::*, EnvFilter};

fn main() {
    let exporter = LogExporter::builder()
        .with_http()
        .with_endpoint("http://your-collector-endpoint:4318/v1/logs")
        .with_protocol(Protocol::HttpJson) // or Protocol::HttpBinary
        .build()
        .unwrap();

    let provider: SdkLoggerProvider = SdkLoggerProvider::builder()
        .with_resource(
            Resource::builder()
                .with_service_name("log-appender-tracing-example")
                .build(),
        )
        .with_simple_exporter(exporter)
        .build();

    let filter_otel = EnvFilter::new("info")
        .add_directive("hyper=off".parse().unwrap())
        .add_directive("opentelemetry=off".parse().unwrap())
        .add_directive("tonic=off".parse().unwrap())
        .add_directive("h2=off".parse().unwrap())
        .add_directive("reqwest=off".parse().unwrap());
    let otel_layer = layer::OpenTelemetryTracingBridge::new(&provider).with_filter(filter_otel);

    tracing_subscriber::registry()
        .with(otel_layer)
        .init();

    error!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io", message = "This is an example message");
    let _ = provider.shutdown();
}

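One last note on the example: with_simple_exporter exports every record synchronously, which is fine for a demo but not ideal for a busy service. Depending on the SDK version, the provider builder also offers a batch exporter, which gives exactly the buffering and non-blocking behavior we listed at the start, and in a real application you will likely want to keep your existing fmt layer alongside the OTel bridge. Here is a minimal sketch of such a setup, assuming a recent opentelemetry_sdk release where the batch processor runs on its own background thread:

```rust
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::LogExporter;
use opentelemetry_sdk::{logs::SdkLoggerProvider, Resource};
use tracing_subscriber::prelude::*;

// A sketch of a more production-leaning setup (our own choices, not an official recipe):
// keep local stdout logging via the fmt layer, bridge tracing events to OpenTelemetry,
// and use the batch exporter so records are buffered and shipped in the background
// instead of being exported synchronously per call. The builder signatures may differ
// between opentelemetry_sdk versions; the EnvFilter directives from the example above
// still apply and are omitted here for brevity.
fn init_logging(exporter: LogExporter) -> SdkLoggerProvider {
    let provider = SdkLoggerProvider::builder()
        .with_resource(Resource::builder().with_service_name("rift").build())
        .with_batch_exporter(exporter)
        .build();

    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer())           // human-readable local logs
        .with(OpenTelemetryTracingBridge::new(&provider)) // ship logs via OTLP
        .init();

    provider
}
```

The returned provider should still be shut down on exit, as in the example above, so any buffered records are flushed.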

Wrapping up

With the solutions provided by the OpenTelemetry project, we could easily put the pieces together and get our application logging to Elasticsearch. While both the opentelemetry-rust SDK and the opentelemetry-collector exporter for Elasticsearch are in early stages of development, they already bring a lot of value. The documentation can be improved for both, but with an AI agent doing some research in the projects' codebases, we managed to extract the information we needed.

If you have any questions, comments or suggestions, please add to the discussion below!
