DEV Community

Divya Darshana


🚀 Building Observable gRPC Streaming Services with GoFr: A Step-by-Step Guide

gRPC + GoFr Observability

If you've ever built real-time systems with gRPC streaming, you know how challenging it can be to monitor long-lived connections. Today, I'll show you how to add full observability (logs, metrics, traces) to your gRPC streams using GoFr, with almost zero manual instrumentation!

Why This Matters

gRPC streaming enables powerful patterns:

  • 📡 Real-time dashboards
  • 📈 Live data pipelines
  • 🤖 Chatbots and multiplayer games

But without proper observability:

  • ❌ You're blind to failing streams
  • ❌ You can't track message throughput
  • ❌ Cross-service issues are hard to debug

GoFr solves this with automatic instrumentation for streaming endpoints!


🛠️ What We'll Build

A chat service demonstrating all 3 streaming types:

  1. Server Streaming: Push notifications
  2. Client Streaming: Batch processing
  3. Bidirectional: Real-time chat

With built-in:

  • ✅ Structured logging
  • ✅ Prometheus-format metrics
  • ✅ Tracing
  • ✅ Health checks

🧰 Prerequisites

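  1. Go 1.20+ installed
  2. Basic gRPC knowledge
  3. protoc with the protoc-gen-go and protoc-gen-go-grpc plugins on your PATH (needed in Step 3)

Install the GoFr CLI: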

# Install GoFr CLI
go install gofr.dev/cli/gofr@latest

Step 1: Project Setup

mkdir grpc-streaming-demo
cd grpc-streaming-demo
go mod init github.com/yourusername/grpc-streaming-demo

# Create proto directory
mkdir -p server

Step 2: Define Service Contract (server/chat.proto)

syntax = "proto3";
option go_package = "github.com/yourusername/grpc-streaming-demo/server";

message ChatMessage {
  string user = 1;
  string content = 2;
}

service ChatService {
  // Server pushes notifications to client
  rpc Notifications(ChatMessage) returns (stream ChatMessage);

  // Client uploads message history
  rpc UploadHistory(stream ChatMessage) returns (ChatMessage);

  // Real-time bidirectional chat
  rpc LiveChat(stream ChatMessage) returns (stream ChatMessage);
}
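If the three RPC shapes in the contract are new to you, it can help to see them as plain Go functions first. The sketch below models them with channels only; it is an analogy, not gRPC code. `Message`, `notifications`, `uploadHistory`, `liveChat`, and `feed` are illustrative names and are not what protoc or GoFr generate.

```go
package main

import "fmt"

// Message mirrors the ChatMessage proto for illustration only;
// it is not the type that protoc generates.
type Message struct {
	User    string
	Content string
}

// notifications models server streaming: one request in, many messages out.
func notifications(req Message, n int) <-chan Message {
	out := make(chan Message)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- Message{User: "System", Content: fmt.Sprintf("Notification %d for %s", i, req.User)}
		}
	}()
	return out
}

// uploadHistory models client streaming: many messages in, one summary out.
func uploadHistory(in <-chan Message) Message {
	count := 0
	for range in {
		count++
	}
	return Message{User: "System", Content: fmt.Sprintf("Received %d messages", count)}
}

// liveChat models bidirectional streaming: every incoming message gets a
// reply while the input side is still open.
func liveChat(in <-chan Message) <-chan Message {
	out := make(chan Message)
	go func() {
		defer close(out)
		for msg := range in {
			out <- Message{User: "Server", Content: "echo: " + msg.Content}
		}
	}()
	return out
}

// feed turns a list of messages into an already-closed channel.
func feed(msgs ...Message) <-chan Message {
	ch := make(chan Message, len(msgs))
	for _, m := range msgs {
		ch <- m
	}
	close(ch)
	return ch
}

func main() {
	for m := range notifications(Message{User: "alice"}, 2) {
		fmt.Println(m.Content)
	}
	fmt.Println(uploadHistory(feed(Message{Content: "a"}, Message{Content: "b"})).Content)
	for m := range liveChat(feed(Message{User: "alice", Content: "hi"})) {
		fmt.Println(m.Content)
	}
}
```

In the real service, the stream objects take the place of these channels: `Send` writes to the outgoing side and `Recv` reads from the incoming side.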

Step 3: Generate Boilerplate Code

# Generate protobuf code
protoc \
    --go_out=. \
    --go_opt=paths=source_relative \
    --go-grpc_out=. \
    --go-grpc_opt=paths=source_relative \
    server/chat.proto

# Generate GoFr observability wrappers
gofr wrap grpc server -proto=server/chat.proto

Together, these two commands create 6 key files.

Files that must not be modified:

  1. server/chat_grpc.pb.go - Standard gRPC client and server code
  2. server/chat.pb.go - Generated protobuf message types and their serialization
  3. server/chatservice_gofr.go - GoFr service scaffolding
  4. server/health_gofr.go - Registers the default health service on your gRPC server
  5. server/request_gofr.go - Handles requests and responses through GoFr's context

File that can be modified:

  1. server/chatservice_server.go - Your implementation template

Step 4: Implement Service Logic (server/chatservice_server.go)

// versions:
//  gofr-cli v0.6.0
//  gofr.dev v1.37.0
//  source: chat.proto

package server

import (
    "fmt"
    "io"
    "time"

    "gofr.dev/pkg/gofr"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// Register the gRPC service in your app using the following code in your main.go:
//
// server.RegisterChatServiceServerWithGofr(app, &server.ChatServiceGoFrServer{})
//
// ChatServiceGoFrServer defines the gRPC server implementation.
// Customize the struct with required dependencies and fields as needed.

type ChatServiceGoFrServer struct {
    health *healthServer
}

// Server Streaming: Sends 5 notifications to the client.
func (s *ChatServiceGoFrServer) Notifications(
    ctx *gofr.Context,
    stream ChatService_NotificationsServer,
) error {
    req := ChatMessage{}

    err := ctx.Bind(&req)
    if err != nil {
        ctx.Logger.Errorf("Failed to receive request: %v", err)
        return status.Error(codes.InvalidArgument, "invalid initial request")
    }

    ctx.Logger.Infof("Starting notifications for user: %s", req.User)

    for i := 1; i <= 5; i++ {
        msg := &ChatMessage{
            User:    "System",
            Content: fmt.Sprintf("Notification %d", i),
        }

        if err := stream.Send(msg); err != nil {
            ctx.Logger.Errorf("Stream send error: %v", err)
            return status.Error(codes.Internal, "streaming failed")
        }

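        // Pause between notifications, but stop early if the client goes away.
        select {
        case <-stream.Context().Done():
            ctx.Logger.Warn("Client disconnected before all notifications were sent")
            return status.Error(codes.Canceled, "client disconnected")
        case <-time.After(2 * time.Second):
        }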
    }

    return nil
}

// Client Streaming: Accepts multiple messages and returns a summary.
func (s *ChatServiceGoFrServer) UploadHistory(
    ctx *gofr.Context,
    stream ChatService_UploadHistoryServer,
) error {
    var count int

    defer func() {
        ctx.Logger.Infof("Processed %d historical messages", count)
    }()

    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            // Send summary response and close stream.
            return stream.SendAndClose(&ChatMessage{
                User:    "System",
                Content: fmt.Sprintf("Received %d messages", count),
            })
        }
        if err != nil {
            ctx.Logger.Errorf("Receive error: %v", err)
            return status.Errorf(codes.Internal, "failed to receive message: %v", err)
        }

        count++
        processMessage(msg)
    }
}

// Helper: Business logic placeholder.
func processMessage(msg *ChatMessage) {
    // Stub: Log or process each message as needed.
    // For example: store in DB, filter, etc.
}

// Bidirectional Streaming: Handles live chat interaction.
func (s *ChatServiceGoFrServer) LiveChat(
    ctx *gofr.Context,
    stream ChatService_LiveChatServer,
) error {
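    // Buffered so the receive goroutine can still deliver its result and exit
    // after this handler has returned on client disconnect.
    errChan := make(chan error, 1)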

    ctx.Logger.Info("New chat session started")

    go func() {
        for {
            msg, err := stream.Recv()
            if err == io.EOF {
                errChan <- nil // Graceful end
                return
            }
            if err != nil {
                errChan <- status.Errorf(codes.Internal, "receive failed: %v", err)
                return
            }

            ctx.Logger.Infof("%s: %s", msg.User, msg.Content)

            resp := &ChatMessage{
                User:    "Server",
                Content: fmt.Sprintf("[%s] %s", time.Now().Format(time.RFC3339), msg.Content),
            }

            if err := stream.Send(resp); err != nil {
                errChan <- status.Errorf(codes.Internal, "send failed: %v", err)
                return
            }
        }
    }()

    // Wait for client disconnect or completion
    select {
    case err := <-errChan:
        return err
    case <-stream.Context().Done():
        ctx.Logger.Warn("Client disconnected unexpectedly")
        return status.Error(codes.Canceled, "client disconnected")
    }
}

Step 5: Configure Main Application (main.go)

package main

import (
    "github.com/yourusername/grpc-streaming-demo/server"
    "gofr.dev/pkg/gofr"
)

func main() {
    app := gofr.New()

    // Register service with auto-observability
    server.RegisterChatServiceServerWithGofr(app, &server.ChatServiceGoFrServer{})

    // Start server on port 9000
    app.Run()
}

Step 6: Run & Test

# Start server
go run main.go

Step 7: Test Notification Stream (Server Streaming)


🔍 Observability in Action

1. Logs (structured JSON)


2. Metrics (Prometheus)


3. Traces (GoFr's Open Source Tracer)


Pro Tips 💡

  1. Add Custom Traces
span := ctx.Trace("some-work")
defer span.End()
  2. Monitor Stream Health with the built-in health checks


  3. Alert on Key Metrics
# prometheus-alerts.yml
- alert: HighStreamErrors
  expr: rate(grpc_stream_errors_total[5m]) > 0.1

🎉 Conclusion & Next Steps

That's it! You've just built a production-ready gRPC streaming service with:

✅ Auto-instrumented observability

✅ All 3 streaming types

✅ Error handling and metrics

Next Level Ideas:

  • Add JWT authentication
  • Implement rate limiting
  • Build a React frontend

Resources:

This implementation was part of my open-source contribution to GoFr. I would love your feedback!

Happy Streaming! 🎥

Let me know your use cases in the comments 👇
