If you've ever built real-time systems with gRPC streaming, you know how challenging it can be to monitor long-lived connections. Today, I'll show you how to add full observability (logs, metrics, traces) to your gRPC streams using GoFr β with almost zero manual instrumentation!
Why This Matters?
gRPC streaming enables powerful patterns:
- π‘ Real-time dashboards
- π Live data pipelines
- π€ Chatbots and multiplayer games
But without proper observability:
- β You're blind to failing streams
- β Can't track message throughput
- β Hard to debug cross-service issues
GoFr solves this with automatic instrumentation for streaming endpoints!
π οΈ What We'll Build
A chat service demonstrating all 3 streaming types:
- Server Streaming: Push notifications
- Client Streaming: Batch processing
- Bidirectional: Real-time chat
With built-in:
- β Structured Logging
- β Prometheus Format Metrics
- β Tracing
- β Health checks
π§° Prerequisites
- Go 1.20+ installed
- Basic gRPC knowledge
Prerequisites:
# Install GoFr CLI
go install gofr.dev/cli/gofr@latest
Step 1: Project Setup
mkdir grpc-streaming-demo
cd grpc-streaming-demo
go mod init github.com/yourusername/grpc-streaming-demo
# Create proto directory
mkdir -p server
Step 2: Define Service Contract (server/chat.proto
)
syntax = "proto3";
option go_package = "github.com/yourusername/grpc-streaming-demo/server";
message ChatMessage {
string user = 1;
string content = 2;
}
service ChatService {
// Server pushes notifications to client
rpc Notifications(ChatMessage) returns (stream ChatMessage);
// Client uploads message history
rpc UploadHistory(stream ChatMessage) returns (ChatMessage);
// Real-time bidirectional chat
rpc LiveChat(stream ChatMessage) returns (stream ChatMessage);
}
Step 3: Generate Boilerplate Code
# Generate protobuf code
protoc \
--go_out=. \
--go_opt=paths=source_relative \
--go-grpc_out=. \
--go-grpc_opt=paths=source_relative \
server/chat.proto
# Generate GoFr observability wrappers
gofr wrap grpc server -proto=server/chat.proto
This creates 6 key files:
File that must not be modified -
-
server/chat_grpc.pb.go
- Standard gRPC code -
server/chat.pb.go
- Standard gRPC request/response type to protobuf conversion -
server/chatservice_gofr.go
- GoFr service scaffolding -
server/health_gofr.go
- Registering default health service on your gRPC server -
server/request_gofr.go
- Handling Request/response through GoFr's context.
Files that can be modified -
-
server/chatservice_server.go
- Your implementation template
Step 4: Implement Service Logic (server/chatservice_server.go
)
// versions:
// gofr-cli v0.6.0
// gofr.dev v1.37.0
// source: chat.proto
package server
import (
"fmt"
"io"
"time"
"gofr.dev/pkg/gofr"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Register the gRPC service in your app using the following code in your main.go:
//
// server.RegisterChatServiceServerWithGofr(app, &server.NewChatServiceGoFrServer())
//
// ChatServiceGoFrServer defines the gRPC server implementation.
// Customize the struct with required dependencies and fields as needed.
type ChatServiceGoFrServer struct {
health *healthServer
}
// Server Streaming: Sends 5 notifications to the client.
func (s *ChatServiceGoFrServer) Notifications(
ctx *gofr.Context,
stream ChatService_NotificationsServer,
) error {
req := ChatMessage{}
err := ctx.Bind(&req)
if err != nil {
ctx.Logger.Errorf("Failed to receive request: %v", err)
return status.Error(codes.InvalidArgument, "invalid initial request")
}
ctx.Logger.Infof("Starting notifications for user: %s", req.User)
for i := 1; i <= 5; i++ {
msg := &ChatMessage{
User: "System",
Content: fmt.Sprintf("Notification %d", i),
}
if err := stream.Send(msg); err != nil {
ctx.Logger.Errorf("Stream send error: %v", err)
return status.Error(codes.Internal, "streaming failed")
}
time.Sleep(2 * time.Second)
}
return nil
}
// Client Streaming: Accepts multiple messages and returns a summary.
func (s *ChatServiceGoFrServer) UploadHistory(
ctx *gofr.Context,
stream ChatService_UploadHistoryServer,
) error {
var count int
defer func() {
ctx.Logger.Infof("Processed %d historical messages", count)
}()
for {
msg, err := stream.Recv()
if err == io.EOF {
// Send summary response and close stream.
return stream.SendAndClose(&ChatMessage{
User: "System",
Content: fmt.Sprintf("Received %d messages", count),
})
}
if err != nil {
ctx.Logger.Errorf("Receive error: %v", err)
return status.Errorf(codes.Internal, "failed to receive message: %v", err)
}
count++
processMessage(msg)
}
}
// Helper: Business logic placeholder.
func processMessage(msg *ChatMessage) {
// Stub: Log or process each message as needed.
// For example: store in DB, filter, etc.
}
// Bidirectional Streaming: Handles live chat interaction.
func (s *ChatServiceGoFrServer) LiveChat(
ctx *gofr.Context,
stream ChatService_LiveChatServer,
) error {
errChan := make(chan error)
ctx.Logger.Info("New chat session started")
go func() {
for {
msg, err := stream.Recv()
if err == io.EOF {
errChan <- nil // Graceful end
return
}
if err != nil {
errChan <- status.Errorf(codes.Internal, "receive failed: %v", err)
return
}
ctx.Logger.Infof("%s: %s", msg.User, msg.Content)
resp := &ChatMessage{
User: "Server",
Content: fmt.Sprintf("[%s] %s", time.Now().Format(time.RFC3339), msg.Content),
}
if err := stream.Send(resp); err != nil {
errChan <- status.Errorf(codes.Internal, "send failed: %v", err)
return
}
}
}()
// Wait for client disconnect or completion
select {
case err := <-errChan:
return err
case <-stream.Context().Done():
ctx.Logger.Warn("Client disconnected unexpectedly")
return status.Error(codes.Canceled, "client disconnected")
}
}
Step 5: Configure Main Application (main.go
)
package main
import (
"github.com/yourusername/grpc-streaming-demo/server"
"gofr.dev/pkg/gofr"
)
func main() {
app := gofr.New()
// Register service with auto-observability
server.RegisterChatServiceServerWithGofr(app, &server.ChatServiceGoFrServer{})
// Start server on port 9000
app.Run()
}
Step 6: Run & Test
# Start server
go run main.go
Step 7: Test Notification Stream (Server Streaming)
π Observability in Action
1. Logs (structured JSON)
2. Metrics (Prometheus)
3. Traces (GoFr's Open Source Tracer)
Pro Tips π‘
- Add Custom Traces
span := ctx.Trace("some-work")
defer span.End()
- Monitor Stream Health with in-built healthchecks
- Alert on Key Metrics
# prometheus-alerts.yml
- alert: HighStreamErrors
expr: rate(grpc_stream_errors_total[5m]) > 0.1
π Conclusion & Next Steps
Thats it ! You've just built a production-ready gRPC streaming service with:
β
Auto-instrumented observability
β
All 3 streaming types
β
Error handling and metrics
Next Level Ideas:
- Add JWT authentication
- Implement rate limiting
- Build a React frontend
Resources:
This implementation was part of my open source Contribution on GoFr β would love your feedback!
Happy Streaming! π₯
Let me know your use cases in the comments π
Top comments (0)