From Chaos to Clarity: Building a Robust Log Aggregation Pipeline with Kafka (KRaft) and ELK
As systems grow into microservices, logs become fragmented. Debugging an issue often means SSH-ing into multiple containers or grepping through scattered files. This is where Centralized Log Aggregation becomes non-negotiable.
In this guide, we are going to build a production-grade logging pipeline from scratch. We will decouple log shipping from log indexing using Apache Kafka (running in the modern, Zookeeper-less KRaft mode), process data with Logstash, and visualize it all in Kibana.
Whether you are a Junior Developer looking to understand the full flow or a Principal Engineer architecting a decoupled observability layer, this setup has something for you.
🏗️ High-Level Architecture
- Source: Node.js Microservices (simulating traffic).
- Shipper: Filebeat tails the log files and pushes them to Kafka.
- Buffer: Kafka Cluster (3 Brokers + 1 Controller) queues the logs.
- Processor: Logstash consumes from Kafka, transforms JSON, and pushes to Elasticsearch.
- Storage & View: Elasticsearch indexes the data; Kibana visualizes it.
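Put end to end, the flow looks like this:

Node.js services → log files → Filebeat → Kafka ("logs" topic, 3-broker KRaft cluster) → Logstash → Elasticsearch → Kibana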
Phase 1: The Backbone — Setting up the Kafka Cluster (KRaft Mode)
We are skipping Zookeeper and using Kafka's new KRaft mode for metadata management. We will set up a cluster with 1 Controller and 3 Brokers.
1. Configuration
Download Kafka and navigate to your kafka/config directory. You will need to create/modify four specific properties files.
A. The Controller (controller.properties)
This node manages the cluster state.
process.roles=controller
node.id=1
controller.quorum.bootstrap.servers=localhost:9093
controller.listener.names=CONTROLLER
listeners=CONTROLLER://localhost:9093
log.dirs=/tmp/kraft-controller-logs
(Note: node.id must be unique across every node in the cluster, and process.roles=controller is what designates this node as the metadata controller. KRaft also requires controller.listener.names to be set on every node, which is why it appears in all four files.)
B. Broker 1 (broker-1.properties)
node.id=2
process.roles=broker
controller.quorum.bootstrap.servers=localhost:9093
controller.listener.names=CONTROLLER
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=/workspaces/codespaces-blank/tmp/kraft-broker-logs-1
C. Broker 2 (broker-2.properties)
Note the different port (9095) so all brokers can coexist on the same machine.
node.id=3
process.roles=broker
controller.quorum.bootstrap.servers=localhost:9093
controller.listener.names=CONTROLLER
listeners=PLAINTEXT://localhost:9095
advertised.listeners=PLAINTEXT://localhost:9095
log.dirs=/workspaces/codespaces-blank/tmp/kraft-broker-logs-2
D. Broker 3 (broker-3.properties)
Running on port 9096.
node.id=4
process.roles=broker
controller.quorum.bootstrap.servers=localhost:9093
controller.listener.names=CONTROLLER
listeners=PLAINTEXT://localhost:9096
advertised.listeners=PLAINTEXT://localhost:9096
log.dirs=/workspaces/codespaces-blank/tmp/kraft-broker-logs-3
2. Initialization & Startup
First, generate a unique cluster UUID and format the storage directories. The controller is formatted with --standalone, which bootstraps it as the initial quorum voter; the brokers are formatted with --no-initial-controllers because they join the quorum dynamically (this flow relies on the dynamic-quorum support added in Kafka 3.9, KIP-853).
# Generate UUID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
# Format Log Directories
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/controller.properties
bin/kafka-storage.sh format --cluster-id $KAFKA_CLUSTER_ID --config config/broker-1.properties --no-initial-controllers
bin/kafka-storage.sh format --cluster-id $KAFKA_CLUSTER_ID --config config/broker-2.properties --no-initial-controllers
bin/kafka-storage.sh format --cluster-id $KAFKA_CLUSTER_ID --config config/broker-3.properties --no-initial-controllers
Now, start them up in separate terminals:
bin/kafka-server-start.sh config/controller.properties
bin/kafka-server-start.sh config/broker-1.properties
bin/kafka-server-start.sh config/broker-2.properties
bin/kafka-server-start.sh config/broker-3.properties
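Before wiring anything to the cluster, it is also worth pre-creating the topic our logs will land in. This is a sketch: the topic name logs and the partition count are choices, though 10 partitions lines up nicely with the consumer_threads => 10 we will give Logstash later. Brokers will usually auto-create the topic otherwise, but only with default partition and replication settings.

# Create the "logs" topic, replicated across all three brokers
bin/kafka-topics.sh --create --topic logs \
  --partitions 10 --replication-factor 3 \
  --bootstrap-server localhost:9092

# Confirm the partitions and their leaders are spread across the brokers
bin/kafka-topics.sh --describe --topic logs --bootstrap-server localhost:9092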
To verify our cluster is healthy, we can use Kafbat UI (formerly Kafka UI) running on port 8080.
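If you don't have Kafbat UI running yet, one way to start it is via Docker. The image name below comes from the Kafbat project; host.docker.internal resolves on Docker Desktop, while on plain Linux you'd substitute your host IP or use --network host:

docker run -d -p 8080:8080 \
  -e KAFKA_CLUSTERS_0_NAME=local \
  -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=host.docker.internal:9092 \
  ghcr.io/kafbat/kafka-ui:latest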
Above: Our local cluster with 3 brokers fully operational.
Phase 2: The Source — Generating "Fake" Microservice Logs
To test a pipeline, we need data. I created a Node.js simulator using winston that generates structured JSON logs for fictitious services (Auth, Payments, Orders, etc.).
The Simulator (microservice.js)
The script randomizes HTTP status codes, latency, and log levels to mimic real-world traffic.
Key Code Snippet (Structured JSON):
/**
 * microservice.js
 *
 * Usage:
 *   npm init -y
 *   npm i winston
 *   node microservice.js
 *
 * Environment variables:
 *   MICROSERVICES (default: 6)  -> number of services to simulate
 *   SERVICE_NAMES (optional)    -> comma separated names, overrides MICROSERVICES
 *   LOG_FREQ_MS (default: 800)  -> avg ms between logs per service (randomized)
 *   LOG_LEVEL (default: info)   -> winston log level
 *   LOG_DIR (default: ./logs)   -> directory for per-service files
 *   ERROR_RATE (default: 0.08)  -> fraction of logs that are errors (0..1)
 *   TRACE_RATE (default: 0.02)  -> fraction of logs that include stack traces
 *   RANDOM_SEED (optional)      -> for deterministic randomness if desired
 *
 * Example:
 *   MICROSERVICES=6 LOG_FREQ_MS=500 LOG_LEVEL=debug node microservice.js
 */
const fs = require('fs');
const path = require('path');
const { createLogger, format, transports } = require('winston');

const env = process.env;

const DEFAULTS = {
  MICROSERVICES: 6,
  LOG_FREQ_MS: 800,
  LOG_LEVEL: 'info',
  LOG_DIR: './logs',
  ERROR_RATE: 0.08,
  TRACE_RATE: 0.02
};

const MICRO = parseInt(env.MICROSERVICES || DEFAULTS.MICROSERVICES, 10);
const LOG_FREQ_MS = parseInt(env.LOG_FREQ_MS || DEFAULTS.LOG_FREQ_MS, 10);
const LOG_LEVEL = env.LOG_LEVEL || DEFAULTS.LOG_LEVEL;
const LOG_DIR = env.LOG_DIR || DEFAULTS.LOG_DIR;
const ERROR_RATE = parseFloat(env.ERROR_RATE ?? DEFAULTS.ERROR_RATE);
const TRACE_RATE = parseFloat(env.TRACE_RATE ?? DEFAULTS.TRACE_RATE);

if (!fs.existsSync(LOG_DIR)) fs.mkdirSync(LOG_DIR, { recursive: true });

const providedNames = env.SERVICE_NAMES ? env.SERVICE_NAMES.split(',').map(s => s.trim()).filter(Boolean) : [];
const serviceNames = providedNames.length ? providedNames :
  Array.from({ length: MICRO }).map((_, i) => `service-${i + 1}`);

// helper deterministic-ish RNG if seed provided
let rnd = Math.random;
if (env.RANDOM_SEED) {
  let seed = parseInt(env.RANDOM_SEED, 10) || 1;
  rnd = () => {
    // xorshift32
    seed ^= seed << 13;
    seed ^= seed >>> 17;
    seed ^= seed << 5;
    // normalize
    return (seed >>> 0) / 0xffffffff;
  };
}

function randomInt(min, max) { return Math.floor(rnd() * (max - min + 1)) + min; }
function pick(arr) { return arr[Math.floor(rnd() * arr.length)]; }

// sample random generators
const userIds = Array.from({ length: 50 }).map((_, i) => `user-${1000 + i}`);
const endpoints = ['/api/v1/login', '/api/v1/orders', '/api/v1/products', '/api/v1/cart', '/api/v1/search', '/health'];
const httpMethods = ['GET', 'POST', 'PUT', 'DELETE', 'PATCH'];
const messages = [
  'cache miss',
  'cache hit',
  'db query success',
  'db connection timeout',
  'order processed',
  'payment failed',
  'auth success',
  'token expired',
  'validation error',
  'background job completed'
];

function createServiceLogger(name) {
  const filename = path.join(LOG_DIR, `service-${name}.log`);
  const logger = createLogger({
    level: LOG_LEVEL,
    format: format.combine(
      format.timestamp(),
      // structured JSON — easy to ship to Kafka/Logstash
      format.printf(info => {
        // ensure message is JSON-friendly
        const base = {
          timestamp: info.timestamp,
          service: name,
          level: info.level,
          message: info.message,
          meta: info.meta || {}
        };
        return JSON.stringify(base);
      })
    ),
    transports: [
      new transports.Console({ stderrLevels: ['error', 'warn'] }),
      new transports.File({ filename, maxsize: 10 * 1024 * 1024 }) // 10MB (no rotation lib required)
    ]
  });
  return logger;
}

function generateLogForService(logger, name) {
  // Simulate either HTTP access log, app event, or error
  const isHttp = rnd() < 0.7;
  const levelRoll = rnd();
  const isError = levelRoll < ERROR_RATE;
  const includeTrace = rnd() < TRACE_RATE;

  if (isHttp) {
    const method = pick(httpMethods);
    const endpoint = pick(endpoints);
    const latency = randomInt(5, 1200); // ms
    const status = isError ? pick([500, 502, 503, 504, 400]) : (latency > 1000 ? 504 : (rnd() < 0.9 ? 200 : 201));
    const user = pick(userIds);
    const msg = `${method} ${endpoint} ${status} ${latency}ms`;
    logger.log({
      level: isError ? 'error' : 'info',
      message: msg,
      meta: {
        type: 'http',
        method,
        endpoint,
        status,
        latency,
        userId: user,
        requestId: `r-${Date.now().toString(36)}-${randomInt(1000, 9999)}`
      }
    });
  } else {
    // app event
    const msg = pick(messages);
    const severity = isError ? 'error' : (rnd() < 0.2 ? 'warn' : 'info');
    const meta = {
      type: 'app',
      event: msg,
      userId: rnd() < 0.5 ? pick(userIds) : undefined,
      correlationId: `c-${randomInt(100000, 999999)}`
    };
    if (includeTrace) {
      meta.stack = `Error: ${msg} at /app/module.js:${randomInt(10,200)}:${randomInt(2,80)}\n at processTicksAndRejections (internal/process/task_queues.js:93:5)`;
    }
    logger.log({
      level: severity,
      message: msg,
      meta
    });
  }
}

// create all service loggers
const services = serviceNames.map(name => {
  const logger = createServiceLogger(name);
  return { name, logger };
});

// main loop: each service emits logs at random-ish intervals
services.forEach(({ name, logger }) => {
  // stagger start
  setTimeout(() => {
    (function emitLoop() {
      try {
        generateLogForService(logger, name);
      } catch (err) {
        // Logging errors shouldn't crash the whole simulator
        console.error(`[sim-error] ${name} logger crashed:`, err && err.stack ? err.stack : err);
      }
      // random jitter around configured frequency
      const jitter = Math.floor((rnd() - 0.5) * 0.6 * LOG_FREQ_MS);
      const next = Math.max(100, LOG_FREQ_MS + jitter);
      setTimeout(emitLoop, next);
    })();
  }, randomInt(0, LOG_FREQ_MS));
});

// simple status output so user knows it's running
console.log(JSON.stringify({
  event: 'fake-logs-started',
  services: serviceNames,
  logDir: LOG_DIR,
  now: new Date().toISOString(),
  notes: 'Structured JSON logs written to console and files per service. Tail logs and push to Kafka/Logstash for ELK.'
}));
Our package.json looks like this.
{
  "name": "microservices",
  "version": "1.0.0",
  "description": "",
  "main": "microservice.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "winston": "^3.18.3"
  }
}
Our Dockerfile looks like this; we will reference it from our Docker Compose file.
FROM node:22-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY microservice.js .
CMD ["node", "microservice.js"]
Orchestration with Docker Compose
We spin up multiple containers sharing a common volume for logs. Only ms-generic-service carries a build: entry; docker compose up builds it once, tags the image base-microservice, and the remaining services reuse that image.
services:
  ms-generic-service:
    image: base-microservice
    build: ./microservices
    container_name: ms-generic-service
    environment:
      - SERVICE_NAMES=generic-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs

  ms-auth-service:
    image: base-microservice
    container_name: ms-auth-service
    environment:
      - SERVICE_NAMES=auth-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs

  ms-payments-service:
    image: base-microservice
    container_name: ms-payments-service
    environment:
      - SERVICE_NAMES=payments-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs

  ms-orders-service:
    image: base-microservice
    container_name: ms-orders-service
    environment:
      - SERVICE_NAMES=orders-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs

  ms-catalog-service:
    image: base-microservice
    container_name: ms-catalog-service
    environment:
      - SERVICE_NAMES=catalog-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs
Run the stack:
docker compose up
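Give the containers a few seconds, then confirm the shared volume is filling up. Each line is a single JSON object in the shape our winston printf produces; the commented line below is illustrative, not captured output:

tail -f /workspaces/codespaces-blank/logs/service-auth-service.log
# {"timestamp":"2026-01-16T10:21:33.412Z","service":"auth-service","level":"info",
#  "message":"GET /api/v1/login 200 84ms","meta":{"type":"http","method":"GET",...}}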
Phase 3: The Shipper — Filebeat to Kafka
Instead of writing directly to Logstash, we use Filebeat. It's lightweight and handles backpressure well.
Configuration (filebeat.yml)
We configure it to read the logs generated by our Docker containers and output directly to our Kafka broker.
filebeat.inputs:
  - type: filestream
    id: my-filestream-id
    paths:
      - /workspaces/codespaces-blank/logs/*.log

output.kafka:
  hosts: ["localhost:9092"]
  topic: 'logs'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
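One broker is enough for Filebeat to bootstrap, but listing all three seeds makes startup resilient if that particular broker happens to be down; same config, just more seeds:

output.kafka:
  hosts: ["localhost:9092", "localhost:9095", "localhost:9096"]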
Start Filebeat:
sudo ./filebeat -e
Checking Kafbat UI again, we can now see messages flooding into the logs topic!
Above: JSON logs arriving in the Kafka topic.
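If you prefer the terminal to a UI, the stock console consumer gives the same confirmation:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic logs --from-beginning --max-messages 5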
Phase 4: The Processor — Logstash & ELK
Now for the heavy lifting. We need to extract the data from Kafka and push it into Elasticsearch.
1. Start Elasticsearch & Kibana
Using Docker Compose, spin up the standard ELK stack.
(Note: Kibana runs on port 5601).
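If you don't already have a compose file for this, a minimal single-node sketch looks like the one below. The image tags are assumptions, and security is disabled here for a quick local run; with security off you can drop the user/password lines from the Logstash output shown later.

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.15.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false   # dev only; keep security on anywhere real
      - ES_JAVA_OPTS=-Xms1g -Xmx1g
    ports:
      - "9200:9200"

  kibana:
    image: docker.elastic.co/kibana/kibana:8.15.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch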
At this stage, Index Management in Kibana will show No Indices because we haven't processed anything yet.
2. Configure Logstash
We create a pipeline (logstash-sample.conf) whose Kafka input joins the cluster as its own consumer group.
# Sample Logstash configuration for a simple
# Kafka -> Logstash -> Elasticsearch pipeline.
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["logs"]
    codec => "json"
    consumer_threads => 10
    group_id => "logstashmy"          # new group id to avoid old offsets
    auto_offset_reset => "earliest"   # read from beginning if no offsets
  }
}

filter {
  json {
    source => "message"
  }
  mutate {
    remove_field => ["@metadata", "input", "ecs", "host", "agent", "log", "stream"]
  }
  date {
    # use the application's own timestamp (the "timestamp" field from the
    # winston JSON) as the event time, rather than Filebeat's read time
    match => ["timestamp", "ISO8601"]
  }
  if [level] == "error" {
    mutate {
      add_tag => ["error"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "5LaAci5P"   # replace with your own elastic password
  }
}
Run Logstash:
bin/logstash -f config/logstash-sample.conf
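To confirm documents are landing, you can also query Elasticsearch directly, using the same credentials as the Logstash output:

# List the daily indices and their document counts
curl -s -u elastic:5LaAci5P "http://localhost:9200/_cat/indices/logs-*?v"

# Peek at a single parsed document
curl -s -u elastic:5LaAci5P "http://localhost:9200/logs-*/_search?size=1&pretty"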
Phase 5: Visualization & Analysis
Once Logstash starts running, head back to Kibana.
- Index Management: You will now see the logs-2026.01.16 index created and populated.
- Discover: The raw data is now searchable. You can filter by service, level, or latency (a sample KQL query follows below).
- Dashboards: Finally, we can build a dashboard to monitor our ecosystem:
  - Pie Chart: Distribution of logs by service.
  - Bar Chart: HTTP methods over time.
  - Heatmap: Error rates per service.
Above: A complete view of our microservices' health.
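As a concrete example, the field names our pipeline produces make targeted queries easy. In Discover, a KQL query like this one (field names follow the JSON structure from Phase 2) isolates slow, failing payment calls:

service : "payments-service" and level : "error" and meta.latency >= 1000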
Conclusion
We just built a fully decoupled logging architecture. By placing Kafka in the middle, we ensured that if Elasticsearch goes down for maintenance, our logs sit safely in the Kafka topic, waiting to be replayed.
This is the standard pattern for high-scale observability.