Your AI coding assistant just generated a beautiful dashboard in 30 seconds. There's just one problem: it's showing static mock data.
You need live data. Sensor readings from your IoT devices. Telemetry from production systems. Event streams from Kafka. But AI tools like Claude Code, Cursor, and Copilot can't connect to MQTT brokers or Kafka topics.
Until now.
The Problem: AI Tools Don't Speak Streaming
AI coding assistants are incredible at building applications, but they all hit the same wall:
- Can't connect to MQTT brokers
- Don't know how to consume Kafka topics
- Have no concept of WebSocket relays or message buffering
- Can't deploy server-side automations for 24/7 monitoring
So you end up writing all the infrastructure code manually — exactly where you need AI help most.
The Solution: MCP + Streaming Infrastructure
Model Context Protocol (MCP) is an open standard that lets AI tools connect to external data sources. Here's the stack:
JustinX - Managed platform that:
- Connects to MQTT, Kafka, and webhooks
- Buffers messages via Redis Streams
- Delivers via WebSocket with sub-second latency
- Runs server-side automations (Watchers)
- Exposes everything through MCP
LiveTap SDK - Open-source TypeScript client (MIT):
- Handles WebSocket connections
- Auto-reconnection with backoff
- Topic filtering and backpressure
Your Data (MQTT/Kafka) → JustinX (MCP Server) →
AI Assistant → Generated Code (LiveTap SDK) → Your App
Works with: Claude Code, Cursor, Cline, Continue.dev, Windsurf, and any MCP-compatible tool.
Quick Start
1. Configure MCP
// ~/.config/claude-code/mcp_servers.json (or similar for your tool)
{
"mcpServers": {
"justinx": {
"url": "https://justinx.ai/mcp",
"headers": {
"Authorization": "Bearer YOUR_API_KEY"
}
}
}
}
Get your API key at justinx.ai (free tier, no credit card).
2. Install LiveTap
npm install @livetap/client
3. Connect via Natural Language
Tell your AI assistant:
"Connect to my MQTT broker at mqtt://iot.example.com:1883, topic sensors/+/data"
It calls the JustinX MCP API to set up the connection.
4. Build Your Dashboard
Ask your AI:
"Create a React dashboard showing real-time sensor data with live charts"
It generates this:
import { LiveTap } from '@livetap/client';
import { useEffect, useState } from 'react';
function Dashboard() {
const [data, setData] = useState([]);
const tap = new LiveTap({ url: 'wss://justinx.ai/stream' });
useEffect(() => {
tap.subscribe('sensors/+/data', (msg) => {
setData(prev => [...prev.slice(-99), msg.data]);
});
tap.connect();
return () => tap.disconnect();
}, []);
return <LiveChart data={data} />;
}
5. Deploy Server-Side Automation
Ask:
"Alert me on Slack when any sensor exceeds 100"
AI generates a Watcher that runs 24/7 on JustinX:
export default async function monitor(stream, notify) {
for await (const msg of stream('sensors/+/data')) {
if (msg.data.value > 100) {
await notify.slack({
channel: '#alerts',
text: `⚠️ Sensor ${msg.data.id} exceeded threshold: ${msg.data.value}`
});
}
}
}
Total time: 5 minutes. Infrastructure code written: 0 lines.
Real-World Example: EV Charger Monitoring
The Ask:
"Connect to mqtt://chargers.internal:8883, topic chargers/+/telemetry. Create a map showing all active charging sessions. Alert me when efficiency drops below 88% for 10 minutes."
What AI generates:
// Dashboard with LiveTap
function ChargerMap() {
const [chargers, setChargers] = useState(new Map());
const tap = new LiveTap({ url: 'wss://justinx.ai/stream' });
useEffect(() => {
tap.subscribe('chargers/+/telemetry', (msg) => {
setChargers(prev => new Map(prev).set(
msg.data.deviceId,
{ power: msg.data.power, efficiency: msg.data.efficiency, ...msg.data }
));
});
tap.connect();
}, []);
return (
<MapContainer>
{Array.from(chargers.values()).map(c => (
<Marker position={[c.lat, c.lng]}>
<Popup>Power: {c.power}kW | Efficiency: {c.efficiency}%</Popup>
</Marker>
))}
</MapContainer>
);
}
// Watcher for predictive maintenance
export default async function efficiencyMonitor(stream, notify) {
const windows = new Map(); // deviceId -> [readings]
for await (const msg of stream('chargers/+/telemetry')) {
const { deviceId, efficiency } = msg.data;
if (!windows.has(deviceId)) windows.set(deviceId, []);
const readings = windows.get(deviceId);
readings.push(efficiency);
if (readings.length > 120) readings.shift(); // 10 min @ 5s intervals
const avg = readings.reduce((a, b) => a + b) / readings.length;
if (avg < 88 && readings.length >= 120) {
await notify.email({
to: 'ops@example.com',
subject: `Charger ${deviceId} degraded`,
body: `Avg efficiency: ${avg.toFixed(2)}%`
});
}
}
}
Result: Bearing failures detected 2-3 weeks early. Maintenance scheduled during planned downtime.
Why This Beats DIY
Without JustinX:
- 2-3 weeks: Set up Kafka/MQTT consumers
- Write WebSocket server with reconnection
- Configure Redis buffering
- Deploy and monitor everything
- Debug at 3 AM
With JustinX:
- 5 minutes: Paste connection string
- AI generates code
- Done
Architecture Highlights
- Protocols: MQTT v3.1.1/v5, Kafka (SASL/SCRAM, SSL), Webhooks (HMAC)
- Buffering: Redis Streams (5 min free, 30 min pro, 1 hr+ enterprise)
- Latency: <50ms p99 (MQTT), <100ms (Kafka)
- Scale: 100K+ msgs/sec, 1000+ concurrent WebSocket clients
- Security: TLS 1.3, AES-256 at rest, per-org isolation
Pricing
- Free: 3 Watchers, 5 min retention
- Pro: $29/mo, 25 Watchers, 30 min retention
- Enterprise: Unlimited, 1+ hr retention, SSO, SLA
No per-user fees. No message charges.
Common Issues
"Connection refused"
- Check firewall allows JustinX IP ranges
- Test broker:
mosquitto_sub -h broker.example.com -t test
"WebSocket disconnects"
const tap = new LiveTap({
url: 'wss://justinx.ai/stream',
pingInterval: 30000,
maxReconnectAttempts: 10
});
"Watcher not receiving messages"
- Check Watcher logs in dashboard
- Verify topic pattern matches
- Add
console.logstatements
Migration from DIY
Before (MQTT + Node.js):
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://broker');
client.on('message', (topic, msg) => {
processMessage(JSON.parse(msg));
});
After (Watcher):
export default async function processor(stream, notify) {
for await (const msg of stream('sensors/+/data')) {
processMessage(msg.data);
}
}
Migration time: <1 hour
Infrastructure removed: MQTT client, process manager, deployment
Get Started
- Sign up: justinx.ai (free tier)
- Add MCP config to your AI tool
npm install @livetap/client- Tell your AI to connect a data source
- Build
Links:
- JustinX Platform
- LiveTap SDK (⭐ if useful!)
- Documentation
- Examples
- X
Questions? Drop them below or follow us on X.
If you found this helpful:
- ⭐ Star LiveTap on GitHub
- 🔄 Share with your team
- 💬 Join the community
Stop fighting infrastructure. Start building.
Top comments (0)