Idris Rampurawala

Building a Scalable API Event Logger using Pub/Sub, and BigQuery

API logging is one of the most underrated aspects of backend engineering. We often rely on structured logs for debugging, but when it comes to analytics, user behavior tracking, or performance insights, traditional logs begin to show their limitations.

That’s where event-based API logging comes in.

In this post, I’ll walk you through building a simple but scalable API event logger using Go, Google Pub/Sub, and BigQuery. We’ll also see how this approach differs from traditional GCP logs and why it might be the better choice for data-driven teams.

☑️ Prerequisite

  • This post assumes some familiarity with HTTP middleware, Google Cloud Pub/Sub, and BigQuery (data warehouse)
  • Though this approach can be implemented in any language or framework, I have created a working Go project to demonstrate it

Let's roll in!🍟


📇 Overview

Modern APIs generate a continuous stream of data about how users interact with your system — but traditional logging methods often treat this as an afterthought, providing debugging information rather than actionable events. Event-based API logging flips that model: every API interaction becomes a structured, analyzable event.

Before diving into how this logger works with Pub/Sub and BigQuery, let’s first understand what event-based API logging means and why conventional logging systems can fall short for analytics and long-term insights.

What does "event-based API logging" mean?

Event-based API logging treats each API request (and response) as a discrete event, capturing key attributes such as method, URL, user ID, timestamp, request/response payloads, and more. Unlike traditional logs that mix system messages, debug lines, and errors, an event-based approach focuses on meaningful business or technical interactions (e.g., "user 123 hit endpoint /v1/items", "response code 201").
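
To make this concrete, here is a minimal sketch of what one such event could look like once serialized. The apiEvent type and its sample values are made up for illustration (the struct used later in this post has more fields), and only the standard library is used.

```go
package main

import (
	"encoding/json"
	"time"
)

// apiEvent is a hypothetical, trimmed-down event used only for illustration.
type apiEvent struct {
	Method       string    `json:"method"`
	URL          string    `json:"url"`
	UserID       string    `json:"user_id"`
	ResponseCode int       `json:"response_code"`
	CreatedAt    time.Time `json:"created_at"`
}

// encodeEvent serializes an event into the JSON payload a broker message would carry.
func encodeEvent(e apiEvent) (string, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// sampleEvent builds the "user 123 hit endpoint /v1/items" example from the text.
func sampleEvent() apiEvent {
	return apiEvent{
		Method:       "POST",
		URL:          "/v1/items",
		UserID:       "123",
		ResponseCode: 201,
		CreatedAt:    time.Date(2025, 1, 2, 3, 4, 5, 0, time.UTC),
	}
}
```

Calling encodeEvent(sampleEvent()) yields a single JSON object with the keys method, url, user_id, response_code, and created_at. Every consumer sees the same flat, typed shape, which is what makes the events easy to query later.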

By publishing these events to a message broker (such as Google Cloud Pub/Sub) and then storing them in an analytics store (like BigQuery), we enable downstream systems (dashboards, ML jobs, user-behavior pipelines) to subscribe, transform, and analyze without modifying the producer.

What are some limitations of "old-style" logging?

While tools like Google Cloud Logging (formerly Stackdriver) are great for operations, they impose constraints when we want long-term analytics or behavioural metrics. For instance, the _Default log bucket retains entries for 30 days by default, after which they are deleted unless we configure a longer retention period.

On the querying side, Logs Explorer is purpose-built for debugging rather than large-scale analytics; we don't get the same SQL flexibility or performance as a dedicated data warehouse. In short, for one-off troubleshooting, it works; for streaming user-behaviour metrics, it often doesn't.

Now let's dive deep into our implementation 👇

🧠 API Logging Flow

The general idea is to capture all API calls using a middleware and send a Pub/Sub event with all the information. The subscriber then consumes this Pub/Sub message and creates a record in BigQuery.
[Figure: flowchart showing how API logs move from Client → API → Pub/Sub → GCP Subscriber → BigQuery]
Here are the steps involved:

  1. Client makes an API call
  2. API Server (Go app in this example) processes the request and logs details like method, URL, user_id, request/response JSON
  3. The Logging Middleware publishes this log as an event to Pub/Sub
  4. A BigQuery subscription on the topic automatically consumes these events and writes them to a BigQuery table
  5. We can now run SQL queries on our API activity!
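
Before looking at the real middleware, the steps above can be sketched end to end with nothing but the standard library. This is only an illustration under loud assumptions: a buffered channel stands in for the Pub/Sub topic, a slice stands in for the BigQuery table, and the event, statusRecorder, withEvents, and runFlow names are all hypothetical.

```go
package main

import (
	"net/http"
	"net/http/httptest"
)

// event is a hypothetical, minimal stand-in for the full log event.
type event struct {
	Method string
	Path   string
	Status int
}

// statusRecorder remembers the status code the wrapped handler wrote.
type statusRecorder struct {
	http.ResponseWriter
	status int
}

func (s *statusRecorder) WriteHeader(code int) {
	s.status = code
	s.ResponseWriter.WriteHeader(code)
}

// withEvents wraps a handler and sends one event per request to the topic (steps 2 and 3).
func withEvents(topic chan<- event, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		rec := &statusRecorder{ResponseWriter: w, status: http.StatusOK}
		next.ServeHTTP(rec, r)
		topic <- event{Method: r.Method, Path: r.URL.Path, Status: rec.status}
	})
}

// runFlow pushes one request through the pipeline and returns the stored rows.
func runFlow() []event {
	topic := make(chan event, 8) // stands in for the Pub/Sub topic
	api := withEvents(topic, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusCreated)
	}))

	// Step 1: a client calls the API.
	req := httptest.NewRequest(http.MethodPost, "/v1/items", nil)
	api.ServeHTTP(httptest.NewRecorder(), req)
	close(topic)

	// Steps 4 and 5: the subscriber drains the topic into a queryable store.
	var table []event // stands in for the BigQuery table
	for e := range topic {
		table = append(table, e)
	}
	return table
}
```

The point of the sketch is the decoupling: the handler knows nothing about logging, and the middleware knows nothing about where events end up. Swapping the channel for a real topic changes the transport, not the shape of the flow.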

Now that we know the flow, let's discuss the middleware implementation.

⚙️ The Logging Middleware

The APILogEvent struct

// Package logger defines the API log event that is published to Pub/Sub.
package logger

import (
    "time"

    "gopkg.in/guregu/null.v3"
)

// APILogEvent represents an API request/response log event
type APILogEvent struct {
    RequestID    null.String `json:"request_id"`
    Service      string      `json:"service"`
    URL          string      `json:"url"`
    Method       string      `json:"method"`
    ResponseCode int         `json:"response_code"`
    ResponseBody null.String `json:"response_body,omitempty"`
    RequestBody  null.String `json:"request_body,omitempty"`
    UserID       null.String `json:"user_id,omitempty"`
    Duration     float64     `json:"duration"` // request duration in seconds
    Version      string      `json:"version"`
    Name         string      `json:"name"`
    CreatedAt    time.Time   `json:"created_at"`
}

Service - the name of the API service, so the same pipeline can be shared across multiple API services
UserID - the caller of the API
Duration - the time taken to serve the request, in seconds
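
One detail worth knowing about the omitempty tags on the nullable fields: Go's encoding/json never treats a struct value as empty, so a struct-based nullable wrapper is still written out, as JSON null, when it holds no value. The sketch below shows this with a hypothetical nullableString type that I am assuming mimics how null.String marshals itself; it is not the library's code.

```go
package main

import "encoding/json"

// nullableString is a hypothetical stand-in that mimics a nullable string
// wrapper: a struct with a Valid flag and its own JSON encoding.
type nullableString struct {
	Value string
	Valid bool
}

// MarshalJSON emits the string when Valid is true and JSON null otherwise.
func (n nullableString) MarshalJSON() ([]byte, error) {
	if !n.Valid {
		return []byte("null"), nil
	}
	return json.Marshal(n.Value)
}

// sample is a two-field excerpt of an event, used only for this demonstration.
type sample struct {
	URL    string         `json:"url"`
	UserID nullableString `json:"user_id,omitempty"`
}

// encodeSample returns the JSON for a request with or without a known caller.
// An empty userID is treated as "no caller".
func encodeSample(userID string) string {
	b, err := json.Marshal(sample{
		URL:    "/v1/items",
		UserID: nullableString{Value: userID, Valid: userID != ""},
	})
	if err != nil {
		return ""
	}
	return string(b)
}
```

With no caller, encodeSample("") keeps the key and writes "user_id":null rather than dropping it. For this use case that is arguably what we want, since an explicit null can be stored as NULL in a nullable column.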

The Middleware

package middleware

import (
    "bytes"
    "context"
    "fmt"
    "io"
    "log"
    "net/http"
    "strings"
    "time"

    "api-pubsub-logger/internal/pubsub"
    "api-pubsub-logger/internal/utils"
    "api-pubsub-logger/pkg/logger"

    "github.com/gorilla/mux"
    "gopkg.in/guregu/null.v3"
)

// Routes to skip from logging (e.g., health checks)
var skipRoutes = map[string]struct{}{
    "GET::/health": {},
}

// responseRecorder is a wrapper for http.ResponseWriter to capture response data
type responseRecorder struct {
    http.ResponseWriter
    body       *bytes.Buffer
    statusCode int
}

func (rw *responseRecorder) Write(b []byte) (int, error) {
    rw.body.Write(b)
    return rw.ResponseWriter.Write(b)
}

func (rw *responseRecorder) WriteHeader(statusCode int) {
    rw.statusCode = statusCode
    rw.ResponseWriter.WriteHeader(statusCode)
}

// LoggingMiddleware logs HTTP requests and responses to Pub/Sub
func LoggingMiddleware(pubsubClient pubsub.Publisher, serviceName string) func(http.Handler) http.Handler {
    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            // Check if the request URL path should be skipped
            routeKey := fmt.Sprintf("%s::%s", strings.ToUpper(r.Method), r.URL.Path)
            if _, skip := skipRoutes[routeKey]; skip {
                next.ServeHTTP(w, r)
                return
            }

            startTime := time.Now()

            // Read request body
            var requestBody []byte
            if r.Body != nil {
                requestBody, _ = io.ReadAll(r.Body)
                r.Body = io.NopCloser(bytes.NewBuffer(requestBody)) // Restore the request body
            }

            // Create response recorder to capture response
            recorder := &responseRecorder{
                ResponseWriter: w,
                body:           &bytes.Buffer{},
                statusCode:     http.StatusOK,
            }

            // Call the next handler
            next.ServeHTTP(recorder, r)

            // Extract context values
            ctx := r.Context()
            requestID := utils.GetRequestID(ctx)
            userID := utils.GetUserID(ctx)

            // Mask sensitive data in request and response bodies
            maskedRequestBody := string(utils.MaskSensitiveData(requestBody))
            maskedResponseBody := string(utils.MaskSensitiveData(recorder.body.Bytes()))

            // Extract route version and name from mux router
            routeName, routeVersion := extractRouteVersionAndName(mux.CurrentRoute(r))

            // Create API log event
            logData := logger.APILogEvent{
                RequestID:    null.NewString(requestID, len(requestID) > 0),
                Service:      serviceName,
                Method:       r.Method,
                URL:          r.URL.String(),
                RequestBody:  null.NewString(maskedRequestBody, len(maskedRequestBody) > 0),
                ResponseBody: null.NewString(maskedResponseBody, len(maskedResponseBody) > 0),
                ResponseCode: recorder.statusCode,
                UserID:       null.NewString(userID, len(userID) > 0),
                Version:      routeVersion,
                Name:         routeName,
                CreatedAt:    startTime,
                Duration:     time.Since(startTime).Seconds(),
            }

            // Publish to Pub/Sub asynchronously using background context
            // We use context.Background() instead of the request context because
            // the request context gets canceled when the HTTP response is sent,
            // but we want the publishing to complete independently
            go sendToPubSub(context.Background(), pubsubClient, logData)
        })
    }
}

// extractRouteVersionAndName extracts the name and version from the mux route
func extractRouteVersionAndName(route *mux.Route) (string, string) {
    var name, version string
    if route != nil {
        name = route.GetName()
        pathTemplate, _ := route.GetPathTemplate()
        parts := strings.Split(strings.Trim(pathTemplate, "/"), "/")
        if len(parts) > 0 && strings.HasPrefix(parts[0], "v") {
            version = parts[0]
        }
    }
    return name, version
}

// sendToPubSub publishes log data to Pub/Sub
func sendToPubSub(ctx context.Context, client pubsub.Publisher, logData logger.APILogEvent) {
    if err := client.PublishAPILogEvent(ctx, logData); err != nil {
        log.Printf("Failed to publish API log event: %v", err)
    }
}

The router can be set up as

package http

import (
    "net/http"

    "api-pubsub-logger/internal/http/handlers"
    "api-pubsub-logger/internal/http/middleware"

    "github.com/gorilla/mux"
)

// Handler mounts all the handlers at the appropriate routes and adds any required middleware
func (h *Handler) Handler() http.Handler {
    r := mux.NewRouter()

    // Apply global middleware
    r.Use(middleware.RequestIDMiddleware)
    r.Use(middleware.UserIDMiddleware)
    r.Use(middleware.LoggingMiddleware(h.PubSubClient, h.ServiceName))

    // Health check endpoint (not logged due to skip in middleware)
    r.Methods("GET").Path("/health").Name("health").HandlerFunc(handlers.HealthCheck)

    h.router = r
    return r
}

Let's understand a bit deeper what's happening 🙇

  • The Pub/Sub client is set up in main.go and stored on the HTTP Handler struct, which passes it to the middleware
  • Routes listed in the skipRoutes map (such as the health check) are not logged
  • To keep sensitive information out of the logs, the sensitiveKeys map in utils/mask.go lists the fields to mask
  • The events are published to Pub/Sub in a background goroutine, so publishing does not add to API latency
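
The masking helper itself is not shown in this post, so here is a minimal sketch of how key-based masking of a JSON body could work. Everything in it is an assumption for illustration: the entries in sensitiveKeys, the "***" replacement, and the choice to return non-JSON bodies unchanged may all differ from what utils/mask.go actually does.

```go
package main

import "encoding/json"

// sensitiveKeys is a hypothetical list; the real one lives in utils/mask.go.
var sensitiveKeys = map[string]struct{}{
	"password": {},
	"email":    {},
}

// maskValue walks decoded JSON and replaces any value stored under a
// sensitive key, descending into nested objects and arrays.
func maskValue(v any) any {
	switch t := v.(type) {
	case map[string]any:
		for k, child := range t {
			if _, hit := sensitiveKeys[k]; hit {
				t[k] = "***"
			} else {
				t[k] = maskValue(child)
			}
		}
		return t
	case []any:
		for i, child := range t {
			t[i] = maskValue(child)
		}
		return t
	default:
		return v
	}
}

// maskSensitiveData returns the body with sensitive fields masked.
// Bodies that are not valid JSON are returned unchanged in this sketch.
func maskSensitiveData(body []byte) []byte {
	var decoded any
	if err := json.Unmarshal(body, &decoded); err != nil {
		return body
	}
	masked, err := json.Marshal(maskValue(decoded))
	if err != nil {
		return body
	}
	return masked
}
```

For example, a body of {"name":"a","password":"x","profile":{"email":"e@x.io"}} comes back with both password and the nested email replaced by "***". Passing non-JSON bodies through untouched keeps the sketch short, but a stricter version might drop them instead, since they could still contain sensitive text.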

🧩 Integration with BigQuery

Once Pub/Sub receives the log events, we can configure a direct BigQuery subscription in GCP.
👉 Stream Pub/Sub messages to BigQuery

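Each message is appended as a new row in our api_logs table, whose columns match the APILogEvent fields. For BigQuery to map the JSON fields to columns, the subscription needs to be configured to use the topic schema or the table schema; otherwise the whole message is written to a single data column.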
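
Since the columns are matched by name, it can help to list the JSON keys the event struct actually produces before creating the table. Below is a small sketch that reads them from the struct tags via reflection. The logRow type is a hypothetical, shortened copy of the event struct, and the whole thing assumes the subscription writes each JSON field to a same-named column.

```go
package main

import (
	"reflect"
	"strings"
)

// logRow is a hypothetical, shortened copy of the event struct.
type logRow struct {
	RequestID    string  `json:"request_id"`
	URL          string  `json:"url"`
	ResponseCode int     `json:"response_code"`
	UserID       string  `json:"user_id,omitempty"`
	Duration     float64 `json:"duration"`
}

// jsonKeys returns the JSON key of every field that carries a json tag,
// in declaration order. It expects a struct value, not a pointer.
func jsonKeys(v any) []string {
	t := reflect.TypeOf(v)
	keys := make([]string, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		tag := t.Field(i).Tag.Get("json")
		name, _, _ := strings.Cut(tag, ",") // drop options such as omitempty
		if name == "" || name == "-" {
			continue
		}
		keys = append(keys, name)
	}
	return keys
}
```

Here jsonKeys(logRow{}) returns request_id, url, response_code, user_id, and duration, which are the column names the table would need under that assumption.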

That's it! 🎯

🤔 Why Not Just Use GCP Structured Logs?

GCP’s structured logs are powerful but not ideal for data analytics use cases.

Feature | Cloud Logging | Pub/Sub + BigQuery Logger
Query language | Logs Explorer (limited SQL) | Full BigQuery SQL
Query performance | Slow for analytics | Fast, SQL-based via BigQuery
Custom schema | Limited | Fully customizable
Integration | GCP only | Can export anywhere

📊 What We Can Do with the Data

Once in BigQuery, we can easily compute metrics such as:

  • API response time in last 7 days
SELECT
  name,
  version,
  COUNT(*) AS api_count,
  MIN(duration) AS min_duration,
  AVG(duration) AS avg_duration,
  MAX(duration) AS max_duration
FROM `api_logs`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
GROUP BY name, version
ORDER BY avg_duration DESC;

🚀 Benefits

✅ Asynchronous Logging - events are published in the background, so logging doesn't add to API latency.
✅ Scalable - Pub/Sub can handle millions of events.
✅ Analytics Ready - the events land in BigQuery, where we can query them directly with SQL.
✅ Customizable Schema - we can include fields such as app version, route, or user agent.
✅ Cross-Language Ready - the same pattern can be replicated in Node.js, Python, etc.


📌 You can check out my GitHub repository for a complete working example of this approach 👇

GitHub logo idris-rampurawala / api-pubsub-logger

Demo Go API with middleware that logs requests/responses to Google Cloud Pub/Sub. Features versioned routes, sensitive data masking, request tracking, and comprehensive unit tests.

API Pub/Sub Logger

A demonstration project showcasing how to log API requests and responses to Google Cloud Pub/Sub for later analysis and debugging.

Overview

This project demonstrates a pattern for logging all API requests and responses to Google Cloud Pub/Sub. The middleware captures:

  • Request/Response bodies (with sensitive data masking)
  • HTTP method and URL
  • Response status codes
  • Request duration
  • Request IDs for tracing
  • User IDs from headers

All logs are published to a Pub/Sub topic which can then be consumed by subscribers to store in BigQuery or other analytics platforms.

Features

  • Middleware-based logging: Automatic logging of all API requests
  • Sensitive data masking: Automatically redacts email, phone numbers, and other sensitive fields
  • Request ID tracking: Unique ID for each request for distributed tracing
  • API versioning: Extracts…

See ya! until my next post 😋
