DEV Community

Ayush Raj Jha

How I Built a Real-Time Data Pipeline Using Google Cloud (And What I Learned the Hard Way)

A practical walkthrough of building a production-grade streaming pipeline with Pub/Sub, Dataflow, and BigQuery — including the mistakes I made so you don't have to.


The Problem

Our platform was generating millions of user events per day — clicks, purchases, errors, session data. We were batch-processing everything nightly with a cron job and an aging ETL script. By the time analysts ran their morning reports, the "live" data was already 8 hours stale.

The business need was clear: we needed real-time insights. A customer cancels their subscription — we want to trigger a retention workflow now, not tomorrow morning.

This is the story of how I built that pipeline on Google Cloud, what worked, what blew up in production, and what I'd do differently.


Architecture Overview

[App Servers]
     |
     ▼
[Cloud Pub/Sub]              ← ingestion layer, durable buffer
     |
     ▼
[Dataflow (Apache Beam)]     ← stream processing, transformations
     |
     ├──► [BigQuery]         → analytics warehouse
     ├──► [Cloud Bigtable]   → low-latency lookups
     └──► [Pub/Sub (out)]    → trigger downstream systems

Each layer has a single responsibility. Let me walk through each one.


Step 1: Event Ingestion with Cloud Pub/Sub

Pub/Sub is Google's managed messaging service — think fully-managed Kafka without the operational overhead.

from google.cloud import pubsub_v1
import json

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "user-events")

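def publish_event(event: dict) -> str:
    """Publish one event and return the server-assigned message ID."""
    data = json.dumps(event).encode("utf-8")
    # Keyword arguments become message attributes; values must be strings.
    future = publisher.publish(
        topic_path,
        data,
        event_type=str(event.get("type", "unknown")),
        source_service=str(event.get("service", "unknown")),
    )
    # Blocks until Pub/Sub confirms the publish; raises if it failed.
    return future.result()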

# Example
publish_event({
    "user_id": "u_12345",
    "type": "subscription_cancelled",
    "timestamp": "2024-03-15T10:23:45Z",
    "plan": "pro",
    "reason": "too_expensive"
})

💡 Lesson #1: Always set message attributes. You'll want to filter downstream without deserializing the full payload.
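
Here is a minimal sketch of one way to build those attributes, assuming events shaped like the example above. `build_attributes` and `ATTRIBUTE_FIELDS` are hypothetical names, not part of the Pub/Sub client. The helper exists because attribute values have to be strings, so anything else is coerced before publishing.

```python
# Hypothetical mapping: attribute name -> field in the event payload.
ATTRIBUTE_FIELDS = {"event_type": "type", "source_service": "service"}


def build_attributes(event: dict, fields: dict = ATTRIBUTE_FIELDS) -> dict:
    """Copy a few routing fields out of the event and coerce them to strings.

    Missing fields become "unknown" so every message carries the same keys.
    """
    attributes = {}
    for attribute_name, event_field in fields.items():
        value = event.get(event_field)
        attributes[attribute_name] = "unknown" if value is None else str(value)
    return attributes


attrs = build_attributes({"type": "subscription_cancelled", "plan": "pro"})
print(attrs)
```

The result can be passed as `publisher.publish(topic_path, data, **attrs)`. A subscription filter such as `attributes.event_type = "subscription_cancelled"` can then select messages on the Pub/Sub side, before any consumer touches the payload.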

💡 Lesson #2: Pub/Sub delivers at-least-once by default, so your consumers must be idempotent. We got burned when a retry storm caused duplicate records in BigQuery.
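
To make the idempotency point concrete, here is a rough in-memory sketch, not what we ran in production. It assumes every event carries a unique `event_id` (the example payload above does not include one), and `SeenEvents` and `handle` are made-up names. A real deployment would need a shared store rather than process memory, since redeliveries can land on a different worker.

```python
import time
from collections import OrderedDict
from typing import Optional


class SeenEvents:
    """Remembers event IDs for ttl_seconds so redeliveries can be skipped."""

    def __init__(self, ttl_seconds: float = 600.0):
        self.ttl_seconds = ttl_seconds
        self._seen = OrderedDict()  # event_id -> time it was first seen

    def first_time(self, event_id: str, now: Optional[float] = None) -> bool:
        """Return True the first time an ID shows up, False for duplicates."""
        now = time.time() if now is None else now
        # Expire old entries; the oldest ones sit at the front of the dict.
        while self._seen:
            _, seen_at = next(iter(self._seen.items()))
            if now - seen_at <= self.ttl_seconds:
                break
            self._seen.popitem(last=False)
        if event_id in self._seen:
            return False
        self._seen[event_id] = now
        return True


def handle(event: dict, seen: SeenEvents, sink: list) -> None:
    """Apply the side effect at most once per event_id."""
    if seen.first_time(event["event_id"]):
        sink.append(event)  # stand-in for the real write


seen, sink = SeenEvents(), []
for delivery in [{"event_id": "e1"}, {"event_id": "e1"}, {"event_id": "e2"}]:
    handle(delivery, seen, sink)
print(len(sink))  # 2
```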


Step 2: Stream Processing with Dataflow

Dataflow is Google's managed Apache Beam runner. You write the pipeline once; Dataflow handles autoscaling and fault tolerance.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery
import json
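from datetime import datetime, timezone

class ParseAndEnrich(beam.DoFn):
    """Parse Pub/Sub messages as JSON, enrich them, and dead-letter bad payloads."""

    def __init__(self):
        super().__init__()
        self.invalid_events = beam.metrics.Metrics.counter("pipeline", "invalid_events")

    def process(self, message):
        try:
            event = json.loads(message.data.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            # Unparseable payloads go to the dead-letter output untouched.
            yield beam.pvalue.TaggedOutput("dead_letter", message)
            return

        if not event.get("user_id"):
            # Events without a user are counted and dropped.
            self.invalid_events.inc()
            return

        # datetime.utcnow() is deprecated as of Python 3.12; use an aware UTC time.
        event["ingestion_timestamp"] = datetime.now(timezone.utc).isoformat()
        event["pipeline_version"] = "2.1.0"
        yield event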


def run_pipeline():
    options = PipelineOptions([
        "--project=my-project",
        "--runner=DataflowRunner",
        "--region=us-central1",
        "--streaming",
        "--enable_streaming_engine",
    ])

    with beam.Pipeline(options=options) as p:
        events = (
            p
            | "Read Pub/Sub" >> ReadFromPubSub(
                subscription="projects/my-project/subscriptions/user-events-sub",
                with_attributes=True,
            )
            | "Parse & Enrich" >> beam.ParDo(ParseAndEnrich()).with_outputs(
                "dead_letter", main="valid"
            )
        )

        (
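            events.valid
            | "1-min Windows" >> beam.WindowInto(beam.transforms.window.FixedWindows(60))
            | "Write BQ" >> WriteToBigQuery(
                table="my-project:analytics.user_events",
                # The table is created up front with the Step 3 DDL, so no schema
                # is passed here. Beam's schema auto-detection applies to file
                # loads, not to streaming inserts.
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                method=WriteToBigQuery.Method.STREAMING_INSERTS,
            )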
        )

        (
            events.dead_letter
            | "Write DLQ" >> beam.io.WriteToPubSub(
                topic="projects/my-project/topics/dead-letter",
                # dead_letter elements are PubsubMessage objects, not raw bytes.
                with_attributes=True,
            )
        )

💡 Lesson #3: Always implement a dead-letter queue. In week one, ~0.3% of events had malformed timestamps from a mobile client bug. Without a DLQ, they would have silently failed.
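
The pipeline above only dead-letters payloads that fail JSON parsing. As a sketch of how a timestamp check could feed the same queue, here is a plain-Python classifier. `classify` and its reason strings are hypothetical, and it assumes the event time lives in a `timestamp` field in ISO 8601 form, as in the example payload.

```python
import json
from datetime import datetime


def classify(raw: bytes):
    """Return ("valid", event) or ("dead_letter", reason) for one message."""
    try:
        event = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        return "dead_letter", f"bad payload: {exc.__class__.__name__}"
    if not isinstance(event, dict):
        return "dead_letter", "payload is not a JSON object"
    try:
        # fromisoformat() only accepts a trailing "Z" on Python 3.11+,
        # so normalise it to an explicit UTC offset first.
        datetime.fromisoformat(str(event.get("timestamp")).replace("Z", "+00:00"))
    except ValueError:
        return "dead_letter", "malformed timestamp"
    return "valid", event


print(classify(b'{"user_id": "u_1", "timestamp": "15/03/2024"}'))
```

Inside a DoFn, the `"dead_letter"` branch would map onto a `TaggedOutput`, with the reason attached so whoever drains the queue knows why each message was rejected.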

💡 Lesson #4: Windowing is a latency/cost tradeoff. Moving from 5-minute to 1-minute windows raised our Dataflow costs ~40%. Know your SLA before choosing.
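
For intuition on why smaller windows mean more work, here is a toy sketch of fixed-window assignment in plain Python. It follows the usual rule for fixed windows with no offset (window start = timestamp minus timestamp modulo size). The timestamps are invented, and it says nothing about actual Dataflow pricing.

```python
from collections import Counter


def window_start(event_ts: int, size_seconds: int) -> int:
    """Start of the fixed window that contains event_ts (epoch seconds)."""
    return event_ts - (event_ts % size_seconds)


# Seven made-up events spread over roughly five minutes.
timestamps = [0, 30, 61, 119, 120, 250, 299]

one_minute = Counter(window_start(ts, 60) for ts in timestamps)
five_minute = Counter(window_start(ts, 300) for ts in timestamps)

# The same events produce four 1-minute windows but a single 5-minute window.
print(len(one_minute), len(five_minute))  # 4 1
```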


Step 3: BigQuery as the Analytics Layer

CREATE TABLE analytics.user_events (
    event_id        STRING NOT NULL,
    user_id         STRING NOT NULL,
    event_type      STRING NOT NULL,
    timestamp       TIMESTAMP NOT NULL,
    ingestion_ts    TIMESTAMP NOT NULL,
    session_id      STRING,
    properties      JSON
)
PARTITION BY DATE(timestamp)
CLUSTER BY event_type, user_id;

💡 Lesson #5: Partition + cluster from day one. Our query costs dropped 60% after adding proper clustering.

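💡 Lesson #6: Use insertId for deduplication, but treat it as best-effort: BigQuery only remembers an insertId for about a minute, so a retry that arrives later can still create a duplicate: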

row_to_insert = {
    "insertId": event["event_id"],
    "json": event
}
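
The row shape above assumes every event already has an `event_id`. As a sketch of a fallback for events that lack one, the ID can be derived from the event content so that a retry of the same event maps to the same `insertId`. `make_insert_id` and `to_insert_rows` are illustrative names, not part of any BigQuery API.

```python
import hashlib
import json


def make_insert_id(event: dict) -> str:
    """Use event_id when present, else a hash of the canonical event JSON.

    Call this before adding per-attempt fields (such as an ingestion
    timestamp), otherwise every retry hashes to a different ID.
    """
    if event.get("event_id"):
        return str(event["event_id"])
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def to_insert_rows(events: list) -> list:
    """Build rows in the {"insertId": ..., "json": ...} shape shown above."""
    return [{"insertId": make_insert_id(e), "json": e} for e in events]


rows = to_insert_rows([{"event_id": "e1", "user_id": "u_12345"}])
print(rows[0]["insertId"])  # e1
```

With the Python client library, the same IDs can be passed as `row_ids` to `Client.insert_rows_json` instead of building the rows by hand.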

Step 4: Observability

class ProcessEvents(beam.DoFn):
    def __init__(self):
        self.events_processed = beam.metrics.Metrics.counter("pipeline", "events_processed")
        self.latency = beam.metrics.Metrics.distribution("pipeline", "event_latency_ms")

    def process(self, event):
        import time
        start = time.time()
        # ... processing logic ...
        # Records time spent in this step only, not end-to-end event latency.
        self.latency.update(int((time.time() - start) * 1000))
        self.events_processed.inc()
        yield event
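
The distribution above times the processing step itself. For end-to-end lag, the event's own timestamp is the starting point. Here is a small sketch, assuming ISO 8601 timestamps like the one in the example event. `event_lag_ms` is a hypothetical helper, and treating naive timestamps as UTC is my assumption.

```python
from datetime import datetime, timezone
from typing import Optional


def event_lag_ms(event_timestamp: str, now: Optional[datetime] = None) -> int:
    """Milliseconds between the event's own timestamp and now (UTC)."""
    now = now or datetime.now(timezone.utc)
    # Normalise a trailing "Z" so fromisoformat() also works before Python 3.11.
    created = datetime.fromisoformat(event_timestamp.replace("Z", "+00:00"))
    if created.tzinfo is None:
        created = created.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return int((now - created).total_seconds() * 1000)


fixed_now = datetime(2024, 3, 15, 10, 25, 0, tzinfo=timezone.utc)
print(event_lag_ms("2024-03-15T10:23:45Z", now=fixed_now))  # 75000
```

The returned value could be fed into a Beam distribution metric in the same way as the processing time above.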

Alert                          Threshold          Action
-----------------------------  -----------------  ------------
Pub/Sub oldest message age     > 5 minutes        Page on-call
Dataflow worker errors         > 1% of events     Slack alert
BigQuery streaming errors      Any                PagerDuty
Dead-letter queue depth        > 1000 messages    Investigate
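
The thresholds in the table above can also be written down as data, which makes them easy to review and test. This is only a sketch: the metric names are invented for illustration, and real alerting would live in Cloud Monitoring policies rather than application code.

```python
# Thresholds copied from the table above; the metric names are made up.
ALERT_RULES = [
    ("pubsub_oldest_message_age_s", lambda v: v > 5 * 60, "Page on-call"),
    ("dataflow_worker_error_ratio", lambda v: v > 0.01, "Slack alert"),
    ("bigquery_streaming_errors", lambda v: v > 0, "PagerDuty"),
    ("dead_letter_queue_depth", lambda v: v > 1000, "Investigate"),
]


def evaluate_alerts(metrics: dict) -> list:
    """Return (metric, action) pairs for every rule whose threshold is exceeded."""
    fired = []
    for name, exceeded, action in ALERT_RULES:
        if name in metrics and exceeded(metrics[name]):
            fired.append((name, action))
    return fired


snapshot = {
    "pubsub_oldest_message_age_s": 420,
    "dataflow_worker_error_ratio": 0.002,
    "bigquery_streaming_errors": 0,
    "dead_letter_queue_depth": 1500,
}
print(evaluate_alerts(snapshot))
```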

Results After 3 Months in Production

  • Latency: 8 hours → under 90 seconds end-to-end
  • Reliability: 99.97% of events land in BigQuery successfully
  • Cost: ~$340/month for ~50M events/day
  • Developer velocity: analysts self-serve on real-time data without waiting for nightly runs

What I'd Do Differently

  1. Start with a schema registry from day one. Multiple teams publishing inconsistent schemas cost weeks of debugging.
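  2. Use Dataflow Flex Templates earlier. Classic Templates hit limitations fast: the job graph is fixed when the template is built, and runtime parameters need ValueProvider support.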
  3. Set Pub/Sub retention to the full 7 days a subscription allows (topic-level retention can go longer). We had it at 1 day in staging; when a bug caused a processing halt, we lost messages we could have replayed.
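
On the first point: short of a full schema registry, even a shared contract check that publishers run before sending catches a lot. Here is a minimal sketch, with the required fields assumed from the example event earlier in the post. `REQUIRED_FIELDS` and `schema_errors` are hypothetical names, and this is a stand-in for a registry, not a replacement.

```python
# Assumed contract, based on the example event: field name -> expected type.
REQUIRED_FIELDS = {"user_id": str, "type": str, "timestamp": str}


def schema_errors(event: dict, required: dict = REQUIRED_FIELDS) -> list:
    """List the ways an event breaks the shared contract (empty means OK)."""
    errors = []
    for field, expected_type in required.items():
        if field not in event:
            errors.append(f"missing field: {field}")
        elif not isinstance(event[field], expected_type):
            errors.append(
                f"{field}: expected {expected_type.__name__}, "
                f"got {type(event[field]).__name__}"
            )
    return errors


print(schema_errors({"user_id": 12345, "type": "click"}))
```

A publisher could refuse to send any event for which this returns a non-empty list, so inconsistencies surface on the producing team's side instead of in the pipeline.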


Have you built something similar? I'd love to hear how you approached deduplication — drop a comment below.

Tags: #googlecloud #dataengineering #gcp #bigquery #dataflow #pubsub #streaming