Twitch generates massive amounts of real-time data — viewer counts, stream schedules, chat activity, and game trends. Extracting this data lets you build analytics dashboards, track gaming trends, and identify rising streamers.
What Twitch Data Can You Collect?
- Live stream viewer counts and metadata
- Channel statistics (followers, total views, stream schedule)
- Game/category popularity and trends
- VOD (video on demand) metadata
- Clip data and engagement metrics
Using the Twitch API
Twitch provides an official API (Helix) that's the best starting point:
import requests
import time
from datetime import datetime
class TwitchAnalytics:
    """Thin client for the Twitch Helix API using a pre-authenticated session."""

    API_BASE = "https://api.twitch.tv/helix"

    def __init__(self, client_id, access_token):
        # Every Helix endpoint requires both the Client-ID header and a
        # Bearer token, so attach them once on a shared session.
        self.session = requests.Session()
        self.session.headers.update({
            'Client-ID': client_id,
            'Authorization': f'Bearer {access_token}',
        })

    def _get(self, endpoint, params=None):
        """GET one Helix endpoint and return the decoded JSON body.

        Raises requests.HTTPError on non-2xx responses.  The original code
        decoded error bodies (401, 429, ...) and silently returned empty
        results; raising surfaces auth and rate-limit problems instead.
        A timeout is set because requests otherwise waits forever.
        """
        resp = self.session.get(f"{self.API_BASE}/{endpoint}",
                                params=params, timeout=10)
        resp.raise_for_status()
        return resp.json()

    def get_top_streams(self, game_id=None, first=100):
        """Get currently top live streams, optionally restricted to one game."""
        params = {'first': first}
        if game_id:
            params['game_id'] = game_id
        return self._get('streams', params).get('data', [])

    def get_user_info(self, usernames):
        """Get channel info for specific streamers (by login name)."""
        # A list of pairs makes requests repeat the 'login' key per name.
        params = [('login', name) for name in usernames]
        return self._get('users', params).get('data', [])

    def get_game_analytics(self, first=20):
        """Get top games by current viewer count."""
        return self._get('games/top', {'first': first}).get('data', [])

    def get_channel_schedule(self, broadcaster_id):
        """Get a streamer's schedule."""
        return self._get('schedule',
                         {'broadcaster_id': broadcaster_id}).get('data', {})
Building a Viewership Tracker
import pandas as pd
import json
class ViewershipTracker:
    """Accumulates periodic snapshots of top streams into a JSONL history file."""

    def __init__(self, analytics, history_file='twitch_history.jsonl'):
        self.analytics = analytics        # TwitchAnalytics-like client
        self.history_file = history_file  # append-only JSONL snapshot log

    def snapshot_top_streams(self, count=50):
        """Take a snapshot of current top streams and append it to the history.

        Returns the list of records written (one dict per live stream).
        """
        streams = self.analytics.get_top_streams(first=count)
        timestamp = datetime.now().isoformat()
        records = [
            {
                'timestamp': timestamp,
                'user_name': stream['user_name'],
                'game_name': stream['game_name'],
                'viewer_count': stream['viewer_count'],
                'title': stream['title'],
                'language': stream['language'],
                'started_at': stream['started_at'],
            }
            for stream in streams
        ]
        with open(self.history_file, 'a', encoding='utf-8') as f:
            f.writelines(json.dumps(record) + '\n' for record in records)
        print(f"Captured {len(records)} streams at {timestamp}")
        return records

    def _load_history(self):
        """Read every JSONL record from the history file into a list of dicts."""
        with open(self.history_file, encoding='utf-8') as f:
            return [json.loads(line) for line in f]

    def analyze_trends(self, days=7):
        """Analyze viewing trends over the last `days` days of collected data.

        Fixes the original defect where `days` was accepted but never applied.
        Returns the filtered DataFrame for further analysis.
        """
        df = pd.DataFrame(self._load_history())
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        # Anchor the window on the newest snapshot rather than wall-clock
        # time, so an old history file can still be analyzed meaningfully.
        cutoff = df['timestamp'].max() - pd.Timedelta(days=days)
        df = df[df['timestamp'] >= cutoff]
        # Top games by peak viewers
        game_peaks = df.groupby('game_name')['viewer_count'].max()
        print("Top games by peak viewers:")
        print(game_peaks.sort_values(ascending=False).head(10))
        # Most consistent streamers: number of snapshots they appeared in
        streamer_appearances = df['user_name'].value_counts()
        print("\nMost consistently live streamers:")
        print(streamer_appearances.head(10))
        return df
