A practical walkthrough of building a production-grade streaming pipeline with Pub/Sub, Dataflow, and BigQuery — including the mistakes I made so you don't have to.
## The Problem
Our platform was generating millions of user events per day — clicks, purchases, errors, session data. We were batch-processing everything nightly with a cron job and an aging ETL script. By the time analysts ran their morning reports, the "live" data was already 8 hours stale.
The business need was clear: we needed real-time insights. A customer cancels their subscription — we want to trigger a retention workflow now, not tomorrow morning.
This is the story of how I built that pipeline on Google Cloud, what worked, what blew up in production, and what I'd do differently.
## Architecture Overview
```
[App Servers]
      |
      ▼
[Cloud Pub/Sub]           ← ingestion layer, durable buffer
      |
      ▼
[Dataflow (Apache Beam)]  ← stream processing, transformations
      |
      ├──► [BigQuery]        → analytics warehouse
      ├──► [Cloud Bigtable]  → low-latency lookups
      └──► [Pub/Sub (out)]   → trigger downstream systems
```
Each layer has a single responsibility. Let me walk through each one.
## Step 1: Event Ingestion with Cloud Pub/Sub
Pub/Sub is Google's managed messaging service — think fully-managed Kafka without the operational overhead.
```python
from google.cloud import pubsub_v1
import json

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "user-events")

def publish_event(event: dict):
    data = json.dumps(event).encode("utf-8")
    # Attributes travel alongside the payload, so subscribers can
    # filter and route without deserializing the JSON body.
    future = publisher.publish(
        topic_path,
        data,
        event_type=event.get("type", "unknown"),
        source_service=event.get("service", "unknown"),
    )
    return future.result()  # blocks until Pub/Sub acknowledges the publish

# Example
publish_event({
    "user_id": "u_12345",
    "type": "subscription_cancelled",
    "timestamp": "2024-03-15T10:23:45Z",
    "plan": "pro",
    "reason": "too_expensive",
})
```
💡 Lesson #1: Always set message attributes. You'll want to filter downstream without deserializing the full payload.
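Concretely, attribute-based routing means a subscriber can decide whether a message matters before ever parsing the body. A minimal sketch of that guard (the event-type names here are illustrative, not our real taxonomy):

```python
# Subscriber-side pre-filter: decide on attributes alone, so the JSON
# payload is never decoded for messages we don't care about.
INTERESTING_TYPES = {"subscription_cancelled", "purchase_failed"}

def should_process(attributes: dict) -> bool:
    """Return True only for event types this consumer handles."""
    return attributes.get("event_type") in INTERESTING_TYPES
```

In a Pub/Sub callback, `message.attributes` is available before you touch `message.data`, so this check is essentially free. (Pub/Sub subscription filters can also do this server-side.)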
💡 Lesson #2: Pub/Sub guarantees at-least-once delivery. Your consumers must be idempotent. We got burned when a retry storm caused duplicate records in BigQuery.
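What consumer-side idempotency can look like, as a toy in-memory sketch — a real deployment would back the seen-set with Redis or lean on BigQuery's `insertId` (covered below), and the field names are assumptions:

```python
import time

class IdempotentWriter:
    """Skip events whose ID was already seen within a TTL window.

    In-memory sketch only: a production version needs shared state
    (e.g. Redis) so all workers see the same dedup set.
    """

    def __init__(self, ttl_seconds: int = 600):
        self.ttl = ttl_seconds
        self._seen: dict[str, float] = {}  # event_id -> first-seen time

    def write(self, event: dict, sink: list) -> bool:
        now = time.time()
        # Evict expired entries so the cache doesn't grow without bound.
        self._seen = {k: t for k, t in self._seen.items() if now - t < self.ttl}
        event_id = event["event_id"]
        if event_id in self._seen:
            return False  # redelivery: drop the duplicate
        self._seen[event_id] = now
        sink.append(event)
        return True
```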
## Step 2: Stream Processing with Dataflow
Dataflow is Google's managed Apache Beam runner. You write the pipeline once; Dataflow handles autoscaling and fault tolerance.
```python
import json
from datetime import datetime, timezone

import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub, WriteToPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery


class ParseAndEnrich(beam.DoFn):
    def process(self, message):
        try:
            event = json.loads(message.data.decode("utf-8"))
            # Column name matches the BigQuery DDL below
            event["ingestion_ts"] = datetime.now(timezone.utc).isoformat()
            event["pipeline_version"] = "2.1.0"
            if not event.get("user_id"):
                # Count and drop events we can't attribute to a user
                Metrics.counter("pipeline", "invalid_events").inc()
                return
            yield event
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Unparseable payloads go to the dead-letter queue untouched
            yield beam.pvalue.TaggedOutput("dead_letter", message)


def run_pipeline():
    options = PipelineOptions([
        "--project=my-project",
        "--runner=DataflowRunner",
        "--region=us-central1",
        "--streaming",
        "--enable_streaming_engine",
    ])
    with beam.Pipeline(options=options) as p:
        events = (
            p
            | "Read Pub/Sub" >> ReadFromPubSub(
                subscription="projects/my-project/subscriptions/user-events-sub",
                with_attributes=True,
            )
            | "Parse & Enrich" >> beam.ParDo(ParseAndEnrich()).with_outputs(
                "dead_letter", main="valid"
            )
        )
        (
            events.valid
            | "1-min Windows" >> beam.WindowInto(beam.transforms.window.FixedWindows(60))
            | "Write BQ" >> WriteToBigQuery(
                table="my-project:analytics.user_events",
                # The table is created up front (Step 3), so no schema here —
                # streaming inserts don't support schema auto-detection anyway.
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                method=WriteToBigQuery.Method.STREAMING_INSERTS,
            )
        )
        (
            events.dead_letter
            | "Write DLQ" >> WriteToPubSub(
                topic="projects/my-project/topics/dead-letter",
                with_attributes=True,  # we emit full PubsubMessage objects
            )
        )


if __name__ == "__main__":
    run_pipeline()
```
💡 Lesson #3: Always implement a dead-letter queue. In week one, ~0.3% of events had malformed timestamps from a mobile client bug. Without a DLQ, they would have silently failed.
💡 Lesson #4: Windowing is a latency/cost tradeoff. Moving from 5-minute to 1-minute windows raised our Dataflow costs ~40%. Know your SLA before choosing.
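For intuition on that tradeoff: fixed windowing is just bucketing by timestamp, and smaller buckets mean more frequent flushes (and more streaming-insert calls). Roughly the arithmetic behind Beam's `FixedWindows`, as a standalone sketch:

```python
def fixed_window(event_ts: float, size_s: int = 60) -> tuple:
    """Map a timestamp to the (start, end) of its fixed window.

    With size_s=60, five events spread over 5 minutes land in five
    windows (five flushes); with size_s=300 they land in one.
    """
    start = event_ts - (event_ts % size_s)
    return (start, start + size_s)
```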
## Step 3: BigQuery as the Analytics Layer
```sql
CREATE TABLE analytics.user_events (
  event_id     STRING    NOT NULL,
  user_id      STRING    NOT NULL,
  event_type   STRING    NOT NULL,
  timestamp    TIMESTAMP NOT NULL,
  ingestion_ts TIMESTAMP NOT NULL,
  session_id   STRING,
  properties   JSON
)
PARTITION BY DATE(timestamp)
CLUSTER BY event_type, user_id;
```
💡 Lesson #5: Partition + cluster from day one. Our query costs dropped 60% after adding proper clustering.
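To see why: a query shaped like our analysts' typical queries (date and table are illustrative) only scans one partition, and clustering narrows the scan further within it.

```sql
-- The DATE(timestamp) filter prunes to a single day's partition;
-- the event_type filter benefits from clustering within that partition.
SELECT user_id, COUNT(*) AS cancels
FROM analytics.user_events
WHERE DATE(timestamp) = "2024-03-15"
  AND event_type = "subscription_cancelled"
GROUP BY user_id;
```

Without the partition filter, the same query scans the entire table — which is exactly what you pay for.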
💡 Lesson #6: Use `insertId` for deduplication — and know that BigQuery's best-effort dedup window is only ~1 minute:
```python
row_to_insert = {
    "insertId": event["event_id"],
    "json": event,
}
```
## Step 4: Observability
```python
import time

import apache_beam as beam
from apache_beam.metrics import Metrics


class ProcessEvents(beam.DoFn):
    def __init__(self):
        self.events_processed = Metrics.counter("pipeline", "events_processed")
        self.latency = Metrics.distribution("pipeline", "event_latency_ms")

    def process(self, event):
        start = time.time()
        # ... processing logic ...
        self.latency.update(int((time.time() - start) * 1000))
        self.events_processed.inc()
        yield event
```

These custom metrics show up in Cloud Monitoring alongside the built-in Dataflow and Pub/Sub metrics, and we alert on them:
| Alert | Threshold | Action |
|---|---|---|
| Pub/Sub oldest message age | > 5 minutes | Page on-call |
| Dataflow worker errors | > 1% of events | Slack alert |
| BigQuery streaming errors | Any | PagerDuty |
| Dead-letter queue depth | > 1000 messages | Investigate |
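The alert table is simple enough to encode as data. A simplified evaluator — metric names, units, and values here are illustrative, not our actual Cloud Monitoring configuration:

```python
# (metric, threshold, action) rows mirroring the alert table above.
ALERT_RULES = [
    ("pubsub_oldest_message_age_s", 300, "page_oncall"),   # > 5 minutes
    ("dataflow_error_ratio", 0.01, "slack"),               # > 1% of events
    ("bq_streaming_errors", 0, "pagerduty"),               # any at all
    ("dlq_depth", 1000, "investigate"),                    # > 1000 messages
]

def evaluate_alerts(metrics: dict) -> list:
    """Return the actions whose metric exceeds its threshold."""
    fired = []
    for name, threshold, action in ALERT_RULES:
        if metrics.get(name, 0) > threshold:
            fired.append(action)
    return fired
```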
## Results After 3 Months in Production
- Latency: 8 hours → under 90 seconds end-to-end
- Reliability: 99.97% of events land in BigQuery successfully
- Cost: ~$340/month for ~50M events/day
- Developer velocity: analysts self-serve on real-time data without waiting for nightly runs
## What I'd Do Differently
- Start with a schema registry from day one. Multiple teams publishing inconsistent schemas cost weeks of debugging.
- Use Dataflow Flex Templates earlier. Classic Templates hit limitations fast.
- Set Pub/Sub retention to 7 days minimum. We had it at 1 day in staging; when a bug caused a processing halt, we lost messages we could have replayed.
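On the schema-registry point: it doesn't have to start as infrastructure. Even a version-checked validator in a shared library would have caught most of our inconsistencies. A minimal sketch — the schemas and field names are illustrative:

```python
# Required fields per (event_type, schema_version); illustrative only.
SCHEMAS = {
    ("subscription_cancelled", 1): {"user_id", "timestamp", "plan", "reason"},
    ("purchase", 1): {"user_id", "timestamp", "amount_cents"},
}

def validate(event: dict) -> bool:
    """Reject events with an unknown (type, version) or missing required fields."""
    key = (event.get("type"), event.get("schema_version"))
    required = SCHEMAS.get(key)
    return required is not None and required <= event.keys()
```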
## Resources
- Google Cloud Pub/Sub Documentation
- Apache Beam Python SDK
- BigQuery Streaming Inserts
- Dataflow Pricing Calculator
Have you built something similar? I'd love to hear how you approached deduplication — drop a comment below.
Tags: #googlecloud #dataengineering #gcp #bigquery #dataflow #pubsub #streaming