A video platform needs fresh content constantly. Here's how the automated content pipeline at TrendVidStream fetches, processes, and serves trending videos from 8 global regions.
Pipeline Overview
┌───────────┐   ┌────────────┐   ┌──────────┐   ┌─────────┐   ┌───────────┐
│   Fetch   │──▶│ Normalize  │──▶│  Store   │──▶│  Index  │──▶│   Serve   │
│  (Cron)   │   │ (Process)  │   │ (SQLite) │   │ (FTS5)  │   │  (Cache)  │
└───────────┘   └────────────┘   └──────────┘   └─────────┘   └───────────┘
Step 1: Fetch (Cron-Triggered)
Each server runs a cron job at its configured interval:
<?php
// fetch_videos.php
// Cron entry point: loads the bootstrap file, wires up the pipeline's
// collaborators, and runs one full pipeline pass.
// NOTE(review): DB_PATH, API_KEY, CACHE_PATH and FETCH_REGIONS are presumably
// defined by bootstrap.php — confirm.
require __DIR__ . '/bootstrap.php';

// Build each dependency up front, in the same order the pipeline receives them.
$database = Database::connect(DB_PATH);
$youtubeClient = new YouTubeClient(API_KEY);
$cacheStore = new Cache(CACHE_PATH);

$pipeline = new ContentPipeline(
    db: $database,
    youtube: $youtubeClient,
    cache: $cacheStore,
    regions: FETCH_REGIONS
);

$pipeline->run();
<?php
/**
 * Orchestrates one full content refresh: fetch, store, clean up, re-index,
 * and cache invalidation.
 *
 * NOTE(review): fetchPopular(), updateCategories(), refreshStale(), cleanup(),
 * rebuildIndex(), invalidateCaches(), normalize() and store() are called below
 * but are not defined in this excerpt.
 */
class ContentPipeline
{
    /**
     * Parameter names match the named arguments used by fetch_videos.php
     * (db, youtube, cache, regions).
     *
     * @param PDO           $db      Connection used for the per-region transactions.
     * @param YouTubeClient $youtube Client whose getTrending() is called once per region.
     * @param Cache         $cache   Cache handle held for the pipeline's cache step.
     * @param array         $regions Region identifiers, each passed to getTrending().
     */
    public function __construct(
        private PDO $db,
        private YouTubeClient $youtube,
        private Cache $cache,
        private array $regions,
    ) {
    }

    /**
     * Runs every pipeline step in order and prints a timestamped start line
     * plus a summary line (elapsed seconds and JSON-encoded counters).
     */
    public function run(): void
    {
        $startTime = microtime(true);
        $stats = ['fetched' => 0, 'updated' => 0, 'errors' => 0];

        echo "[" . date('Y-m-d H:i:s') . "] Pipeline started\n";

        // Step 1: Fetch popular videos (global)
        $this->fetchPopular($stats);

        // Step 2: Update category mappings
        $this->updateCategories();

        // Step 3: Fetch trending per region
        foreach ($this->regions as $region) {
            $this->fetchRegion($region, $stats);
            usleep(500000); // Rate limit: pause 0.5 s between regions
        }

        // Step 4: Refresh stale entries
        $this->refreshStale($stats);

        // Step 5: Cleanup old data
        $this->cleanup();

        // Step 6: Rebuild search index
        $this->rebuildIndex();

        // Step 7: Invalidate caches
        $this->invalidateCaches();

        $elapsed = round(microtime(true) - $startTime, 2);
        echo "Pipeline complete in {$elapsed}s: " . json_encode($stats) . "\n";
    }

