DEV Community

LS
LS

Posted on • Edited on

Flyway implementation

#!/usr/bin/env python3
"""
Generate DML SQL for populating the etl_config table from a YAML file,
and write it as a Flyway migration.

Usage:
    python etl_config.py etl_config --name <NAME> [--yaml-file PATH] [--table etl_config]

Assumptions:
    - YAML structure (recommended):

        etl_config:
          - pipeline_name: accounts_daily
            enabled: true
            schedule_cron: "0 3 * * *"
            source_table: accounts_raw
            target_table: accounts_curated
          - ...

    - The script is stored somewhere under the repo root; update CONFIG_DIR and
      MIGRATIONS_DIR below to match your layout.
"""

import argparse
import math
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Sequence

import yaml  # pip install pyyaml


# ---------------------------------------------------------------------------
# CONFIGURE THESE PATHS FOR YOUR REPO LAYOUT
# ---------------------------------------------------------------------------

# Repo root: adjust the number of `.parents[...]` if needed for your layout.
REPO_ROOT = Path(__file__).resolve().parents[2]

# Directory holding the YAML inputs, one file per --name:
#   infrastructure/flyway/data-domains/accounts_datastore/etl_config/config/<name>.yaml
CONFIG_DIR = REPO_ROOT.joinpath(
    "infrastructure",
    "flyway",
    "data-domains",
    "accounts_datastore",
    "etl_config",
    "config",
)

# Directory the generated migrations are written to (the Flyway migration
# location for this domain):
#   infrastructure/flyway/data-domains/accounts_datastore/etl_config/migrations/
MIGRATIONS_DIR = REPO_ROOT.joinpath(
    "infrastructure",
    "flyway",
    "data-domains",
    "accounts_datastore",
    "etl_config",
    "migrations",
)


# ---------------------------------------------------------------------------
# HELPERS
# ---------------------------------------------------------------------------

def sql_literal(value: Any) -> str:
    """
    Convert a Python value into a SQL literal string.

    - None -> NULL
    - bool -> true/false
    - finite numbers -> as-is
    - everything else -> single-quoted string with quotes escaped

    Raises:
        ValueError: if ``value`` is a non-finite float (NaN, +inf, -inf).
            ``str()`` would render these as the bare tokens ``nan`` / ``inf``,
            which are not valid SQL numeric literals.
    """
    if value is None:
        return "NULL"

    # bool is a subclass of int, so it must be checked before the numeric branch.
    if isinstance(value, bool):
        return "true" if value else "false"

    if isinstance(value, float) and not math.isfinite(value):
        raise ValueError(
            f"Cannot render non-finite float {value!r} as a SQL numeric literal."
        )

    if isinstance(value, (int, float)):
        return str(value)

    # treat everything else as string
    text = str(value)
    # escape single quotes by doubling them
    text = text.replace("'", "''")
    return f"'{text}'"


def load_yaml(path: Path) -> Dict[str, Any]:
    """
    Parse the YAML file at ``path`` with ``yaml.safe_load`` and return the result.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
        ValueError: if the document parses to ``None`` (e.g. an empty file).
    """
    if path.exists():
        with path.open("r", encoding="utf-8") as handle:
            parsed = yaml.safe_load(handle)
        if parsed is not None:
            return parsed
        raise ValueError(f"YAML file is empty: {path}")

    raise FileNotFoundError(f"YAML config not found: {path}")


