DEV Community

agenthustler
agenthustler

Posted on

How to monitor Bluesky posts and track keywords in real-time (Python 2026)

Bluesky has grown significantly since its launch, and in 2026 it's a meaningful signal source for developers, researchers, and toolbuilders. Unlike Twitter/X, Bluesky is built on the AT Protocol — an open, federated protocol that makes programmatic access first-class rather than an afterthought.

This article covers every practical approach to monitoring Bluesky in Python: the official API, the real-time Firehose, and search-based keyword tracking. All with working code examples.


Understanding the AT Protocol (briefly)

Before diving in, a few terms worth knowing:

  • AT Protocol (atproto): The open protocol Bluesky runs on. Think of it like ActivityPub but designed with data portability and programmatic access in mind.
  • PDS (Personal Data Server): Where user data lives. Bluesky's default PDS is bsky.social, but users can self-host.
  • Lexicon: The schema system that defines data types (posts = app.bsky.feed.post, likes = app.bsky.feed.like, etc.)
  • DID: Decentralized identifier — the stable ID for each user, even if their handle changes.
  • CID: Content identifier — a hash-based ID for each record (post, like, follow, etc.)

You don't need to understand all of this to get data out of Bluesky, but it helps when you're reading API responses.


Option 1: The Official Bluesky HTTP API (AppView)

Bluesky runs a public HTTP API on top of the AT Protocol. No authentication is required for most read operations.

The main endpoint is: https://public.api.bsky.app/xrpc/

Get a user's recent posts

import requests

def get_user_posts(handle, limit=10):
    resp = requests.get(
        "https://public.api.bsky.app/xrpc/app.bsky.feed.getAuthorFeed",
        params={"actor": handle, "limit": limit}
    )
    resp.raise_for_status()
    feed = resp.json()
    posts = []
    for item in feed.get("feed", []):
        post = item["post"]
        posts.append({
            "text": post["record"]["text"],
            "created_at": post["record"]["createdAt"],
            "likes": post["likeCount"],
            "reposts": post["repostCount"],
            "uri": post["uri"],
        })
    return posts

for post in get_user_posts("bsky.app", limit=5):
    print(f"[{post['likes']} likes] {post['text'][:100]}")
    print(f"  Posted: {post['created_at']}")
Enter fullscreen mode Exit fullscreen mode

Get a post's thread and replies

def get_thread(post_uri):
    resp = requests.get(
        "https://public.api.bsky.app/xrpc/app.bsky.feed.getPostThread",
        params={"uri": post_uri, "depth": 3}
    )
    thread = resp.json().get("thread", {})
    return thread

# post_uri looks like: at://did:plc:xyz.../app.bsky.feed.post/abc123
Enter fullscreen mode Exit fullscreen mode

Get the home timeline (requires auth)

For timeline access, you need to authenticate:

import requests

def get_session(handle, password):
    resp = requests.post(
        "https://bsky.social/xrpc/com.atproto.server.createSession",
        json={"identifier": handle, "password": password}
    )
    resp.raise_for_status()
    return resp.json()

def get_timeline(access_jwt, limit=20):
    resp = requests.get(
        "https://bsky.social/xrpc/app.bsky.feed.getTimeline",
        headers={"Authorization": f"Bearer {access_jwt}"},
        params={"limit": limit}
    )
    return resp.json().get("feed", [])

session = get_session("yourhandle.bsky.social", "your-app-password")
posts = get_timeline(session["accessJwt"])
Enter fullscreen mode Exit fullscreen mode

Note: Use an App Password, not your main password. You can create one in Bluesky's settings.


Option 2: Search API for Keyword Monitoring

Bluesky's search API is the practical choice for keyword-based monitoring. No authentication needed for basic searches.

import requests

