
Vinicius Fagundes

Building Custom MCP Servers with Python: A Data Engineer's Guide πŸ› οΈ

The Model Context Protocol (MCP) lets you connect AI assistants directly to your data infrastructure. Here's how to build production-ready servers that make your data AI-accessible.

🎯 What Problem Does This Solve?

Traditional approach:

  • Export data to CSV
  • Upload to AI chat interface
  • Lose context and freshness
  • Repeat manually every time

MCP approach:

  • AI queries your systems directly
  • Real-time data access
  • Full business context preserved
  • Automated and repeatable

πŸ—οΈ Architecture Overview

Claude Desktop ←→ MCP Server (Python) ←→ Your Data Infrastructure
                        ↓
              [Snowflake, Postgres, APIs, Data Catalogs]

πŸ’‘ When to Build Custom MCP Servers

| Use Case | Build Custom | Use Pre-built |
| --- | --- | --- |
| Proprietary data warehouse schemas | βœ… | ❌ |
| Custom authentication/RBAC | βœ… | ❌ |
| Standard Postgres/MySQL | ❌ | βœ… |
| Business logic enforcement | βœ… | ❌ |
| Public GitHub repos | ❌ | βœ… |
| Data governance compliance | βœ… | ❌ |

πŸ”§ Implementation: Snowflake MCP Server
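
These examples assume the official `mcp` Python SDK and the Snowflake connector are installed (`pip install mcp snowflake-connector-python`). They use the low-level `Server` API; recent SDK versions also ship a higher-level `FastMCP` wrapper that would shorten them further.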

Basic Server Structure

import asyncio
import os

from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
import mcp.server.stdio
import snowflake.connector

# Initialize MCP server
app = Server("snowflake-data-server")

# Snowflake connection config, read from the environment so credentials
# never live in source; the client config shown later supplies these values
SNOWFLAKE_CONFIG = {
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", "COMPUTE_WH"),
    "database": os.environ.get("SNOWFLAKE_DATABASE", "ANALYTICS_DB"),
    "schema": os.environ.get("SNOWFLAKE_SCHEMA", "PUBLIC"),
}

def get_snowflake_connection():
    """Open a new Snowflake connection (pooling is added later)"""
    return snowflake.connector.connect(**SNOWFLAKE_CONFIG)

Expose Data Catalog as Resources

@app.list_resources()
async def list_resources() -> list[Resource]:
    """Expose available tables as resources"""
    conn = get_snowflake_connection()
    cursor = conn.cursor()

    cursor.execute("""
        SELECT table_name, row_count, bytes 
        FROM information_schema.tables 
        WHERE table_schema = 'PUBLIC'
        ORDER BY row_count DESC
        LIMIT 50
    """)

    resources = []
    for table_name, row_count, bytes_size in cursor.fetchall():
        resources.append(
            Resource(
                uri=f"snowflake://ANALYTICS_DB/PUBLIC/{table_name}",
                name=f"Table: {table_name}",
                description=f"{row_count:,} rows, {bytes_size/1024/1024:.2f} MB",
                mimeType="application/json"
            )
        )

    cursor.close()
    conn.close()
    return resources

@app.read_resource()
async def read_resource(uri: str) -> str:
    """Return table schema when resource is accessed"""
    # Extract table name from URI
    table_name = uri.split("/")[-1]

    # DESCRIBE can't be parameterized, so reject anything that isn't a
    # plain identifier before interpolating it into the statement
    if not table_name.replace("_", "").isalnum():
        return f"Invalid table name: {table_name}"

    conn = get_snowflake_connection()
    cursor = conn.cursor()

    try:
        cursor.execute(f"DESCRIBE TABLE {table_name}")
        schema = cursor.fetchall()
    finally:
        cursor.close()
        conn.close()

    # DESCRIBE TABLE returns (name, type, kind, null?, ...) per column
    schema_info = "\n".join([
        f"{col[0]}: {col[1]} {'(nullable)' if col[3] == 'Y' else ''}"
        for col in schema
    ])

    return f"Schema for {table_name}:\n{schema_info}"

Create Query Tools

@app.list_tools()
async def list_tools() -> list[Tool]:
    """Define available tools for AI to use"""
    return [
        Tool(
            name="query_snowflake",
            description="Execute SELECT queries on Snowflake tables",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "SQL SELECT query to execute"
                    },
                    "limit": {
                        "type": "integer",
                        "description": "Max rows to return",
                        "default": 100
                    }
                },
                "required": ["query"]
            }
        ),
        Tool(
            name="get_table_stats",
            description="Get row counts and statistics for tables",
            inputSchema={
                "type": "object",
                "properties": {
                    "table_name": {
                        "type": "string",
                        "description": "Name of the table"
                    }
                },
                "required": ["table_name"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute tool calls from AI"""

    if name == "query_snowflake":
        query = arguments["query"]
        limit = arguments.get("limit", 100)

        # Security: Only allow SELECT queries
        if not query.strip().upper().startswith("SELECT"):
            return [TextContent(
                type="text",
                text="Error: Only SELECT queries are allowed"
            )]

        # Add limit if not present
        if "LIMIT" not in query.upper():
            query = f"{query} LIMIT {limit}"

        conn = get_snowflake_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(query)
            results = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]

            # Format as table
            output = f"Columns: {', '.join(columns)}\n\n"
            output += "\n".join([
                " | ".join([str(val) for val in row])
                for row in results
            ])

            return [TextContent(type="text", text=output)]

        except Exception as e:
            return [TextContent(
                type="text",
                text=f"Query error: {str(e)}"
            )]
        finally:
            cursor.close()
            conn.close()

    elif name == "get_table_stats":
        table_name = arguments["table_name"]

        conn = get_snowflake_connection()
        cursor = conn.cursor()

        cursor.execute(f"""
            SELECT 
                COUNT(*) as row_count,
                COUNT(DISTINCT *) as distinct_rows
            FROM {table_name}
        """)

        stats = cursor.fetchone()
        cursor.close()
        conn.close()

        return [TextContent(
            type="text",
            text=f"Table: {table_name}\nRows: {stats[0]:,}\nDistinct: {stats[1]:,}"
        )]
Enter fullscreen mode Exit fullscreen mode
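One caveat: the startswith("SELECT") guard above is easy to defeat with a multi-statement payload like SELECT 1; DROP TABLE ... or a leading comment. A stricter sketch using sqlparse (an extra dependency assumed here; is_read_only is a name I'm introducing, not part of the MCP SDK):

import sqlparse

def is_read_only(query: str) -> bool:
    """Accept exactly one statement, and only if sqlparse says it's a SELECT"""
    statements = sqlparse.parse(query)
    # parse() keeps empty trailing statements (e.g. after a ';'), drop them
    statements = [s for s in statements if s.token_first(skip_cm=True)]
    return len(statements) == 1 and statements[0].get_type() == "SELECT"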

Server Entry Point

async def main():
    """Run the MCP server"""
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())
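Before wiring the server into Claude Desktop, it's worth smoke-testing it with the MCP Inspector (npx @modelcontextprotocol/inspector python snowflake_mcp_server.py), which speaks the same stdio transport and lets you list resources and invoke tools interactively.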

πŸ”’ Production Security Patterns

1. Row-Level Security

import re

def apply_rls(query: str, user_context: dict) -> str:
    """Inject RLS filters based on user context"""
    # Trusted server-side context, never model-supplied input
    department = user_context.get("department")

    # Naive, case-insensitive rewrite (use a real SQL parser for production)
    return re.sub(
        r"\bFROM\s+sales_data\b",
        f"FROM sales_data WHERE department = '{department}'",
        query,
        flags=re.IGNORECASE,
    )
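String rewriting breaks as soon as the query aliases the table or already has a WHERE clause. If you can take a dependency on sqlglot (an assumption on my part, not the post's implementation), the same filter can be applied at the AST level:

import sqlglot
from sqlglot import exp

def apply_rls_ast(query: str, department: str) -> str:
    """AND a department filter onto every SELECT that reads sales_data"""
    tree = sqlglot.parse_one(query, read="snowflake")
    for select in tree.find_all(exp.Select):
        if any(t.name.lower() == "sales_data" for t in select.find_all(exp.Table)):
            select.where(f"department = '{department}'", copy=False)
    return tree.sql(dialect="snowflake")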

2. Query Cost Limits

import json

async def estimate_query_cost(query: str) -> float:
    """Estimate bytes scanned before execution via EXPLAIN"""
    conn = get_snowflake_connection()
    cursor = conn.cursor()

    try:
        # EXPLAIN USING JSON returns the query plan as a single JSON document
        cursor.execute(f"EXPLAIN USING JSON {query}")
        plan = json.loads(cursor.fetchone()[0])

        # GlobalStats.bytesAssigned approximates how much data the query
        # will scan; compare it against a threshold before executing
        bytes_assigned = plan.get("GlobalStats", {}).get("bytesAssigned", 0)
        return bytes_assigned / (1024 ** 3)  # rough cost proxy in GB scanned
    finally:
        cursor.close()
        conn.close()

3. Query Allowlist Pattern

import re

ALLOWED_TABLES = {
    "sales_summary",
    "customer_metrics",
    "product_catalog"
}

def extract_tables_from_query(query: str) -> set[str]:
    """Naive FROM/JOIN scan; use sqlparse/sqlglot for production"""
    return {
        name.lower()
        for name in re.findall(r"(?:FROM|JOIN)\s+([A-Za-z_][\w$.]*)", query, re.IGNORECASE)
    }

def validate_query(query: str) -> bool:
    """Ensure query only accesses approved tables"""
    tables = extract_tables_from_query(query)
    # An empty set means we couldn't identify any table: reject, don't allow
    return bool(tables) and all(table in ALLOWED_TABLES for table in tables)
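A sketch tying the three patterns together before anything reaches Snowflake (is_read_only is the sqlparse check from earlier; the cost estimate could be awaited here as a fourth gate):

def authorize_query(query: str, user_context: dict) -> str:
    """Gate every model-issued query through all checks"""
    if not is_read_only(query):
        raise PermissionError("Only single SELECT statements are allowed")
    if not validate_query(query):
        raise PermissionError("Query references a table outside the allowlist")
    # Returns the rewritten, RLS-filtered query to execute
    return apply_rls(query, user_context)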

πŸ“Š Configuration File

{
  "mcpServers": {
    "snowflake-data": {
      "command": "python",
      "args": ["/path/to/snowflake_mcp_server.py"],
      "env": {
        "SNOWFLAKE_ACCOUNT": "your_account",
        "SNOWFLAKE_USER": "mcp_user",
        "SNOWFLAKE_PASSWORD": "secure_password",
        "SNOWFLAKE_WAREHOUSE": "COMPUTE_WH"
      }
    }
  }
}
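This goes in Claude Desktop's claude_desktop_config.json (on macOS: ~/Library/Application Support/Claude/claude_desktop_config.json; on Windows: %APPDATA%\Claude\claude_desktop_config.json). The env block is how the credentials reach the os.environ lookups in the server code above.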

πŸš€ Performance Optimizations

Connection Pooling

from snowflake.connector.connection import SnowflakeConnection
from typing import Optional

class ConnectionPool:
    def __init__(self, size: int = 5):
        self.pool = []
        self.size = size
        self._init_pool()

    def _init_pool(self):
        for _ in range(self.size):
            self.pool.append(get_snowflake_connection())

    def get_connection(self) -> SnowflakeConnection:
        if self.pool:
            return self.pool.pop()
        return get_snowflake_connection()

    def return_connection(self, conn: SnowflakeConnection):
        if len(self.pool) < self.size:
            self.pool.append(conn)
        else:
            conn.close()

# Global pool
pool = ConnectionPool(size=5)
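A usage sketch: handlers borrow a connection and hand it back instead of closing it. And since Snowflake calls block, long queries are worth pushing off the event loop (run_pooled_query is a hypothetical helper, not part of the server above):

def run_pooled_query(sql: str):
    """Borrow a pooled connection, always return it"""
    conn = pool.get_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        pool.return_connection(conn)

# Inside an async tool handler:
#   rows = await asyncio.to_thread(run_pooled_query, query)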

Caching Layer

from functools import lru_cache
from datetime import datetime, timedelta

# Cache table schemas (they change infrequently)
@lru_cache(maxsize=100)
def get_table_schema(table_name: str) -> dict:
    """Cached table schema lookup"""
    # Implementation here
    pass

# Cache table statistics with TTL
STATS_CACHE = {}
CACHE_TTL = timedelta(minutes=15)

def get_table_stats_cached(table_name: str) -> dict:
    if table_name in STATS_CACHE:
        cached_time, stats = STATS_CACHE[table_name]
        if datetime.now() - cached_time < CACHE_TTL:
            return stats

    # Fetch fresh stats
    stats = fetch_table_stats(table_name)
    STATS_CACHE[table_name] = (datetime.now(), stats)
    return stats
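fetch_table_stats above is left as a stub; a minimal version might look like this (reusing the HASH(*) trick from the tool section, and assuming table_name was validated upstream):

def fetch_table_stats(table_name: str) -> dict:
    """Fresh row/distinct counts for one pre-validated table"""
    conn = get_snowflake_connection()
    cursor = conn.cursor()
    try:
        cursor.execute(f"""
            SELECT COUNT(*), COUNT(DISTINCT HASH(*))
            FROM {table_name}
        """)
        row_count, distinct_rows = cursor.fetchone()
        return {"row_count": row_count, "distinct_rows": distinct_rows}
    finally:
        cursor.close()
        conn.close()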

πŸ“ˆ Monitoring & Logging

import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp-snowflake")

# Wraps the dispatcher: execute_tool stands for the dispatch logic from the
# call_tool handler shown earlier (a server registers only one such handler)
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    start_time = datetime.now()

    try:
        logger.info(f"Tool called: {name} with args: {arguments}")
        result = await execute_tool(name, arguments)

        duration = (datetime.now() - start_time).total_seconds()
        logger.info(f"Tool {name} completed in {duration:.2f}s")

        return result

    except Exception as e:
        logger.error(f"Tool {name} failed: {str(e)}")
        raise

🎯 Real-World Use Cases

1. Data Catalog Explorer

  • AI browses your table schemas
  • Suggests relevant datasets for analysis
  • Explains column meanings from metadata

2. Self-Service Analytics

  • Business users ask questions in natural language
  • The AI writes the SQL; the MCP server validates and executes it
  • Results returned with context

3. Data Quality Monitoring

  • AI checks for null rates, duplicates
  • Compares against historical patterns
  • Alerts on anomalies

4. Schema Discovery

  • New team members explore data
  • AI explains table relationships
  • Generates ERD documentation
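
As a concrete example of use case 3, null-rate checking fits naturally as one more MCP tool. A hypothetical check_null_rates helper (table name assumed validated upstream):

def check_null_rates(table_name: str) -> dict[str, float]:
    """Fraction of NULLs per column for one table"""
    conn = get_snowflake_connection()
    cursor = conn.cursor()
    try:
        cursor.execute(f"DESCRIBE TABLE {table_name}")
        columns = [row[0] for row in cursor.fetchall()]
        exprs = ", ".join(
            f'AVG(IFF("{col}" IS NULL, 1.0, 0.0))' for col in columns
        )
        cursor.execute(f"SELECT {exprs} FROM {table_name}")
        return dict(zip(columns, cursor.fetchone()))
    finally:
        cursor.close()
        conn.close()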

⚑ Key Takeaways

βœ… Start simple: Basic query tool + table listing

βœ… Add security early: RLS, query validation, cost limits

βœ… Optimize progressively: Connection pooling, caching

βœ… Monitor everything: Log queries, track costs, measure latency

What data source are you connecting first? Drop a comment with your use case! πŸ’¬
