From Prototype to Production: Lessons Scaling an AI Interview Platform
Introduction: The Reality Gap
Building a working AI prototype is exciting. Making it production-ready for hundreds of concurrent users, enterprise clients, and mission-critical hiring decisions is an entirely different challenge. As a founder who bootstrapped an AI interview platform from concept to production, I've learned that the gap between "it works on my laptop" and "it reliably serves thousands of users" is filled with technical challenges that textbooks rarely cover.
This article shares practical lessons from scaling an AI-powered interview platform, focusing on the technical, operational, and business challenges unique to production AI systems in the HR technology space.
The Journey: From MVP to Production
Phase 1: The MVP (Months 0-3)
What We Built:
- Basic chatbot using OpenAI API
- Simple Flask web server
- SQLite database
- Deployed on a single AWS EC2 instance
- ~20 test users from personal network
What Worked:
- Proved the concept
- Got initial user feedback
- Validated market need
- Cost: ~$200/month
What Broke Immediately at Scale:
- Database locks with concurrent users
- API rate limits during peak usage
- Memory leaks crashed server after ~100 interviews
- No queue system for processing
- Zero monitoring or observability
Phase 2: Production-Ready Architecture (Months 4-8)
Moving from prototype to production required rebuilding nearly everything:
# Before (MVP)
@app.route('/interview', methods=['POST'])
def conduct_interview():
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": request.json['message']}]
)
    reply = response.choices[0].message.content
    db.execute("INSERT INTO responses VALUES (?)", (reply,))
    return jsonify({'response': reply})
# After (Production)
@app.route('/interview', methods=['POST'])
@rate_limit(max_calls=100, period=60)
@authenticate_user
@validate_input
async def conduct_interview():
"""Production-ready interview endpoint"""
try:
# Validate and sanitize input
message = sanitize_input(request.json.get('message'))
# Add to processing queue
task_id = await queue.enqueue(
'process_interview_response',
message=message,
user_id=g.user_id,
session_id=g.session_id,
priority='high'
)
# Return immediately, process asynchronously
return jsonify({
'task_id': task_id,
'status': 'processing'
}), 202
except Exception as e:
logger.error(f"Interview error: {str(e)}", exc_info=True)
metrics.increment('interview_errors')
return jsonify({'error': 'Internal error'}), 500
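The rate_limit decorator on that endpoint isn't shown above. Here is a minimal sketch of how it could work with a fixed-window counter in Redis; the decorator internals, key scheme, and shared redis_conn are assumptions, not the platform's actual implementation.
# Fixed-window rate limiter (sketch); assumes the wrapped view is async, as above
import time
from functools import wraps

import redis
from flask import g, jsonify, request

redis_conn = redis.Redis()

def rate_limit(max_calls: int, period: int):
    """Reject a caller once they exceed max_calls within a period-second window."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Key by authenticated user when available, otherwise by client IP
            caller = getattr(g, 'user_id', None) or request.remote_addr
            window = int(time.time() // period)
            key = f"ratelimit:{caller}:{window}"
            count = redis_conn.incr(key)  # blocking call; acceptable for a sketch
            if count == 1:
                redis_conn.expire(key, period)
            if count > max_calls:
                return jsonify({'error': 'Rate limit exceeded'}), 429
            return await func(*args, **kwargs)
        return wrapper
    return decorator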
Challenge 1: Handling Concurrent Users
The Problem
AI interview platforms have unique concurrency patterns:
- Burst traffic: Companies hire in waves, not steady streams
- Long-running operations: AI inference takes 2-5 seconds per response
- Stateful conversations: Each interview is a multi-turn dialogue
- Peak hours: Most interviews happen 9 AM - 5 PM in each timezone
When we hit 50 concurrent users, everything broke:
- Database connection pool exhausted
- LLM API rate limits exceeded
- Server CPU maxed out
- Response times jumped from 2s to 30s+
The Solution: Queue-Based Architecture
import redis
from rq import Queue
from rq.job import Job
from typing import Dict
class InterviewProcessor:
def __init__(self):
self.redis_conn = redis.Redis(
host='redis-cluster',
port=6379,
db=0,
decode_responses=True
)
# Separate queues for different priorities
self.high_priority = Queue('high', connection=self.redis_conn)
self.normal_priority = Queue('normal', connection=self.redis_conn)
self.low_priority = Queue('low', connection=self.redis_conn)
def enqueue_interview_task(
self,
message: str,
session_id: str,
priority: str = 'normal'
) -> str:
"""Add interview processing task to queue"""
        # Map the priority label to its queue (default to normal)
        queue = {
            'high': self.high_priority,
            'normal': self.normal_priority,
            'low': self.low_priority,
        }.get(priority, self.normal_priority)
job = queue.enqueue(
'workers.process_interview_message',
message=message,
session_id=session_id,
job_timeout='60s',
result_ttl=3600, # Keep results for 1 hour
failure_ttl=86400 # Keep failures for debugging
)
return job.id
def get_job_status(self, job_id: str) -> Dict:
"""Check status of queued job"""
job = Job.fetch(job_id, connection=self.redis_conn)
return {
'status': job.get_status(),
'result': job.result if job.is_finished else None,
'error': str(job.exc_info) if job.is_failed else None,
'position_in_queue': job.get_position() if job.is_queued else None
}
Worker Process:
# workers.py
import openai
from datetime import datetime
from typing import Dict
from rq import get_current_job
def process_interview_message(message: str, session_id: str) -> Dict:
"""Background worker for processing interview messages"""
job = get_current_job()
# Update job progress
job.meta['stage'] = 'retrieving_context'
job.save_meta()
# Get conversation context
context = retrieve_conversation_context(session_id)
job.meta['stage'] = 'generating_response'
job.save_meta()
# Generate AI response
response = generate_ai_response(message, context)
job.meta['stage'] = 'saving_results'
job.save_meta()
# Save to database
save_interview_response(session_id, message, response)
job.meta['stage'] = 'completed'
job.save_meta()
return {
'response': response,
'session_id': session_id,
'timestamp': datetime.now().isoformat()
}
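Because the endpoint returns 202 with a task_id, the frontend polls a status route until the worker finishes. A minimal sketch built on get_job_status follows; the route name and the processor instance are assumptions.
# Status endpoint the client polls after receiving a 202 (sketch)
processor = InterviewProcessor()

@app.route('/interview/status/<task_id>', methods=['GET'])
@authenticate_user
def interview_status(task_id):
    status = processor.get_job_status(task_id)
    if status['status'] == 'finished':
        # The worker's return value holds the AI response and session metadata
        return jsonify({'status': 'completed', 'result': status['result']}), 200
    if status['status'] == 'failed':
        return jsonify({'status': 'failed', 'error': status['error']}), 500
    return jsonify({
        'status': 'processing',
        'position_in_queue': status['position_in_queue']
    }), 202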
Results:
- Handled 500+ concurrent users without degradation
- Average response time: 2.3 seconds (down from 30s)
- Clear visibility into processing queue
- Graceful degradation under extreme load
Challenge 2: LLM API Management
The Cost Problem
OpenAI API costs scale quickly:
- GPT-4: $0.03 per 1K input tokens, $0.06 per 1K output tokens
- Average interview: 50 turns × 500 tokens/turn = 25K tokens
- Cost per interview: $0.75 - $1.50
At 1,000 interviews/day: $750 - $1,500/day = $22,500 - $45,000/month
For a bootstrapped startup, this was unsustainable.
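To keep the spend visible, it helps to turn token counts into dollars per call. Below is a rough sketch of that arithmetic, using the input rates quoted above plus OpenAI's published output rates at the time; the per-turn token split in the example is an assumption.
# Rough cost estimator per LLM call (sketch); prices as of the time of writing
PRICING = {  # model: (input $/1K tokens, output $/1K tokens)
    'gpt-4': (0.03, 0.06),
    'gpt-4-turbo-preview': (0.01, 0.03),
    'gpt-3.5-turbo-16k': (0.003, 0.004),
    'gpt-3.5-turbo': (0.0015, 0.002),
}

def estimate_cost(input_tokens: int, output_tokens: int, model: str) -> float:
    """Estimate the dollar cost of a single API call."""
    input_rate, output_rate = PRICING[model]
    return (input_tokens / 1000) * input_rate + (output_tokens / 1000) * output_rate

# Example: a 50-turn GPT-4 interview averaging 350 input / 150 output tokens per turn
interview_cost = sum(estimate_cost(350, 150, 'gpt-4') for _ in range(50))  # about $0.98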
Cost Optimization Strategies
1. Intelligent Model Selection:
class ModelSelector:
def select_optimal_model(
self,
task_complexity: str,
required_quality: str,
context_length: int
) -> str:
"""Select cheapest model that meets requirements"""
if task_complexity == 'simple' and context_length < 4000:
# Acknowledgments, simple follow-ups
return 'gpt-3.5-turbo' # $0.0015/1K tokens
elif task_complexity == 'medium' and required_quality == 'standard':
# Most interview questions
return 'gpt-3.5-turbo-16k' # $0.003/1K tokens
elif context_length > 8000 or required_quality == 'high':
# Complex evaluations, long context
return 'gpt-4-turbo-preview' # $0.01/1K tokens
else:
return 'gpt-4' # $0.03/1K tokens
Savings: 40% reduction by routing simple tasks to cheaper models
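Wired into the pipeline, the selector runs before every API call. A sketch of how the call site could look; the complexity heuristic and the chars-to-tokens estimate are assumptions, not the production logic.
# Route each message through the selector before calling the API (sketch)
import openai

selector = ModelSelector()

def generate_ai_response(message: str, context: str) -> str:
    complexity = 'simple' if len(message.split()) < 20 else 'medium'
    model = selector.select_optimal_model(
        task_complexity=complexity,
        required_quality='standard',
        context_length=len(context) // 4  # rough chars-to-tokens estimate
    )
    result = openai.ChatCompletion.create(
        model=model,
        messages=[
            {"role": "system", "content": context},
            {"role": "user", "content": message}
        ]
    )
    return result.choices[0].message.content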
2. Aggressive Caching:
import hashlib
import json
import redis
from typing import Optional
class ResponseCache:
def __init__(self):
self.redis_client = redis.Redis()
self.ttl = 3600 * 24 * 7 # 1 week
def get_cached_response(
self,
prompt: str,
model: str,
temperature: float
) -> Optional[str]:
"""Check if we have cached response for this prompt"""
# Create cache key
cache_key = self.create_cache_key(prompt, model, temperature)
# Check cache
cached = self.redis_client.get(cache_key)
if cached:
metrics.increment('cache_hits')
return json.loads(cached)
metrics.increment('cache_misses')
return None
def cache_response(
self,
prompt: str,
model: str,
temperature: float,
response: str
):
"""Cache response for future use"""
cache_key = self.create_cache_key(prompt, model, temperature)
self.redis_client.setex(
cache_key,
self.ttl,
json.dumps(response)
)
def create_cache_key(
self,
prompt: str,
model: str,
temperature: float
) -> str:
"""Generate deterministic cache key"""
content = f"{prompt}|{model}|{temperature}"
return f"llm_cache:{hashlib.sha256(content.encode()).hexdigest()}"
Savings: 35% reduction from cache hits on common questions
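In practice the cache sits in front of every completion call: check for a hit, fall back to the API on a miss, then store the result. A minimal cache-aside wrapper is sketched below; the function name is illustrative, and it assumes the openai 0.x interface used elsewhere in this article.
import openai

# Cache-aside wrapper around the LLM call (sketch)
def cached_completion(prompt: str, model: str, temperature: float = 0.0) -> str:
    cache = ResponseCache()
    # 1. Return the cached response if we've seen this exact prompt before
    cached = cache.get_cached_response(prompt, model, temperature)
    if cached is not None:
        return cached
    # 2. Otherwise call the API...
    result = openai.ChatCompletion.create(
        model=model,
        temperature=temperature,
        messages=[{"role": "user", "content": prompt}]
    )
    response_text = result.choices[0].message.content
    # 3. ...and store it for next time
    cache.cache_response(prompt, model, temperature, response_text)
    return response_text
Caching only pays off when generation is effectively deterministic, which is why temperature is part of the cache key and kept low for the templated question types that produce most of the hits.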
3. Prompt Optimization:
# Before: Verbose prompt (1,200 tokens)
prompt = f"""
You are an AI interviewer conducting an interview for the position
of {job_title} at {company_name}.
The job description is as follows:
{job_description} # Often 500+ tokens
The candidate's background includes:
{candidate_background} # Another 300+ tokens
Previous conversation:
{full_conversation_history} # Could be 400+ tokens
Guidelines for interviewing:
{lengthy_guidelines} # 200+ tokens
Now, given the candidate said: "{candidate_response}"
Generate the next interview question.
"""
# After: Optimized prompt (400 tokens)
prompt = f"""
Interview: {job_title}
Key requirements: {extract_key_requirements(job_description)} # 100 tokens
Recent context: {summarize_recent_turns(history, n=3)} # 150 tokens
Candidate said: "{candidate_response}"
Next question:
"""
Savings: 65% token reduction per API call
Combined Result: 70% cost reduction ($22.5K → $6.75K/month)
Challenge 3: Database Performance at Scale
The Problem
Our SQLite database worked fine for prototyping but couldn't handle production load:
- Write locks blocked concurrent interviews
- No query optimization
- No connection pooling
- Slow full-text search
- No backup/recovery strategy
Migration to Production Database
# Production database schema
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Float, Index, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
Base = declarative_base()
class Interview(Base):
__tablename__ = 'interviews'
id = Column(Integer, primary_key=True)
session_id = Column(String(64), unique=True, nullable=False, index=True)
candidate_id = Column(Integer, ForeignKey('candidates.id'), index=True)
job_id = Column(Integer, ForeignKey('jobs.id'), index=True)
status = Column(String(20), index=True) # active, completed, abandoned
started_at = Column(DateTime, nullable=False, index=True)
completed_at = Column(DateTime)
total_duration = Column(Integer) # seconds
# Relationships
messages = relationship('Message', back_populates='interview')
evaluation = relationship('Evaluation', back_populates='interview', uselist=False)
# Indexes for common queries
__table_args__ = (
Index('ix_interview_candidate_date', 'candidate_id', 'started_at'),
Index('ix_interview_job_status', 'job_id', 'status'),
)
class Message(Base):
__tablename__ = 'messages'
id = Column(Integer, primary_key=True)
interview_id = Column(Integer, ForeignKey('interviews.id'), nullable=False, index=True)
sender = Column(String(10), nullable=False) # 'ai' or 'candidate'
content = Column(Text, nullable=False)
timestamp = Column(DateTime, nullable=False, index=True)
# For analytics
tokens_used = Column(Integer)
latency_ms = Column(Integer)
model_version = Column(String(50))
interview = relationship('Interview', back_populates='messages')
class Evaluation(Base):
__tablename__ = 'evaluations'
id = Column(Integer, primary_key=True)
interview_id = Column(Integer, ForeignKey('interviews.id'), nullable=False, unique=True)
# Scores
overall_score = Column(Float)
technical_score = Column(Float)
communication_score = Column(Float)
cultural_fit_score = Column(Float)
# Analysis
strengths = Column(Text)
weaknesses = Column(Text)
recommendation = Column(String(20)) # hire, maybe, no_hire
created_at = Column(DateTime, nullable=False)
interview = relationship('Interview', back_populates='evaluation')
# Connection pooling
engine = create_engine(
'postgresql://user:pass@host:5432/interviews',
pool_size=20, # Base pool size
max_overflow=10, # Additional connections if needed
pool_pre_ping=True, # Verify connections before using
pool_recycle=3600, # Recycle connections every hour
echo=False # Don't log SQL in production
)
SessionLocal = sessionmaker(bind=engine)
Query Optimization
from datetime import datetime
from typing import Dict
from sqlalchemy.orm import joinedload, selectinload
class InterviewRepository:
def get_active_interviews_with_messages(
self,
limit: int = 100
):
"""Efficiently load interviews with related data"""
session = SessionLocal()
try:
# Use eager loading to prevent N+1 queries
interviews = session.query(Interview)\
.options(
selectinload(Interview.messages),
joinedload(Interview.evaluation)
)\
.filter(Interview.status == 'active')\
.order_by(Interview.started_at.desc())\
.limit(limit)\
.all()
return interviews
finally:
session.close()
def get_interview_statistics(
self,
job_id: int,
start_date: datetime,
end_date: datetime
) -> Dict:
"""Optimized aggregation query"""
session = SessionLocal()
try:
from sqlalchemy import func
stats = session.query(
func.count(Interview.id).label('total_interviews'),
func.avg(Interview.total_duration).label('avg_duration'),
func.avg(Evaluation.overall_score).label('avg_score')
).join(Evaluation)\
.filter(
Interview.job_id == job_id,
Interview.started_at.between(start_date, end_date),
Interview.status == 'completed'
            ).first()
return {
'total_interviews': stats.total_interviews,
'avg_duration_minutes': stats.avg_duration / 60 if stats.avg_duration else 0,
'avg_score': round(stats.avg_score, 2) if stats.avg_score else 0
}
finally:
session.close()
Read Replicas for Analytics
class DatabaseRouter:
"""Route queries to primary or read replica"""
def __init__(self):
self.primary = create_engine('postgresql://primary:5432/db')
self.replica = create_engine('postgresql://replica:5432/db')
def get_session(self, read_only: bool = False):
"""Get appropriate database session"""
if read_only:
return sessionmaker(bind=self.replica)()
else:
return sessionmaker(bind=self.primary)()
# Usage
db_router = DatabaseRouter()

def generate_analytics_report(job_id: int):
"""Heavy analytics on read replica"""
session = db_router.get_session(read_only=True)
# Complex analytics queries don't impact production writes
results = session.query(...).all()
return results
Challenge 4: Monitoring and Observability
What to Monitor in Production AI Systems
import prometheus_client as prom
from datadog import statsd
class MetricsCollector:
def __init__(self):
# Prometheus metrics
self.interview_counter = prom.Counter(
'interviews_total',
'Total interviews conducted',
['status', 'job_type']
)
self.response_latency = prom.Histogram(
'ai_response_latency_seconds',
'Time to generate AI response',
buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)
self.llm_cost_gauge = prom.Gauge(
'llm_cost_dollars_daily',
'Daily LLM API costs'
)
self.active_interviews = prom.Gauge(
'active_interviews',
'Number of interviews in progress'
)
self.error_counter = prom.Counter(
'interview_errors_total',
'Total interview errors',
['error_type', 'component']
)
def track_interview_message(
self,
latency: float,
tokens_used: int,
model: str,
success: bool
):
"""Track metrics for each message"""
# Response time
self.response_latency.observe(latency)
# Cost estimation
cost = self.estimate_cost(tokens_used, model)
self.llm_cost_gauge.inc(cost)
# Send to DataDog for dashboarding
statsd.histogram('interview.latency', latency)
statsd.increment('interview.messages', tags=[f'model:{model}'])
if not success:
self.error_counter.labels(
error_type='generation_failed',
component='llm'
).inc()
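Prometheus still needs an endpoint to scrape. With Flask already in place, the simplest option is to expose the client library's default registry on a /metrics route; this is a sketch, and mounting prometheus_client's start_http_server on a separate port works just as well.
from flask import Response
import prometheus_client as prom

@app.route('/metrics')
def metrics_endpoint():
    # Render all registered metrics in the Prometheus text exposition format
    return Response(prom.generate_latest(), content_type=prom.CONTENT_TYPE_LATEST)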
Logging Strategy
import traceback
from datetime import datetime
import structlog
# Structured logging
logger = structlog.get_logger()
class InterviewLogger:
def log_interview_event(
self,
event_type: str,
session_id: str,
**kwargs
):
"""Structured logging for interview events"""
logger.info(
event_type,
session_id=session_id,
timestamp=datetime.now().isoformat(),
**kwargs
)
def log_error_with_context(
self,
error: Exception,
session_id: str,
context: Dict
):
"""Log errors with full context for debugging"""
logger.error(
"interview_error",
session_id=session_id,
error_type=type(error).__name__,
error_message=str(error),
stack_trace=traceback.format_exc(),
context=context,
timestamp=datetime.now().isoformat()
)
# Usage
interview_logger = InterviewLogger()
try:
response = generate_ai_response(message, context)
interview_logger.log_interview_event(
'message_generated',
session_id=session_id,
message_length=len(message),
response_length=len(response),
latency=latency
)
except Exception as e:
interview_logger.log_error_with_context(
error=e,
session_id=session_id,
context={
'message': message,
'user_id': user_id,
'job_id': job_id
}
)
raise
Alerting Rules
# alerting_rules.yml
groups:
- name: interview_platform
interval: 30s
rules:
# High error rate
- alert: HighErrorRate
expr: |
rate(interview_errors_total[5m]) > 0.1
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value }} errors/second"
# Slow response times
- alert: SlowResponses
expr: |
        histogram_quantile(0.95, rate(ai_response_latency_seconds_bucket[5m])) > 10
annotations:
summary: "95th percentile response time > 10s"
# High API costs
- alert: HighAPICosts
expr: |
llm_cost_dollars_daily > 500
annotations:
summary: "Daily API costs exceeding budget"
# Queue backup
- alert: QueueBacklog
expr: |
redis_queue_length{queue="high"} > 100
annotations:
summary: "High priority queue backed up"
Challenge 5: Security and Data Privacy
Interview Data is Sensitive
Candidates share personal information:
- Employment history
- Salary expectations
- Personal projects and achievements
- Sometimes protected characteristics (inadvertently)
Security Implementation
import hashlib
import os
from datetime import datetime
from typing import Dict, List
from cryptography.fernet import Fernet
class SecureInterviewStorage:
def __init__(self):
self.cipher = Fernet(os.getenv('ENCRYPTION_KEY'))
def store_interview_data(
self,
session_id: str,
message: str,
candidate_id: int
):
"""Securely store interview data"""
# Encrypt sensitive content
encrypted_message = self.cipher.encrypt(message.encode())
# Hash session ID for privacy
hashed_session = hashlib.sha256(
f"{session_id}{candidate_id}".encode()
).hexdigest()
# Store with encryption
db.execute(
"""
INSERT INTO messages
(session_hash, encrypted_content, candidate_id, created_at)
VALUES (?, ?, ?, ?)
""",
(hashed_session, encrypted_message, candidate_id, datetime.now())
)
def retrieve_interview_data(
self,
session_id: str,
candidate_id: int,
requesting_user_id: int
) -> List[str]:
"""Retrieve interview data with access control"""
# Verify requesting user has permission
if not self.verify_access_permission(requesting_user_id, candidate_id):
raise PermissionError("Unauthorized access attempt")
hashed_session = hashlib.sha256(
f"{session_id}{candidate_id}".encode()
).hexdigest()
encrypted_messages = db.query(
"SELECT encrypted_content FROM messages WHERE session_hash = ?",
(hashed_session,)
)
# Decrypt messages
messages = [
self.cipher.decrypt(enc_msg).decode()
for enc_msg in encrypted_messages
]
# Log access for audit trail
self.log_data_access(requesting_user_id, candidate_id, session_id)
return messages
def anonymize_for_training(
self,
interview_data: List[Dict]
) -> List[Dict]:
"""Anonymize data before using for model training"""
anonymized = []
for interview in interview_data:
# Remove PII
clean_messages = [
self.remove_pii(msg)
for msg in interview['messages']
]
anonymized.append({
'messages': clean_messages,
'job_category': interview['job_category'], # Keep for context
'outcome': interview['outcome'] # Keep for training
# Removed: candidate_id, company_name, specific dates
})
return anonymized
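The remove_pii call above carries most of the weight. A simplified, regex-only version is sketched below; a real scrubber would also need named-entity recognition to catch names and employers.
import re

# Simplified PII scrubber applied before any training export (sketch)
PII_PATTERNS = {
    'EMAIL': re.compile(r'[\w.+-]+@[\w-]+\.[\w.-]+'),
    'PHONE': re.compile(r'\+?\d[\d\s().-]{7,}\d'),
    'URL': re.compile(r'https?://\S+'),
}

def remove_pii(text: str) -> str:
    """Replace obvious PII patterns with placeholder tokens."""
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f'[{label}]', text)
    return text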
GDPR Compliance
class GDPRComplianceHandler:
def handle_deletion_request(self, candidate_id: int):
"""Handle right to be forgotten"""
# 1. Delete from primary database
db.execute(
"DELETE FROM messages WHERE candidate_id = ?",
(candidate_id,)
)
db.execute(
"DELETE FROM interviews WHERE candidate_id = ?",
(candidate_id,)
)
# 2. Delete from backups (mark for deletion)
backup_manager.mark_for_deletion(candidate_id)
# 3. Delete from analytics databases
analytics_db.delete_candidate_data(candidate_id)
# 4. Remove from any cached data
cache.delete_pattern(f"candidate:{candidate_id}:*")
# 5. Log the deletion (required for compliance)
compliance_log.record_deletion(
candidate_id=candidate_id,
requested_at=datetime.now(),
completed_at=datetime.now()
)
def export_candidate_data(self, candidate_id: int) -> Dict:
"""Handle data portability request"""
return {
'interviews': self.get_interview_history(candidate_id),
'evaluations': self.get_evaluations(candidate_id),
'messages': self.get_messages(candidate_id),
'metadata': self.get_metadata(candidate_id)
}
Challenge 6: Enterprise Requirements
Multi-Tenancy
Supporting multiple companies on one platform:
class MultiTenantArchitecture:
def __init__(self):
self.tenant_cache = {}
def get_tenant_context(self, request) -> Dict:
"""Extract tenant from request"""
# From subdomain: acme.platform.com
subdomain = request.host.split('.')[0]
# Or from API key
api_key = request.headers.get('X-API-Key')
if api_key:
tenant = self.get_tenant_from_api_key(api_key)
else:
tenant = self.get_tenant_from_subdomain(subdomain)
return tenant
@cache(ttl=300)
def get_tenant_config(self, tenant_id: str) -> Dict:
"""Get tenant-specific configuration"""
config = db.query(
"SELECT * FROM tenant_configs WHERE tenant_id = ?",
(tenant_id,)
)
return {
'branding': config['branding_settings'],
'features': config['enabled_features'],
'limits': config['usage_limits'],
'llm_config': config['llm_settings']
}
    def enforce_tenant_isolation(self, query, tenant_id: str):
        """Ensure queries only access the tenant's data"""
        # Add a tenant_id constraint to every query automatically
        # (filter_by matches the column by keyword; plain filter() needs an expression)
        return query.filter_by(tenant_id=tenant_id)
# Middleware
@app.before_request
def inject_tenant_context():
"""Add tenant context to all requests"""
g.tenant = multi_tenant.get_tenant_context(request)
g.tenant_config = multi_tenant.get_tenant_config(g.tenant['id'])
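With the middleware in place, every repository query goes through the isolation helper, so one tenant can never read another tenant's interviews. A sketch of a tenant-scoped lookup follows; it assumes a tenant_id column on Interview, which the Challenge 3 schema omits.
# Tenant-scoped interview lookup (sketch; assumes Interview has a tenant_id column)
def get_tenant_interviews(tenant_id: str, limit: int = 50):
    session = SessionLocal()
    try:
        query = session.query(Interview).order_by(Interview.started_at.desc())
        query = multi_tenant.enforce_tenant_isolation(query, tenant_id)
        return query.limit(limit).all()
    finally:
        session.close()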
SSO Integration
from onelogin.saml2.auth import OneLogin_Saml2_Auth
class SSOHandler:
def handle_saml_login(self, request):
"""Handle SAML SSO authentication"""
        # python3-saml expects a plain dict describing the request,
        # not the Flask request object (see prepare_flask_request below)
        req = prepare_flask_request(request)
        auth = OneLogin_Saml2_Auth(req, saml_settings)
        auth.process_response()
if auth.is_authenticated():
user_data = {
'email': auth.get_nameid(),
'attributes': auth.get_attributes(),
'session_index': auth.get_session_index()
}
# Create or update user
user = self.sync_sso_user(user_data)
# Create session
session = self.create_authenticated_session(user)
return redirect(url_for('dashboard'))
else:
return "Authentication failed", 401
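One detail worth flagging: OneLogin_Saml2_Auth expects a plain dict describing the incoming request rather than the Flask request object itself. The adapter below follows the shape used in the python3-saml Flask example.
from urllib.parse import urlparse

def prepare_flask_request(request):
    """Convert a Flask request into the dict format python3-saml expects."""
    url_data = urlparse(request.url)
    return {
        'https': 'on' if request.scheme == 'https' else 'off',
        'http_host': request.host,
        'server_port': url_data.port,
        'script_name': request.path,
        'get_data': request.args.copy(),
        'post_data': request.form.copy(),
    }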
Challenge 7: Cost Management at Scale
Final Cost Breakdown (Production)
Monthly Costs for 10,000 interviews:
Infrastructure:
- AWS EC2 (application servers): $500
- AWS RDS (PostgreSQL): $300
- Redis Cache: $100
- Load Balancer: $50
- CloudFront CDN: $100
AI/ML:
- OpenAI API (after optimization): $6,750
- Whisper API (voice): $500
Monitoring & Tools:
- DataDog: $200
- Sentry error tracking: $50
- StatusPage: $50
Total: $8,600/month
Cost per interview: $0.86
Revenue Model
Pricing Tiers:
- Starter: $99/mo (100 interviews)
- Professional: $499/mo (1,000 interviews)
- Enterprise: $2,499/mo (10,000 interviews) + custom features
At 100 enterprise customers:
Revenue: $249,900/month
Costs: $8,600/month
Gross Margin: 97%
Lessons Learned
1. Start Simple, But Plan for Scale
Don't over-engineer the MVP, but understand what will break at scale:
- Use PostgreSQL from day 1 (not SQLite)
- Design for horizontal scaling
- Implement queues early
- Add monitoring before you need it
2. Optimize Costs Aggressively
AI API costs can spiral out of control:
- Cache everything possible
- Use cheaper models for simple tasks
- Optimize prompts ruthlessly
- Monitor costs daily
3. Security Can't Be an Afterthought
Interview data is sensitive:
- Encrypt at rest and in transit
- Implement access controls from start
- Plan for GDPR/privacy compliance
- Regular security audits
4. Observability is Critical
You can't fix what you can't see:
- Comprehensive logging
- Real-time metrics
- Alerting on key thresholds
- Error tracking with context
5. Enterprise Customers Have Unique Needs
Plan for:
- SSO integration
- Multi-tenancy
- Custom branding
- Compliance requirements (SOC 2, GDPR)
- SLAs and support expectations
Conclusion
Scaling an AI platform from prototype to production serving thousands of users requires solving challenges across infrastructure, cost optimization, security, and enterprise requirements. The technical decisions made early significantly impact long-term scalability and unit economics.
Key takeaways:
- Architecture matters: Queue-based processing handles concurrency gracefully
- Cost optimization is essential: 70% cost reduction possible through caching, model selection, and prompt engineering
- Monitoring enables scale: Can't optimize what you don't measure
- Security first: Privacy and compliance must be built in, not bolted on
- Enterprise-ready takes time: SSO, multi-tenancy, and compliance aren't trivial additions
Building production AI systems is hard. But with careful planning, aggressive optimization, and commitment to reliability, it's possible to build scalable, profitable AI products that serve real business needs.
About the Author
Ademola Balogun is the founder and CEO of 180GIG Ltd, where he bootstrapped and scaled an AI interview platform from concept to production. With an MSc in Data Science from Birkbeck, University of London, he combines technical depth with entrepreneurial execution. His work focuses on building practical, scalable AI systems that solve real business problems while maintaining profitability and reliability.