DEV Community

Nithin Bharadwaj

# How to Build a Production-Grade gRPC Bidirectional Streaming Server in Go


Imagine you're building a system where a server and a client need to have a continuous, two-way conversation. Think of a live chat, a real-time stock ticker, or a multiplayer game where positions update instantly. This isn't about a single request and response; it's about keeping a line open for constant back-and-forth. This is where gRPC's bidirectional streaming shines, and I want to show you how to build one that won't fall over when things get tough.

Let's start with the absolute core. gRPC is a modern framework for communication between services. It uses HTTP/2 under the hood, which is what allows for these persistent, multiplexed connections. A bidirectional streaming service means once a connection is established, both the client and the server can send a sequence of messages to each other, independently and at any time. It's a powerful pattern, but a raw stream is fragile. My goal is to wrap it in layers of management to make it solid and reliable.
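Before the Go code, it helps to see the shape of the service. The article never shows the proto file, so this is a sketch reconstructed from the types used below (`StreamService`, `DataMessage`, and the message kinds the server switches on); field numbers and naming are my assumptions:

```protobuf
syntax = "proto3";

package streaming;

option go_package = "example.com/streaming;streaming";

service StreamService {
  // Both sides send an open-ended sequence of DataMessages over one call.
  rpc StreamData(stream DataMessage) returns (stream DataMessage);
}

enum MessageType {
  MESSAGE_TYPE_DATA = 0;
  MESSAGE_TYPE_HEARTBEAT = 1;
  MESSAGE_TYPE_CONTROL = 2;
  MESSAGE_TYPE_HEALTH_CHECK = 3;
}

message DataMessage {
  string      session_id = 1; // Logical conversation within the connection
  MessageType type       = 2;
  bytes       data       = 3;
  int64       timestamp  = 4; // Unix seconds
}
```

The `stream` keyword on both the argument and the return type is what makes this RPC bidirectional.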

I'll walk you through building a production-grade service. We'll manage connections intelligently, control the flow of data so no one gets overwhelmed, and handle all the errors and edge cases you'll encounter in the real world. I'll explain it piece by piece, with plenty of code.

First, let's define our main server structure. It's the central hub that holds everything together.

type StreamingServer struct {
    server        *grpc.Server
    connections   *ConnectionPool
    sessions      *SessionManager
    metrics       *StreamMetrics
    config        ServerConfig
}

The StreamingServer contains our gRPC server instance, but also the three key managers I've found essential: a ConnectionPool to track active clients, a SessionManager for stateful conversations, and a StreamMetrics collector to know what's happening. The ServerConfig holds all our tunable parameters, which is crucial for adapting to different environments.

Now, setting up the gRPC server itself isn't just about calling grpc.NewServer(). We need to configure it for resilience from the ground up.

func NewStreamingServer(config ServerConfig) *StreamingServer {
    serverOptions := []grpc.ServerOption{
        grpc.KeepaliveParams(keepalive.ServerParameters{
            Time:    30 * time.Second, // Ping clients periodically
            Timeout: 10 * time.Second, // Wait this long for a pong
        }),
        grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
            MinTime:             15 * time.Second, // Minimum allowed client ping interval
            PermitWithoutStream: true, // Allow pings even if no active stream
        }),
        grpc.MaxConcurrentStreams(uint32(config.MaxConcurrentStreams)),
        grpc.MaxRecvMsgSize(config.MaxMessageSize),
        grpc.MaxSendMsgSize(config.MaxMessageSize),
        grpc.ConnectionTimeout(10 * time.Second),
    }

    // Add security with TLS
    if config.TLSConfig != nil {
        creds := credentials.NewTLS(config.TLSConfig)
        serverOptions = append(serverOptions, grpc.Creds(creds))
    }

    // Add interceptors for logging and monitoring
    serverOptions = append(serverOptions,
        grpc.StreamInterceptor(streamInterceptor),
    )

    server := grpc.NewServer(serverOptions...)
    return &StreamingServer{
        server:      server,
        connections: NewConnectionPool(config.MaxConnections),
        sessions:    NewSessionManager(),
        metrics:     &StreamMetrics{},
        config:      config,
    }
}

The keepalive parameters are your first line of defense against dead connections. They probe clients with HTTP/2 pings, so a vanished peer is detected even when no application data is flowing. The size and timeout limits protect your server from being bogged down by a single misbehaving client.

The heart of our service is the StreamData method. This is where a client connects and initiates the bidirectional stream.

func (ss *StreamingServer) StreamData(stream StreamService_StreamDataServer) error {
    ctx := stream.Context()
    clientID := extractClientID(ctx) // Get a unique ID for this client

    // 1. Enforce Connection Limits
    if ss.connections.Count() >= ss.config.MaxConnections {
        return status.Errorf(codes.ResourceExhausted, "maximum connections reached")
    }

    // 2. Create and Register the Connection
    conn := &ClientConnection{
        ID:           clientID,
        Stream:       stream,
        SendQueue:    make(chan *DataMessage, ss.config.QueueSize),
        ReceiveQueue: make(chan *DataMessage, ss.config.QueueSize),
        Status:       StatusConnected,
        LastActivity: time.Now(),
    }

    if err := ss.connections.Register(conn); err != nil {
        return status.Error(codes.ResourceExhausted, err.Error())
    }
    defer ss.connections.Unregister(conn.ID) // Clean up no matter what

    // 3. Launch Independent Sender and Receiver Routines
    var wg sync.WaitGroup
    errChan := make(chan error, 2)
    wg.Add(2)

    go func() { // Receiver
        defer wg.Done()
        if err := ss.handleIncomingMessages(conn); err != nil {
            errChan <- err
        }
    }()

    go func() { // Sender
        defer wg.Done()
        if err := ss.handleOutgoingMessages(conn); err != nil {
            errChan <- err
        }
    }()

    // 4. Wait for Both Routines to Finish
    wg.Wait()
    close(errChan)

    // Return the first error that occurred, if any
    if err, ok := <-errChan; ok {
        return err
    }
    return nil
}

This pattern is vital. We separate the sending and receiving paths into their own goroutines. They run independently, connected only by the SendQueue and ReceiveQueue channels. This decoupling prevents a slow receiver on one side from blocking the sender on the other. The WaitGroup ensures we clean up all resources when the stream ends.
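The decoupling hinges on producers never blocking on a slow client's queue. A hypothetical helper (not shown in the article) makes the non-blocking enqueue explicit:

```go
package main

// DataMessage stands in for the article's message type.
type DataMessage struct {
	Data []byte
}

// trySend is a non-blocking enqueue onto a connection's SendQueue. When the
// queue is full it reports failure instead of blocking the producer on a
// slow consumer -- the caller can then drop, buffer elsewhere, or throttle.
func trySend(queue chan<- *DataMessage, msg *DataMessage) bool {
	select {
	case queue <- msg:
		return true
	default:
		return false // queue full: apply your backpressure policy here
	}
}
```

On a queue of capacity 2 with no consumer draining it, the first two calls succeed and every later call returns false immediately; the heartbeat path below relies on exactly this behavior.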

Let's look at the handleIncomingMessages method. Its job is to read from the client stream.

func (ss *StreamingServer) handleIncomingMessages(conn *ClientConnection) error {
    for {
        // Recv takes no context or deadline; it blocks until a message
        // arrives, the stream ends, or keepalive tears the connection down.
        msg, err := conn.Stream.Recv()

        if err != nil {
            // Check if it's a graceful closure
            if err == io.EOF || status.Code(err) == codes.Canceled {
                return nil // Client closed gracefully
            }
            ss.metrics.IncrementReceiveErrors()
            return status.Error(codes.Internal, "receive failed")
        }

        // Update last known activity
        conn.UpdateActivity()

        // Process based on message type
        switch msg.Type {
        case MessageTypeData:
            go ss.processDataMessage(conn, msg) // Handle in background
        case MessageTypeHeartbeat:
            ss.processHeartbeat(conn, msg) // Just update activity
        case MessageTypeControl:
            ss.processControlMessage(conn, msg) // Pause, resume, etc.
        }
        ss.metrics.IncrementMessagesReceived(len(msg.Data))
    }
}

Notice that Recv() blocks with no per-call deadline; the keepalive settings we configured earlier are what protect this loop from a silently dead network. Also, see how data processing (processDataMessage) is launched in its own goroutine? This keeps the receiver loop fast and responsive, dedicated only to reading from the network. The heartbeat messages are a simple but effective way for a client to say, "I'm still here," even when it has no data to send.

The sending side, handleOutgoingMessages, has a different challenge. It pulls messages from the SendQueue channel and writes them to the network.

func (ss *StreamingServer) handleOutgoingMessages(conn *ClientConnection) error {
    heartbeatTicker := time.NewTicker(ss.config.HeartbeatInterval)
    defer heartbeatTicker.Stop()

    for {
        select {
        case msg := <-conn.SendQueue:
            // Apply flow control before sending
            if ss.shouldThrottle(conn) {
                time.Sleep(ss.config.FlowControlDelay)
            }

            // Send, like Recv, accepts no per-call context; keepalive and
            // the flow-control delay above are what bound it in practice.
            if err := conn.Stream.Send(msg); err != nil {
                ss.metrics.IncrementSendErrors()
                return err // This will break the loop and end the goroutine
            }
            ss.metrics.IncrementMessagesSent(len(msg.Data))

        case <-heartbeatTicker.C:
            // Send a periodic heartbeat to the client
            hb := &DataMessage{Type: MessageTypeHeartbeat, Timestamp: time.Now().Unix()}
            select {
            case conn.SendQueue <- hb: // Try to queue it
            default:
                // Queue is full, skip this heartbeat. It's okay.
            }

        case <-conn.Stream.Context().Done():
            // The stream context is cancelled (client disconnected, server shutting down)
            return nil
        }
    }
}

This is where flow control happens. The shouldThrottle function might check if we've sent too many messages too quickly to this client recently. If so, we inject a small delay. This is a simple form of backpressure, preventing us from flooding a slow client. The heartbeat ticker ensures the connection stays warm. The select statement with a default case for the heartbeat is non-blocking; if the queue is full, we skip it to avoid the sender goroutine getting stuck.

Now, what does a ClientConnection look like in the pool?

type ConnectionPool struct {
    connections map[string]*ClientConnection
    mu          sync.RWMutex // Protects the map
    capacity    int
}

type ClientConnection struct {
    ID            string
    Stream        StreamService_StreamDataServer // Typed stream, so Recv/Send work with *DataMessage
    SendQueue     chan *DataMessage
    ReceiveQueue  chan *DataMessage
    Status        ConnectionStatus
    LastActivity  time.Time
    mu            sync.RWMutex // Protects Status and LastActivity
    sendCounter   int          // For simple rate limiting
    lastSendTime  time.Time
}

func (cp *ConnectionPool) Register(conn *ClientConnection) error {
    cp.mu.Lock()
    defer cp.mu.Unlock()
    if len(cp.connections) >= cp.capacity {
        return fmt.Errorf("connection pool at capacity: %d", cp.capacity)
    }
    cp.connections[conn.ID] = conn
    return nil
}

The pool uses a read-write mutex (sync.RWMutex). When we just need to count connections or read one, we use a read lock, which allows many goroutines to do so concurrently. Only when adding or removing a connection do we use the exclusive write lock. The ClientConnection has its own mutex to guard fields like Status that might be read by a health-checker goroutine while being written by the sender or receiver.
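NewConnectionPool, Count, and Unregister are all called in earlier snippets but never shown. They are short enough to reconstruct; the read-lock on Count is the detail worth noticing:

```go
package main

import "sync"

type ClientConnection struct{ ID string }

type ConnectionPool struct {
	connections map[string]*ClientConnection
	mu          sync.RWMutex
	capacity    int
}

func NewConnectionPool(capacity int) *ConnectionPool {
	return &ConnectionPool{
		connections: make(map[string]*ClientConnection),
		capacity:    capacity,
	}
}

// Count takes only the read lock, so many goroutines can check the limit
// concurrently without serializing on registrations.
func (cp *ConnectionPool) Count() int {
	cp.mu.RLock()
	defer cp.mu.RUnlock()
	return len(cp.connections)
}

// Unregister takes the write lock because it mutates the map. Deleting a
// missing key is a no-op, so double-unregister is harmless.
func (cp *ConnectionPool) Unregister(id string) {
	cp.mu.Lock()
	defer cp.mu.Unlock()
	delete(cp.connections, id)
}
```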

Sessions add a layer of state on top of connections. A single client might have multiple logical conversations over the same network stream.

type SessionManager struct {
    sessions map[string]*StreamSession // key: "clientID:sessionID"
    mu       sync.RWMutex
}

type StreamSession struct {
    ID           string
    ClientID     string
    State        SessionState
    CreatedAt    time.Time
    LastActivity time.Time
    MessageBuffer []*DataMessage // Recent messages for context
    mu           sync.RWMutex
}

func (sm *SessionManager) ProcessMessage(clientID string, msg *DataMessage) (*StreamSession, error) {
    sessionKey := fmt.Sprintf("%s:%s", clientID, msg.SessionId)

    sm.mu.Lock()
    session, exists := sm.sessions[sessionKey]
    if !exists {
        session = &StreamSession{
            ID:        msg.SessionId,
            ClientID:  clientID,
            State:     SessionStateActive,
            CreatedAt: time.Now(),
        }
        sm.sessions[sessionKey] = session
    }
    sm.mu.Unlock()

    session.mu.Lock()
    session.LastActivity = time.Now()
    // ... process the message within the session context ...
    session.mu.Unlock()

    return session, nil
}

This allows you to handle things like resuming a chat history, pausing a file transfer, or managing independent game rooms, all within one physical connection.

A system like this is blind without metrics. We need to know how it's performing.

type StreamMetrics struct {
    connectionsActive  atomic.Int64
    messagesReceived   atomic.Int64
    messagesSent       atomic.Int64
    bytesReceived      atomic.Int64
    bytesSent          atomic.Int64
    sendErrors         atomic.Int64
    receiveErrors      atomic.Int64
}

func (sm *StreamMetrics) IncrementMessagesReceived(size int) {
    sm.messagesReceived.Add(1)
    sm.bytesReceived.Add(int64(size))
}

// ... other increment methods ...

func (sm *StreamMetrics) Report(logger *log.Logger) {
    ticker := time.NewTicker(60 * time.Second)
    defer ticker.Stop()
    for range ticker.C {
        logger.Printf(
            "METRICS: conns=%d, recv=%d/%dB, sent=%d/%dB, errs=(send:%d, recv:%d)",
            sm.connectionsActive.Load(),
            sm.messagesReceived.Load(), sm.bytesReceived.Load(),
            sm.messagesSent.Load(), sm.bytesSent.Load(),
            sm.sendErrors.Load(), sm.receiveErrors.Load(),
        )
    }
}

Using atomic operations means these counters can be updated safely from hundreds of concurrent goroutines without needing mutexes, which is a performance lifesaver. The periodic report gives you a pulse on the system's health and load.
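The elided increment methods can be reconstructed from their call sites. For completeness, something like this (the connection gauge helpers at the end are my guess at how Register/Unregister would keep connectionsActive in sync, since the article never shows those call sites):

```go
package main

import "sync/atomic"

// Reduced to the counters these methods touch; requires Go 1.19+ for the
// atomic.Int64 type.
type StreamMetrics struct {
	connectionsActive atomic.Int64
	messagesSent      atomic.Int64
	bytesSent         atomic.Int64
	sendErrors        atomic.Int64
	receiveErrors     atomic.Int64
}

func (sm *StreamMetrics) IncrementMessagesSent(size int) {
	sm.messagesSent.Add(1)
	sm.bytesSent.Add(int64(size))
}

func (sm *StreamMetrics) IncrementSendErrors()    { sm.sendErrors.Add(1) }
func (sm *StreamMetrics) IncrementReceiveErrors() { sm.receiveErrors.Add(1) }

// The active-connection gauge moves both ways.
func (sm *StreamMetrics) ConnectionOpened() { sm.connectionsActive.Add(1) }
func (sm *StreamMetrics) ConnectionClosed() { sm.connectionsActive.Add(-1) }
```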

Finally, we need a proactive health checker. It runs in the background, looking for stuck or dead connections.

func (ss *StreamingServer) StartHealthChecker() {
    ticker := time.NewTicker(45 * time.Second)
    for range ticker.C {
        ss.connections.mu.RLock()
        var staleConns []*ClientConnection
        for _, conn := range ss.connections.connections {
            conn.mu.RLock()
            if time.Since(conn.LastActivity) > ss.config.MaxInactivityDuration {
                staleConns = append(staleConns, conn)
            }
            conn.mu.RUnlock()
        }
        ss.connections.mu.RUnlock()

        for _, conn := range staleConns {
            // Send a final health probe
            probe := &DataMessage{Type: MessageTypeHealthCheck}
            select {
            case conn.SendQueue <- probe:
                // Probe sent. If the connection is alive, the sender will get it.
            case <-time.After(2 * time.Second):
                // Couldn't even queue the probe. Connection is likely dead.
                conn.mu.Lock()
                conn.Status = StatusDead
                conn.mu.Unlock()
                ss.connections.Unregister(conn.ID) // Clean it up
                log.Printf("Cleaned up dead connection: %s", conn.ID)
            }
        }
    }
}

This checker uses a read lock to snapshot the list of connections, then evaluates each one outside the main lock to avoid holding it for too long. If a connection hasn't shown any activity (data or heartbeat) for a configured duration, it sends a probe. If the probe can't even be queued within 2 seconds, we declare the connection dead and remove it from the pool.

Putting it all together in a main function shows the lifecycle.

func main() {
    config := ServerConfig{
        Port:                  50051,
        MaxConnections:        5000,
        MaxMessageSize:        5 * 1024 * 1024, // 5MB
        QueueSize:             512,
        ReceiveTimeout:        25 * time.Second,
        SendTimeout:           15 * time.Second,
        HeartbeatInterval:     20 * time.Second,
        MaxInactivityDuration: 2 * time.Minute,
        FlowControlDelay:      100 * time.Millisecond,
    }

    server := NewStreamingServer(config)
    RegisterStreamServiceServer(server.server, server)

    // Start background managers
    go server.metrics.Report(log.Default())
    go server.StartHealthChecker()

    // Start the gRPC server
    lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.Port))
    if err != nil {
        log.Fatalf("Failed to listen on port %d: %v", config.Port, err)
    }
    log.Printf("Resilient gRPC streaming server listening on port %d", config.Port)
    if err := server.server.Serve(lis); err != nil {
        log.Fatalf("Server failed: %v", err)
    }
}

Building this way, you end up with a service that can handle thousands of simultaneous real-time conversations. It respects resource limits, provides clear observability, and cleans up after itself. The client gets a stable, responsive channel, and you, as the developer, get a system that's easier to reason about and debug. You move from worrying about network sockets to focusing on what your messages actually mean—the business logic of your real-time application. That's the goal: to make the complex foundation simple and reliable, so you can build something great on top of it.
