Bluesky has grown significantly since its launch, and in 2026 it's a meaningful signal source for developers, researchers, and toolbuilders. Unlike Twitter/X, Bluesky is built on the AT Protocol — an open, federated protocol that makes programmatic access first-class rather than an afterthought.
This article covers every practical approach to monitoring Bluesky in Python: the official API, the real-time Firehose, and search-based keyword tracking. All with working code examples.
Understanding the AT Protocol (briefly)
Before diving in, a few terms worth knowing:
- AT Protocol (atproto): The open protocol Bluesky runs on. Think of it like ActivityPub but designed with data portability and programmatic access in mind.
- PDS (Personal Data Server): Where user data lives. Bluesky's default PDS is bsky.social, but users can self-host.
- Lexicon: The schema system that defines data types (posts = app.bsky.feed.post, likes = app.bsky.feed.like, etc.)
- DID: Decentralized identifier — the stable ID for each user, even if their handle changes.
- CID: Content identifier — a hash-based ID for each record (post, like, follow, etc.)
You don't need to understand all of this to get data out of Bluesky, but it helps when you're reading API responses.
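To make the handle/DID distinction concrete, here's a small helper (a sketch against the public AppView; `resolve_did` is our name for it, not a library function) that resolves a mutable handle to its stable DID using the `com.atproto.identity.resolveHandle` endpoint:

```python
import requests

def resolve_did(handle: str) -> str:
    """Resolve a (mutable) handle to its stable DID."""
    resp = requests.get(
        "https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle",
        params={"handle": handle},
    )
    resp.raise_for_status()
    return resp.json()["did"]
```

Calling `resolve_did("bsky.app")` returns a `did:plc:...` identifier that stays the same even if the account changes its handle.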
Option 1: The Official Bluesky HTTP API (AppView)
Bluesky runs a public HTTP API on top of the AT Protocol. No authentication is required for most read operations.
The main endpoint is: https://public.api.bsky.app/xrpc/
Get a user's recent posts
```python
import requests

def get_user_posts(handle, limit=10):
    resp = requests.get(
        "https://public.api.bsky.app/xrpc/app.bsky.feed.getAuthorFeed",
        params={"actor": handle, "limit": limit},
    )
    resp.raise_for_status()
    feed = resp.json()
    posts = []
    for item in feed.get("feed", []):
        post = item["post"]
        posts.append({
            "text": post["record"]["text"],
            "created_at": post["record"]["createdAt"],
            "likes": post["likeCount"],
            "reposts": post["repostCount"],
            "uri": post["uri"],
        })
    return posts

for post in get_user_posts("bsky.app", limit=5):
    print(f"[{post['likes']} likes] {post['text'][:100]}")
    print(f"  Posted: {post['created_at']}")
```
Get a post's thread and replies
```python
def get_thread(post_uri):
    resp = requests.get(
        "https://public.api.bsky.app/xrpc/app.bsky.feed.getPostThread",
        params={"uri": post_uri, "depth": 3},
    )
    resp.raise_for_status()
    return resp.json().get("thread", {})

# post_uri looks like: at://did:plc:xyz.../app.bsky.feed.post/abc123
```
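In practice, what you usually have is a bsky.app web URL rather than an at:// URI. A hypothetical helper (ours, not part of any library) can do the conversion; note it resolves the handle to a DID when the URL doesn't already contain one:

```python
import re

def web_url_to_at_uri(url: str) -> str:
    """Convert a https://bsky.app/profile/<actor>/post/<rkey> URL to an at:// URI."""
    m = re.match(r"https://bsky\.app/profile/([^/]+)/post/([^/?#]+)", url)
    if not m:
        raise ValueError(f"Unrecognized Bluesky post URL: {url}")
    actor, rkey = m.groups()
    if not actor.startswith("did:"):
        # Imported lazily: only needed when the URL uses a handle
        import requests
        resp = requests.get(
            "https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle",
            params={"handle": actor},
        )
        resp.raise_for_status()
        actor = resp.json()["did"]
    return f"at://{actor}/app.bsky.feed.post/{rkey}"
```

The resulting URI can be passed straight to `get_thread`.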
Get the home timeline (requires auth)
For timeline access, you need to authenticate:
```python
import requests

def get_session(handle, password):
    resp = requests.post(
        "https://bsky.social/xrpc/com.atproto.server.createSession",
        json={"identifier": handle, "password": password},
    )
    resp.raise_for_status()
    return resp.json()

def get_timeline(access_jwt, limit=20):
    resp = requests.get(
        "https://bsky.social/xrpc/app.bsky.feed.getTimeline",
        headers={"Authorization": f"Bearer {access_jwt}"},
        params={"limit": limit},
    )
    resp.raise_for_status()
    return resp.json().get("feed", [])

session = get_session("yourhandle.bsky.social", "your-app-password")
posts = get_timeline(session["accessJwt"])
```
Note: Use an App Password, not your main password. You can create one in Bluesky's settings.
Option 2: Search API for Keyword Monitoring
Bluesky's search API is the practical choice for keyword-based monitoring. No authentication needed for basic searches.
```python
import requests

def search_bluesky(query, limit=25):
    resp = requests.get(
        "https://public.api.bsky.app/xrpc/app.bsky.feed.searchPosts",
        params={"q": query, "limit": limit},
    )
    resp.raise_for_status()
    data = resp.json()
    results = []
    for post in data.get("posts", []):
        results.append({
            "text": post["record"]["text"],
            "author": post["author"]["handle"],
            "created_at": post["record"]["createdAt"],
            "likes": post.get("likeCount", 0),
            "uri": post["uri"],
        })
    return results

# Monitor mentions of a product or topic
posts = search_bluesky("python scraping", limit=20)
for p in posts:
    print(f"@{p['author']}: {p['text'][:120]}")
    print(f"  {p['created_at']} | {p['likes']} likes")
    print()
```
Paginating through search results
The search API supports cursor-based pagination:
```python
def search_all(query, max_pages=5):
    results = []
    cursor = None
    for _ in range(max_pages):
        params = {"q": query, "limit": 100}
        if cursor:
            params["cursor"] = cursor
        resp = requests.get(
            "https://public.api.bsky.app/xrpc/app.bsky.feed.searchPosts",
            params=params,
        )
        resp.raise_for_status()
        data = resp.json()
        results.extend(data.get("posts", []))
        cursor = data.get("cursor")
        if not cursor:
            break
    return results
```
Building a simple keyword monitor
```python
import requests
import time

KEYWORDS = ["bluesky", "atproto", "python"]
SEEN = set()

def check_keywords(keywords):
    # Uses search_bluesky() from the previous example
    new_posts = []
    for kw in keywords:
        posts = search_bluesky(kw, limit=20)
        for p in posts:
            if p["uri"] not in SEEN:
                SEEN.add(p["uri"])
                new_posts.append((kw, p))
    return new_posts

print("Starting Bluesky keyword monitor...")
while True:
    found = check_keywords(KEYWORDS)
    for keyword, post in found:
        print(f"[{keyword}] @{post['author']}: {post['text'][:100]}")
    time.sleep(60)  # Poll every minute
```
Option 3: The Firehose — Real-Time Event Stream
The AT Protocol Firehose is a WebSocket stream of every event happening on the network — every post, like, repost, follow, and delete, in real time. It's the most powerful option for real-time monitoring.
The public relay is at: wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos
You'll need the atproto library for this:
```
pip install atproto
```
Listening to the Firehose
```python
from atproto import CAR, FirehoseSubscribeReposClient, models, parse_subscribe_repos_message

client = FirehoseSubscribeReposClient()

def on_message(message) -> None:
    commit = parse_subscribe_repos_message(message)
    # Only process repo commits (not identity, account, or info events)
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return
    if not commit.blocks:
        return
    # Records arrive CBOR-encoded in a CAR file attached to the commit;
    # decode it once and look records up by their CID
    car = CAR.from_bytes(commit.blocks)
    for op in commit.ops:
        # Only look at new posts
        if op.action == "create" and op.cid and op.path.startswith("app.bsky.feed.post/"):
            record = car.blocks.get(op.cid)
            if record and "text" in record:
                print(f"New post: {record['text'][:100]}")

print("Connecting to Firehose...")
client.start(on_message)
```
Filtering Firehose for specific keywords
```python
from atproto import CAR, FirehoseSubscribeReposClient, models, parse_subscribe_repos_message

KEYWORDS = ["python", "developer", "scraping"]

client = FirehoseSubscribeReposClient()

def on_message(message) -> None:
    commit = parse_subscribe_repos_message(message)
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return
    if not commit.blocks:
        return
    car = CAR.from_bytes(commit.blocks)
    for op in commit.ops:
        if op.action != "create" or not op.cid or not op.path.startswith("app.bsky.feed.post/"):
            continue
        record = car.blocks.get(op.cid)
        if not record or "text" not in record:
            continue
        text = record["text"].lower()
        for kw in KEYWORDS:
            if kw in text:
                print(f"[{kw}] {record['text'][:120]}")
                break

client.start(on_message)
```
Firehose volume warning
The Firehose streams everything. At peak hours, Bluesky sees tens of thousands of posts per minute. Your script needs to handle high throughput or it will fall behind. For most keyword monitoring use cases, the polling approach in Option 2 is simpler and sufficient.
Use the Firehose when you need:
- True real-time (sub-second latency)
- High-fidelity capture (you can't miss any post matching your filter)
- Non-post events (likes, follows, reposts)
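One common way to keep up with Firehose volume is to make the callback do nothing but enqueue, and push keyword matching and storage onto a worker thread. A minimal sketch of that pattern (the bounded queue and drop-on-full policy are illustrative choices, not atproto APIs):

```python
import queue
import threading

# Bounded queue: if the consumer falls behind, drop rather than grow memory
EVENTS: "queue.Queue[dict]" = queue.Queue(maxsize=10_000)

def enqueue(event: dict) -> None:
    """Called from the firehose callback; must return fast."""
    try:
        EVENTS.put_nowait(event)
    except queue.Full:
        pass  # Drop under backpressure; count drops in production

def worker(handle_event, stop: threading.Event) -> None:
    """Drain events off the hot path (keyword matching, DB writes, etc.)."""
    while not stop.is_set() or not EVENTS.empty():
        try:
            event = EVENTS.get(timeout=0.1)
        except queue.Empty:
            continue
        handle_event(event)
        EVENTS.task_done()
```

Your `on_message` handler would call `enqueue({...})` with whatever fields you extract, while one or more `worker` threads do the slow processing.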
Option 4: A Free Hosted Endpoint for Bluesky Search
If you want Bluesky data without running your own polling loop or handling Firehose throughput, The Data Collector API at https://frog03-20494.wykr.es offers a hosted Bluesky search endpoint.
100 free calls, no credit card required. Get a key instantly:
```
curl -X POST https://frog03-20494.wykr.es/api/register \
  -H "Content-Type: application/json" \
  -d '{"email": "you@example.com"}'
```
Then search Bluesky posts:
```python
import requests

API_KEY = "your-key-here"
BASE = "https://frog03-20494.wykr.es/api"

resp = requests.get(
    f"{BASE}/bluesky/search",
    params={"q": "python developer", "limit": 20},
    headers={"X-API-Key": API_KEY},
)

for post in resp.json().get("results", []):
    print(f"@{post['author']}: {post['text'][:100]}")
    print(f"  {post['created_at']}")
```
Useful for quick integrations, prototypes, or workflows where you don't want to manage the connection yourself.
Practical Example: Track Brand Mentions
Here's a complete script for tracking mentions of a brand or product on Bluesky, deduplicating results, and printing new mentions:
```python
import requests
import json
from pathlib import Path

BRAND = "your-product-name"
SEEN_FILE = Path("seen_uris.json")
API_URL = "https://public.api.bsky.app/xrpc/app.bsky.feed.searchPosts"

def load_seen():
    if SEEN_FILE.exists():
        return set(json.loads(SEEN_FILE.read_text()))
    return set()

def save_seen(seen):
    SEEN_FILE.write_text(json.dumps(list(seen)))

def fetch_mentions(query, limit=50):
    resp = requests.get(API_URL, params={"q": query, "limit": limit})
    resp.raise_for_status()
    return resp.json().get("posts", [])

def run():
    seen = load_seen()
    posts = fetch_mentions(BRAND)
    new_count = 0
    for post in posts:
        uri = post["uri"]
        if uri in seen:
            continue
        seen.add(uri)
        new_count += 1
        author = post["author"]["handle"]
        text = post["record"]["text"]
        ts = post["record"]["createdAt"]
        print(f"NEW MENTION at {ts}")
        print(f"  @{author}: {text[:200]}")
        print()
    save_seen(seen)
    print(f"Checked {len(posts)} posts. {new_count} new mentions of '{BRAND}'.")

if __name__ == "__main__":
    run()
```
Which Approach Should You Use?
| Use case | Best option |
|---|---|
| Search historical posts by keyword | Search API (Option 2) |
| Monitor a topic every few minutes | Search API + polling loop |
| Real-time, high-fidelity capture | Firehose (Option 3) |
| Read a specific user's posts | AppView API (Option 1) |
| Track likes, follows, reposts | Firehose |
| Quick prototype, no infrastructure | The Data Collector API (Option 4) |
| Authenticated timeline access | AppView API with App Password |
Tips and Gotchas
Rate limits: The public API is generally permissive for research use, but don't hammer it. The atproto library handles some backoff automatically if you use the Firehose client.
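If you're polling on a schedule, it's worth wrapping calls in a small retry helper. This is a sketch, not documented Bluesky behavior: the status codes and backoff parameters are reasonable defaults, and the `get` callable is injected (pass `requests.get`) so the helper stays easy to test.

```python
import time

def get_with_backoff(get, url, params=None, max_retries=4, base_delay=1.0):
    """GET with exponential backoff on 429/5xx. `get` is e.g. requests.get."""
    for attempt in range(max_retries):
        resp = get(url, params=params)
        if resp.status_code not in (429, 500, 502, 503):
            resp.raise_for_status()  # Raise on other 4xx errors
            return resp
        # Honor a Retry-After header when present, else back off exponentially
        delay = float(resp.headers.get("Retry-After", base_delay * (2 ** attempt)))
        time.sleep(delay)
    resp.raise_for_status()
    return resp
```

Usage: `resp = get_with_backoff(requests.get, API_URL, params={"q": "python"})`.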
App Passwords: Always use an App Password for any authenticated operations. Your main Bluesky password should never be in code.
DIDs vs handles: Handles (like user.bsky.social) can change. DIDs are stable. When storing references to users, store the DID.
Deleted posts: The Firehose emits delete events. If you're archiving posts, listen for these too.
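A delete event carries only the record's repo path, not its content, so archiving means matching incoming delete ops against URIs you already stored. A hypothetical helper, sketched over objects exposing the `.action` and `.path` fields that commit ops have:

```python
def deleted_post_paths(ops) -> list[str]:
    """Pull the repo paths of deleted posts out of a commit's ops."""
    return [
        op.path for op in ops
        if op.action == "delete" and op.path.startswith("app.bsky.feed.post/")
    ]
```

Each returned path (e.g. `app.bsky.feed.post/3k...`) can be joined with the commit's repo DID to reconstruct the at:// URI of the record to tombstone.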
The atproto Python library: The official atproto package (PyPI) is the most complete Python client for the AT Protocol. Use it if you're doing anything beyond simple HTTP calls.
Final Notes
Bluesky's open protocol design makes it genuinely developer-friendly in a way that Twitter's API never was. The public endpoints are stable, documented, and don't require an application process for basic access.
For most monitoring use cases, the search API is all you need. For real-time pipelines, the Firehose is available and well-documented. And if you want a hosted, managed option without writing the infrastructure yourself, The Data Collector API offers a free tier with instant access.
The AT Protocol ecosystem is still maturing in 2026, but the tooling has improved significantly. It's a good time to build on it.