DEV Community

ahmet gedik
ahmet gedik

Posted on

Concurrent Video Processing with Go Goroutines and Channels

Concurrent Video Processing with Go Goroutines and Channels

Go's goroutines and channels are purpose-built for concurrent I/O operations. Here's how I'd implement the video metadata fetcher for TopVideoHub in Go — processing 9 regions in parallel with controlled concurrency.

Why the Worker Pool Pattern

For TopVideoHub, launching 9 goroutines (one per region) is perfectly fine. But a worker pool teaches a more general pattern that scales to hundreds of regions or other batch tasks:

package fetcher

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

// FetchJob describes one unit of work for the pool: fetch the trending
// videos for a single region using the given API key.
type FetchJob struct {
    Region string // region code sent as the API's regionCode param (e.g. "JP")
    APIKey string // YouTube Data API key used for this request
}

// FetchResult carries the outcome of one FetchJob, successful or not.
// Exactly one of Videos / Error is meaningful.
type FetchResult struct {
    Region string        // region the job targeted
    Videos []Video       // fetched videos; nil when Error is non-nil
    Error  error         // non-nil if the fetch failed
    Took   time.Duration // wall-clock duration of the fetch attempt
}

// Pool runs fetch jobs with limited concurrency. Jobs enter via Submit,
// outcomes are read from the exported Results channel, which is closed
// once all workers have exited.
type Pool struct {
    workers int              // number of worker goroutines launched by Start
    jobs    chan FetchJob    // pending work; closed by Close to stop workers
    Results chan FetchResult // one result per job
    client  *YouTubeClient   // shared, rate-limited API client
    wg      sync.WaitGroup   // tracks running workers
}

// NewPool creates a Pool that runs jobs on the given number of workers
// using the shared client. A non-positive worker count is clamped to 1,
// since a pool with zero workers would deadlock every Submit once the
// jobs buffer filled. Buffers of 50 let producers and workers run ahead
// of the consumer without blocking on every hand-off.
func NewPool(workers int, client *YouTubeClient) *Pool {
    if workers < 1 {
        workers = 1 // guarantee the pool can always make progress
    }
    return &Pool{
        workers: workers,
        jobs:    make(chan FetchJob, 50),
        Results: make(chan FetchResult, 50),
        client:  client,
    }
}

// Start launches the worker goroutines and arranges for Results to be
// closed once every worker has returned, so consumers can simply range
// over Results until it is drained.
func (p *Pool) Start(ctx context.Context) {
    p.wg.Add(p.workers)
    for i := 0; i < p.workers; i++ {
        go p.worker(ctx, i)
    }
    // Only the sending side closes Results: wait for all workers, then
    // signal consumers that no further results will arrive.
    go func() {
        p.wg.Wait()
        close(p.Results)
    }()
}

// worker consumes jobs until the jobs channel is closed or ctx is
// cancelled, publishing exactly one FetchResult per job it picks up.
func (p *Pool) worker(ctx context.Context, id int) {
    defer p.wg.Done()
    for {
        select {
        case <-ctx.Done():
            log.Printf("worker %d: context cancelled", id)
            return
        case job, ok := <-p.jobs:
            if !ok {
                return // Channel closed, no more jobs
            }
            start := time.Now()
            videos, err := p.client.FetchTrending(ctx, job.Region, job.APIKey, 50)
            result := FetchResult{
                Region: job.Region,
                Videos: videos,
                Error:  err,
                Took:   time.Since(start),
            }
            // Guard the send with ctx.Done: if the pipeline is cancelled
            // and the consumer has stopped reading Results, an unguarded
            // send would block this worker forever once the buffer fills,
            // leaking the goroutine.
            select {
            case p.Results <- result:
            case <-ctx.Done():
                log.Printf("worker %d: context cancelled while sending result", id)
                return
            }
        }
    }
}

// Submit adds a job to the queue. It blocks once the jobs buffer (50)
// is full, applying backpressure to the producer.
func (p *Pool) Submit(job FetchJob) { p.jobs <- job }

// Close signals that no more jobs will be submitted by closing the jobs
// channel. It does NOT wait for workers to finish: workers exit after
// draining the queue, and the goroutine started by Start closes Results
// once they have all returned.
func (p *Pool) Close() { close(p.jobs) }
Enter fullscreen mode Exit fullscreen mode

YouTube Client with Rate Limiting

package fetcher

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "net/url"
    "time"

    "golang.org/x/time/rate"
)

// YouTubeClient pairs a pooled HTTP client with a client-side
// token-bucket rate limiter for calls to the YouTube Data API.
type YouTubeClient struct {
    httpClient  *http.Client  // reused connections, 30s per-request timeout
    rateLimiter *rate.Limiter // throttles outgoing requests (5/s, burst 10)
}

// NewYouTubeClient builds a client with connection reuse, a 30-second
// request timeout, and a client-side rate limit of 5 req/s (burst 10).
func NewYouTubeClient() *YouTubeClient {
    transport := &http.Transport{
        MaxIdleConnsPerHost: 10,
        IdleConnTimeout:     90 * time.Second,
    }
    httpClient := &http.Client{
        Timeout:   30 * time.Second,
        Transport: transport,
    }
    // Throttle ourselves to at most 5 requests/second, allowing short
    // bursts of up to 10, before the API throttles us.
    limiter := rate.NewLimiter(rate.Limit(5), 10)
    return &YouTubeClient{
        httpClient:  httpClient,
        rateLimiter: limiter,
    }
}

