#!/usr/bin/env python3
"""
Generate DML SQL for populating the etl_config table from a YAML file,
and write it as a Flyway migration.
Usage:
python etl_config.py etl_config --name <NAME> [--yaml-file PATH] [--table etl_config]
Assumptions:
- YAML structure (recommended):
etl_config:
- pipeline_name: accounts_daily
enabled: true
schedule_cron: "0 3 * * *"
source_table: accounts_raw
target_table: accounts_curated
- ...
- The script is stored somewhere under the repo root; update CONFIG_DIR and
MIGRATIONS_DIR below to match your layout.
"""
import argparse
import json
import math
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence

import yaml  # pip install pyyaml
# ---------------------------------------------------------------------------
# CONFIGURE THESE PATHS FOR YOUR REPO LAYOUT
# ---------------------------------------------------------------------------
# Repository root: two directory levels above the folder containing this
# script (parents[2] of the resolved script path). Change the index if the
# script lives at a different depth.
REPO_ROOT = Path(__file__).resolve().parents[2]

# Directory holding the YAML config files, one per --name, e.g.
# infrastructure/flyway/data-domains/accounts_datastore/etl_config/config/<name>.yaml
CONFIG_DIR = REPO_ROOT.joinpath(
    "infrastructure",
    "flyway",
    "data-domains",
    "accounts_datastore",
    "etl_config",
    "config",
)

# Directory where Flyway looks for this domain's migrations, e.g.
# infrastructure/flyway/data-domains/accounts_datastore/etl_config/migrations/
MIGRATIONS_DIR = REPO_ROOT.joinpath(
    "infrastructure",
    "flyway",
    "data-domains",
    "accounts_datastore",
    "etl_config",
    "migrations",
)
# ---------------------------------------------------------------------------
# HELPERS
# ---------------------------------------------------------------------------
def sql_literal(value: Any) -> str:
    """
    Convert a Python value (typically from ``yaml.safe_load``) into a SQL literal.

    - None -> NULL
    - bool -> true/false
    - int / finite float -> as-is
    - dict / list -> JSON text, single-quoted with quotes escaped
    - everything else -> str(value), single-quoted with quotes escaped

    Raises:
        ValueError: for NaN or infinite floats (``str()`` would give bare
            ``nan``/``inf``, which is not a valid SQL numeric literal), and
            for dict/list values that cannot be serialized to JSON.
    """
    if value is None:
        return "NULL"
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return str(value)
    if isinstance(value, float):
        if not math.isfinite(value):
            raise ValueError(
                f"Cannot represent non-finite float {value!r} as a SQL literal."
            )
        return str(value)
    if isinstance(value, (dict, list)):
        # Nested YAML structures become JSON text rather than a Python repr
        # like "{'a': 1}", which no database would parse as structured data.
        try:
            text = json.dumps(value, ensure_ascii=False, allow_nan=False)
        except (TypeError, ValueError) as err:
            raise ValueError(
                f"Cannot serialize nested value to JSON: {value!r}"
            ) from err
    else:
        text = str(value)
    # Standard SQL string escaping: double every embedded single quote.
    return "'" + text.replace("'", "''") + "'"
def load_yaml(path: Path) -> Dict[str, Any]:
    """
    Read and parse the YAML document at ``path`` using ``yaml.safe_load``.

    Returns:
        The parsed top-level object. NOTE(review): annotated as a dict, but
        the top level may also be a list; ``extract_records`` accepts both.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
        ValueError: if the document parses to None (e.g. an empty file).
    """
    if path.exists():
        with path.open("r", encoding="utf-8") as stream:
            document = yaml.safe_load(stream)
        if document is not None:
            return document
        raise ValueError(f"YAML file is empty: {path}")
    raise FileNotFoundError(f"YAML config not found: {path}")
def extract_records(yaml_data: Any) -> List[Dict[str, Any]]:
    """
    Return the list of records from a parsed YAML document.

    Preferred: a top-level mapping whose "etl_config" key is a list.
    Fallback: the document itself is a list.

    Every record must be a mapping (column name -> value).

    Raises:
        ValueError: if the document has neither shape, the list is empty,
            or any record is not a mapping.
    """
    if isinstance(yaml_data, list):
        records = yaml_data
        if not records:
            raise ValueError("YAML list is empty; nothing to generate.")
    # isinstance guard: for a scalar document, `in` would do a substring test
    # (str) or raise TypeError (int) instead of a key lookup.
    elif isinstance(yaml_data, dict) and "etl_config" in yaml_data:
        records = yaml_data["etl_config"]
        if not isinstance(records, list):
            raise ValueError("Expected 'etl_config' key to contain a list of records.")
        if not records:
            raise ValueError("'etl_config' list is empty; nothing to generate.")
    else:
        raise ValueError("YAML must be a list or contain an 'etl_config' list.")

    # Reject non-mapping entries here, with a clear message, rather than
    # failing later on `.keys()`.
    for idx, rec in enumerate(records, start=1):
        if not isinstance(rec, dict):
            raise ValueError(
                f"Record #{idx} must be a mapping of column -> value, "
                f"got {type(rec).__name__}."
            )
    return records