    /**
     * Fetches the trending list for one region and stores it in a single
     * transaction.
     *
     * Any \Exception is caught here: an open transaction is rolled back, the
     * error counter is incremented, and the message is printed, so the caller's
     * loop continues with the next region.
     *
     * @param string $region Region identifier passed to the YouTube client.
     * @param array  $stats  Running counters; 'fetched' and 'errors' are updated in place.
     */
    private function fetchRegion(string $region, array &$stats): void
    {
        try {
            $response = $this->youtube->getTrending($region);
            $items = $response['items'] ?? [];

            $this->db->beginTransaction();
            foreach ($items as $item) {
                $video = $this->normalize($item, $region);
                $this->store($video);
            }
            $this->db->commit();

            // Count only after a successful commit; rows discarded by a
            // rollback must not be reported as fetched.
            $stats['fetched'] += count($items);

            echo " [{$region}] Fetched " . count($items) . " videos\n";
        } catch (\Exception $e) {
            if ($this->db->inTransaction()) {
                $this->db->rollBack();
            }
            $stats['errors']++;
            echo " [{$region}] Error: {$e->getMessage()}\n";
        }
    }
}
Step 2: Normalize
<?php
/**
 * Converts a raw YouTube API video item into the flat record stored in the
 * videos table.
 */
class VideoNormalizer
{
    /**
     * Builds the flat video record for one API item.
     *
     * The key order of the returned array mirrors the column order of the
     * videos INSERT statement; keep the two in sync.
     *
     * @param array  $apiItem One API item; 'id' and 'snippet' (with title, channelId,
     *                        channelTitle, publishedAt) are read without defaults.
     *                        'statistics' and 'contentDetails' are optional.
     * @param string $region  Region the item was fetched for.
     *
     * @return array Record with description truncated to 500 characters,
     *               duration in seconds, and fetched_at set to the current local time.
     */
    public function normalize(array $apiItem, string $region): array
    {
        $snippet = $apiItem['snippet'];
        $stats = $apiItem['statistics'] ?? [];
        $contentDetails = $apiItem['contentDetails'] ?? [];

        return [
            'id' => $apiItem['id'],
            'region' => $region,
            'title' => $snippet['title'],
            'description' => mb_substr($snippet['description'] ?? '', 0, 500),
            // Default to [] so an item without thumbnails yields '' instead of
            // a TypeError from passing null to an array parameter.
            'thumbnail' => $this->getBestThumbnail($snippet['thumbnails'] ?? []),
            'channel_id' => $snippet['channelId'],
            'channel_title' => $snippet['channelTitle'],
            'category_id' => (int) ($snippet['categoryId'] ?? 0),
            'view_count' => (int) ($stats['viewCount'] ?? 0),
            'duration' => $this->parseDuration($contentDetails['duration'] ?? ''),
            'published_at' => $snippet['publishedAt'],
            'fetched_at' => date('Y-m-d H:i:s'),
        ];
    }

    /**
     * Returns the URL of the largest available thumbnail, checking sizes from
     * 'maxres' down to 'default'.
     *
     * @param array $thumbs Map of size name to an array containing 'url'.
     *
     * @return string The chosen URL, or '' when no listed size has a URL.
     */
    private function getBestThumbnail(array $thumbs): string
    {
        foreach (['maxres', 'standard', 'high', 'medium', 'default'] as $size) {
            // Check the nested 'url' key too, so a malformed entry is skipped.
            if (isset($thumbs[$size]['url'])) {
                return $thumbs[$size]['url'];
            }
        }

        return '';
    }

    /**
     * Converts an ISO 8601 duration into seconds, e.g. PT4M13S -> 253 and
     * P1DT2H -> 93600.
     *
     * @param string $iso8601 Duration using days, hours, minutes and seconds.
     *
     * @return int Total seconds, or 0 when the string does not match.
     */
    private function parseDuration(string $iso8601): int
    {
        // Anchored so partial matches inside other text are rejected; the
        // optional day group covers durations of 24 hours or more.
        $pattern = '/^P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?$/';
        if (preg_match($pattern, $iso8601, $m) !== 1) {
            return 0;
        }

        // A skipped group that precedes a matched one is returned as '', so
        // cast every part to int before doing arithmetic on it.
        return ((int) ($m[1] ?? 0)) * 86400
            + ((int) ($m[2] ?? 0)) * 3600
            + ((int) ($m[3] ?? 0)) * 60
            + (int) ($m[4] ?? 0);
    }
}
Step 3: Store and Deduplicate
<?php
/**
 * Persists normalized video records, keyed by (id, region).
 */