def search_bluesky(query, limit=25):
    resp = requests.get(
        "https://public.api.bsky.app/xrpc/app.bsky.feed.searchPosts",
        params={"q": query, "limit": limit}
    )
    resp.raise_for_status()
    data = resp.json()
    results = []
    for post in data.get("posts", []):
        results.append({
            "text": post["record"]["text"],
            "author": post["author"]["handle"],
            "created_at": post["record"]["createdAt"],
            "likes": post.get("likeCount", 0),
            "uri": post["uri"],
        })
    return results

# Monitor mentions of a product or topic
posts = search_bluesky("python scraping", limit=20)
for p in posts:
    print(f"@{p['author']}: {p['text'][:120]}")
    print(f"  {p['created_at']} | {p['likes']} likes")
    print()
Enter fullscreen mode Exit fullscreen mode

Paginating through search results

The search API supports cursor-based pagination:

def search_all(query, max_pages=5):
    results = []
    cursor = None

    for _ in range(max_pages):
        params = {"q": query, "limit": 100}
        if cursor:
            params["cursor"] = cursor

        resp = requests.get(
            "https://public.api.bsky.app/xrpc/app.bsky.feed.searchPosts",
            params=params
        )
        data = resp.json()
        results.extend(data.get("posts", []))
        cursor = data.get("cursor")

        if not cursor:
            break

    return results
Enter fullscreen mode Exit fullscreen mode

Building a simple keyword monitor

import requests
import time

KEYWORDS = ["bluesky", "atproto", "python"]
SEEN = set()

def check_keywords(keywords):
    new_posts = []
    for kw in keywords:
        posts = search_bluesky(kw, limit=20)
        for p in posts:
            if p["uri"] not in SEEN:
                SEEN.add(p["uri"])
                new_posts.append((kw, p))
    return new_posts

print("Starting Bluesky keyword monitor...")
while True:
    found = check_keywords(KEYWORDS)
    for keyword, post in found:
        print(f"[{keyword}] @{post['author']}: {post['text'][:100]}")
    time.sleep(60)  # Poll every minute
Enter fullscreen mode Exit fullscreen mode

Option 3: The Firehose — Real-Time Event Stream

The AT Protocol Firehose is a WebSocket stream of every event happening on the network — every post, like, repost, follow, and delete, in real time. It's the most powerful option for real-time monitoring.

The public relay is at: wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos

You'll need the atproto library for this:

pip install atproto
Enter fullscreen mode Exit fullscreen mode

Listening to the Firehose

from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message

client = FirehoseSubscribeReposClient()

def on_message(message) -> None:
    commit = parse_subscribe_repos_message(message)

    # Only process commits (not tombstones, handles, etc.)
    if not hasattr(commit, "ops"):
        return

    for op in commit.ops:
        # Only look at new posts
        if op.action == "create" and op.path.startswith("app.bsky.feed.post/"):
            record = op.record
            if record and hasattr(record, "text"):
                print(f"New post: {record.text[:100]}")

print("Connecting to Firehose...")
client.start(on_message)
Enter fullscreen mode Exit fullscreen mode

Filtering Firehose for specific keywords

from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message

KEYWORDS = ["python", "developer", "scraping"]
client = FirehoseSubscribeReposClient()

def on_message(message) -> None:
    commit = parse_subscribe_repos_message(message)
    if not hasattr(commit, "ops"):
        return

    for op in commit.ops:
        if op.action == "create" and op.path.startswith("app.bsky.feed.post/"):
            record = op.record
            if not (record and hasattr(record, "text")):
                continue
            text = record.text.lower()
            for kw in KEYWORDS:
                if kw in text:
                    print(f"[{kw}] {record.text[:120]}")
                    break

client.start(on_message)
Enter fullscreen mode Exit fullscreen mode

Firehose volume warning

The Firehose streams everything. At peak hours, Bluesky sees tens of thousands of posts per minute. Your script needs to handle high throughput or it will fall behind. For most keyword monitoring use cases, the polling approach in Option 2 is simpler and sufficient.

