DEV Community

ahmet gedik

Real-Time Video Metadata Invalidation With Postgres LISTEN and NOTIFY

A viewer hits a watch page on DailyWatch and sees a view count that is six hours stale, a title that was corrected last night, and a thumbnail that points at a video we already pulled for a copyright strike. Every one of those is a caching bug, and every one of them traces back to the same root cause: we had no fast, reliable way to tell the edge that a single video's metadata had changed. We cache aggressively — LiteSpeed page cache in front of PHP 8.4, a SQLite FTS5 index for discovery, and Cloudflare in front of all of it. Aggressive caching is what lets a free video discovery platform serve millions of requests on a small budget. The price is invalidation. When the canonical metadata for one video changes, that change has to ripple outward in seconds, not on the next cron tick.

The naive fixes all fail in instructive ways. Short TTLs throw away the cache hit rate we depend on. A cron job that scans for updated_at > last_run is coarse, polls a hot table constantly, and still leaves a window of staleness equal to its interval. Pushing every edit through an application-level message bus means the bus and the database can disagree the moment a transaction rolls back. What I actually wanted was for the database, the single source of truth for metadata, to be the thing that announces changes, transactionally, the instant a row is committed. That is exactly what Postgres LISTEN/NOTIFY gives you. This post walks through how we wired it up: a trigger whose notification goes out on commit, a long-lived Go listener that fans changes out, and the PHP and Python pieces that turn a notification into a surgical cache purge.

Why the source of truth should announce its own changes

Our metadata pipeline is built on Postgres, even though the read path that serves pages is SQLite FTS5. That split is deliberate. Ingestion workers in Python pull from upstream APIs, normalize, dedupe, and write canonical rows into Postgres. A separate process projects the relevant columns into the per-edge SQLite databases that LiteSpeed reads from. Postgres is where truth lives and where mutations happen, so Postgres is the natural place to emit "this changed" events.

NOTIFY is a Postgres command that sends a payload on a named channel. LISTEN subscribes a connection to that channel. The property that makes it useful for invalidation, and that an external broker cannot give you for free, is that delivery is bound to the transaction's outcome. If you NOTIFY inside a transaction, nothing is sent until the transaction commits; if it rolls back, the notification is discarded. You can never tell a downstream cache to purge a change that never actually landed. That single guarantee eliminates an entire category of phantom-invalidation bugs.

There are real limits worth stating up front, because they shape the design:

  • The payload must be shorter than 8000 bytes in the default configuration. You send identifiers and a reason code, never the full row.
  • Delivery is at-most-once to currently connected listeners. If your listener is down when the NOTIFY fires, that event is gone. You need a catch-up path.
  • Notifications are not queued durably. This is a signaling mechanism, not Kafka. Treat it as a hint that says "go look," not as the system of record.

Those constraints push you toward a specific shape: send tiny payloads, design the consumer to be idempotent, and keep a reconciliation sweep as a safety net.
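To make the "tiny payload" rule concrete, here is a minimal Python sketch of a payload builder that carries identifiers only and refuses anything that would not fit. The function name and the exact field set are illustrative assumptions that mirror the trigger shown later, not code from our pipeline.

```python
import json
from typing import Optional

# Postgres requires the payload to be shorter than 8000 bytes by default.
MAX_PAYLOAD_BYTES = 7999


def build_payload(video_id: str, op: str, category_id: Optional[int]) -> str:
    """Serialize a change event that carries identifiers, never row contents."""
    payload = json.dumps(
        {"video_id": video_id, "op": op, "category_id": category_id},
        separators=(",", ":"),  # compact form, no padding spaces
    )
    size = len(payload.encode("utf-8"))  # the limit is in bytes, not characters
    if size > MAX_PAYLOAD_BYTES:
        raise ValueError(f"payload is {size} bytes; NOTIFY would reject it")
    return payload


print(build_payload("abc123", "UPDATE", 7))
```

Measuring the encoded byte length rather than the character count matters as soon as an identifier contains non-ASCII text.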

The trigger that notifies on commit

The cleanest place to emit the event is a row-level trigger on the videos table. It fires for every insert, update, and delete, builds a compact JSON payload, and calls pg_notify. Because the trigger runs inside the same transaction as the write, the notification inherits the transaction's commit/rollback fate automatically.

CREATE OR REPLACE FUNCTION notify_video_change()
RETURNS trigger AS $$
DECLARE
  payload json;
  changed_id text;
BEGIN
  changed_id := COALESCE(NEW.video_id, OLD.video_id);

  -- For updates, only emit when a cache-relevant column actually changed.
  IF (TG_OP = 'UPDATE') THEN
    IF NEW.title IS NOT DISTINCT FROM OLD.title
       AND NEW.description IS NOT DISTINCT FROM OLD.description
       AND NEW.thumbnail_url IS NOT DISTINCT FROM OLD.thumbnail_url
       AND NEW.status IS NOT DISTINCT FROM OLD.status
       AND NEW.category_id IS NOT DISTINCT FROM OLD.category_id THEN
      RETURN NEW;  -- view-count-only update, not worth a purge
    END IF;
  END IF;

  payload := json_build_object(
    'video_id', changed_id,
    'op', TG_OP,
    'category_id', COALESCE(NEW.category_id, OLD.category_id),
    'ts', extract(epoch FROM clock_timestamp())
  );

  PERFORM pg_notify('video_metadata', payload::text);
  RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_video_metadata_change
AFTER INSERT OR UPDATE OR DELETE ON videos
FOR EACH ROW EXECUTE FUNCTION notify_video_change();

The IS NOT DISTINCT FROM guard matters more than it looks. View counts and watch timestamps churn constantly; if every increment triggered an edge purge we would be invalidating the same hot pages thousands of times an hour and defeating the cache entirely. We only signal when a column that actually appears in rendered HTML or the search index changes. View counts get reconciled lazily on a slower path because nobody files a bug report over a count that is two minutes behind.
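The reason for IS NOT DISTINCT FROM rather than a plain equality test is NULL handling: in SQL, comparing NULL with = or <> yields NULL, so a guard built on those operators would misjudge rows with missing values. The following Python sketch mirrors the same decision outside the database. The function name is hypothetical and the column list is copied from the trigger above.

```python
# Columns that reach rendered HTML or the search index (same list as the trigger).
CACHE_RELEVANT = ("title", "description", "thumbnail_url", "status", "category_id")


def needs_purge(old: dict, new: dict) -> bool:
    """Return True when any cache-relevant column differs between two row versions.

    Python's != already treats None and None as equal, which is the behaviour
    IS NOT DISTINCT FROM provides for SQL NULLs.
    """
    return any(old.get(col) != new.get(col) for col in CACHE_RELEVANT)


before = {"title": "Cats", "description": None, "view_count": 10}
after = {"title": "Cats", "description": None, "view_count": 11}
print(needs_purge(before, after))  # only view_count moved
```

A helper like this is handy in tests for the ingestion workers, where you want to assert that a given write should or should not produce an invalidation.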

One caution about firing pg_notify in AFTER triggers under heavy write load: Postgres only folds notifications together when the channel and payload are identical within one transaction, so every distinct payload is queued and delivered in order, and a flood of them adds work at commit time. We measured it at our write volume and it is negligible, but if you ingest tens of thousands of rows in a single transaction, batch the notify into one statement-level call instead of one per row.
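If you do batch, the ids still have to fit under the payload limit, so a bulk load needs to pack them into as few notifications as possible. The Python sketch below shows one way to do the packing. It assumes a different payload shape from the per-row trigger (a bare JSON array of ids), which the listener would have to be taught to accept, and the function name is hypothetical.

```python
import json
from typing import Iterable, Iterator

MAX_PAYLOAD_BYTES = 7999  # payloads must be shorter than 8000 bytes by default


def batch_payloads(video_ids: Iterable[str], limit: int = MAX_PAYLOAD_BYTES) -> Iterator[str]:
    """Pack ids into JSON-array payloads, each small enough for one NOTIFY.

    Assumes every individual id is far below the limit. Re-serializing the
    candidate batch on each step is wasteful but keeps the sketch short.
    """
    batch = []
    for vid in video_ids:
        candidate = json.dumps(batch + [vid], separators=(",", ":"))
        if batch and len(candidate.encode("utf-8")) > limit:
            # Adding this id would overflow: emit the current batch, start a new one.
            yield json.dumps(batch, separators=(",", ":"))
            batch = [vid]
        else:
            batch.append(vid)
    if batch:
        yield json.dumps(batch, separators=(",", ":"))


ids = [f"vid{i:05d}" for i in range(3000)]
payloads = list(batch_payloads(ids))
print(len(payloads), "notifications instead of", len(ids))
```

Each resulting string would be sent with a single pg_notify call, so a 3000-row load costs a handful of notifications rather than thousands.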

A long-lived Go listener that fans out

The consumer is a separate daemon. It holds one dedicated Postgres connection open, blocks on incoming notifications, and translates each into concrete invalidation actions. I wrote ours in Go because the listener needs to be a rock-solid long-running process with good connection handling and cheap concurrency for the fan-out. pgx exposes the LISTEN/NOTIFY primitives directly through WaitForNotification.

package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/jackc/pgx/v5"
)

type VideoChange struct {
    VideoID    string  `json:"video_id"`
    Op         string  `json:"op"`
    CategoryID *int    `json:"category_id"`
    TS         float64 `json:"ts"`
}

func main() {
    ctx := context.Background()

    for {
        // listen blocks until the connection fails, then returns the error.
        if err := listen(ctx); err != nil {
            log.Printf("listener dropped: %v; reconnecting in 2s", err)
            time.Sleep(2 * time.Second)
        }
    }
}

func listen(ctx context.Context) error {
    conn, err := pgx.Connect(ctx, "postgres://ingest@localhost/dailywatch")
    if err != nil {
        return err
    }
    defer conn.Close(ctx)

    if _, err := conn.Exec(ctx, "LISTEN video_metadata"); err != nil {
        return err
    }
    log.Println("listening on channel video_metadata")

    // The subscription is live, so new events are covered from here on.
    // Only now sweep the window we may have missed while disconnected;
    // sweeping before LISTEN succeeds would leave a gap between the two.
    // reconcileSince is defined elsewhere and re-runs the idempotent purge path.
    go reconcileSince(ctx, time.Now().Add(-5*time.Minute))

    for {
        n, err := conn.WaitForNotification(ctx)
        if err != nil {
            return err // surface to the reconnect loop
        }

        var c VideoChange
        if err := json.Unmarshal([]byte(n.Payload), &c); err != nil {
            log.Printf("bad payload %q: %v", n.Payload, err)
            continue
        }

        // Fan out concurrently; each action is idempotent.
        go handleChange(ctx, c)
    }
}

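// handleChange applies one change to every cache layer. The three helpers it
// calls (projectToSQLite, purgeCloudflare, purgeLocalPageCache) live elsewhere.
func handleChange(ctx context.Context, c VideoChange) {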
    projectToSQLite(c.VideoID, c.Op)        // refresh the read-path FTS5 row
    purgeCloudflare("/watch/" + c.VideoID)  // edge purge for the watch page
    if c.CategoryID != nil {
        purgeLocalPageCache(c.VideoID, *c.CategoryID)
    }
    log.Printf("invalidated %s (%s)", c.VideoID, c.Op)
}

The reconnect loop is the whole point of using a dedicated daemon. When the connection drops, WaitForNotification returns an error; we log it, sleep, reconnect, and, critically, kick off a reconciliation sweep for the window we might have missed. Because NOTIFY is at-most-once to live listeners, every reconnect is a potential gap, and the catch-up sweep is what makes the system eventually consistent rather than silently lossy.
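The control flow of that loop is easier to reason about with the database stripped out. The Python sketch below is a variation rather than a port of the Go daemon: it adds exponential backoff and sizes the sweep from the last moment the subscription was known to be live instead of a fixed five minutes. Everything here is an assumption for illustration, including the names run_listener, listen_once, and on_ready and the margin and max_sessions parameters.

```python
import time
from typing import Callable, Optional


def run_listener(
    listen_once: Callable[[Callable[[], None]], None],
    reconcile_since: Callable[[float], None],
    clock: Callable[[], float] = time.time,
    sleep: Callable[[float], None] = time.sleep,
    margin: float = 30.0,
    max_backoff: float = 60.0,
    max_sessions: Optional[int] = None,
) -> None:
    """Keep a listener alive, sweeping the missed window after each subscribe.

    listen_once(on_ready) is expected to connect, issue LISTEN, call on_ready(),
    then block until the connection fails and raise an exception.
    """
    last_healthy = clock()  # last moment the subscription was known to be live
    backoff = 1.0
    sessions = 0

    while max_sessions is None or sessions < max_sessions:
        sessions += 1
        became_ready = False

        def on_ready() -> None:
            nonlocal became_ready, backoff
            became_ready = True
            backoff = 1.0  # a successful subscribe resets the backoff
            # LISTEN is active, so new events are covered; sweep the gap behind us.
            reconcile_since(last_healthy - margin)

        try:
            listen_once(on_ready)
        except Exception as exc:
            print(f"listener dropped: {exc}; retrying in {backoff:.0f}s")

        if became_ready:
            # Only advance the marker if this session was actually subscribed.
            last_healthy = clock()
        sleep(backoff)
        backoff = min(backoff * 2, max_backoff)
```

The margin exists because a dead connection is usually noticed some time after it actually died, so the sweep starts slightly earlier than the last known-good timestamp. A failed connection attempt never advances that timestamp, which means a long outage produces one correspondingly long sweep once the subscription comes back.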

Turning a notification into a surgical purge

The handleChange fan-out does three things, and each deserves a note.

  • Project to SQLite FTS5. The read path queries SQLite, so the listener pushes the changed columns into the per-edge database and runs an INSERT OR REPLACE plus an FTS5 index update. This is the step that makes search results and listing pages reflect the edit.
  • Purge the edge. A single targeted Cloudflare cache purge for the exact watch URL, not a zone-wide flush. Zone purges are a blunt instrument that tank your hit rate for minutes afterward.
  • Purge the local page cache. LiteSpeed and our PHP file cache hold rendered HTML for the watch page and any category listing the video appears on, so we delete those specific files.
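The first of those steps, projecting a changed row into the SQLite read path, might look like the following Python sketch. The table names, the reduced column set, and the function names are assumptions for illustration, and it relies on the SQLite build having FTS5 enabled. The FTS5 table is refreshed by deleting and re-inserting the row, which keeps the operation safe to repeat.

```python
import sqlite3
from typing import Optional


def open_read_db(path: str = ":memory:") -> sqlite3.Connection:
    """Open the read-path database and create the (hypothetical) schema."""
    db = sqlite3.connect(path)
    db.executescript(
        """
        CREATE TABLE IF NOT EXISTS videos (
            video_id    TEXT PRIMARY KEY,
            title       TEXT NOT NULL,
            description TEXT NOT NULL DEFAULT ''
        );
        CREATE VIRTUAL TABLE IF NOT EXISTS videos_fts
            USING fts5(video_id UNINDEXED, title, description);
        """
    )
    return db


def project_video(db: sqlite3.Connection, video_id: str, row: Optional[dict]) -> None:
    """Upsert one video into the read path; row=None means it was deleted upstream."""
    with db:  # single transaction, so the table and the index change together
        db.execute("DELETE FROM videos_fts WHERE video_id = ?", (video_id,))
        if row is None:
            db.execute("DELETE FROM videos WHERE video_id = ?", (video_id,))
            return
        db.execute(
            "INSERT OR REPLACE INTO videos (video_id, title, description) VALUES (?, ?, ?)",
            (video_id, row["title"], row["description"]),
        )
        db.execute(
            "INSERT INTO videos_fts (video_id, title, description) VALUES (?, ?, ?)",
            (video_id, row["title"], row["description"]),
        )


db = open_read_db()
project_video(db, "v1", {"title": "Orignal titel", "description": "cats"})
project_video(db, "v1", {"title": "Original title", "description": "cats"})  # the correction
```

Running the same projection twice leaves exactly one row in each table, which is the property the reconciliation sweep depends on.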

The local PHP side is where the LiteSpeed and file-cache invalidation actually happens. This runs as a tiny endpoint the Go listener calls, or you can have PHP itself hold the listener connection on smaller deployments. Here is the file-cache purge in PHP 8.4:

<?php
declare(strict_types=1);

final class CacheInvalidator
{
    public function __construct(
        private readonly string $pageCacheDir = '/var/cache/dailywatch/pages',
    ) {}

    public function purgeVideo(string $videoId, int $categoryId): void
    {
        // Watch page, the category listing the video appears on, and the home page.
        $keys = [
            "watch:{$videoId}",
            "category:{$categoryId}",
            'home:index',
        ];

        foreach ($keys as $key) {
            $path = $this->pageCacheDir . '/' . hash('xxh128', $key) . '.html';
            if (is_file($path)) {
                @unlink($path);
            }
        }

        // Tell LiteSpeed to drop its public cache entry for this tag.
        $this->purgeLiteSpeed("watch_{$videoId}");
    }

    private function purgeLiteSpeed(string $tag): void
    {
        // LiteSpeed honours this response header for tag-based purges.
        if (!headers_sent()) {
            header('X-LiteSpeed-Purge: tag=' . $tag, false);
        }
    }
}

// Invoked by the listener with a small JSON POST.
$body = json_decode(file_get_contents('php://input') ?: '{}', true);
if (is_array($body) && isset($body['video_id'], $body['category_id'])) {
    (new CacheInvalidator())->purgeVideo(
        (string) $body['video_id'],
        (int) $body['category_id'],
    );
    http_response_code(204);
} else {
    // Reject malformed requests explicitly so the caller can log the failure.
    http_response_code(400);
}

The X-LiteSpeed-Purge header with a tag is the clean way to evict LiteSpeed's public cache for a specific entity, assuming you tagged the page when you cached it. Combined with deleting the PHP file-cache artifacts and the targeted Cloudflare purge, an edit propagates through all three cache layers within a second or two of the database commit.
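For the Cloudflare layer, a targeted purge is a single authenticated POST to the zone's purge_cache endpoint with the exact URLs in a files array. The Python sketch below only builds the request so it can be inspected without network access. The zone id, token, and hostname are placeholders, and build_purge_request is a hypothetical helper, not the Go listener's purgeCloudflare.

```python
import json
import urllib.request
from typing import List

CF_API = "https://api.cloudflare.com/client/v4"


def build_purge_request(zone_id: str, api_token: str, urls: List[str]) -> urllib.request.Request:
    """Build a purge-by-URL request; Cloudflare expects full URLs with scheme and host."""
    body = json.dumps({"files": urls}).encode("utf-8")
    return urllib.request.Request(
        f"{CF_API}/zones/{zone_id}/purge_cache",
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        },
    )


# Placeholder credentials and hostname, for illustration only.
req = build_purge_request("ZONE_ID", "API_TOKEN", ["https://example.com/watch/abc123"])
print(req.get_method(), req.full_url)
# Sending it would be: urllib.request.urlopen(req, timeout=5)
```

Keeping request construction separate from sending makes the purge easy to unit test and easy to retry, which matters because the same URL may be purged by both the live path and the sweep.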

The ingestion side and the reconciliation safety net

The writers stay blissfully ignorant of all this. Python ingestion workers just write to Postgres inside a transaction; the trigger handles notification. That separation is the elegance of doing it in the database: no application code has to remember to publish an event, and as long as the trigger stays enabled, no INSERT, UPDATE, or DELETE on videos that touches a cache-relevant column can commit without the notification going out.

import psycopg

def upsert_video(conn: psycopg.Connection, video: dict) -> None:
    """Writers do nothing special; the AFTER trigger emits the NOTIFY."""
    with conn.transaction():
        conn.execute(
            """
            INSERT INTO videos (video_id, title, description,
                                thumbnail_url, category_id, status, updated_at)
            VALUES (%(video_id)s, %(title)s, %(description)s,
                    %(thumbnail_url)s, %(category_id)s, %(status)s, now())
            ON CONFLICT (video_id) DO UPDATE SET
                title         = EXCLUDED.title,
                description   = EXCLUDED.description,
                thumbnail_url = EXCLUDED.thumbnail_url,
                category_id   = EXCLUDED.category_id,
                status        = EXCLUDED.status,
                updated_at    = now()
            """,
            video,
        )
    # The AFTER trigger queues pg_notify('video_metadata', ...) during the write;
    # Postgres delivers it to listeners only once this transaction has committed.

The reconciliation sweep is the other half of correctness. Because notifications can be missed during a listener restart or a network blip, a periodic job re-projects everything touched recently. It is cheap, it is idempotent, and it closes the at-most-once gap.

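import datetime

import psycopg

# project_to_sqlite and purge_caches are the same idempotent helpers the live
# path uses; they are defined elsewhere.


def reconcile_since(conn: psycopg.Connection, since: datetime.datetime) -> int:
    """Re-project and re-purge every video updated at or after `since`."""
    rows = conn.execute(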
        """
        SELECT video_id, category_id
        FROM videos
        WHERE updated_at >= %s
        ORDER BY updated_at
        """,
        (since,),
    ).fetchall()

    for video_id, category_id in rows:
        project_to_sqlite(video_id)
        purge_caches(video_id, category_id)  # same idempotent purge path
    return len(rows)

We run reconcile_since with a five-minute overlap window so consecutive sweeps always cover any gap, and the Go listener calls the same logic immediately after every reconnect. The result is a system that is fast in the common case, with sub-second propagation driven by NOTIFY, and self-healing in the failure case, because the sweep picks up any insert or update the live channel dropped. The one thing it cannot see is a hard DELETE, since the row is no longer there to select, so removals that must reach the caches reliably are better recorded as a status change.
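One way to read the overlap rule is that each sweep starts its query five minutes before the previous sweep began, so neighbouring windows always share a slice of time. The Python sketch below computes that cutoff. The ten-minute schedule and the sweep_cutoff name are assumptions for illustration; only the five-minute overlap comes from our setup.

```python
import datetime as dt

OVERLAP = dt.timedelta(minutes=5)


def sweep_cutoff(previous_run: dt.datetime, overlap: dt.timedelta = OVERLAP) -> dt.datetime:
    """Return the `since` value for the next sweep: previous start minus the overlap."""
    return previous_run - overlap


# Hypothetical schedule: a sweep starts every 10 minutes.
runs = [dt.datetime(2025, 1, 1, 12, 0) + dt.timedelta(minutes=10 * i) for i in range(4)]
for prev, cur in zip(runs, runs[1:]):
    print(f"sweep at {cur:%H:%M} covers updated_at >= {sweep_cutoff(prev):%H:%M}")
```

Rows that fall inside the overlap get processed twice, which is harmless precisely because projection and purging are idempotent.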

A few operational lessons from running this in production:

  • Make every consumer action idempotent. Re-projecting a row and re-purging a URL must be safe to do twice, because the live path and the reconciliation path will sometimes both handle the same change.
  • Keep payloads to identifiers. Sending the full row tempts you past the 8000-byte limit and makes the listener trust stale data. Send the id, fetch fresh on the consumer side if you need detail.
  • Don't NOTIFY on view-count churn. Separate the columns that affect rendered output from the columns that are pure analytics, and only signal on the former.
  • One dedicated connection per listener. Don't run LISTEN on a pooled connection that gets handed back; you will miss notifications and not know it.

Conclusion

The shape that worked for us is simple to state: let Postgres announce its own changes through a commit-bound trigger, consume those announcements in one resilient long-lived listener, fan each event out into surgical purges across SQLite FTS5, the PHP file cache, LiteSpeed, and Cloudflare, and back the whole thing with an idempotent reconciliation sweep that catches anything the at-most-once channel drops. LISTEN/NOTIFY is not a durable message queue and you will hurt yourself if you treat it like one. Treated as what it actually is — a transactional, low-latency signal that says "this id changed, go look" — it lets a free discovery platform keep an aggressive multi-layer cache and still reflect a corrected title or a pulled video within seconds. That combination, high hit rates plus fast invalidation, is exactly the thing that used to feel mutually exclusive, and it is the difference between a viewer seeing six-hour-stale metadata and seeing the truth.
