Aisalkyn Aidarova

Project idea – “Real-Time Orders Platform”

Business story

You’re building a real-time order processing platform for an e-commerce company:

  • Existing Oracle database with orders & customers (in the lab we’ll use PostgreSQL to simulate Oracle).
  • New Couchbase (NoSQL) for fast customer session & cart data (in the lab, run Couchbase, or Mongo if that's easier).
  • Need Confluent Kafka in the middle to stream events:

    • order-service writes new orders to Kafka.
    • payment-service, fraud-service, analytics-service consume.
    • Kafka Connect syncs data from Oracle → Kafka and Kafka → Couchbase.
    • ksqlDB / Kafka Streams does real-time aggregations (e.g., sales per minute, fraud rules).

What this lets you talk about:

  • Kafka architecture & streaming design
  • Confluent components: Brokers, Schema Registry, Connect, ksqlDB, Control Center
  • SQL + NoSQL integration (Oracle/Postgres + Couchbase)
  • Topics, partitions, replication, consumer groups, offsets
  • Reliability, scale, monitoring, security
  • How you “owned” the Confluent environment and became the escalation point

2. High-level architecture

In interviews, describe it along these lines:

  1. Producers
     • order-service (REST API) → publishes order events to Kafka.

  2. Kafka / Confluent cluster
     • 3 Kafka brokers (or 1 for the lab).
     • Topics:
       • orders (3 partitions, RF=2)
       • payments
       • fraud-alerts
       • order-analytics
     • Schema Registry for Avro/JSON schemas.

  3. Stream processing
     • ksqlDB or Kafka Streams app:
       • joins orders with payments
       • flags potential fraud
       • writes to fraud-alerts & order-analytics.

  4. Connectors
     • JDBC Source Connector: Oracle/Postgres → topic legacy_orders.
     • Sink Connector: order-analytics → Couchbase collection.

  5. Consumers
     • payment-service → consumes orders, writes to DB and publishes to payments.
     • fraud-service → consumes orders + payments, publishes to fraud-alerts.
     • analytics-service → consumes orders & writes summaries to NoSQL / analytics DB.

  6. Ops
     • Confluent Control Center or CLI tools for monitoring.
     • Basic ACLs / SSL (at least conceptually).


3. Tech stack for the lab

  • Platform: Docker Compose on your Mac
  • Core: Confluent Platform images:

    • zookeeper (or KRaft mode if you want modern setup)
    • kafka brokers
    • schema-registry
    • ksqldb-server + ksqldb-cli
    • connect
    • control-center
  • Databases

    • postgres (simulating Oracle)
    • couchbase (or Mongo if Couchbase is too heavy)
  • Microservices

    • Language you like (Python or Node.js) for producer/consumer services.
  • UI

    • kafdrop or Confluent Control Center to browse topics.

4. Step-by-step project plan

Step 1 – Bring up the Confluent stack with Docker Compose

Goal: show you can set up a Confluent environment from scratch.

  • Create docker-compose.yml with:

    • Zookeeper (optional if not using KRaft)
    • 1–3 Kafka brokers
    • Schema Registry
    • Connect
    • ksqlDB
    • Control Center
    • Postgres
    • Couchbase
  • Verify with:

    • docker ps
    • kafka-topics CLI listing
    • Control Center UI opens in browser.
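
A quick verification pass from the host might look like this (container and service names assume the Docker Compose file later in this article):

   docker ps --format '{{.Names}}\t{{.Status}}'
   docker exec kafka kafka-topics --bootstrap-server kafka:9092 --list
   # then open http://localhost:9021 for the Control Center UI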

Interview mapping:
“Tell me about a Confluent environment you set up.”
“How many brokers, what replication factor, how did you run it locally for POCs?”


Step 2 – Design & create Kafka topics

Goal: talk like an architect about topics, partitions & replication.

  • Design topics:

    • orders – 3 partitions, RF=1 or 2 (lab).
    • payments – 3 partitions.
    • fraud-alerts – 1 or 2 partitions.
    • order-analytics – 3 partitions.
  • Use kafka-topics CLI or Control Center to create them.

  • Decide partition key (e.g., order_id or customer_id).
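
For example, creating the orders topic with the lab settings could look like this (single-broker lab, so RF=1; the command runs inside the kafka container from the compose file):

   docker exec kafka kafka-topics --bootstrap-server kafka:9092 \
     --create --topic orders --partitions 3 --replication-factor 1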

Interview mapping:
“How do you decide number of partitions?”
“How do you handle ordering?”
“What replication factor do you choose and why?”


Step 3 – Implement an order producer service

Goal: show hands-on Kafka client experience.

  • Build order-service:

    • Simple REST endpoint /orders that accepts an order JSON.
    • Validates & publishes to orders topic using Kafka client library.
    • Adds headers (source, correlation-id) to show best practices.
  • Demonstrate:

    • Fire a few orders.
    • Watch them appear in orders topic (Kafdrop or kafka-console-consumer).
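
A minimal sketch of the publish path (kafka-python, matching the services later in this article; the acks/retries values and header names are illustrative):

import json
import uuid

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    acks="all",      # wait for the full ISR before acknowledging
    retries=5,       # retry transient send failures
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    key_serializer=lambda k: str(k).encode("utf-8"),
)

def publish_order(order: dict) -> None:
    # headers carry tracing metadata alongside the payload
    headers = [
        ("source", b"order-service"),
        ("correlation-id", str(uuid.uuid4()).encode("utf-8")),
    ]
    producer.send("orders", key=order["order_id"], value=order, headers=headers)
    producer.flush()

Note that kafka-python does not implement the idempotent producer; if that comes up, the confluent-kafka client (librdkafka) with enable.idempotence=true is the usual reference point.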

Interview mapping:
“Walk me through a producer you wrote.”
“How do you handle retries, acks, idempotence?”


Step 4 – Implement consumer microservices

Goal: talk about consumer groups, scaling, offset management.

  1. payment-service
     • Consumes from orders (group payments-group).
     • “Processes payment” (simulated) and publishes event to payments topic.
  2. fraud-service
     • Consumes from orders & payments (either directly or via order-payments stream later).
     • Simple rule: if amount > X and country is Y → publish alert to fraud-alerts.
  3. analytics-service
     • Consumes from orders & writes to order_analytics table in Postgres (or pushes to order-analytics topic for Connect).

Show:

  • Scaling a consumer group: run 2 instances of payment-service and watch partition assignment change.
  • Show offset lag using kafka-consumer-groups --describe.
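
Checking group state and lag might look like this (the group name follows this step's wording; the sample consumer code later in the article uses payment-service-group):

   docker exec kafka kafka-consumer-groups --bootstrap-server kafka:9092 \
     --describe --group payments-group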

Interview mapping:
“How do consumer groups work?”
“What happens when you add/remove consumers?”
“How do you handle reprocessing / replay?”


Step 5 – Integrate with Oracle (Postgres) using Kafka Connect

Goal: show Connect and JDBC connectors.

  • In Postgres create tables:

    • legacy_orders
    • Insert some sample historical orders.
  • Configure JDBC Source Connector:

    • Source: Postgres legacy_orders.
    • Sink: legacy_orders topic.
  • Verify:

    • Rows from DB appear as messages in legacy_orders.
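
One way to confirm the rows landed in Kafka (topic name assumes the connector config shown later in this article):

   docker exec kafka kafka-console-consumer --bootstrap-server kafka:9092 \
     --topic legacy_orders --from-beginning --max-messages 3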

Interview mapping:
“Have you used Kafka Connect?”
“Explain how you brought data from Oracle into Kafka.”
“How do you handle schema changes?”


Step 6 – Sink analytics to Couchbase via Connect

Goal: show Kafka → NoSQL integration.

  • Create Couchbase bucket/collection order_analytics.
  • Configure Sink Connector:

    • Source topic: order-analytics.
    • Target: Couchbase.
  • Verify:

    • Aggregated analytics events appear as documents in Couchbase.
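
Once the sink is registered, its status can be checked through the Connect REST API (connector name as used in the config later in this article):

   curl -s http://localhost:8083/connectors/couchbase-sink-order-analytics/status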

Interview mapping:
“Tell us about integrating Kafka with NoSQL / Couchbase.”
“How did you configure your sink connectors?”
“How do you handle retries / DLQs?”


Step 7 – Stream processing with ksqlDB or Kafka Streams

Goal: cover Kafka Streams / kSQL & streaming architecture.

Using ksqlDB:

  • Define streams:

    • ORDERS_STREAM on orders.
    • PAYMENTS_STREAM on payments.
  • Build a joined stream:

    • ORDERS_WITH_PAYMENTS joining by order_id.
  • Create aggregations:

    • Total sales per country per minute.
    • Count of “suspicious orders” flagged by simple rule.
  • Output results to order-analytics topic (used by sink connector above).
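
The streams.sql file later in this article only covers the per-country aggregation; a sketch of the orders/payments join could look like this (it assumes a PAYMENTS_STREAM declared over the payments topic as in the bullets above; the window and grace period are illustrative):

CREATE STREAM ORDERS_WITH_PAYMENTS AS
  SELECT o.order_id AS order_id,
         o.amount   AS amount,
         o.country  AS country,
         p.status   AS payment_status
  FROM ORDERS_STREAM o
  JOIN PAYMENTS_STREAM p
    WITHIN 10 MINUTES GRACE PERIOD 1 MINUTE
    ON o.order_id = p.order_id
  EMIT CHANGES;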

Interview mapping:
“What’s your experience with Kafka Streams / kSQL?”
“How do you build a real-time pipeline end-to-end?”
“How do you design stateful vs stateless processing?”


Step 8 – Schema Registry & message evolution

Goal: talk about schemas, compatibility & governance.

  • Define an Avro or JSON schema for Order and register it in Schema Registry.
  • Configure producer & consumers to use that schema.
  • Demonstrate:

    • Add a new optional field (e.g., promo_code) → show compatibility (backward/forward).
    • Talk about what happens if you make a breaking change.
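
For instance, an Avro schema for Order where promo_code is added as an optional field (the field list is illustrative):

{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "int"},
    {"name": "customer_id", "type": "int"},
    {"name": "amount", "type": "double"},
    {"name": "country", "type": "string"},
    {"name": "promo_code", "type": ["null", "string"], "default": null}
  ]
}

Because the new field has a default, consumers still on the old schema keep working, which is what backward compatibility checks in Schema Registry enforce.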

Interview mapping:
“Have you used Schema Registry?”
“How do you manage schema evolution?”
“How do you avoid breaking consumers?”


Step 9 – Reliability, monitoring & troubleshooting

Goal: show you can be the escalation point for Kafka.

Do some experiments:

  • Kill one consumer instance and watch rebalance.
  • Stop a connector → see lag build up, restart and recover.
  • Configure producer with acks=all, retries, and discuss durability.
  • Use Control Center dashboards or the CLI to check:

    • Consumer lag
    • Broker health
    • Topic throughput
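
A couple of these checks can be run straight from the host (the connector name matches the JDBC source config later in this article):

   # consumer lag across all groups
   docker exec kafka kafka-consumer-groups --bootstrap-server kafka:9092 \
     --describe --all-groups

   # pause / resume a connector to watch lag build up and then recover
   curl -X PUT http://localhost:8083/connectors/jdbc-source-legacy-orders/pause
   curl -X PUT http://localhost:8083/connectors/jdbc-source-legacy-orders/resume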

Prepare talking points:

  • How you would monitor in prod (Prometheus/Grafana, alerts on lag, disk, ISR count).
  • Backup & disaster recovery strategy (snapshots, multi-AZ, mirror topics across clusters).

Interview mapping:
“How do you monitor Kafka?”
“What are common failure scenarios?”
“If a consumer is lagging badly, how do you troubleshoot?”


Step 10 – Security & access control (conceptual + minimal lab)

Goal: speak about security architecture even if lab is simple.

In lab (optional but nice):

  • Enable SASL/PLAINTEXT or at least explain how you’d:

    • Use SASL/SCRAM or mTLS for auth.
    • Use ACLs to restrict which services can read/write which topics.
    • Use encryption in transit (TLS) and at rest (disks).
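
If an authorizer is enabled on the brokers, a topic-level ACL looks roughly like this (the principal name is illustrative; the lab compose file does not configure an authorizer):

   kafka-acls --bootstrap-server kafka:9092 \
     --add --allow-principal User:payment-service \
     --operation Read --topic orders --group payments-group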

Interview mapping:
“How do you secure a Kafka / Confluent environment?”
“How do you isolate teams & applications?”


Step 11 – Your 2-minute “project story” for interviews

Practice saying something like:

“I recently built a real-time orders platform using Confluent Kafka. The company had an existing Oracle database and was adding Couchbase as a NoSQL store.
I designed the Kafka architecture with multiple topics (orders, payments, fraud-alerts, order-analytics) and set up a Confluent stack with Schema Registry, Connect, and ksqlDB using Docker.
An order-service publishes orders to Kafka; payment-service, fraud-service, and analytics-service consume them in different consumer groups.
I used a JDBC Source Connector to stream historical data from Oracle (simulated with Postgres) into Kafka, and a sink connector to push real-time analytics into Couchbase.
On top of that I used ksqlDB to join orders and payments, detect potential fraud, and compute per-minute sales metrics.
I monitored consumer lag and broker health through Control Center, experimented with failures, and documented how to scale consumers and handle schema evolution using Schema Registry.
This project gave me end-to-end experience as the person owning the Confluent platform and integrating it with both SQL and NoSQL systems.”

1. Project structure

Create a folder, for example:

mkdir kafka-enterprise-orders
cd kafka-enterprise-orders

Inside it, create this structure:

kafka-enterprise-orders/
├── docker-compose.yml
├── .env
├── db/
│   └── init.sql
├── connect/
│   ├── Dockerfile
│   └── connectors/
│       ├── jdbc-source.json
│       └── couchbase-sink.json
├── ksql/
│   └── streams.sql
├── producer/
│   ├── Dockerfile
│   ├── requirements.txt
│   └── order_producer.py
└── consumers/
    ├── fraud-service/
    │   ├── Dockerfile
    │   ├── requirements.txt
    │   └── fraud_consumer.py
    ├── payment-service/
    │   ├── Dockerfile
    │   ├── requirements.txt
    │   └── payment_consumer.py
    └── analytics-service/
        ├── Dockerfile
        ├── requirements.txt
        └── analytics_consumer.py

Now fill each file as below.


2. .env

# Kafka
KAFKA_BROKER=kafka:9092

# Topics
ORDERS_TOPIC=orders
PAYMENTS_TOPIC=payments
FRAUD_ALERTS_TOPIC=fraud-alerts
ORDER_ANALYTICS_TOPIC=order-analytics

# Producer config
ORDER_PRODUCER_INTERVAL_SECONDS=3

# Fraud rules
FRAUD_AMOUNT_THRESHOLD=400
FRAUD_RISKY_COUNTRIES=RU,FR,BR

# Analytics settings
ANALYTICS_PRINT_EVERY=10

# Postgres (simulating Oracle)
POSTGRES_DB=ordersdb
POSTGRES_USER=orders_user
POSTGRES_PASSWORD=orders_pass

# Couchbase (you will set these in UI)
COUCHBASE_HOST=couchbase
COUCHBASE_BUCKET=order_analytics
COUCHBASE_USERNAME=Administrator
COUCHBASE_PASSWORD=password

3. docker-compose.yml

services:
  # ---------------------------
  # Zookeeper & Kafka broker
  # ---------------------------
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.6.1
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      # host access via localhost:29092; services on the Docker network use kafka:9092
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  # ---------------------------
  # Schema Registry
  # ---------------------------
  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.1
    container_name: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "PLAINTEXT://kafka:9092"
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"

  # ---------------------------
  # ksqlDB
  # ---------------------------
  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.6.1
    container_name: ksqldb-server
    depends_on:
      - kafka
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
      KSQL_BOOTSTRAP_SERVERS: "kafka:9092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_SERVICE_ID: "ksql-service"

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.6.1
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  # ---------------------------
  # Kafka Connect (custom image)
  # ---------------------------
  connect:
    build: ./connect
    container_name: connect
    depends_on:
      - kafka
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_CONFIG_STORAGE_TOPIC: "_connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "_connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "_connect-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"

  # ---------------------------
  # Confluent Control Center
  # ---------------------------
  control-center:
    image: confluentinc/cp-enterprise-control-center:7.6.1
    container_name: control-center
    depends_on:
      - kafka
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "kafka:9092"
      CONTROL_CENTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONTROL_CENTER_CONNECT_CLUSTER: "connect:8083"
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  # ---------------------------
  # Kafdrop
  # ---------------------------
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    depends_on:
      - kafka
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:9092"
      SERVER_SERVLET_CONTEXTPATH: "/"

  # ---------------------------
  # PostgreSQL (simulating Oracle)
  # ---------------------------
  postgres:
    image: postgres:15
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - ./db/init.sql:/docker-entrypoint-initdb.d/init.sql

  # ---------------------------
  # Couchbase (single node)
  # ---------------------------
  couchbase:
    image: couchbase:community-7.2.0
    container_name: couchbase
    ports:
      - "8091-8094:8091-8094"
      - "11210:11210"
    environment:
      COUCHBASE_ADMINISTRATOR_USERNAME: ${COUCHBASE_USERNAME}
      COUCHBASE_ADMINISTRATOR_PASSWORD: ${COUCHBASE_PASSWORD}

  # ---------------------------
  # Order producer
  # ---------------------------
  order-producer:
    build: ./producer
    container_name: order-producer
    depends_on:
      - kafka
    environment:
      KAFKA_BROKER: ${KAFKA_BROKER}
      ORDERS_TOPIC: ${ORDERS_TOPIC}
      ORDER_PRODUCER_INTERVAL_SECONDS: ${ORDER_PRODUCER_INTERVAL_SECONDS}

  # ---------------------------
  # Fraud service
  # ---------------------------
  fraud-service:
    build: ./consumers/fraud-service
    container_name: fraud-service
    depends_on:
      - kafka
    environment:
      KAFKA_BROKER: ${KAFKA_BROKER}
      ORDERS_TOPIC: ${ORDERS_TOPIC}
      FRAUD_AMOUNT_THRESHOLD: ${FRAUD_AMOUNT_THRESHOLD}
      FRAUD_RISKY_COUNTRIES: ${FRAUD_RISKY_COUNTRIES}

  # ---------------------------
  # Payment service
  # ---------------------------
  payment-service:
    build: ./consumers/payment-service
    container_name: payment-service
    depends_on:
      - kafka
    environment:
      KAFKA_BROKER: ${KAFKA_BROKER}
      ORDERS_TOPIC: ${ORDERS_TOPIC}
      PAYMENTS_TOPIC: ${PAYMENTS_TOPIC}

  # ---------------------------
  # Analytics service
  # ---------------------------
  analytics-service:
    build: ./consumers/analytics-service
    container_name: analytics-service
    depends_on:
      - kafka
      - couchbase
    environment:
      KAFKA_BROKER: ${KAFKA_BROKER}
      ORDERS_TOPIC: ${ORDERS_TOPIC}
      ORDER_ANALYTICS_TOPIC: ${ORDER_ANALYTICS_TOPIC}
      ANALYTICS_PRINT_EVERY: ${ANALYTICS_PRINT_EVERY}
      COUCHBASE_HOST: ${COUCHBASE_HOST}
      COUCHBASE_BUCKET: ${COUCHBASE_BUCKET}
      COUCHBASE_USERNAME: ${COUCHBASE_USERNAME}
      COUCHBASE_PASSWORD: ${COUCHBASE_PASSWORD}

4. PostgreSQL init script: db/init.sql

CREATE TABLE IF NOT EXISTS legacy_orders (
    id SERIAL PRIMARY KEY,
    order_id INT NOT NULL,
    customer_id INT NOT NULL,
    amount NUMERIC(10,2) NOT NULL,
    currency VARCHAR(10) NOT NULL,
    country VARCHAR(10) NOT NULL,
    created_at TIMESTAMP NOT NULL
);

INSERT INTO legacy_orders (order_id, customer_id, amount, currency, country, created_at) VALUES
(1001, 2001, 123.45, 'USD', 'US', NOW() - INTERVAL '1 day'),
(1002, 2002, 555.00, 'USD', 'FR', NOW() - INTERVAL '2 days'),
(1003, 2003, 75.99, 'USD', 'DE', NOW() - INTERVAL '3 days');

5. Kafka Connect image: connect/Dockerfile

This installs JDBC and Couchbase connectors via Confluent Hub.

FROM confluentinc/cp-kafka-connect:7.6.1

# Install JDBC connector
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest

# Install Couchbase Kafka connector
RUN confluent-hub install --no-prompt couchbase/kafka-connect-couchbase:latest

# Keep the base image's default command so the CONNECT_* environment variables
# from docker-compose are picked up by the Confluent launcher script
CMD ["/etc/confluent/docker/run"]

6. Kafka Connect connector configs

6.1 connect/connectors/jdbc-source.json

This pulls rows from the Postgres table legacy_orders into the Kafka topic legacy_orders (topic.prefix is left empty so the topic name matches the table name).

{
  "name": "jdbc-source-legacy-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/ordersdb",
    "connection.user": "orders_user",
    "connection.password": "orders_pass",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "legacy_orders",
    "topic.prefix": "legacy_",
    "poll.interval.ms": "10000"
  }
}

6.2 connect/connectors/couchbase-sink.json

This writes from topic order-analytics → Couchbase bucket order_analytics.

{
  "name": "couchbase-sink-order-analytics",
  "config": {
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSinkConnector",
    "tasks.max": "1",
    "topics": "order-analytics",
    "couchbase.bootstrap.servers": "couchbase",
    "couchbase.bucket": "order_analytics",
    "couchbase.username": "Administrator",
    "couchbase.password": "password",
    "couchbase.enable.tls": "false",
    "couchbase.document.id": "${topic}-${partition}-${offset}",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}

You’ll POST these JSONs to http://localhost:8083/connectors after everything is up.
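
After posting, it's worth listing the connectors and checking their tasks (connector names as defined in the JSON files above):

curl -s http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/jdbc-source-legacy-orders/status
curl -s http://localhost:8083/connectors/couchbase-sink-order-analytics/status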


7. ksqlDB: ksql/streams.sql

Minimal example: create stream on orders, aggregate sales per country.

-- Run this inside ksqldb-cli (docker exec -it ksqldb-cli /bin/sh then ksql http://ksqldb-server:8088)

CREATE STREAM ORDERS_STREAM (
  order_id INT,
  customer_id INT,
  amount DOUBLE,
  currency VARCHAR,
  country VARCHAR,
  status VARCHAR,
  created_at VARCHAR,
  source VARCHAR
) WITH (
  KAFKA_TOPIC = 'orders',
  VALUE_FORMAT = 'JSON'
);

CREATE TABLE SALES_PER_COUNTRY AS
  SELECT country,
         COUNT(*) AS order_count,
         SUM(amount) AS total_amount
  FROM ORDERS_STREAM
  WINDOW TUMBLING (SIZE 1 MINUTE)
  GROUP BY country
  EMIT CHANGES;

-- A windowed aggregation must be materialized as a TABLE, not a STREAM
CREATE TABLE ORDER_ANALYTICS
  WITH (KAFKA_TOPIC = 'order-analytics', VALUE_FORMAT = 'JSON') AS
  SELECT country,
         COUNT(*) AS order_count,
         SUM(amount) AS total_amount
  FROM ORDERS_STREAM
  WINDOW TUMBLING (SIZE 1 MINUTE)
  GROUP BY country
  EMIT CHANGES;

8. Producer service

8.1 producer/requirements.txt

kafka-python==2.0.2

8.2 producer/Dockerfile

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY order_producer.py .

CMD ["python", "order_producer.py"]

8.3 producer/order_producer.py

import json
import os
import random
import time
from datetime import datetime, timezone

from kafka import KafkaProducer


def get_env(name: str, default: str) -> str:
    return os.environ.get(name, default)


KAFKA_BROKER = get_env("KAFKA_BROKER", "kafka:9092")
ORDERS_TOPIC = get_env("ORDERS_TOPIC", "orders")
INTERVAL_SECONDS = float(get_env("ORDER_PRODUCER_INTERVAL_SECONDS", "3"))


def create_producer() -> KafkaProducer:
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        key_serializer=lambda k: str(k).encode("utf-8"),
    )
    print(f"[order-producer] Connected to Kafka at {KAFKA_BROKER}, topic '{ORDERS_TOPIC}'")
    return producer


def random_country() -> str:
    countries = ["US", "CA", "GB", "DE", "FR", "RU", "BR", "AU", "IN"]
    return random.choice(countries)


def random_currency() -> str:
    return "USD"


def random_customer_id() -> int:
    return random.randint(1000, 9999)


def random_amount() -> float:
    return round(random.uniform(10, 600), 2)


def random_source() -> str:
    return random.choice(["web", "mobile", "api"])


def generate_order(order_id: int) -> dict:
    now = datetime.now(timezone.utc).isoformat()
    order = {
        "order_id": order_id,
        "customer_id": random_customer_id(),
        "amount": random_amount(),
        "currency": random_currency(),
        "country": random_country(),
        "status": "NEW",
        "created_at": now,
        "source": random_source(),
    }
    return order


def main():
    producer = create_producer()
    order_id = 1

    while True:
        order = generate_order(order_id)
        producer.send(ORDERS_TOPIC, key=order["order_id"], value=order)
        producer.flush()

        print(f"[order-producer] Sent order: {order}")
        order_id += 1
        time.sleep(INTERVAL_SECONDS)


if __name__ == "__main__":
    main()

9. Fraud service

9.1 consumers/fraud-service/requirements.txt

kafka-python==2.0.2

9.2 consumers/fraud-service/Dockerfile

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY fraud_consumer.py .

CMD ["python", "fraud_consumer.py"]

9.3 consumers/fraud-service/fraud_consumer.py

import json
import os
from typing import List, Tuple

from kafka import KafkaConsumer


def get_env(name: str, default: str) -> str:
    return os.environ.get(name, default)


KAFKA_BROKER = get_env("KAFKA_BROKER", "kafka:9092")
ORDERS_TOPIC = get_env("ORDERS_TOPIC", "orders")

AMOUNT_THRESHOLD = float(get_env("FRAUD_AMOUNT_THRESHOLD", "400"))
RISKY_COUNTRIES_RAW = get_env("FRAUD_RISKY_COUNTRIES", "RU,FR,BR")
RISKY_COUNTRIES: List[str] = [c.strip().upper() for c in RISKY_COUNTRIES_RAW.split(",") if c.strip()]


def create_consumer() -> KafkaConsumer:
    consumer = KafkaConsumer(
        ORDERS_TOPIC,
        bootstrap_servers=KAFKA_BROKER,
        group_id="fraud-service-group",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        key_deserializer=lambda k: int(k.decode("utf-8")) if k else None,
        auto_offset_reset="earliest",
        enable_auto_commit=True,
    )
    print(f"[fraud-service] Connected to Kafka at {KAFKA_BROKER}, topic '{ORDERS_TOPIC}'")
    print(f"[fraud-service] Amount threshold: {AMOUNT_THRESHOLD}, risky countries: {RISKY_COUNTRIES}")
    return consumer


def is_fraud(order: dict) -> Tuple[bool, str]:
    amount = order.get("amount", 0)
    country = order.get("country", "").upper()

    if amount >= AMOUNT_THRESHOLD and country in RISKY_COUNTRIES:
        return True, "HIGH_AMOUNT_RISKY_COUNTRY"
    if amount >= AMOUNT_THRESHOLD:
        return True, "HIGH_AMOUNT"
    if country in RISKY_COUNTRIES:
        return True, "RISKY_COUNTRY"

    return False, ""


def main():
    consumer = create_consumer()

    for message in consumer:
        order = message.value
        key = message.key

        print(f"[fraud-service] Received order: key={key}, value={order}")

        flagged, reason = is_fraud(order)
        if flagged:
            alert = {
                "order_id": order.get("order_id"),
                "reason": reason,
                "amount": order.get("amount"),
                "country": order.get("country"),
                "status": "CANCELLED",
            }
            print(f"[fraud-service] FRAUD ALERT: {alert}")
        else:
            print(f"[fraud-service] Order is clean, id={order.get('order_id')}")


if __name__ == "__main__":
    main()

10. Payment service

10.1 consumers/payment-service/requirements.txt

kafka-python==2.0.2

10.2 consumers/payment-service/Dockerfile

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY payment_consumer.py .

CMD ["python", "payment_consumer.py"]

10.3 consumers/payment-service/payment_consumer.py

import json
import os

from kafka import KafkaConsumer, KafkaProducer


def get_env(name: str, default: str) -> str:
    return os.environ.get(name, default)


KAFKA_BROKER = get_env("KAFKA_BROKER", "kafka:9092")
ORDERS_TOPIC = get_env("ORDERS_TOPIC", "orders")
PAYMENTS_TOPIC = get_env("PAYMENTS_TOPIC", "payments")


def create_consumer() -> KafkaConsumer:
    consumer = KafkaConsumer(
        ORDERS_TOPIC,
        bootstrap_servers=KAFKA_BROKER,
        group_id="payment-service-group",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        key_deserializer=lambda k: int(k.decode("utf-8")) if k else None,
        auto_offset_reset="earliest",
        enable_auto_commit=True,
    )
    print(f"[payment-service] Connected consumer to Kafka at {KAFKA_BROKER}, topic '{ORDERS_TOPIC}'")
    return consumer


def create_producer() -> KafkaProducer:
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        key_serializer=lambda k: str(k).encode("utf-8"),
    )
    print(f"[payment-service] Connected producer to Kafka at {KAFKA_BROKER}, topic '{PAYMENTS_TOPIC}'")
    return producer


def main():
    consumer = create_consumer()
    producer = create_producer()

    for message in consumer:
        order = message.value
        key = message.key

        print(f"[payment-service] Received order: key={key}, value={order}")

        if order.get("status") == "NEW":
            print(f"[payment-service] Processing payment for order {order.get('order_id')} "
                  f"amount={order.get('amount')} {order.get('currency')}")
            payment_event = {
                "order_id": order.get("order_id"),
                "customer_id": order.get("customer_id"),
                "amount": order.get("amount"),
                "currency": order.get("currency"),
                "status": "PAID",
            }
            producer.send(PAYMENTS_TOPIC, key=payment_event["order_id"], value=payment_event)
            producer.flush()
            print(f"[payment-service] Payment successful, event sent: {payment_event}")
        else:
            print(f"[payment-service] Skipping order {order.get('order_id')} with status={order.get('status')}")


if __name__ == "__main__":
    main()

11. Analytics service

This one both prints stats and (optionally) writes to Couchbase directly, so even if Connect is not configured yet, you still have a Kafka → Couchbase path.

11.1 consumers/analytics-service/requirements.txt

kafka-python==2.0.2
couchbase==4.3.0

11.2 consumers/analytics-service/Dockerfile

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY analytics_consumer.py .

CMD ["python", "analytics_consumer.py"]

11.3 consumers/analytics-service/analytics_consumer.py

import json
import os
from collections import Counter
from datetime import timedelta

from kafka import KafkaConsumer
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions, ClusterTimeoutOptions


def get_env(name: str, default: str) -> str:
    return os.environ.get(name, default)


KAFKA_BROKER = get_env("KAFKA_BROKER", "kafka:9092")
ORDERS_TOPIC = get_env("ORDERS_TOPIC", "orders")
PRINT_EVERY = int(get_env("ANALYTICS_PRINT_EVERY", "10"))

COUCHBASE_HOST = get_env("COUCHBASE_HOST", "couchbase")
COUCHBASE_BUCKET = get_env("COUCHBASE_BUCKET", "order_analytics")
COUCHBASE_USERNAME = get_env("COUCHBASE_USERNAME", "Administrator")
COUCHBASE_PASSWORD = get_env("COUCHBASE_PASSWORD", "password")


def create_consumer() -> KafkaConsumer:
    consumer = KafkaConsumer(
        ORDERS_TOPIC,
        bootstrap_servers=KAFKA_BROKER,
        group_id="analytics-service-group",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        key_deserializer=lambda k: int(k.decode("utf-8")) if k else None,
        auto_offset_reset="earliest",
        enable_auto_commit=True,
    )
    print(f"[analytics-service] Connected to Kafka at {KAFKA_BROKER}, topic '{ORDERS_TOPIC}'")
    return consumer


def create_couchbase_bucket():
    try:
        cluster = Cluster(
            f"couchbase://{COUCHBASE_HOST}",
            ClusterOptions(
                PasswordAuthenticator(COUCHBASE_USERNAME, COUCHBASE_PASSWORD),
                timeout_options=ClusterTimeoutOptions(kv_timeout=timedelta(seconds=10))
            )
        )
        bucket = cluster.bucket(COUCHBASE_BUCKET)
        collection = bucket.default_collection()
        print(f"[analytics-service] Connected to Couchbase bucket '{COUCHBASE_BUCKET}' at {COUCHBASE_HOST}")
        return collection
    except Exception as e:
        print(f"[analytics-service] WARNING: Could not connect to Couchbase: {e}")
        return None


def main():
    consumer = create_consumer()
    collection = create_couchbase_bucket()

    total_orders = 0
    total_amount = 0.0
    orders_by_country = Counter()

    for message in consumer:
        order = message.value
        key = message.key

        total_orders += 1
        total_amount += float(order.get("amount", 0))
        country = order.get("country", "UNKNOWN")
        orders_by_country[country] += 1

        print(f"[analytics-service] Received order: key={key}, value={order}")

        # Optionally store each order document in Couchbase
        if collection is not None:
            doc_id = f"order::{order.get('order_id')}"
            try:
                collection.upsert(doc_id, order)
                print(f"[analytics-service] Stored order in Couchbase with id={doc_id}")
            except Exception as e:
                print(f"[analytics-service] ERROR storing to Couchbase: {e}")

        if total_orders % PRINT_EVERY == 0:
            print("\n[analytics-service] ===== STATS =====")
            print(f"Total orders: {total_orders}")
            avg_amount = total_amount / total_orders if total_orders else 0
            print(f"Total amount: {total_amount:.2f}")
            print(f"Average amount: {avg_amount:.2f}")
            print("Orders by country:")
            for c, count in orders_by_country.items():
                print(f"  {c}: {count}")
            print("[analytics-service] ====================\n")


if __name__ == "__main__":
    main()

12. How to run and test

From the root folder kafka-enterprise-orders:

  1. Build and start everything
   docker-compose up -d --build
  2. Initialize Couchbase once
  • Go to http://localhost:8091 in browser.
  • Follow setup wizard:

    • Username: Administrator
    • Password: password
  • Create a bucket named: order_analytics.
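
Alternatively, the same initialization can be scripted with couchbase-cli (a sketch; flags may differ slightly between Couchbase versions):

   docker exec couchbase couchbase-cli cluster-init -c localhost \
     --cluster-username Administrator --cluster-password password \
     --services data,index,query --cluster-ramsize 1024
   docker exec couchbase couchbase-cli bucket-create -c localhost \
     -u Administrator -p password \
     --bucket order_analytics --bucket-type couchbase --bucket-ramsize 256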

  3. Check UIs
  • Kafdrop: http://localhost:9000
  • Control Center: http://localhost:9021/clusters
  • Schema Registry: http://localhost:8081/subjects
  • ksqlDB server info: http://localhost:8088/info
  • Couchbase UI: http://localhost:8091
  4. Create connectors

In a terminal:

   # JDBC source
   curl -X POST -H "Content-Type: application/json" \
     --data @connect/connectors/jdbc-source.json \
     http://localhost:8083/connectors

   # Couchbase sink
   curl -X POST -H "Content-Type: application/json" \
     --data @connect/connectors/couchbase-sink.json \
     http://localhost:8083/connectors
  5. Check logs
   docker logs -f order-producer
   docker logs -f fraud-service
   docker logs -f payment-service
   docker logs -f analytics-service

You now have:

  • Orders streaming into Kafka.
  • Fraud, payment, analytics services consuming.
  • Postgres simulating Oracle, with historical data flowing in through the JDBC Source connector (polling-based, not true CDC).
  • Couchbase as NoSQL store for analytics and raw orders.
  • Confluent UIs available on the ports listed above.
