## TL;DR
Built a live-syncing HackerNews search engine that:
✅ Fetches 500 recent threads + nested comments
✅ Auto-updates when content changes
✅ Searchable with PostgreSQL full-text search
✅ Zero orchestration overhead (no Airflow, Kafka, or cron)
The secret? A declarative data framework called CocoIndex that handles all the boring stuff.
## The Problem: Data Pipelines Are Still Too Hard
Every dev has been there. You want to:
- Pull data from an API
- Keep it in sync
- Make it searchable
- Not spend 3 weeks on infrastructure
But you end up with:
- A rat's nest of cron jobs
- Manual state management
- "Just reprocess everything" logic
- 500 lines of boilerplate
There had to be a better way.
## The Solution: Declarative Data Flows
What if you could describe what you want, not how to get it?
That's the idea behind CocoIndex. Think of it like React, but for data:
- You declare the shape of your data
- The framework handles incremental updates
- State management is automatic
- Everything stays in sync
No orchestrators. No message queues. Just Python.
## Building the HackerNews Index

### Step 1: Define Your Data Model
First, tell CocoIndex what a HackerNews thread looks like:
```python
import dataclasses
from datetime import datetime
from typing import NamedTuple

class _HackerNewsThreadKey(NamedTuple):
    thread_id: str

@dataclasses.dataclass
class _HackerNewsComment:
    # Fields mirror what the flow collects per comment below.
    id: str
    author: str | None
    text: str
    created_at: datetime | None

@dataclasses.dataclass
class _HackerNewsThread:
    author: str | None
    text: str
    url: str | None
    created_at: datetime | None
    comments: list[_HackerNewsComment]
```
Simple Python types. No magic.
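To make the shape concrete, here's one thread built by hand (values are made up; real ones come from the API):

```python
from datetime import datetime, timezone

thread = _HackerNewsThread(
    author="someuser",
    text="Ask HN: How do you keep side projects in sync?",
    url=None,
    created_at=datetime(2025, 6, 1, tzinfo=timezone.utc),
    comments=[],  # nested _HackerNewsComment objects go here
)
```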
### Step 2: Build a Custom Source
Wrap the HackerNews API in a source connector:
```python
@source_connector(
    spec_cls=HackerNewsSource,
    key_type=_HackerNewsThreadKey,
    value_type=_HackerNewsThread,
)
class HackerNewsConnector:
    async def list(self):
        # Fetch thread metadata from the HN API;
        # return keys + timestamps so CocoIndex can diff.
        ...

    async def get_value(self, key):
        # Fetch the full thread + comments;
        # return structured data.
        ...
```
CocoIndex uses this to:
- Know what threads exist
- Track what changed
- Fetch only what's needed
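The method bodies are elided above, but under the hood they boil down to two calls against the public HN Firebase API. Here's a hedged sketch of the fetching side using `httpx` (my choice for illustration; the exact shapes CocoIndex expects back from `list()`/`get_value()` are simplified away):

```python
import httpx

HN_API = "https://hacker-news.firebaseio.com/v0"

async def fetch_recent_story_ids(max_results: int = 500) -> list[int]:
    # topstories.json returns up to 500 story ids; list() would wrap
    # these as _HackerNewsThreadKey entries.
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"{HN_API}/topstories.json")
        resp.raise_for_status()
        return resp.json()[:max_results]

async def fetch_item(item_id: int) -> dict:
    # item/<id>.json returns one story or comment; get_value() would
    # recurse through its "kids" ids to build the nested comment tree.
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"{HN_API}/item/{item_id}.json")
        resp.raise_for_status()
        return resp.json()
```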
### Step 3: Define Your Flow
Now the magic happens. Describe your pipeline like a React component:
```python
from datetime import timedelta

import cocoindex

@cocoindex.flow_def(name="HackerNewsIndex")
def hackernews_flow(flow_builder, data_scope):
    # Connect the source
    data_scope["threads"] = flow_builder.add_source(
        HackerNewsSource(tag="story", max_results=500),
        refresh_interval=timedelta(minutes=1),
    )

    # Create a collector for the search index
    message_index = data_scope.add_collector()

    # Process each thread
    with data_scope["threads"].row() as thread:
        message_index.collect(
            id=thread["thread_id"],
            thread_id=thread["thread_id"],
            content_type="thread",
            author=thread["author"],
            text=thread["text"],
            created_at=thread["created_at"],
        )

        # Process nested comments
        with thread["comments"].row() as comment:
            message_index.collect(
                id=comment["id"],
                thread_id=thread["thread_id"],
                content_type="comment",
                author=comment["author"],
                text=comment["text"],
                created_at=comment["created_at"],
            )

    # Export to Postgres
    message_index.export(
        "hn_messages",
        cocoindex.targets.Postgres(),
        primary_key_fields=["id"],
    )
```
That's it. No state management. No diffs. No "last_updated" columns.
CocoIndex figures it all out.
## Running It
One-time sync:

```sh
cocoindex update main
```

Live mode (keeps running):

```sh
cocoindex update -L main
```
CocoIndex automatically:
- Polls the API every minute
- Detects changed threads
- Updates only what's needed
- Keeps Postgres in sync
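Once it's live, you can sanity-check that rows are landing in Postgres. A minimal sketch, assuming the same psycopg `connection_pool()` helper the search handler below uses:

```python
# Count indexed rows by type (counts shown are illustrative).
with connection_pool().connection() as conn:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT content_type, COUNT(*) FROM hn_messages GROUP BY content_type"
        )
        print(cur.fetchall())  # e.g. [('comment', 8731), ('thread', 500)]
```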
## Add Full-Text Search
Because everything's in Postgres, searching is trivial:
```python
@hackernews_flow.query_handler()
def search_text(query: str):
    with connection_pool().connection() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT id, thread_id, author, text, created_at,
                       ts_rank(to_tsvector('english', text),
                               plainto_tsquery('english', %s)) AS rank
                FROM hn_messages
                WHERE to_tsvector('english', text) @@
                      plainto_tsquery('english', %s)
                ORDER BY rank DESC, created_at DESC
            """, (query, query))
            cols = ["id", "thread_id", "author", "text", "created_at", "rank"]
            return [dict(zip(cols, row)) for row in cur.fetchall()]
```
PostgreSQL's built-in full-text search. No Elasticsearch needed.
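One caveat: the query recomputes `to_tsvector('english', text)` for every row. That's fine at HN scale, but if the table grows, a GIN expression index over the same expression keeps the `@@` match fast. This is plain Postgres, not something CocoIndex sets up for you; a one-time sketch reusing the same connection helper:

```python
# Index the exact expression the WHERE clause uses, so Postgres
# can answer @@ matches from the index instead of scanning.
with connection_pool().connection() as conn:
    conn.execute("""
        CREATE INDEX IF NOT EXISTS hn_messages_text_fts
        ON hn_messages USING GIN (to_tsvector('english', text))
    """)
```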
## Why This Matters
Before CocoIndex:
- 500+ lines of state management
- Custom diffing logic
- Orchestration setup
- Manual incremental updates
- Prayer-based reliability
After CocoIndex:
- ~100 lines total
- Declarative
- Automatic incremental updates
- Built-in observability
- Deterministic
## Real-World Use Cases

Custom Sources unlock powerful patterns:

1. **Knowledge Base for AI Agents**: pull docs from Notion + GitHub + Confluence → searchable LLM context
2. **"Composite" Entities**: stitch user data from Auth0 + Stripe + Redis → unified User360 view
3. **Legacy System Modernization**: wrap SOAP/XML APIs → queryable SQL interface
4. **Competitive Intelligence**: monitor competitor pricing/changes → trigger alerts on diffs
The "React for Data" Paradigm
CocoIndex brings React's declarative model to data pipelines:
| React | CocoIndex |
|---|---|
| Describe UI | Describe data shape |
| Virtual DOM diffs | Automatic change detection |
| Component state | Persistent state |
| Re-render on change | Re-process on change |
| Props → children | Sources → transforms |
If you can fetch it, CocoIndex can index it, diff it, and sync it.
## Try It Yourself
Full code: https://github.com/cocoindex-io/cocoindex/tree/main/examples/custom_source_hn
Quick start:

```sh
pip install cocoindex
cocoindex update main
```
If this saves you from writing another cron job, drop a ⭐ on the CocoIndex repo!
## What Will You Build?
Custom Sources turn any API into a first-class data stream:
- Reddit/Twitter aggregators
- Multi-SaaS dashboards
- Real-time monitoring
- Data warehouse mirrors
- Whatever you can dream up
No infrastructure overhead. Just describe what you want and run it.
What API will you turn into a searchable index?