Alex Spinov
NATS Has a Free API: High-Performance Cloud-Native Messaging System

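NATS is an open-source, high-performance messaging system for cloud-native applications. It provides publish-subscribe, request-reply, and streaming. Core NATS delivers messages at most once; JetStream adds persistence with at-least-once delivery and, when combined with message deduplication, exactly-once semantics.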

What Is NATS?

NATS is a CNCF incubating project that provides a simple, secure, and performant communications layer for distributed systems, services, and devices. A single server can handle millions of messages per second, with latencies typically well under a millisecond, depending on hardware, payload size, and network.

Key Features:

  • Pub/Sub, Request/Reply, Queue Groups
  • JetStream for persistent messaging
  • Key-Value and Object stores
  • Multi-tenancy with accounts
  • Decentralized security (NKeys/JWT)
  • Leaf nodes for edge computing
  • WebSocket support
  • 40+ client libraries

Quick Start

# Install NATS server
brew install nats-server

# Start with JetStream enabled
nats-server -js

# Install CLI
brew install nats-io/nats-tools/nats

# Test pub/sub
nats sub "orders.>" &
nats pub orders.new '{"id": 1, "item": "widget"}'
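The quick start subscribes to `orders.>`, while the Python example further down uses `orders.*`. The difference is that `*` matches exactly one subject token and `>` matches one or more trailing tokens. Here is a minimal sketch of that matching rule in plain Python. The `subject_matches` helper is hypothetical and written only for illustration; it is not part of any NATS client, and the server's real matcher is more involved.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS-style subscription pattern matches a subject."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # ">" must be the last token and needs at least one token left to match
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    # Without ">", pattern and subject must have the same number of tokens
    return len(p_tokens) == len(s_tokens)


print(subject_matches("orders.*", "orders.new"))     # True
print(subject_matches("orders.*", "orders.eu.new"))  # False
print(subject_matches("orders.>", "orders.eu.new"))  # True
print(subject_matches("orders.>", "orders"))         # False
```

So `nats sub "orders.>"` sees `orders.new` as well as deeper subjects such as `orders.eu.new`, while a subscription on `orders.*` only sees the former.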

NATS with Python

import asyncio
import nats

async def main():
    nc = await nats.connect("nats://localhost:4222")

    # Subscribe
    async def handler(msg):
        print(f"Received: {msg.subject}: {msg.data.decode()}")

    sub = await nc.subscribe("orders.*", cb=handler)

    # Publish
    await nc.publish("orders.new", b'{"id": 1, "total": 99.99}')
    await nc.publish("orders.update", b'{"id": 1, "status": "shipped"}')

    # Request-Reply
    async def responder(msg):
        await msg.respond(b'{"status": "ok", "balance": 150.00}')

    await nc.subscribe("account.balance", cb=responder)

    response = await nc.request("account.balance", b'{"user_id": 42}', timeout=5)
    print(f"Balance: {response.data.decode()}")

    await asyncio.sleep(1)
    await nc.close()

asyncio.run(main())
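The feature list mentions queue groups, which the example above does not show. In nats-py, subscribers that pass the same `queue` name to `subscribe` form a group, and each message is delivered to only one member. Below is a small sketch under the assumption that `nc` is an already connected client; the `start_workers` helper and the `order-workers` group name are made up for illustration.

```python
import asyncio


async def start_workers(nc, subject: str, queue: str, count: int):
    """Subscribe `count` handlers to `subject` in the same queue group."""
    subs = []
    for worker_id in range(count):
        # Bind worker_id as a default argument so each handler keeps its own id
        async def handler(msg, worker_id=worker_id):
            print(f"worker {worker_id} got {msg.subject}: {msg.data.decode()}")

        # All subscriptions share the queue name, so NATS load-balances between them
        subs.append(await nc.subscribe(subject, queue=queue, cb=handler))
    return subs
```

With a live connection this could be called as `await start_workers(nc, "orders.*", "order-workers", 3)`. Each published order then goes to one of the three handlers instead of all of them.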

JetStream: Persistent Messaging

import asyncio
import nats
from nats.js.api import StreamConfig, ConsumerConfig

async def jetstream_example():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Create a stream
    await js.add_stream(
        StreamConfig(
            name="ORDERS",
            subjects=["orders.>"],
            retention="limits",
            max_msgs=1000000,
            max_bytes=1024*1024*1024  # 1GB
        )
    )

    # Publish to stream
    ack = await js.publish("orders.new", b'{"id": 1}')
    print(f"Published: seq={ack.seq}, stream={ack.stream}")

    # Subscribe with durable consumer
    sub = await js.subscribe(
        "orders.>",
        durable="order-processor",
        config=ConsumerConfig(ack_policy="explicit")
    )

    msg = await sub.next_msg(timeout=5)
    print(f"Got: {msg.data.decode()}")
    await msg.ack()

    await nc.close()

asyncio.run(jetstream_example())
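The example above gives you at-least-once delivery. To get closer to exactly-once on the publishing side, JetStream can drop duplicates that carry the same `Nats-Msg-Id` header within the stream's duplicate window. Here is a rough sketch, assuming `js` is the JetStream context from the example. The `message_id` and `publish_deduplicated` helpers are hypothetical, and hashing the payload is just one way to derive an ID; a business key such as the order ID is often a better choice.

```python
import asyncio
import hashlib


def message_id(payload: bytes) -> str:
    """Derive a stable ID from the payload (illustrative choice, not a NATS rule)."""
    return hashlib.sha256(payload).hexdigest()


async def publish_deduplicated(js, subject: str, payload: bytes):
    """Publish with a Nats-Msg-Id header so retries are not stored twice."""
    headers = {"Nats-Msg-Id": message_id(payload)}
    # A retry with the same ID inside the duplicate window is acknowledged
    # by the server but not appended to the stream again
    return await js.publish(subject, payload, headers=headers)
```

Calling `await publish_deduplicated(js, "orders.new", b'{"id": 1}')` twice in a row should leave a single message in the `ORDERS` stream, and the second acknowledgment should be flagged as a duplicate.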

Key-Value Store

import asyncio
import nats

async def kv_example():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Create KV bucket
    kv = await js.create_key_value(bucket="CONFIG")

    # Put values
    await kv.put("app.name", b"MyApp")
    await kv.put("app.version", b"2.1.0")
    await kv.put("feature.dark_mode", b"true")

    # Get values
    entry = await kv.get("app.name")
    print(f"{entry.key}: {entry.value.decode()} (rev: {entry.revision})")

    # Watch for changes
    watcher = await kv.watchall()
    async for update in watcher:
        if update is None:
            break
        print(f"Changed: {update.key} = {update.value.decode()}")

    await nc.close()

asyncio.run(kv_example())
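The `revision` printed above is also what makes safe concurrent updates possible. Here is a minimal compare-and-set sketch, assuming a nats-py version whose KV bucket exposes `update(key, value, last=...)` and that `kv` is the bucket from the example. The `set_if_unchanged` helper is hypothetical.

```python
import asyncio


async def set_if_unchanged(kv, key: str, new_value: bytes) -> int:
    """Write new_value only if nobody else changed the key since we read it."""
    entry = await kv.get(key)
    # `last` is the revision we expect; the update is rejected with an error
    # if the key has moved on to a newer revision in the meantime
    return await kv.update(key, new_value, last=entry.revision)
```

For example, `await set_if_unchanged(kv, "app.version", b"2.2.0")` returns the new revision on success. On a conflict the call raises, and the caller can re-read the key and try again.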

NATS with Node.js

import { connect, StringCodec } from "nats";

const nc = await connect({ servers: "nats://localhost:4222" });
const sc = StringCodec();

// Subscribe
const sub = nc.subscribe("events.>");
(async () => {
  for await (const msg of sub) {
    console.log(`${msg.subject}: ${sc.decode(msg.data)}`);
  }
})();

// Publish
nc.publish("events.user.login", sc.encode(JSON.stringify({ user: "alice" })));

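// Request-Reply: register a responder first, otherwise the request
// fails because nobody is listening on the subject
const users = nc.subscribe("api.users.get");
(async () => {
  for await (const msg of users) {
    const id = sc.decode(msg.data);
    msg.respond(sc.encode(JSON.stringify({ id, name: "alice" })));
  }
})();

const response = await nc.request("api.users.get", sc.encode("42"), { timeout: 5000 });
console.log(`User: ${sc.decode(response.data)}`);

// Flush pending messages and close the connection
await nc.drain();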
