Rama Krishna Reddy Arumalla

End-to-End Testing: Validating Kafka Messages with Playwright

Testing distributed systems is one of the hardest problems in modern development. You click a button in your web app, something happens asynchronously in Kafka, and you need to verify the entire flow worked correctly.

But here's the good news: you can build robust end-to-end tests that validate your entire message pipeline using Playwright for UI automation and a Kafka consumer for message validation.

In this article, I'll show you how to build a testing setup that captures UI interactions, validates them in Kafka, and ensures your distributed system works as expected.

What We'll Build

By the end of this article, you'll have:

  • A local Kafka cluster running in Docker
  • Reusable Kafka consumer utilities for message validation
  • Playwright tests that trigger UI actions and validate resulting Kafka messages
  • Advanced validation patterns for real-world scenarios

Prerequisites

Before we start, make sure you have:

  • Node.js v16 or higher
  • Docker and Docker Compose installed
  • Basic understanding of Kafka (topics, producers, consumers)
  • Familiarity with JavaScript and async/await

Not familiar with Kafka? Think of it as a distributed message queue where your services publish events (like "user signed up" or "order created") that other services consume and process.
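
To make the idea concrete, here is a toy in-memory model of a single topic partition. It is only a sketch for intuition: ToyTopic and its methods are made up for this example and are not part of Kafka or KafkaJS. It shows that consuming does not remove events, and that each consumer group keeps its own read position.

```javascript
// Toy model of one topic partition: an append-only log plus
// a read position per consumer group. Not a real Kafka client.
class ToyTopic {
  constructor() {
    this.log = [];
    this.positions = new Map();
  }

  // Appends an event and returns its offset in the log.
  publish(event) {
    this.log.push(event);
    return this.log.length - 1;
  }

  // Returns the events this group has not seen yet and advances its position.
  poll(groupId) {
    const from = this.positions.get(groupId) ?? 0;
    this.positions.set(groupId, this.log.length);
    return this.log.slice(from);
  }
}
```

Because positions are tracked per group, two groups polling the same topic each receive every event. The test consumer later in this article relies on the same isolation by using a fresh group ID for every run.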

Step 1: Setting Up Kafka Locally with Docker

Let's get Kafka running locally without any complex setup.

Create a docker-compose.yml file in your project root:

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    healthcheck:
      # The exec form ("CMD") does not interpret "|", so use CMD-SHELL with a plain port check
      test: ["CMD-SHELL", "nc -z localhost 2181 || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 5

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
      interval: 10s
      timeout: 5s
      retries: 5

Start Kafka:

docker-compose up -d

Verify it's running:

docker-compose ps

You should see both the zookeeper and kafka services with a status of "Up". Once the health checks pass, the status also shows "(healthy)".
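
Since docker-compose up -d returns before the broker accepts connections, it can help to block until the port is open before running tests. The following is a minimal sketch using only Node's built-in net module. The name waitForPort and its defaults are illustrative, and an open port only shows that the listener is up, not that the broker has finished starting.

```javascript
const net = require('net');

// Hypothetical helper: resolves once a TCP connection to host:port succeeds,
// rejects if that has not happened within roughly timeoutMs.
function waitForPort(host, port, timeoutMs = 30000, intervalMs = 500) {
  const deadline = Date.now() + timeoutMs;

  return new Promise((resolve, reject) => {
    const attempt = () => {
      const socket = net.connect({ host, port });

      // Treat a connection attempt that hangs as a failed attempt.
      socket.setTimeout(intervalMs, () => {
        socket.destroy(new Error('connect timeout'));
      });

      socket.once('connect', () => {
        socket.destroy();
        resolve();
      });

      socket.once('error', () => {
        socket.destroy();
        if (Date.now() + intervalMs > deadline) {
          reject(new Error(`${host}:${port} not reachable after ${timeoutMs}ms`));
        } else {
          setTimeout(attempt, intervalMs);
        }
      });
    };

    attempt();
  });
}
```

A call such as await waitForPort('localhost', 9092) could run at the start of a setup script, before any consumer connects.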

Step 2: Setting Up Your Project

Create a package.json with the necessary dependencies:

{
  "name": "kafka-playwright-validation",
  "version": "1.0.0",
  "description": "End-to-end testing with Kafka and Playwright",
  "scripts": {
    "test": "playwright test",
    "test:headed": "playwright test --headed",
    "test:debug": "playwright test --debug"
  },
  "dependencies": {
    "kafkajs": "^2.2.4",
    "@playwright/test": "^1.40.0"
  }
}

Install the dependencies, then download the browsers that Playwright drives:

npm install
npx playwright install

Create the playwright.config.js file:

const { defineConfig, devices } = require('@playwright/test');

module.exports = defineConfig({
  testDir: './tests',
  fullyParallel: false,
  forbidOnly: !!process.env.CI,
  retries: process.env.CI ? 2 : 0,
  workers: 1, // run serially so tests sharing a topic do not see each other's messages
  reporter: 'html',
  use: {
    baseURL: 'http://localhost:3000',
    trace: 'on-first-retry',
  },
});

Step 3: Building the Kafka Consumer Utility

Create utils/KafkaConsumer.js - this is the heart of our testing framework:

const { Kafka, logLevel } = require('kafkajs');

class KafkaConsumer {
  constructor(brokers = ['localhost:9092'], options = {}) {
    this.kafka = new Kafka({
      clientId: options.clientId || `test-consumer-${Date.now()}`,
      brokers,
      logLevel: logLevel.ERROR,
    });
    this.messages = [];
    this.consumer = null;
    this.isConnected = false;
  }

  async connect() {
    if (this.isConnected) return;

    this.consumer = this.kafka.consumer({
      groupId: `test-group-${Date.now()}`,
      sessionTimeout: 30000,
      heartbeatInterval: 10000,
    });

    await this.consumer.connect();
    this.isConnected = true;
  }

  async subscribe(topic) {
    if (!this.isConnected) {
      throw new Error('Consumer not connected. Call connect() first.');
    }
    await this.consumer.subscribe({ topic, fromBeginning: false });
  }

  async startConsuming() {
    if (!this.consumer) {
      throw new Error('Consumer not initialized.');
    }

    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        try {
          this.messages.push({
            topic,
            partition,
            offset: message.offset,
            key: message.key ? message.key.toString() : null,
            value: message.value ? JSON.parse(message.value.toString()) : null,
            timestamp: message.timestamp,
            headers: message.headers,
          });
        } catch (error) {
          console.error('Error parsing message:', error);
        }
      },
    });
  }

  async disconnect() {
    if (this.consumer) {
      await this.consumer.disconnect();
      this.isConnected = false;
    }
  }

  getMessages() {
    return [...this.messages];
  }

  clearMessages() {
    this.messages = [];
  }

  async waitForMessages(count, timeoutMs = 10000) {
    const startTime = Date.now();

    while (this.messages.length < count) {
      if (Date.now() - startTime > timeoutMs) {
        throw new Error(
          `Timeout waiting for ${count} messages. ` +
          `Got ${this.messages.length} messages after ${timeoutMs}ms`
        );
      }
      // Small delay to avoid busy waiting
      await new Promise(resolve => setTimeout(resolve, 100));
    }

    // Return the last `count` messages
    return this.messages.slice(-count);
  }

  findMessage(predicate) {
    return this.messages.find(predicate);
  }

  findMessages(predicate) {
    return this.messages.filter(predicate);
  }

  getMessageCount() {
    return this.messages.length;
  }

  getLastMessage() {
    return this.messages[this.messages.length - 1] || null;
  }
}

module.exports = KafkaConsumer;
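
waitForMessages counts every message on the topic, so an unrelated event can satisfy it. One possible complement is to wait for a specific message instead. The sketch below is a standalone helper, not part of the class above. The name waitForMatch is hypothetical, and it only assumes a function that returns the messages captured so far.

```javascript
// Hypothetical helper: polls getMessages() until one message satisfies
// predicate, then resolves with that message. Rejects after timeoutMs.
async function waitForMatch(getMessages, predicate, timeoutMs = 10000, intervalMs = 100) {
  const deadline = Date.now() + timeoutMs;

  while (true) {
    const match = getMessages().find(predicate);
    if (match) return match;

    if (Date.now() >= deadline) {
      throw new Error(`No matching message after ${timeoutMs}ms`);
    }
    // Short pause between checks to avoid busy waiting
    await new Promise(resolve => setTimeout(resolve, intervalMs));
  }
}
```

With the consumer above, a test might call await waitForMatch(() => consumer.getMessages(), m => m.key === orderId) and then assert on the returned message only.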

Step 4: Creating the Kafka Producer Utility

Create utils/KafkaProducer.js for sending test messages:

const { Kafka, logLevel } = require('kafkajs');

class KafkaProducer {
  constructor(brokers = ['localhost:9092'], options = {}) {
    this.kafka = new Kafka({
      clientId: options.clientId || `test-producer-${Date.now()}`,
      brokers,
      logLevel: logLevel.ERROR,
    });
    this.producer = null;
    this.isConnected = false;
  }

  async connect() {
    if (this.isConnected) return;

    // Compression is configured per send() call in KafkaJS and defaults to none
    this.producer = this.kafka.producer({
      maxInFlightRequests: 1,
    });

    await this.producer.connect();
    this.isConnected = true;
  }

  async sendMessage(topic, messages) {
    if (!this.isConnected) {
      throw new Error('Producer not connected. Call connect() first.');
    }

    const formattedMessages = messages.map(msg => ({
      key: msg.key || null,
      value: typeof msg.value === 'string' ? msg.value : JSON.stringify(msg.value),
      headers: msg.headers || {},
    }));

    const result = await this.producer.send({
      topic,
      messages: formattedMessages,
    });

    return result;
  }

  async disconnect() {
    if (this.producer) {
      await this.producer.disconnect();
      this.isConnected = false;
    }
  }
}

module.exports = KafkaProducer;
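
The producer serializes objects with JSON.stringify and the consumer reads them back with JSON.parse, so a payload that is not valid JSON is logged and dropped by the consumer. If a topic may also carry plain strings, a tolerant decoder is one option. This is a sketch under that assumption; encodeValue and decodeValue are illustrative names and are not wired into the classes above.

```javascript
// Mirrors the producer's serialization: strings pass through,
// everything else is JSON-encoded.
function encodeValue(value) {
  return Buffer.from(typeof value === 'string' ? value : JSON.stringify(value));
}

// Hypothetical tolerant decoder: returns parsed JSON when possible,
// the raw string otherwise, and null for messages without a value.
function decodeValue(buffer) {
  if (buffer === null || buffer === undefined) return null;
  const text = buffer.toString('utf8');
  try {
    return JSON.parse(text);
  } catch {
    return text;
  }
}
```

One caveat of this design: a string that happens to be valid JSON, such as '123', comes back as the parsed value (the number 123) rather than the original string.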

Step 5: Your First Test - Basic Message Validation

Create tests/basic-validation.spec.js:

const { test, expect } = require('@playwright/test');
const KafkaConsumer = require('../utils/KafkaConsumer');
const KafkaProducer = require('../utils/KafkaProducer');

test.describe('Basic Kafka Message Validation', () => {
  let consumer;
  let producer;

  test.beforeAll(async () => {
    // Initialize and connect consumer
    consumer = new KafkaConsumer();
    await consumer.connect();
    await consumer.subscribe('test-topic');
    await consumer.startConsuming();

    // Initialize and connect producer
    producer = new KafkaProducer();
    await producer.connect();
  });

  test.afterAll(async () => {
    await consumer.disconnect();
    await producer.disconnect();
  });

  test('should produce and validate a message', async () => {
    consumer.clearMessages();

    const orderId = `order-${Date.now()}`;

    // Send a message to Kafka
    await producer.sendMessage('test-topic', [
      {
        key: orderId,
        value: {
          orderId,
          customerId: 'cust-123',
          amount: 99.99,
          currency: 'USD',
          status: 'pending',
          createdAt: new Date().toISOString(),
        }
      }
    ]);

    // Wait for the message to appear in Kafka
    const messages = await consumer.waitForMessages(1, 5000);

    // Validate message was received
    expect(messages).toHaveLength(1);

    const message = messages[0];
    expect(message.value).toMatchObject({
      orderId: orderId,
      customerId: 'cust-123',
      amount: 99.99,
      currency: 'USD',
      status: 'pending',
    });

    // Validate message metadata
    expect(message.key).toBe(orderId);
    expect(message.topic).toBe('test-topic');
  });

  test('should filter messages by criteria', async () => {
    consumer.clearMessages();

    // Send multiple messages with different amounts
    for (let i = 1; i <= 3; i++) {
      await producer.sendMessage('test-topic', [
        {
          key: `order-${i}`,
          value: {
            orderId: `order-${i}`,
            amount: 50 * i, // 50, 100, 150
            status: 'pending'
          }
        }
      ]);
    }

    // Wait for all messages
    await consumer.waitForMessages(3, 5000);

    // Find high-value orders (> 70)
    const highValueOrders = consumer.findMessages(
      msg => msg.value.amount > 70
    );

    expect(highValueOrders).toHaveLength(2);
    expect(highValueOrders[0].value.amount).toBe(100);
    expect(highValueOrders[1].value.amount).toBe(150);
  });

  test('should validate message structure', async () => {
    consumer.clearMessages();

    await producer.sendMessage('test-topic', [
      {
        key: 'payment-1',
        value: {
          transactionId: 'txn-123',
          amount: 50.00,
          currency: 'USD',
        }
      }
    ]);

    const messages = await consumer.waitForMessages(1, 5000);
    const message = messages[0];

    // Validate required fields exist
    expect(message.value).toHaveProperty('transactionId');
    expect(message.value).toHaveProperty('amount');
    expect(message.value).toHaveProperty('currency');

    // Validate field types
    expect(typeof message.value.transactionId).toBe('string');
    expect(typeof message.value.amount).toBe('number');
    expect(message.value.amount).toBeGreaterThan(0);
  });
});
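
The structure checks in the last test can be collected into a small helper that reports every problem at once. This is a minimal sketch, not a replacement for a real schema validator. The name checkShape and the field-to-type map format are assumptions made for this example.

```javascript
// Hypothetical helper: compares a message value against a map of
// field name -> expected typeof result and returns a list of problems.
function checkShape(value, shape) {
  if (value === null || typeof value !== 'object') {
    return [`expected an object, got ${value === null ? 'null' : typeof value}`];
  }

  const problems = [];
  for (const [field, expectedType] of Object.entries(shape)) {
    if (!(field in value)) {
      problems.push(`missing field "${field}"`);
    } else if (typeof value[field] !== expectedType) {
      problems.push(`field "${field}" should be ${expectedType}, got ${typeof value[field]}`);
    }
  }
  return problems;
}
```

In a test this could read expect(checkShape(message.value, { transactionId: 'string', amount: 'number', currency: 'string' })).toEqual([]), so a failure prints the full list of missing or mistyped fields.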

Step 6: UI Integration Tests

Create tests/ui-kafka-integration.spec.js - this is where it gets real:

const { test, expect } = require('@playwright/test');
const KafkaConsumer = require('../utils/KafkaConsumer');

test.describe('UI to Kafka Integration', () => {
  let consumer;

  test.beforeAll(async () => {
    consumer = new KafkaConsumer();
    await consumer.connect();
    await consumer.subscribe('user-events');
    await consumer.startConsuming();
  });

  test.afterAll(async () => {
    await consumer.disconnect();
  });

  test('should emit user signup event to Kafka', async ({ page }) => {
    consumer.clearMessages();

    const email = `user-${Date.now()}@example.com`;
    const password = 'SecurePass123!';

    // Navigate to signup page
    await page.goto('/signup');

    // Fill out signup form
    await page.fill('input[name="email"]', email);
    await page.fill('input[name="password"]', password);
    await page.fill('input[name="confirmPassword"]', password);
    await page.fill('input[name="fullName"]', 'John Doe');

    // Submit form
    await page.click('button[type="submit"]:has-text("Sign Up")');

    // Wait for success message on UI
    await expect(page.locator('text=Welcome, John')).toBeVisible({
      timeout: 5000
    });

    // Validate the Kafka message was produced
    const messages = await consumer.waitForMessages(1, 5000);

    expect(messages).toHaveLength(1);

    const event = messages[0];
    expect(event.value).toMatchObject({
      eventType: 'user.signup',
      email: email,
      fullName: 'John Doe',
    });

    // Verify event metadata
    expect(event.value.timestamp).toBeDefined();
    expect(event.value.userId).toBeDefined();
  });

  test('should emit payment processed event', async ({ page }) => {
    consumer.clearMessages();

    // Navigate to checkout
    await page.goto('/checkout');

    // Fill payment details
    await page.fill('input[placeholder="Card Number"]', '4111111111111111');
    await page.fill('input[placeholder="CVC"]', '123');
    await page.fill('input[placeholder="Expiry"]', '12/25');

    // Place order
    await page.click('button:has-text("Complete Purchase")');

    // Verify UI confirmation
    await expect(page.locator('text=Order Confirmed')).toBeVisible();

    // Get the order ID from the page
    const orderId = await page.locator('[data-testid="order-id"]').textContent();

    // Validate Kafka event
    const messages = await consumer.waitForMessages(1, 5000);

    expect(messages[0].value).toMatchObject({
      eventType: 'payment.processed',
      orderId: orderId.trim(),
      status: 'success',
      amount: expect.any(Number),
    });

    expect(messages[0].value.transactionId).toBeTruthy();
    expect(messages[0].value.processedAt).toBeTruthy();
  });

  test('should emit multiple events for order creation', async ({ page }) => {
    consumer.clearMessages();

    // Navigate and complete purchase
    await page.goto('/shop');
    await page.click('button:has-text("Add to Cart")');
    await page.click('button:has-text("Checkout")');
    await page.click('button:has-text("Place Order")');

    // Wait for success
    await expect(page.locator('text=Order placed successfully')).toBeVisible();

    // Wait for multiple events (order created + payment processed + inventory updated)
    const messages = await consumer.waitForMessages(3, 10000);

    expect(messages).toHaveLength(3);

    // Validate event sequence
    const eventTypes = messages.map(m => m.value.eventType);
    expect(eventTypes).toEqual([
      'order.created',
      'payment.processed',
      'inventory.updated'
    ]);
  });
});
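
The exact-sequence assertion in the last test fails as soon as any other event lands on the topic between the expected ones. If that happens in your system, a looser check that only requires the expected events to appear in order may be more stable. A sketch of that idea, with containsInOrder as a hypothetical helper name:

```javascript
// Hypothetical helper: true when every entry of expected appears in actual
// in the same relative order, allowing unrelated entries in between.
function containsInOrder(actual, expected) {
  let next = 0;
  for (const item of actual) {
    if (next < expected.length && item === expected[next]) {
      next++;
    }
  }
  return next === expected.length;
}
```

A test could then collect eventTypes from consumer.getMessages() and assert expect(containsInOrder(eventTypes, ['order.created', 'payment.processed', 'inventory.updated'])).toBe(true).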

Step 7: Advanced Validation Patterns

Create tests/advanced-patterns.spec.js for sophisticated scenarios:

const { test, expect } = require('@playwright/test');
const KafkaConsumer = require('../utils/KafkaConsumer');
const KafkaProducer = require('../utils/KafkaProducer');

test.describe('Advanced Kafka Validation Patterns', () => {
  let consumer;
  let producer;

  test.beforeAll(async () => {
    consumer = new KafkaConsumer();
    await consumer.connect();
    await consumer.subscribe('events');
    await consumer.startConsuming();

    producer = new KafkaProducer();
    await producer.connect();
  });

  test.afterAll(async () => {
    await consumer.disconnect();
    await producer.disconnect();
  });

  test('should maintain message ordering', async () => {
    consumer.clearMessages();

    const sequenceId = `seq-${Date.now()}`;

    // Send 5 messages in sequence
    for (let i = 0; i < 5; i++) {
      await producer.sendMessage('events', [
        {
          key: sequenceId, // Same key -> same partition; Kafka preserves order within a partition
          value: {
            sequenceId,
            sequence: i,
            timestamp: Date.now() + i,
          }
        }
      ]);

      // Small delay between sends
      await new Promise(resolve => setTimeout(resolve, 50));
    }

    // Wait for all messages
    const messages = await consumer.waitForMessages(5, 10000);

    // Verify order is maintained
    for (let i = 0; i < messages.length; i++) {
      expect(messages[i].value.sequence).toBe(i);
    }
  });

  test('should validate message timestamps', async () => {
    consumer.clearMessages();

    const beforeTime = Date.now();

    await producer.sendMessage('events', [
      {
        value: {
          eventType: 'test.event',
          data: 'test',
        }
      }
    ]);

    const afterTime = Date.now();
    const messages = await consumer.waitForMessages(1, 5000);

    const messageTime = parseInt(messages[0].timestamp, 10); // timestamp arrives as a string

    expect(messageTime).toBeGreaterThanOrEqual(beforeTime);
    expect(messageTime).toBeLessThanOrEqual(afterTime);
  });

  test('should find and validate related messages', async () => {
    consumer.clearMessages();

    const userId = 'user-123';
    const sessionId = `session-${Date.now()}`;

    // Send related messages with same user ID
    await producer.sendMessage('events', [
      {
        key: userId,
        value: {
          eventType: 'user.login',
          userId,
          sessionId,
        }
      }
    ]);

    await producer.sendMessage('events', [
      {
        key: userId,
        value: {
          eventType: 'user.action',
          userId,
          sessionId,
          action: 'view_product',
        }
      }
    ]);

    await producer.sendMessage('events', [
      {
        key: userId,
        value: {
          eventType: 'user.logout',
          userId,
          sessionId,
        }
      }
    ]);

    // Wait for all messages
    const messages = await consumer.waitForMessages(3, 10000);

    // Find all messages for this session
    const sessionMessages = messages.filter(
      m => m.value.sessionId === sessionId
    );

    expect(sessionMessages).toHaveLength(3);

    // Verify sequence
    expect(sessionMessages[0].value.eventType).toBe('user.login');
    expect(sessionMessages[1].value.eventType).toBe('user.action');
    expect(sessionMessages[2].value.eventType).toBe('user.logout');
  });

  test('should handle message retry scenarios', async () => {
    consumer.clearMessages();

    const requestId = `request-${Date.now()}`;

    // Simulate a retry pattern
    for (let attempt = 1; attempt <= 3; attempt++) {
      await producer.sendMessage('events', [
        {
          key: requestId,
          value: {
            eventType: 'api.call',
            requestId,
            attempt,
            endpoint: '/api/data',
            status: attempt === 3 ? 'success' : 'retry',
          }
        }
      ]);

      await new Promise(resolve => setTimeout(resolve, 100));
    }

    const messages = await consumer.waitForMessages(3, 10000);

    // Verify retry pattern
    expect(messages[0].value.status).toBe('retry');
    expect(messages[1].value.status).toBe('retry');
    expect(messages[2].value.status).toBe('success');

    // Verify attempts are sequential
    expect(messages.map(m => m.value.attempt)).toEqual([1, 2, 3]);
  });
});
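
Kafka guarantees ordering within a partition, not across a whole topic, so on a topic with several partitions the order assertions above are only safe for messages that share a key. One way to keep such tests honest is to group the captured messages by key first and assert on each group. This is a sketch; groupByKey is a hypothetical helper that works on the message objects collected by the consumer utility.

```javascript
// Hypothetical helper: groups captured messages by their key while keeping
// the order in which the consumer received them within each group.
function groupByKey(messages) {
  const groups = new Map();
  for (const message of messages) {
    if (!groups.has(message.key)) {
      groups.set(message.key, []);
    }
    groups.get(message.key).push(message);
  }
  return groups;
}
```

For the ordering test, groupByKey(consumer.getMessages()).get(sequenceId) would return only the messages for that sequence, and the loop over value.sequence can run on that array.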

Running Your Tests

Now that we have everything set up, let's run the tests.

Make sure Kafka is still running:

docker-compose ps

Run all tests:

npm test

Run tests in headed mode (see the browser):

npm run test:headed

Run a specific test file:

npx playwright test tests/basic-validation.spec.js

Run in debug mode:

npm run test:debug

Best Practices for Kafka Testing

1. Always Clear Messages Between Tests

test('my test', async () => {
  consumer.clearMessages(); // Do this first!
  // ... rest of test
});

2. Use Appropriate Timeouts

  • Start with 5000ms for local tests
  • Increase to 10000ms+ for slower systems
  • Adjust based on your infrastructure
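
Rather than editing timeout literals in every test, the value can come from the environment so CI can raise it without code changes. A minimal sketch follows. resolveTimeout is a hypothetical helper, and KAFKA_TEST_TIMEOUT_MS is an illustrative variable name, not a Playwright or KafkaJS setting.

```javascript
// Hypothetical helper: reads a timeout in milliseconds from an environment
// object and falls back when the value is unset or not a positive number.
function resolveTimeout(env, name, fallbackMs) {
  const parsed = Number(env[name]);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallbackMs;
}

// KAFKA_TEST_TIMEOUT_MS is an assumed name chosen for this example.
const kafkaTimeoutMs = resolveTimeout(process.env, 'KAFKA_TEST_TIMEOUT_MS', 5000);
```

Tests would then call consumer.waitForMessages(1, kafkaTimeoutMs) instead of passing 5000 directly.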

3. Validate Structure Before Values

// ✅ Good: Check structure first, then values
expect(message.value).toHaveProperty('orderId');
expect(message.value.orderId).toBe(expectedId);

// ❌ Bad: Assuming structure without checking
expect(message.value.orderId).toBe(expectedId); // May fail with unhelpful error

4. Use Message Keys for Tracking

// ✅ Good: Use meaningful keys
await producer.sendMessage('orders', [
  {
    key: orderId, // Easier to track related messages
    value: { orderId, amount: 100 }
  }
]);
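
Keys are most useful for tracking when they are unique per test. Keys built from Date.now() can collide if two tests start in the same millisecond, and fixed keys such as order-1 repeat on every run. One option is a random suffix from Node's built-in crypto module. In this sketch makeTestKey is a hypothetical helper name.

```javascript
const { randomUUID } = require('crypto');

// Hypothetical helper: builds a key that stays unique even when two tests
// start in the same millisecond.
function makeTestKey(prefix) {
  return `${prefix}-${randomUUID()}`;
}
```

A test can then filter on that key, for example consumer.findMessages(m => m.key === orderId), and ignore messages left over from other tests.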

5. Log Failed Messages for Debugging

test('validate message', async () => {
  let messages;
  try {
    messages = await consumer.waitForMessages(1, 5000);
  } catch (error) {
    // waitForMessages throws on timeout, so log what did arrive before failing
    console.log('Timed out. Messages received so far:', consumer.getMessages());
    throw error;
  }

  expect(messages).toHaveLength(1);
});

Troubleshooting Common Issues

Issue: "Timeout waiting for messages"

Solutions:

  • Verify Kafka is running: docker-compose ps
  • Check topic exists: docker-compose exec kafka kafka-topics --list --bootstrap-server localhost:9092
  • Increase timeout value: await consumer.waitForMessages(1, 15000)
  • Add debugging: console.log('Messages received:', consumer.getMessageCount())

Issue: "Consumer not connected"

Solution:
Make sure you call await consumer.connect() in beforeAll:

test.beforeAll(async () => {
  consumer = new KafkaConsumer();
  await consumer.connect(); // This is required!
  await consumer.subscribe('topic');
  await consumer.startConsuming();
});

Issue: Messages appearing but test still fails

Solution:
Add debugging to see the actual message:

const messages = await consumer.waitForMessages(1, 5000);
console.log('Received message:', JSON.stringify(messages[0], null, 2));
expect(messages[0].value).toMatchObject({ /* your expectations */ });
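
When the payload is large, the full JSON dump can hide the one field that differs. A small helper that lists only the mismatching fields is one way to narrow it down. This is a sketch: diffFields is a hypothetical name, it only looks at top-level fields, and it compares values through JSON.stringify, so nested objects with the same content but a different key order are reported as different.

```javascript
// Hypothetical helper: lists the top-level fields of expected whose values
// differ from the corresponding fields in actual.
function diffFields(actual, expected) {
  const differences = [];
  for (const [field, want] of Object.entries(expected)) {
    const got = actual ? actual[field] : undefined;
    if (JSON.stringify(got) !== JSON.stringify(want)) {
      differences.push({ field, expected: want, actual: got });
    }
  }
  return differences;
}
```

Logging diffFields(messages[0].value, expectedValue) before the assertion prints just the fields that need attention.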

Conclusion

Testing distributed systems doesn't have to be painful. By combining Playwright's powerful UI automation with Kafka consumers for message validation, you can build comprehensive end-to-end tests that give you confidence in your entire system.

The patterns shown here range from single-message validation to ordered, multi-event flows. Start simple, test often, and gradually build out your test suite as your system grows.

Your turn: Try building a test for your own application's message flow. The hardest part is the first test; after that, it gets much easier!

Questions? Drop them in the comments below. Happy testing!