DEV Community

agenthustler
agenthustler

Posted on

Scraping Twitch: Stream Data, Viewers, and Clip Analytics

Scraping Twitch: Stream Data, Viewers, and Clip Analytics

Twitch dominates live streaming with 140+ million monthly active users. Whether building analytics dashboards or tracking content trends, Twitch data is incredibly valuable.

Setting Up the Data Collector

import requests
from bs4 import BeautifulSoup
from datetime import datetime
import sqlite3, json, time

class TwitchCollector:
    """Collect Twitch stream and clip data via the Helix API and web scraping.

    Results are persisted to a local SQLite database (``twitch.db``).

    Args:
        client_id: Twitch application client ID for Helix API calls.
        token: OAuth bearer token paired with ``client_id``.
        api_key: Optional ScraperAPI key; when set, ``web_fetch`` routes
            page requests through ScraperAPI with JS rendering enabled.
    """

    def __init__(self, client_id=None, token=None, api_key=None):
        self.api_key = api_key
        # Session for authenticated Helix API requests.
        self.api = requests.Session()
        if client_id and token:
            self.api.headers.update({
                'Client-ID': client_id, 'Authorization': f'Bearer {token}'})
        # Separate session for plain page scraping with a browser-like UA.
        self.web = requests.Session()
        self.web.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'})
        self.db = sqlite3.connect('twitch.db')
        self._init_db()

    def _init_db(self):
        """Create the streams/clips tables if they do not exist yet."""
        self.db.executescript('''
            CREATE TABLE IF NOT EXISTS streams (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                channel TEXT, game TEXT, title TEXT,
                viewers INTEGER, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP);
            CREATE TABLE IF NOT EXISTS clips (
                id TEXT PRIMARY KEY, channel TEXT, title TEXT,
                views INTEGER, duration REAL, created_at TEXT, game TEXT);
        ''')

    def web_fetch(self, url, timeout=30):
        """Fetch *url*, optionally proxied through ScraperAPI.

        Fix: the original interpolated *url* directly into the ScraperAPI
        query string over plain http, so any ``?``/``&`` in the target URL
        was split off and treated as extra ScraperAPI parameters. Passing
        the target via ``params`` lets requests percent-encode it, and the
        request now uses https and a timeout instead of hanging forever.
        """
        if self.api_key:
            return self.web.get(
                'https://api.scraperapi.com/',
                params={'api_key': self.api_key, 'url': url, 'render': 'true'},
                timeout=timeout)
        return self.web.get(url, timeout=timeout)
Enter fullscreen mode Exit fullscreen mode

Collecting via Helix API

    def get_streams(self, game_id=None, lang='en', first=100):
        params = {'first': first, 'language': lang}
        if game_id: params['game_id'] = game_id
        resp = self.api.get('https://api.twitch.tv/helix/streams', params=params)
        streams = resp.json().get('data', [])
        for s in streams:
            self.db.execute(
                'INSERT INTO streams (channel,game,title,viewers) VALUES (?,?,?,?)',
                (s['user_name'], s['game_name'], s['title'], s['viewer_count']))
        self.db.commit()
        return streams

    def get_clips(self, broadcaster_id, first=20):
        resp = self.api.get('https://api.twitch.tv/helix/clips',
                           params={'broadcaster_id': broadcaster_id, 'first': first})
        clips = resp.json().get('data', [])
        for c in clips:
            self.db.execute('''INSERT OR REPLACE INTO clips
                (id,channel,title,views,duration,created_at,game)
                VALUES (?,?,?,?,?,?,?)''',
                (c['id'], c['broadcaster_name'], c['title'],
                 c['view_count'], c['duration'], c['created_at'], c['game_id']))
        self.db.commit()
        return clips
Enter fullscreen mode Exit fullscreen mode

