DEV Community

ahmet gedik
ahmet gedik

Posted on

Concurrent Video Fetching with Go Goroutines Across 8 Global Regions

Concurrent Video Fetching with Go Goroutines Across 8 Global Regions

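TrendVidStream fetches trending video data from 8 regions: AE (United Arab Emirates), FI (Finland), DK (Denmark), CZ (Czech Republic), BE (Belgium), CH (Switzerland), US (United States), and GB (United Kingdom). Go's goroutines and channels make concurrent fetching clean and controllable. Here's the production-ready implementation.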

Worker Pool Architecture

package fetcher

import (
    "context"
    "log"
    "sync"
    "time"
)

// FetchJob is one unit of work for a Pool worker: fetch the trending videos
// for a single region.
type FetchJob struct {
	Region string // region code passed to FetchTrending as regionCode, e.g. "US"
	APIKey string // API key sent with the YouTube request
}

// FetchResult reports the outcome of a single FetchJob as published by a
// Pool worker on its results channel.
type FetchResult struct {
	Region string        // region code copied from the originating FetchJob
	Videos []Video       // videos returned by FetchTrending for this region
	Error  error         // error returned by FetchTrending, nil on success
	Took   time.Duration // wall-clock time spent in the FetchTrending call
}

// Pool is a fixed-size group of worker goroutines that take FetchJobs from a
// buffered jobs channel and publish FetchResults on a buffered results
// channel. Create it with NewPool and use it through the returned pointer;
// it embeds a sync.WaitGroup and so must not be copied.
type Pool struct {
	workers int              // number of worker goroutines Start launches
	jobs    chan FetchJob    // pending work; closed by Close
	results chan FetchResult // outcomes; closed once every worker has exited
	client  *YTClient        // API client shared by all workers
	wg      sync.WaitGroup   // counts running workers
}

// NewPool builds a Pool that will run the given number of workers, all
// sharing client. The jobs and results channels are each buffered to 20
// entries. No goroutines run until Start is called.
func NewPool(workers int, client *YTClient) *Pool {
	p := new(Pool)
	p.workers = workers
	p.client = client
	p.jobs = make(chan FetchJob, 20)
	p.results = make(chan FetchResult, 20)
	return p
}

// Start launches p.workers worker goroutines, all bound to ctx, plus one
// goroutine that closes the results channel after every worker has returned.
// Call Start at most once per Pool: a second call would start a second
// closer, and the results channel would be closed twice.
func (p *Pool) Start(ctx context.Context) {
	for remaining := p.workers; remaining > 0; remaining-- {
		p.wg.Add(1)
		go p.worker(ctx)
	}

	closeWhenAllDone := func() {
		p.wg.Wait()
		close(p.results)
	}
	go closeWhenAllDone()
}

// worker consumes jobs until the jobs channel is closed or ctx is done. For
// each job it asks FetchTrending for up to 50 videos and publishes a
// FetchResult carrying the videos, any error, and the elapsed wall-clock time.
func (p *Pool) worker(ctx context.Context) {
	defer p.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case job, ok := <-p.jobs:
			if !ok {
				return
			}
			start := time.Now()
			videos, err := p.client.FetchTrending(ctx, job.Region, job.APIKey, 50)
			res := FetchResult{
				Region: job.Region,
				Videos: videos,
				Error:  err,
				Took:   time.Since(start),
			}
			if !p.publish(ctx, res) {
				return
			}
		}
	}
}

// publish sends r on p.results and reports whether it was delivered.
// A plain send would block this worker forever if the results channel is
// full and nobody is receiving, which keeps wg.Wait (and therefore the close
// of the results channel) from ever completing. So when the channel has no
// room, publish also watches ctx and gives up once ctx is done.
func (p *Pool) publish(ctx context.Context, r FetchResult) bool {
	// Try a non-blocking send first: a result that can be delivered right
	// away is never dropped just because ctx was cancelled in the meantime.
	select {
	case p.results <- r:
		return true
	default:
	}
	select {
	case p.results <- r:
		return true
	case <-ctx.Done():
		return false
	}
}

// Submit queues job for the workers. It blocks while the jobs buffer is full,
// and it panics if called after Close (send on a closed channel).
func (p *Pool) Submit(job FetchJob) {
	p.jobs <- job
}

// Close tells the workers that no more jobs are coming; each worker returns
// once it finds the jobs channel closed and empty. Calling Close twice panics.
func (p *Pool) Close() {
	close(p.jobs)
}

// Results exposes the receive-only side of the results channel, which Start
// closes after all workers have exited.
func (p *Pool) Results() <-chan FetchResult {
	return p.results
}

YouTube Client for TrendVidStream Regions

package fetcher

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "net/url"
    "time"

    "golang.org/x/time/rate"
)

// YTClient fetches data from the YouTube Data API v3 videos endpoint. Every
// request first waits on the client's rate limiter, so one YTClient shared
// by several goroutines throttles their combined request rate.
//
// Arabic (AE) titles require no special HTTP handling — YouTube returns UTF-8.
// The direction (RTL) is a display concern, not a fetch concern.
type YTClient struct {
	http    *http.Client  // reused for every request
	limiter *rate.Limiter // waited on before each request attempt
}

// NewYTClient returns a YTClient configured as follows:
//   - its HTTP client has a 30-second overall timeout per request;
//   - it keeps up to 10 idle connections per host;
//   - its limiter allows 4 requests per second with bursts of up to 8.
func NewYTClient() *YTClient {
	// Start from a clone of http.DefaultTransport rather than a bare
	// &http.Transport{}. A zero Transport uses no proxy (it ignores
	// HTTP_PROXY/HTTPS_PROXY) and has no explicit dial, TLS-handshake, or
	// idle-connection timeouts. The fallback only matters if DefaultTransport
	// has been replaced with a different RoundTripper.
	transport := &http.Transport{}
	if base, ok := http.DefaultTransport.(*http.Transport); ok {
		transport = base.Clone()
	}
	transport.MaxIdleConnsPerHost = 10

	return &YTClient{
		http: &http.Client{
			Timeout:   30 * time.Second,
			Transport: transport,
		},
		limiter: rate.NewLimiter(rate.Limit(4), 8), // 4 req/s, burst 8
	}
}

// FetchTrending returns up to n of the region's most popular videos
// (chart=mostPopular) from the YouTube Data API v3 videos endpoint.
//
// Each HTTP attempt first waits on c.limiter. If YouTube answers
// 429 Too Many Requests, FetchTrending does the following:
//   - closes that response;
//   - waits 10 seconds, or returns ctx.Err() if ctx is done first;
//   - retries exactly once.
//
// A second 429, like any other non-200 status, is returned as an error.
func (c *YTClient) FetchTrending(ctx context.Context, region, apiKey string, n int) ([]Video, error) {
	// The initial request plus one retry after a 429. The previous recursive
	// version retried without bound while keeping every earlier body open.
	const maxAttempts = 2
	const throttleBackoff = 10 * time.Second

	for attempt := 1; ; attempt++ {
		vids, status, err := c.fetchTrendingOnce(ctx, region, apiKey, n)
		if status != http.StatusTooManyRequests {
			return vids, err
		}
		if attempt >= maxAttempts {
			return nil, fmt.Errorf("youtube API HTTP %d for region %s", status, region)
		}
		select {
		case <-time.After(throttleBackoff):
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

// fetchTrendingOnce performs a single rate-limited request for FetchTrending.
// It also returns the HTTP status code (0 when no response was received).
// For a 429 it returns nil videos and a nil error so the caller can decide
// whether to retry. The response body is closed before it returns.
func (c *YTClient) fetchTrendingOnce(ctx context.Context, region, apiKey string, n int) ([]Video, int, error) {
	if err := c.limiter.Wait(ctx); err != nil {
		return nil, 0, fmt.Errorf("rate limit: %w", err)
	}

	const endpoint = "https://www.googleapis.com/youtube/v3/videos"
	q := url.Values{
		"part":       {"snippet,statistics"},
		"chart":      {"mostPopular"},
		"regionCode": {region},
		"maxResults": {fmt.Sprintf("%d", n)},
	}
	// URL without the API key, used only in error messages.
	safeURL := endpoint + "?" + q.Encode()
	q.Set("key", apiKey)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint+"?"+q.Encode(), nil)
	if err != nil {
		return nil, 0, fmt.Errorf("building request for region %s: %w", region, err)
	}

	resp, err := c.http.Do(req)
	if err != nil {
		// http.Client.Do returns *url.Error, whose message embeds the request
		// URL. That URL carries key=<apiKey>, so swap in the redacted URL
		// before the error can reach a log line.
		if uerr, ok := err.(*url.Error); ok {
			uerr.URL = safeURL
		}
		return nil, 0, err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		// Decoded below.
	case http.StatusTooManyRequests:
		return nil, resp.StatusCode, nil
	default:
		return nil, resp.StatusCode, fmt.Errorf("youtube API HTTP %d for region %s", resp.StatusCode, region)
	}

	var payload struct {
		Items []struct {
			ID      string `json:"id"`
			Snippet struct {
				Title            string `json:"title"` // UTF-8, handles Arabic natively
				ChannelTitle     string `json:"channelTitle"`
				DefaultAudioLang string `json:"defaultAudioLanguage"`
			} `json:"snippet"`
			// NOTE(review): ViewCount is decoded but not copied into Video
			// below; either map it or drop "statistics" from the request.
			Statistics struct {
				ViewCount string `json:"viewCount"`
			} `json:"statistics"`
		} `json:"items"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		return nil, resp.StatusCode, fmt.Errorf("decoding response for region %s: %w", region, err)
	}

	vids := make([]Video, 0, len(payload.Items))
	for _, item := range payload.Items {
		vids = append(vids, Video{
			VideoID:      item.ID,
			Title:        item.Snippet.Title,
			ChannelTitle: item.Snippet.ChannelTitle,
			Language:     item.Snippet.DefaultAudioLang,
			Region:       region,
		})
	}
	return vids, resp.StatusCode, nil
}

Running the Pipeline

// tvStreamRegions holds the two-letter region codes RunPipeline fetches,
// in the order it submits them to the worker pool.
var tvStreamRegions = []string{
	"AE", "FI", "DK", "CZ",
	"BE", "CH", "US", "GB",
}

// RunPipeline fetches trending videos for every region in tvStreamRegions
// with a three-worker Pool and returns the combined list.
//
// The whole run is bounded by a 3-minute timeout derived from ctx.
// Per-region failures are logged and skipped, so the result may cover only
// some regions. Regions that never report (because the timeout or
// cancellation stopped the workers) are logged as a count.
//
// An error is returned only when no region yielded any videos. It wraps the
// first per-region error or, if no region reported an error, the context
// error that cut the run short.
func RunPipeline(ctx context.Context, apiKey string) ([]Video, error) {
	client := NewYTClient()
	pool := NewPool(3, client) // 3 workers for 8 regions

	ctx, cancel := context.WithTimeout(ctx, 3*time.Minute)
	defer cancel()

	pool.Start(ctx)

	// Submit from a separate goroutine so the results loop below starts
	// draining immediately. NOTE: Submit does not watch ctx; if the workers
	// exit early while the jobs buffer is full, this goroutine would block.
	go func() {
		for _, r := range tvStreamRegions {
			pool.Submit(FetchJob{Region: r, APIKey: apiKey})
		}
		pool.Close()
	}()

	var (
		all      []Video
		errCount int
		firstErr error
		received int
	)
	for result := range pool.Results() {
		received++
		if result.Error != nil {
			log.Printf("[%s] error: %v (took %v)", result.Region, result.Error, result.Took)
			errCount++
			if firstErr == nil {
				firstErr = result.Error
			}
			continue
		}
		log.Printf("[%s] %d videos in %v", result.Region, len(result.Videos), result.Took)
		all = append(all, result.Videos...)
	}

	// Workers stop pulling jobs once ctx is done, so some regions may never
	// produce a result. Make that visible instead of silently dropping them.
	if missing := len(tvStreamRegions) - received; missing > 0 {
		log.Printf("%d of %d regions produced no result: %v", missing, len(tvStreamRegions), ctx.Err())
	}

	if len(all) == 0 {
		cause := firstErr
		if cause == nil {
			cause = ctx.Err()
		}
		if cause == nil {
			return nil, fmt.Errorf("all regions failed (%d errors)", errCount)
		}
		return nil, fmt.Errorf("all regions failed (%d errors): %w", errCount, cause)
	}
	return all, nil
}

Performance for TrendVidStream

| Workers | Time (8 regions) | Notes         |
|---------|------------------|---------------|
| 1       | ~16s             | Sequential    |
| 3       | ~6s              | Good balance  |
| 8       | ~3s              | Full parallel |

Three workers handle TrendVidStream's 8 regions in about 6 seconds — fast enough for cron-based fetching. Because every worker shares one YTClient, its rate limiter (4 requests per second, with bursts of up to 8) caps the combined request rate regardless of the worker count. Note that this limits request rate only; it does not track YouTube's separate API quota.


This article is part of the Building TrendVidStream series. Check out TrendVidStream to see these techniques in action.

Top comments (0)