1. Business story
You’re building a real-time order processing platform for an e-commerce company:
- Existing Oracle database with orders & customers (in the lab we’ll use PostgreSQL to simulate Oracle).
- New Couchbase (NoSQL) for fast customer session & cart data (you can simulate with Couchbase or Mongo if easier).
- Confluent Kafka sits in the middle to stream events:
  - `order-service` writes new orders to Kafka.
  - `payment-service`, `fraud-service`, and `analytics-service` consume them.
  - Kafka Connect syncs data from Oracle → Kafka and from Kafka → Couchbase.
  - ksqlDB / Kafka Streams does real-time aggregations (e.g., sales per minute, fraud rules).
What this lets you talk about:
- Kafka architecture & streaming design
- Confluent components: Brokers, Schema Registry, Connect, ksqlDB, Control Center
- SQL + NoSQL integration (Oracle/Postgres + Couchbase)
- Topics, partitions, replication, consumer groups, offsets
- Reliability, scale, monitoring, security
- How you “owned” the Confluent environment and became the escalation point
2. High-level architecture
Describe this in interviews like this:
- Producers
  - `order-service` (REST API) → publishes `orders` events to Kafka.
- Kafka / Confluent cluster
  - 3 Kafka brokers (or 1 for the lab).
  - Topics: `orders` (3 partitions, RF=2), `payments`, `fraud-alerts`, `order-analytics`.
  - Schema Registry for Avro/JSON schemas.
- Stream processing
  - ksqlDB or Kafka Streams app:
    - joins orders with payments
    - flags potential fraud
    - writes `fraud-alerts` & `order-analytics`.
- Connectors
  - JDBC Source Connector: Oracle/Postgres → topic `legacy_orders`.
  - Sink Connector: `order-analytics` → Couchbase collection.
- Consumers
  - `payment-service` → consumes `orders`, writes to DB, and publishes to `payments`.
  - `fraud-service` → consumes `orders` + `payments`, publishes to `fraud-alerts`.
  - `analytics-service` → consumes `orders` & writes summaries to the NoSQL / analytics DB.
- Ops
  - Confluent Control Center or CLI tools for monitoring.
  - Basic ACLs / SSL (at least conceptually).
3. Tech stack for the lab
- Platform: Docker Compose on your Mac
- Core: Confluent Platform images:
  - `zookeeper` (or KRaft mode if you want a modern setup)
  - `kafka` brokers
  - `schema-registry`
  - `ksqldb-server` + `ksqldb-cli`
  - `connect`
  - `control-center`
- Databases
  - `postgres` (simulating Oracle)
  - `couchbase` (or Mongo if Couchbase is too heavy)
- Microservices
  - A language you like (Python or Node.js) for the producer/consumer services.
- UI
  - `kafdrop` or Confluent Control Center to browse topics.
4. Step-by-step project plan
Step 1 – Bring up the Confluent stack with Docker Compose
Goal: show you can set up a Confluent environment from scratch.
- Create `docker-compose.yml` with:
  - ZooKeeper (omit if you use KRaft)
  - 1–3 Kafka brokers
  - Schema Registry
  - Connect
  - ksqlDB
  - Control Center
  - Postgres
  - Couchbase
- Verify with:
  - `docker ps`
  - `kafka-topics` CLI topic listing (example below)
  - Control Center UI opens in the browser.
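For example, a quick sanity check from the host could look like this (a minimal sketch assuming the broker container is named kafka, as in the compose file later in this post):
# All containers should show as Up
docker ps
# List topics via the CLI bundled in the broker image
docker exec kafka kafka-topics --bootstrap-server kafka:9092 --list
# Control Center UI: open http://localhost:9021 in the browser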
Interview mapping:
“Tell me about a Confluent environment you set up.”
“How many brokers, what replication factor, how did you run it locally for POCs?”
Step 2 – Design & create Kafka topics
Goal: talk like an architect about topics, partitions & replication.
- Design topics:
  - `orders` – 3 partitions, RF=1 or 2 (lab).
  - `payments` – 3 partitions.
  - `fraud-alerts` – 1 or 2 partitions.
  - `order-analytics` – 3 partitions.
- Use the `kafka-topics` CLI or Control Center to create them (example below).
- Decide the partition key (e.g., `order_id` or `customer_id`).
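A sketch of creating one of these topics from the CLI (single-broker lab, so replication factor 1):
docker exec kafka kafka-topics --bootstrap-server kafka:9092 \
  --create --topic orders --partitions 3 --replication-factor 1
docker exec kafka kafka-topics --bootstrap-server kafka:9092 \
  --describe --topic orders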
Interview mapping:
“How do you decide number of partitions?”
“How do you handle ordering?”
“What replication factor do you choose and why?”
Step 3 – Implement an order producer service
Goal: show hands-on Kafka client experience.
- Build `order-service`:
  - Simple REST endpoint `/orders` that accepts an order JSON.
  - Validates & publishes to the `orders` topic using a Kafka client library.
  - Adds headers (source, correlation-id) to show best practices.
- Demonstrate:
  - Fire a few orders.
  - Watch them appear in the `orders` topic (Kafdrop or `kafka-console-consumer`, as sketched below).
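Watching the topic from the CLI could look like this (print.key=true shows the partitioning key next to each order):
docker exec kafka kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic orders --from-beginning --property print.key=true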
Interview mapping:
“Walk me through a producer you wrote.”
“How do you handle retries, acks, idempotence?”
Step 4 – Implement consumer microservices
Goal: talk about consumer groups, scaling, offset management.
`payment-service`
- Consumes from `orders` (group `payments-group`).
- “Processes payment” (simulated) and publishes an event to the `payments` topic.
`fraud-service`
- Consumes from `orders` & `payments` (either directly or via an `order-payments` stream later).
- Simple rule: if amount > X and country is Y → publish an alert to `fraud-alerts`.
`analytics-service`
- Consumes from `orders` & writes to an `order_analytics` table in Postgres (or pushes to the `order-analytics` topic for Connect).
Show:
- Scaling a consumer group: run 2 instances of `payment-service` and watch the partition assignment change.
- Offset lag using `kafka-consumer-groups --describe` (example below).
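A sketch of checking partition assignment and lag (the group id matches the payment consumer code later in this post; the design above calls it payments-group):
docker exec kafka kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group payment-service-group
# Output columns include TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, CONSUMER-ID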
Interview mapping:
“How do consumer groups work?”
“What happens when you add/remove consumers?”
“How do you handle reprocessing / replay?”
Step 5 – Integrate with Oracle (Postgres) using Kafka Connect
Goal: show Connect and JDBC connectors.
- In Postgres, create a `legacy_orders` table and insert some sample historical orders.
- Configure the JDBC Source Connector:
  - Source: the Postgres `legacy_orders` table.
  - Target: a legacy-orders Kafka topic.
- Verify:
  - Rows from the DB appear as messages in that topic (e.g. with the commands below).
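One way to verify, assuming the connector config shown later in this post (name jdbc-source-legacy-orders, topic prefix legacy_, so the topic becomes legacy_legacy_orders):
# Connector should report state RUNNING
curl -s http://localhost:8083/connectors/jdbc-source-legacy-orders/status
# Table rows show up as JSON messages
docker exec kafka kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic legacy_legacy_orders --from-beginning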
Interview mapping:
“Have you used Kafka Connect?”
“Explain how you brought data from Oracle into Kafka.”
“How do you handle schema changes?”
Step 6 – Sink analytics to Couchbase via Connect
Goal: show Kafka → NoSQL integration.
- Create a Couchbase bucket/collection `order_analytics`.
- Configure the sink connector:
  - Source topic: `order-analytics`.
  - Target: Couchbase.
- Verify:
  - Aggregated analytics events appear as documents in Couchbase (connector status check below).
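A quick status check for the sink (connector name as defined in the config later in this post):
curl -s http://localhost:8083/connectors/couchbase-sink-order-analytics/status
# Then browse the order_analytics bucket in the Couchbase UI at http://localhost:8091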
Interview mapping:
“Tell us about integrating Kafka with NoSQL / Couchbase.”
“How did you configure your sink connectors?”
“How do you handle retries / DLQs?”
Step 7 – Stream processing with ksqlDB or Kafka Streams
Goal: cover Kafka Streams / kSQL & streaming architecture.
Using ksqlDB:
- Define streams:
  - `ORDERS_STREAM` on `orders`.
  - `PAYMENTS_STREAM` on `payments`.
- Build a joined stream:
  - `ORDERS_WITH_PAYMENTS` joining by `order_id`.
- Create aggregations:
  - Total sales per country per minute.
  - Count of “suspicious orders” flagged by a simple rule.
- Output results to the `order-analytics` topic (used by the sink connector above).
Interview mapping:
“What’s your experience with Kafka Streams / kSQL?”
“How do you build a real-time pipeline end-to-end?”
“How do you design stateful vs stateless processing?”
Step 8 – Schema Registry & message evolution
Goal: talk about schemas, compatibility & governance.
- Define an Avro or JSON schema for `Order` and register it in Schema Registry.
- Configure the producer & consumers to use that schema.
- Demonstrate:
  - Add a new optional field (e.g., `promo_code`) → show compatibility (backward/forward), for example via the REST calls below.
  - Talk about what happens if you make a breaking change.
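A sketch of the Schema Registry REST calls involved (the subject name orders-value is an assumption based on the default TopicNameStrategy):
# Check the global compatibility level, then pin the subject to BACKWARD
curl -s http://localhost:8081/config
curl -s -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "BACKWARD"}' \
  http://localhost:8081/config/orders-value
# List the registered versions after evolving the schema
curl -s http://localhost:8081/subjects/orders-value/versions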
Interview mapping:
“Have you used Schema Registry?”
“How do you manage schema evolution?”
“How do you avoid breaking consumers?”
Step 9 – Reliability, monitoring & troubleshooting
Goal: show you can be the escalation point for Kafka.
Do some experiments:
- Kill one consumer instance and watch rebalance.
- Stop a connector → see lag build up, restart and recover.
- Configure the producer with `acks=all` and `retries`, and discuss durability.
- Use Control Center dashboards or the CLI (example below) to check:
  - Consumer lag
  - Broker health
  - Topic throughput
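From the CLI, lag and replication health can be checked roughly like this:
# Lag for every consumer group
docker exec kafka kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --all-groups
# Partitions whose ISR has shrunk (empty output is what you want)
docker exec kafka kafka-topics --bootstrap-server kafka:9092 \
  --describe --under-replicated-partitions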
Prepare talking points:
- How you would monitor in prod (Prometheus/Grafana, alerts on lag, disk, ISR count).
- Backup & disaster recovery strategy (snapshots, multi-AZ, mirror topics across clusters).
Interview mapping:
“How do you monitor Kafka?”
“What are common failure scenarios?”
“If a consumer is lagging badly, how do you troubleshoot?”
Step 10 – Security & access control (conceptual + minimal lab)
Goal: speak about security architecture even if lab is simple.
In the lab (optional but nice):
- Enable SASL (e.g., SASL/PLAIN over a SASL_PLAINTEXT listener), or at least explain how you'd:
  - Use SASL/SCRAM or mTLS for authentication.
  - Use ACLs to restrict which services can read/write which topics (sketch below).
  - Use encryption in transit (TLS) and at rest (disks).
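Purely as a sketch (this assumes a cluster that already has an authorizer and SASL enabled, which the lab compose file does not; admin.properties is a placeholder for your admin client credentials):
# Hypothetical: allow the payment service principal to read orders in its group
kafka-acls --bootstrap-server kafka:9092 --command-config admin.properties \
  --add --allow-principal User:payment-service \
  --operation Read --topic orders --group payments-group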
Interview mapping:
“How do you secure a Kafka / Confluent environment?”
“How do you isolate teams & applications?”
Step 11 – Your 2-minute “project story” for interviews
Practice saying something like:
“I recently built a real-time orders platform using Confluent Kafka. The company had an existing Oracle database and was adding Couchbase as a NoSQL store.
I designed the Kafka architecture with multiple topics (`orders`, `payments`, `fraud-alerts`, `order-analytics`) and set up a Confluent stack with Schema Registry, Connect, and ksqlDB using Docker.
An `order-service` publishes orders to Kafka; `payment-service`, `fraud-service`, and `analytics-service` consume them in different consumer groups.
I used a JDBC Source Connector to stream historical data from Oracle (simulated with Postgres) into Kafka, and a sink connector to push real-time analytics into Couchbase.
On top of that I used ksqlDB to join orders and payments, detect potential fraud, and compute per-minute sales metrics.
I monitored consumer lag and broker health through Control Center, experimented with failures, and documented how to scale consumers and handle schema evolution using Schema Registry.
This project gave me end-to-end experience as the person owning the Confluent platform and integrating it with both SQL and NoSQL systems.”
1. Project structure
Create a folder, for example:
mkdir kafka-enterprise-orders
cd kafka-enterprise-orders
Inside it, create this structure:
kafka-enterprise-orders/
├── docker-compose.yml
├── .env
├── db/
│ └── init.sql
├── connect/
│ ├── Dockerfile
│ └── connectors/
│ ├── jdbc-source.json
│ └── couchbase-sink.json
├── ksql/
│ └── streams.sql
├── producer/
│ ├── Dockerfile
│ ├── requirements.txt
│ └── order_producer.py
└── consumers/
├── fraud-service/
│ ├── Dockerfile
│ ├── requirements.txt
│ └── fraud_consumer.py
├── payment-service/
│ ├── Dockerfile
│ ├── requirements.txt
│ └── payment_consumer.py
└── analytics-service/
├── Dockerfile
├── requirements.txt
└── analytics_consumer.py
Now fill each file as below.
2. .env
# Kafka
KAFKA_BROKER=kafka:9092
# Topics
ORDERS_TOPIC=orders
PAYMENTS_TOPIC=payments
FRAUD_ALERTS_TOPIC=fraud-alerts
ORDER_ANALYTICS_TOPIC=order-analytics
# Producer config
ORDER_PRODUCER_INTERVAL_SECONDS=3
# Fraud rules
FRAUD_AMOUNT_THRESHOLD=400
FRAUD_RISKY_COUNTRIES=RU,FR,BR
# Analytics settings
ANALYTICS_PRINT_EVERY=10
# Postgres (simulating Oracle)
POSTGRES_DB=ordersdb
POSTGRES_USER=orders_user
POSTGRES_PASSWORD=orders_pass
# Couchbase (you will set these in UI)
COUCHBASE_HOST=couchbase
COUCHBASE_BUCKET=order_analytics
COUCHBASE_USERNAME=Administrator
COUCHBASE_PASSWORD=password
3. docker-compose.yml
services:
# ---------------------------
# Zookeeper & Kafka broker
# ---------------------------
zookeeper:
image: confluentinc/cp-zookeeper:7.6.1
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.6.1
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
# ---------------------------
# Schema Registry
# ---------------------------
schema-registry:
image: confluentinc/cp-schema-registry:7.6.1
container_name: schema-registry
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "PLAINTEXT://kafka:9092"
SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
# ---------------------------
# ksqlDB
# ---------------------------
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.6.1
container_name: ksqldb-server
depends_on:
- kafka
- schema-registry
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksqldb"
KSQL_BOOTSTRAP_SERVERS: "kafka:9092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_KSQL_SERVICE_ID: "ksql-service"
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:7.6.1
container_name: ksqldb-cli
depends_on:
- ksqldb-server
entrypoint: /bin/sh
tty: true
# ---------------------------
# Kafka Connect (custom image)
# ---------------------------
connect:
build: ./connect
container_name: connect
depends_on:
- kafka
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: "connect-cluster"
CONNECT_CONFIG_STORAGE_TOPIC: "_connect-configs"
CONNECT_OFFSET_STORAGE_TOPIC: "_connect-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "_connect-status"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
# ---------------------------
# Confluent Control Center
# ---------------------------
control-center:
image: confluentinc/cp-enterprise-control-center:7.6.1
container_name: control-center
depends_on:
- kafka
- schema-registry
- connect
- ksqldb-server
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: "kafka:9092"
CONTROL_CENTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
CONTROL_CENTER_CONNECT_CLUSTER: "connect:8083"
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
# ---------------------------
# Kafdrop
# ---------------------------
kafdrop:
image: obsidiandynamics/kafdrop:latest
container_name: kafdrop
depends_on:
- kafka
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka:9092"
SERVER_SERVLET_CONTEXTPATH: "/"
# ---------------------------
# PostgreSQL (simulating Oracle)
# ---------------------------
postgres:
image: postgres:15
container_name: postgres
ports:
- "5432:5432"
environment:
POSTGRES_DB: ${POSTGRES_DB}
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
volumes:
- ./db/init.sql:/docker-entrypoint-initdb.d/init.sql
# ---------------------------
# Couchbase (single node)
# ---------------------------
couchbase:
image: couchbase:community-7.2.0
container_name: couchbase
ports:
- "8091-8094:8091-8094"
- "11210:11210"
environment:
COUCHBASE_ADMINISTRATOR_USERNAME: ${COUCHBASE_USERNAME}
COUCHBASE_ADMINISTRATOR_PASSWORD: ${COUCHBASE_PASSWORD}
# ---------------------------
# Order producer
# ---------------------------
order-producer:
build: ./producer
container_name: order-producer
depends_on:
- kafka
environment:
KAFKA_BROKER: ${KAFKA_BROKER}
ORDERS_TOPIC: ${ORDERS_TOPIC}
ORDER_PRODUCER_INTERVAL_SECONDS: ${ORDER_PRODUCER_INTERVAL_SECONDS}
# ---------------------------
# Fraud service
# ---------------------------
fraud-service:
build: ./consumers/fraud-service
container_name: fraud-service
depends_on:
- kafka
environment:
KAFKA_BROKER: ${KAFKA_BROKER}
ORDERS_TOPIC: ${ORDERS_TOPIC}
FRAUD_AMOUNT_THRESHOLD: ${FRAUD_AMOUNT_THRESHOLD}
FRAUD_RISKY_COUNTRIES: ${FRAUD_RISKY_COUNTRIES}
# ---------------------------
# Payment service
# ---------------------------
payment-service:
build: ./consumers/payment-service
container_name: payment-service
depends_on:
- kafka
environment:
KAFKA_BROKER: ${KAFKA_BROKER}
ORDERS_TOPIC: ${ORDERS_TOPIC}
PAYMENTS_TOPIC: ${PAYMENTS_TOPIC}
# ---------------------------
# Analytics service
# ---------------------------
analytics-service:
build: ./consumers/analytics-service
container_name: analytics-service
depends_on:
- kafka
- couchbase
environment:
KAFKA_BROKER: ${KAFKA_BROKER}
ORDERS_TOPIC: ${ORDERS_TOPIC}
ORDER_ANALYTICS_TOPIC: ${ORDER_ANALYTICS_TOPIC}
ANALYTICS_PRINT_EVERY: ${ANALYTICS_PRINT_EVERY}
COUCHBASE_HOST: ${COUCHBASE_HOST}
COUCHBASE_BUCKET: ${COUCHBASE_BUCKET}
COUCHBASE_USERNAME: ${COUCHBASE_USERNAME}
COUCHBASE_PASSWORD: ${COUCHBASE_PASSWORD}
4. PostgreSQL init script: db/init.sql
CREATE TABLE IF NOT EXISTS legacy_orders (
id SERIAL PRIMARY KEY,
order_id INT NOT NULL,
customer_id INT NOT NULL,
amount NUMERIC(10,2) NOT NULL,
currency VARCHAR(10) NOT NULL,
country VARCHAR(10) NOT NULL,
created_at TIMESTAMP NOT NULL
);
INSERT INTO legacy_orders (order_id, customer_id, amount, currency, country, created_at) VALUES
(1001, 2001, 123.45, 'USD', 'US', NOW() - INTERVAL '1 day'),
(1002, 2002, 555.00, 'USD', 'FR', NOW() - INTERVAL '2 days'),
(1003, 2003, 75.99, 'USD', 'DE', NOW() - INTERVAL '3 days');
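To double-check that the seed data loaded, you can query it through the container (the user and database names come from the .env file above):
docker exec -it postgres psql -U orders_user -d ordersdb -c "SELECT * FROM legacy_orders;"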
5. Kafka Connect image: connect/Dockerfile
This installs JDBC and Couchbase connectors via Confluent Hub.
FROM confluentinc/cp-kafka-connect:7.6.1
# Install JDBC connector
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
# Install Couchbase Kafka connector
RUN confluent-hub install --no-prompt couchbase/kafka-connect-couchbase:latest
# Keep the base image's default command, which renders the Connect worker config
# from the CONNECT_* environment variables and starts connect-distributed
CMD ["/etc/confluent/docker/run"]
6. Kafka Connect connector configs
6.1 connect/connectors/jdbc-source.json
This pulls rows from the Postgres `legacy_orders` table into Kafka. Note that the JDBC source names its output topic `topic.prefix` + table name, so with the config below the messages land in `legacy_legacy_orders`.
{
"name": "jdbc-source-legacy-orders",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://postgres:5432/ordersdb",
"connection.user": "orders_user",
"connection.password": "orders_pass",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "legacy_orders",
"topic.prefix": "legacy_",
"poll.interval.ms": "10000"
}
}
6.2 connect/connectors/couchbase-sink.json
This writes from topic order-analytics → Couchbase bucket order_analytics.
{
"name": "couchbase-sink-order-analytics",
"config": {
"connector.class": "com.couchbase.connect.kafka.CouchbaseSinkConnector",
"tasks.max": "1",
"topics": "order-analytics",
"couchbase.bootstrap.servers": "couchbase",
"couchbase.bucket": "order_analytics",
"couchbase.username": "Administrator",
"couchbase.password": "password",
"couchbase.enable.tls": "false",
"couchbase.document.id": "${topic}-${partition}-${offset}",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
You’ll POST these JSONs to http://localhost:8083/connectors after everything is up.
7. ksqlDB: ksql/streams.sql
Minimal example: create stream on orders, aggregate sales per country.
-- Run this inside ksqldb-cli (docker exec -it ksqldb-cli /bin/sh then ksql http://ksqldb-server:8088)
CREATE STREAM ORDERS_STREAM (
order_id INT,
customer_id INT,
amount DOUBLE,
currency VARCHAR,
country VARCHAR,
status VARCHAR,
created_at VARCHAR,
source VARCHAR
) WITH (
KAFKA_TOPIC = 'orders',
VALUE_FORMAT = 'JSON'
);
CREATE TABLE SALES_PER_COUNTRY AS
SELECT country,
COUNT(*) AS order_count,
SUM(amount) AS total_amount
FROM ORDERS_STREAM
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY country
EMIT CHANGES;
-- An aggregation (GROUP BY) produces a TABLE, not a STREAM, so this must be a CTAS
CREATE TABLE ORDER_ANALYTICS_BY_COUNTRY
WITH (KAFKA_TOPIC = 'order-analytics', VALUE_FORMAT = 'JSON') AS
SELECT country,
COUNT(*) AS order_count,
SUM(amount) AS total_amount
FROM ORDERS_STREAM
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY country
EMIT CHANGES;
8. Producer service
8.1 producer/requirements.txt
kafka-python==2.0.2
8.2 producer/Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY order_producer.py .
CMD ["python", "order_producer.py"]
8.3 producer/order_producer.py
import json
import os
import random
import time
from datetime import datetime, timezone
from kafka import KafkaProducer
def get_env(name: str, default: str) -> str:
return os.environ.get(name, default)
KAFKA_BROKER = get_env("KAFKA_BROKER", "kafka:9092")
ORDERS_TOPIC = get_env("ORDERS_TOPIC", "orders")
INTERVAL_SECONDS = float(get_env("ORDER_PRODUCER_INTERVAL_SECONDS", "3"))
def create_producer() -> KafkaProducer:
producer = KafkaProducer(
bootstrap_servers=KAFKA_BROKER,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: str(k).encode("utf-8"),
)
print(f"[order-producer] Connected to Kafka at {KAFKA_BROKER}, topic '{ORDERS_TOPIC}'")
return producer
def random_country() -> str:
countries = ["US", "CA", "GB", "DE", "FR", "RU", "BR", "AU", "IN"]
return random.choice(countries)
def random_currency() -> str:
return "USD"
def random_customer_id() -> int:
return random.randint(1000, 9999)
def random_amount() -> float:
return round(random.uniform(10, 600), 2)
def random_source() -> str:
return random.choice(["web", "mobile", "api"])
def generate_order(order_id: int) -> dict:
now = datetime.now(timezone.utc).isoformat()
order = {
"order_id": order_id,
"customer_id": random_customer_id(),
"amount": random_amount(),
"currency": random_currency(),
"country": random_country(),
"status": "NEW",
"created_at": now,
"source": random_source(),
}
return order
def main():
producer = create_producer()
order_id = 1
while True:
order = generate_order(order_id)
producer.send(ORDERS_TOPIC, key=order["order_id"], value=order)
producer.flush()
print(f"[order-producer] Sent order: {order}")
order_id += 1
time.sleep(INTERVAL_SECONDS)
if __name__ == "__main__":
main()
9. Fraud service
9.1 consumers/fraud-service/requirements.txt
kafka-python==2.0.2
9.2 consumers/fraud-service/Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY fraud_consumer.py .
CMD ["python", "fraud_consumer.py"]
9.3 consumers/fraud-service/fraud_consumer.py
import json
import os
from typing import List
from kafka import KafkaConsumer
def get_env(name: str, default: str) -> str:
return os.environ.get(name, default)
KAFKA_BROKER = get_env("KAFKA_BROKER", "kafka:9092")
ORDERS_TOPIC = get_env("ORDERS_TOPIC", "orders")
AMOUNT_THRESHOLD = float(get_env("FRAUD_AMOUNT_THRESHOLD", "400"))
RISKY_COUNTRIES_RAW = get_env("FRAUD_RISKY_COUNTRIES", "RU,FR,BR")
RISKY_COUNTRIES: List[str] = [c.strip().upper() for c in RISKY_COUNTRIES_RAW.split(",") if c.strip()]
def create_consumer() -> KafkaConsumer:
consumer = KafkaConsumer(
ORDERS_TOPIC,
bootstrap_servers=KAFKA_BROKER,
group_id="fraud-service-group",
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
key_deserializer=lambda k: int(k.decode("utf-8")) if k else None,
auto_offset_reset="earliest",
enable_auto_commit=True,
)
print(f"[fraud-service] Connected to Kafka at {KAFKA_BROKER}, topic '{ORDERS_TOPIC}'")
print(f"[fraud-service] Amount threshold: {AMOUNT_THRESHOLD}, risky countries: {RISKY_COUNTRIES}")
return consumer
def is_fraud(order: dict) -> tuple[bool, str]:
amount = order.get("amount", 0)
country = order.get("country", "").upper()
if amount >= AMOUNT_THRESHOLD and country in RISKY_COUNTRIES:
return True, "HIGH_AMOUNT_RISKY_COUNTRY"
if amount >= AMOUNT_THRESHOLD:
return True, "HIGH_AMOUNT"
if country in RISKY_COUNTRIES:
return True, "RISKY_COUNTRY"
return False, ""
def main():
consumer = create_consumer()
for message in consumer:
order = message.value
key = message.key
print(f"[fraud-service] Received order: key={key}, value={order}")
flagged, reason = is_fraud(order)
if flagged:
alert = {
"order_id": order.get("order_id"),
"reason": reason,
"amount": order.get("amount"),
"country": order.get("country"),
"status": "CANCELLED",
}
print(f"[fraud-service] FRAUD ALERT: {alert}")
else:
print(f"[fraud-service] Order is clean, id={order.get('order_id')}")
if __name__ == "__main__":
main()
10. Payment service
10.1 consumers/payment-service/requirements.txt
kafka-python==2.0.2
10.2 consumers/payment-service/Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY payment_consumer.py .
CMD ["python", "payment_consumer.py"]
10.3 consumers/payment-service/payment_consumer.py
import json
import os
from kafka import KafkaConsumer, KafkaProducer
def get_env(name: str, default: str) -> str:
return os.environ.get(name, default)
KAFKA_BROKER = get_env("KAFKA_BROKER", "kafka:9092")
ORDERS_TOPIC = get_env("ORDERS_TOPIC", "orders")
PAYMENTS_TOPIC = get_env("PAYMENTS_TOPIC", "payments")
def create_consumer() -> KafkaConsumer:
consumer = KafkaConsumer(
ORDERS_TOPIC,
bootstrap_servers=KAFKA_BROKER,
group_id="payment-service-group",
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
key_deserializer=lambda k: int(k.decode("utf-8")) if k else None,
auto_offset_reset="earliest",
enable_auto_commit=True,
)
print(f"[payment-service] Connected consumer to Kafka at {KAFKA_BROKER}, topic '{ORDERS_TOPIC}'")
return consumer
def create_producer() -> KafkaProducer:
producer = KafkaProducer(
bootstrap_servers=KAFKA_BROKER,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: str(k).encode("utf-8"),
)
print(f"[payment-service] Connected producer to Kafka at {KAFKA_BROKER}, topic '{PAYMENTS_TOPIC}'")
return producer
def main():
consumer = create_consumer()
producer = create_producer()
for message in consumer:
order = message.value
key = message.key
print(f"[payment-service] Received order: key={key}, value={order}")
if order.get("status") == "NEW":
print(f"[payment-service] Processing payment for order {order.get('order_id')} "
f"amount={order.get('amount')} {order.get('currency')}")
payment_event = {
"order_id": order.get("order_id"),
"customer_id": order.get("customer_id"),
"amount": order.get("amount"),
"currency": order.get("currency"),
"status": "PAID",
}
producer.send(PAYMENTS_TOPIC, key=payment_event["order_id"], value=payment_event)
producer.flush()
print(f"[payment-service] Payment successful, event sent: {payment_event}")
else:
print(f"[payment-service] Skipping order {order.get('order_id')} with status={order.get('status')}")
if __name__ == "__main__":
main()
11. Analytics service
This one both prints stats and (optionally) writes to Couchbase directly, so even if Connect is not configured yet you still have a Kafka → Couchbase path.
11.1 consumers/analytics-service/requirements.txt
kafka-python==2.0.2
couchbase==4.3.0
11.2 consumers/analytics-service/Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY analytics_consumer.py .
CMD ["python", "analytics_consumer.py"]
11.3 consumers/analytics-service/analytics_consumer.py
import json
import os
from collections import Counter
from datetime import timedelta
from kafka import KafkaConsumer
from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions, ClusterTimeoutOptions
def get_env(name: str, default: str) -> str:
return os.environ.get(name, default)
KAFKA_BROKER = get_env("KAFKA_BROKER", "kafka:9092")
ORDERS_TOPIC = get_env("ORDERS_TOPIC", "orders")
PRINT_EVERY = int(get_env("ANALYTICS_PRINT_EVERY", "10"))
COUCHBASE_HOST = get_env("COUCHBASE_HOST", "couchbase")
COUCHBASE_BUCKET = get_env("COUCHBASE_BUCKET", "order_analytics")
COUCHBASE_USERNAME = get_env("COUCHBASE_USERNAME", "Administrator")
COUCHBASE_PASSWORD = get_env("COUCHBASE_PASSWORD", "password")
def create_consumer() -> KafkaConsumer:
consumer = KafkaConsumer(
ORDERS_TOPIC,
bootstrap_servers=KAFKA_BROKER,
group_id="analytics-service-group",
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
key_deserializer=lambda k: int(k.decode("utf-8")) if k else None,
auto_offset_reset="earliest",
enable_auto_commit=True,
)
print(f"[analytics-service] Connected to Kafka at {KAFKA_BROKER}, topic '{ORDERS_TOPIC}'")
return consumer
def create_couchbase_bucket():
try:
cluster = Cluster(
f"couchbase://{COUCHBASE_HOST}",
ClusterOptions(
PasswordAuthenticator(COUCHBASE_USERNAME, COUCHBASE_PASSWORD),
timeout_options=ClusterTimeoutOptions(kv_timeout=timedelta(seconds=10))
)
)
bucket = cluster.bucket(COUCHBASE_BUCKET)
collection = bucket.default_collection()
print(f"[analytics-service] Connected to Couchbase bucket '{COUCHBASE_BUCKET}' at {COUCHBASE_HOST}")
return collection
except Exception as e:
print(f"[analytics-service] WARNING: Could not connect to Couchbase: {e}")
return None
def main():
consumer = create_consumer()
collection = create_couchbase_bucket()
total_orders = 0
total_amount = 0.0
orders_by_country = Counter()
for message in consumer:
order = message.value
key = message.key
total_orders += 1
total_amount += float(order.get("amount", 0))
country = order.get("country", "UNKNOWN")
orders_by_country[country] += 1
print(f"[analytics-service] Received order: key={key}, value={order}")
# Optionally store each order document in Couchbase
if collection is not None:
doc_id = f"order::{order.get('order_id')}"
try:
collection.upsert(doc_id, order)
print(f"[analytics-service] Stored order in Couchbase with id={doc_id}")
except Exception as e:
print(f"[analytics-service] ERROR storing to Couchbase: {e}")
if total_orders % PRINT_EVERY == 0:
print("\n[analytics-service] ===== STATS =====")
print(f"Total orders: {total_orders}")
avg_amount = total_amount / total_orders if total_orders else 0
print(f"Total amount: {total_amount:.2f}")
print(f"Average amount: {avg_amount:.2f}")
print("Orders by country:")
for c, count in orders_by_country.items():
print(f" {c}: {count}")
print("[analytics-service] ====================\n")
if __name__ == "__main__":
main()
12. How to run and test
From the root folder kafka-enterprise-orders:
- Build and start everything:
docker-compose up -d --build
- Initialize Couchbase once:
  - Go to http://localhost:8091 in the browser.
  - Follow the setup wizard: username `Administrator`, password `password`.
  - Create a bucket named `order_analytics` (or script it with the commands below).
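If you prefer not to click through the wizard, roughly the same setup can be scripted with couchbase-cli (a sketch; the RAM quotas are arbitrary lab values):
docker exec couchbase couchbase-cli cluster-init -c localhost \
  --cluster-username Administrator --cluster-password password \
  --services data,index,query --cluster-ramsize 1024
docker exec couchbase couchbase-cli bucket-create -c localhost \
  -u Administrator -p password \
  --bucket order_analytics --bucket-type couchbase --bucket-ramsize 256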
- Check UIs:
  - Kafdrop: http://localhost:9000
  - Control Center: http://localhost:9021/clusters
  - Schema Registry: http://localhost:8081/subjects
  - ksqlDB server info: http://localhost:8088/info
  - Couchbase UI: http://localhost:8091
- Create connectors
In a terminal:
# JDBC source
curl -X POST -H "Content-Type: application/json" \
--data @connect/connectors/jdbc-source.json \
http://localhost:8083/connectors
# Couchbase sink
curl -X POST -H "Content-Type: application/json" \
--data @connect/connectors/couchbase-sink.json \
http://localhost:8083/connectors
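Then confirm both connectors and their tasks report RUNNING:
curl -s http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/jdbc-source-legacy-orders/status
curl -s http://localhost:8083/connectors/couchbase-sink-order-analytics/status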
- Check logs
docker logs -f order-producer
docker logs -f fraud-service
docker logs -f payment-service
docker logs -f analytics-service
You now have:
- Orders streaming into Kafka.
- Fraud, payment, analytics services consuming.
- Postgres simulating Oracle, with rows streamed in by the JDBC Source connector (query-based polling rather than true log-based CDC).
- Couchbase as the NoSQL store for analytics and raw orders.
- Confluent UIs available on the ports listed above.