End-to-End Testing: Validating Kafka Messages with Playwright
Testing distributed systems is one of the hardest problems in modern development. You click a button in your web app, something happens asynchronously in Kafka, and you need to verify the entire flow worked correctly.
But here's the good news: you can build robust end-to-end tests that validate your entire message pipeline using Playwright for UI automation and a Kafka consumer for message validation.
In this article, I'll show you how to build a testing setup that captures UI interactions, validates them in Kafka, and ensures your distributed system works as expected.
What We'll Build
By the end of this article, you'll have:
- A local Kafka cluster running in Docker
- Reusable Kafka consumer utilities for message validation
- Playwright tests that trigger UI actions and validate resulting Kafka messages
- Advanced validation patterns for real-world scenarios
Prerequisites
Before we start, make sure you have:
- Node.js v16 or higher
- Docker and Docker Compose installed
- Basic understanding of Kafka (topics, producers, consumers)
- Familiarity with JavaScript and async/await
Not familiar with Kafka? Think of it as a distributed message queue where your services publish events (like "user signed up" or "order created") that other services consume and process.
Step 1: Setting Up Kafka Locally with Docker
Let's get Kafka running locally without any complex setup.
Create a docker-compose.yml file in your project root:
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
healthcheck:
test: ["CMD", "echo", "ruok", "|", "nc", "localhost", "2181"]
interval: 10s
timeout: 5s
retries: 5
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
zookeeper:
condition: service_healthy
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
healthcheck:
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
interval: 10s
timeout: 5s
retries: 5
Start Kafka:
docker-compose up -d
Verify it's running:
docker-compose ps
You should see both zookeeper and kafka services with status "Up".
Step 2: Setting Up Your Project
Create a package.json with the necessary dependencies:
{
"name": "kafka-playwright-validation",
"version": "1.0.0",
"description": "End-to-end testing with Kafka and Playwright",
"scripts": {
"test": "playwright test",
"test:headed": "playwright test --headed",
"test:debug": "playwright test --debug"
},
"dependencies": {
"kafkajs": "^2.2.4",
"@playwright/test": "^1.40.0"
}
}
Install dependencies:
npm install
Create the playwright.config.js file:
const { defineConfig, devices } = require('@playwright/test');
module.exports = defineConfig({
testDir: './tests',
fullyParallel: false,
forbidOnly: !!process.env.CI,
retries: process.env.CI ? 2 : 0,
workers: process.env.CI ? 1 : 1,
reporter: 'html',
use: {
baseURL: 'http://localhost:3000',
trace: 'on-first-retry',
},
});
Step 3: Building the Kafka Consumer Utility
Create utils/KafkaConsumer.js - this is the heart of our testing framework:
const { Kafka, logLevel } = require('kafkajs');
class KafkaConsumer {
constructor(brokers = ['localhost:9092'], options = {}) {
this.kafka = new Kafka({
clientId: options.clientId || `test-consumer-${Date.now()}`,
brokers,
logLevel: logLevel.ERROR,
});
this.messages = [];
this.consumer = null;
this.isConnected = false;
}
async connect() {
if (this.isConnected) return;
this.consumer = this.kafka.consumer({
groupId: `test-group-${Date.now()}`,
sessionTimeout: 30000,
heartbeatInterval: 10000,
});
await this.consumer.connect();
this.isConnected = true;
}
async subscribe(topic) {
if (!this.isConnected) {
throw new Error('Consumer not connected. Call connect() first.');
}
await this.consumer.subscribe({ topic, fromBeginning: false });
}
async startConsuming() {
if (!this.consumer) {
throw new Error('Consumer not initialized.');
}
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
this.messages.push({
topic,
partition,
offset: message.offset,
key: message.key ? message.key.toString() : null,
value: message.value ? JSON.parse(message.value.toString()) : null,
timestamp: message.timestamp,
headers: message.headers,
});
} catch (error) {
console.error('Error parsing message:', error);
}
},
});
}
async disconnect() {
if (this.consumer) {
await this.consumer.disconnect();
this.isConnected = false;
}
}
getMessages() {
return [...this.messages];
}
clearMessages() {
this.messages = [];
}
async waitForMessages(count, timeoutMs = 10000) {
const startTime = Date.now();
while (this.messages.length < count) {
if (Date.now() - startTime > timeoutMs) {
throw new Error(
`Timeout waiting for ${count} messages. ` +
`Got ${this.messages.length} messages after ${timeoutMs}ms`
);
}
// Small delay to avoid busy waiting
await new Promise(resolve => setTimeout(resolve, 100));
}
// Return the last `count` messages
return this.messages.slice(-count);
}
findMessage(predicate) {
return this.messages.find(predicate);
}
findMessages(predicate) {
return this.messages.filter(predicate);
}
getMessageCount() {
return this.messages.length;
}
getLastMessage() {
return this.messages[this.messages.length - 1] || null;
}
}
module.exports = KafkaConsumer;
Step 4: Creating the Kafka Producer Utility
Create utils/KafkaProducer.js for sending test messages:
const { Kafka, logLevel } = require('kafkajs');
class KafkaProducer {
constructor(brokers = ['localhost:9092'], options = {}) {
this.kafka = new Kafka({
clientId: options.clientId || `test-producer-${Date.now()}`,
brokers,
logLevel: logLevel.ERROR,
});
this.producer = null;
this.isConnected = false;
}
async connect() {
if (this.isConnected) return;
this.producer = this.kafka.producer({
maxInFlightRequests: 1,
compression: 0,
});
await this.producer.connect();
this.isConnected = true;
}
async sendMessage(topic, messages) {
if (!this.isConnected) {
throw new Error('Producer not connected. Call connect() first.');
}
const formattedMessages = messages.map(msg => ({
key: msg.key || null,
value: typeof msg.value === 'string' ? msg.value : JSON.stringify(msg.value),
headers: msg.headers || {},
}));
const result = await this.producer.send({
topic,
messages: formattedMessages,
});
return result;
}
async disconnect() {
if (this.producer) {
await this.producer.disconnect();
this.isConnected = false;
}
}
}
module.exports = KafkaProducer;
Step 5: Your First Test - Basic Message Validation
Create tests/basic-validation.spec.js:
const { test, expect } = require('@playwright/test');
const KafkaConsumer = require('../utils/KafkaConsumer');
const KafkaProducer = require('../utils/KafkaProducer');
test.describe('Basic Kafka Message Validation', () => {
let consumer;
let producer;
test.beforeAll(async () => {
// Initialize and connect consumer
consumer = new KafkaConsumer();
await consumer.connect();
await consumer.subscribe('test-topic');
await consumer.startConsuming();
// Initialize and connect producer
producer = new KafkaProducer();
await producer.connect();
});
test.afterAll(async () => {
await consumer.disconnect();
await producer.disconnect();
});
test('should produce and validate a message', async () => {
consumer.clearMessages();
const orderId = `order-${Date.now()}`;
// Send a message to Kafka
await producer.sendMessage('test-topic', [
{
key: orderId,
value: {
orderId,
customerId: 'cust-123',
amount: 99.99,
currency: 'USD',
status: 'pending',
createdAt: new Date().toISOString(),
}
}
]);
// Wait for the message to appear in Kafka
const messages = await consumer.waitForMessages(1, 5000);
// Validate message was received
expect(messages).toHaveLength(1);
const message = messages[0];
expect(message.value).toMatchObject({
orderId: orderId,
customerId: 'cust-123',
amount: 99.99,
currency: 'USD',
status: 'pending',
});
// Validate message metadata
expect(message.key).toBe(orderId);
expect(message.topic).toBe('test-topic');
});
test('should filter messages by criteria', async () => {
consumer.clearMessages();
// Send multiple messages with different amounts
for (let i = 1; i <= 3; i++) {
await producer.sendMessage('test-topic', [
{
key: `order-${i}`,
value: {
orderId: `order-${i}`,
amount: 50 * i, // 50, 100, 150
status: 'pending'
}
}
]);
}
// Wait for all messages
await consumer.waitForMessages(3, 5000);
// Find high-value orders (> 70)
const highValueOrders = consumer.findMessages(
msg => msg.value.amount > 70
);
expect(highValueOrders).toHaveLength(2);
expect(highValueOrders[0].value.amount).toBe(100);
expect(highValueOrders[1].value.amount).toBe(150);
});
test('should validate message structure', async () => {
consumer.clearMessages();
await producer.sendMessage('test-topic', [
{
key: 'payment-1',
value: {
transactionId: 'txn-123',
amount: 50.00,
currency: 'USD',
}
}
]);
const messages = await consumer.waitForMessages(1, 5000);
const message = messages[0];
// Validate required fields exist
expect(message.value).toHaveProperty('transactionId');
expect(message.value).toHaveProperty('amount');
expect(message.value).toHaveProperty('currency');
// Validate field types
expect(typeof message.value.transactionId).toBe('string');
expect(typeof message.value.amount).toBe('number');
expect(message.value.amount).toBeGreaterThan(0);
});
});
Step 6: UI Integration Tests
Create tests/ui-kafka-integration.spec.js - this is where it gets real:
const { test, expect } = require('@playwright/test');
const KafkaConsumer = require('../utils/KafkaConsumer');
test.describe('UI to Kafka Integration', () => {
let consumer;
test.beforeAll(async () => {
consumer = new KafkaConsumer();
await consumer.connect();
await consumer.subscribe('user-events');
await consumer.startConsuming();
});
test.afterAll(async () => {
await consumer.disconnect();
});
test('should emit user signup event to Kafka', async ({ page }) => {
consumer.clearMessages();
const email = `user-${Date.now()}@example.com`;
const password = 'SecurePass123!';
// Navigate to signup page
await page.goto('/signup');
// Fill out signup form
await page.fill('input[name="email"]', email);
await page.fill('input[name="password"]', password);
await page.fill('input[name="confirmPassword"]', password);
await page.fill('input[name="fullName"]', 'John Doe');
// Submit form
await page.click('button[type="submit"]:has-text("Sign Up")');
// Wait for success message on UI
await expect(page.locator('text=Welcome, John')).toBeVisible({
timeout: 5000
});
// Validate the Kafka message was produced
const messages = await consumer.waitForMessages(1, 5000);
expect(messages).toHaveLength(1);
const event = messages[0];
expect(event.value).toMatchObject({
eventType: 'user.signup',
email: email,
fullName: 'John Doe',
});
// Verify event metadata
expect(event.value.timestamp).toBeDefined();
expect(event.value.userId).toBeDefined();
});
test('should emit payment processed event', async ({ page }) => {
consumer.clearMessages();
// Navigate to checkout
await page.goto('/checkout');
// Fill payment details
await page.fill('input[placeholder="Card Number"]', '4111111111111111');
await page.fill('input[placeholder="CVC"]', '123');
await page.fill('input[placeholder="Expiry"]', '12/25');
// Place order
await page.click('button:has-text("Complete Purchase")');
// Verify UI confirmation
await expect(page.locator('text=Order Confirmed')).toBeVisible();
// Get the order ID from the page
const orderId = await page.locator('[data-testid="order-id"]').textContent();
// Validate Kafka event
const messages = await consumer.waitForMessages(1, 5000);
expect(messages[0].value).toMatchObject({
eventType: 'payment.processed',
orderId: orderId.trim(),
status: 'success',
amount: expect.any(Number),
});
expect(messages[0].value.transactionId).toBeTruthy();
expect(messages[0].value.processedAt).toBeTruthy();
});
test('should emit multiple events for order creation', async ({ page }) => {
consumer.clearMessages();
// Navigate and complete purchase
await page.goto('/shop');
await page.click('button:has-text("Add to Cart")');
await page.click('button:has-text("Checkout")');
await page.click('button:has-text("Place Order")');
// Wait for success
await expect(page.locator('text=Order placed successfully')).toBeVisible();
// Wait for multiple events (order created + payment processed + inventory updated)
const messages = await consumer.waitForMessages(3, 10000);
expect(messages).toHaveLength(3);
// Validate event sequence
const eventTypes = messages.map(m => m.value.eventType);
expect(eventTypes).toEqual([
'order.created',
'payment.processed',
'inventory.updated'
]);
});
});
Step 7: Advanced Validation Patterns
Create tests/advanced-patterns.spec.js for sophisticated scenarios:
const { test, expect } = require('@playwright/test');
const KafkaConsumer = require('../utils/KafkaConsumer');
const KafkaProducer = require('../utils/KafkaProducer');
test.describe('Advanced Kafka Validation Patterns', () => {
let consumer;
let producer;
test.beforeAll(async () => {
consumer = new KafkaConsumer();
await consumer.connect();
await consumer.subscribe('events');
await consumer.startConsuming();
producer = new KafkaProducer();
await producer.connect();
});
test.afterAll(async () => {
await consumer.disconnect();
await producer.disconnect();
});
test('should maintain message ordering', async () => {
consumer.clearMessages();
const sequenceId = `seq-${Date.now()}`;
// Send 5 messages in sequence
for (let i = 0; i < 5; i++) {
await producer.sendMessage('events', [
{
key: sequenceId, // Same key maintains order
value: {
sequenceId,
sequence: i,
timestamp: Date.now() + i,
}
}
]);
// Small delay between sends
await new Promise(resolve => setTimeout(resolve, 50));
}
// Wait for all messages
const messages = await consumer.waitForMessages(5, 10000);
// Verify order is maintained
for (let i = 0; i < messages.length; i++) {
expect(messages[i].value.sequence).toBe(i);
}
});
test('should validate message timestamps', async () => {
consumer.clearMessages();
const beforeTime = Date.now();
await producer.sendMessage('events', [
{
value: {
eventType: 'test.event',
data: 'test',
}
}
]);
const afterTime = Date.now();
const messages = await consumer.waitForMessages(1, 5000);
const messageTime = parseInt(messages[0].timestamp);
expect(messageTime).toBeGreaterThanOrEqual(beforeTime);
expect(messageTime).toBeLessThanOrEqual(afterTime);
});
test('should find and validate related messages', async () => {
consumer.clearMessages();
const userId = 'user-123';
const sessionId = `session-${Date.now()}`;
// Send related messages with same user ID
await producer.sendMessage('events', [
{
key: userId,
value: {
eventType: 'user.login',
userId,
sessionId,
}
}
]);
await producer.sendMessage('events', [
{
key: userId,
value: {
eventType: 'user.action',
userId,
sessionId,
action: 'view_product',
}
}
]);
await producer.sendMessage('events', [
{
key: userId,
value: {
eventType: 'user.logout',
userId,
sessionId,
}
}
]);
// Wait for all messages
const messages = await consumer.waitForMessages(3, 10000);
// Find all messages for this session
const sessionMessages = messages.filter(
m => m.value.sessionId === sessionId
);
expect(sessionMessages).toHaveLength(3);
// Verify sequence
expect(sessionMessages[0].value.eventType).toBe('user.login');
expect(sessionMessages[1].value.eventType).toBe('user.action');
expect(sessionMessages[2].value.eventType).toBe('user.logout');
});
test('should handle message retry scenarios', async () => {
consumer.clearMessages();
const requestId = `request-${Date.now()}`;
// Simulate a retry pattern
for (let attempt = 1; attempt <= 3; attempt++) {
await producer.sendMessage('events', [
{
key: requestId,
value: {
eventType: 'api.call',
requestId,
attempt,
endpoint: '/api/data',
status: attempt === 3 ? 'success' : 'retry',
}
}
]);
await new Promise(resolve => setTimeout(resolve, 100));
}
const messages = await consumer.waitForMessages(3, 10000);
// Verify retry pattern
expect(messages[0].value.status).toBe('retry');
expect(messages[1].value.status).toBe('retry');
expect(messages[2].value.status).toBe('success');
// Verify attempts are sequential
expect(messages.map(m => m.value.attempt)).toEqual([1, 2, 3]);
});
});
Running Your Tests
Now that we have everything set up, let's run the tests.
Make sure Kafka is still running:
docker-compose ps
Run all tests:
npm test
Run tests in headed mode (see the browser):
npm run test:headed
Run a specific test file:
npx playwright test tests/basic-validation.spec.js
Run in debug mode:
npm run test:debug
Best Practices for Kafka Testing
1. Always Clear Messages Between Tests
test('my test', async () => {
consumer.clearMessages(); // Do this first!
// ... rest of test
});
2. Use Appropriate Timeouts
- Start with 5000ms for local tests
- Increase to 10000ms+ for slower systems
- Adjust based on your infrastructure
3. Validate Structure Before Values
// ✅ Good: Check structure first, then values
expect(message.value).toHaveProperty('orderId');
expect(message.value.orderId).toBe(expectedId);
// ❌ Bad: Assuming structure without checking
expect(message.value.orderId).toBe(expectedId); // May fail with unhelpful error
4. Use Message Keys for Tracking
// ✅ Good: Use meaningful keys
await producer.sendMessage('orders', [
{
key: orderId, // Easier to track related messages
value: { orderId, amount: 100 }
}
]);
5. Log Failed Messages for Debugging
test('validate message', async () => {
const messages = await consumer.waitForMessages(1, 5000);
if (messages.length === 0) {
console.log('No messages received. All messages:', consumer.getMessages());
}
expect(messages).toHaveLength(1);
});
Troubleshooting Common Issues
Issue: "Timeout waiting for messages"
Solutions:
- Verify Kafka is running:
docker-compose ps - Check topic exists:
docker exec kafka kafka-topics --list --bootstrap-server localhost:9092 - Increase timeout value:
await consumer.waitForMessages(1, 15000) - Add debugging:
console.log('Messages received:', consumer.getMessageCount())
Issue: "Consumer not connected"
Solution:
Make sure you call await consumer.connect() in beforeAll:
test.beforeAll(async () => {
consumer = new KafkaConsumer();
await consumer.connect(); // This is required!
await consumer.subscribe('topic');
await consumer.startConsuming();
});
Issue: Messages appearing but test still fails
Solution:
Add debugging to see the actual message:
const messages = await consumer.waitForMessages(1, 5000);
console.log('Received message:', JSON.stringify(messages[0], null, 2));
expect(messages[0].value).toMatchObject({ /* your expectations */ });
Conclusion
Testing distributed systems doesn't have to be painful. By combining Playwright's powerful UI automation with Kafka consumers for message validation, you can build comprehensive end-to-end tests that give you confidence in your entire system.
The patterns shown here scale from simple single-message validation to complex multi-topic event flows. Start simple, test often, and gradually build out your test suite as your system grows.
Your turn: Try building a test for your own application's message flow. The hardest part is the first test—after that, it gets much easier!
Resources
- Playwright Documentation
- KafkaJS Documentation
- Kafka Official Guide
- Docker Compose Reference
- GitHub: Kafka + Playwright Example (coming soon)
Questions? Drop them in the comments below. Happy testing!
Top comments (0)