In this practical guide, you will learn how to create a highly scalable model deployment solution with built-in LLMs for your applications.
In our examples, we will use the GPT-2 model from Hugging Face, but you can plug in any other model, including GPT-4, Claude, etc.
Whether you are designing a new application with AI capabilities or improving the existing AI systems, this guide will help you step by step to create a strong LLM integration.
Understanding LLM Integration Fundamentals
Before we start writing code, let’s figure out what it takes to build a production LLM integration. API calls are not the only thing to consider: you also need to think about reliability, cost, and stability. Your production applications must handle service outages, rate limits, and variable response times while keeping costs under control.
Here's what we'll build together:
- A robust API client that gracefully handles failures
- A smart caching system to optimize costs and speed
- A proper prompt management system
- Comprehensive error handling and monitoring
- A complete content moderation system as your example project
Prerequisites
Before we start coding, make sure you have:
- Python 3.8 or newer installed on your machine
- Redis, either a cloud account or a local installation
- Basic Python programming knowledge
- Basic understanding of REST APIs
- A Hugging Face API key (or any other LLM provider key)
Want to follow along? The complete code is available in our GitHub repository.
Setting Up your Development Environment
Let's start by getting your development environment ready. We'll create a clean project structure and install all the necessary packages.
First, let's create your project directory and set up a Python virtual environment. Open your terminal and run:
mkdir llm_integration && cd llm_integration
python3 -m venv env
source env/bin/activate
Now let's set up your project dependencies. Create a new requirements.txt
file with these essential packages:
transformers==4.36.0
huggingface-hub==0.19.4
redis==4.6.0
pydantic==2.5.0
pydantic-settings==2.1.0
tenacity==8.2.3
python-dotenv==1.0.0
fastapi==0.104.1
uvicorn==0.24.0
torch==2.1.0
numpy==1.24.3
Let's break down why we need each of these packages:
- transformers: Hugging Face’s library, which we will use to load and run the GPT-2 model
- huggingface-hub: Handles model downloading and versioning
- redis: Implements request caching
- pydantic: Used for data validation
- pydantic-settings: Loads the application settings from environment variables and the .env file
- tenacity: Provides the retry functionality for increased reliability
- python-dotenv: Loads environment variables
- fastapi: Builds your API endpoints with a small amount of code
- uvicorn: Runs your FastAPI application
- torch: Runs transformer models and handles machine learning operations
- numpy: Used for numerical computing
Install all the packages with the command:
pip install -r requirements.txt
Let's organize your project with a clean structure. Create these directories and files in your project directory:
llm_integration/
├── core/
│   ├── llm_client.py          # Your main LLM interaction code
│   ├── prompt_manager.py      # Handles prompt templates
│   └── response_handler.py    # Processes LLM responses
├── cache/
│   └── redis_manager.py       # Manages your caching system
├── config/
│   └── settings.py            # Configuration management
├── api/
│   └── routes.py              # API endpoints
├── utils/
│   ├── monitoring.py          # Usage tracking
│   └── rate_limiter.py        # Rate limiting logic
├── prompts/
│   └── content_moderation.json # Prompt template
├── requirements.txt
├── main.py
└── usage_logs.json
Building the LLM Client
Let's start with your LLM client, which is the most important component of your application. This is where we'll interact with the GPT-2 model (or any other LLM you prefer). Add the following code to your core/llm_client.py file:
import logging
from typing import Dict, Optional

import torch
from tenacity import retry, stop_after_attempt, wait_exponential
from transformers import AutoModelForCausalLM, AutoTokenizer


class LLMClient:
    def __init__(self, model_name: str = "gpt2", timeout: int = 30):
        self.logger = logging.getLogger(__name__)
        self.timeout = timeout
        # Half precision saves memory on a GPU; float32 is the safer choice on CPU
        dtype = torch.float16 if torch.cuda.is_available() else torch.float32
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                device_map="auto",
                torch_dtype=dtype
            )
        except Exception as e:
            self.logger.error(f"Error loading model: {str(e)}")
            # Fall back to the small gpt2 model if the specified one fails
            self.tokenizer = AutoTokenizer.from_pretrained("gpt2")
            self.model = AutoModelForCausalLM.from_pretrained("gpt2")
In this first part of your LLMClient class, we're setting up the foundation:
- We are using AutoModelForCausalLM and AutoTokenizer from the transformers library to load your model
- The device_map="auto" parameter automatically handles GPU/CPU allocation (it relies on the accelerate package being installed; if loading fails, the client falls back to plain gpt2)
- We're using torch.float16 to reduce memory usage while maintaining good performance
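To make the memory argument concrete, here is a back-of-the-envelope estimate of weight memory at both precisions. It is only a sketch: it assumes roughly 124 million parameters for the base gpt2 checkpoint, counts weights only (no activations or framework overhead), and the helper name weight_memory_mb is made up for this example.

```python
# Rough weight-memory estimate; ignores activations and runtime overhead.
BYTES_PER_PARAM = {"float32": 4, "float16": 2}


def weight_memory_mb(num_params: int, dtype: str) -> float:
    """Approximate size of the model weights in megabytes (10**6 bytes)."""
    return num_params * BYTES_PER_PARAM[dtype] / 1_000_000


GPT2_PARAMS = 124_000_000  # assumption: approximate size of the base gpt2 model

for dtype in ("float32", "float16"):
    print(f"{dtype}: ~{weight_memory_mb(GPT2_PARAMS, dtype):.0f} MB")
```

Under these assumptions the weights take about 496 MB in float32 and about 248 MB in float16, which is where the "half the memory" rule of thumb comes from.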
Now let's add the method that talks to your model:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True
    )
    async def complete(self,
                       prompt: str,
                       temperature: float = 0.7,
                       max_tokens: Optional[int] = None) -> Dict:
        """Get completion from the model with automatic retries"""
        try:
            inputs = self.tokenizer(prompt, return_tensors="pt").to(
                self.model.device
            )
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_tokens or 100,
                    temperature=temperature,
                    do_sample=True
                )
            # Calculate token usage for monitoring
            input_tokens = len(inputs.input_ids[0])
            output_tokens = len(outputs[0]) - input_tokens
            # Decode only the newly generated tokens: outputs[0] starts with
            # the prompt, which would otherwise be echoed back in the response
            response_text = self.tokenizer.decode(
                outputs[0][input_tokens:],
                skip_special_tokens=True
            )
            return {
                'content': response_text,
                'usage': {
                    'prompt_tokens': input_tokens,
                    'completion_tokens': output_tokens,
                    'total_tokens': input_tokens + output_tokens
                },
                # Report the model that was actually loaded
                'model': getattr(self.model.config, "name_or_path", "gpt2")
            }
        except Exception as e:
            self.logger.error(f"Error in LLM completion: {str(e)}")
            raise
Let's break down what's happening in this completion method:
- The @retry decorator retries the call to deal with temporary failures.
- The torch.no_grad() context manager saves memory by disabling gradient calculations.
- Token usage is tracked for both input and output, which is very important for cost accounting.
- The method returns a structured dictionary with the response and usage statistics.
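To see what the retry policy amounts to, here is a standard-library sketch of retrying with exponential backoff. It is not tenacity's implementation: backoff_delay and call_with_retries are hypothetical helpers, and the delay formula is only my approximation of how wait_exponential(multiplier=1, min=4, max=10) behaves.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, multiplier: float = 1,
                  min_wait: float = 4, max_wait: float = 10) -> float:
    """Exponential delay for a 1-based attempt, clamped to [min_wait, max_wait]."""
    return max(min_wait, min(multiplier * 2 ** (attempt - 1), max_wait))


def call_with_retries(func: Callable[[], T], attempts: int = 3,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call func, retrying on any exception until the attempt budget is spent."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(backoff_delay(attempt))
    raise AssertionError("unreachable")


# Demo: a call that fails twice, then succeeds
calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


waits = []
result = call_with_retries(flaky, sleep=waits.append)  # record waits instead of sleeping
print(result, waits)
```

With these settings the first retries all wait the 4-second minimum, because the exponential term only exceeds it from the fourth attempt onward.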
Creating your LLM Response Handler
Next, we need to add the response handler to parse and structure the LLM's raw output. Do that in your core/response_handler.py
file with the following code snippets:
from typing import Dict
import logging


class ResponseHandler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def parse_moderation_response(self, raw_response: str) -> Dict:
        """Parse and structure the raw LLM response for moderation"""
        try:
            # Default response structure
            structured_response = {
                "is_appropriate": True,
                "confidence_score": 0.0,
                "reason": None
            }
            # Simple keyword-based analysis
            lower_response = raw_response.lower()
            # Check for inappropriate content signals
            if any(word in lower_response for word in ['inappropriate', 'unsafe', 'offensive', 'harmful']):
                structured_response["is_appropriate"] = False
                structured_response["confidence_score"] = 0.9
                # Extract reason if present
                if "because" in lower_response:
                    reason_start = lower_response.find("because")
                    structured_response["reason"] = raw_response[reason_start:].split('.')[0].strip()
            else:
                structured_response["confidence_score"] = 0.95
            return structured_response
        except Exception as e:
            self.logger.error(f"Error parsing response: {str(e)}")
            return {
                "is_appropriate": True,
                "confidence_score": 0.5,
                "reason": "Failed to parse response"
            }

    def format_response(self, raw_response: Dict) -> Dict:
        """Format the final response with parsed content and usage stats"""
        try:
            return {
                "content": self.parse_moderation_response(raw_response["content"]),
                "usage": raw_response["usage"],
                "model": raw_response["model"]
            }
        except Exception as e:
            self.logger.error(f"Error formatting response: {str(e)}")
            raise
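Keyword matching like this is sensitive to exactly which text it receives. As a small illustration, the sketch below mirrors the keyword check and shows what happens if the text still contains the prompt, whose own wording includes the word "inappropriate". The helper names, the prompt, and the model output are all hypothetical.

```python
FLAG_WORDS = ("inappropriate", "unsafe", "offensive", "harmful")


def is_flagged(text: str) -> bool:
    """Mirror the keyword check used in parse_moderation_response."""
    lowered = text.lower()
    return any(word in lowered for word in FLAG_WORDS)


def strip_prompt_echo(prompt: str, generated: str) -> str:
    """Drop the prompt from the start of the generated text, if it is echoed there."""
    if generated.startswith(prompt):
        return generated[len(prompt):]
    return generated


# Hypothetical prompt and model output, for illustration only
prompt = ("Respond with whether it is appropriate or inappropriate."
          "\n\nMessage: Hello!\n\nAnalysis:")
echoed_output = prompt + " The message is appropriate because it is a friendly greeting."

print(is_flagged(echoed_output))                             # True: the prompt's wording matches
print(is_flagged(strip_prompt_echo(prompt, echoed_output)))  # False: only the answer is checked
```

The takeaway is to run the parser on the model's answer alone, not on the prompt plus the answer.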
Adding a Robust Caching System
Now let's create your caching system to improve the application performance and reduce costs. Add the following code snippets to your cache/redis_manager.py
file:
import redis
from typing import Optional
import json
import hashlib


class CacheManager:
    def __init__(self, redis_url: str, ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl

    def _generate_key(self, prompt: str, params: dict) -> str:
        """Generate a unique cache key"""
        cache_data = {
            'prompt': prompt,
            'params': params
        }
        serialized = json.dumps(cache_data, sort_keys=True)
        return hashlib.sha256(serialized.encode()).hexdigest()

    async def get_cached_response(self,
                                  prompt: str,
                                  params: dict) -> Optional[dict]:
        """Retrieve cached LLM response"""
        key = self._generate_key(prompt, params)
        cached = self.redis.get(key)
        return json.loads(cached) if cached else None

    async def cache_response(self,
                             prompt: str,
                             params: dict,
                             response: dict) -> None:
        """Cache LLM response"""
        key = self._generate_key(prompt, params)
        self.redis.setex(
            key,
            self.ttl,
            json.dumps(response)
        )
In the above code, we created a CacheManager class that handles all caching operations with the following:
- The _generate_key method, which creates unique cache keys based on prompts and parameters
- get_cached_response, which checks if we have a cached response for a given prompt
- cache_response, which stores successful responses for future use
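The key generation is worth a quick look on its own. This standalone sketch copies the approach of _generate_key into a free function (generate_key is a name chosen for this example) and shows why sort_keys=True matters.

```python
import hashlib
import json


def generate_key(prompt: str, params: dict) -> str:
    """Same approach as CacheManager._generate_key: hash a canonical JSON dump."""
    serialized = json.dumps({"prompt": prompt, "params": params}, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()


key_a = generate_key("Is this ok?", {"temperature": 0.3, "max_tokens": 100})
key_b = generate_key("Is this ok?", {"max_tokens": 100, "temperature": 0.3})
key_c = generate_key("Is this ok?", {"temperature": 0.7, "max_tokens": 100})

print(key_a == key_b)  # True: the order of the parameters does not matter
print(key_a == key_c)  # False: a different temperature gives a different key
print(len(key_a))      # 64 hex characters
```

Because the parameters are part of the key, the same prompt requested with different generation settings is cached separately.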
Creating a Smart Prompt Manager
Let's create your prompt manager that will manage the prompts for your LLM model. Add the following code to your core/prompt_manager.py
:
from typing import Dict, Optional
from pydantic import BaseModel
import json
import os


class Prompt(BaseModel):
    template: str
    version: str
    # In Pydantic v2, Optional fields need an explicit default to be optional
    description: Optional[str] = None


class PromptManager:
    def __init__(self, prompts_dir: str = "prompts"):
        self.prompts_dir = prompts_dir
        self.prompts: Dict[str, Prompt] = {}
        self._load_prompts()

    def _load_prompts(self):
        """Load prompt templates from JSON files"""
        os.makedirs(self.prompts_dir, exist_ok=True)
        for filename in os.listdir(self.prompts_dir):
            if filename.endswith('.json'):
                with open(os.path.join(self.prompts_dir, filename)) as f:
                    prompt_data = json.load(f)
                prompt_name = filename.replace('.json', '')
                self.prompts[prompt_name] = Prompt(**prompt_data)

    def get_prompt(self, name: str, **kwargs) -> str:
        """Get a formatted prompt template"""
        if name not in self.prompts:
            raise KeyError(f"Prompt template '{name}' not found")
        prompt = self.prompts[name]
        return prompt.template.format(**kwargs)
Then create a sample prompt template for content moderation in your prompts/content_moderation.json file:
{
"template": "Analyze if this message is appropriate or not. Respond with whether it is appropriate or inappropriate, and explain why:\n\nMessage: {content}\n\nAnalysis:",
"version": "1.0",
"description": "Template for content moderation tasks"
}
Now your prompt manager can load prompt templates from your JSON files and return a formatted prompt.
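Since get_prompt relies on Python's str.format, a few of its behaviors are worth knowing before you write more templates. The sketch below uses a shortened, inlined copy of the template rather than loading the JSON file; json_template is a hypothetical second template added only to show brace escaping.

```python
template = ("Analyze if this message is appropriate or not."
            "\n\nMessage: {content}\n\nAnalysis:")

# Placeholders are filled from keyword arguments
print(template.format(content="Hello, world!"))

# Braces inside the *value* are inserted as-is and are not re-parsed
print(template.format(content="{not_a_placeholder}"))

# A missing argument raises KeyError, so a typo in the call fails loudly
try:
    template.format(text="Hello")
except KeyError as exc:
    print(f"Missing placeholder value: {exc}")

# Literal braces in the *template* itself must be doubled
json_template = 'Reply as JSON like {{"appropriate": true}}.\n\nMessage: {content}'
print(json_template.format(content="Hi"))
```

The last case matters as soon as a template asks the model for JSON output: single braces in the template would be treated as placeholders.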
Setting Up a Configuration Manager
To keep all your LLM configurations in one place and easily reuse them across your application, let's create configuration settings. Add the code below to your config/settings.py
file:
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    huggingface_api_key: str
    redis_url: str = "redis://localhost:6379"
    cache_ttl: int = 3600
    llm_model_name: str = "gpt2"
    request_timeout: int = 30
    max_retries: int = 3
    rate_limit_requests: int = 100
    rate_limit_period: int = 60

    model_config = SettingsConfigDict(
        env_file=".env",
        protected_namespaces=()
    )


settings = Settings()
Implementing Rate Limiting
Next, let's implement rate limiting to control how users access your application’s resources. To do that, add the following code to your utils/rate_limiter.py
file:
import time
from collections import deque
from typing import Deque


class RateLimiter:
    def __init__(self, requests: int, period: int):
        self.requests = requests
        self.period = period
        self.timestamps: Deque[float] = deque()

    async def check_rate_limit(self) -> bool:
        """Check if request is within rate limits"""
        now = time.time()
        # Drop timestamps that are older than the period
        while self.timestamps and now - self.timestamps[0] > self.period:
            self.timestamps.popleft()
        if len(self.timestamps) >= self.requests:
            return False
        self.timestamps.append(now)
        return True
In the RateLimiter class, we implemented a reusable check_rate_limit method that can be used in any route to handle rate limiting: you pass the number of requests allowed and the period (in seconds) when creating the limiter. Because all timestamps live in a single deque, the limit is shared by all callers.
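For a separate budget per user (the RateLimiter above shares one window across all callers), one option is to keep one sliding window per client identifier. This is only a sketch: PerClientRateLimiter and client_id are hypothetical, and what you use as the identifier (an API key, an IP address) is up to your application. The clock is injectable so the example runs without sleeping.

```python
import time
from collections import defaultdict, deque
from typing import Callable, DefaultDict, Deque


class PerClientRateLimiter:
    """Sliding-window limiter that keeps a separate window per client id."""

    def __init__(self, requests: int, period: float,
                 clock: Callable[[], float] = time.monotonic):
        self.requests = requests
        self.period = period
        self.clock = clock  # injectable so the limiter can be tested without sleeping
        self.windows: DefaultDict[str, Deque[float]] = defaultdict(deque)

    def allow(self, client_id: str) -> bool:
        now = self.clock()
        window = self.windows[client_id]
        # Drop timestamps that have fallen out of the window
        while window and now - window[0] > self.period:
            window.popleft()
        if len(window) >= self.requests:
            return False
        window.append(now)
        return True


# Demo with a fake clock
fake_now = [0.0]
limiter = PerClientRateLimiter(requests=2, period=60, clock=lambda: fake_now[0])

print([limiter.allow("alice") for _ in range(3)])  # [True, True, False]
print(limiter.allow("bob"))                        # True: bob has his own window
fake_now[0] = 61.0
print(limiter.allow("alice"))                      # True: the old timestamps expired
```

In a long-running service you would also want to evict windows for clients that have gone quiet, which this sketch leaves out.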
Creating your API Endpoints
Now let's create your API endpoints in the api/routes.py
file to integrate your LLM in your application:
from functools import lru_cache
from fastapi import APIRouter, HTTPException, Depends
from typing import Dict
from core.llm_client import LLMClient
from core.prompt_manager import PromptManager
from cache.redis_manager import CacheManager
from config.settings import settings
from utils.rate_limiter import RateLimiter
from core.response_handler import ResponseHandler

router = APIRouter()
rate_limiter = RateLimiter(
    requests=settings.rate_limit_requests,
    period=settings.rate_limit_period
)


# Dependency injection functions
@lru_cache()
def get_llm_client() -> LLMClient:
    """Provide LLM client instance"""
    return LLMClient(
        model_name=settings.llm_model_name,
        timeout=settings.request_timeout
    )


@lru_cache()
def get_response_handler() -> ResponseHandler:
    """Provide response handler instance"""
    return ResponseHandler()


@lru_cache()
def get_cache_manager() -> CacheManager:
    """Provide cache manager instance"""
    return CacheManager(
        redis_url=settings.redis_url,
        ttl=settings.cache_ttl
    )


@lru_cache()
def get_prompt_manager() -> PromptManager:
    """Provide prompt manager instance"""
    return PromptManager()


@router.post("/moderate")
async def moderate_content(
    content: Dict[str, str],
    llm_client: LLMClient = Depends(get_llm_client),
    cache_manager: CacheManager = Depends(get_cache_manager),
    prompt_manager: PromptManager = Depends(get_prompt_manager),
    response_handler: ResponseHandler = Depends(get_response_handler)
):
    """Moderate content using LLM"""
    if not await rate_limiter.check_rate_limit():
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded"
        )
    try:
        prompt = prompt_manager.get_prompt(
            "content_moderation",
            content=content["text"]
        )
        # Check cache first
        cached_response = await cache_manager.get_cached_response(
            prompt,
            {'temperature': 0.3}
        )
        if cached_response:
            return cached_response
        # Get LLM response
        raw_response = await llm_client.complete(
            prompt=prompt,
            temperature=0.3
        )
        # Process the response
        formatted_response = response_handler.format_response(raw_response)
        # Cache the processed response
        await cache_manager.cache_response(
            prompt,
            {'temperature': 0.3},
            formatted_response
        )
        return formatted_response
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing request: {str(e)}"
        )
Here we defined a /moderate endpoint on an APIRouter instance, which is responsible for organizing API routes. The @lru_cache decorator is applied to the dependency injection functions (get_llm_client, get_response_handler, get_cache_manager, and get_prompt_manager) so that each one creates its LLMClient, ResponseHandler, CacheManager, or PromptManager instance only once and reuses it across requests. The moderate_content function, decorated with @router.post, defines a POST route for content moderation and uses FastAPI's Depends mechanism to inject these dependencies. Inside the function, the RateLimiter instance, configured with the rate limit values from settings, enforces request limits.
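If the role of @lru_cache here is unclear, the following standalone sketch shows the effect on a zero-argument factory function. ExpensiveClient is a hypothetical stand-in for a costly object such as LLMClient.

```python
from functools import lru_cache


class ExpensiveClient:
    """Stand-in for a costly object such as LLMClient (hypothetical)."""
    instances_created = 0

    def __init__(self) -> None:
        ExpensiveClient.instances_created += 1


@lru_cache()
def get_client() -> ExpensiveClient:
    return ExpensiveClient()


first = get_client()
second = get_client()
print(first is second)                    # True: the cached instance is reused
print(ExpensiveClient.instances_created)  # 1

get_client.cache_clear()                  # e.g. in tests, to force a fresh instance
third = get_client()
print(third is first)                     # False
print(ExpensiveClient.instances_created)  # 2
```

For the LLM client this means the model is loaded once, on the first request that needs it, rather than on every call.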
Finally, let's update your main.py
to bring everything together:
from fastapi import FastAPI
from api.routes import router
import uvicorn
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

app = FastAPI(title="LLM Integration API")
app.include_router(router, prefix="/api/v1")

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True
    )
In the above code, we created a FastAPI app and included the router from api.routes under the /api/v1 prefix. We also enabled logging to display informational messages with timestamps. The app listens on port 8000 (reachable at localhost:8000) using Uvicorn, with hot-reloading enabled.
Running your Application
We now have all the components in place, so let’s get your application up and running. First, create a .env file in your project root directory and add your HUGGINGFACE_API_KEY and REDIS_URL:
HUGGINGFACE_API_KEY=your_api_key_here
REDIS_URL=redis://localhost:6379
Then ensure Redis is running on your machine. On most Unix-based systems, you can start it with the command:
redis-server
Now you can start your application:
python main.py
Your FastAPI server will start running on http://localhost:8000. The automatic API documentation will be available at http://localhost:8000/docs, which is very helpful for testing your endpoints.
Testing your Content Moderation API
Let's test your newly created API with a real request. Open a new terminal and run this curl command:
curl -X POST http://localhost:8000/api/v1/moderate \
-H "Content-Type: application/json" \
-d '{"text": "This is a test message that needs moderation."}'
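You should see a JSON response on your terminal containing the parsed moderation result (is_appropriate, confidence_score, and reason), the token usage, and the model name.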
Adding Monitoring and Analytics
Now let's add some monitoring features to track how your application is performing and how many resources are being used. Add the following code to your utils/monitoring.py file:
from datetime import datetime
import logging
from typing import Dict
import json


class UsageMonitor:
    def __init__(self, log_file: str = "usage_logs.json"):
        self.log_file = log_file
        self.logger = logging.getLogger(__name__)

    async def log_request(self,
                          endpoint: str,
                          response: Dict,
                          duration_ms: float):
        """Log API request details"""
        try:
            log_entry = {
                "timestamp": datetime.now().isoformat(),
                "endpoint": endpoint,
                "tokens_used": response["usage"]["total_tokens"],
                "duration_ms": duration_ms,
                "model": response["model"]
            }
            # Append one JSON object per line
            with open(self.log_file, "a") as f:
                f.write(json.dumps(log_entry) + "\n")
        except Exception as e:
            self.logger.error(f"Error logging usage: {str(e)}")
The UsageMonitor
class will be performing the following operations:
- Tracking every API request with timestamps
- Recording token usage for cost monitoring
- Measuring response times
- Storing everything in a structured log file (replace this with a database before you deploy your application to production)
Next, add a new method to calculate usage statistics:
    async def get_usage_statistics(self) -> Dict:
        """Calculate usage statistics"""
        try:
            total_tokens = 0
            total_requests = 0
            total_duration = 0
            with open(self.log_file, "r") as f:
                for line in f:
                    entry = json.loads(line)
                    total_tokens += entry["tokens_used"]
                    total_requests += 1
                    total_duration += entry["duration_ms"]
            return {
                "total_tokens": total_tokens,
                "total_requests": total_requests,
                "average_latency": total_duration / total_requests if total_requests > 0 else 0,
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Error calculating statistics: {str(e)}")
            return {}
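Note that the log file holds one JSON object per line (the JSON Lines layout) and is not a single JSON document, despite the .json extension. The aggregation can be tried out without a file at all; the sketch below uses a hypothetical summarize helper and made-up sample entries, and it skips blank lines, which the method above does not.

```python
import json
from typing import Dict, Iterable


def summarize(lines: Iterable[str]) -> Dict[str, float]:
    """Aggregate token and latency totals from JSON Lines log entries."""
    total_tokens = 0
    total_requests = 0
    total_duration = 0.0
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        entry = json.loads(line)
        total_tokens += entry["tokens_used"]
        total_requests += 1
        total_duration += entry["duration_ms"]
    average = total_duration / total_requests if total_requests else 0
    return {
        "total_tokens": total_tokens,
        "total_requests": total_requests,
        "average_latency": average,
    }


# Made-up sample entries, for illustration only
sample_log = [
    '{"tokens_used": 120, "duration_ms": 800.0}',
    '{"tokens_used": 80, "duration_ms": 1200.0}',
    '',
]
print(summarize(sample_log))
```

For the two sample entries this gives 200 total tokens over 2 requests, with an average latency of 1000.0 ms.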
Update your API to add the monitoring features from the UsageMonitor
class:
#...
from utils.monitoring import UsageMonitor
import time
#...
usage_monitor = UsageMonitor()


@router.get("/stats")
async def get_statistics():
    """Get usage statistics"""
    return await usage_monitor.get_usage_statistics()


@router.post("/moderate")
async def moderate_content(
    content: Dict[str, str],
    llm_client: LLMClient = Depends(get_llm_client),
    cache_manager: CacheManager = Depends(get_cache_manager),
    prompt_manager: PromptManager = Depends(get_prompt_manager),
    response_handler: ResponseHandler = Depends(get_response_handler)
):
    """Moderate content using LLM"""
    start_time = time.time()  # Add this
    if not await rate_limiter.check_rate_limit():
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded"
        )
    try:
        #...
        # Add monitoring
        duration_ms = (time.time() - start_time) * 1000
        await usage_monitor.log_request(
            "/moderate",
            formatted_response,
            duration_ms
        )
        return formatted_response
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing request: {str(e)}"
        )
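One design note on the timing: time.time() follows the system clock, which can be adjusted while a request is in flight, whereas time.perf_counter() is monotonic and intended for measuring intervals. A small context manager keeps the measurement tidy; timed is a hypothetical helper, and the sleep stands in for the LLM call.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def timed() -> Iterator[Dict[str, float]]:
    """Measure the wall-clock duration of the enclosed block in milliseconds."""
    result: Dict[str, float] = {}
    start = time.perf_counter()
    try:
        yield result
    finally:
        # Recorded even if the block raises, so failed calls can be logged too
        result["duration_ms"] = (time.perf_counter() - start) * 1000


with timed() as timing:
    time.sleep(0.01)  # stand-in for the LLM call

print(f"{timing['duration_ms']:.1f} ms")
```

The resulting timing["duration_ms"] value could be passed to log_request in place of the manual subtraction.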
Now, test your /stats
endpoint by running this curl command:
curl -X GET http://localhost:8000/api/v1/stats
The above command returns the aggregate stats for your requests to the /moderate endpoint: total tokens, total requests, average latency, and a timestamp.
Conclusion
Throughout this tutorial, you have learned how to use a large language model in production applications. You implemented an LLM client, caching, prompt management, rate limiting, error handling, and monitoring. As an example of these concepts, you developed a content moderation system.
Now that you have a solid foundation, you could enhance your system with:
- Streaming responses for real-time applications
- A/B testing to compare prompt versions
- A web-based interface to manage prompts
- Custom model fine-tuning
- Integration with third-party monitoring services
Remember that the examples used the GPT-2 model, but you can adapt this system to work with any LLM provider. Choose the model that meets your requirements and fits your budget.
Please don’t hesitate to contact me if you have questions or if you want to tell me what you are building with this system.
Happy coding! 🚀