DEV Community

ahmet gedik
ahmet gedik

Posted on

Creating an Automated Content Pipeline for Video Sites

A video platform needs fresh content constantly. Here's how the automated content pipeline at TrendVidStream works: it fetches, processes, and serves trending videos from 8 global regions.

Pipeline Overview

┌───────────┐    ┌──────────────┐    ┌────────────┐    ┌──────────┐    ┌───────────┐
│  Fetch    │───▶│  Normalize   │───▶│  Store     │───▶│  Index   │───▶│  Serve    │
│  (Cron)   │    │  (Process)   │    │  (SQLite)  │    │  (FTS5)  │    │  (Cache)  │
└───────────┘    └──────────────┘    └────────────┘    └──────────┘    └───────────┘
Enter fullscreen mode Exit fullscreen mode

Step 1: Fetch (Cron-Triggered)

Each server runs a cron job at its configured interval:

<?php
// fetch_videos.php
//
// Entry point run by cron: loads the bootstrap file, wires up one
// ContentPipeline instance and executes a single pipeline run.
// NOTE(review): DB_PATH, API_KEY, CACHE_PATH and FETCH_REGIONS are presumably
// defined by bootstrap.php — confirm.

require __DIR__ . '/bootstrap.php';

// The wiring lives in an immediately-invoked closure so its temporaries do not
// become script-level globals. Dependencies are created in the same order as
// the constructor arguments are listed.
(static function (): void {
    $database = Database::connect(DB_PATH);
    $youtubeClient = new YouTubeClient(API_KEY);
    $cacheStore = new Cache(CACHE_PATH);

    $contentPipeline = new ContentPipeline(
        db: $database,
        youtube: $youtubeClient,
        cache: $cacheStore,
        regions: FETCH_REGIONS
    );

    $contentPipeline->run();
})();
Enter fullscreen mode Exit fullscreen mode
<?php

/**
 * Runs one complete content refresh: popular videos, category mappings,
 * per-region trending videos, stale refresh, cleanup, search-index rebuild
 * and cache invalidation, in that order.
 *
 * NOTE(review): fetchPopular(), updateCategories(), refreshStale(), cleanup(),
 * rebuildIndex(), invalidateCaches(), normalize() and store() are called below
 * but are not defined in this excerpt — they must exist in the full class.
 */
class ContentPipeline
{
    /**
     * The parameter names match the named arguments used by the cron entry
     * script (db:, youtube:, cache:, regions:).
     *
     * @param PDO           $db      Connection used for the per-region write transaction.
     * @param YouTubeClient $youtube Client whose getTrending() is called once per region.
     * @param Cache         $cache   Cache handle; not touched by the methods shown here
     *                               (presumably used by invalidateCaches() — confirm).
     * @param array         $regions Region identifiers; each one is passed to fetchRegion().
     */
    public function __construct(
        private PDO $db,
        private YouTubeClient $youtube,
        private Cache $cache,
        private array $regions,
    ) {
    }

    /**
     * Executes every pipeline step once and prints start/end log lines,
     * including the elapsed time and the collected counters as JSON.
     */
    public function run(): void
    {
        $startTime = microtime(true);
        $stats = ['fetched' => 0, 'updated' => 0, 'errors' => 0];

        echo "[" . date('Y-m-d H:i:s') . "] Pipeline started\n";

        // Step 1: Fetch popular videos (global)
        $this->fetchPopular($stats);

        // Step 2: Update category mappings
        $this->updateCategories();

        // Step 3: Fetch trending per region
        foreach ($this->regions as $region) {
            $this->fetchRegion($region, $stats);
            usleep(500000); // Rate limit: pause 0.5 s between regions
        }

        // Step 4: Refresh stale entries
        $this->refreshStale($stats);

        // Step 5: Cleanup old data
        $this->cleanup();

        // Step 6: Rebuild search index
        $this->rebuildIndex();

        // Step 7: Invalidate caches
        $this->invalidateCaches();

        $elapsed = round(microtime(true) - $startTime, 2);
        echo "Pipeline complete in {$elapsed}s: " . json_encode($stats) . "\n";
    }