// FetchTrending fetches up to maxResults trending videos for region from
// the YouTube Data API v3. It waits on the client-side rate limiter
// before every attempt, and retries HTTP 429 responses up to 3 times
// with a fixed 5-second backoff, honoring ctx cancellation throughout.
//
// The original unbounded recursive retry could loop indefinitely (only a
// ctx deadline would stop it) and, because each level deferred its
// Body.Close, kept every response body open until the innermost call
// returned. This version caps retries and closes each body before
// backing off so connections can be reused.
func (c *YouTubeClient) FetchTrending(ctx context.Context, region, apiKey string, maxResults int) ([]Video, error) {
    params := url.Values{
        "part":       {"snippet,statistics"},
        "chart":      {"mostPopular"},
        "regionCode": {region},
        "maxResults": {fmt.Sprintf("%d", maxResults)},
        "key":        {apiKey},
    }
    reqURL := "https://www.googleapis.com/youtube/v3/videos?" + params.Encode()

    const (
        maxRetries = 3
        backoff    = 5 * time.Second
    )
    for attempt := 0; ; attempt++ {
        // Block until the rate limiter allows this request (or ctx ends).
        if err := c.rateLimiter.Wait(ctx); err != nil {
            return nil, fmt.Errorf("rate limit wait: %w", err)
        }

        req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
        if err != nil {
            return nil, err
        }

        resp, err := c.httpClient.Do(req)
        if err != nil {
            return nil, fmt.Errorf("http: %w", err)
        }

        if resp.StatusCode == http.StatusTooManyRequests {
            // Release the connection before backing off.
            resp.Body.Close()
            if attempt >= maxRetries {
                return nil, fmt.Errorf("youtube API: still rate limited after %d retries", maxRetries)
            }
            select {
            case <-time.After(backoff):
            case <-ctx.Done():
                return nil, ctx.Err()
            }
            continue
        }
        if resp.StatusCode != http.StatusOK {
            resp.Body.Close()
            return nil, fmt.Errorf("youtube API: HTTP %d", resp.StatusCode)
        }

        var apiResp struct {
            Items []struct {
                ID      string `json:"id"`
                Snippet struct {
                    Title        string `json:"title"`
                    ChannelTitle string `json:"channelTitle"`
                } `json:"snippet"`
                Statistics struct {
                    ViewCount string `json:"viewCount"`
                } `json:"statistics"`
            } `json:"items"`
        }
        err = json.NewDecoder(resp.Body).Decode(&apiResp)
        resp.Body.Close()
        if err != nil {
            return nil, err
        }

        // json decoding handles UTF-8 titles natively.
        videos := make([]Video, 0, len(apiResp.Items))
        for _, item := range apiResp.Items {
            videos = append(videos, Video{
                VideoID:      item.ID,
                Title:        item.Snippet.Title,
                ChannelTitle: item.Snippet.ChannelTitle,
                Region:       region,
            })
        }
        return videos, nil
    }
}
Enter fullscreen mode Exit fullscreen mode

Running the Full Pipeline

// regions lists the region codes fetched on every pipeline run.
var regions = []string{"JP", "KR", "TW", "SG", "VN", "TH", "HK", "US", "GB"}

// RunFetchPipeline fetches trending videos for every configured region
// through a bounded worker pool and returns the combined results.
//
// Individual region failures are logged and tolerated; an error is
// returned only when no region produced any videos. The entire run is
// capped at two minutes.
func RunFetchPipeline(ctx context.Context, apiKey string) ([]Video, error) {
    client := NewYouTubeClient()
    pool := NewPool(3, client) // 3 concurrent workers

    ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
    defer cancel()

    pool.Start(ctx)

    // Submit from a separate goroutine so this one can drain Results
    // concurrently; submitting inline could deadlock once both buffers
    // filled with nobody reading results.
    go func() {
        for _, region := range regions {
            pool.Submit(FetchJob{Region: region, APIKey: apiKey})
        }
        pool.Close()
    }()

    var all []Video
    var errs []error
    for result := range pool.Results { // Range ends when Results is closed
        if result.Error != nil {
            log.Printf("[%s] error: %v (took %v)", result.Region, result.Error, result.Took)
            errs = append(errs, result.Error)
            continue
        }
        log.Printf("[%s] %d videos in %v", result.Region, len(result.Videos), result.Took)
        all = append(all, result.Videos...)
    }

    if len(all) == 0 {
        // Surface the first underlying error instead of discarding them
        // all, so callers can see *why* every region failed.
        if len(errs) > 0 {
            return nil, fmt.Errorf("no videos fetched (%d errors, first: %w)", len(errs), errs[0])
        }
        return nil, fmt.Errorf("no videos fetched")
    }
    return all, nil
}
Enter fullscreen mode Exit fullscreen mode

Performance for TopVideoHub

Workers Typical Time Notes
1 ~20s Sequential
3 ~8s Balanced
9 ~3s Full parallel

Three workers gives TopVideoHub an 8-second fetch cycle for all 9 configured regions — fast enough to keep data fresh, conservative enough to avoid API rate limits. The channel-based design means adding another region to the list requires no architectural changes.


This article is part of the Building TopVideoHub series. Check out TopVideoHub to see these techniques in action.

Top comments (0)