Imagine you're building a system where a server and a client need to have a continuous, two-way conversation. Think of a live chat, a real-time stock ticker, or a multiplayer game where positions update instantly. This isn't about a single request and response; it's about keeping a line open for constant back-and-forth. This is where gRPC's bidirectional streaming shines, and I want to show you how to build one that won't fall over when things get tough.
Let's start with the absolute core. gRPC is a modern framework for communication between services. It uses HTTP/2 under the hood, which is what allows for these persistent, multiplexed connections. A bidirectional streaming service means once a connection is established, both the client and the server can send a sequence of messages to each other, independently and at any time. It's a powerful pattern, but a raw stream is fragile. My goal is to wrap it in layers of management to make it solid and reliable.
I'll walk you through building a production-grade service. We'll manage connections intelligently, control the flow of data so no one gets overwhelmed, and handle all the errors and edge cases you'll encounter in the real world. I'll explain it piece by piece, with plenty of code.
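Before the Go code, it helps to see the service contract. The article never shows its .proto file, so the following is an assumed sketch of what could generate the StreamService_StreamDataServer type and DataMessage struct used below; field numbers and enum names are illustrative.

```proto
syntax = "proto3";

package streamservice;

// Bidirectional stream: client and server each send an independent
// sequence of DataMessages over one connection.
service StreamService {
  rpc StreamData(stream DataMessage) returns (stream DataMessage);
}

message DataMessage {
  MessageType type = 1;
  string session_id = 2;
  bytes data = 3;
  int64 timestamp = 4;
}

enum MessageType {
  MESSAGE_TYPE_DATA = 0;
  MESSAGE_TYPE_HEARTBEAT = 1;
  MESSAGE_TYPE_CONTROL = 2;
  MESSAGE_TYPE_HEALTH_CHECK = 3;
}
```

Running protoc with the go and go-grpc plugins over a definition like this produces the generated stream interface the server implements.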
First, let's define our main server structure. It's the central hub that holds everything together.
type StreamingServer struct {
server *grpc.Server
connections *ConnectionPool
sessions *SessionManager
metrics *StreamMetrics
config ServerConfig
}
The StreamingServer contains our gRPC server instance, but also the three key managers I've found essential: a ConnectionPool to track active clients, a SessionManager for stateful conversations, and a StreamMetrics collector to know what's happening. The ServerConfig holds all our tunable parameters, which is crucial for adapting to different environments.
Now, setting up the gRPC server itself isn't just about calling grpc.NewServer(). We need to configure it for resilience from the ground up.
func NewStreamingServer(config ServerConfig) *StreamingServer {
serverOptions := []grpc.ServerOption{
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 30 * time.Second, // Ping clients periodically
Timeout: 10 * time.Second, // Wait this long for a pong
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 15 * time.Second, // Minimum allowed client ping interval
PermitWithoutStream: true, // Allow pings even if no active stream
}),
grpc.MaxConcurrentStreams(uint32(config.MaxConcurrentStreams)),
grpc.MaxRecvMsgSize(config.MaxMessageSize),
grpc.MaxSendMsgSize(config.MaxMessageSize),
grpc.ConnectionTimeout(10 * time.Second),
}
// Add security with TLS
if config.TLSConfig != nil {
creds := credentials.NewTLS(config.TLSConfig)
serverOptions = append(serverOptions, grpc.Creds(creds))
}
// Add interceptors for logging and monitoring
serverOptions = append(serverOptions,
grpc.StreamInterceptor(streamInterceptor),
)
server := grpc.NewServer(serverOptions...)
return &StreamingServer{
server: server,
connections: NewConnectionPool(config.MaxConnections),
sessions: NewSessionManager(),
metrics: &StreamMetrics{},
config: config,
}
}
The keepalive parameters are your first line of defense against dead connections. They ensure the TCP link is alive even when no application data is flowing. The size and timeout limits protect your server from being bogged down by a single misbehaving client.
The heart of our service is the StreamData method. This is where a client connects and initiates the bidirectional stream.
func (ss *StreamingServer) StreamData(stream StreamService_StreamDataServer) error {
ctx := stream.Context()
clientID := extractClientID(ctx) // Get a unique ID for this client
// 1. Enforce Connection Limits
if ss.connections.Count() >= ss.config.MaxConnections {
return status.Errorf(codes.ResourceExhausted, "maximum connections reached")
}
// 2. Create and Register the Connection
conn := &ClientConnection{
ID: clientID,
Stream: stream,
SendQueue: make(chan *DataMessage, ss.config.QueueSize),
ReceiveQueue: make(chan *DataMessage, ss.config.QueueSize),
Status: StatusConnected,
LastActivity: time.Now(),
}
if err := ss.connections.Register(conn); err != nil {
return status.Error(codes.ResourceExhausted, err.Error())
}
defer ss.connections.Unregister(conn.ID) // Clean up no matter what
// 3. Launch Independent Sender and Receiver Routines
var wg sync.WaitGroup
errChan := make(chan error, 2)
wg.Add(2)
go func() { // Receiver
defer wg.Done()
if err := ss.handleIncomingMessages(conn); err != nil {
errChan <- err
}
}()
go func() { // Sender
defer wg.Done()
if err := ss.handleOutgoingMessages(conn); err != nil {
errChan <- err
}
}()
// 4. Wait for Both Routines to Finish
wg.Wait()
close(errChan)
// Return the first error that occurred, if any
if err, ok := <-errChan; ok {
return err
}
return nil
}
This pattern is vital. We separate the sending and receiving paths into their own goroutines. They run independently, connected only by the SendQueue and ReceiveQueue channels. This decoupling prevents a slow receiver on one side from blocking the sender on the other. The WaitGroup ensures we clean up all resources when the stream ends.
Let's look at the handleIncomingMessages method. Its job is to read from the client stream.
func (ss *StreamingServer) handleIncomingMessages(conn *ClientConnection) error {
for {
// Recv has no per-call timeout; it returns when a message arrives,
// the stream fails, or the stream's context is cancelled.
msg, err := conn.Stream.Recv()
if err != nil {
// Check if it's a graceful closure
if err == io.EOF || status.Code(err) == codes.Canceled {
return nil // Client closed gracefully
}
ss.metrics.IncrementReceiveErrors()
return status.Error(codes.Internal, "receive failed")
}
// Update last known activity
conn.UpdateActivity()
// Process based on message type
switch msg.Type {
case MessageTypeData:
go ss.processDataMessage(conn, msg) // Handle in background
case MessageTypeHeartbeat:
ss.processHeartbeat(conn, msg) // Just update activity
case MessageTypeControl:
ss.processControlMessage(conn, msg) // Pause, resume, etc.
}
ss.metrics.IncrementMessagesReceived(len(msg.Data))
}
}
A subtlety worth knowing: Recv() accepts no per-call timeout; it returns only when a message arrives, the stream fails, or the stream's context is cancelled. Dead connections are caught instead by the keepalive settings and by a background health checker. Also, see how data processing (processDataMessage) is launched in its own goroutine? This keeps the receiver loop fast and responsive, dedicated only to reading from the network. The heartbeat messages are a simple but effective way for a client to say, "I'm still here," even when it has no data to send.
The sending side, handleOutgoingMessages, has a different challenge. It pulls messages from the SendQueue channel and writes them to the network.
func (ss *StreamingServer) handleOutgoingMessages(conn *ClientConnection) error {
heartbeatTicker := time.NewTicker(ss.config.HeartbeatInterval)
defer heartbeatTicker.Stop()
for {
select {
case msg := <-conn.SendQueue:
// Apply flow control before sending
if ss.shouldThrottle(conn) {
time.Sleep(ss.config.FlowControlDelay)
}
// Send likewise takes no per-call context; a hung write is bounded
// by the keepalive Timeout configured on the server.
err := conn.Stream.Send(msg)
if err != nil {
ss.metrics.IncrementSendErrors()
return err // This will break the loop and end the goroutine
}
ss.metrics.IncrementMessagesSent(len(msg.Data))
case <-heartbeatTicker.C:
// Send a periodic heartbeat to the client
hb := &DataMessage{Type: MessageTypeHeartbeat, Timestamp: time.Now().Unix()}
select {
case conn.SendQueue <- hb: // Try to queue it
default:
// Queue is full, skip this heartbeat. It's okay.
}
case <-conn.Stream.Context().Done():
// The stream context is cancelled (client disconnected, server shutting down)
return nil
}
}
}
This is where flow control happens. The shouldThrottle function might check if we've sent too many messages too quickly to this client recently. If so, we inject a small delay. This is a simple form of backpressure, preventing us from flooding a slow client. The heartbeat ticker ensures the connection stays warm. The select statement with a default case for the heartbeat is non-blocking; if the queue is full, we skip it to avoid the sender goroutine getting stuck.
Now, what does a ClientConnection look like in the pool?
type ConnectionPool struct {
connections map[string]*ClientConnection
mu sync.RWMutex // Protects the map
capacity int
}
type ClientConnection struct {
ID string
Stream StreamService_StreamDataServer // typed generated stream (has Send/Recv)
SendQueue chan *DataMessage
ReceiveQueue chan *DataMessage
Status ConnectionStatus
LastActivity time.Time
mu sync.RWMutex // Protects Status and LastActivity
sendCounter int // For simple rate limiting
lastSendTime time.Time
}
func (cp *ConnectionPool) Register(conn *ClientConnection) error {
cp.mu.Lock()
defer cp.mu.Unlock()
if len(cp.connections) >= cp.capacity {
return fmt.Errorf("connection pool at capacity: %d", cp.capacity)
}
cp.connections[conn.ID] = conn
return nil
}
The pool uses a read-write mutex (sync.RWMutex). When we just need to count connections or read one, we use a read lock, which allows many goroutines to do so concurrently. Only when adding or removing a connection do we use the exclusive write lock. The ClientConnection has its own mutex to guard fields like Status that might be read by a health-checker goroutine while being written by the sender or receiver.
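Register was shown above; the companion Count and Unregister methods follow the same locking discipline. A self-contained sketch (with a trimmed-down ClientConnection, since only the ID matters here):

```go
package main

import (
	"fmt"
	"sync"
)

// Trimmed connection for illustration; the real ClientConnection also
// carries queues, status, and activity fields.
type ClientConnection struct{ ID string }

type ConnectionPool struct {
	connections map[string]*ClientConnection
	mu          sync.RWMutex
	capacity    int
}

func NewConnectionPool(capacity int) *ConnectionPool {
	return &ConnectionPool{
		connections: make(map[string]*ClientConnection),
		capacity:    capacity,
	}
}

// Count takes only the read lock, so many goroutines can call it
// concurrently without contending with each other.
func (cp *ConnectionPool) Count() int {
	cp.mu.RLock()
	defer cp.mu.RUnlock()
	return len(cp.connections)
}

// Unregister is idempotent: deleting a missing key is a no-op, which
// matters because both the deferred cleanup and the health checker
// may try to remove the same connection.
func (cp *ConnectionPool) Unregister(id string) {
	cp.mu.Lock()
	defer cp.mu.Unlock()
	delete(cp.connections, id)
}

func main() {
	pool := NewConnectionPool(2)
	pool.connections["a"] = &ClientConnection{ID: "a"} // normally via Register
	fmt.Println(pool.Count()) // 1
	pool.Unregister("a")
	pool.Unregister("a") // second call is harmless
	fmt.Println(pool.Count()) // 0
}
```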
Sessions add a layer of state on top of connections. A single client might have multiple logical conversations over the same network stream.
type SessionManager struct {
sessions map[string]*StreamSession // key: "clientID:sessionID"
mu sync.RWMutex
}
type StreamSession struct {
ID string
ClientID string
State SessionState
CreatedAt time.Time
LastActivity time.Time
MessageBuffer []*DataMessage // Recent messages for context
mu sync.RWMutex
}
func (sm *SessionManager) ProcessMessage(clientID string, msg *DataMessage) (*StreamSession, error) {
sessionKey := fmt.Sprintf("%s:%s", clientID, msg.SessionId)
sm.mu.Lock()
session, exists := sm.sessions[sessionKey]
if !exists {
session = &StreamSession{
ID: msg.SessionId,
ClientID: clientID,
State: SessionStateActive,
CreatedAt: time.Now(),
}
sm.sessions[sessionKey] = session
}
sm.mu.Unlock()
session.mu.Lock()
session.LastActivity = time.Now()
// ... process the message within the session context ...
session.mu.Unlock()
return session, nil
}
This allows you to handle things like resuming a chat history, pausing a file transfer, or managing independent game rooms, all within one physical connection.
A system like this is blind without metrics. We need to know how it's performing.
type StreamMetrics struct {
connectionsActive atomic.Int64
messagesReceived atomic.Int64
messagesSent atomic.Int64
bytesReceived atomic.Int64
bytesSent atomic.Int64
sendErrors atomic.Int64
receiveErrors atomic.Int64
}
func (sm *StreamMetrics) IncrementMessagesReceived(size int) {
sm.messagesReceived.Add(1)
sm.bytesReceived.Add(int64(size))
}
// ... other increment methods ...
func (sm *StreamMetrics) Report(logger *log.Logger) {
ticker := time.NewTicker(60 * time.Second)
for range ticker.C {
logger.Printf(
"METRICS: conns=%d, recv=%d/%dB, sent=%d/%dB, errs=(send:%d, recv:%d)",
sm.connectionsActive.Load(),
sm.messagesReceived.Load(), sm.bytesReceived.Load(),
sm.messagesSent.Load(), sm.bytesSent.Load(),
sm.sendErrors.Load(), sm.receiveErrors.Load(),
)
}
}
Using atomic operations means these counters can be updated safely from hundreds of concurrent goroutines without needing mutexes, which is a performance lifesaver. The periodic report gives you a pulse on the system's health and load.
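The remaining counter methods elided above follow the same pattern as IncrementMessagesReceived. Filling them in, along with a pair of gauge methods for the connection count (the method names here are assumptions; wire ConnectionOpened/ConnectionClosed into Register and Unregister):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type StreamMetrics struct {
	connectionsActive atomic.Int64
	messagesSent      atomic.Int64
	bytesSent         atomic.Int64
	sendErrors        atomic.Int64
	receiveErrors     atomic.Int64
}

// Same shape as IncrementMessagesReceived: bump the count and bytes.
func (sm *StreamMetrics) IncrementMessagesSent(size int) {
	sm.messagesSent.Add(1)
	sm.bytesSent.Add(int64(size))
}

func (sm *StreamMetrics) IncrementSendErrors()    { sm.sendErrors.Add(1) }
func (sm *StreamMetrics) IncrementReceiveErrors() { sm.receiveErrors.Add(1) }

// Gauge methods keeping connectionsActive honest; call from the
// pool's Register and Unregister respectively.
func (sm *StreamMetrics) ConnectionOpened() { sm.connectionsActive.Add(1) }
func (sm *StreamMetrics) ConnectionClosed() { sm.connectionsActive.Add(-1) }

func main() {
	m := &StreamMetrics{}
	m.ConnectionOpened()
	m.IncrementMessagesSent(128)
	m.IncrementMessagesSent(64)
	fmt.Println(m.connectionsActive.Load(), m.messagesSent.Load(), m.bytesSent.Load()) // 1 2 192
}
```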
Finally, we need a proactive health checker. It runs in the background, looking for stuck or dead connections.
func (ss *StreamingServer) StartHealthChecker() {
ticker := time.NewTicker(45 * time.Second)
for range ticker.C {
ss.connections.mu.RLock()
var staleConns []*ClientConnection
for _, conn := range ss.connections.connections {
conn.mu.RLock()
if time.Since(conn.LastActivity) > ss.config.MaxInactivityDuration {
staleConns = append(staleConns, conn)
}
conn.mu.RUnlock()
}
ss.connections.mu.RUnlock()
for _, conn := range staleConns {
// Send a final health probe
probe := &DataMessage{Type: MessageTypeHealthCheck}
select {
case conn.SendQueue <- probe:
// Probe sent. If the connection is alive, the sender will get it.
case <-time.After(2 * time.Second):
// Couldn't even queue the probe. Connection is likely dead.
conn.mu.Lock()
conn.Status = StatusDead
conn.mu.Unlock()
ss.connections.Unregister(conn.ID) // Clean it up
log.Printf("Cleaned up dead connection: %s", conn.ID)
}
}
}
}
This checker uses a read lock to snapshot the list of connections, then evaluates each one outside the main lock to avoid holding it for too long. If a connection hasn't shown any activity (data or heartbeat) for a configured duration, it sends a probe. If the probe can't even be queued within 2 seconds, we declare the connection dead and remove it from the pool.
Putting it all together in a main function shows the lifecycle.
func main() {
config := ServerConfig{
Port: 50051,
MaxConnections: 5000,
MaxMessageSize: 5 * 1024 * 1024, // 5MB
QueueSize: 512,
ReceiveTimeout: 25 * time.Second,
SendTimeout: 15 * time.Second,
HeartbeatInterval: 20 * time.Second,
MaxInactivityDuration: 2 * time.Minute,
FlowControlDelay: 100 * time.Millisecond,
}
server := NewStreamingServer(config)
RegisterStreamServiceServer(server.server, server)
// Start background managers
go server.metrics.Report(log.Default())
go server.StartHealthChecker()
// Start the gRPC server
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.Port))
if err != nil {
log.Fatalf("Failed to listen on port %d: %v", config.Port, err)
}
log.Printf("Resilient gRPC streaming server listening on port %d", config.Port)
if err := server.server.Serve(lis); err != nil {
log.Fatalf("Server failed: %v", err)
}
}
Building this way, you end up with a service that can handle thousands of simultaneous real-time conversations. It respects resource limits, provides clear observability, and cleans up after itself. The client gets a stable, responsive channel, and you, as the developer, get a system that's easier to reason about and debug. You move from worrying about network sockets to focusing on what your messages actually mean—the business logic of your real-time application. That's the goal: to make the complex foundation simple and reliable, so you can build something great on top of it.