NATS is an open-source, high-performance messaging system for cloud-native applications. It provides publish-subscribe, request-reply, and streaming. Core NATS delivers messages at most once; JetStream adds persistence with at-least-once delivery and optional exactly-once semantics.
What Is NATS?
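NATS is a CNCF incubating project that provides a simple, secure, and performant communications system for digital systems, services, and devices. It is designed for high throughput and low latency: a single server can route millions of small messages per second, though actual figures depend on hardware, payload size, and network.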
Key Features:
- Pub/Sub, Request/Reply, Queue Groups
- JetStream for persistent messaging
- Key-Value and Object stores
- Multi-tenancy with accounts
- Decentralized security (NKeys/JWT)
- Leaf nodes for edge computing
- WebSocket support
- 40+ client libraries
Quick Start
# Install NATS server
brew install nats-server
# Start with JetStream enabled
nats-server -js
# Install CLI
brew install nats-io/nats-tools/nats
# Test pub/sub
nats sub "orders.>" &
nats pub orders.new '{"id": 1, "item": "widget"}'
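The subscription above uses the `>` wildcard. NATS subjects are dot-separated tokens: `*` matches exactly one token, while `>` matches one or more trailing tokens and must be the last token in the pattern. The server does this matching itself; the following is only a minimal sketch of the rules in plain Python, and `subject_matches` is a hypothetical helper, not part of any NATS client.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS-style subscription pattern matches a subject."""
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")

    for i, token in enumerate(pattern_tokens):
        if token == ">":
            # '>' must be the last pattern token and needs at least one
            # remaining subject token to match.
            return i == len(pattern_tokens) - 1 and len(subject_tokens) > i
        if i >= len(subject_tokens):
            return False  # pattern is longer than the subject
        if token != "*" and token != subject_tokens[i]:
            return False  # literal token mismatch
    # No '>' seen: both sides must have the same number of tokens.
    return len(pattern_tokens) == len(subject_tokens)


# Compare the two wildcard styles on a few subjects.
for pattern in ("orders.*", "orders.>"):
    for subject in ("orders.new", "orders.new.eu", "orders"):
        print(f"{pattern!r} vs {subject!r}: {subject_matches(pattern, subject)}")
```

In short, `orders.*` would receive `orders.new` but not `orders.new.eu`, while `orders.>` would receive both.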
NATS with Python
import asyncio
import nats

async def main():
    nc = await nats.connect("nats://localhost:4222")

    # Subscribe
    async def handler(msg):
        print(f"Received: {msg.subject}: {msg.data.decode()}")

    sub = await nc.subscribe("orders.*", cb=handler)

    # Publish
    await nc.publish("orders.new", b'{"id": 1, "total": 99.99}')
    await nc.publish("orders.update", b'{"id": 1, "status": "shipped"}')

    # Request-Reply
    async def responder(msg):
        await msg.respond(b'{"status": "ok", "balance": 150.00}')

    await nc.subscribe("account.balance", cb=responder)
    response = await nc.request("account.balance", b'{"user_id": 42}', timeout=5)
    print(f"Balance: {response.data.decode()}")

    await asyncio.sleep(1)
    await nc.close()

asyncio.run(main())
JetStream: Persistent Messaging
import asyncio
import nats
from nats.js.api import StreamConfig, ConsumerConfig

async def jetstream_example():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Create a stream
    await js.add_stream(
        StreamConfig(
            name="ORDERS",
            subjects=["orders.>"],
            retention="limits",
            max_msgs=1000000,
            max_bytes=1024*1024*1024  # 1GB
        )
    )

    # Publish to stream
    ack = await js.publish("orders.new", b'{"id": 1}')
    print(f"Published: seq={ack.seq}, stream={ack.stream}")

    # Subscribe with durable consumer
    sub = await js.subscribe(
        "orders.>",
        durable="order-processor",
        config=ConsumerConfig(ack_policy="explicit")
    )
    msg = await sub.next_msg(timeout=5)
    print(f"Got: {msg.data.decode()}")
    await msg.ack()

    await nc.close()

asyncio.run(jetstream_example())
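A publisher that never sees its ack may retry, so the same message can be sent twice. JetStream can discard such retries when the publisher sets a `Nats-Msg-Id` header: the server remembers IDs for the stream's duplicate window (two minutes by default) and acknowledges a repeat as a duplicate instead of storing it again. The class below is a simplified in-memory model of that idea, not the server's implementation; `DedupWindow` and its `publish` signature are hypothetical names chosen for illustration.

```python
from typing import Dict, Tuple


class DedupWindow:
    """Toy model of JetStream-style publish deduplication by message ID."""

    def __init__(self, window_seconds: float = 120.0) -> None:
        self.window_seconds = window_seconds
        self._seen: Dict[str, Tuple[float, int]] = {}  # msg_id -> (time, seq)
        self._last_seq = 0

    def publish(self, msg_id: str, now: float) -> Tuple[int, bool]:
        """Return (sequence, duplicate) for a publish at time `now`."""
        # Forget IDs that have aged out of the window.
        self._seen = {
            mid: (ts, seq)
            for mid, (ts, seq) in self._seen.items()
            if now - ts < self.window_seconds
        }
        if msg_id in self._seen:
            # Retry of a message already stored: report the original sequence.
            return self._seen[msg_id][1], True
        # New ID: store the message under the next sequence number.
        self._last_seq += 1
        self._seen[msg_id] = (now, self._last_seq)
        return self._last_seq, False


window = DedupWindow(window_seconds=120.0)
print(window.publish("order-1", now=0.0))   # first publish is stored
print(window.publish("order-1", now=30.0))  # retry inside the window
```

Passing `now` explicitly keeps the model deterministic. Once an ID ages out of the window, a retry with that ID would be stored as a new message, which is why the window should cover the publisher's longest retry interval.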
Key-Value Store
import asyncio
import nats

async def kv_example():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Create KV bucket
    kv = await js.create_key_value(bucket="CONFIG")

    # Put values
    await kv.put("app.name", b"MyApp")
    await kv.put("app.version", b"2.1.0")
    await kv.put("feature.dark_mode", b"true")

    # Get values
    entry = await kv.get("app.name")
    print(f"{entry.key}: {entry.value.decode()} (rev: {entry.revision})")

    # Watch for changes
    watcher = await kv.watchall()
    async for update in watcher:
        # None marks the end of the values that already existed in the bucket
        if update is None:
            break
        print(f"Changed: {update.key} = {update.value.decode()}")

    await nc.close()

asyncio.run(kv_example())
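The `revision` printed above is what makes safe concurrent updates possible: a writer can ask the bucket to accept a new value only if the key is still at the revision it last read. Client libraries typically expose this as a conditional update that takes the expected revision. The sketch below models the behavior in memory, assuming a bucket-wide revision counter; `ToyKV` and `RevisionMismatch` are hypothetical names, not part of the NATS client.

```python
from typing import Dict, Tuple


class RevisionMismatch(Exception):
    """Raised when an update is based on a stale revision."""


class ToyKV:
    """In-memory model of a revisioned key-value bucket."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[bytes, int]] = {}  # key -> (value, revision)
        self._seq = 0  # bucket-wide counter, similar to a stream sequence

    def put(self, key: str, value: bytes) -> int:
        """Unconditionally write a value and return its new revision."""
        self._seq += 1
        self._data[key] = (value, self._seq)
        return self._seq

    def get(self, key: str) -> Tuple[bytes, int]:
        """Return (value, revision) for a key."""
        return self._data[key]

    def update(self, key: str, value: bytes, last: int) -> int:
        """Write only if the key's current revision is still `last`."""
        _, current = self._data[key]
        if current != last:
            raise RevisionMismatch(f"{key}: expected rev {last}, found {current}")
        return self.put(key, value)


kv = ToyKV()
rev = kv.put("app.version", b"2.1.0")
rev = kv.update("app.version", b"2.2.0", last=rev)  # succeeds: revision matches
```

A second writer still holding the old revision would get `RevisionMismatch` and would need to re-read the key before retrying.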
NATS with Node.js
import { connect, StringCodec } from "nats";

const nc = await connect({ servers: "nats://localhost:4222" });
const sc = StringCodec();

// Subscribe
const sub = nc.subscribe("events.>");
(async () => {
  for await (const msg of sub) {
    console.log(`${msg.subject}: ${sc.decode(msg.data)}`);
  }
})();

// Publish
nc.publish("events.user.login", sc.encode(JSON.stringify({ user: "alice" })));

// Request-Reply (rejects if no service is subscribed to "api.users.get")
const response = await nc.request("api.users.get", sc.encode("42"), { timeout: 5000 });
console.log(`User: ${sc.decode(response.data)}`);

// Flush pending messages and close the connection
await nc.drain();
Resources
- NATS Docs
- NATS GitHub — 16K+ stars
- NATS by Example