It was a rainy Sunday morning. I was drinking my home-brewed coffee when my phone rang.
On the line was a customer asking: "Can you do a data extraction job with AI?"
"Sure, why not. How many records and what kind of results do you expect?"
The answer: 10 million records of cryptic product titles that needed to be transformed into structured vehicle compatibility data.
The Challenge
- 10 million records in a Microsoft SQL database
- Minimal product info: just an ID plus a title like `FORD FOCUS 1.5 TDCi GEARBOX 0B5-300-057-PU`
- Budget constraint: $1,500 for AI tokens
- Need for continuous processing without manual intervention
Basically, the challenge was: turn 10 million cryptic product titles into structured vehicle compatibility data without blowing the budget.
The Architecture
I designed a three-tier system, plus a deployment layer:
- ClickHouse - For fast data reads
- Python async processing - For concurrent AI API calls
- MongoDB - For storing structured results
- AWS ECS - For containerized deployment
My Approach
Here is the approach I designed to tackle the challenge.
It seems easy on paper, but as you all know, in the software industry everything that can go wrong eventually does.
Step 1: Data Migration to ClickHouse
I moved data from Microsoft SQL to ClickHouse because it's optimized for fast reads and can handle 100,000+ rows per second.
```python
def fetch_records(self, limit=100):
    last_row_id = self.get_last_record_id()
    query = f"""
        SELECT id, title, RowID
        FROM records
        WHERE RowID > {last_row_id}
        ORDER BY RowID ASC
        LIMIT {limit}
    """
    return self.client.query(query).result_rows
```
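The migration itself was essentially a read-and-bulk-insert loop. A minimal sketch of how it could look, assuming pyodbc for SQL Server and the same clickhouse-connect client the rest of the code uses — the connection strings, source table, and column names here are placeholders, not the project's actual values:

```python
import pyodbc
import clickhouse_connect

# Placeholder connection details -- adjust for your own environment
mssql = pyodbc.connect("DRIVER={ODBC Driver 17 for SQL Server};SERVER=...;DATABASE=...;UID=...;PWD=...")
ch = clickhouse_connect.get_client(host="localhost")

cursor = mssql.cursor()
cursor.execute("SELECT id, title, RowID FROM products ORDER BY RowID")

while True:
    rows = cursor.fetchmany(50_000)  # stream in chunks to keep memory flat
    if not rows:
        break
    ch.insert(
        "records",
        [list(r) for r in rows],
        column_names=["id", "title", "RowID"],
    )
```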
Key lesson: I initially tried updating records in ClickHouse after processing, but this caused massive memory issues. ClickHouse queues updates, and with hundreds of concurrent updates, the system choked. Solution? Track progress using RowID in a text file instead.
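For reference, that RowID tracking doesn't need to be fancier than a couple of file operations. A minimal sketch (lastRecord.txt is the same file mentioned in Problem 1 below):

```python
import os

TRACK_FILE = "lastRecord.txt"

def get_last_record_id(self):
    # Start from 0 on a fresh run, otherwise resume where we left off
    if not os.path.exists(TRACK_FILE):
        return 0
    with open(TRACK_FILE) as f:
        return int(f.read().strip() or 0)

def save_last_record_id(self, row_id):
    # Overwrite with the highest RowID we have fully processed
    with open(TRACK_FILE, "w") as f:
        f.write(str(row_id))
```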
Step 2: AI Processing Pipeline
Choosing the Right Model
I tested three providers:
| Provider | Model | Cost per 1M tokens (input / output) | Speed |
|---|---|---|---|
| OpenAI | GPT-4o-mini | $0.15 / $0.60 | Fast |
| Google | Gemini 2.5 Flash | $0.075 / $0.30 | Fastest |
| Anthropic | Claude Sonnet | $3.00 / $15.00 | Medium |
Winner: Gemini 2.5 Flash Lite - Best balance of cost and performance.
The Processing Engine
```python
async def process_batch_async(self, batch, batch_number):
    tasks = [self.process_record(record) for record in batch]
    batch_results = await asyncio.gather(*tasks, return_exceptions=True)
    return batch_results
```
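The article doesn't show process_record itself; conceptually, each task builds the few-shot prompt (next section) and calls the model. Here's a sketch of what that might look like with the google-generativeai SDK — the model name, key handling, prompt helper, and fence stripping are my assumptions, not the project's actual code:

```python
import json
import google.generativeai as genai

genai.configure(api_key="YOUR_GEMINI_API_KEY")      # placeholder key
model = genai.GenerativeModel("gemini-2.5-flash")    # assumed model name

async def process_record(self, record):
    record_id, title, row_id = record
    prompt = self.build_prompt(title)  # hypothetical helper building the few-shot prompt
    response = await model.generate_content_async(prompt)
    # Strip possible markdown fences before parsing the JSON array
    text = response.text.strip().removeprefix("```json").removesuffix("```").strip()
    return {"row_id": row_id, "record_id": record_id, "vehicles": json.loads(text)}
```

Because gather runs with return_exceptions=True, a single failed record comes back as an exception object instead of killing the whole batch — which is what makes the failed-record export in Problem 4 possible later.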
Configuration:
- Batch size: 100,000 records
- Batch delay: 0.5 seconds
- Async within batches, sequential between batches (a sketch of this loop follows below)
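Tying the settings together, the outer loop is deliberately sequential: fetch a batch, fan out the AI calls concurrently, store the results, advance the RowID marker, sleep, repeat. A rough sketch under those assumptions (log_failed_record is a hypothetical helper; save_last_record_id is the tracking sketch from Step 1):

```python
import asyncio

async def run_pipeline(self, batch_size=100_000, batch_delay=0.5):
    batch_number = 0
    while True:
        batch = self.fetch_records(limit=batch_size)
        if not batch:
            break  # nothing left to process
        results = await self.process_batch_async(batch, batch_number)
        for record, result in zip(batch, results):
            if isinstance(result, Exception):
                self.log_failed_record(record, result)  # hypothetical helper
            else:
                self.insert_record(result)
        self.save_last_record_id(batch[-1][2])  # RowID is the third column
        batch_number += 1
        await asyncio.sleep(batch_delay)        # sequential pause between batches
```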
The Prompt Engineering Fix
Initial attempts produced inconsistent results. The breakthrough was using few-shot prompting:
prompt = f"""You are a car parts expert. Extract compatible vehicle models in JSON format.
EXAMPLE:
[{{"brand": "Ford", "model": "Focus", "engine": "1.5 TDCi",
"category": "Gearbox", "part_number": "0B5-300-057-PU"}}]
Product: {product_title}
Respond with only the JSON array."""
This improved accuracy from 70% to 95%.
Step 3: Storing Results
```python
from datetime import datetime

def insert_record(self, record_data):
    mongo_data = record_data.copy()
    mongo_data['inserted_at'] = datetime.now().isoformat()
    result = self.collection.insert_one(mongo_data)
    return result.inserted_id is not None
```
MongoDB provided flexible schema for varying extraction results and easy JSON export.
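For context, the collection wiring and the final JSON export can be as small as this pymongo sketch — the connection string, database, and collection names are placeholders, not the project's real ones:

```python
import json
from pymongo import MongoClient

client = MongoClient("mongodb+srv://user:pass@cluster.example.mongodb.net")  # placeholder URI
collection = client["car_parts"]["extracted_vehicles"]                       # placeholder names

# Export everything to a JSON file for the customer
docs = list(collection.find({}, {"_id": 0}))
with open("results.json", "w") as f:
    json.dump(docs, f, ensure_ascii=False, indent=2)
```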
Step 4: Deployment
```dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "run_batch.py"]
```
Deployed on AWS ECS with Fargate for serverless containers and auto-scaling.
The Problems I Faced
Problem 1: ClickHouse Memory Explosion
Updating processed records caused massive memory usage because ClickHouse queues all updates. With 50+ concurrent updates in flight, the system crashed.
Solution: Switched to an append-only strategy, tracking the last processed RowID in lastRecord.txt.
Problem 2: Inconsistent AI Responses
AI would sometimes return plain text, skip fields, or use different JSON structures.
Solution: Added few-shot prompting with explicit examples and response validation with retry logic for malformed JSON.
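The validation layer itself can stay small. Here's a sketch of the idea; the required-field list, the retry count, and the call_model wrapper are my assumptions, not the project's actual code:

```python
import json

REQUIRED_FIELDS = {"brand", "model", "engine", "category", "part_number"}

def parse_and_validate(raw_text):
    """Return the list of vehicle dicts, or None if the response is malformed."""
    try:
        data = json.loads(raw_text)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, list):
        return None
    if any(not isinstance(item, dict) or not REQUIRED_FIELDS.issubset(item) for item in data):
        return None
    return data

async def extract_with_retry(self, title, max_retries=3):
    for attempt in range(max_retries):
        raw = await self.call_model(title)  # hypothetical wrapper around the Gemini call
        parsed = parse_and_validate(raw)
        if parsed is not None:
            return parsed
    raise ValueError(f"Malformed response after {max_retries} attempts: {title}")
```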
Problem 3: Google Cloud Billing Disaster
Google Cloud billed my Gemini 2.5 Flash text usage at image-processing rates. My budget exploded from $1,500 to $3,400 overnight.
Resolution: Contacted Google Cloud Support. After 48 hours, they confirmed it was a billing system bug and issued a full refund.
Lesson learned: Always set up billing alerts with hard limits.
Problem 4: Failed Records Recovery
Network timeouts and API errors caused some records to fail. I needed reprocessing without starting over.
Solution: Export failed records to JSON, then use targeted retry:
```python
def fetch_records_failed_from_json(self, json_file, start=0, limit=100):
    with open(json_file, 'r') as f:
        failed_data = json.load(f)
    row_ids = [r['row_id'] for r in failed_data['removed_documents']]
    return self.fetch_by_row_ids(row_ids[start:start + limit])
```
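fetch_by_row_ids isn't shown in the article, but against ClickHouse it can be a simple IN query in the same style as fetch_records, roughly:

```python
def fetch_by_row_ids(self, row_ids):
    # Re-read only the failed rows from ClickHouse
    id_list = ", ".join(str(int(r)) for r in row_ids)  # RowIDs are integers
    query = f"""
        SELECT id, title, RowID
        FROM records
        WHERE RowID IN ({id_list})
    """
    return self.client.query(query).result_rows
```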
The Results
Final Statistics:
- Records processed: 10,000,000
- Success rate: 96.2%
- Total cost: $1,347 (under budget)
- Processing time: 14 days continuous run
- Average speed: ~715,000 records/day (10M records over a 14-day run)
Cost Breakdown:
- Gemini API tokens: $1,234
- AWS ECS: $78
- ClickHouse hosting: $25
- MongoDB Atlas: $10
Key Takeaways
Choose the Right Database: ClickHouse for fast reads, MongoDB for flexible results. Don't force updates where append-only works better.
Async Wisely: Concurrent API calls within batches work great. Sequential batch processing prevents rate limit issues.
Prompt Engineering Matters: A well-crafted prompt with examples improved accuracy by 25%.
Always Have Resume Strategy: Track progress in the simplest way possible. Even a text file works.
Monitor Cloud Bills: Set up alerts and check daily. That $3,400 surprise taught me paranoia pays off.
Cost Optimization is Creative: Batch sizing, delay tuning, and model selection all affect the balance between cost, quality, and speed (rough back-of-envelope math below).
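To make that concrete, here is a rough back-of-envelope estimate using the Gemini Flash prices from the comparison table; the per-record token counts are my own illustrative assumptions, not figures from the project:

```python
# Illustrative back-of-envelope estimate -- token counts are assumptions
input_tokens_per_record = 800    # prompt + few-shot example + title (assumed)
output_tokens_per_record = 200   # JSON array of vehicles (assumed)

input_price = 0.075 / 1_000_000  # $ per input token (table above)
output_price = 0.30 / 1_000_000  # $ per output token (table above)

cost_per_record = (input_tokens_per_record * input_price
                   + output_tokens_per_record * output_price)
total = cost_per_record * 10_000_000
print(f"~${cost_per_record:.6f} per record, ~${total:,.0f} for 10M records")
# -> ~$0.000120 per record, ~$1,200 for 10M records, in the same ballpark as the $1,234 token bill
```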
What's Next?
This architecture adapts well for:
- Product categorization at scale
- Content moderation pipelines
- Data enrichment workflows
- Entity extraction from unstructured data
The key principles:
- Choose the right database for reads vs writes
- Use async strategically
- Track progress for crash recovery
- Monitor costs obsessively
- Optimize prompts early
Final Thoughts
Processing 10 million records with AI seemed daunting at first. But by breaking it down into smart database choices, efficient async processing, robust error handling, and careful cost monitoring, it became reliable and cost-effective.
Total investment: $1,347
Total records: 10,000,000
Cost per record: $0.0001347
The rainy Sunday coffee call turned into a 14-day adventure in building a production-grade AI pipeline.
Github Repo: https://github.com/mstfbysl/ai-clickhouse-pipeline