Use the Firehose when you need:

  • True real-time (sub-second latency)
  • High-fidelity capture (you can't miss any post matching your filter)
  • Non-post events (likes, follows, reposts)

Option 4: A Free Hosted Endpoint for Bluesky Search

If you want Bluesky data without running your own polling loop or handling Firehose throughput, The Data Collector API at https://frog03-20494.wykr.es offers a hosted Bluesky search endpoint.

100 free calls, no credit card required. Get a key instantly:

curl -X POST https://frog03-20494.wykr.es/api/register \
  -H "Content-Type: application/json" \
  -d '{"email": "you@example.com"}'
Enter fullscreen mode Exit fullscreen mode

Then search Bluesky posts:

import requests

API_KEY = "your-key-here"
BASE = "https://frog03-20494.wykr.es/api"

resp = requests.get(
    f"{BASE}/bluesky/search",
    params={"q": "python developer", "limit": 20},
    headers={"X-API-Key": API_KEY}
)

for post in resp.json().get("results", []):
    print(f"@{post['author']}: {post['text'][:100]}")
    print(f"  {post['created_at']}")
Enter fullscreen mode Exit fullscreen mode

Useful for quick integrations, prototypes, or workflows where you don't want to manage the connection yourself.


Practical Example: Track Brand Mentions

Here's a complete script for tracking mentions of a brand or product on Bluesky, deduplicating results, and printing new mentions:

import requests
import json
from pathlib import Path

BRAND = "your-product-name"
SEEN_FILE = Path("seen_uris.json")
API_URL = "https://public.api.bsky.app/xrpc/app.bsky.feed.searchPosts"

def load_seen():
    if SEEN_FILE.exists():
        return set(json.loads(SEEN_FILE.read_text()))
    return set()

def save_seen(seen):
    SEEN_FILE.write_text(json.dumps(list(seen)))

def fetch_mentions(query, limit=50):
    resp = requests.get(API_URL, params={"q": query, "limit": limit})
    return resp.json().get("posts", [])

def run():
    seen = load_seen()
    posts = fetch_mentions(BRAND)
    new_count = 0

    for post in posts:
        uri = post["uri"]
        if uri in seen:
            continue
        seen.add(uri)
        new_count += 1
        author = post["author"]["handle"]
        text = post["record"]["text"]
        ts = post["record"]["createdAt"]
        print(f"NEW MENTION at {ts}")
        print(f"  @{author}: {text[:200]}")
        print()

    save_seen(seen)
    print(f"Checked {len(posts)} posts. {new_count} new mentions of '{BRAND}'.")

if __name__ == "__main__":
    run()
Enter fullscreen mode Exit fullscreen mode

Which Approach Should You Use?

Use case Best option
Search historical posts by keyword Search API (Option 2)
Monitor a topic every few minutes Search API + polling loop
Real-time, high-fidelity capture Firehose (Option 3)
Read a specific user's posts AppView API (Option 1)
Track likes, follows, reposts Firehose
Quick prototype, no infrastructure The Data Collector API (Option 4)
Authenticated timeline access AppView API with App Password

Tips and Gotchas

Rate limits: The public API is generally permissive for research use, but don't hammer it. The atproto library handles some backoff automatically if you use the Firehose client.

App Passwords: Always use an App Password for any authenticated operations. Your main Bluesky password should never be in code.

DIDs vs handles: Handles (like user.bsky.social) can change. DIDs are stable. When storing references to users, store the DID.

Deleted posts: The Firehose emits delete events. If you're archiving posts, listen for these too.

The atproto Python library: The official atproto package (PyPI) is the most complete Python client for the AT Protocol. Use it if you're doing anything beyond simple HTTP calls.


Final Notes

Bluesky's open protocol design makes it genuinely developer-friendly in a way that Twitter's API never was. The public endpoints are stable, documented, and don't require an application process for basic access.

For most monitoring use cases, the search API is all you need. For real-time pipelines, the Firehose is available and well-documented. And if you want a hosted, managed option without writing the infrastructure yourself, The Data Collector API offers a free tier with instant access.

The AT Protocol ecosystem is still maturing in 2026, but the tooling has improved significantly. It's a good time to build on it.

Top comments (0)