Let me paint you a picture.
You're building a metrics dashboard. Every client needs live data — CPU usage, request rates, error counts, the works. So you do what everyone does: you set up a REST endpoint and poll it. Every second. Per client.
Ten clients? Fine. A hundred? Starting to sweat. A thousand clients each hammering your /metrics endpoint every second? You've just built the world's most expensive busy-wait loop. Your server is spending most of its time answering "anything new?" with "nope, same as before."
The other option people reach for is WebSockets. Roll your own framing protocol, handle reconnects, write serialization logic by hand, debug binary frames with a hex editor at midnight. It works. It also feels like building furniture without instructions.
gRPC streaming is what you actually wanted. It's been sitting there the whole time, and once you've used it you'll wonder why you ever polled anything.
This post covers all four gRPC patterns — unary, server streaming, client streaming, and bidirectional — with real Rust code using tonic and tokio. We'll build two real services: a live metrics telemetry system and a chunked file uploader. By the end, you'll know exactly which pattern to reach for and when.
One prerequisite: this post assumes you're comfortable with Protobuf and the basics of prost. If that's new to you, I covered it in depth in Serialization Showdown in Rust: JSON Was Fine Until It Wasn't — worth a read before continuing.
Why gRPC? The Quick Version
REST over HTTP/1.1 is request-response: one request, one response, connection closes (or idles). It's the right tool for a lot of things. It's the wrong tool when you need a server to push data to a client continuously, or when a client needs to stream data to a server without waiting for a response to each chunk.
gRPC solves this properly because it's built on HTTP/2, which supports:
- Multiplexing — multiple streams over a single connection
- Full duplex — both sides can send data simultaneously
- Binary framing — efficient, low overhead
Combined with Protobuf for serialization (fast, compact, schema-enforced) and tonic for the Rust implementation, you get a streaming system that's genuinely production-grade with very little boilerplate.
The setup cost is real — you need .proto files, a build step, generated types. We covered that cost in the serialization post. This is where you get it back.
Project Setup
# Cargo.toml
[dependencies]
tonic = "0.11"
prost = "0.12"
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
futures = "0.3"
[build-dependencies]
tonic-build = "0.11"
// build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/telemetry.proto")?;
tonic_build::compile_protos("proto/fileupload.proto")?;
Ok(())
}
Create two proto files. First, the metrics/telemetry service, which covers three of the four patterns (unary, server streaming, and bidirectional):
// proto/telemetry.proto
syntax = "proto3";
package telemetry;
// Unary: get a single snapshot
// Server streaming: subscribe to live metrics
// Bidirectional: stream metrics, receive alerts back
service TelemetryService {
rpc GetSnapshot (MetricsRequest) returns (MetricsResponse);
rpc Subscribe (MetricsRequest) returns (stream MetricsResponse);
rpc LiveFeed (stream MetricsPayload) returns (stream AlertResponse);
}
message MetricsRequest {
string service_name = 1;
uint32 interval_ms = 2; // how often to send updates
}
message MetricsResponse {
string service_name = 1;
double cpu_usage = 2;
double memory_mb = 3;
uint64 request_count = 4;
uint64 timestamp_ms = 5;
}
message MetricsPayload {
string service_name = 1;
double cpu_usage = 2;
double memory_mb = 3;
uint64 timestamp_ms = 4;
}
message AlertResponse {
string alert_type = 1; // "CPU_HIGH", "MEMORY_HIGH", etc.
string message = 2;
double value = 3;
uint64 timestamp_ms = 4;
}
Second, the file upload service for client streaming:
// proto/fileupload.proto
syntax = "proto3";
package fileupload;
// Client streaming: upload file in chunks, server acks when done
service FileUploadService {
rpc Upload (stream FileChunk) returns (UploadResponse);
}
message FileChunk {
string filename = 1;
bytes data = 2;
uint32 chunk_index = 3;
bool is_last = 4;
}
message UploadResponse {
string filename = 1;
uint64 total_bytes = 2;
uint32 total_chunks = 3;
bool success = 4;
string message = 5;
}
Pattern 1 — Unary (The Baseline)
Unary is just request-response. One request in, one response out. Not streaming — but establishing it first makes the streaming patterns easier to follow, and you'll use unary for things like "give me the current state right now."
Server:
use tonic::{Request, Response, Status};
use telemetry::telemetry_service_server::TelemetryService;
use telemetry::{MetricsRequest, MetricsResponse};
pub mod telemetry {
tonic::include_proto!("telemetry");
}
#[derive(Default)]
pub struct TelemetryServer;
#[tonic::async_trait]
impl TelemetryService for TelemetryServer {
// Unary — simple snapshot, no streaming
async fn get_snapshot(
&self,
request: Request<MetricsRequest>,
) -> Result<Response<MetricsResponse>, Status> {
let req = request.into_inner();
println!("Snapshot requested for: {}", req.service_name);
// In production, you'd fetch real metrics here
let response = MetricsResponse {
service_name: req.service_name,
cpu_usage: 42.5,
memory_mb: 1024.0,
request_count: 9_823,
timestamp_ms: current_timestamp_ms(),
};
Ok(Response::new(response))
}
// ... streaming methods below
}
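The handler above calls current_timestamp_ms(), which is never defined in this post. A minimal sketch of what it could look like, assuming all you need is wall-clock milliseconds since the Unix epoch to fill the uint64 timestamp_ms fields (the fallback to 0 is my assumption, not something the original code specifies):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Milliseconds since the Unix epoch, matching the `uint64 timestamp_ms` proto fields.
pub fn current_timestamp_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // A system clock set before 1970 makes duration_since fail;
        // fall back to 0 here instead of panicking inside a request handler.
        .map(|elapsed| elapsed.as_millis() as u64)
        .unwrap_or(0)
}
```

The fetch_cpu_usage(), fetch_memory_mb(), and fetch_request_count() helpers used later are placeholders in the same spirit: wire them to whatever metrics source you actually have.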
Client:
use tonic::transport::Channel;
use telemetry::telemetry_service_client::TelemetryServiceClient;
use telemetry::MetricsRequest;
async fn get_snapshot(client: &mut TelemetryServiceClient<Channel>) {
let request = MetricsRequest {
service_name: "api-gateway".to_string(),
interval_ms: 0,
};
match client.get_snapshot(request).await {
Ok(response) => {
let metrics = response.into_inner();
println!("CPU: {:.1}% Memory: {:.0}MB Requests: {}",
metrics.cpu_usage, metrics.memory_mb, metrics.request_count);
}
Err(e) => eprintln!("Error: {}", e),
}
}
Pattern 2 — Server Streaming
The client makes one request and the server keeps sending responses. This is the "subscribe" pattern — the client says "I want live metrics for this service" and the server streams updates until the client disconnects or the stream ends.
This is what you use instead of polling.
Server:
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt; // provides .next() on streams; tokio-stream is already a direct dependency
type MetricsStream = ReceiverStream<Result<MetricsResponse, Status>>;
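// Note: Rust allows only one `impl TelemetryService for TelemetryServer` block.
// In a real project this method sits in the same impl as get_snapshot; it is shown separately for readability.
#[tonic::async_trait]
impl TelemetryService for TelemetryServer {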
type SubscribeStream = MetricsStream;
async fn subscribe(
&self,
request: Request<MetricsRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let req = request.into_inner();
println!("Client subscribed to: {}", req.service_name);
let (tx, rx) = mpsc::channel(32);
let service_name = req.service_name.clone();
let interval_ms = req.interval_ms.max(500) as u64; // minimum 500ms
// Spawn a task that generates metric updates
tokio::spawn(async move {
let mut interval = tokio::time::interval(
tokio::time::Duration::from_millis(interval_ms)
);
loop {
interval.tick().await;
let metrics = MetricsResponse {
service_name: service_name.clone(),
cpu_usage: fetch_cpu_usage(),
memory_mb: fetch_memory_mb(),
request_count: fetch_request_count(),
timestamp_ms: current_timestamp_ms(),
};
// If the receiver is gone, the client disconnected — stop streaming
if tx.send(Ok(metrics)).await.is_err() {
println!("Client disconnected from: {}", service_name);
break;
}
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
The key pattern here: mpsc::channel as the bridge between your data-producing logic and the gRPC stream. The spawned task produces metrics at the requested interval and sends them down the channel. ReceiverStream wraps the receiver end and turns it into the stream tonic expects.
When the client disconnects, tx.send() returns an error because the receiver is gone. That's your signal to stop. No explicit cleanup code, no manual connection tracking — the channel handles it.
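To see the "send fails once the receiver is gone" behaviour in isolation, here is a small sketch that uses the standard library's bounded sync_channel and a thread as stand-ins for tokio's mpsc channel and a spawned task, so it runs without any dependencies. The function name and parameters are hypothetical; the loop shape is the same as in subscribe.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Runs a producer against a consumer that reads `take` messages and then hangs up.
/// Returns the messages the consumer saw and how many sends the producer completed.
pub fn run_until_receiver_drops(take: usize, capacity: usize) -> (Vec<u64>, usize) {
    let (tx, rx) = sync_channel::<u64>(capacity);

    let producer = thread::spawn(move || {
        let mut sent = 0usize;
        // Same shape as the loop in `subscribe`: keep producing until send() fails.
        while tx.send(sent as u64).is_ok() {
            sent += 1;
        }
        sent
    });

    // The consumer reads a few messages...
    let received: Vec<u64> = rx.iter().take(take).collect();
    // ...then "disconnects" by dropping the receiving end.
    drop(rx);

    let sent = producer.join().expect("producer thread panicked");
    (received, sent)
}
```

Calling run_until_receiver_drops(3, 4) yields the first three messages, and the producer stops by itself after at most seven successful sends (three consumed plus up to four still sitting in the buffer). Nothing tells the producer to stop except the failed send.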
Client:
// Needs a StreamExt trait in scope for .next(), e.g. `use tokio_stream::StreamExt;`
async fn subscribe_to_metrics(client: &mut TelemetryServiceClient<Channel>) {
let request = MetricsRequest {
service_name: "api-gateway".to_string(),
interval_ms: 1000, // update every second
};
let mut stream = client
.subscribe(request)
.await
.expect("Failed to subscribe")
.into_inner();
println!("Subscribed — receiving live metrics:");
while let Some(result) = stream.next().await {
match result {
Ok(metrics) => {
println!(
"[{}] CPU: {:.1}% Mem: {:.0}MB Requests: {}",
metrics.service_name,
metrics.cpu_usage,
metrics.memory_mb,
metrics.request_count,
);
}
Err(status) => {
eprintln!("Stream error: {}", status);
break;
}
}
}
println!("Stream ended.");
}
stream.next().await suspends the task until the next message arrives: no spinning, no polling, no wasted CPU. The client is just waiting. When the server closes the stream, next() returns None and the loop exits cleanly. If something goes wrong, it yields Some(Err(status)) and the Err arm breaks out of the loop.
Pattern 3 — Client Streaming
Now flip it. The client sends a stream of messages; the server collects them all and responds once at the end.
The canonical use case is file upload — sending a large file as a sequence of chunks rather than one enormous HTTP request. Each chunk is small, independently framed, and if something breaks partway through you know exactly where it stopped.
Server:
use fileupload::file_upload_service_server::FileUploadService;
use fileupload::{FileChunk, UploadResponse};
use tonic::Streaming;
pub mod fileupload {
tonic::include_proto!("fileupload");
}
#[derive(Default)]
pub struct FileUploadServer;
#[tonic::async_trait]
impl FileUploadService for FileUploadServer {
async fn upload(
&self,
request: Request<Streaming<FileChunk>>,
) -> Result<Response<UploadResponse>, Status> {
let mut stream = request.into_inner();
let mut filename = String::new();
let mut total_bytes = 0u64;
let mut total_chunks = 0u32;
let mut file_data = Vec::new();
// Consume the stream chunk by chunk
while let Some(chunk) = stream.next().await {
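let chunk = chunk.map_err(|e| {
// Include progress so the log says where the stream died
Status::internal(format!("Stream error after {} chunks of {:?}: {}", total_chunks, filename, e))
})?;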
// First chunk sets the filename
if filename.is_empty() {
filename = chunk.filename.clone();
println!("Receiving upload: {}", filename);
}
total_bytes += chunk.data.len() as u64;
total_chunks += 1;
file_data.extend_from_slice(&chunk.data);
println!(
" Chunk {}: {} bytes (is_last: {})",
chunk.chunk_index,
chunk.data.len(),
chunk.is_last,
);
if chunk.is_last {
break;
}
}
// In production, write file_data to disk or object storage here
println!("Upload complete: {} ({} bytes, {} chunks)",
filename, total_bytes, total_chunks);
Ok(Response::new(UploadResponse {
filename,
total_bytes,
total_chunks,
success: true,
message: "Upload successful".to_string(),
}))
}
}
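One thing the handler above does not do is check chunk_index, and it reports success even if the stream ends before a chunk flagged is_last arrives. If you want "you know exactly where it stopped" to be enforced rather than logged, a small tracker along these lines could sit next to the loop. This is a sketch: UploadProgress and its methods are hypothetical names, and it assumes chunks are expected strictly in order starting at 0.

```rust
/// Tracks upload progress so a gap or an early end can be reported precisely.
#[derive(Debug, Default)]
pub struct UploadProgress {
    next_index: u32,
    total_bytes: u64,
    finished: bool,
}

impl UploadProgress {
    /// Accepts the next chunk, or explains why it was rejected.
    pub fn accept(&mut self, chunk_index: u32, len: usize, is_last: bool) -> Result<(), String> {
        if self.finished {
            return Err(format!("chunk {} arrived after the final chunk", chunk_index));
        }
        if chunk_index != self.next_index {
            return Err(format!("expected chunk {}, got {}", self.next_index, chunk_index));
        }
        self.next_index += 1;
        self.total_bytes += len as u64;
        self.finished = is_last;
        Ok(())
    }

    /// True only once the chunk flagged `is_last` has been accepted.
    pub fn is_complete(&self) -> bool {
        self.finished
    }

    pub fn total_bytes(&self) -> u64 {
        self.total_bytes
    }
}
```

In the handler you would call accept(chunk.chunk_index, chunk.data.len(), chunk.is_last) for each chunk and map an Err to Status::invalid_argument, then check is_complete() after the loop before reporting success.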
Client:
use fileupload::file_upload_service_client::FileUploadServiceClient;
use fileupload::FileChunk;
use tokio_stream::iter;
const CHUNK_SIZE: usize = 64 * 1024; // 64KB per chunk
async fn upload_file(
client: &mut FileUploadServiceClient<Channel>,
filename: &str,
data: &[u8],
) {
let total_chunks = (data.len() + CHUNK_SIZE - 1) / CHUNK_SIZE;
// Build the stream of chunks
let chunks: Vec<FileChunk> = data
.chunks(CHUNK_SIZE)
.enumerate()
.map(|(i, chunk_data)| FileChunk {
filename: filename.to_string(),
data: chunk_data.to_vec(),
chunk_index: i as u32,
is_last: i == total_chunks - 1,
})
.collect();
println!("Uploading {} in {} chunks...", filename, total_chunks);
let stream = iter(chunks);
match client.upload(stream).await {
Ok(response) => {
let r = response.into_inner();
println!(
"Upload complete: {} bytes in {} chunks — {}",
r.total_bytes, r.total_chunks, r.message
);
}
Err(e) => eprintln!("Upload failed: {}", e),
}
}
The client side is refreshingly clean. Build your chunks, wrap them in a stream with tokio_stream::iter, hand it to tonic. The server gets a Streaming<FileChunk> and consumes it with stream.next().await in a loop — same API as the server streaming pattern, just in reverse.
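The chunk arithmetic is worth checking by itself, since off-by-one errors in is_last are easy to make. The sketch below pulls the planning step into a pure function (plan_chunks and ChunkPlan are hypothetical names, not part of tonic) that computes the same indices and boundaries as the client code, without touching the network.

```rust
/// One planned chunk: its index, byte range within the file, and whether it is the final chunk.
#[derive(Debug, PartialEq)]
pub struct ChunkPlan {
    pub index: u32,
    pub start: usize,
    pub end: usize, // exclusive
    pub is_last: bool,
}

pub fn plan_chunks(total_len: usize, chunk_size: usize) -> Vec<ChunkPlan> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    // Ceiling division, the same formula as (data.len() + CHUNK_SIZE - 1) / CHUNK_SIZE.
    let total_chunks = (total_len + chunk_size - 1) / chunk_size;
    (0..total_chunks)
        .map(|i| ChunkPlan {
            index: i as u32,
            start: i * chunk_size,
            // The final chunk is usually shorter than chunk_size.
            end: ((i + 1) * chunk_size).min(total_len),
            // Written as i + 1 == total so there is no subtraction on a usize.
            is_last: i + 1 == total_chunks,
        })
        .collect()
}
```

For a 150 KiB file with 64 KiB chunks this produces three chunks, the last one covering bytes 131072..153600. An empty file produces no chunks at all, which means the server in this post would see an empty stream; decide whether that should count as a successful upload in your system.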
Pattern 4 — Bidirectional Streaming
Both sides stream simultaneously. The client streams data in; the server streams responses back. Neither waits for the other to finish.
This is the most powerful pattern and also the trickiest to reason about. Think of it less like HTTP and more like two people talking on a phone call — both can speak at the same time, messages cross each other in flight, and either side can hang up.
Our use case: the client continuously streams metrics, and the server watches for threshold violations and streams back alerts in real time. If CPU spikes above 90%, the server sends an alert immediately — it doesn't wait for the client to finish sending metrics.
Server:
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
type AlertStream = ReceiverStream<Result<AlertResponse, Status>>;
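// Note: as with subscribe, this belongs in the single `impl TelemetryService for TelemetryServer`
// block alongside the other methods; it is shown separately for readability.
#[tonic::async_trait]
impl TelemetryService for TelemetryServer {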
type LiveFeedStream = AlertStream;
async fn live_feed(
&self,
request: Request<tonic::Streaming<MetricsPayload>>,
) -> Result<Response<Self::LiveFeedStream>, Status> {
let mut inbound = request.into_inner();
let (tx, rx) = mpsc::channel(32);
tokio::spawn(async move {
// Read from the inbound metrics stream
while let Some(result) = inbound.next().await {
let payload = match result {
Ok(p) => p,
Err(e) => {
eprintln!("Inbound stream error: {}", e);
break;
}
};
// Check thresholds and send alerts if needed
if payload.cpu_usage > 90.0 {
let alert = AlertResponse {
alert_type: "CPU_HIGH".to_string(),
message: format!(
"{}: CPU at {:.1}% — above 90% threshold",
payload.service_name, payload.cpu_usage
),
value: payload.cpu_usage,
timestamp_ms: current_timestamp_ms(),
};
if tx.send(Ok(alert)).await.is_err() {
break; // client disconnected
}
}
if payload.memory_mb > 4096.0 {
let alert = AlertResponse {
alert_type: "MEMORY_HIGH".to_string(),
message: format!(
"{}: Memory at {:.0}MB — above 4GB threshold",
payload.service_name, payload.memory_mb
),
value: payload.memory_mb,
timestamp_ms: current_timestamp_ms(),
};
if tx.send(Ok(alert)).await.is_err() {
break;
}
}
}
println!("Live feed ended");
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
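The threshold checks in the spawned task are plain logic, so one option is to pull them into a function you can unit-test without a running gRPC server. A sketch, assuming the same 90% CPU and 4096 MB limits as above; Alert here is a simplified stand-in for the generated AlertResponse, and check_thresholds is a hypothetical name.

```rust
/// Simplified stand-in for the generated `AlertResponse` (no timestamp).
#[derive(Debug, PartialEq)]
pub struct Alert {
    pub alert_type: &'static str,
    pub message: String,
    pub value: f64,
}

pub const CPU_THRESHOLD_PERCENT: f64 = 90.0;
pub const MEMORY_THRESHOLD_MB: f64 = 4096.0;

/// Returns zero, one, or two alerts for a single metrics payload.
pub fn check_thresholds(service_name: &str, cpu_usage: f64, memory_mb: f64) -> Vec<Alert> {
    let mut alerts = Vec::new();
    // Strictly greater than, matching the comparisons in the server task.
    if cpu_usage > CPU_THRESHOLD_PERCENT {
        alerts.push(Alert {
            alert_type: "CPU_HIGH",
            message: format!("{}: CPU at {:.1}%, above 90% threshold", service_name, cpu_usage),
            value: cpu_usage,
        });
    }
    if memory_mb > MEMORY_THRESHOLD_MB {
        alerts.push(Alert {
            alert_type: "MEMORY_HIGH",
            message: format!("{}: Memory at {:.0}MB, above 4GB threshold", service_name, memory_mb),
            value: memory_mb,
        });
    }
    alerts
}
```

The streaming task would then loop over the returned alerts and send each one down the channel, which also removes the duplicated send-and-break code.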
Client:
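// Needs in scope: tokio::sync::mpsc, tokio_stream::wrappers::ReceiverStream,
// futures::StreamExt (for .next()), and telemetry::MetricsPayload.
async fn live_feed(client: &mut TelemetryServiceClient<Channel>) {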
let (tx, rx) = mpsc::channel(32);
// Spawn a task to continuously send metrics
tokio::spawn(async move {
let mut interval = tokio::time::interval(
tokio::time::Duration::from_millis(500)
);
let mut tick = 0u32;
loop {
interval.tick().await;
tick += 1;
// Simulate occasionally spiking CPU for demo purposes
let cpu = if tick % 10 == 0 { 95.0 } else { 45.0 + (tick % 5) as f64 };
let payload = MetricsPayload {
service_name: "api-gateway".to_string(),
cpu_usage: cpu,
memory_mb: 2048.0 + (tick as f64 * 10.0),
timestamp_ms: current_timestamp_ms(),
};
if tx.send(payload).await.is_err() {
break;
}
}
});
let outbound_stream = ReceiverStream::new(rx);
let mut response_stream = client
.live_feed(outbound_stream)
.await
.expect("Failed to start live feed")
.into_inner();
println!("Live feed started — watching for alerts:");
// Receive alerts from the server while sending metrics concurrently
while let Some(result) = response_stream.next().await {
match result {
Ok(alert) => {
println!(
"[ALERT] {} — {} (value: {:.1})",
alert.alert_type, alert.message, alert.value
);
}
Err(status) => {
eprintln!("Alert stream error: {}", status);
break;
}
}
}
}
The client does two things at once: one task sends metrics into a channel, and the main task reads alerts from the response stream. They run independently. The sending task doesn't wait for an alert before sending the next metric; the receiving loop doesn't wait for the next metric before processing an alert.
This is genuinely hard to replicate cleanly with REST. With bidirectional gRPC streaming and tokio, it's a few dozen lines on each side.
Wiring It All Up
// src/server.rs
use tonic::transport::Server;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "0.0.0.0:50051".parse()?;
println!("gRPC server listening on {}", addr);
Server::builder()
.add_service(
telemetry::telemetry_service_server::TelemetryServiceServer::new(
TelemetryServer::default()
)
)
.add_service(
fileupload::file_upload_service_server::FileUploadServiceServer::new(
FileUploadServer::default()
)
)
.serve(addr)
.await?;
Ok(())
}
// src/client.rs
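use tonic::transport::Channel;
use telemetry::telemetry_service_client::TelemetryServiceClient;
use fileupload::file_upload_service_client::FileUploadServiceClient;
// Assumes the `pub mod telemetry` / `pub mod fileupload` blocks (include_proto!) and the
// client functions from the sections above are defined in this file.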
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let channel = Channel::from_static("http://localhost:50051")
.connect()
.await?;
let mut telemetry_client = TelemetryServiceClient::new(channel.clone());
let mut upload_client = FileUploadServiceClient::new(channel);
// 1. Unary
get_snapshot(&mut telemetry_client).await;
// 2. Server streaming — runs for 10 seconds
tokio::time::timeout(
tokio::time::Duration::from_secs(10),
subscribe_to_metrics(&mut telemetry_client),
).await.ok();
// 3. Client streaming — upload a file
let file_data = std::fs::read("large_file.bin").unwrap_or_else(|_| vec![0u8; 1_000_000]);
upload_file(&mut upload_client, "large_file.bin", &file_data).await;
// 4. Bidirectional — live feed for 30 seconds
tokio::time::timeout(
tokio::time::Duration::from_secs(30),
live_feed(&mut telemetry_client),
).await.ok();
Ok(())
}
Error Handling and Gotchas
tonic::Status — the error type you'll live with
Every gRPC error is a Status with a code and a message. The codes you'll actually hit:
// The ones you'll use most
Status::ok(msg) // success (rarely explicit)
Status::invalid_argument(msg) // bad input from client
Status::not_found(msg) // resource doesn't exist
Status::internal(msg) // something broke server-side
Status::unavailable(msg) // server can't handle it right now
Status::cancelled(msg) // client cancelled mid-stream
Always return meaningful messages. When a stream dies at 3am and you're reading logs, "stream error" tells you nothing. "Client disconnected during chunk 47 of upload large_file.bin" tells you everything.
Clean vs dirty disconnects
When a client finishes cleanly (it ends its outbound stream and the connection closes normally), stream.next().await returns None. When it disconnects uncleanly (network drop, process killed, call cancelled mid-stream), you get Some(Err(status)) instead, typically with code Cancelled or Unknown.
Always handle both:
while let Some(result) = stream.next().await {
match result {
Ok(msg) => { /* process */ }
Err(status) if status.code() == tonic::Code::Cancelled => {
println!("Client cancelled — cleaning up");
break;
}
Err(e) => {
eprintln!("Unexpected stream error: {}", e);
break;
}
}
}
Backpressure — the free lunch you didn't know you had
When your server is generating data faster than the client can consume it, you need backpressure — a way to slow down the producer before your memory fills up with unsent messages.
With mpsc::channel(N), you get this for free. The N is the buffer size. When the buffer is full, tx.send().await suspends the producing task (without blocking the thread) until there's space. The client's consumption rate naturally regulates the producer. No explicit flow control code required.
// Channel of 32 means at most 32 unsent messages buffered
// Producer naturally slows if the consumer falls behind
let (tx, rx) = mpsc::channel(32);
Pick your buffer size based on how much memory you're willing to use per client and how bursty your data is. For smooth metric streams, 16–32 is usually fine. For bursty event streams, go higher.
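If you want to see the bound in action, the sketch below fills a channel until it refuses more. It uses the standard library's sync_channel and try_send as a dependency-free stand-in; tokio's mpsc Sender also has a try_send that reports a full buffer, and its send().await waits where try_send would refuse. The function name is hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Pushes messages into a bounded channel that nobody is reading from
/// and returns how many were accepted before the buffer reported "full".
pub fn fill_until_full(capacity: usize) -> usize {
    // `_rx` keeps the receiver alive; dropping it would report Disconnected instead of Full.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0usize;
    loop {
        match tx.try_send(accepted as u32) {
            Ok(()) => accepted += 1,
            // The buffer is at capacity: this is the point where an awaiting send would pause.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a capacity of 32, exactly 32 messages are accepted and the 33rd is refused. That ceiling is your per-client memory bound: buffer size multiplied by the size of one message.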
When to Use What
| Pattern | Reach for it when |
|---|---|
| Unary | Simple request/response. No streaming needed. Default choice. |
| Server streaming | Server has ongoing data to push. Replaces polling. Metrics, logs, live events. |
| Client streaming | Client sends bulk data. File uploads, batch ingest, sensor readings. |
| Bidirectional | Full duplex. Real-time control loops, alerting systems, collaborative features. |
The honest rule: start with unary. When you find yourself polling or building a WebSocket, reach for server streaming. When you're chunking large uploads over HTTP, reach for client streaming. When both sides need to talk simultaneously, bidirectional.
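Another way to read the table: the pattern is fully determined by which side sends more than one message. A tiny sketch of that rule as code (the enum and function are illustrative names, not tonic types):

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum Pattern {
    Unary,
    ServerStreaming,
    ClientStreaming,
    Bidirectional,
}

/// Picks the gRPC pattern from two questions:
/// does the client send many messages, and does the server?
pub fn choose_pattern(client_streams: bool, server_streams: bool) -> Pattern {
    match (client_streams, server_streams) {
        (false, false) => Pattern::Unary,           // rpc Foo (Req) returns (Resp)
        (false, true) => Pattern::ServerStreaming,  // rpc Foo (Req) returns (stream Resp)
        (true, false) => Pattern::ClientStreaming,  // rpc Foo (stream Req) returns (Resp)
        (true, true) => Pattern::Bidirectional,     // rpc Foo (stream Req) returns (stream Resp)
    }
}
```

In this post's services, GetSnapshot is (false, false), Subscribe is (false, true), Upload is (true, false), and LiveFeed is (true, true).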
Wrapping Up
gRPC streaming isn't a niche feature for massive-scale systems. It's a strong fit whenever you need data to flow continuously between services. The patterns are clear, the Rust implementation with tonic is clean, and tokio's async machinery takes care of most of the hard parts (backpressure, disconnect detection, concurrency) with very little code on your side.
The Protobuf setup cost we talked about in the serialization post? This is exactly where you get it back. Generated types, schema-enforced contracts, binary efficiency — it all compounds when you're streaming millions of messages through a live system.
And the thousand clients polling your metrics endpoint every second? One server-streaming subscription each. Same data, a fraction of the requests, and no more "anything new?" — "nope, same as before."
Next in this series: Beyond HTTP in Rust — Real-Time Sockets with socketioxide and FTP with suppaftp. We'll build a file-upload notification system that ties both together.