def infer_columns(records: Sequence[Dict[str, Any]]) -> List[str]:
    """
    Infer the column list from the first record and check that all records
    have exactly the same keys. This keeps the SQL deterministic.

    Column names are interpolated into SQL unquoted, so each one must be a
    plain ASCII identifier (letters, digits, underscore; not starting with a
    digit).

    Returns:
        The column names, sorted.

    Raises:
        ValueError: if ``records`` is empty, a record is not a mapping, the
            first record has no keys, a column name is not a plain
            identifier, or any record's key set differs from the first.
    """
    if not records:
        raise ValueError("No records given; cannot infer schema.")
    for idx, rec in enumerate(records, start=1):
        if not isinstance(rec, dict):
            raise ValueError(f"Record #{idx} is not a mapping (got {type(rec).__name__}).")

    first_keys = set(records[0].keys())
    if not first_keys:
        raise ValueError("First record has no columns; cannot infer schema.")
    # YAML keys may be non-strings (e.g. `1:` or `true:`). Such keys, and
    # strings with spaces or punctuation, would produce broken SQL.
    invalid = sorted(
        repr(key)
        for key in first_keys
        if not (isinstance(key, str) and key.isascii() and key.isidentifier())
    )
    if invalid:
        raise ValueError(
            f"Invalid column name(s) {invalid}; column names must be plain "
            "identifiers (letters, digits, underscore)."
        )

    for idx, rec in enumerate(records[1:], start=2):
        keys = set(rec.keys())
        if keys != first_keys:
            # key=str: later records may contain non-string keys, which would
            # make a plain sorted() raise TypeError.
            raise ValueError(
                f"Record #{idx} has different keys.\n"
                f"Expected: {sorted(first_keys)}\n"
                f"Found : {sorted(keys, key=str)}\n"
                f"Missing : {sorted(first_keys - keys)}\n"
                f"Extra : {sorted(keys - first_keys, key=str)}"
            )
    return sorted(first_keys)
def build_insert_sql(
    table: str,
    records: Sequence[Dict[str, Any]],
    *,
    wrap_in_transaction: bool = True,
) -> str:
    """
    Build a SQL script with one INSERT statement per record.

    Args:
        table: Target table, as a plain identifier optionally qualified with
            dots (e.g. ``etl_config`` or ``myschema.etl_config``). It is
            interpolated unquoted, so anything else is rejected.
        records: Mappings of column -> value. Columns come from
            ``infer_columns`` and values are rendered with ``sql_literal``.
        wrap_in_transaction: When True (the default, matching the previous
            output), surround the statements with ``BEGIN;`` / ``COMMIT;``.
            NOTE(review): Flyway normally runs each migration in its own
            transaction, so explicit transaction control may be redundant or
            interfere. Confirm for your database/Flyway configuration.

    Returns:
        The SQL script text, ending with a newline.

    Raises:
        ValueError: if ``records`` is empty or ``table`` is not a valid
            (optionally dotted) identifier.
    """
    if not records:
        raise ValueError("No records given; nothing to generate.")
    if not all(part.isascii() and part.isidentifier() for part in table.split(".")):
        raise ValueError(
            f"Invalid table name {table!r}; expected e.g. 'etl_config' or "
            "'schema.etl_config'."
        )

    columns = infer_columns(records)
    col_list = ", ".join(columns)
    statements: List[str] = []
    for rec in records:
        values_sql = ", ".join(sql_literal(rec[col]) for col in columns)
        statements.append(f"INSERT INTO {table} ({col_list}) VALUES ({values_sql});")

    body = "\n\n".join(statements)
    if not wrap_in_transaction:
        return body + "\n"
    return "BEGIN;\n\n" + body + "\n\nCOMMIT;\n"
def next_migration_filename(
    config_type: str, name: str, now: Optional[datetime] = None
) -> str:
    """
    Produce a Flyway-style versioned migration filename, e.g.:
        V20251127_123456__etl_config_accounts_daily.sql

    Args:
        config_type: Config type; becomes the first part of the description.
        name: Logical config name; becomes the rest of the description.
        now: Timestamp to use as the version. Defaults to the current UTC
            time. An aware datetime is converted to UTC; a naive one is
            assumed to already be UTC.

    Every character of ``config_type`` and ``name`` that is not an ASCII
    letter, digit or underscore becomes ``_``. This keeps ``-`` -> ``_`` as
    before, and also stops path separators or dots in ``--name`` from
    producing a path outside the migrations directory.
    """
    if now is None:
        # datetime.utcnow() is deprecated; use an aware UTC timestamp instead.
        stamp = datetime.now(timezone.utc)
    elif now.tzinfo is not None:
        stamp = now.astimezone(timezone.utc)
    else:
        stamp = now
    ts = stamp.strftime("%Y%m%d_%H%M%S")
    safe_type = _safe_filename_part(config_type)
    safe_name = _safe_filename_part(name)
    return f"V{ts}__{safe_type}_{safe_name}.sql"


