DEV Community

Thesius Code
Thesius Code

Posted on • Originally published at datanest-stores.pages.dev

gRPC Service Templates

gRPC Service Templates

REST is great until you need streaming, strong typing, and sub-millisecond inter-service communication. This template collection gives you production-ready gRPC service definitions with Protocol Buffer schemas, server and client implementations in Python, Go, and Node.js, four streaming patterns (unary, server-stream, client-stream, bidirectional), health checking, reflection, and load balancing configs for Envoy and Kubernetes. Copy, customize, deploy.

Key Features

  • Protocol Buffer Schemas — Well-structured .proto files with proper package namespacing, field numbering conventions, and backward-compatible evolution patterns
  • Python gRPC Server & Client — Async server using grpcio with interceptors for logging, auth, and error mapping, plus a typed client wrapper
  • Go gRPC Server & Client — Idiomatic Go implementation with context propagation, graceful shutdown, and health check service
  • Node.js gRPC Server & Client — TypeScript-ready implementation using @grpc/grpc-js with Protobuf loader and middleware patterns
  • All Four Streaming Patterns — Unary, server streaming, client streaming, and bidirectional streaming examples with real use cases
  • Load Balancing Configs — Envoy proxy sidecar config and Kubernetes headless service for client-side load balancing
  • Health Checking & Reflection — Standard grpc.health.v1 service and server reflection for grpcurl debugging

Quick Start

  1. Define your service in Protocol Buffers:
// proto/user_service.proto
syntax = "proto3";
package acme.user.v1;

service UserService {
    rpc GetUser(GetUserRequest) returns (GetUserResponse);           // Unary
    rpc WatchUsers(WatchUsersRequest) returns (stream UserEvent);    // Server streaming
    rpc ImportUsers(stream ImportUserRequest) returns (ImportUsersResponse); // Client streaming
    rpc SyncUsers(stream SyncRequest) returns (stream SyncResponse); // Bidirectional
}

message GetUserRequest { string user_id = 1; }
message GetUserResponse { User user = 1; }

message User {
    string id = 1;
    string email = 2;
    string name = 3;
    Role role = 4;
    google.protobuf.Timestamp created_at = 5;
}

enum Role {
    ROLE_UNSPECIFIED = 0;
    ROLE_USER = 1;
    ROLE_ADMIN = 2;
}
Enter fullscreen mode Exit fullscreen mode
  1. Generate code and run the Python server:
# Generate Python stubs
python -m grpc_tools.protoc \
    --proto_path=proto \
    --python_out=src/generated \
    --grpc_python_out=src/generated \
    proto/user_service.proto

# Start the server
python -m src.server --port 50051 --config configs/development.yaml
Enter fullscreen mode Exit fullscreen mode
  1. Test with grpcurl:
grpcurl -plaintext -d '{"user_id": "42"}' \
    localhost:50051 acme.user.v1.UserService/GetUser
Enter fullscreen mode Exit fullscreen mode

Architecture

grpc-service-templates/
├── proto/
│   ├── user_service.proto       # Main service definition
│   ├── common/                  # Reusable pagination + error messages
│   └── buf.yaml                 # Buf linting config
├── src/
│   ├── python/                  # Async server, client, interceptors
│   ├── go/                      # Server with graceful shutdown, client with retry
│   └── node/                    # TypeScript server and client
├── configs/
│   ├── development.yaml
│   ├── production.yaml
│   └── envoy.yaml               # Envoy sidecar proxy config
└── k8s/                         # Deployment + headless service for client-side LB
Enter fullscreen mode Exit fullscreen mode

Usage Examples

Python Async Server with Interceptors

# src/python/server.py
import asyncio
import logging
import grpc
from grpc_reflection.v1alpha import reflection

logger = logging.getLogger(__name__)

class UserServicer:
    async def GetUser(self, request, context):
        user_id = request.user_id
        if not user_id:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("user_id is required")
            return GetUserResponse()
        user = await self._fetch_user(user_id)
        if not user:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"User {user_id} not found")
            return GetUserResponse()
        return GetUserResponse(user=user)

    async def WatchUsers(self, request, context):
        """Server streaming: yields UserEvent messages."""
        while not context.cancelled():
            event = await self._poll_events(request.filter)
            if event:
                yield UserEvent(user=event.user, action=event.action)
            await asyncio.sleep(1)

async def serve(port: int = 50051):
    server = grpc.aio.server(interceptors=[AuthInterceptor(), LoggingInterceptor()])
    add_UserServiceServicer_to_server(UserServicer(), server)
    reflection.enable_server_reflection(
        (reflection.SERVICE_NAME, "acme.user.v1.UserService"), server
    )
    server.add_insecure_port(f"[::]:{port}")
    await server.start()
    await server.wait_for_termination()
Enter fullscreen mode Exit fullscreen mode

Configuration

Key Type Default Description
server.port int 50051 gRPC server listen port
server.max_workers int 10 Thread pool size for sync calls
server.max_message_size int 4194304 Max message size in bytes (4MB)
auth.enabled bool true Enable JWT auth interceptor
auth.jwt_secret string required YOUR_JWT_SECRET_HERE
health_check.enabled bool true Enable health check service
reflection.enabled bool true Enable server reflection
logging.level string "INFO" Log level
retry.max_attempts int 3 Client retry attempts
retry.backoff_ms int 100 Initial retry backoff

Best Practices

  • Never reuse field numbers in protobuf. Deleted fields should be reserved to prevent silent data corruption in clients running older stubs.
  • Use UNSPECIFIED as enum value 0. Proto3 defaults to 0 for unset enums — if ROLE_USER = 0, you can't distinguish "set to USER" from "not set."
  • Set deadlines on every RPC call. A missing deadline means a stuck call waits forever. Use timeout=10 in Python, context.WithTimeout in Go.
  • Prefer server streaming over polling. A Watch() server-streaming RPC that pushes changes beats clients calling GetUpdates() every 5 seconds.
  • Version your proto packages. Use acme.user.v1 so you can introduce v2 without breaking existing clients.

Troubleshooting

"Received message larger than max" error
Increase max_message_size on both server and client. For bulk operations, prefer client streaming over one large message.

Client can't connect: "DNS resolution failed"
In Docker Compose, use the service name. In Kubernetes, use the fully qualified name (user-svc.namespace.svc.cluster.local).

Streaming RPC hangs after first message
Ensure you're using yield (Python) or stream.Send (Go) in a loop. For bidirectional streaming, both sides must read and write concurrently.


This is 1 of 7 resources in the API Developer Pro toolkit. Get the complete [gRPC Service Templates] with all files, templates, and documentation for $39.

Get the Full Kit →

Or grab the entire API Developer Pro bundle (7 products) for $79 — save 30%.

Get the Complete Bundle →


Related Articles

Top comments (0)