A couple of months ago I joined CloudRift, tagging along with my long-time peer Dmitry in his new venture. One of the first things that came up was setting up log aggregation — something many projects start without, but quickly realize they need after spinning up more than one server.
It should be a mundane task after having done it a few times, yet once again there is a twist and something new to discover. I pull out my usual cards and start with ELK: even though there are serious contenders out there, I still find Kibana unbeatable, and most people love it.
CloudRift deploys its agents on pools of nodes which in turn communicate with an orchestrator that acts as a centralized resource broker. While in most scenarios log forwarding is handled by a daemon, like filebeat or fluentd, with CloudRift we want to push the logs directly from every instance to simplify installation in various environments. And this is where the fun starts...
Who is this for?
This is going to be a detailed technical post on how to approach log aggregation. While it assumes a technical background, it should be fairly easy to follow as a beginner. It can also be a good read if you are revisiting this problem and you want to know how OpenTelemetry can play into it. Finally, it can be a high-protein meal for your AI agent tasked with implementing a logging pipeline.
If it is time to ship the logs of your Rust application to Elasticsearch, go ahead and read on.
The Spectacular Work of Log Forwarding
This seems trivial on the surface. With Elasticsearch as the backend, which has SDKs for most popular programming languages, we should be able to push the logs with a few lines of code, as sketched below. That would work, but it would disregard many of the caveats of log forwarding. Here are a few:
- Buffering
- Batching
- Retries
- Non-blocking
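To make this concrete, here is roughly what that "few lines of code" approach could look like. This is a hypothetical sketch (the index name, endpoint and the use of reqwest, serde_json and chrono are my own choices for illustration, not CloudRift's code) that ignores buffering, batching and retries, and makes the caller wait on every request:
use serde_json::json;

// Push a single log line straight to Elasticsearch's bulk API.
// Every call awaits a full network round-trip before returning, nothing is
// buffered or batched, and a failed request simply loses the log line.
async fn push_log(client: &reqwest::Client, message: &str) -> Result<(), reqwest::Error> {
    // The _bulk API expects one action line followed by one document line.
    let body = format!(
        "{}\n{}\n",
        json!({ "index": { "_index": "logs-naive" } }),
        json!({ "message": message, "@timestamp": chrono::Utc::now().to_rfc3339() })
    );
    client
        .post("http://localhost:9200/_bulk")
        .header("Content-Type", "application/x-ndjson")
        .body(body)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}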
All these aspects are normally handled by the log forwarding daemon, but in this case we have to take care of them ourselves, while making sure we don't drop any logs or crash the application. To my delight, the OpenTelemetry project has made great advances here, and this feels like the right time to jump into it.
OpenTelemetry meets Rust
CloudRift is written in Rust and is already using tracing. The OpenTelemetry project has a Rust SDK, which in turn integrates with the tracing framework, so this looks like a no-brainer.
The puzzle is completed with the addition of the OpenTelemetry Collector. It is the layer that connects the generic OpenTelemetry exporter to the specific backend, which in our case is Elasticsearch.
Here is a diagram outlining the desired flow:
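application instances (tracing) --OTLP/HTTP--> OpenTelemetry Collector --JSON documents--> Elasticsearch <-- Kibana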
Our application instances will be pushing the tracing logs to the OpenTelemetry collector, which in turn will format them and write them as JSON documents to Elasticsearch.
So let's go ahead and get this up and running.
Getting down with the setup
As usual, it is sensible to develop a prototype that runs locally. With some help from AI and a few minor edits we get our first docker-compose:
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:9.0.0
    container_name: elasticsearch
    environment:
      - node.name=elasticsearch
      - cluster.name=es-docker-cluster
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m -Djava.security.egd=file:/dev/./urandom"
      - xpack.security.enabled=false
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - elasticsearch-data:/usr/share/elasticsearch/data
    ports:
      - 127.0.0.1:9200:9200
    networks:
      - elk-network

  # Kibana: Web UI for visualizing and searching logs
  kibana:
    image: docker.elastic.co/kibana/kibana:9.0.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - 127.0.0.1:5601:5601
    depends_on:
      - elasticsearch
    networks:
      - elk-network

  # OpenTelemetry Collector: Receives OTLP logs and forwards to Elasticsearch
  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    container_name: otel-collector
    command: [ "--config=/etc/otel-collector-config.yaml" ]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro
    environment:
      - SERVICE_NAME=rift
      - SERVICE_ENVIRONMENT=production
    ports:
      - 127.0.0.1:4318:4318 # OTLP HTTP receiver
    depends_on:
      - elasticsearch
    networks:
      - elk-network

networks:
  elk-network:
    driver: bridge

volumes:
  elasticsearch-data:
And in turn our collector config:
receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 1s
  memory_limiter:
    check_interval: 1s
    limit_mib: 1000
  resource:
    attributes:
      - key: service.environment
        value: ${SERVICE_ENVIRONMENT}
        action: upsert

exporters:
  elasticsearch/logs:
    endpoints: [ "http://elasticsearch:9200" ]
    logstash_format:
      enabled: true

service:
  pipelines:
    logs:
      receivers: [ otlp ]
      processors: [ batch, memory_limiter, resource ]
      exporters: [ elasticsearch/logs ]
This is a lot to digest and probably the most important bit of the setup so let's break it down:
Receivers
First we define the collector's OTLP receiver.
receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318
There is nothing complicated here. We listen for HTTP on port 4318, which is the default for OTLP over HTTP. We could additionally enable gRPC if we wanted, but HTTP is sufficient for us at the moment.
Processors
Next we define the processors.
processors:
  batch:
    timeout: 1s
  memory_limiter:
    check_interval: 1s
    limit_mib: 1000
  resource:
    attributes:
      - key: service.environment
        value: ${SERVICE_ENVIRONMENT}
        action: upsert
This is one of the most important parts of the collector. It is responsible for processing the received data and handling aspects like batching and rate limiting, which ensures that the collector doesn't overload the backend while adding useful metadata. In our case, it aggregates logs into batches of one second and limits the collector's memory usage to 1000 MiB. The batch timeout can easily be adjusted: a lower value gives a more real-time view of the logs, while a higher value is more efficient because it reduces the number of requests to the backend. A lower value is also handy during testing, since it gives more immediate feedback.
While the batch and memory processors are part of the core collector repository, the resource processor is part of the contrib collection. The resource processor is particularly useful for adding common metadata to all telemetry data passing through the collector. For example, we can use it to amend our logs with the service name and environment to enrich the information that is sent to the backend. You can read more about it here.
Exporters
The exporter is the part of the collector that actually sends the data to the backend. In our case we are using the Elasticsearch exporter.
exporters:
  elasticsearch/logs:
    endpoints: [ "http://elasticsearch:9200" ]
    logstash_format:
      enabled: true
Here you can tweak many settings about how the logs are shipped to Elasticsearch. In our case we simply set the endpoint to our local Elasticsearch instance, which is routed to the elasticsearch service defined in our docker-compose file. Additionally, we enable the Logstash format to automatically create indices on a daily basis. This is pretty useful when it comes to log retention, since it is very easy to delete old Elasticsearch indices, while purging individual logs from one big index is nearly impossible. The exporter can be tuned further with settings like flush interval, timeout, retries, etc. For now we rely on the defaults, but more details can be found here.
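To illustrate the retention point, here is a hypothetical clean-up job. The index prefix, Elasticsearch URL and the use of reqwest and chrono are assumptions for illustration; the daily "<prefix>-YYYY.MM.DD" naming follows the Logstash-style convention the exporter produces:
use chrono::{Duration, Utc};

// With daily indices, enforcing retention is a single DELETE of a whole
// index instead of purging individual documents from one big index.
async fn drop_expired_index(
    client: &reqwest::Client,
    es_url: &str,
    prefix: &str,
    keep_days: i64,
) -> Result<(), reqwest::Error> {
    let expired_day = Utc::now() - Duration::days(keep_days);
    let index = format!("{prefix}-{}", expired_day.format("%Y.%m.%d"));
    // e.g. DELETE http://localhost:9200/logs-2025.03.30
    client
        .delete(format!("{es_url}/{index}"))
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}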
Service
Finally, we define the service section. This is where we put together the components defined above to create the final pipeline.
service:
  pipelines:
    logs:
      receivers: [ otlp ]
      processors: [ batch, memory_limiter, resource ]
      exporters: [ elasticsearch/logs ]
We are collecting logs from the OTLP receiver, processing them with the defined processors and exporting them with the Elasticsearch exporter. This is also the section where we will later add extensions like authentication.
OTEL Log Appender for tracing
This has been pretty straightforward so far. Now it is time to integrate the OpenTelemetry log appender with our application. We will use the OTEL tracing appender.
The README has no instructions on how to use it, but thankfully there is an example to start with. Since we already have tracing set up, we can simply create the otel_layer and just add it to the tracing registry:
use opentelemetry_appender_tracing::layer;
use opentelemetry_sdk::{logs::SdkLoggerProvider, Resource};
use tracing::error;
use tracing_subscriber::{prelude::*, EnvFilter};

fn main() {
    let exporter = opentelemetry_stdout::LogExporter::default();
    let provider: SdkLoggerProvider = SdkLoggerProvider::builder()
        .with_resource(
            Resource::builder()
                .with_service_name("log-appender-tracing-example")
                .build(),
        )
        .with_simple_exporter(exporter)
        .build();

    let filter_otel = EnvFilter::new("info")
        .add_directive("hyper=off".parse().unwrap())
        .add_directive("opentelemetry=off".parse().unwrap())
        .add_directive("tonic=off".parse().unwrap())
        .add_directive("h2=off".parse().unwrap())
        .add_directive("reqwest=off".parse().unwrap());
    let otel_layer = layer::OpenTelemetryTracingBridge::new(&provider).with_filter(filter_otel);

    tracing_subscriber::registry()
        .with(otel_layer)
        .init();

    error!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io", message = "This is an example message");

    let _ = provider.shutdown();
}
We put this together and run it, but we quickly stumble upon a compiler error:
error[E0277]: the trait bound `opentelemetry_stdout::LogExporter: opentelemetry_sdk::logs::LogExporter` is not satisfied
--> src/main.rs:14:31
As a Rust newbie, I start sweating a bit. First I switch to opentelemetry_otlp::LogExporter to see if this gets me further. The default() constructor is not there anymore, so I go for the builder instead, per my IDE's suggestion.
diff --git a/src/main.rs b/src/main.rs
index 1af9407..67b10b1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,10 +1,11 @@
 use opentelemetry_appender_tracing::layer;
+use opentelemetry_otlp::LogExporter;
 use opentelemetry_sdk::{logs::SdkLoggerProvider, Resource};
 use tracing::error;
 use tracing_subscriber::{prelude::*, EnvFilter};
 
 fn main() {
-    let exporter = opentelemetry_stdout::LogExporter::default();
+    let exporter = LogExporter::builder().with_http().build().unwrap();
     let provider: SdkLoggerProvider = SdkLoggerProvider::builder()
         .with_resource(
             Resource::builder()
We go ahead and run the program, and it completes successfully. No error message shows up in our stdout though, so let's check Kibana.
And voilà! We have our first log entry! On the side we can see all the details of the event:
It is interesting to note a few details here. First, the index is automatically created for us on today's date. Second, we can see the service environment attribute that we added to the resource processor. Finally, we can see that each parameter of our log entry is indexed separately unlocking many search capabilities.
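A small side note before moving on: since we swapped the stdout exporter for the OTLP one, the application itself no longer prints anything. If you also want human-readable logs on the console, a regular fmt layer can be stacked next to the OTel bridge. A minimal sketch (my own helper; filtering omitted for brevity):
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use tracing_subscriber::prelude::*;

// Ship events to the collector via the bridge *and* print them to stdout.
fn init_tracing(provider: &SdkLoggerProvider) {
    tracing_subscriber::registry()
        .with(OpenTelemetryTracingBridge::new(provider))
        .with(tracing_subscriber::fmt::layer())
        .init();
}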
Moving to production
Our little POC is working locally, so it is time to make this fly. First we test by pushing our logs to the local collector, which in turn pushes them to a remote Elasticsearch instance. Then we deploy the collector itself and let our application push its logs to it.
Deploying Elasticsearch
We went for a cloud instance of Elasticsearch offered on their website; however, as with the local instance, this setup would work with any Elasticsearch deployment. First we need to add the basicauth extension and provide our credentials to it, along with the remote Elasticsearch endpoint.
diff --git a/otel-collector-config.yaml b/otel-collector-config.yaml
index b2a3411..9b45f97 100644
--- a/otel-collector-config.yaml
+++ b/otel-collector-config.yaml
@@ -18,11 +18,20 @@ processors:
 
 exporters:
   elasticsearch/logs:
-    endpoints: ["http://elasticsearch:9200"]
+    endpoint: "https://****.kb.****.es.io:443"
+    auth:
+      authenticator: basicauth
     logstash_format:
       enabled: true
 
+extensions:
+  basicauth:
+    client_auth:
+      username: *******
+      password: *******
+
 service:
+  extensions: [basicauth]
   pipelines:
     logs:
       receivers: [otlp]
We wait, hoping to see the first log appear in our remote instance, but instead we get the following error in the collector's logs:
2025-04-30T09:57:26.454836419Z 2025-04-30T09:57:26.454Z error elasticsearchexporter@v0.121.0/bulkindexer.go:346 bulk indexer flush error {"otelcol.component.id": "elasticsearch/logs", "otelcol.component.kind": "Exporter", "otelcol.signal": "logs", "error": "flush failed (404): "}
flush failed (404)
This can't be right. Having a closer look though, I spot a little .kb. in the subdomain of the URL used in our configuration. Clearly this is not the Elasticsearch endpoint but the Kibana one. Looking into Elastic's cloud dashboard, I find a place to copy the URL for Elasticsearch from the deployment's management page:
Funny enough, the only bit that I had to change was .kb. to .es.:
diff --git a/otel-collector-config.yaml b/otel-collector-config.yaml
index 9b45f97..f8f9208 100644
--- a/otel-collector-config.yaml
+++ b/otel-collector-config.yaml
@@ -18,7 +18,7 @@ processors:
 
 exporters:
   elasticsearch/logs:
-    endpoint: "https://*****.kb.****.es.io:443"
+    endpoint: "https://*****.es.****.es.io:443"
     auth:
       authenticator: basicauth
     logstash_format:
We try another run of our service and successfully see our log entry in the remote Elasticsearch instance!
There is an interesting side note here. Instead of the basic auth used in the example above, one can also use API keys. At the time of writing, the documentation mentions the api_key option, which was my initial approach, but since there is no example of how to use it, nor a complete schema definition, the position of this setting quickly became very elusive. The unorthodox way I managed to get it to work was by following the Kibana onboarding, which provides you with a zip of pre-configured settings and a new API key to use with them. Eventually, the setting turns out to be defined right next to the endpoint:
exporters:
  elasticsearch/logs:
    endpoint: "https://*****.es.****.es.io:443"
    api_key: "BASE64_ENCODED_API_KEY"
    ...
More API keys can be created from within Kibana, under the Security section of the Stack Management page. This is worth pointing out because, with the cloud offering, it is easy to confuse this with the key management settings provided in the cloud console itself.
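If you prefer minting keys programmatically instead of clicking through Kibana, Elasticsearch also exposes a security API for this. A hedged sketch (the URL, credentials and key name are placeholders; the "encoded" field of the response is the base64 value the exporter's api_key setting expects):
use serde_json::{json, Value};

// Create an API key via POST /_security/api_key and return its "encoded"
// form, ready to be used as the collector's api_key setting.
async fn create_api_key(
    client: &reqwest::Client,
    es_url: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    let resp: Value = client
        .post(format!("{es_url}/_security/api_key"))
        .basic_auth("elastic", Some("your-password"))
        .json(&json!({ "name": "otel-collector-logs" }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    Ok(resp["encoded"].as_str().unwrap_or_default().to_string())
}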
Deploying OpenTelemetry Collector
Since the collector is a stateless container, we can easily deploy it anywhere by using the upstream Docker image and attaching the complete configuration file to it as a secret, given the sensitive values defined in it. We used Google Cloud Run, a serverless platform based on Knative, to deploy the collector. The following is a sample Knative service definition that deploys the collector, so make sure you adapt it to your needs.
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: opentelemetry-collector
  namespace: your-namespace # Replace with your target namespace
  labels:
    app: opentelemetry-collector
  annotations:
    # Add any relevant Knative or general annotations here
    # e.g., autoscaling.knative.dev/minScale: "1"
    autoscaling.knative.dev/maxScale: "5"
spec:
  template:
    metadata:
      labels:
        app: opentelemetry-collector
      annotations:
        # Add any relevant pod-level annotations here
        # e.g., sidecar.istio.io/inject: "true" if using Istio
    spec:
      containerConcurrency: 80
      timeoutSeconds: 300
      containers:
        - name: opentelemetry-collector-container
          image: otel/opentelemetry-collector-contrib:latest
          args:
            - --config=/etc/otel/otel-collector-config.yaml
          ports:
            - name: otlp-grpc
              containerPort: 4317
            - name: otlp-http
              containerPort: 4318
            - name: health-check
              containerPort: 13133
          env:
            # SERVICE_NAME and SERVICE_ENVIRONMENT are referenced by the
            # collector config below; ES_API_KEY must also be provided
            # (e.g. from a secret) or inlined in the mounted configuration.
            - name: SERVICE_NAME
              value: your-application-name
            - name: SERVICE_ENVIRONMENT
              value: your-environment
          resources:
            limits:
              cpu: "1"
              memory: 512Mi
            requests:
              cpu: 250m
              memory: 256Mi
          volumeMounts:
            - name: otel-collector-config-vol
              mountPath: /etc/otel
          startupProbe:
            # Port 13133 is served by the collector's health_check extension,
            # which needs to be enabled in the collector config.
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 10
            failureThreshold: 3
            tcpSocket:
              port: 13133
      volumes:
        - name: otel-collector-config-vol
          secret:
            secretName: opentelemetry-collector-config # Name of the K8s secret holding the config
            items:
              - key: otel-collector-config.yaml # The key in the secret data
                path: otel-collector-config.yaml # The filename to mount
  traffic:
    - percent: 100
      latestRevision: true
Our final OTEL configuration looks like this:
receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 1s
  memory_limiter:
    check_interval: 1s
    limit_mib: 1000
  resource:
    attributes:
      - key: service.environment
        value: ${SERVICE_ENVIRONMENT}
        action: upsert
      - key: service.name
        value: ${SERVICE_NAME}
        action: upsert

exporters:
  elasticsearch/logs:
    endpoint: "your-elasticsearch-endpoint:port"
    api_key: ${ES_API_KEY}
    logstash_format:
      enabled: true

service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [batch, memory_limiter, resource]
      exporters: [elasticsearch/logs]
With this, we deployed our collector remotely as well. Now we need to point our application to it.
Pushing logs to the remote Collector
Let's go back to our application and define the remote collector endpoint. Looking at the docs, this is not clearly stated, so after a bit of digging I offload the task to AI, and it comes back with the following:
- use the environment variable OTEL_EXPORTER_OTLP_ENDPOINT:
export OTEL_EXPORTER_OTLP_ENDPOINT="http://your-collector-endpoint:4318"
cargo run
- or use the .with_endpoint method of the LogExporterBuilder:
diff --git a/src/main.rs b/src/main.rs
index 67b10b1..f08ff26 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,11 +1,17 @@
 use opentelemetry_appender_tracing::layer;
-use opentelemetry_otlp::LogExporter;
+use opentelemetry_otlp::{LogExporter, Protocol, WithExportConfig};
 use opentelemetry_sdk::{logs::SdkLoggerProvider, Resource};
 use tracing::error;
 use tracing_subscriber::{prelude::*, EnvFilter};
 
 fn main() {
-    let exporter = LogExporter::builder().with_http().build().unwrap();
+    let exporter = LogExporter::builder()
+        .with_http()
+        .with_endpoint("http://your-collector-endpoint:4318/v1/logs")
+        .with_protocol(Protocol::HttpJson) // or Protocol::Grpc
+        .build()
+        .unwrap();
+
     let provider: SdkLoggerProvider = SdkLoggerProvider::builder()
         .with_resource(
             Resource::builder()
Note an important difference between the two: while the environment variable is good to go without setting any path in the URL, the with_endpoint method requires the /v1/logs path to be appended.
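If you want to support both at once, one option (a hypothetical helper of my own, not an official API) is to set an explicit endpoint only when the environment variable is absent:
use opentelemetry_otlp::{LogExporter, Protocol, WithExportConfig};

// Prefer OTEL_EXPORTER_OTLP_ENDPOINT when set (no path needed); otherwise
// fall back to an explicit endpoint, where /v1/logs must be appended by hand.
fn build_exporter() -> LogExporter {
    let mut builder = LogExporter::builder()
        .with_http()
        .with_protocol(Protocol::HttpJson);
    if std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").is_err() {
        builder = builder.with_endpoint("http://your-collector-endpoint:4318/v1/logs");
    }
    builder.build().expect("failed to build OTLP log exporter")
}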
Here is the complete Rust code:
use opentelemetry_appender_tracing::layer;
use opentelemetry_otlp::{LogExporter, Protocol, WithExportConfig};
use opentelemetry_sdk::{logs::SdkLoggerProvider, Resource};
use tracing::error;
use tracing_subscriber::{prelude::*, EnvFilter};

fn main() {
    let exporter = LogExporter::builder()
        .with_http()
        .with_endpoint("http://your-collector-endpoint:4318/v1/logs")
        .with_protocol(Protocol::HttpJson) // or Protocol::Grpc
        .build()
        .unwrap();

    let provider: SdkLoggerProvider = SdkLoggerProvider::builder()
        .with_resource(
            Resource::builder()
                .with_service_name("log-appender-tracing-example")
                .build(),
        )
        .with_simple_exporter(exporter)
        .build();

    let filter_otel = EnvFilter::new("info")
        .add_directive("hyper=off".parse().unwrap())
        .add_directive("opentelemetry=off".parse().unwrap())
        .add_directive("tonic=off".parse().unwrap())
        .add_directive("h2=off".parse().unwrap())
        .add_directive("reqwest=off".parse().unwrap());
    let otel_layer = layer::OpenTelemetryTracingBridge::new(&provider).with_filter(filter_otel);

    tracing_subscriber::registry()
        .with(otel_layer)
        .init();

    error!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io", message = "This is an example message");

    let _ = provider.shutdown();
}
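One last caveat: with_simple_exporter exports every record synchronously, which is fine for an example but reintroduces the blocking concern from the beginning of the post. For production it is worth looking at the SDK's batching log processor instead. A sketch, assuming a recent opentelemetry_sdk where the builder exposes with_batch_exporter:
use opentelemetry_otlp::LogExporter;
use opentelemetry_sdk::{logs::SdkLoggerProvider, Resource};

// Same provider as above, but log records are queued and exported in the
// background instead of being exported on the caller's path for every event.
fn build_batching_provider(exporter: LogExporter) -> SdkLoggerProvider {
    SdkLoggerProvider::builder()
        .with_resource(
            Resource::builder()
                .with_service_name("log-appender-tracing-example")
                .build(),
        )
        .with_batch_exporter(exporter)
        .build()
}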
Wrapping up
With the solutions provided by the OpenTelemetry project, we could easily put the pieces together and get our application logging to Elasticsearch. While both the opentelemetry-rust SDK and the opentelemetry-collector exporter for Elasticsearch are in early stages of development, they already bring a lot of value. The documentation can be improved for both, but with an AI agent doing some research in the projects' codebases, we managed to extract the information we needed.
If you have any questions, comments or suggestions, please add to the discussion below!