The Model Context Protocol (MCP) lets you connect AI assistants directly to your data infrastructure. Here's how to build production-ready servers that make your data AI-accessible.
🎯 What Problem Does This Solve?
Traditional approach:
- Export data to CSV
- Upload to AI chat interface
- Lose context and freshness
- Repeat manually every time
MCP approach:
- AI queries your systems directly
- Real-time data access
- Full business context preserved
- Automated and repeatable
🏗️ Architecture Overview
Claude Desktop <--> MCP Server (Python) <--> Your Data Infrastructure
                               |
                               v
            [Snowflake, Postgres, APIs, Data Catalogs]
💡 When to Build Custom MCP Servers
| Use Case | Build Custom | Use Pre-built |
|---|---|---|
| Proprietary data warehouse schemas | ✅ | ❌ |
| Custom authentication/RBAC | ✅ | ❌ |
| Standard Postgres/MySQL | ❌ | ✅ |
| Business logic enforcement | ✅ | ❌ |
| Public GitHub repos | ❌ | ✅ |
| Data governance compliance | ✅ | ❌ |
🔧 Implementation: Snowflake MCP Server
Basic Server Structure
import asyncio
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
import mcp.server.stdio
import snowflake.connector

# Initialize MCP server
app = Server("snowflake-data-server")

# Snowflake connection config (use env vars or a secrets manager in production)
SNOWFLAKE_CONFIG = {
    "account": "your_account",
    "user": "mcp_user",
    "password": "secure_password",
    "warehouse": "COMPUTE_WH",
    "database": "ANALYTICS_DB",
    "schema": "PUBLIC"
}

def get_snowflake_connection():
    """Create a new Snowflake connection (see Connection Pooling below for reuse)"""
    return snowflake.connector.connect(**SNOWFLAKE_CONFIG)
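Hardcoding credentials is fine for a local prototype, but the Claude Desktop config shown later passes credentials as environment variables. A minimal sketch of reading them instead; the SNOWFLAKE_DATABASE and SNOWFLAKE_SCHEMA variable names are assumptions, not part of the config file below:

import os

# Sketch: build the config from env vars set in the MCP client config.
# Account, user, and password are required; the rest fall back to the
# defaults used throughout this article.
SNOWFLAKE_CONFIG = {
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", "COMPUTE_WH"),
    "database": os.environ.get("SNOWFLAKE_DATABASE", "ANALYTICS_DB"),
    "schema": os.environ.get("SNOWFLAKE_SCHEMA", "PUBLIC"),
}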
Expose Data Catalog as Resources
@app.list_resources()
async def list_resources() -> list[Resource]:
    """Expose available tables as resources"""
    conn = get_snowflake_connection()
    cursor = conn.cursor()

    cursor.execute("""
        SELECT table_name, row_count, bytes
        FROM information_schema.tables
        WHERE table_schema = 'PUBLIC'
        ORDER BY row_count DESC
        LIMIT 50
    """)

    resources = []
    for table_name, row_count, bytes_size in cursor.fetchall():
        resources.append(
            Resource(
                uri=f"snowflake://ANALYTICS_DB/PUBLIC/{table_name}",
                name=f"Table: {table_name}",
                description=f"{row_count:,} rows, {bytes_size/1024/1024:.2f} MB",
                mimeType="application/json"
            )
        )

    cursor.close()
    conn.close()
    return resources

@app.read_resource()
async def read_resource(uri: str) -> str:
    """Return table schema when resource is accessed"""
    # Extract table name from URI
    table_name = uri.split("/")[-1]

    conn = get_snowflake_connection()
    cursor = conn.cursor()
    cursor.execute(f"DESCRIBE TABLE {table_name}")
    schema = cursor.fetchall()
    cursor.close()
    conn.close()

    # Format schema info
    schema_info = "\n".join([
        f"{col[0]}: {col[1]} {'(nullable)' if col[3] == 'Y' else ''}"
        for col in schema
    ])
    return f"Schema for {table_name}:\n{schema_info}"
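One thing to watch: read_resource splices the table name from the URI straight into a DESCRIBE statement. A small validation helper keeps obviously malformed identifiers out; this is a hypothetical convenience function, not part of the MCP SDK or the Snowflake connector:

import re

# Sketch: accept only plain unquoted Snowflake identifiers
# (letters, digits, underscores, dollar signs) before splicing into SQL text.
IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_$]*$")

def validate_identifier(name: str) -> str:
    """Raise if the name is not a simple identifier; otherwise return it unchanged."""
    if not IDENTIFIER_RE.match(name):
        raise ValueError(f"Invalid identifier: {name!r}")
    return name

# Usage inside read_resource:
#     table_name = validate_identifier(uri.split("/")[-1])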
Create Query Tools
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Define available tools for AI to use"""
    return [
        Tool(
            name="query_snowflake",
            description="Execute SELECT queries on Snowflake tables",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "SQL SELECT query to execute"
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Max rows to return",
                        "default": 100
                    }
                },
                "required": ["query"]
            }
        ),
        Tool(
            name="get_table_stats",
            description="Get row counts and statistics for tables",
            inputSchema={
                "type": "object",
                "properties": {
                    "table_name": {
                        "type": "string",
                        "description": "Name of the table"
                    }
                },
                "required": ["table_name"]
            }
        )
    ]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute tool calls from AI"""
    if name == "query_snowflake":
        query = arguments["query"]
        limit = arguments.get("limit", 100)

        # Security: Only allow SELECT queries
        if not query.strip().upper().startswith("SELECT"):
            return [TextContent(
                type="text",
                text="Error: Only SELECT queries are allowed"
            )]

        # Add limit if not present
        if "LIMIT" not in query.upper():
            query = f"{query} LIMIT {limit}"

        conn = get_snowflake_connection()
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            results = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]

            # Format as table
            output = f"Columns: {', '.join(columns)}\n\n"
            output += "\n".join([
                " | ".join([str(val) for val in row])
                for row in results
            ])
            return [TextContent(type="text", text=output)]
        except Exception as e:
            return [TextContent(
                type="text",
                text=f"Query error: {str(e)}"
            )]
        finally:
            cursor.close()
            conn.close()

    elif name == "get_table_stats":
        # Validate table_name before splicing it into SQL (see security patterns below)
        table_name = arguments["table_name"]

        conn = get_snowflake_connection()
        cursor = conn.cursor()
        # COUNT(DISTINCT *) is not valid SQL; hash the full row to count distinct rows
        cursor.execute(f"""
            SELECT
                COUNT(*) AS row_count,
                COUNT(DISTINCT HASH(*)) AS distinct_rows
            FROM {table_name}
        """)
        stats = cursor.fetchone()
        cursor.close()
        conn.close()

        return [TextContent(
            type="text",
            text=f"Table: {table_name}\nRows: {stats[0]:,}\nDistinct: {stats[1]:,}"
        )]

    return [TextContent(type="text", text=f"Unknown tool: {name}")]
Server Entry Point
async def main():
    """Run the MCP server"""
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())
🔒 Production Security Patterns
1. Row-Level Security
import re

def apply_rls(query: str, user_context: dict) -> str:
    """Inject RLS filters based on user context"""
    department = user_context.get("department")

    # Parse and modify query (use sqlparse for production)
    # Match case-insensitively so both "FROM sales_data" and "from sales_data" are caught
    return re.sub(
        r"\bFROM\s+sales_data\b",
        f"FROM sales_data WHERE department = '{department}'",
        query,
        flags=re.IGNORECASE
    )
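A rough sketch of how this might be wired into the query path. USER_DEPARTMENT is a hypothetical environment variable you would pass via the MCP client config; it is not part of the protocol:

import os

# Sketch: user context supplied by the MCP client configuration
user_context = {"department": os.environ.get("USER_DEPARTMENT", "unknown")}

def run_query_with_rls(query: str) -> str:
    """Apply the RLS filter, then execute the rewritten query."""
    filtered_query = apply_rls(query, user_context)
    conn = get_snowflake_connection()
    cursor = conn.cursor()
    try:
        cursor.execute(filtered_query)
        return "\n".join(" | ".join(str(v) for v in row) for row in cursor.fetchall())
    finally:
        cursor.close()
        conn.close()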
2. Query Cost Limits
async def estimate_query_cost(query: str) -> float:
    """Estimate query cost before execution"""
    conn = get_snowflake_connection()
    cursor = conn.cursor()
    try:
        # Use EXPLAIN to get the query plan
        cursor.execute(f"EXPLAIN {query}")
        plan = cursor.fetchall()

        # Parse estimated bytes/partitions scanned from the plan
        # and apply your own cost model here
        estimated_cost = 0.0  # placeholder: derive from the EXPLAIN output
        return estimated_cost
    finally:
        cursor.close()
        conn.close()
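How the plan translates into a go/no-go decision depends on your cost model. A minimal sketch of a gate around query execution, with a session-level statement timeout as a blunt backstop (STATEMENT_TIMEOUT_IN_SECONDS is a standard Snowflake session parameter; MAX_ESTIMATED_COST and its units are assumptions):

MAX_ESTIMATED_COST = 5.0  # hypothetical threshold in your own cost units

async def run_with_cost_guard(query: str) -> str:
    """Refuse queries whose estimated cost exceeds the threshold."""
    estimated = await estimate_query_cost(query)
    if estimated > MAX_ESTIMATED_COST:
        return f"Query rejected: estimated cost {estimated:.2f} exceeds {MAX_ESTIMATED_COST}"

    conn = get_snowflake_connection()
    cursor = conn.cursor()
    try:
        # Backstop: cancel anything that runs longer than 60 seconds
        cursor.execute("ALTER SESSION SET STATEMENT_TIMEOUT_IN_SECONDS = 60")
        cursor.execute(query)
        return "\n".join(" | ".join(str(v) for v in row) for row in cursor.fetchall())
    finally:
        cursor.close()
        conn.close()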
3. Query Allowlist Pattern
ALLOWED_TABLES = {
    "sales_summary",
    "customer_metrics",
    "product_catalog"
}

def validate_query(query: str) -> bool:
    """Ensure query only accesses approved tables"""
    # Use sqlparse to extract table names
    tables = extract_tables_from_query(query)
    return all(table in ALLOWED_TABLES for table in tables)
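extract_tables_from_query is left to the reader above; here is a minimal sqlparse-based sketch that collects the identifier directly following FROM or JOIN. It handles the simple single-table case only; subqueries, CTEs, comma-separated FROM lists, and quoted identifiers need more robust parsing:

import sqlparse
from sqlparse.sql import Identifier
from sqlparse.tokens import Keyword, Name

def extract_tables_from_query(query: str) -> set[str]:
    """Collect identifiers that directly follow FROM / JOIN keywords."""
    tables = set()
    for statement in sqlparse.parse(query):
        collecting = False
        for token in statement.tokens:
            if collecting and isinstance(token, Identifier):
                tables.add(token.get_real_name().lower())
                collecting = False
            elif collecting and token.ttype is Name:
                tables.add(token.value.lower())
                collecting = False
            if token.ttype is Keyword and (
                token.value.upper() == "FROM" or token.value.upper().endswith("JOIN")
            ):
                collecting = True
    return tables

The lowercase names line up with the lowercase ALLOWED_TABLES entries, so validate_query can compare them directly.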
📋 Configuration File
{
  "mcpServers": {
    "snowflake-data": {
      "command": "python",
      "args": ["/path/to/snowflake_mcp_server.py"],
      "env": {
        "SNOWFLAKE_ACCOUNT": "your_account",
        "SNOWFLAKE_USER": "mcp_user",
        "SNOWFLAKE_PASSWORD": "secure_password",
        "SNOWFLAKE_WAREHOUSE": "COMPUTE_WH"
      }
    }
  }
}
🚀 Performance Optimizations
Connection Pooling
from snowflake.connector.connection import SnowflakeConnection

class ConnectionPool:
    """Naive pool (not thread-safe); fine for a single-process stdio server"""

    def __init__(self, size: int = 5):
        self.pool = []
        self.size = size
        self._init_pool()

    def _init_pool(self):
        for _ in range(self.size):
            self.pool.append(get_snowflake_connection())

    def get_connection(self) -> SnowflakeConnection:
        if self.pool:
            return self.pool.pop()
        return get_snowflake_connection()

    def return_connection(self, conn: SnowflakeConnection):
        if len(self.pool) < self.size:
            self.pool.append(conn)
        else:
            conn.close()

# Global pool
pool = ConnectionPool(size=5)
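Callers have to remember to return connections even when a query raises. A small context manager (a convenience wrapper, not part of the snowflake-connector API) makes that automatic:

from contextlib import contextmanager

@contextmanager
def pooled_connection():
    """Check a connection out of the pool and always return it."""
    conn = pool.get_connection()
    try:
        yield conn
    finally:
        pool.return_connection(conn)

# Usage:
# with pooled_connection() as conn:
#     cursor = conn.cursor()
#     cursor.execute("SELECT CURRENT_VERSION()")
#     print(cursor.fetchone())
#     cursor.close()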
Caching Layer
from functools import lru_cache
from datetime import datetime, timedelta

# Cache table schemas (they change infrequently)
@lru_cache(maxsize=100)
def get_table_schema(table_name: str) -> dict:
    """Cached table schema lookup"""
    # Implementation here
    pass

# Cache table statistics with TTL
STATS_CACHE = {}
CACHE_TTL = timedelta(minutes=15)

def get_table_stats_cached(table_name: str) -> dict:
    if table_name in STATS_CACHE:
        cached_time, stats = STATS_CACHE[table_name]
        if datetime.now() - cached_time < CACHE_TTL:
            return stats

    # Fetch fresh stats
    stats = fetch_table_stats(table_name)
    STATS_CACHE[table_name] = (datetime.now(), stats)
    return stats
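fetch_table_stats is left undefined above. A minimal sketch of what it might look like, reusing the pooled_connection helper and the allowlist from the security section, and the same row/distinct counts as the get_table_stats tool:

def fetch_table_stats(table_name: str) -> dict:
    """Sketch: pull basic stats for an allowlisted table."""
    if table_name not in ALLOWED_TABLES:
        raise ValueError(f"Table not allowed: {table_name}")

    with pooled_connection() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute(
                f"SELECT COUNT(*), COUNT(DISTINCT HASH(*)) FROM {table_name}"
            )
            row_count, distinct_rows = cursor.fetchone()
            return {"row_count": row_count, "distinct_rows": distinct_rows}
        finally:
            cursor.close()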
📊 Monitoring & Logging
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp-snowflake")

# Keep a single registered call_tool handler: fold logging into it and move the
# dispatch logic from "Create Query Tools" into an execute_tool() helper.
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    start_time = datetime.now()
    try:
        logger.info(f"Tool called: {name} with args: {arguments}")
        result = await execute_tool(name, arguments)
        duration = (datetime.now() - start_time).total_seconds()
        logger.info(f"Tool {name} completed in {duration:.2f}s")
        return result
    except Exception as e:
        logger.error(f"Tool {name} failed: {str(e)}")
        raise
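For cost tracking, it can also help to log the Snowflake query ID of each statement. A short sketch, assuming you have access-history rights on SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY for later attribution:

def execute_logged(cursor, query: str):
    """Sketch: run a query and log its Snowflake query ID for cost attribution."""
    cursor.execute(query)
    # cursor.sfqid is the Snowflake query ID; join it against
    # SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY to attribute credits per tool call.
    logger.info(f"Executed query_id={cursor.sfqid}")
    return cursor.fetchall()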
🎯 Real-World Use Cases
1. Data Catalog Explorer
- AI browses your table schemas
- Suggests relevant datasets for analysis
- Explains column meanings from metadata
2. Self-Service Analytics
- Business users ask questions in natural language
- MCP server translates to SQL
- Results returned with context
3. Data Quality Monitoring
- AI checks for null rates, duplicates
- Compares against historical patterns
- Alerts on anomalies (see the sketch after this list)
4. Schema Discovery
- New team members explore data
- AI explains table relationships
- Generates ERD documentation
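For the data quality use case, here is a rough sketch of what an extra tool handler might compute: per-column null rates in a single pass, reusing the pooled_connection helper and allowlist sketched earlier. Thresholds and comparison against historical patterns are left out:

def check_null_rates(table_name: str) -> dict:
    """Sketch: compute the null rate for each column of an allowlisted table."""
    if table_name not in ALLOWED_TABLES:
        raise ValueError(f"Table not allowed: {table_name}")

    with pooled_connection() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute(f"DESCRIBE TABLE {table_name}")
            columns = [row[0] for row in cursor.fetchall()]

            # COUNT(*) - COUNT(col) gives the null count per column in one scan
            null_exprs = ", ".join(f"COUNT(*) - COUNT({col})" for col in columns)
            cursor.execute(f"SELECT COUNT(*), {null_exprs} FROM {table_name}")
            total, *null_counts = cursor.fetchone()
            return {
                col: (nulls / total if total else 0.0)
                for col, nulls in zip(columns, null_counts)
            }
        finally:
            cursor.close()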
⚡ Key Takeaways
✅ Start simple: Basic query tool + table listing
✅ Add security early: RLS, query validation, cost limits
✅ Optimize progressively: Connection pooling, caching
✅ Monitor everything: Log queries, track costs, measure latency
What data source are you connecting first? Drop a comment with your use case! 💬