DEV Community

ahmet gedik

Postgres LISTEN/NOTIFY for Real-Time Video Metadata Invalidation Across Regions

At TrendVidStream we discover streaming-platform titles across eight regions (US, GB, DE, JP, KR, BR, IN, AU) and surface them through a PHP 8.4 frontend backed by SQLite FTS5 read replicas. The discovery cron runs on a staggered schedule, FTP-deploys SQLite snapshots to edge nodes, and the public site reads from a local copy. That works beautifully for catalog data that changes hourly. It falls apart for the metadata that changes every few seconds: a title flipping from available to expiring_soon, a regional license being revoked at 03:14 UTC, a thumbnail being re-encoded, or a popularity score recalculated after a traffic spike.

The naive fix is to poll. Every edge node hits a central endpoint every N seconds, asks "what changed?", and patches its local cache. We ran that for six months. It cost us roughly 4.2 million wasted HTTP requests per day across regions, and the staleness window was still 15-30 seconds because the poll interval couldn't go lower without melting the origin. The right tool turned out to be something Postgres has shipped for decades (with payload support since version 9.0) and almost nobody reaches for: LISTEN / NOTIFY. This article is the production write-up of how we wired it into a PHP + Python stack that was never designed for push semantics, what broke, and what the final architecture looks like.

Why LISTEN/NOTIFY and Not Redis Pub/Sub or Kafka

We already had Postgres. That's the honest answer. But there are real technical reasons it beat the alternatives for this specific workload:

  • Transactional coupling. NOTIFY is sent at COMMIT time. If the transaction rolls back, the notification never fires. Redis pub/sub will happily publish a message about a row that never made it to disk. With Kafka you end up implementing the outbox pattern just to get this property back.
  • No second system to operate. Our SRE rotation is two people. Every additional moving part is an additional 3am page.
  • Payload-with-channel routing. A single channel like metadata_invalidation can carry a JSON payload describing exactly which video IDs and which regions are affected, so subscribers filter cheaply.
  • Zero retention semantics match the problem. We don't need replay. If a notification is missed, the next cron pass will resync anyway. Cache invalidation is idempotent.

The tradeoffs are real and you need to know them before you ship this:

  • The payload must be shorter than 8000 bytes in the default configuration (a compile-time limit). Don't put rows in there. Put IDs.
  • If no listener is connected, the notification is dropped. There is no queue.
  • Notifications are coalesced inside a transaction — duplicate NOTIFY calls with identical payloads in the same tx fire once.
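  • The notification queue is global to the cluster and has a hard 8GB cap in a standard installation. A stuck listener prevents the queue from being cleaned up, and once it fills, every transaction that calls NOTIFY fails at commit, cluster-wide. This is the one that bites people.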

The Schema and the Trigger

Metadata lives in a video_metadata table. The columns that matter for invalidation are availability_status, regional_licenses (jsonb), popularity_score, and updated_at. We don't want to fire an invalidation on every UPDATE — the popularity recalculation alone touches 200k rows every 10 minutes and most of those deltas are noise. The trigger filters first, then notifies.

CREATE OR REPLACE FUNCTION notify_metadata_change()
RETURNS trigger AS $$
DECLARE
  payload jsonb;
  affected_regions text[];
BEGIN
  -- Skip if nothing meaningful changed
  IF NEW.availability_status IS NOT DISTINCT FROM OLD.availability_status
     AND NEW.regional_licenses IS NOT DISTINCT FROM OLD.regional_licenses
     AND abs(NEW.popularity_score - OLD.popularity_score) < 0.15 THEN
    RETURN NEW;
  END IF;

  -- Compute which regions actually care about this change
  SELECT array_agg(DISTINCT region) INTO affected_regions
  FROM jsonb_object_keys(NEW.regional_licenses) AS region
  WHERE NEW.regional_licenses -> region IS DISTINCT FROM
        COALESCE(OLD.regional_licenses, '{}'::jsonb) -> region;

  IF affected_regions IS NULL THEN
    affected_regions := ARRAY['*'];
  END IF;

  payload := jsonb_build_object(
    'v', NEW.video_id,
    's', NEW.availability_status,
    'r', affected_regions,
    't', extract(epoch from now())::int
  );

  -- 8000 byte ceiling — if we ever overflow, fall back to a wildcard
  IF octet_length(payload::text) > 7500 THEN
    payload := jsonb_build_object('v', NEW.video_id, 'r', ARRAY['*'], 'overflow', true);
  END IF;

  PERFORM pg_notify('metadata_invalidation', payload::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_notify_metadata_change
AFTER UPDATE ON video_metadata
FOR EACH ROW EXECUTE FUNCTION notify_metadata_change();

A few non-obvious things worth flagging:

  • IS NOT DISTINCT FROM instead of = because regional_licenses is nullable and NULL = NULL is NULL, not TRUE.
  • The popularity score has a 0.15 deadband. Below that, the fluctuation is statistical noise from the ranking job and not worth invalidating caches over. This single line dropped notification volume by 73%.
  • Short JSON keys (v, s, r, t). When you're capped at 8000 bytes and burst-notifying, this matters.
  • The overflow fallback. If somehow the payload exceeds the budget, we degrade to a wildcard invalidation rather than dropping the event. Better to over-invalidate than miss.
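
The trigger's filtering decision is easy to get subtly wrong, so it can help to mirror it in a pure function and unit-test the edge cases away from the database. The following is a minimal sketch, not our production code: the function names and the dict-shaped rows are assumptions made for illustration, and it only reproduces the logic of the trigger shown above.

```python
DEADBAND = 0.15  # same threshold the trigger uses
_MISSING = object()  # distinguishes "key absent" from a JSON null value


def should_notify(old: dict, new: dict, deadband: float = DEADBAND) -> bool:
    """Return True when the trigger would go on to call pg_notify."""
    if old.get("availability_status") != new.get("availability_status"):
        return True
    if old.get("regional_licenses") != new.get("regional_licenses"):
        return True
    old_score = old.get("popularity_score")
    new_score = new.get("popularity_score")
    if old_score is None or new_score is None:
        # In SQL the comparison yields NULL here, so the trigger's IF branch
        # is not taken and the notification is sent.
        return True
    return abs(new_score - old_score) >= deadband


def affected_regions(old: dict, new: dict) -> list[str]:
    """Regions whose license entry differs; '*' when none can be singled out."""
    old_lic = old.get("regional_licenses") or {}
    new_lic = new.get("regional_licenses") or {}
    changed = sorted(r for r in new_lic if new_lic[r] != old_lic.get(r, _MISSING))
    return changed or ["*"]
```

Note that a region whose key is removed from regional_licenses does not appear in the changed list, so such an update falls through to the wildcard, which matches what the trigger does.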

The Listener: A Python Daemon That Fans Out to Edge Nodes

We wrote the listener in Python because psycopg 3.2 has a clean async LISTEN API and we wanted to use a single event loop to fan out HTTP webhooks to the edge nodes. One process, one Postgres connection, N outbound HTTP/2 streams.

import asyncio
import json
import logging
import os
import signal
from collections import defaultdict
from time import monotonic

import httpx
import psycopg

LOG = logging.getLogger("invalidator")
DSN = os.environ["POSTGRES_DSN"]
EDGE_NODES = os.environ["EDGE_NODES"].split(",")  # https://us-east.tvs.internal,...
FLUSH_INTERVAL = 0.250  # 250ms coalescing window
MAX_BATCH = 500


class Coalescer:
    """Buffers invalidations for FLUSH_INTERVAL, then ships per-region batches."""

    def __init__(self):
        self.buf: dict[str, set[str]] = defaultdict(set)
        self.last_flush = monotonic()

    def add(self, payload: dict) -> None:
        video_id = payload["v"]
        for region in payload.get("r", ["*"]):
            self.buf[region].add(video_id)

    def should_flush(self) -> bool:
        if not self.buf:
            return False
        if monotonic() - self.last_flush >= FLUSH_INTERVAL:
            return True
        return sum(len(s) for s in self.buf.values()) >= MAX_BATCH

    def drain(self) -> dict[str, list[str]]:
        out = {r: sorted(ids) for r, ids in self.buf.items()}
        self.buf.clear()
        self.last_flush = monotonic()
        return out


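# Assumption: the shared secret the PHP endpoint checks arrives via this
# environment variable; the variable name is illustrative.
INVALIDATE_TOKEN = os.environ["INVALIDATE_TOKEN"]


async def ship(client: httpx.AsyncClient, region: str, ids: list[str]) -> None:
    targets = (
        EDGE_NODES if region == "*"
        else [n for n in EDGE_NODES if f"//{region.lower()}-" in n]
    )
    # sent_at is a monotonic reading: usable for ordering batches from this
    # listener host, not as a wall-clock timestamp.
    body = {"region": region, "video_ids": ids, "sent_at": monotonic()}
    headers = {"X-Invalidate-Token": INVALIDATE_TOKEN}
    for url in targets:
        try:
            r = await client.post(
                f"{url}/invalidate", json=body, headers=headers, timeout=2.0
            )
            r.raise_for_status()
        except Exception as exc:
            LOG.warning("edge %s failed: %s", url, exc)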


async def flusher(coalescer: Coalescer, stop: asyncio.Event) -> None:
    async with httpx.AsyncClient(http2=True) as client:
        while not stop.is_set():
            await asyncio.sleep(0.05)
            if coalescer.should_flush():
                batches = coalescer.drain()
                await asyncio.gather(*(ship(client, r, ids) for r, ids in batches.items()))


async def listen(coalescer: Coalescer, stop: asyncio.Event) -> None:
    async with await psycopg.AsyncConnection.connect(DSN, autocommit=True) as conn:
        await conn.execute("LISTEN metadata_invalidation")
        LOG.info("listening on metadata_invalidation")
        while not stop.is_set():
            # timeout= (psycopg >= 3.2) ends the iteration periodically, so the
            # stop flag is checked even when no notifications arrive.
            async for n in conn.notifies(timeout=1.0):
                try:
                    coalescer.add(json.loads(n.payload))
                except (json.JSONDecodeError, KeyError, TypeError):
                    # Malformed JSON or a payload without the expected keys
                    LOG.exception("bad payload: %r", n.payload)


async def main() -> None:
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    coalescer = Coalescer()
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)
    await asyncio.gather(listen(coalescer, stop), flusher(coalescer, stop))


if __name__ == "__main__":
    asyncio.run(main())

The coalescer is the part that earns its keep. Without it, a popularity_score recalculation burst would send tens of thousands of individual HTTP requests to each edge node in a few seconds. With a 250ms window, those collapse into one batch per region per quarter-second. We picked 250ms after measuring user-perceived latency: anything under 400ms reads as "instant" for a content-change indicator on the page.
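
To make the effect concrete, here is a small standalone sketch of what one coalescing window does to a burst. It reuses the payload shape from the trigger (v for the video ID, r for the regions); the burst itself is synthetic and the function name is made up for this example.

```python
from collections import defaultdict


def coalesce(payloads: list[dict]) -> dict[str, list[str]]:
    """Collapse one window's worth of payloads into a sorted ID list per region."""
    buf: dict[str, set[str]] = defaultdict(set)
    for payload in payloads:
        for region in payload.get("r", ["*"]):
            buf[region].add(payload["v"])
    return {region: sorted(ids) for region, ids in buf.items()}


# Synthetic burst: 10,000 notifications touching 200 distinct videos in two regions.
burst = [
    {"v": f"vid{i % 200:05d}", "r": ["US" if i % 2 else "JP"]}
    for i in range(10_000)
]
batches = coalesce(burst)
print({region: len(ids) for region, ids in batches.items()})
```

The 10,000 events collapse into two batches of 100 IDs each, because the set per region deduplicates repeated hits on the same video inside the window.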

Note also that the psycopg async API delivers notifications via conn.notifies() as an async iterator — you do not poll with a sleep. The connection is socket-readable when the server pushes, and the driver wakes the coroutine. Zero CPU at idle.

The PHP Edge: Receiving the Webhook and Patching the SQLite Read Cache

Each edge node runs PHP 8.4 behind nginx with a /invalidate route hit only by the listener daemon. The endpoint is gated by a shared secret in a header and a source-IP allowlist. The handler updates the local SQLite FTS5 mirror in place rather than triggering a full snapshot redeploy.

<?php
declare(strict_types=1);

final class InvalidateController
{
    public function __construct(
        private readonly \PDO $db,
        private readonly string $sharedSecret,
        private readonly \Psr\Log\LoggerInterface $log,
    ) {}

    public function handle(array $server, string $rawBody): array
    {
        if (!hash_equals($this->sharedSecret, $server['HTTP_X_INVALIDATE_TOKEN'] ?? '')) {
            return ['status' => 401, 'body' => ['error' => 'unauthorized']];
        }

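        try {
            $payload = json_decode($rawBody, true, flags: JSON_THROW_ON_ERROR);
        } catch (\JsonException) {
            // Malformed body: reject instead of letting the exception escape
            return ['status' => 400, 'body' => ['error' => 'bad_json']];
        }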
        $region = (string)($payload['region'] ?? '*');
        $ids = array_values(array_filter(
            $payload['video_ids'] ?? [],
            static fn($v) => is_string($v) && preg_match('/^[A-Za-z0-9_-]{6,32}$/', $v),
        ));

        if ($ids === []) {
            return ['status' => 204, 'body' => null];
        }

        $localRegion = getenv('EDGE_REGION') ?: 'us';
        if ($region !== '*' && strcasecmp($region, $localRegion) !== 0) {
            return ['status' => 204, 'body' => null];
        }

        $start = hrtime(true);
        $this->db->beginTransaction();
        try {
            $placeholders = implode(',', array_fill(0, count($ids), '?'));
            $stmt = $this->db->prepare(
                "UPDATE video_cache SET dirty = 1, dirtied_at = strftime('%s','now') "
                . "WHERE video_id IN ($placeholders)"
            );
            $stmt->execute($ids);
            $touched = $stmt->rowCount();

            // Drop the FTS rows so the next read re-pulls from origin
            $fts = $this->db->prepare(
                "DELETE FROM video_fts WHERE video_id IN ($placeholders)"
            );
            $fts->execute($ids);

            $this->db->commit();
        } catch (\Throwable $e) {
            $this->db->rollBack();
            $this->log->error('invalidate failed', ['err' => $e->getMessage()]);
            return ['status' => 500, 'body' => ['error' => 'db_error']];
        }

        $this->purgePageCache($ids);

        $elapsed_us = (int)((hrtime(true) - $start) / 1000);
        $this->log->info('invalidated', [
            'region' => $region,
            'count' => count($ids),
            'touched' => $touched,
            'us' => $elapsed_us,
        ]);

        return ['status' => 200, 'body' => ['ok' => true, 'touched' => $touched]];
    }

    private function purgePageCache(array $ids): void
    {
        $cacheDir = '/var/www/data/pagecache';
        foreach ($ids as $id) {
            $file = $cacheDir . '/watch_' . substr($id, 0, 2) . '/' . $id . '.html';
            if (is_file($file)) {
                @unlink($file);
            }
        }
    }
}

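The regex on video_id is not paranoia, it's necessary. The endpoint is internal, but the input is still untrusted: the check protects against SQL injection if you ever forget to use prepared statements, and it keeps path-traversal sequences out of purgePageCache(), which interpolates each ID into a filesystem path. Validate at the boundary, every time.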

The FTS5 rows are deleted rather than updated. Re-inserting the FTS row needs the new title and description, which we don't have in the payload. The next user request rehydrates from the origin and re-indexes. This is the SQLite analog of the cache-aside pattern: invalidate on write, repopulate lazily on the next read.
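
The read side of that flow can be sketched in a few lines. This is an illustration under assumptions, not the PHP code we run: the video_cache columns beyond video_id and dirty, the read_title name, and the fetch_origin callback are all hypothetical, and the FTS re-indexing step is left out.

```python
import sqlite3
from typing import Callable


def read_title(
    db: sqlite3.Connection,
    video_id: str,
    fetch_origin: Callable[[str], str],
) -> str:
    """Serve from the local mirror unless the row is missing or marked dirty."""
    row = db.execute(
        "SELECT title, dirty FROM video_cache WHERE video_id = ?", (video_id,)
    ).fetchone()
    if row is not None and row[1] == 0:
        return row[0]  # clean hit: no origin traffic
    title = fetch_origin(video_id)  # hypothetical origin lookup
    with db:  # commits on success, rolls back on error
        db.execute(
            "INSERT OR REPLACE INTO video_cache (video_id, title, dirty) "
            "VALUES (?, ?, 0)",
            (video_id, title),
        )
    return title


# Usage against an in-memory database with one row already marked dirty.
demo = sqlite3.connect(":memory:")
demo.execute(
    "CREATE TABLE video_cache (video_id TEXT PRIMARY KEY, title TEXT, dirty INTEGER)"
)
demo.execute("INSERT INTO video_cache VALUES ('abc12345', 'Old title', 1)")
print(read_title(demo, "abc12345", lambda vid: "New title"))
```

Only the first read after an invalidation pays the origin round trip; later reads are served from the refreshed row until the next invalidation marks it dirty again.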

Operational Lessons That Cost Us a Weekend

The Stuck-Listener Problem

Week two of production, our primary started rejecting COMMIT at 04:00 UTC. The cause: a listener daemon had been OOM-killed but its connection was held by a misbehaving connection pooler that didn't close the socket. Postgres kept queuing notifications for that orphaned listener. The queue grew to 7.8 GB before we noticed. Once it hits 8 GB, every transaction that calls NOTIFY fails at commit until space frees up.

The fix was two layers:

  • A monitoring query that runs every minute: SELECT pg_notification_queue_usage(). Alert at 0.3, page at 0.6.
  • A janitor cron that acts as a statement timeout for listeners: any listening backend that has not run a statement in 5 minutes gets terminated with pg_terminate_backend(). Real listeners send a no-op SELECT 1 every 30 seconds anyway, so they never trip it.
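
The monitoring half is small enough to sketch. The thresholds come from the list above; the function names are made up for this example, and check_queue assumes it is handed a psycopg 3 connection (or anything else exposing the same execute().fetchone() shape).

```python
ALERT_AT = 0.3  # fraction of the notification queue in use
PAGE_AT = 0.6


def classify_queue_usage(usage: float) -> str:
    """Map pg_notification_queue_usage() output (0.0 to 1.0) to a severity."""
    if usage >= PAGE_AT:
        return "page"
    if usage >= ALERT_AT:
        return "alert"
    return "ok"


def check_queue(conn) -> str:
    """Run the monitoring query on an open connection and classify the result."""
    row = conn.execute("SELECT pg_notification_queue_usage()").fetchone()
    return classify_queue_usage(float(row[0]))


# The incident above: 7.8 GB queued out of an 8 GB cap.
print(classify_queue_usage(7.8 / 8))
```

With these thresholds the incident would have paged long before the queue reached 7.8 GB, since the page level corresponds to roughly 4.8 GB.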

The Logical Replication Interaction

We added a read replica using logical replication. Notifications do not cross logical replication: they are delivered only to sessions connected to the server (and database) where NOTIFY was executed. If you want your replica to send notifications you either need the trigger on the replica too, enabled with ENABLE REPLICA TRIGGER or ENABLE ALWAYS TRIGGER because ordinary triggers do not fire for replicated changes (and accept double-firing if both sides do it), or you LISTEN on the primary only. We LISTEN on the primary. The replica exists for read scale, not for fanout.

Connection Poolers

If you put PgBouncer in transaction-pooling mode in front of your listener, LISTEN will silently break. The session-level state is the whole point of LISTEN, and transaction pooling throws away session state. Either use session pooling for the listener connection or bypass the pooler entirely. We give the listener a direct connection on a dedicated pool with max_connections=4.

Multi-Region Clock Skew

The t field in our payload is server time on the Postgres primary. We initially used it to detect stale invalidations ("if t is more than 60s ago, ignore"). That broke catastrophically when one of our edge nodes had NTP drift of 90 seconds and started silently dropping every legitimate invalidation. Now we don't trust t for correctness, only for logging. The listener's coalescer timestamp is the source of truth for ordering.
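
The failure is plain arithmetic once you write the check down. The sketch below reconstructs the kind of age test described above; the function name and the sample timestamps are illustrative, and it assumes the drifting edge clock was running ahead of the primary.

```python
MAX_AGE_S = 60


def is_stale(sent_t: int, local_now: float, max_age: float = MAX_AGE_S) -> bool:
    """Age check that compares the primary's clock with the edge's clock."""
    return local_now - sent_t > max_age


true_now = 1_700_000_000  # arbitrary instant, seconds since the epoch
sent_t = true_now - 1     # notification stamped one second ago on the primary

print(is_stale(sent_t, true_now))       # False: healthy edge clock
print(is_stale(sent_t, true_now + 90))  # True: edge clock 90 s ahead
```

A one-second-old invalidation looks 91 seconds old to the drifting node, so every event exceeds the 60-second cutoff and is discarded.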

What This Replaced and What the Numbers Look Like

Before: a 30-second poll loop on each edge node, hitting a /changes?since=T endpoint that scanned an updated_at index. Origin saw ~140 requests/second of pure polling traffic across regions. Median staleness was 15 seconds, p99 was 31 seconds.

After: zero polling traffic. Origin sees an outbound NOTIFY stream that averages 6 events/second with bursts to 400/second during popularity recalcs. Edge nodes receive coalesced batches every 250ms when there's anything to send and nothing otherwise. Median staleness is 380 milliseconds, p99 is 1.1 seconds (almost entirely the 250ms coalescing window plus HTTP/2 RTT to the furthest region).

The operational cost dropped too. The listener daemon is a single Python process using 38 MB RSS. The Postgres NOTIFY queue runs at <0.001 utilization in steady state. We removed a Redis instance that we'd briefly considered for the same job.

When You Should Not Use This

LISTEN/NOTIFY is right when invalidations are bursty, payloads are small, you already operate Postgres, and missing an occasional event is recoverable. It is wrong when:

  • You need durable delivery with replay. Use Kafka or the outbox pattern.
  • You need cross-region active-active writes. Notifications don't replicate.
  • Your payloads are large (>5 KB realistically). Put a pointer in the notification and have subscribers fetch the body.
  • You have hundreds of distinct channels with hundreds of listeners each. The dispatch is O(listeners * notifications) and you will find the ceiling.
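
For the large-payload case, the pointer approach can be sketched as follows. This is an assumption-laden illustration: the 5000-byte cutoff mirrors the rough figure in the list, the dict named store stands in for whatever table or object store subscribers can read, and the function names are invented for the example.

```python
import json

MAX_INLINE_BYTES = 5000  # rough cutoff; stays well under the 8000-byte limit


def to_notification(record_id: str, body: dict, store: dict) -> str:
    """Inline small bodies; otherwise stash the body and send only a pointer."""
    inline = json.dumps({"id": record_id, "body": body}, separators=(",", ":"))
    if len(inline.encode("utf-8")) <= MAX_INLINE_BYTES:
        return inline
    store[record_id] = body  # stand-in for a row the subscriber can query
    return json.dumps({"id": record_id, "ref": True}, separators=(",", ":"))


def resolve(notification: str, store: dict) -> dict:
    """Subscriber side: follow the pointer when the body was not inlined."""
    msg = json.loads(notification)
    return store[msg["id"]] if msg.get("ref") else msg["body"]


store: dict = {}
small = to_notification("abc12345", {"status": "available"}, store)
large = to_notification("xyz98765", {"description": "x" * 20_000}, store)
print(len(small), len(large))
```

The subscriber code path is the same either way, which keeps the common small-payload case to a single hop while bounding what goes through NOTIFY.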

Conclusion

The lesson from running this in production for nine months is that Postgres LISTEN/NOTIFY is a sharper tool than its reputation suggests, but it punishes operational sloppiness with a global blast radius. The combination of a filtering trigger, a coalescing async listener, and HTTP fanout to edge nodes that own their own SQLite mirrors turned a 30-second staleness window into a sub-second one without adding a single new piece of infrastructure to our runbook. If you already have Postgres and your problem is invalidation latency rather than durable messaging, try this before you reach for Kafka. Just monitor the notification queue, give the listener a session-pooled connection, and never let NOTIFY payloads grow unbounded.