Game Trend Detection
def detect_rising_games(tracker, threshold=2.0):
    """Find games whose latest daily average viewership spikes above baseline.

    A game is "rising" when its most recent daily average exceeds `threshold`
    times its average over the preceding (up to) 7 days.  Returns a Series of
    spike ratios indexed by game name; empty when under 2 days of data exist.
    """
    with open(tracker.history_file, encoding='utf-8') as f:
        records = [json.loads(line) for line in f]
    df = pd.DataFrame(records)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['date'] = df['timestamp'].dt.date
    # Daily average viewers per game
    daily = df.groupby(['date', 'game_name'])['viewer_count'].mean().reset_index()
    pivot = daily.pivot(index='date', columns='game_name',
                        values='viewer_count').fillna(0)
    if len(pivot) < 2:
        return pd.Series(dtype=float)
    latest = pivot.iloc[-1]
    # Baseline must exclude the latest day.  The original used iloc[-7:],
    # which folds the spike itself into the average it is compared against:
    # with 2 days of history a 10x jump looked like only ~1.8x and was missed.
    baseline = pivot.iloc[:-1].tail(7).mean()
    ratio = latest / baseline.replace(0, 1)
    rising = ratio[ratio > threshold].sort_values(ascending=False)
    print("Rising games (viewership spike):")
    for game, spike in rising.items():
        print(f" {game}: {spike:.1f}x normal viewership")
    return rising
Streamer Discovery
def find_rising_streamers(analytics, game_id, min_viewers=100, max_viewers=5000,
                          delay=0.5):
    """Find mid-size streamers with growth potential for a given game.

    Filters the top 100 live streams to the [min_viewers, max_viewers] band,
    enriches each with the channel's total profile views, and ranks by the
    engagement ratio (current viewers / total profile views).  `delay` is the
    pause in seconds between per-streamer lookups (rate-limit courtesy); it
    was a hard-coded 0.5 before and is now a backward-compatible parameter.
    """
    streams = analytics.get_top_streams(game_id=game_id, first=100)
    rising = [s for s in streams
              if min_viewers <= s['viewer_count'] <= max_viewers]

    for streamer in rising:
        user_info = analytics.get_user_info([streamer['user_login']])
        if user_info:
            streamer['profile_views'] = user_info[0].get('view_count', 0)
        time.sleep(delay)

    # The original fetched profile_views but then sorted by raw viewer_count,
    # contradicting its own comment.  Rank by engagement ratio as intended.
    # NOTE(review): channels with no recorded views fall back to raw viewer
    # count as the key (max(..., 1)), which ranks them aggressively — confirm.
    def _engagement(s):
        return s['viewer_count'] / max(s.get('profile_views', 0), 1)

    rising.sort(key=_engagement, reverse=True)
    print(f"Found {len(rising)} mid-size streamers:")
    for s in rising[:10]:
        print(f" {s['user_name']}: {s['viewer_count']} viewers - {s['title'][:50]}")
    return rising
Scaling Twitch Data Collection
For comprehensive Twitch analytics beyond what the API provides — historical data, chat logs, and cross-platform comparisons — the Twitch Scraper on Apify collects structured data at scale without API rate limit concerns.
For robust data collection with proxy rotation, ScrapeOps provides monitoring and proxy management tailored for continuous scraping operations.
Conclusion
Twitch data analytics combines API access with web scraping to build a complete picture of the streaming landscape. Start with the Helix API for real-time data, build tracking over time, and use trend detection to identify opportunities. Whether you're building a streamer discovery tool or a game trend dashboard, the data pipeline is straightforward with Python.
Top comments (0)