    /**
     * Fetches the trending list for one region and stores every item inside a
     * single transaction.
     *
     * Any \Exception is caught here: the transaction is rolled back if one is
     * open, the 'errors' counter is incremented and the message is printed, so
     * the caller's loop continues with the next region.
     *
     * @param string $region Region identifier passed to the YouTube client.
     * @param array  $stats  Counter array, updated in place ('fetched', 'errors').
     */
    private function fetchRegion(string $region, array &$stats): void
    {
        try {
            $response = $this->youtube->getTrending($region);
            $items = $response['items'] ?? [];
            $stored = 0;

            $this->db->beginTransaction();
            foreach ($items as $item) {
                $video = $this->normalize($item, $region);
                $this->store($video);
                $stored++;
            }
            $this->db->commit();

            // Count only after the commit succeeded: if anything above throws,
            // the whole batch is rolled back and none of its rows were fetched.
            $stats['fetched'] += $stored;

            echo "  [{$region}] Fetched " . count($items) . " videos\n";
        } catch (\Exception $e) {
            if ($this->db->inTransaction()) {
                $this->db->rollBack();
            }
            $stats['errors']++;
            echo "  [{$region}] Error: {$e->getMessage()}\n";
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Step 2: Normalize

<?php

/**
 * Converts one raw video item from the API response into the flat row shape
 * that is written to the database.
 */
class VideoNormalizer
{
    /**
     * Builds the flat video row for a single API item.
     *
     * The key order of the returned array is id, region, title, description,
     * thumbnail, channel_id, channel_title, category_id, view_count, duration,
     * published_at, fetched_at.
     *
     * @param array  $apiItem Raw item; 'id' and 'snippet' are read directly,
     *                        'statistics' and 'contentDetails' are optional.
     * @param string $region  Region the item was fetched for; copied into the row.
     *
     * @return array<string, int|string> Row with the keys listed above.
     */
    public function normalize(array $apiItem, string $region): array
    {
        $snippet = $apiItem['snippet'];
        $stats = $apiItem['statistics'] ?? [];
        $contentDetails = $apiItem['contentDetails'] ?? [];

        return [
            'id' => $apiItem['id'],
            'region' => $region,
            'title' => $snippet['title'],
            // Descriptions are truncated to 500 characters (multibyte-safe).
            'description' => mb_substr($snippet['description'] ?? '', 0, 500),
            // A missing thumbnails map must not reach the array-typed helper as null.
            'thumbnail' => $this->getBestThumbnail($snippet['thumbnails'] ?? []),
            'channel_id' => $snippet['channelId'],
            'channel_title' => $snippet['channelTitle'],
            'category_id' => (int)($snippet['categoryId'] ?? 0),
            'view_count' => (int)($stats['viewCount'] ?? 0),
            'duration' => $this->parseDuration($contentDetails['duration'] ?? ''),
            'published_at' => $snippet['publishedAt'],
            'fetched_at' => date('Y-m-d H:i:s'),
        ];
    }

    /**
     * Returns the URL of the first available thumbnail, trying the sizes in
     * the order maxres, standard, high, medium, default.
     *
     * @param array $thumbs Map of size name to an array holding a 'url' entry.
     *
     * @return string The chosen URL, or '' when no listed size carries a URL.
     */
    private function getBestThumbnail(array $thumbs): string
    {
        foreach (['maxres', 'standard', 'high', 'medium', 'default'] as $size) {
            // Checking the nested key skips a size entry that has no URL
            // instead of returning null from a string-typed method.
            if (isset($thumbs[$size]['url'])) {
                return (string) $thumbs[$size]['url'];
            }
        }
        return '';
    }

    /**
     * Converts an ISO 8601 duration into seconds, e.g. PT4M13S -> 253 and
     * P1DT2H3M4S -> 93784.
     *
     * @param string $iso8601 Duration made of optional day, hour, minute and
     *                        second components.
     *
     * @return int Total seconds; 0 for an empty or unrecognised string.
     */
    private function parseDuration(string $iso8601): int
    {
        $pattern = '/^P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?$/';
        if (preg_match($pattern, $iso8601, $m) !== 1) {
            return 0;
        }

        // preg_match reports a skipped group that precedes a matched one as ''
        // and omits trailing skipped groups entirely. The (int) cast turns both
        // cases into 0; multiplying '' directly is a TypeError on PHP 8.
        $days = (int) ($m[1] ?? 0);
        $hours = (int) ($m[2] ?? 0);
        $minutes = (int) ($m[3] ?? 0);
        $seconds = (int) ($m[4] ?? 0);

        return ($days * 86400) + ($hours * 3600) + ($minutes * 60) + $seconds;
    }
}
Enter fullscreen mode Exit fullscreen mode

Step 3: Store and Deduplicate

<?php

/**
 * Writes normalized video rows into the `videos` table, inserting new rows
 * and updating existing ones that share the same (id, region) pair.
 */
class VideoStore
{
    /**
     * Columns of the INSERT statement, in placeholder order. These are also
     * the keys an associative $video must provide.
     */
    private const COLUMNS = [
        'id', 'region', 'title', 'description', 'thumbnail',
        'channel_id', 'channel_title', 'category_id', 'view_count',
        'duration', 'published_at', 'fetched_at',
    ];

    /**
     * Inserts the video, or on an (id, region) conflict updates title,
     * thumbnail, view_count and fetched_at of the existing row.
     *
     * @param PDO   $db    Connection to the database holding the `videos` table.
     * @param array $video Row keyed by column name (any key order, extra keys
     *                     ignored), or a positional list already in column order.
     *
     * @throws \InvalidArgumentException When a keyed row lacks a required column.
     */
    public function upsert(PDO $db, array $video): void
    {
        $stmt = $db->prepare('
            INSERT INTO videos (id, region, title, description, thumbnail,
                channel_id, channel_title, category_id, view_count,
                duration, published_at, fetched_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id, region) DO UPDATE SET
                title = excluded.title,
                thumbnail = excluded.thumbnail,
                view_count = excluded.view_count,
                fetched_at = excluded.fetched_at
        ');
        $stmt->execute($this->orderedValues($video));
    }

    /**
     * Returns the values of $video arranged in placeholder order.
     *
     * Binding by column name, rather than by whatever order the caller built
     * the array in, keeps a reordered or extended row from landing in the
     * wrong columns.
     *
     * @param array $video Keyed row or positional list (see upsert()).
     *
     * @return array Positional values matching the INSERT placeholders.
     */
    private function orderedValues(array $video): array
    {
        // A positional list is passed through unchanged, as before.
        if (array_key_exists(0, $video)) {
            return array_values($video);
        }

        $values = [];
        foreach (self::COLUMNS as $column) {
            if (!array_key_exists($column, $video)) {
                throw new \InvalidArgumentException("Missing video field: {$column}");
            }
            $values[] = $video[$column];
        }

        return $values;
    }
}
Enter fullscreen mode Exit fullscreen mode

Step 4: Index

<?php

/**
 * Maintains the `videos_fts` full-text search table.
 */
class SearchIndexer
{
    /**
     * Sends the special 'rebuild' command to the videos_fts table.
     *
     * @param PDO $db Connection to the database that holds videos_fts.
     */
    public function rebuild(PDO $db): void
    {
        // An INSERT into the column named after the table itself is how a
        // command, rather than a row, is addressed to the table.
        // NOTE(review): assumes videos_fts is an FTS5 virtual table — confirm.
        $rebuildCommand = "INSERT INTO videos_fts(videos_fts) VALUES('rebuild')";

        $db->exec($rebuildCommand);
    }
}
Enter fullscreen mode Exit fullscreen mode

Step 5: Cache Invalidation

<?php

/**
 * Clears cached pages after new content has been fetched so that the next
 * request is rendered from fresh data.
 */
class CacheInvalidator
{
    /**
     * Deletes the home page entry and all category entries from the
     * application cache, then removes the LiteSpeed page-cache directory if
     * it exists.
     *
     * @param Cache $cache Application cache to purge.
     */
    public function afterFetch(Cache $cache): void
    {
        // Clear page caches so new content is visible
        $cache->delete('home:index');
        $cache->deletePattern('category:*');

        // Clear LiteSpeed page cache
        $lscachePath = __DIR__ . '/../lscache';
        if (is_dir($lscachePath)) {
            $this->removeDirectory($lscachePath);
        }
    }

    /**
     * Recursively deletes a directory together with everything inside it.
     *
     * Deletion is best-effort: a failing unlink()/rmdir() emits a PHP warning
     * and the walk continues with the remaining entries.
     *
     * @param string $path Directory to delete.
     */
    private function removeDirectory(string $path): void
    {
        // CHILD_FIRST yields a directory's contents before the directory
        // itself, so every rmdir() below runs on an already-emptied directory.
        $entries = new \RecursiveIteratorIterator(
            new \RecursiveDirectoryIterator($path, \FilesystemIterator::SKIP_DOTS),
            \RecursiveIteratorIterator::CHILD_FIRST
        );

        foreach ($entries as $entry) {
            // A symlink that points at a directory must be unlinked, not rmdir'ed.
            if ($entry->isDir() && !$entry->isLink()) {
                rmdir($entry->getPathname());
            } else {
                unlink($entry->getPathname());
            }
        }

        rmdir($path);
    }
}
Enter fullscreen mode Exit fullscreen mode

Cron Configuration

# Server 3 (TrendVidStream): every 4 hours
# Schedule fields: minute 28 of every 4th hour (00:28, 04:28, 08:28, ...), every day.
# ">> ... 2>&1" appends both stdout and stderr of the script to /var/log/fetch.log.
28 */4 * * * php /path/to/fetch_videos.php >> /var/log/fetch.log 2>&1
Enter fullscreen mode Exit fullscreen mode

This pipeline keeps TrendVidStream continuously updated with fresh trending content from all 8 regions. The entire fetch-normalize-store-index-serve cycle completes in under 2 minutes.

The key design principles are batched database operations, graceful error handling per region (one region's failure does not affect the others), and automatic cache invalidation.

Top comments (0)