def _safe_filename_part(text: str) -> str:
    """Replace each character that is not an ASCII letter, digit or '_' with '_'."""
    return "".join(
        ch if (ch.isascii() and ch.isalnum()) or ch == "_" else "_" for ch in text
    )
def write_migration(
    content: str, filename: str, directory: Optional[Path] = None
) -> Path:
    """
    Write ``content`` to a new migration file and return its path.

    Args:
        content: SQL script text, written as UTF-8.
        filename: Bare file name; it must resolve to a file directly inside
            the target directory.
        directory: Target directory; defaults to MIGRATIONS_DIR. It is
            created if missing.

    Raises:
        ValueError: if ``filename`` would place the file outside ``directory``.
        FileExistsError: if the file already exists. A migration may already
            have been applied, so it is never silently overwritten.
    """
    target_dir = MIGRATIONS_DIR if directory is None else directory
    out_path = target_dir / filename
    # Guard against names like "../x.sql", "sub/x.sql" or absolute paths.
    if out_path.resolve().parent != target_dir.resolve():
        raise ValueError(
            f"Migration filename must be a plain file name inside {target_dir}: {filename!r}"
        )
    target_dir.mkdir(parents=True, exist_ok=True)
    # Mode "x" = exclusive create: fail instead of clobbering an existing file.
    with out_path.open("x", encoding="utf-8") as handle:
        handle.write(content)
    return out_path
# ---------------------------------------------------------------------------
# MAIN
# ---------------------------------------------------------------------------
def parse_args(argv: Sequence[str]) -> argparse.Namespace:
    """
    Parse the command line for this script.

    Accepted arguments:
      * ``config_type`` (positional): only "etl_config" is allowed.
      * ``--name`` (required): logical config name.
      * ``--yaml-file`` (optional): explicit YAML path; None when omitted.
      * ``--table`` (optional): target table, default "etl_config".

    argparse exits the process (SystemExit) on invalid input.
    """
    cli = argparse.ArgumentParser(
        description="Generate Flyway DML migration for etl_config from YAML."
    )
    # (flags, keyword options) pairs, registered in order so the --help
    # output keeps the same argument order.
    argument_specs = (
        (
            ("config_type",),
            {
                "choices": ["etl_config"],
                "help": "Type of config to generate (currently only 'etl_config').",
            },
        ),
        (
            ("--name",),
            {
                "required": True,
                "help": "Logical name for this config; used to pick YAML and to name the migration.",
            },
        ),
        (
            ("--yaml-file",),
            {
                "help": (
                    "Optional explicit path to YAML file. "
                    "If omitted, defaults to CONFIG_DIR/<name>.yaml"
                ),
            },
        ),
        (
            ("--table",),
            {
                "default": "etl_config",
                "help": "Target database table name (default: etl_config).",
            },
        ),
    )
    for flags, options in argument_specs:
        cli.add_argument(*flags, **options)
    return cli.parse_args(argv)
def main(argv: Sequence[str]) -> int:
    """
    CLI entry point: load the YAML, build the SQL, and write the migration.

    The YAML path is ``--yaml-file`` when given, otherwise
    ``CONFIG_DIR/<name>.yaml``.

    Returns:
        0 on success, 1 on any failure. The error is printed to stderr.
    """
    args = parse_args(argv)
    # Determine which YAML file to load
    if args.yaml_file:
        yaml_path = Path(args.yaml_file).resolve()
    else:
        yaml_path = (CONFIG_DIR / f"{args.name}.yaml").resolve()
    print(f"Using YAML config: {yaml_path}")
    # Writing the migration is inside the try as well, so I/O failures
    # (permissions, an already-existing file) are reported like load errors
    # rather than escaping as a raw traceback.
    try:
        yaml_data = load_yaml(yaml_path)
        records = extract_records(yaml_data)
        sql_script = build_insert_sql(args.table, records)
        filename = next_migration_filename(args.config_type, args.name)
        out_path = write_migration(sql_script, filename)
    except Exception as exc:  # top-level CLI boundary: report and exit non-zero
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1
    print(f"Generated migration: {out_path}")
    return 0
if __name__ == "__main__":
    # Use main()'s return code as the process exit status.
    raise SystemExit(main(sys.argv[1:]))