Scraping Category Trends

    def scrape_category(self, slug):
        """Scrape the public directory page for one game category.

        Args:
            slug: The category's URL slug, e.g. ``'fortnite'``.

        Returns:
            A list of dicts with keys 'channel', 'title', 'viewers', 'tags'.

        NOTE(review): the CSS selectors here track Twitch's current markup
        (``data-a-target`` attributes) and will need updating if it changes.
        """
        resp = self.web_fetch(f"https://www.twitch.tv/directory/game/{slug}")
        soup = BeautifulSoup(resp.text, 'html.parser')
        streams = []
        for card in soup.select('[data-a-target="preview-card"]'):
            channel = card.select_one('[data-a-target="preview-card-channel-link"]')
            title = card.select_one('h3')
            viewers = card.select_one('[data-a-target="preview-card-viewer-count"]')
            viewer_count = 0
            if viewers:
                viewer_count = self._parse_viewer_count(
                    viewers.get_text(strip=True))
            streams.append({
                'channel': channel.get_text(strip=True) if channel else '',
                'title': title.get_text(strip=True) if title else '',
                'viewers': viewer_count,
                'tags': [tag.get_text(strip=True) for tag in card.select('.tag')]
            })
        return streams

    @staticmethod
    def _parse_viewer_count(text):
        """Convert Twitch's abbreviated viewer label ('1.2K', '1.5M') to an int.

        Fix: the original only handled the 'K' suffix and raised ValueError
        on 'M' counts or any stray text (e.g. '1.2K viewers'); unparseable
        labels now fall back to 0 instead of crashing the scrape.
        """
        t = text.upper().replace(',', '').replace('VIEWERS', '').strip()
        try:
            if t.endswith('M'):
                return int(float(t[:-1]) * 1_000_000)
            if t.endswith('K'):
                return int(float(t[:-1]) * 1_000)
            return int(t or 0)
        except ValueError:
            return 0
Enter fullscreen mode Exit fullscreen mode

Analytics Reports

class TwitchAnalytics:
    """Read-only reporting queries over the collector's SQLite database."""

    def __init__(self, db):
        # An open sqlite3 connection whose schema matches TwitchCollector's.
        self.db = db

    def growth_report(self, channel, days=30):
        """Per-day viewer statistics for one channel over the last *days* days.

        Returns a list of dicts (oldest day first) with keys
        'date', 'avg', 'peak', and 'samples'.
        """
        rows = self.db.execute('''
            SELECT DATE(timestamp) as d, AVG(viewers), MAX(viewers), COUNT(*)
            FROM streams WHERE channel=? AND timestamp > datetime('now',?)
            GROUP BY d ORDER BY d
        ''', (channel, f'-{days} days')).fetchall()
        report = []
        for day, avg_viewers, peak_viewers, sample_count in rows:
            report.append({
                'date': day,
                'avg': round(avg_viewers),
                'peak': peak_viewers,
                'samples': sample_count,
            })
        return report

    def trending(self, hours=6):
        """Top 20 games by total tracked viewers in the last *hours* hours.

        Returns a list of dicts with keys 'game', 'total', 'streams', 'avg',
        ordered by total viewers descending.
        """
        rows = self.db.execute('''
            SELECT game, SUM(viewers), COUNT(DISTINCT channel), AVG(viewers)
            FROM streams WHERE timestamp > datetime('now',?)
            GROUP BY game ORDER BY SUM(viewers) DESC LIMIT 20
        ''', (f'-{hours} hours',)).fetchall()
        return [
            {'game': game, 'total': total_viewers,
             'streams': channel_count, 'avg': round(avg_viewers)}
            for game, total_viewers, channel_count, avg_viewers in rows
        ]
Enter fullscreen mode Exit fullscreen mode

Continuous Monitoring

def monitor(collector, channels, interval=300, cycles=None):
    """Poll Twitch on a schedule and print a progress summary each cycle.

    Generalization: the original loop could only run forever; the new
    optional ``cycles`` parameter (default ``None`` = run forever, the
    original behavior) allows a bounded run for scripts and tests.

    Args:
        collector: TwitchCollector used for API calls and storage.
        channels: Iterable of broadcaster IDs whose clips are refreshed.
        interval: Seconds to sleep between polling cycles.
        cycles: Optional number of polling cycles before returning.
    """
    analytics = TwitchAnalytics(collector.db)
    completed = 0
    while cycles is None or completed < cycles:
        streams = collector.get_streams()
        print(f"[{datetime.now()}] Tracked {len(streams)} streams")
        for ch in channels:
            clips = collector.get_clips(ch)
            print(f"  {ch}: {len(clips)} clips")
        trending = analytics.trending()
        print("Top:", [t['game'] for t in trending[:5]])
        completed += 1
        time.sleep(interval)
Enter fullscreen mode Exit fullscreen mode

When scaling to thousands of channels, ScraperAPI can handle JavaScript rendering, ThorData can supply residential proxies, and ScrapeOps can monitor scraper performance.


Follow for more Python scraping and analytics tutorials.

Top comments (0)