class VideoStore
{
    /** Column order of the INSERT below; parameters are bound in this order. */
    private const COLUMNS = [
        'id', 'region', 'title', 'description', 'thumbnail',
        'channel_id', 'channel_title', 'category_id', 'view_count',
        'duration', 'published_at', 'fetched_at',
    ];

    /**
     * Inserts a video, or on an (id, region) conflict updates its title,
     * thumbnail, view_count and fetched_at.
     *
     * Values are looked up by column name, so the key order of $video does not
     * matter and extra keys are ignored.
     *
     * @param PDO   $db    Connection to the database holding the videos table.
     * @param array $video Record keyed by column name.
     *
     * @throws \InvalidArgumentException When $video lacks one of the columns.
     */
    public function upsert(PDO $db, array $video): void
    {
        $missing = array_diff(self::COLUMNS, array_keys($video));
        if ($missing !== []) {
            throw new \InvalidArgumentException(
                'Video record is missing: ' . implode(', ', $missing)
            );
        }

        $stmt = $db->prepare('
            INSERT INTO videos (id, region, title, description, thumbnail,
                channel_id, channel_title, category_id, view_count,
                duration, published_at, fetched_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id, region) DO UPDATE SET
                title = excluded.title,
                thumbnail = excluded.thumbnail,
                view_count = excluded.view_count,
                fetched_at = excluded.fetched_at
        ');

        // Build the positional list from the column list rather than from the
        // caller's array order, so a reordered record cannot shift values
        // into the wrong columns.
        $params = [];
        foreach (self::COLUMNS as $column) {
            $params[] = $video[$column];
        }

        $stmt->execute($params);
    }
}
Step 4: Index
<?php
/**
 * Maintains the videos_fts search table.
 */
class SearchIndexer
{
    /**
     * Issues the 'rebuild' command against the videos_fts table.
     *
     * @param PDO $db Connection to the database that holds videos_fts.
     */
    public function rebuild(PDO $db): void
    {
        $rebuildCommand = "INSERT INTO videos_fts(videos_fts) VALUES('rebuild')";

        $db->exec($rebuildCommand);
    }
}
Step 5: Cache Invalidation
<?php
/**
 * Drops cached pages after a fetch so newly stored content becomes visible.
 *
 * NOTE(review): removeDirectory() is called below but is not defined in this
 * excerpt.
 */
class CacheInvalidator
{
    /**
     * Deletes the home page entry and every 'category:*' entry from the
     * application cache, then removes the LiteSpeed cache directory if present.
     *
     * @param Cache $cache Application cache to clear entries from.
     */
    public function afterFetch(Cache $cache): void
    {
        // Application-level page caches
        $cache->delete('home:index');
        $cache->deletePattern('category:*');

        // LiteSpeed page cache lives in a sibling directory of this file's folder
        $pageCacheDir = __DIR__ . '/../lscache';
        if (!is_dir($pageCacheDir)) {
            return;
        }

        $this->removeDirectory($pageCacheDir);
    }
}
Cron Configuration
# Server 3 (TrendVidStream): every 4 hours, at minute 28 (00:28, 04:28, 08:28, ...)
28 */4 * * * php /path/to/fetch_videos.php >> /var/log/fetch.log 2>&1
This pipeline keeps TrendVidStream continuously updated with fresh trending content from all 8 regions. The entire fetch-normalize-store-index-serve cycle completes in under 2 minutes.
The key design principles are batched database operations, graceful per-region error handling (one region's failure does not affect the others), and automatic cache invalidation.
Top comments (0)