Mansi T
How to Connect Any AI Coding Assistant to Kafka, MQTT, and Live Data Streams

Your AI coding assistant just generated a beautiful dashboard in 30 seconds. There's just one problem: it's showing static mock data.

You need live data. Sensor readings from your IoT devices. Telemetry from production systems. Event streams from Kafka. But AI tools like Claude Code, Cursor, and Copilot can't connect to MQTT brokers or Kafka topics.

Until now.

The Problem: AI Tools Don't Speak Streaming

AI coding assistants are incredible at building applications, but they all hit the same wall:

  • Can't connect to MQTT brokers
  • Don't know how to consume Kafka topics
  • Have no concept of WebSocket relays or message buffering
  • Can't deploy server-side automations for 24/7 monitoring

So you end up writing all the infrastructure code manually — exactly where you need AI help most.

The Solution: MCP + Streaming Infrastructure

Model Context Protocol (MCP) is an open standard that lets AI tools connect to external data sources. Here's the stack:

JustinX - Managed platform that:

  • Connects to MQTT, Kafka, and webhooks
  • Buffers messages via Redis Streams
  • Delivers via WebSocket with sub-second latency
  • Runs server-side automations (Watchers)
  • Exposes everything through MCP

LiveTap SDK - Open-source TypeScript client (MIT):

  • Handles WebSocket connections
  • Auto-reconnection with backoff
  • Topic filtering and backpressure
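"Auto-reconnection with backoff" usually means exponential backoff with jitter: the delay between retries doubles each attempt, up to a cap, with randomness so many clients don't reconnect in lockstep. A minimal sketch of that schedule (purely illustrative — not the SDK's actual internals):

```typescript
// Illustrative reconnect backoff: delay doubles per attempt, capped at a
// maximum, with jitter so a fleet of clients doesn't retry in lockstep.
function backoffDelay(attempt: number, baseMs = 500, maxMs = 30_000): number {
  const exp = Math.min(baseMs * 2 ** attempt, maxMs);
  return exp / 2 + Math.random() * (exp / 2); // jitter in [exp/2, exp)
}
```

The cap matters: without it, a broker that stays down for an hour would push retry delays into the tens of minutes.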

Your Data (MQTT/Kafka) → JustinX (MCP Server) → AI Assistant → Generated Code (LiveTap SDK) → Your App

Works with: Claude Code, Cursor, Cline, Continue.dev, Windsurf, and any MCP-compatible tool.

Quick Start

1. Configure MCP

Add this to ~/.config/claude-code/mcp_servers.json (or the equivalent file for your tool):

{
  "mcpServers": {
    "justinx": {
      "url": "https://justinx.ai/mcp",
      "headers": {
        "Authorization": "Bearer YOUR_API_KEY"
      }
    }
  }
}

Get your API key at justinx.ai (free tier, no credit card).

2. Install LiveTap

npm install @livetap/client

3. Connect via Natural Language

Tell your AI assistant:

"Connect to my MQTT broker at mqtt://iot.example.com:1883, topic sensors/+/data"

It calls the JustinX MCP API to set up the connection.

4. Build Your Dashboard

Ask your AI:

"Create a React dashboard showing real-time sensor data with live charts"

It generates this:

import { LiveTap } from '@livetap/client';
import { useEffect, useState } from 'react';

function Dashboard() {
  const [data, setData] = useState([]);

  useEffect(() => {
    // Create the client inside the effect so re-renders
    // don't open extra connections
    const tap = new LiveTap({ url: 'wss://justinx.ai/stream' });
    tap.subscribe('sensors/+/data', (msg) => {
      // Keep only the 100 most recent readings
      setData(prev => [...prev.slice(-99), msg.data]);
    });
    tap.connect();
    return () => tap.disconnect();
  }, []);

  return <LiveChart data={data} />;
}
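The `prev.slice(-99)` pattern in the dashboard is a bounded buffer: state stays capped at the 100 most recent readings instead of growing forever. Pulled out as a standalone helper (the name `appendBounded` is mine, not part of the SDK):

```typescript
// Append an item to an array, keeping only the `limit` most recent entries.
function appendBounded<T>(buf: T[], item: T, limit = 100): T[] {
  const next = [...buf, item];
  return next.length > limit ? next.slice(next.length - limit) : next;
}
```

Returning a new array (rather than mutating) is what makes this safe to use inside a React state updater.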

5. Deploy Server-Side Automation

Ask:

"Alert me on Slack when any sensor exceeds 100"

AI generates a Watcher that runs 24/7 on JustinX:

export default async function monitor(stream, notify) {
  for await (const msg of stream('sensors/+/data')) {
    if (msg.data.value > 100) {
      await notify.slack({
        channel: '#alerts',
        text: `⚠️ Sensor ${msg.data.id} exceeded threshold: ${msg.data.value}`
      });
    }
  }
}

Total time: 5 minutes. Infrastructure code written: 0 lines.

Real-World Example: EV Charger Monitoring

The Ask:

"Connect to mqtt://chargers.internal:8883, topic chargers/+/telemetry. Create a map showing all active charging sessions. Alert me when efficiency drops below 88% for 10 minutes."

What AI generates:

// Dashboard with LiveTap
function ChargerMap() {
  const [chargers, setChargers] = useState(new Map());

  useEffect(() => {
    // Instantiate inside the effect so re-renders don't open extra connections
    const tap = new LiveTap({ url: 'wss://justinx.ai/stream' });
    tap.subscribe('chargers/+/telemetry', (msg) => {
      setChargers(prev => new Map(prev).set(msg.data.deviceId, { ...msg.data }));
    });
    tap.connect();
    return () => tap.disconnect();
  }, []);

  return (
    <MapContainer>
      {Array.from(chargers.values()).map(c => (
        <Marker key={c.deviceId} position={[c.lat, c.lng]}>
          <Popup>Power: {c.power}kW | Efficiency: {c.efficiency}%</Popup>
        </Marker>
      ))}
    </MapContainer>
  );
}
// Watcher for predictive maintenance
export default async function efficiencyMonitor(stream, notify) {
  const windows = new Map(); // deviceId -> recent efficiency readings
  const alerted = new Set(); // devices already notified, to avoid repeat emails

  for await (const msg of stream('chargers/+/telemetry')) {
    const { deviceId, efficiency } = msg.data;

    if (!windows.has(deviceId)) windows.set(deviceId, []);
    const readings = windows.get(deviceId);
    readings.push(efficiency);
    if (readings.length > 120) readings.shift(); // 10 min @ 5s intervals

    const avg = readings.reduce((a, b) => a + b, 0) / readings.length;

    if (avg < 88 && readings.length >= 120 && !alerted.has(deviceId)) {
      alerted.add(deviceId);
      await notify.email({
        to: 'ops@example.com',
        subject: `Charger ${deviceId} degraded`,
        body: `Avg efficiency: ${avg.toFixed(2)}%`
      });
    } else if (avg >= 88) {
      alerted.delete(deviceId); // re-arm once the charger recovers
    }
  }
}
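The Watcher's 10-minute check is just a fixed-length rolling average. Extracted as a pure function (a helper of my own, not part of the Watcher API), it's easy to unit-test before deploying:

```typescript
// Push a reading into a fixed-size window (mutating it in place)
// and return the current average over the window.
function rollingAvg(window: number[], value: number, size = 120): number {
  window.push(value);
  if (window.length > size) window.shift();
  return window.reduce((a, b) => a + b, 0) / window.length;
}
```

Keeping the threshold logic out of this helper makes it reusable: the same window can feed an alert rule, a chart, or a log line.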

Result: Bearing failures detected 2-3 weeks early. Maintenance scheduled during planned downtime.

Why This Beats DIY

Without JustinX:

  • 2-3 weeks: Set up Kafka/MQTT consumers
  • Write WebSocket server with reconnection
  • Configure Redis buffering
  • Deploy and monitor everything
  • Debug at 3 AM

With JustinX:

  • 5 minutes: Paste connection string
  • AI generates code
  • Done

Architecture Highlights

  • Protocols: MQTT v3.1.1/v5, Kafka (SASL/SCRAM, SSL), Webhooks (HMAC)
  • Buffering: Redis Streams (5 min free, 30 min pro, 1 hr+ enterprise)
  • Latency: <50ms p99 (MQTT), <100ms (Kafka)
  • Scale: 100K+ msgs/sec, 1000+ concurrent WebSocket clients
  • Security: TLS 1.3, AES-256 at rest, per-org isolation

Pricing

  • Free: 3 Watchers, 5 min retention
  • Pro: $29/mo, 25 Watchers, 30 min retention
  • Enterprise: Unlimited, 1+ hr retention, SSO, SLA

No per-user fees. No message charges.

Common Issues

"Connection refused"

  • Check firewall allows JustinX IP ranges
  • Test broker: mosquitto_sub -h broker.example.com -t test

"WebSocket disconnects"

const tap = new LiveTap({
  url: 'wss://justinx.ai/stream',
  pingInterval: 30000,
  maxReconnectAttempts: 10
});

"Watcher not receiving messages"

  • Check Watcher logs in dashboard
  • Verify topic pattern matches
  • Add console.log statements
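Topic patterns use standard MQTT wildcards: `+` matches exactly one level, `#` matches everything from that level down. A quick matcher for sanity-checking patterns locally (a sketch of the standard MQTT semantics, not JustinX's implementation):

```typescript
// Match an MQTT topic against a subscription pattern.
// '+' matches a single level; '#' (last level only) matches the remainder.
function topicMatches(pattern: string, topic: string): boolean {
  const p = pattern.split('/');
  const t = topic.split('/');
  for (let i = 0; i < p.length; i++) {
    if (p[i] === '#') return true;       // multi-level wildcard: match the rest
    if (i >= t.length) return false;     // topic ran out of levels
    if (p[i] !== '+' && p[i] !== t[i]) return false;
  }
  return p.length === t.length;          // every level consumed exactly
}
```

If a Watcher is silent, run your pattern and a sample topic through something like this first: `sensors/+/data` does not match `sensors/a1/b2/data`, which trips people up.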

Migration from DIY

Before (MQTT + Node.js):

const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://broker');
client.on('message', (topic, msg) => {
  processMessage(JSON.parse(msg));
});

After (Watcher):

export default async function processor(stream, notify) {
  for await (const msg of stream('sensors/+/data')) {
    processMessage(msg.data);
  }
}

Migration time: <1 hour

Infrastructure removed: MQTT client, process manager, deployment

Get Started

  1. Sign up: justinx.ai (free tier)
  2. Add MCP config to your AI tool
  3. npm install @livetap/client
  4. Tell your AI to connect a data source
  5. Build

Questions? Drop them below or follow us on X.


Stop fighting infrastructure. Start building.
