DEV Community

Srikant V S


From Chaos to Clarity: Building a Robust Log Aggregation Pipeline with Kafka (KRaft) and ELK


As systems grow into microservices, logs become fragmented. Debugging an issue often means SSH-ing into multiple containers or grepping through scattered files. This is where centralized log aggregation becomes non-negotiable.

In this guide, we are going to build a production-style logging pipeline from scratch. We will decouple log shipping from log indexing using Apache Kafka (running in the modern, ZooKeeper-less KRaft mode), process the data with Logstash, and visualize it all in Kibana.

Whether you are a Junior Developer looking to understand the full flow or a Principal Engineer architecting a decoupled observability layer, this setup has something for you.

🏗️ High-Level Architecture

  1. Source: Node.js Microservices (simulating traffic).
  2. Shipper: Filebeat tails the log files and pushes them to Kafka.
  3. Buffer: Kafka Cluster (3 Brokers + 1 Controller) queues the logs.
  4. Processor: Logstash consumes from Kafka, transforms JSON, and pushes to Elasticsearch.
  5. Storage & View: Elasticsearch indexes the data; Kibana visualizes it.

Phase 1: The Backbone — Setting up the Kafka Cluster (KRaft Mode)

We are skipping ZooKeeper and using Kafka's newer KRaft mode for metadata management. We will set up a cluster with 1 controller and 3 brokers.

1. Configuration

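Download Kafka (3.9 or later, since the configs below use controller.quorum.bootstrap.servers and the --standalone format option) and navigate to your kafka/config directory. You will need to create or modify four properties files.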

A. The Controller (controller.properties)
This node manages the cluster state.

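process.roles=controller
node.id=1
controller.quorum.bootstrap.servers=localhost:9093
listeners=CONTROLLER://localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
log.dirs=/tmp/kraft-controller-logs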


(Note: node.id must be unique across all four nodes. The controller uses 1 and the brokers use 2 to 4.)

B. Broker 1 (broker-1.properties)

node.id=2
process.roles=broker
controller.quorum.bootstrap.servers=localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=/workspaces/codespaces-blank/tmp/kraft-broker-logs-1


C. Broker 2 (broker-2.properties)
Note the different port (9095), so that all three brokers can run on the same machine.

node.id=3
process.roles=broker
controller.quorum.bootstrap.servers=localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
listeners=PLAINTEXT://localhost:9095
advertised.listeners=PLAINTEXT://localhost:9095
log.dirs=/workspaces/codespaces-blank/tmp/kraft-broker-logs-2


D. Broker 3 (broker-3.properties)
Running on port 9096.

node.id=4
process.roles=broker
controller.quorum.bootstrap.servers=localhost:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
listeners=PLAINTEXT://localhost:9096
advertised.listeners=PLAINTEXT://localhost:9096
log.dirs=/workspaces/codespaces-blank/tmp/kraft-broker-logs-3

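With four files that differ in only a few values, a copy-paste slip (a repeated node.id or port) is easy to make. As a rough sanity check, here is a small Node.js sketch that parses the properties text and flags duplicate node.id values and listener ports. The parser is deliberately simplified (plain key=value lines, # comments) and is not Kafka's own validation; the helper names parseProperties, listenerPorts and findConflicts are made up for this example.

```javascript
// Hypothetical helpers: flag duplicate node.id values and listener ports
// across KRaft node configs. Simplified .properties parsing only:
// "key=value" lines; blank lines and "#" comments are skipped.
function parseProperties(text) {
  const props = {};
  for (const raw of text.split('\n')) {
    const line = raw.trim();
    if (!line || line.startsWith('#')) continue;
    const eq = line.indexOf('=');
    if (eq === -1) continue;
    props[line.slice(0, eq).trim()] = line.slice(eq + 1).trim();
  }
  return props;
}

// Extract ports from a listeners value such as "PLAINTEXT://localhost:9092".
function listenerPorts(listeners) {
  return (listeners || '')
    .split(',')
    .map(l => l.trim())
    .filter(Boolean)
    .map(l => parseInt(l.slice(l.lastIndexOf(':') + 1), 10));
}

// files: { fileName: propertiesText }. Returns a list of human-readable conflicts.
function findConflicts(files) {
  const problems = [];
  const ids = new Map();
  const ports = new Map();
  for (const [file, text] of Object.entries(files)) {
    const p = parseProperties(text);
    const id = p['node.id'];
    if (ids.has(id)) problems.push(`node.id ${id} used by ${ids.get(id)} and ${file}`);
    else ids.set(id, file);
    for (const port of listenerPorts(p['listeners'])) {
      if (ports.has(port)) problems.push(`port ${port} used by ${ports.get(port)} and ${file}`);
      else ports.set(port, file);
    }
  }
  return problems;
}

// Example with a deliberate mistake: broker-2 was copied from broker-1 and not edited.
const configs = {
  'controller.properties': 'process.roles=controller\nnode.id=1\nlisteners=CONTROLLER://localhost:9093',
  'broker-1.properties': 'process.roles=broker\nnode.id=2\nlisteners=PLAINTEXT://localhost:9092',
  'broker-2.properties': 'process.roles=broker\nnode.id=2\nlisteners=PLAINTEXT://localhost:9092',
};
console.log(findConflicts(configs));
```

In practice you would build the configs object from the real files with fs.readFileSync; an empty array means no duplicates were found.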

2. Initialization & Startup

First, generate a unique Cluster UUID and format the storage directories.

# Generate a cluster ID (run the format commands in the same shell so the variable is still set)
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

# Format the log directories: the controller bootstraps a single-voter quorum (--standalone),
# and the brokers are formatted without being initial controllers (--no-initial-controllers)
bin/kafka-storage.sh format --standalone -t "$KAFKA_CLUSTER_ID" -c config/controller.properties
bin/kafka-storage.sh format --cluster-id "$KAFKA_CLUSTER_ID" --config config/broker-1.properties --no-initial-controllers
bin/kafka-storage.sh format --cluster-id "$KAFKA_CLUSTER_ID" --config config/broker-2.properties --no-initial-controllers
bin/kafka-storage.sh format --cluster-id "$KAFKA_CLUSTER_ID" --config config/broker-3.properties --no-initial-controllers


Now start them in separate terminals, beginning with the controller:

bin/kafka-server-start.sh config/controller.properties
bin/kafka-server-start.sh config/broker-1.properties
bin/kafka-server-start.sh config/broker-2.properties
bin/kafka-server-start.sh config/broker-3.properties


To verify our cluster is healthy, we can use Kafbat UI (formerly Kafka UI) running on port 8080.

Above: Our local cluster with 3 brokers fully operational.


Phase 2: The Source — Generating "Fake" Microservice Logs

To test a pipeline, we need data. I created a Node.js simulator using winston that generates structured JSON logs for fictitious services (Auth, Payments, Orders, etc.).

The Simulator (microservice.js)

The script randomizes HTTP status codes, latency, and log levels to mimic real-world traffic.

Full script (structured JSON output):

/**
 * microservice.js
 *
 * Usage:
 *   npm init -y
 *   npm i winston
 *   node microservice.js
 *
 * Environment variables:
 *   MICROSERVICES            (default: 6)         -> number of services to simulate
 *   SERVICE_NAMES            (optional)           -> comma separated names, overrides MICROSERVICES
 *   LOG_FREQ_MS              (default: 800)       -> avg ms between logs per service (randomized)
 *   LOG_LEVEL                (default: info)      -> winston log level
 *   LOG_DIR                  (default: ./logs)    -> directory for per-service files
 *   ERROR_RATE               (default: 0.08)      -> fraction of logs that are errors (0..1)
 *   TRACE_RATE               (default: 0.02)      -> fraction of logs that include stack traces
 *   RANDOM_SEED              (optional)           -> for deterministic randomness if desired
 *
 * Example:
 *   MICROSERVICES=6 LOG_FREQ_MS=500 LOG_LEVEL=debug node microservice.js
 */

const fs = require('fs');
const path = require('path');
const { createLogger, format, transports } = require('winston');

const env = process.env;

const DEFAULTS = {
  MICROSERVICES: 6,
  LOG_FREQ_MS: 800,
  LOG_LEVEL: 'info',
  LOG_DIR: './logs',
  ERROR_RATE: 0.08,
  TRACE_RATE: 0.02
};

const MICRO = parseInt(env.MICROSERVICES || DEFAULTS.MICROSERVICES, 10);
const LOG_FREQ_MS = parseInt(env.LOG_FREQ_MS || DEFAULTS.LOG_FREQ_MS, 10);
const LOG_LEVEL = env.LOG_LEVEL || DEFAULTS.LOG_LEVEL;
const LOG_DIR = env.LOG_DIR || DEFAULTS.LOG_DIR;
const ERROR_RATE = parseFloat(env.ERROR_RATE ?? DEFAULTS.ERROR_RATE);
const TRACE_RATE = parseFloat(env.TRACE_RATE ?? DEFAULTS.TRACE_RATE);

if (!fs.existsSync(LOG_DIR)) fs.mkdirSync(LOG_DIR, { recursive: true });

const providedNames = env.SERVICE_NAMES ? env.SERVICE_NAMES.split(',').map(s => s.trim()).filter(Boolean) : [];
const serviceNames = providedNames.length ? providedNames :
  Array.from({ length: MICRO }).map((_, i) => `service-${i + 1}`);

// helper deterministic-ish RNG if seed provided
let rnd = Math.random;
if (env.RANDOM_SEED) {
  let seed = parseInt(env.RANDOM_SEED, 10) || 1;
  rnd = () => {
    // xorshift32
    seed ^= seed << 13;
    seed ^= seed >>> 17;
    seed ^= seed << 5;
    // normalize to [0, 1); dividing by 2^32 (not 2^32 - 1) keeps pick() in bounds
    return (seed >>> 0) / 0x100000000;
  };
}

function randomInt(min, max) { return Math.floor(rnd() * (max - min + 1)) + min; }
function pick(arr) { return arr[Math.floor(rnd() * arr.length)]; }

// sample random generators
const userIds = Array.from({ length: 50 }).map((_, i) => `user-${1000 + i}`);
const endpoints = ['/api/v1/login', '/api/v1/orders', '/api/v1/products', '/api/v1/cart', '/api/v1/search', '/health'];
const httpMethods = ['GET', 'POST', 'PUT', 'DELETE', 'PATCH'];
const messages = [
  'cache miss',
  'cache hit',
  'db query success',
  'db connection timeout',
  'order processed',
  'payment failed',
  'auth success',
  'token expired',
  'validation error',
  'background job completed'
];

function createServiceLogger(name) {
  const filename = path.join(LOG_DIR, `service-${name}.log`);
  const logger = createLogger({
    level: LOG_LEVEL,
    format: format.combine(
      format.timestamp(),
      // structured JSON — easy to ship to Kafka/Logstash
      format.printf(info => {
        // ensure message is JSON-friendly
        const base = {
          timestamp: info.timestamp,
          service: name,
          level: info.level,
          message: info.message,
          meta: info.meta || {}
        };
        return JSON.stringify(base);
      })
    ),
    transports: [
      new transports.Console({ stderrLevels: ['error', 'warn'] }),
      new transports.File({ filename, maxsize: 10 * 1024 * 1024 }) // 10MB (no rotation lib required)
    ]
  });

  return logger;
}

function generateLogForService(logger, name) {
  // Simulate either HTTP access log, app event, or error
  const isHttp = rnd() < 0.7;
  const levelRoll = rnd();
  const isError = levelRoll < ERROR_RATE;
  const includeTrace = rnd() < TRACE_RATE;

  if (isHttp) {
    const method = pick(httpMethods);
    const endpoint = pick(endpoints);
    const latency = randomInt(5, 1200); // ms
    const status = isError ? pick([500, 502, 503, 504, 400]) : (latency > 1000 ? 504 : (rnd() < 0.9 ? 200 : 201));
    const user = pick(userIds);

    const msg = `${method} ${endpoint} ${status} ${latency}ms`;
    logger.log({
      level: isError ? 'error' : 'info',
      message: msg,
      meta: {
        type: 'http',
        method,
        endpoint,
        status,
        latency,
        userId: user,
        requestId: `r-${Date.now().toString(36)}-${randomInt(1000,9999)}`
      }
    });
  } else {
    // app event
    const msg = pick(messages);
    const severity = isError ? 'error' : (rnd() < 0.2 ? 'warn' : 'info');

    const meta = {
      type: 'app',
      event: msg,
      userId: rnd() < 0.5 ? pick(userIds) : undefined,
      correlationId: `c-${randomInt(100000, 999999)}`
    };

    if (includeTrace) {
      meta.stack = `Error: ${msg} at /app/module.js:${randomInt(10,200)}:${randomInt(2,80)}\n    at processTicksAndRejections (internal/process/task_queues.js:93:5)`;
    }

    logger.log({
      level: severity,
      message: msg,
      meta
    });
  }
}

// create all service loggers
const services = serviceNames.map(name => {
  const logger = createServiceLogger(name);
  return { name, logger };
});

// main loop: each service emits logs at random-ish intervals
services.forEach(({ name, logger }) => {
  // stagger start
  setTimeout(() => {
    (function emitLoop() {
      try {
        generateLogForService(logger, name);
      } catch (err) {
        // Logging errors shouldn't crash the whole simulator
        console.error(`[sim-error] ${name} logger crashed:`, err && err.stack ? err.stack : err);
      }
      // random jitter around configured frequency
      const jitter = Math.floor((rnd() - 0.5) * 0.6 * LOG_FREQ_MS);
      const next = Math.max(100, LOG_FREQ_MS + jitter);
      setTimeout(emitLoop, next);
    })();
  }, randomInt(0, LOG_FREQ_MS));
});

// simple status output so user knows it's running
console.log(JSON.stringify({
  event: 'fake-logs-started',
  services: serviceNames,
  logDir: LOG_DIR,
  now: new Date().toISOString(),
  notes: 'Structured JSON logs written to console and files per service. Tail logs and push to Kafka/Logstash for ELK.'
}));

Our package.json looks like this.

{
  "name": "microservices",
  "version": "1.0.0",
  "description": "",
  "main": "microservice.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "winston": "^3.18.3"
  }
}

Our Dockerfile looks like this; Docker Compose will build the service image from it:

FROM node:22-alpine
WORKDIR /app

COPY package*.json ./
RUN npm install
COPY microservice.js .

CMD [ "node", "microservice.js"]

Orchestration with Docker Compose

We spin up one container per service. Each writes its own log file into a shared host directory that is bind-mounted at /app/logs in every container.

services:
  ms-generic-service:
    image: base-microservice
    build: ./microservices
    container_name: ms-generic-service
    environment:
      - SERVICE_NAMES=generic-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs
  ms-auth-service:
    image: base-microservice
    container_name: ms-auth-service
    environment:
      - SERVICE_NAMES=auth-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs
  ms-payments-service:
    image: base-microservice
    container_name: ms-payments-service
    environment:
      - SERVICE_NAMES=payments-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs
  ms-orders-service:
    image: base-microservice
    container_name: ms-orders-service
    environment:
      - SERVICE_NAMES=orders-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs
  ms-catalog-service:
    image: base-microservice
    container_name: ms-catalog-service
    environment:
      - SERVICE_NAMES=catalog-service
    volumes:
      - /workspaces/codespaces-blank/logs:/app/logs

Run the stack:

docker compose up


Phase 3: The Shipper — Filebeat to Kafka

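Instead of having the services send logs straight to Logstash, we use Filebeat to tail their log files. It's lightweight and handles backpressure well: it slows down reading when the output can't keep up, and it records its read position in each file so it can resume after a restart.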

Configuration (filebeat.yml)
We configure it to read the log files written by our Docker containers (through the shared logs directory) and send them straight to our Kafka brokers.

filebeat.inputs:
  - type: filestream
    id: my-filestream-id
    paths:
      - /workspaces/codespaces-blank/logs/*.log

output.kafka:
  hosts: ["localhost:9092", "localhost:9095", "localhost:9096"]
  topic: 'logs'
  partition.round_robin:
    reachable_only: false
  required_acks: 1

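The partition.round_robin setting spreads events evenly over the topic's partitions. The sketch below is a simplified model of that idea, not Filebeat's actual implementation; makeRoundRobinPartitioner and distribute are made-up names. One practical consequence: if the logs topic doesn't exist yet, the brokers auto-create it on first use (auto.create.topics.enable defaults to true) with num.partitions partitions, which defaults to 1, so every event lands in partition 0 no matter which strategy Filebeat uses.

```javascript
// Simplified model of round-robin partitioning: each event goes to the
// next partition in turn (an illustration, not Filebeat's code).
function makeRoundRobinPartitioner(numPartitions) {
  let next = 0;
  return () => {
    const p = next;
    next = (next + 1) % numPartitions;
    return p;
  };
}

// Count how many of `events` land on each partition.
function distribute(events, numPartitions) {
  const partitionFor = makeRoundRobinPartitioner(numPartitions);
  const counts = new Array(numPartitions).fill(0);
  for (let i = 0; i < events; i++) counts[partitionFor()]++;
  return counts;
}

console.log(distribute(10, 3)); // [ 4, 3, 3 ]
console.log(distribute(10, 1)); // [ 10 ]
```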

Start Filebeat:

sudo ./filebeat -e


Checking Kafbat UI again, we can now see messages flooding into the logs topic!

Above: JSON logs arriving in the Kafka topic.
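Note that each Kafka message is not the raw log line. Filebeat wraps the line in its own JSON event and stores the original text as a string in the message field, so the application's JSON is encoded twice. That is why the Logstash pipeline decodes in two steps: the Kafka input's json codec opens the Filebeat envelope, and the json filter parses message. The sketch below shows the two layers with a simplified, made-up event; real Filebeat events carry more fields (agent, ecs, host, input and so on).

```javascript
// Simplified sketch of a Filebeat event as it might sit in the "logs" topic.
// All values are made up for illustration.
const appLine = JSON.stringify({
  timestamp: '2026-01-16T10:15:30.123Z',
  service: 'auth-service',
  level: 'error',
  message: 'POST /api/v1/login 503 42ms',
  meta: { type: 'http', method: 'POST', status: 503, latency: 42 }
});

const kafkaValue = JSON.stringify({
  '@timestamp': '2026-01-16T10:15:30.456Z', // when Filebeat read the line
  message: appLine,                          // the log line, still a string
  log: { file: { path: '/workspaces/codespaces-blank/logs/service-auth-service.log' } }
});

// Layer 1: decode the Filebeat envelope (Logstash: codec => "json").
const envelope = JSON.parse(kafkaValue);
console.log(typeof envelope.message); // string

// Layer 2: decode the application's JSON (Logstash: json { source => "message" }).
const appEvent = JSON.parse(envelope.message);
console.log(appEvent.service, appEvent.meta.status); // auth-service 503
```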


Phase 4: The Processor — Logstash & ELK

Now for the heavy lifting. We need to extract the data from Kafka and push it into Elasticsearch.

1. Start Elasticsearch & Kibana

Using Docker Compose, spin up Elasticsearch and Kibana (Logstash runs directly on the host in the next step).

(Note: Kibana runs on port 5601).

At this stage, Index Management in Kibana will show No Indices because we haven't processed anything yet.

2. Configure Logstash

We create a pipeline (logstash-sample.conf) whose Kafka input reads the logs topic as a member of a consumer group.

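# Kafka -> Logstash -> Elasticsearch pipeline.

input {
  kafka {
    bootstrap_servers => "localhost:9092,localhost:9095,localhost:9096"
    topics => ["logs"]
    codec => "json"                    # decode the Filebeat event envelope
    consumer_threads => 10
    group_id => "logstashmy"           # new group id to avoid old offsets
    auto_offset_reset => "earliest"    # read from beginning if no offsets
  }
}

filter {
  # Parse the application's JSON log line, which Filebeat stored as a string in "message"
  json {
    source => "message"
  }

  # Drop Filebeat bookkeeping fields we don't need in Elasticsearch
  mutate {
    remove_field => ["@metadata", "input", "ecs", "host", "agent", "log", "stream"]
  }

  # Use the application's own "timestamp" as the event time (@timestamp)
  date {
    match => ["timestamp", "ISO8601"]
  }

  # Tag error-level events so they are easy to filter in Kibana
  if [level] == "error" {
    mutate {
      add_tag => ["error"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"  # export ELASTIC_PASSWORD before starting Logstash
  }
}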
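To make the filter chain concrete, here is a rough JavaScript model of what it does to one decoded Filebeat event: merge the parsed application JSON into the top level, drop the bookkeeping fields, take the event time from the application's timestamp field (assuming the date filter parses that field), tag errors, and derive the daily index name. It is an illustration under those assumptions, not how Logstash works internally; processEvent and indexName are made-up names. One detail worth knowing: %{+YYYY.MM.dd} in the index setting is formatted from @timestamp in UTC, so the sketch uses UTC getters as well.

```javascript
// Rough model of the Logstash filter chain, applied to one decoded Filebeat envelope.
const DROPPED = ['@metadata', 'input', 'ecs', 'host', 'agent', 'log', 'stream'];

function processEvent(envelope) {
  // json { source => "message" }: parse and merge into the top level
  // (the app's own "message" replaces Filebeat's raw line).
  const event = { ...envelope, ...JSON.parse(envelope.message) };

  // mutate { remove_field => [...] }
  for (const field of DROPPED) delete event[field];

  // date filter on the app's "timestamp" field: it becomes @timestamp.
  if (event.timestamp) event['@timestamp'] = new Date(event.timestamp).toISOString();

  // if [level] == "error" { mutate { add_tag => ["error"] } }
  if (event.level === 'error') event.tags = [...(event.tags || []), 'error'];

  return event;
}

// index => "logs-%{+YYYY.MM.dd}" is rendered from @timestamp in UTC.
function indexName(isoTimestamp) {
  const d = new Date(isoTimestamp);
  const pad = n => String(n).padStart(2, '0');
  return `logs-${d.getUTCFullYear()}.${pad(d.getUTCMonth() + 1)}.${pad(d.getUTCDate())}`;
}

// Made-up sample event.
const out = processEvent({
  '@timestamp': '2026-01-16T23:59:59.900Z',
  agent: { type: 'filebeat' },
  message: JSON.stringify({
    timestamp: '2026-01-16T23:59:59.500Z',
    service: 'payments-service',
    level: 'error',
    message: 'payment failed',
    meta: { type: 'app' }
  })
});

console.log(out.tags);                     // [ 'error' ]
console.log(out.message);                  // payment failed
console.log(indexName(out['@timestamp'])); // logs-2026.01.16
```

Because the index date is UTC, an event logged late in the evening in a timezone ahead of or behind UTC can land in the previous or next day's index.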

Run Logstash:

bin/logstash -f config/logstash-sample.conf

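One setting worth checking in the input block: consumer_threads => 10 starts ten consumers in the logstashmy group, but Kafka gives each partition to at most one consumer in a group, so threads beyond the partition count sit idle. If the logs topic was auto-created with the default single partition, that means nine idle threads. The simplified round-robin model below shows the arithmetic (Kafka's real assignors differ in detail; assign and idleConsumers are made-up names). Matching consumer_threads to the topic's partition count is the usual rule of thumb.

```javascript
// Simplified model of partition assignment within one consumer group:
// each partition goes to exactly one consumer, round-robin.
function assign(numPartitions, numConsumers) {
  const assignment = Array.from({ length: numConsumers }, () => []);
  for (let p = 0; p < numPartitions; p++) assignment[p % numConsumers].push(p);
  return assignment;
}

// Consumers that received no partitions at all.
function idleConsumers(numPartitions, numConsumers) {
  return assign(numPartitions, numConsumers).filter(parts => parts.length === 0).length;
}

console.log(idleConsumers(1, 10));  // 9
console.log(idleConsumers(3, 10));  // 7
console.log(idleConsumers(10, 10)); // 0
console.log(assign(3, 2));          // [ [ 0, 2 ], [ 1 ] ]
```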

Phase 5: Visualization & Analysis

Once Logstash starts running, head back to Kibana.

  1. Index Management: You will now see a daily index, such as logs-2026.01.16, created and populated.
  2. Discover: The raw data is now searchable. You can filter by service, level, or latency (meta.latency).
  3. Dashboards: Finally, we can build a dashboard to monitor our ecosystem:
     - Pie Chart: Distribution of logs by service.
     - Bar Chart: HTTP methods over time.
     - Heatmap: Error rates per service.

Above: A complete view of our microservices' health.
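Under the hood, the error-rate heatmap boils down to an aggregation: group documents by service and compute the share whose level is error. The sketch below does that arithmetic over already-parsed documents; the sample documents and the errorRateByService name are made up, and in Kibana the visualization's aggregations do this grouping for you. With the simulator's default ERROR_RATE of 0.08, each service should hover around 8%.

```javascript
// Error rate per service over parsed log documents (made-up sample data).
function errorRateByService(docs) {
  const stats = {};
  for (const { service, level } of docs) {
    const s = (stats[service] ??= { total: 0, errors: 0 });
    s.total++;
    if (level === 'error') s.errors++;
  }
  const rates = {};
  for (const [service, { total, errors }] of Object.entries(stats)) {
    rates[service] = errors / total;
  }
  return rates;
}

const docs = [
  { service: 'auth-service', level: 'info' },
  { service: 'auth-service', level: 'error' },
  { service: 'payments-service', level: 'info' },
  { service: 'payments-service', level: 'warn' },
  { service: 'payments-service', level: 'info' },
  { service: 'payments-service', level: 'error' },
];

console.log(errorRateByService(docs)); // { 'auth-service': 0.5, 'payments-service': 0.25 }
```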

Conclusion

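We just built a fully decoupled logging architecture. With Kafka in the middle, if Elasticsearch goes down for maintenance, the logs stay in the Kafka topic and are picked up from the consumer group's last committed offset once the pipeline is back, as long as the outage is shorter than the topic's retention period.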
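The replay behaviour rests on two things: Kafka keeps messages until retention expires, and each consumer group remembers how far it has read. The in-memory toy below models just that; it is not a Kafka client, and the TopicPartition class and its methods are hypothetical. For simplicity it commits as soon as it reads; a real consumer should commit only after a batch is safely indexed, which gives at-least-once delivery.

```javascript
// Toy model: an append-only log plus a committed offset per consumer group.
class TopicPartition {
  constructor() {
    this.log = [];              // retained messages (retention is ignored here)
    this.committed = new Map(); // groupId -> next offset to read
  }

  append(msg) {
    this.log.push(msg);
  }

  // Read up to `max` messages for a group, then commit the new offset.
  poll(groupId, max = Infinity) {
    const start = this.committed.get(groupId) ?? 0;
    const batch = this.log.slice(start, start + max);
    this.committed.set(groupId, start + batch.length);
    return batch;
  }
}

const tp = new TopicPartition();
tp.append('log-1');
tp.append('log-2');
console.log(tp.poll('logstash')); // [ 'log-1', 'log-2' ]

// Elasticsearch is down, so the consumer stops polling; producers keep writing.
tp.append('log-3');
tp.append('log-4');

// When the pipeline recovers, it resumes from its committed offset.
console.log(tp.poll('logstash')); // [ 'log-3', 'log-4' ]
console.log(tp.poll('logstash')); // []
```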

This buffer-in-the-middle pattern is a common choice for high-volume observability pipelines.
