linou518

Safe CSV Ingestion into PostgreSQL: A Multi-Tenant ETL Pipeline Pattern

When building a SaaS where users can upload arbitrary CSV files for analysis, the trickiest problem is that you don't know the schema ahead of time. A relational database requires column names and types before a table can be created, but user CSVs might have 10 columns or 100, and column names might be 売上金額 ("sales amount" in Japanese) or revenue_amount, with spaces or symbols mixed in.

Here's the ETL pipeline I recently implemented for a CSV analytics platform, and the patterns I learned along the way.

Overall Flow

S3 (uploaded CSV)
  ↓
Parser: type inference + column name normalization
  ↓
Staging table (dynamically created)
  ↓
DWH table (per-company schema)

Auth: AWS Cognito. Storage: S3. DB: RDS PostgreSQL (async via SQLAlchemy + asyncpg). Backend: FastAPI. ETL triggered via POST /api/etl/{upload_id}/run.

Pattern 1: Column Name Normalization

User CSV headers can be anything: 売上金額, Revenue (JPY), names containing spaces, empty strings, duplicates. Using them directly as SQL column names invites injection risks and syntax errors.

import re
from typing import Any

def _normalize_column_name(name: Any, index: int, seen: set[str]) -> str:
    normalized = re.sub(r"[^a-z0-9]+", "_", str(name).strip().lower()).strip("_")
    if not normalized:
        normalized = f"column_{index + 1}"
    if not normalized[0].isalpha():
        normalized = f"col_{normalized}"
    # Respect PostgreSQL's 63-char identifier limit
    normalized = normalized[:63].rstrip("_") or f"column_{index + 1}"
    # Handle duplicates with suffix
    candidate = normalized
    suffix = 1
    while candidate in seen:
        suffix_text = f"_{suffix}"
        base = normalized[:63 - len(suffix_text)].rstrip("_")
        candidate = f"{base}{suffix_text}"
        suffix += 1
    seen.add(candidate)
    return candidate

Pass a seen: set[str] to detect duplicates and append _1, _2... suffixes. Strictly enforce PostgreSQL's 63-character identifier limit.

Pattern 2: Type Inference

Read all columns as strings, then determine: is this numeric? date? or just text?

import pandas as pd

def _infer_column_type(series: pd.Series) -> tuple[str, pd.Series]:
    non_null_mask = series.notna()
    if not non_null_mask.any():
        return "text", series

    numeric_series = pd.to_numeric(series, errors="coerce")
    if numeric_series[non_null_mask].notna().all():
        return "numeric", numeric_series

    datetime_series = pd.to_datetime(series, errors="coerce", utc=True)
    if datetime_series[non_null_mask].notna().all():
        return "date", datetime_series

    return "text", series

Simple logic: "if all non-null values convert to numeric → numeric; if all convert to datetime → date; otherwise text." The non_null_mask filter prevents misclassification on columns with NULLs.

PostgreSQL type mapping:

_TYPE_MAP = {
    "numeric": "DOUBLE PRECISION",
    "date": "TIMESTAMPTZ",
    "text": "TEXT",
}

Pattern 3: Dynamic Table Name Generation + SQL Injection Prevention

Dynamic DDL requires assembling raw SQL strings. The CREATE TABLE {table_name} (...) portion can't use placeholders. This is the scariest part.

Solution: whitelist validation before embedding in SQL.

import re

_VALID_SQL_IDENTIFIER = re.compile(r"^[a-z][a-z0-9_]*$")

def _validate_table_name(name: str) -> str:
    if not _VALID_SQL_IDENTIFIER.match(name):
        raise ValueError(f"Invalid SQL table name: {name}")
    return name

Table naming convention: c{company_id_hex8}_{safe_filename}_{timestamp}. The company ID prefix also handles multi-tenant isolation.

Pattern 4: Automatic Encoding Detection

Japanese CSV files commonly cause encoding issues. Try UTF-8, fall back to latin-1 on failure:

try:
    df = _read_csv(csv_bytes, "utf-8")
except UnicodeDecodeError:
    df = _read_csv(csv_bytes, "latin-1")

Shift-JIS files will display garbled under latin-1, but won't throw parse errors. chardet would be more accurate, but given that most users upload Excel-exported CSVs, UTF-8/latin-1 covers the common cases.

Pattern 5: Status Tracking for Visibility

ETL is async and takes time. Users need to know where things stand when they refresh, so track status in an uploads table:

pending → processing → completed
                     ↘ failed

Set processing when the ETL starts, then completed or failed on finish. The frontend polls (or uses a WebSocket) to monitor status.

Summary

"Users can upload any CSV" looks simple but hides a stack of unglamorous but critical problems: column name normalization, type inference, SQL injection prevention, encoding handling.

The lesson in one sentence: "Never trust input. Validate everything before touching it."

User data always arrives in unexpected shapes. Whether your system crashes or returns a clean error is what separates a quality SaaS from a fragile one.
