Concurrent Video Fetching with Go Goroutines Across 8 Global Regions
TrendVidStream fetches trending video data from 8 regions: AE, FI, DK, CZ, BE, CH, US, GB. Go's goroutines and channels make concurrent fetching clean and controllable. Here's the production-ready implementation.
Worker Pool Architecture
package fetcher
import (
"context"
"log"
"sync"
"time"
)
type FetchJob struct {
Region string
APIKey string
}
type FetchResult struct {
Region string
Videos []Video
Error error
Took time.Duration
}
type Pool struct {
workers int
jobs chan FetchJob
results chan FetchResult
client *YTClient
wg sync.WaitGroup
}
func NewPool(workers int, client *YTClient) *Pool {
return &Pool{
workers: workers,
jobs: make(chan FetchJob, 20),
results: make(chan FetchResult, 20),
client: client,
}
}
func (p *Pool) Start(ctx context.Context) {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go p.worker(ctx)
}
go func() {
p.wg.Wait()
close(p.results)
}()
}
func (p *Pool) worker(ctx context.Context) {
defer p.wg.Done()
for {
select {
case <-ctx.Done():
return
case job, ok := <-p.jobs:
if !ok {
return
}
start := time.Now()
videos, err := p.client.FetchTrending(ctx, job.Region, job.APIKey, 50)
p.results <- FetchResult{
Region: job.Region, Videos: videos,
Error: err, Took: time.Since(start),
}
}
}
}
func (p *Pool) Submit(job FetchJob) { p.jobs <- job }
func (p *Pool) Close() { close(p.jobs) }
func (p *Pool) Results() <-chan FetchResult { return p.results }
YouTube Client for TrendVidStream Regions
package fetcher
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"
"golang.org/x/time/rate"
)
// Arabic (AE) titles require no special HTTP handling — YouTube returns UTF-8.
// The direction (RTL) is a display concern, not a fetch concern.
type YTClient struct {
http *http.Client
limiter *rate.Limiter
}
func NewYTClient() *YTClient {
return &YTClient{
http: &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
MaxIdleConnsPerHost: 10,
},
},
limiter: rate.NewLimiter(rate.Limit(4), 8), // 4 req/s, burst 8
}
}
func (c *YTClient) FetchTrending(ctx context.Context, region, apiKey string, n int) ([]Video, error) {
if err := c.limiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("rate limit: %w", err)
}
q := url.Values{
"part": {"snippet,statistics"},
"chart": {"mostPopular"},
"regionCode": {region},
"maxResults": {fmt.Sprintf("%d", n)},
"key": {apiKey},
}
req, _ := http.NewRequestWithContext(
ctx, http.MethodGet,
"https://www.googleapis.com/youtube/v3/videos?"+q.Encode(),
nil,
)
resp, err := c.http.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK:
// good
case http.StatusTooManyRequests:
// Back off and retry once
select {
case <-time.After(10 * time.Second):
case <-ctx.Done():
return nil, ctx.Err()
}
return c.FetchTrending(ctx, region, apiKey, n)
default:
return nil, fmt.Errorf("youtube API HTTP %d for region %s", resp.StatusCode, region)
}
var payload struct {
Items []struct {
ID string `json:"id"`
Snippet struct {
Title string `json:"title"` // UTF-8, handles Arabic natively
ChannelTitle string `json:"channelTitle"`
DefaultAudioLang string `json:"defaultAudioLanguage"`
} `json:"snippet"`
Statistics struct {
ViewCount string `json:"viewCount"`
} `json:"statistics"`
} `json:"items"`
}
if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
return nil, err
}
vids := make([]Video, 0, len(payload.Items))
for _, item := range payload.Items {
vids = append(vids, Video{
VideoID: item.ID,
Title: item.Snippet.Title,
ChannelTitle: item.Snippet.ChannelTitle,
Language: item.Snippet.DefaultAudioLang,
Region: region,
})
}
return vids, nil
}
Running the Pipeline
var tvStreamRegions = []string{"AE", "FI", "DK", "CZ", "BE", "CH", "US", "GB"}
func RunPipeline(ctx context.Context, apiKey string) ([]Video, error) {
client := NewYTClient()
pool := NewPool(3, client) // 3 workers for 8 regions
ctx, cancel := context.WithTimeout(ctx, 3*time.Minute)
defer cancel()
pool.Start(ctx)
go func() {
for _, r := range tvStreamRegions {
pool.Submit(FetchJob{Region: r, APIKey: apiKey})
}
pool.Close()
}()
var all []Video
var errCount int
for result := range pool.Results() {
if result.Error != nil {
log.Printf("[%s] error: %v (took %v)", result.Region, result.Error, result.Took)
errCount++
continue
}
log.Printf("[%s] %d videos in %v", result.Region, len(result.Videos), result.Took)
all = append(all, result.Videos...)
}
if len(all) == 0 {
return nil, fmt.Errorf("all regions failed (%d errors)", errCount)
}
return all, nil
}
Performance for TrendVidStream
| Workers | Time (8 regions) | Notes |
|---|---|---|
| 1 | ~16s | Sequential |
| 3 | ~6s | Good balance |
| 8 | ~3s | Full parallel |
Three workers handles TrendVidStream's 8 regions in about 6 seconds — fast enough for cron-based fetching while staying well under YouTube's rate limits. The rate limiter (4 req/s) ensures we never burst past API limits regardless of the worker count.
This article is part of the Building TrendVidStream series. Check out TrendVidStream to see these techniques in action.
Top comments (0)