from flask import Flask, jsonify, request, send_file
from flask_cors import CORS
import snowflake.connector
import pandas as pd
import os
from datetime import datetime, timedelta
import json
import logging
from typing import Dict, List, Any
import io
app = Flask(__name__)
CORS(app)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Snowflake connection configuration
SNOWFLAKE_CONFIG = {
'user': os.getenv('SNOWFLAKE_USER'),
'password': os.getenv('SNOWFLAKE_PASSWORD'),
'account': os.getenv('SNOWFLAKE_ACCOUNT'),
'warehouse': os.getenv('SNOWFLAKE_WAREHOUSE'),
'database': os.getenv('SNOWFLAKE_DATABASE'),
'schema': os.getenv('SNOWFLAKE_SCHEMA')
}
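# Example environment variable values (placeholders only -- substitute your own
# account details; python-dotenv from the requirements can load these from a .env file):
#   SNOWFLAKE_USER=finops_service
#   SNOWFLAKE_PASSWORD=********
#   SNOWFLAKE_ACCOUNT=xy12345.us-east-1
#   SNOWFLAKE_WAREHOUSE=FINOPS_WH
#   SNOWFLAKE_DATABASE=FINOPS_DB
#   SNOWFLAKE_SCHEMA=PUBLIC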
class SnowflakeConnection:
def __init__(self):
self.connection = None
def connect(self):
try:
self.connection = snowflake.connector.connect(**SNOWFLAKE_CONFIG)
logger.info("Successfully connected to Snowflake")
return self.connection
except Exception as e:
logger.error(f"Failed to connect to Snowflake: {str(e)}")
raise
def execute_query(self, query: str, params: Dict = None) -> pd.DataFrame:
if not self.connection:
self.connect()
try:
cursor = self.connection.cursor()
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
# Fetch results and column names
results = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
# Create DataFrame
df = pd.DataFrame(results, columns=columns)
cursor.close()
return df
except Exception as e:
logger.error(f"Query execution failed: {str(e)}")
raise
def execute_procedure(self, procedure_name: str, params: List = None):
if not self.connection:
self.connect()
try:
cursor = self.connection.cursor()
if params:
cursor.callproc(procedure_name, params)
else:
cursor.callproc(procedure_name)
cursor.close()
logger.info(f"Successfully executed procedure: {procedure_name}")
except Exception as e:
logger.error(f"Procedure execution failed: {str(e)}")
raise
# Initialize Snowflake connection
sf_conn = SnowflakeConnection()
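# Illustrative usage of the shared connection (FINOPS_USER_METRICS is created further
# below by the REFRESH_USER_METRICS procedure; the warehouse name is a placeholder).
# The connector's default pyformat binding lets us pass values via the params dict:
#   df = sf_conn.execute_query(
#       "SELECT * FROM FINOPS_USER_METRICS WHERE warehouse_name = %(wh)s",
#       {"wh": "ANALYTICS_WH"})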
# Stored Procedures Creation
STORED_PROCEDURES = {
'warehouse_metrics': """
CREATE OR REPLACE PROCEDURE REFRESH_WAREHOUSE_METRICS()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
-- Create or replace warehouse metrics table
CREATE OR REPLACE TABLE FINOPS_WAREHOUSE_METRICS AS
WITH warehouse_base AS (
SELECT
warehouse_name,
COUNT(*) as total_queries,
SUM(credits_used_cloud_services + credits_used_compute) as total_credits,
COUNT(DISTINCT DATE(start_time)) as active_days,
AVG(CASE WHEN execution_status = 'SUCCESS'
THEN DATEDIFF('second', start_time, end_time) END) as avg_execution_time
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= DATEADD('day', -30, CURRENT_TIMESTAMP())
GROUP BY warehouse_name
),
performance_buckets AS (
SELECT
warehouse_name,
SUM(CASE WHEN execution_time_ms BETWEEN 0 AND 1000 THEN 1 ELSE 0 END) as queries_0_to_1_sec,
SUM(CASE WHEN execution_time_ms BETWEEN 1001 AND 10000 THEN 1 ELSE 0 END) as queries_1_to_10_sec,
SUM(CASE WHEN execution_time_ms BETWEEN 10001 AND 30000 THEN 1 ELSE 0 END) as queries_10_to_30_sec,
SUM(CASE WHEN execution_time_ms BETWEEN 30001 AND 60000 THEN 1 ELSE 0 END) as queries_30_to_60_sec,
SUM(CASE WHEN execution_time_ms BETWEEN 60001 AND 300000 THEN 1 ELSE 0 END) as queries_1_to_5_min,
SUM(CASE WHEN execution_time_ms > 300000 THEN 1 ELSE 0 END) as queries_5_min_plus
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= DATEADD('day', -30, CURRENT_TIMESTAMP())
AND warehouse_name IS NOT NULL
GROUP BY warehouse_name
),
bad_practices AS (
SELECT
warehouse_name,
SUM(CASE WHEN query_text ILIKE '%SELECT *%' THEN 1 ELSE 0 END) as select_star_queries,
SUM(CASE WHEN partitions_scanned > partitions_total * 0.8 THEN 1 ELSE 0 END) as unpartitioned_scan_queries,
SUM(CASE WHEN query_text ILIKE '%CROSS JOIN%' OR query_text ILIKE '%CARTESIAN%' THEN 1 ELSE 0 END) as cartesian_join_queries,
SUM(CASE WHEN rows_produced = 0 THEN 1 ELSE 0 END) as zero_result_queries,
SUM(CASE WHEN execution_status IN ('FAIL', 'CANCELLED') THEN 1 ELSE 0 END) as failed_cancelled_queries,
SUM(CASE WHEN compilation_time_ms > 5000 THEN 1 ELSE 0 END) as high_compile_time_queries,
SUM(CASE WHEN bytes_spilled_to_local_storage > 0 THEN 1 ELSE 0 END) as spilled_to_local_queries,
SUM(CASE WHEN bytes_spilled_to_remote_storage > 0 THEN 1 ELSE 0 END) as spilled_to_remote_queries
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= DATEADD('day', -30, CURRENT_TIMESTAMP())
AND warehouse_name IS NOT NULL
GROUP BY warehouse_name
),
cost_efficiency AS (
SELECT
warehouse_name,
SUM(CASE WHEN DAYOFWEEK(start_time) IN (1, 7)
THEN credits_used_cloud_services + credits_used_compute ELSE 0 END) as weekend_idle_credits,
AVG(queue_time_ms) as queue_wait_time_avg_ms
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= DATEADD('day', -30, CURRENT_TIMESTAMP())
AND warehouse_name IS NOT NULL
GROUP BY warehouse_name
)
SELECT
wb.warehouse_name,
wb.total_queries,
wb.total_credits,
(wb.active_days * 24.0 / 30) as active_hours_per_day,
COALESCE(pb.queries_0_to_1_sec, 0) as queries_0_to_1_sec,
COALESCE(pb.queries_1_to_10_sec, 0) as queries_1_to_10_sec,
COALESCE(pb.queries_10_to_30_sec, 0) as queries_10_to_30_sec,
COALESCE(pb.queries_30_to_60_sec, 0) as queries_30_to_60_sec,
COALESCE(pb.queries_1_to_5_min, 0) as queries_1_to_5_min,
COALESCE(pb.queries_5_min_plus, 0) as queries_5_min_plus,
COALESCE(bp.select_star_queries, 0) as select_star_queries,
COALESCE(bp.unpartitioned_scan_queries, 0) as unpartitioned_scan_queries,
COALESCE(bp.cartesian_join_queries, 0) as cartesian_join_queries,
COALESCE(bp.zero_result_queries, 0) as zero_result_queries,
COALESCE(bp.failed_cancelled_queries, 0) as failed_cancelled_queries,
COALESCE(bp.high_compile_time_queries, 0) as high_compile_time_queries,
COALESCE(bp.spilled_to_local_queries, 0) as spilled_to_local_queries,
COALESCE(bp.spilled_to_remote_queries, 0) as spilled_to_remote_queries,
COALESCE(ce.weekend_idle_credits, 0) as weekend_idle_credits,
COALESCE(ce.queue_wait_time_avg_ms, 0) as queue_wait_time_avg_ms,
CURRENT_TIMESTAMP() as last_updated
FROM warehouse_base wb
LEFT JOIN performance_buckets pb ON wb.warehouse_name = pb.warehouse_name
LEFT JOIN bad_practices bp ON wb.warehouse_name = bp.warehouse_name
LEFT JOIN cost_efficiency ce ON wb.warehouse_name = ce.warehouse_name;
RETURN 'Warehouse metrics refreshed successfully';
END;
$$;
""",
'database_metrics': """
CREATE OR REPLACE PROCEDURE REFRESH_DATABASE_METRICS()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
CREATE OR REPLACE TABLE FINOPS_DATABASE_METRICS AS
WITH database_storage AS (
SELECT
database_name,
SUM(storage_bytes) / (1024*1024*1024) as total_storage_gb,
COUNT(DISTINCT table_name) as table_count,
SUM(time_travel_bytes) / (1024*1024*1024) as time_travel_storage_gb
FROM SNOWFLAKE.ACCOUNT_USAGE.TABLE_STORAGE_METRICS
WHERE deleted IS NULL
GROUP BY database_name
),
query_patterns AS (
SELECT
database_name,
SUM(CASE WHEN query_text ILIKE '%SELECT *%' THEN 1 ELSE 0 END) as select_star_on_large_tables,
SUM(CASE WHEN partitions_scanned > partitions_total * 0.8 THEN 1 ELSE 0 END) as unpartitioned_scans_count,
SUM(CASE WHEN partitions_scanned = partitions_total THEN 1 ELSE 0 END) as full_table_scan_queries
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= DATEADD('day', -30, CURRENT_TIMESTAMP())
AND database_name IS NOT NULL
GROUP BY database_name
)
SELECT
ds.database_name,
ds.total_storage_gb,
ds.table_count,
ds.time_travel_storage_gb,
COALESCE(qp.select_star_on_large_tables, 0) as select_star_on_large_tables,
COALESCE(qp.unpartitioned_scans_count, 0) as unpartitioned_scans_count,
COALESCE(qp.full_table_scan_queries, 0) as full_table_scan_queries,
CURRENT_TIMESTAMP() as last_updated
FROM database_storage ds
LEFT JOIN query_patterns qp ON ds.database_name = qp.database_name;
RETURN 'Database metrics refreshed successfully';
END;
$$;
""",
'user_metrics': """
CREATE OR REPLACE PROCEDURE REFRESH_USER_METRICS()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
CREATE OR REPLACE TABLE FINOPS_USER_METRICS AS
WITH user_base AS (
SELECT
user_name,
warehouse_name,
COUNT(*) as total_queries,
SUM(credits_used_cloud_services + credits_used_compute) as total_credits,
SUM(CASE WHEN query_text ILIKE '%SELECT *%' THEN 1 ELSE 0 END) as select_star_queries,
SUM(CASE WHEN partitions_scanned > partitions_total * 0.8 THEN 1 ELSE 0 END) as unpartitioned_scan_queries,
SUM(CASE WHEN bytes_spilled_to_local_storage > 0 THEN 1 ELSE 0 END) as spilled_queries,
SUM(CASE WHEN execution_time_ms > 300000 THEN 1 ELSE 0 END) as long_running_queries,
SUM(CASE WHEN rows_produced = 0 THEN 1 ELSE 0 END) as zero_result_queries
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= DATEADD('day', -30, CURRENT_TIMESTAMP())
AND user_name IS NOT NULL
AND warehouse_name IS NOT NULL
GROUP BY user_name, warehouse_name
),
warehouse_totals AS (
SELECT
warehouse_name,
SUM(total_credits) as warehouse_total_credits
FROM user_base
GROUP BY warehouse_name
)
SELECT
ub.user_name,
ub.warehouse_name,
ub.total_queries,
ub.total_credits,
-- NULLIF guards against division by zero when a warehouse has no recorded credits
(ub.total_credits / NULLIF(wt.warehouse_total_credits, 0) * 100) as percentage_of_warehouse_usage,
ub.select_star_queries,
ub.unpartitioned_scan_queries,
ub.spilled_queries,
ub.long_running_queries,
ub.zero_result_queries,
CASE
WHEN ub.total_credits > 1000 THEN 'High Cost'
ELSE 'Low Cost'
END as cost_status,
CURRENT_TIMESTAMP() as last_updated
FROM user_base ub
JOIN warehouse_totals wt ON ub.warehouse_name = wt.warehouse_name;
RETURN 'User metrics refreshed successfully';
END;
$$;
"""
}
def create_stored_procedures():
"""Create all stored procedures in Snowflake"""
try:
for proc_name, proc_sql in STORED_PROCEDURES.items():
sf_conn.execute_query(proc_sql)
logger.info(f"Created stored procedure: {proc_name}")
except Exception as e:
logger.error(f"Failed to create stored procedures: {str(e)}")
raise
def refresh_all_metrics():
"""Refresh all metric tables using stored procedures"""
procedures = [
'REFRESH_WAREHOUSE_METRICS',
'REFRESH_DATABASE_METRICS',
'REFRESH_USER_METRICS'
]
for proc in procedures:
try:
sf_conn.execute_procedure(proc)
logger.info(f"Refreshed metrics using: {proc}")
except Exception as e:
logger.error(f"Failed to refresh {proc}: {str(e)}")
raise
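# NOTE: the REFRESH_* procedures above must already exist before refresh_all_metrics()
# is called -- run create_stored_procedures() once, e.g. from the entry-point sketch at
# the end of this post.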
# API Routes
@app.route('/api/health', methods=['GET'])
def health_check():
"""Health check endpoint"""
return jsonify({
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'version': '1.0.0'
})
@app.route('/api/refresh-metrics', methods=['POST'])
def refresh_metrics():
"""Refresh all metrics tables"""
try:
refresh_all_metrics()
return jsonify({
'status': 'success',
'message': 'All metrics refreshed successfully',
'timestamp': datetime.now().isoformat()
})
except Exception as e:
return jsonify({
'status': 'error',
'message': str(e),
'timestamp': datetime.now().isoformat()
}), 500
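# Example call (illustrative, assuming the default Flask port):
#   curl -X POST http://localhost:5000/api/refresh-metrics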
@app.route('/api/warehouses', methods=['GET'])
def get_warehouse_metrics():
"""Get warehouse metrics data"""
try:
query = """
SELECT * FROM FINOPS_WAREHOUSE_METRICS
ORDER BY total_credits DESC
"""
df = sf_conn.execute_query(query)
# Convert to JSON format expected by frontend
data = df.to_dict('records')
# Save to CSV if requested
if request.args.get('export') == 'csv':
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
csv_buffer.seek(0)
return send_file(
io.BytesIO(csv_buffer.getvalue().encode()),
mimetype='text/csv',
as_attachment=True,
download_name=f'warehouse_metrics_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
)
return jsonify(data)
except Exception as e:
logger.error(f"Error fetching warehouse metrics: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/databases', methods=['GET'])
def get_database_metrics():
"""Get database metrics data"""
try:
query = """
SELECT * FROM FINOPS_DATABASE_METRICS
ORDER BY total_storage_gb DESC
"""
df = sf_conn.execute_query(query)
data = df.to_dict('records')
if request.args.get('export') == 'csv':
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
csv_buffer.seek(0)
return send_file(
io.BytesIO(csv_buffer.getvalue().encode()),
mimetype='text/csv',
as_attachment=True,
download_name=f'database_metrics_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
)
return jsonify(data)
except Exception as e:
logger.error(f"Error fetching database metrics: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/users', methods=['GET'])
def get_user_metrics():
"""Get user metrics data with optional filtering"""
try:
warehouse_filter = request.args.get('warehouse')
metric_filter = request.args.get('metric')
query = "SELECT * FROM FINOPS_USER_METRICS"
conditions = []
        if warehouse_filter:
            # escape single quotes so a crafted filter value cannot break out of the SQL literal
            conditions.append("warehouse_name = '{}'".format(warehouse_filter.replace("'", "''")))
if metric_filter:
# Add specific metric filtering logic based on drill-down context
if metric_filter == 'select_star_queries':
conditions.append("select_star_queries > 0")
elif metric_filter == 'unpartitioned_scan_queries':
conditions.append("unpartitioned_scan_queries > 0")
elif metric_filter == 'spilled_queries':
conditions.append("spilled_queries > 0")
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY total_credits DESC"
df = sf_conn.execute_query(query)
data = df.to_dict('records')
if request.args.get('export') == 'csv':
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
csv_buffer.seek(0)
return send_file(
io.BytesIO(csv_buffer.getvalue().encode()),
mimetype='text/csv',
as_attachment=True,
download_name=f'user_metrics_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
)
return jsonify(data)
except Exception as e:
logger.error(f"Error fetching user metrics: {str(e)}")
return jsonify({'error': str(e)}), 500
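# Example drill-down calls (illustrative values):
#   GET /api/users?warehouse=ANALYTICS_WH&metric=select_star_queries
#   GET /api/users?export=csv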
@app.route('/api/queries', methods=['GET'])
def get_query_history():
"""Get query history for specific user/warehouse/metric combination"""
try:
user_name = request.args.get('user')
warehouse_name = request.args.get('warehouse')
metric_type = request.args.get('metric_type')
        # coerce the limit to an int (falls back to 100 if the value is missing or invalid)
        limit = request.args.get('limit', 100, type=int)
query = """
SELECT
query_id,
user_name,
warehouse_name,
LEFT(query_text, 100) as query_text_preview,
start_time,
execution_time_ms,
bytes_scanned,
credits_used_cloud_services + credits_used_compute as credits_used,
error_code,
execution_status
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= DATEADD('day', -30, CURRENT_TIMESTAMP())
"""
conditions = []
        # escape single quotes in user-supplied filters so they cannot break out of the SQL literals
        if user_name:
            conditions.append("user_name = '{}'".format(user_name.replace("'", "''")))
        if warehouse_name:
            conditions.append("warehouse_name = '{}'".format(warehouse_name.replace("'", "''")))
# Add metric-specific filtering
if metric_type:
if metric_type == 'select_star_queries':
conditions.append("query_text ILIKE '%SELECT *%'")
elif metric_type == 'unpartitioned_scan_queries':
conditions.append("partitions_scanned > partitions_total * 0.8")
elif metric_type == 'spilled_queries':
conditions.append("bytes_spilled_to_local_storage > 0")
elif metric_type == 'long_running_queries':
conditions.append("execution_time_ms > 300000")
if conditions:
query += " AND " + " AND ".join(conditions)
query += f" ORDER BY start_time DESC LIMIT {limit}"
df = sf_conn.execute_query(query)
data = df.to_dict('records')
if request.args.get('export') == 'csv':
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False)
csv_buffer.seek(0)
return send_file(
io.BytesIO(csv_buffer.getvalue().encode()),
mimetype='text/csv',
as_attachment=True,
download_name=f'query_history_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
)
return jsonify(data)
except Exception as e:
logger.error(f"Error fetching query history: {str(e)}")
return jsonify({'error': str(e)}), 500
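# Example call (illustrative values):
#   GET /api/queries?user=JDOE&warehouse=ANALYTICS_WH&metric_type=spilled_queries&limit=50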
# @app.route('/api/query/
```

```text
# requirements.txt
Flask==2.3.3
Flask-CORS==4.0.0
snowflake-connector-python==3.4.0
pandas==2.1.1
python-dotenv==1.0.0
gunicorn==21.2.0
```
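The listing above stops before an entry point, so here is a minimal sketch of one, assuming the module is saved as `app.py` and that you want the stored procedures created (and the metric tables populated) when the service starts:

```python
# Sketch of an entry point for app.py (assumed file name) -- not part of the original listing.
if __name__ == '__main__':
    create_stored_procedures()   # one-time: create the REFRESH_* procedures in Snowflake
    refresh_all_metrics()        # optional: populate the FINOPS_* tables before serving traffic
    app.run(host='0.0.0.0', port=5000, debug=False)
```

In production you would typically serve it with the gunicorn package from the requirements instead of Flask's built-in server, e.g. `gunicorn -w 2 -b 0.0.0.0:5000 app:app`.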