def extract_records(yaml_data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Get the list of records from parsed YAML.

    Preferred: top-level key "etl_config" is a list.
    Fallback: YAML itself is a list.

    Returns:
        The non-empty list of records (the same list object found in
        ``yaml_data``), each of which is a mapping.

    Raises:
        ValueError: if ``yaml_data`` is neither a list nor a mapping with an
            'etl_config' list, if the list is empty, or if any record is not
            a mapping.
    """
    if isinstance(yaml_data, list):
        records = yaml_data
        empty_message = "YAML list is empty; nothing to generate."
    elif isinstance(yaml_data, dict) and "etl_config" in yaml_data:
        # The isinstance guard matters: a scalar document (e.g. a plain string)
        # would otherwise pass the `in` test or fail it with a TypeError.
        records = yaml_data["etl_config"]
        if not isinstance(records, list):
            raise ValueError("Expected 'etl_config' key to contain a list of records.")
        empty_message = "'etl_config' list is empty; nothing to generate."
    else:
        raise ValueError("YAML must be a list or contain an 'etl_config' list.")

    if not records:
        raise ValueError(empty_message)

    # Reject non-mapping entries here, with a record number, instead of
    # letting a later `.keys()` call fail with an AttributeError.
    for idx, rec in enumerate(records, start=1):
        if not isinstance(rec, dict):
            raise ValueError(
                f"Record #{idx} must be a mapping of column to value, "
                f"got {type(rec).__name__}."
            )

    return records


def infer_columns(records: Sequence[Dict[str, Any]]) -> List[str]:
    """
    Infer the list of columns from the first record and ensure all records
    have exactly the same keys. This keeps the SQL deterministic.

    Returns:
        The first record's keys, sorted.

    Raises:
        ValueError: if ``records`` is empty, if the first record has no keys,
            or if any later record's key set differs from the first one's.
    """
    # Guard the records[0] access so an empty input fails with the same
    # exception type as the other validation errors rather than IndexError.
    if not records:
        raise ValueError("No records supplied; cannot infer schema.")

    first_keys = set(records[0].keys())
    if not first_keys:
        raise ValueError("First record has no columns; cannot infer schema.")

    # Numbering starts at 2 so the message uses 1-based record positions.
    for idx, rec in enumerate(records[1:], start=2):
        keys = set(rec.keys())
        if keys != first_keys:
            raise ValueError(
                f"Record #{idx} has different keys.\n"
                f"Expected: {sorted(first_keys)}\n"
                f"Found   : {sorted(keys)}"
            )

    return sorted(first_keys)


def build_insert_sql(table: str, records: Sequence[Dict[str, Any]]) -> str:
    """
    Render one INSERT statement per record, wrapped in BEGIN/COMMIT.

    Column order comes from ``infer_columns`` and every value is rendered
    through ``sql_literal``. ``table`` and the column names are interpolated
    into the SQL text as-is.
    """
    # NOTE(review): Flyway normally runs each migration inside its own
    # transaction; confirm the explicit BEGIN/COMMIT wrapper is wanted for
    # the target database.
    columns = infer_columns(records)
    insert_prefix = f"INSERT INTO {table} ({', '.join(columns)}) VALUES "

    rendered_rows = [
        insert_prefix
        + "("
        + ", ".join(sql_literal(row.get(column)) for column in columns)
        + ");"
        for row in records
    ]

    body = "\n\n".join(rendered_rows)
    return f"BEGIN;\n\n{body}\n\nCOMMIT;\n"


def next_migration_filename(config_type: str, name: str) -> str:
    """
    Produce a Flyway-style migration filename, e.g.:

        V20251127_123456__etl_config_accounts_daily.sql

    The version part is the current UTC time formatted as YYYYMMDD_HHMMSS.
    Hyphens in ``config_type`` and ``name`` are replaced with underscores;
    no other characters are altered.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now"
    # formats to the same wall-clock digits.
    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    safe_type = config_type.replace("-", "_")
    safe_name = name.replace("-", "_")
    return f"V{ts}__{safe_type}_{safe_name}.sql"


def write_migration(content: str, filename: str) -> Path:
    """
    Write ``content`` as UTF-8 to ``MIGRATIONS_DIR / filename`` and return
    that path. The migrations directory is created if it does not exist.
    """
    target = MIGRATIONS_DIR / filename
    MIGRATIONS_DIR.mkdir(parents=True, exist_ok=True)
    target.write_text(content, encoding="utf-8")
    return target


# ---------------------------------------------------------------------------
# MAIN
# ---------------------------------------------------------------------------

def parse_args(argv: Sequence[str]) -> argparse.Namespace:
    """
    Parse the command line (without the program name) into a namespace with
    ``config_type``, ``name``, ``yaml_file`` and ``table`` attributes.
    """
    cli = argparse.ArgumentParser(
        description="Generate Flyway DML migration for etl_config from YAML."
    )

    # (flag, add_argument keyword options), registered in declaration order.
    argument_specs = (
        (
            "config_type",
            {
                "choices": ["etl_config"],
                "help": "Type of config to generate (currently only 'etl_config').",
            },
        ),
        (
            "--name",
            {
                "required": True,
                "help": "Logical name for this config; used to pick YAML and to name the migration.",
            },
        ),
        (
            "--yaml-file",
            {
                "help": "Optional explicit path to YAML file. "
                        "If omitted, defaults to CONFIG_DIR/<name>.yaml",
            },
        ),
        (
            "--table",
            {
                "default": "etl_config",
                "help": "Target database table name (default: etl_config).",
            },
        ),
    )
    for flag, options in argument_specs:
        cli.add_argument(flag, **options)

    return cli.parse_args(argv)


def main(argv: Sequence[str]) -> int:
    """
    Run the generator: load the YAML config, build the INSERT script and
    write it out as a new migration file.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        0 on success; 1 if loading, validating, rendering or writing failed
        (the error is printed to stderr).
    """
    args = parse_args(argv)

    # Determine which YAML file to load
    if args.yaml_file:
        yaml_path = Path(args.yaml_file).resolve()
    else:
        yaml_path = (CONFIG_DIR / f"{args.name}.yaml").resolve()

    print(f"Using YAML config: {yaml_path}")

    # Top-level boundary: report any load/validation/render failure as a
    # one-line error rather than a traceback.
    try:
        yaml_data = load_yaml(yaml_path)
        records = extract_records(yaml_data)
        sql_script = build_insert_sql(args.table, records)
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1

    filename = next_migration_filename(args.config_type, args.name)

    # Filesystem failures (permissions, bad path, full disk) get the same
    # reporting as the failures above instead of escaping as a traceback.
    try:
        out_path = write_migration(sql_script, filename)
    except OSError as exc:
        print(f"ERROR: could not write migration {filename}: {exc}", file=sys.stderr)
        return 1

    print(f"Generated migration: {out_path}")
    return 0


# Script entry point: pass the command-line arguments (without the program
# name) to main() and use its integer return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))

Enter fullscreen mode Exit fullscreen mode

Top comments (0)