-- ==================== SNOWFLAKE TABLE CREATION SCRIPTS ====================
CREATE OR REPLACE TABLE WAREHOUSE_ANALYTICS AS
WITH warehouse_base AS (
SELECT
wh.WAREHOUSE_ID,
wh.WAREHOUSE_NAME,
we.SIZE as CURRENT_SIZE,
we.CLUSTER_COUNT as CURRENT_CLUSTER_COUNT,
we.WAREHOUSE_TYPE,
-- Get min/max cluster settings from events
MAX(CASE WHEN we.EVENT_NAME = 'CLUSTER_COUNT_CHANGED' THEN we.CLUSTER_COUNT END) as MAX_CLUSTER_COUNT,
MIN(CASE WHEN we.EVENT_NAME = 'CLUSTER_COUNT_CHANGED' THEN we.CLUSTER_COUNT END) as MIN_CLUSTER_COUNT,
-- Warehouse lifecycle
MIN(we.TIMESTAMP) as FIRST_USED_DATE,
MAX(we.TIMESTAMP) as LAST_USED_DATE
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY wh
LEFT JOIN SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_EVENTS_HISTORY we
ON wh.WAREHOUSE_ID = we.WAREHOUSE_ID
WHERE wh.START_TIME >= DATEADD('day', -30, CURRENT_TIMESTAMP())
GROUP BY wh.WAREHOUSE_ID, wh.WAREHOUSE_NAME, we.SIZE, we.CLUSTER_COUNT, we.WAREHOUSE_TYPE
),
warehouse_stats AS (
SELECT
wb.WAREHOUSE_ID,
wb.WAREHOUSE_NAME,
wb.CURRENT_SIZE,
wb.CURRENT_CLUSTER_COUNT,
wb.WAREHOUSE_TYPE,
COALESCE(wb.MAX_CLUSTER_COUNT, wb.CURRENT_CLUSTER_COUNT) as MAX_CLUSTER_COUNT,
COALESCE(wb.MIN_CLUSTER_COUNT, wb.CURRENT_CLUSTER_COUNT) as MIN_CLUSTER_COUNT,
wb.FIRST_USED_DATE,
wb.LAST_USED_DATE,
-- Query Statistics
COUNT(qh.QUERY_ID) as TOTAL_QUERIES,
COUNT(DISTINCT qh.USER_NAME) as UNIQUE_USERS,
COUNT(DISTINCT DATE(qh.START_TIME)) as ACTIVE_DAYS_LAST_30,
ROUND(COUNT(DISTINCT HOUR(qh.START_TIME)) / GREATEST(COUNT(DISTINCT DATE(qh.START_TIME)), 1), 1) as AVG_ACTIVE_HOURS_PER_DAY,
-- Credit and Cost Analysis
ROUND(SUM(wh.CREDITS_USED), 2) as TOTAL_CREDITS_CONSUMED,
ROUND(SUM(wh.CREDITS_USED_COMPUTE), 2) as COMPUTE_CREDITS,
ROUND(SUM(wh.CREDITS_USED_CLOUD_SERVICES), 2) as CLOUD_SERVICE_CREDITS,
ROUND(AVG(wh.CREDITS_USED), 4) as AVG_CREDITS_PER_HOUR,
-- Performance Buckets
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME <= 1000 THEN 1 ELSE 0 END) as QUERIES_0_TO_1_SEC,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 1000 AND qh.TOTAL_ELAPSED_TIME <= 10000 THEN 1 ELSE 0 END) as QUERIES_1_TO_10_SEC,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 10000 AND qh.TOTAL_ELAPSED_TIME <= 30000 THEN 1 ELSE 0 END) as QUERIES_10_TO_30_SEC,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 30000 AND qh.TOTAL_ELAPSED_TIME <= 60000 THEN 1 ELSE 0 END) as QUERIES_30_TO_60_SEC,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 60000 AND qh.TOTAL_ELAPSED_TIME <= 300000 THEN 1 ELSE 0 END) as QUERIES_1_TO_5_MIN,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 300000 THEN 1 ELSE 0 END) as QUERIES_5_MIN_PLUS,
-- Warehouse Utilization Buckets (based on query load)
SUM(CASE WHEN qh.QUERY_LOAD_PERCENT BETWEEN 0 AND 20 THEN 1 ELSE 0 END) as QUERIES_0_TO_20_PCT_UTIL,
SUM(CASE WHEN qh.QUERY_LOAD_PERCENT BETWEEN 21 AND 40 THEN 1 ELSE 0 END) as QUERIES_21_TO_40_PCT_UTIL,
SUM(CASE WHEN qh.QUERY_LOAD_PERCENT BETWEEN 41 AND 60 THEN 1 ELSE 0 END) as QUERIES_41_TO_60_PCT_UTIL,
SUM(CASE WHEN qh.QUERY_LOAD_PERCENT BETWEEN 61 AND 80 THEN 1 ELSE 0 END) as QUERIES_61_TO_80_PCT_UTIL,
SUM(CASE WHEN qh.QUERY_LOAD_PERCENT BETWEEN 81 AND 100 THEN 1 ELSE 0 END) as QUERIES_81_TO_100_PCT_UTIL,
-- Bad Practice Categories
SUM(CASE WHEN qh.WAREHOUSE_SIZE = 'X-SMALL' AND qh.BYTES_SCANNED > 10737418240 THEN 1 ELSE 0 END) as OVER_PROVISIONED_QUERIES, -- 10GB threshold
SUM(CASE WHEN qh.BYTES_SPILLED_TO_LOCAL_STORAGE > 0 THEN 1 ELSE 0 END) as SPILLED_TO_LOCAL_QUERIES,
SUM(CASE WHEN qh.BYTES_SPILLED_TO_REMOTE_STORAGE > 0 THEN 1 ELSE 0 END) as SPILLED_TO_REMOTE_QUERIES,
SUM(CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%SELECT *%' AND NOT UPPER(qh.QUERY_TEXT) LIKE '%LIMIT%' THEN 1 ELSE 0 END) as SELECT_STAR_QUERIES,
SUM(CASE WHEN qh.PARTITIONS_TOTAL > 0 AND qh.PARTITIONS_SCANNED / qh.PARTITIONS_TOTAL > 0.5 THEN 1 ELSE 0 END) as UNPARTITIONED_SCAN_QUERIES,
SUM(CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%CROSS JOIN%' OR UPPER(qh.QUERY_TEXT) LIKE '%CARTESIAN%' THEN 1 ELSE 0 END) as CARTESIAN_JOIN_QUERIES,
SUM(CASE WHEN qh.ROWS_PRODUCED = 0 AND qh.BYTES_SCANNED > 1048576 THEN 1 ELSE 0 END) as ZERO_RESULT_QUERIES, -- 1MB threshold
SUM(CASE WHEN qh.EXECUTION_STATUS IN ('FAIL', 'CANCELLED') THEN 1 ELSE 0 END) as FAILED_CANCELLED_QUERIES,
SUM(CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%ORDER BY%' AND UPPER(qh.QUERY_TEXT) NOT LIKE '%LIMIT%' AND qh.ROWS_PRODUCED > 100000 THEN 1 ELSE 0 END) as UNLIMITED_ORDER_BY_QUERIES,
SUM(CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%DISTINCT%' AND qh.BYTES_SCANNED > 1073741824 THEN 1 ELSE 0 END) as EXPENSIVE_DISTINCT_QUERIES, -- 1GB threshold
SUM(CASE WHEN qh.COMPILATION_TIME > 5000 THEN 1 ELSE 0 END) as HIGH_COMPILE_TIME_QUERIES,
SUM(CASE WHEN DAYOFWEEK(qh.START_TIME) IN (0, 6) THEN 1 ELSE 0 END) as WEEKEND_QUERIES, -- Sunday = 0, Saturday = 6 with Snowflake's default WEEK_START
SUM(CASE WHEN HOUR(qh.START_TIME) BETWEEN 0 AND 6 OR HOUR(qh.START_TIME) BETWEEN 22 AND 23 THEN 1 ELSE 0 END) as OFF_HOURS_QUERIES,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME < 1000 AND qh.BYTES_SCANNED > 104857600 THEN 1 ELSE 0 END) as SMALL_QUERY_OVERHEAD, -- 100MB threshold
-- Resource Utilization
ROUND(AVG(qh.QUERY_LOAD_PERCENT), 2) as AVG_WAREHOUSE_UTILIZATION_PCT,
ROUND(MAX(qh.QUERY_LOAD_PERCENT), 2) as MAX_WAREHOUSE_UTILIZATION_PCT,
ROUND(SUM(qh.BYTES_SCANNED)/1024/1024/1024/1024, 2) as TOTAL_TB_SCANNED,
ROUND(SUM(qh.BYTES_SPILLED_TO_LOCAL_STORAGE + qh.BYTES_SPILLED_TO_REMOTE_STORAGE)/1024/1024/1024, 2) as TOTAL_GB_SPILLED,
-- Time Analysis
ROUND(AVG(qh.TOTAL_ELAPSED_TIME)/1000, 2) as AVG_QUERY_DURATION_SEC,
ROUND(MAX(qh.TOTAL_ELAPSED_TIME)/1000, 2) as MAX_QUERY_DURATION_SEC,
ROUND(AVG(qh.QUEUED_PROVISIONING_TIME + qh.QUEUED_REPAIR_TIME + qh.QUEUED_OVERLOAD_TIME)/1000, 2) as AVG_QUEUE_TIME_SEC,
-- Success Rate
ROUND(SUM(CASE WHEN qh.EXECUTION_STATUS = 'SUCCESS' THEN 1 ELSE 0 END) * 100.0 / NULLIF(COUNT(qh.QUERY_ID), 0), 2) as SUCCESS_RATE_PCT, -- NULLIF guards against division by zero for warehouses with no queries
-- Last Updated
CURRENT_TIMESTAMP() as LAST_UPDATED
FROM warehouse_base wb
LEFT JOIN SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh ON wb.WAREHOUSE_ID = qh.WAREHOUSE_ID
AND qh.START_TIME >= DATEADD('day', -30, CURRENT_TIMESTAMP())
LEFT JOIN SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY wh
ON qh.WAREHOUSE_ID = wh.WAREHOUSE_ID
AND DATE(qh.START_TIME) = DATE(wh.START_TIME)
GROUP BY wb.WAREHOUSE_ID, wb.WAREHOUSE_NAME, wb.CURRENT_SIZE, wb.CURRENT_CLUSTER_COUNT,
wb.WAREHOUSE_TYPE, wb.MAX_CLUSTER_COUNT, wb.MIN_CLUSTER_COUNT,
wb.FIRST_USED_DATE, wb.LAST_USED_DATE
)
SELECT * FROM warehouse_stats
WHERE TOTAL_QUERIES > 0 -- Only include warehouses that had queries
ORDER BY TOTAL_CREDITS_CONSUMED DESC;
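-- Example check (a sketch; assumes the WAREHOUSE_ANALYTICS table above was created in the current schema):
-- surface warehouses whose spilling or queue times suggest the size or cluster settings need attention.
SELECT
WAREHOUSE_NAME,
CURRENT_SIZE,
TOTAL_CREDITS_CONSUMED,
TOTAL_GB_SPILLED,
AVG_QUEUE_TIME_SEC,
AVG_WAREHOUSE_UTILIZATION_PCT
FROM WAREHOUSE_ANALYTICS
WHERE TOTAL_GB_SPILLED > 0 OR AVG_QUEUE_TIME_SEC > 5
ORDER BY TOTAL_CREDITS_CONSUMED DESC;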
-- 1. WAREHOUSE ANALYTICS TABLE (schema-qualified version; this is the copy the refresh procedure below rebuilds)
CREATE OR REPLACE TABLE ANALYTICS_DB.ANALYTICS_SCHEMA.WAREHOUSE_ANALYTICS AS
WITH warehouse_base AS (
SELECT
wh.WAREHOUSE_ID,
wh.WAREHOUSE_NAME,
we.SIZE as CURRENT_SIZE,
we.CLUSTER_COUNT as CURRENT_CLUSTER_COUNT,
we.WAREHOUSE_TYPE,
-- Get warehouse configuration details
MAX(CASE WHEN we.EVENT_NAME = 'SUSPEND' THEN we.TIMESTAMP END) as LAST_SUSPEND_TIME,
MAX(CASE WHEN we.EVENT_NAME = 'RESUME' THEN we.TIMESTAMP END) as LAST_RESUME_TIME,
MAX(we.TIMESTAMP) as LAST_EVENT_TIME
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY wh
LEFT JOIN SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_EVENTS_HISTORY we
ON wh.WAREHOUSE_ID = we.WAREHOUSE_ID
WHERE wh.START_TIME >= DATEADD('day', -30, CURRENT_TIMESTAMP())
GROUP BY wh.WAREHOUSE_ID, wh.WAREHOUSE_NAME, we.SIZE, we.CLUSTER_COUNT, we.WAREHOUSE_TYPE
),
warehouse_metrics AS (
SELECT
wb.WAREHOUSE_ID,
wb.WAREHOUSE_NAME,
wb.CURRENT_SIZE,
wb.CURRENT_CLUSTER_COUNT,
wb.WAREHOUSE_TYPE,
wb.LAST_SUSPEND_TIME,
wb.LAST_RESUME_TIME,
wb.LAST_EVENT_TIME,
-- Basic Metrics
COUNT(DISTINCT qh.QUERY_ID) as TOTAL_QUERIES,
COUNT(DISTINCT qh.USER_NAME) as UNIQUE_USERS,
ROUND(SUM(wm.CREDITS_USED), 2) as TOTAL_CREDITS_CONSUMED,
ROUND(SUM(wm.CREDITS_USED_COMPUTE), 2) as COMPUTE_CREDITS,
ROUND(SUM(wm.CREDITS_USED_CLOUD_SERVICES), 2) as CLOUD_SERVICES_CREDITS,
-- Time-based Metrics
COUNT(DISTINCT DATE(qh.START_TIME)) as ACTIVE_DAYS,
COUNT(DISTINCT HOUR(qh.START_TIME)) as UNIQUE_HOURS_USED,
ROUND(COUNT(DISTINCT HOUR(qh.START_TIME)) / NULLIF(COUNT(DISTINCT DATE(qh.START_TIME)), 0), 1) as ACTIVE_HOURS_PER_DAY,
-- Warehouse Load Metrics
ROUND(AVG(wl.AVG_RUNNING), 2) as AVG_CONCURRENT_QUERIES,
ROUND(AVG(wl.AVG_QUEUED_LOAD), 2) as AVG_QUEUE_DEPTH,
ROUND(AVG(wl.AVG_QUEUED_PROVISIONING), 2) as AVG_PROVISIONING_DELAY,
ROUND(AVG(wl.AVG_BLOCKED), 2) as AVG_BLOCKED_QUERIES,
-- Performance Buckets (Query Duration)
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME <= 1000 THEN 1 ELSE 0 END) as QUERIES_0_TO_1_SEC,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 1000 AND qh.TOTAL_ELAPSED_TIME <= 10000 THEN 1 ELSE 0 END) as QUERIES_1_TO_10_SEC,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 10000 AND qh.TOTAL_ELAPSED_TIME <= 30000 THEN 1 ELSE 0 END) as QUERIES_10_TO_30_SEC,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 30000 AND qh.TOTAL_ELAPSED_TIME <= 60000 THEN 1 ELSE 0 END) as QUERIES_30_TO_60_SEC,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 60000 AND qh.TOTAL_ELAPSED_TIME <= 300000 THEN 1 ELSE 0 END) as QUERIES_1_TO_5_MIN,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 300000 THEN 1 ELSE 0 END) as QUERIES_5_MIN_PLUS,
-- Warehouse Utilization Buckets (based on concurrent queries)
SUM(CASE WHEN wl.AVG_RUNNING <= 0.2 THEN 1 ELSE 0 END) as LOW_UTILIZATION_PERIODS, -- 0-20%
SUM(CASE WHEN wl.AVG_RUNNING > 0.2 AND wl.AVG_RUNNING <= 0.4 THEN 1 ELSE 0 END) as MEDIUM_LOW_UTILIZATION, -- 20-40%
SUM(CASE WHEN wl.AVG_RUNNING > 0.4 AND wl.AVG_RUNNING <= 0.6 THEN 1 ELSE 0 END) as MEDIUM_UTILIZATION, -- 40-60%
SUM(CASE WHEN wl.AVG_RUNNING > 0.6 AND wl.AVG_RUNNING <= 0.8 THEN 1 ELSE 0 END) as MEDIUM_HIGH_UTILIZATION, -- 60-80%
SUM(CASE WHEN wl.AVG_RUNNING > 0.8 THEN 1 ELSE 0 END) as HIGH_UTILIZATION_PERIODS, -- 80%+
-- Bad Practice Categories
SUM(CASE WHEN qh.WAREHOUSE_SIZE = 'X-SMALL' AND qh.BYTES_SCANNED > 1000000000 THEN 1 ELSE 0 END) as OVER_PROVISIONED_QUERIES,
SUM(CASE WHEN qh.BYTES_SPILLED_TO_LOCAL_STORAGE > 0 THEN 1 ELSE 0 END) as SPILLED_TO_LOCAL_QUERIES,
SUM(CASE WHEN qh.BYTES_SPILLED_TO_REMOTE_STORAGE > 0 THEN 1 ELSE 0 END) as SPILLED_TO_REMOTE_QUERIES,
SUM(CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%SELECT *%' THEN 1 ELSE 0 END) as SELECT_STAR_QUERIES,
SUM(CASE WHEN qh.PARTITIONS_TOTAL > 0 AND qh.PERCENTAGE_SCANNED_FROM_CACHE < 10 THEN 1 ELSE 0 END) as UNPARTITIONED_SCAN_QUERIES,
SUM(CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%CROSS JOIN%' OR UPPER(qh.QUERY_TEXT) LIKE '%CARTESIAN%' THEN 1 ELSE 0 END) as CARTESIAN_JOIN_QUERIES,
SUM(CASE WHEN qh.ROWS_PRODUCED = 0 AND qh.BYTES_SCANNED > 0 THEN 1 ELSE 0 END) as ZERO_RESULT_QUERIES,
SUM(CASE WHEN qh.EXECUTION_STATUS IN ('FAIL', 'CANCELLED') THEN 1 ELSE 0 END) as FAILED_CANCELLED_QUERIES,
SUM(CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%ORDER BY%' AND UPPER(qh.QUERY_TEXT) NOT LIKE '%LIMIT%' AND qh.ROWS_PRODUCED > 100000 THEN 1 ELSE 0 END) as UNLIMITED_ORDER_BY,
SUM(CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%DISTINCT%' AND qh.BYTES_SCANNED > 1000000000 THEN 1 ELSE 0 END) as EXPENSIVE_DISTINCTS,
SUM(CASE WHEN qh.COMPILATION_TIME > 5000 THEN 1 ELSE 0 END) as HIGH_COMPILE_TIME,
SUM(CASE WHEN DAYOFWEEK(qh.START_TIME) IN (0, 6) THEN 1 ELSE 0 END) as WEEKEND_QUERIES, -- Sun = 0, Sat = 6 under the default WEEK_START
SUM(CASE WHEN HOUR(qh.START_TIME) < 6 OR HOUR(qh.START_TIME) > 22 THEN 1 ELSE 0 END) as OFF_HOURS_USAGE,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME < 100 AND wm.CREDITS_USED > 0.01 THEN 1 ELSE 0 END) as SMALL_QUERY_OVERHEAD,
-- Additional Metrics
ROUND(AVG(qh.TOTAL_ELAPSED_TIME/1000), 2) as AVG_QUERY_DURATION_SEC,
ROUND(SUM(qh.BYTES_SCANNED)/1024/1024/1024, 2) as TOTAL_GB_SCANNED,
ROUND(AVG(qh.BYTES_SCANNED)/1024/1024/1024, 2) as AVG_GB_PER_QUERY,
SUM(qh.ROWS_PRODUCED) as TOTAL_ROWS_PRODUCED,
-- Efficiency Metrics
ROUND(SUM(wm.CREDITS_USED) / NULLIF(COUNT(qh.QUERY_ID), 0), 4) as COST_PER_QUERY,
ROUND(SUM(qh.BYTES_SCANNED) / NULLIF(SUM(wm.CREDITS_USED), 0) / 1024/1024/1024, 2) as GB_PER_CREDIT,
CURRENT_TIMESTAMP() as LAST_UPDATED
FROM warehouse_base wb
LEFT JOIN SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh
ON wb.WAREHOUSE_ID = qh.WAREHOUSE_ID
AND qh.START_TIME >= DATEADD('day', -30, CURRENT_TIMESTAMP())
LEFT JOIN SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY wm
ON wb.WAREHOUSE_ID = wm.WAREHOUSE_ID
AND DATE(qh.START_TIME) = DATE(wm.START_TIME)
LEFT JOIN SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_LOAD_HISTORY wl
ON wb.WAREHOUSE_ID = wl.WAREHOUSE_ID
AND DATE(qh.START_TIME) = DATE(wl.START_TIME)
GROUP BY wb.WAREHOUSE_ID, wb.WAREHOUSE_NAME, wb.CURRENT_SIZE, wb.CURRENT_CLUSTER_COUNT,
wb.WAREHOUSE_TYPE, wb.LAST_SUSPEND_TIME, wb.LAST_RESUME_TIME, wb.LAST_EVENT_TIME
)
SELECT * FROM warehouse_metrics
ORDER BY TOTAL_CREDITS_CONSUMED DESC;
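-- Example drill-down (a sketch; assumes the table above built successfully): rank warehouses by
-- cost efficiency to spot candidates for resizing, auto-suspend tuning, or workload consolidation.
SELECT
WAREHOUSE_NAME,
CURRENT_SIZE,
TOTAL_QUERIES,
TOTAL_CREDITS_CONSUMED,
COST_PER_QUERY,
GB_PER_CREDIT,
LOW_UTILIZATION_PERIODS,
HIGH_UTILIZATION_PERIODS
FROM ANALYTICS_DB.ANALYTICS_SCHEMA.WAREHOUSE_ANALYTICS
WHERE TOTAL_QUERIES > 0
ORDER BY COST_PER_QUERY DESC NULLS LAST;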
-- 2. USER ANALYTICS TABLE
CREATE OR REPLACE TABLE ANALYTICS_DB.ANALYTICS_SCHEMA.USER_ANALYTICS AS
SELECT
u.USER_ID,
u.NAME as USER_NAME,
u.LOGIN_NAME,
u.EMAIL,
u.FIRST_NAME,
u.LAST_NAME,
u.DEFAULT_WAREHOUSE,
u.DEFAULT_ROLE,
u.CREATED_ON as USER_CREATED_DATE,
u.LAST_SUCCESS_LOGIN,
u.HAS_MFA,
u.DISABLED as IS_DISABLED,
u.TYPE as USER_TYPE,
-- Query Activity Metrics
COUNT(DISTINCT qh.QUERY_ID) as TOTAL_QUERIES,
COUNT(DISTINCT qh.WAREHOUSE_ID) as WAREHOUSES_USED,
COUNT(DISTINCT DATE(qh.START_TIME)) as ACTIVE_DAYS,
COUNT(DISTINCT qh.SESSION_ID) as UNIQUE_SESSIONS,
-- Credit Usage
ROUND(SUM(wm.CREDITS_USED), 2) as TOTAL_CREDITS_CONSUMED,
ROUND(AVG(wm.CREDITS_USED), 4) as AVG_CREDITS_PER_QUERY,
ROUND(SUM(wm.CREDITS_USED_COMPUTE), 2) as COMPUTE_CREDITS,
ROUND(SUM(wm.CREDITS_USED_CLOUD_SERVICES), 2) as CLOUD_SERVICES_CREDITS,
-- Performance Metrics
ROUND(AVG(qh.TOTAL_ELAPSED_TIME/1000), 2) as AVG_QUERY_DURATION_SEC,
ROUND(SUM(qh.BYTES_SCANNED)/1024/1024/1024, 2) as TOTAL_GB_SCANNED,
ROUND(AVG(qh.BYTES_SCANNED)/1024/1024/1024, 2) as AVG_GB_PER_QUERY,
SUM(qh.ROWS_PRODUCED) as TOTAL_ROWS_PRODUCED,
-- Query Success Metrics
SUM(CASE WHEN qh.EXECUTION_STATUS = 'SUCCESS' THEN 1 ELSE 0 END) as SUCCESSFUL_QUERIES,
SUM(CASE WHEN qh.EXECUTION_STATUS IN ('FAIL', 'CANCELLED') THEN 1 ELSE 0 END) as FAILED_QUERIES,
ROUND(SUM(CASE WHEN qh.EXECUTION_STATUS = 'SUCCESS' THEN 1 ELSE 0 END) * 100.0 / NULLIF(COUNT(qh.QUERY_ID), 0), 2) as SUCCESS_RATE_PCT,
-- Time Distribution
SUM(CASE WHEN HOUR(qh.START_TIME) BETWEEN 6 AND 18 THEN 1 ELSE 0 END) as BUSINESS_HOURS_QUERIES,
SUM(CASE WHEN DAYOFWEEK(qh.START_TIME) IN (0, 6) THEN 1 ELSE 0 END) as WEEKEND_QUERIES,
-- Bad Practices
SUM(CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%SELECT *%' THEN 1 ELSE 0 END) as SELECT_STAR_QUERIES,
SUM(CASE WHEN qh.BYTES_SPILLED_TO_LOCAL_STORAGE > 0 THEN 1 ELSE 0 END) as SPILLED_QUERIES,
SUM(CASE WHEN qh.ROWS_PRODUCED = 0 AND qh.BYTES_SCANNED > 0 THEN 1 ELSE 0 END) as ZERO_RESULT_QUERIES,
-- Most Used Warehouse
MODE(qh.WAREHOUSE_NAME) as MOST_USED_WAREHOUSE,
MODE(qh.QUERY_TYPE) as MOST_COMMON_QUERY_TYPE,
-- Login Information
MAX(lh.EVENT_TIMESTAMP) as LAST_LOGIN_TIME,
COUNT(DISTINCT lh.EVENT_ID) as LOGIN_COUNT_30_DAYS,
MIN(qh.START_TIME) as FIRST_QUERY_TIME,
MAX(qh.START_TIME) as LAST_QUERY_TIME,
CURRENT_TIMESTAMP() as LAST_UPDATED
FROM SNOWFLAKE.ACCOUNT_USAGE.USERS u
LEFT JOIN SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh
ON u.NAME = qh.USER_NAME
AND qh.START_TIME >= DATEADD('day', -30, CURRENT_TIMESTAMP())
LEFT JOIN SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY wm
ON qh.WAREHOUSE_ID = wm.WAREHOUSE_ID
AND DATE(qh.START_TIME) = DATE(wm.START_TIME)
LEFT JOIN SNOWFLAKE.ACCOUNT_USAGE.LOGIN_HISTORY lh
ON u.NAME = lh.USER_NAME
AND lh.EVENT_TIMESTAMP >= DATEADD('day', -30, CURRENT_TIMESTAMP())
WHERE u.DELETED_ON IS NULL
GROUP BY u.USER_ID, u.NAME, u.LOGIN_NAME, u.EMAIL, u.FIRST_NAME, u.LAST_NAME,
u.DEFAULT_WAREHOUSE, u.DEFAULT_ROLE, u.CREATED_ON, u.LAST_SUCCESS_LOGIN,
u.HAS_MFA, u.DISABLED, u.TYPE
ORDER BY TOTAL_CREDITS_CONSUMED DESC;
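-- Example usage (a sketch against the USER_ANALYTICS table defined above): top credit consumers
-- alongside their success rate and the anti-patterns they hit most often.
SELECT
USER_NAME,
TOTAL_CREDITS_CONSUMED,
TOTAL_QUERIES,
SUCCESS_RATE_PCT,
SELECT_STAR_QUERIES,
SPILLED_QUERIES,
ZERO_RESULT_QUERIES,
MOST_USED_WAREHOUSE
FROM ANALYTICS_DB.ANALYTICS_SCHEMA.USER_ANALYTICS
ORDER BY TOTAL_CREDITS_CONSUMED DESC
LIMIT 20;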
-- 3. QUERY SUMMARY TABLE
CREATE OR REPLACE TABLE ANALYTICS_DB.ANALYTICS_SCHEMA.QUERY_SUMMARY AS
SELECT
qh.QUERY_ID,
qh.QUERY_HASH,
qh.USER_NAME,
qh.ROLE_NAME,
qh.WAREHOUSE_NAME,
qh.WAREHOUSE_SIZE,
qh.DATABASE_NAME,
qh.SCHEMA_NAME,
qh.QUERY_TYPE,
qh.START_TIME,
qh.END_TIME,
qh.EXECUTION_STATUS,
qh.ERROR_CODE,
LEFT(qh.ERROR_MESSAGE, 500) as ERROR_MESSAGE_TRUNCATED,
-- Performance Metrics
ROUND(qh.TOTAL_ELAPSED_TIME/1000, 2) as TOTAL_DURATION_SEC,
ROUND(qh.COMPILATION_TIME/1000, 2) as COMPILATION_SEC,
ROUND(qh.EXECUTION_TIME/1000, 2) as EXECUTION_SEC,
ROUND(qh.QUEUED_PROVISIONING_TIME/1000, 2) as QUEUED_PROVISIONING_SEC,
ROUND(qh.QUEUED_REPAIR_TIME/1000, 2) as QUEUED_REPAIR_SEC,
ROUND(qh.QUEUED_OVERLOAD_TIME/1000, 2) as QUEUED_OVERLOAD_SEC,
-- Data Metrics
ROUND(qh.BYTES_SCANNED/1024/1024/1024, 2) as GB_SCANNED,
ROUND(qh.BYTES_WRITTEN/1024/1024/1024, 2) as GB_WRITTEN,
qh.ROWS_PRODUCED,
qh.ROWS_INSERTED,
qh.ROWS_UPDATED,
qh.ROWS_DELETED,
qh.PARTITIONS_SCANNED,
qh.PARTITIONS_TOTAL,
qh.PERCENTAGE_SCANNED_FROM_CACHE,
-- Resource Usage
ROUND(qh.BYTES_SPILLED_TO_LOCAL_STORAGE/1024/1024/1024, 2) as GB_SPILLED_LOCAL,
ROUND(qh.BYTES_SPILLED_TO_REMOTE_STORAGE/1024/1024/1024, 2) as GB_SPILLED_REMOTE,
ROUND(qh.BYTES_SENT_OVER_THE_NETWORK/1024/1024/1024, 2) as GB_NETWORK,
ROUND(wm.CREDITS_USED, 4) as CREDITS_USED,
qh.CREDITS_USED_CLOUD_SERVICES,
-- Query Categories
CASE
WHEN qh.TOTAL_ELAPSED_TIME <= 1000 THEN '0-1 SEC'
WHEN qh.TOTAL_ELAPSED_TIME <= 10000 THEN '1-10 SEC'
WHEN qh.TOTAL_ELAPSED_TIME <= 30000 THEN '10-30 SEC'
WHEN qh.TOTAL_ELAPSED_TIME <= 60000 THEN '30-60 SEC'
WHEN qh.TOTAL_ELAPSED_TIME <= 300000 THEN '1-5 MIN'
ELSE '5+ MIN'
END as PERFORMANCE_BUCKET,
-- Issue Flags
CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%SELECT *%' THEN 1 ELSE 0 END as IS_SELECT_STAR,
CASE WHEN qh.BYTES_SPILLED_TO_LOCAL_STORAGE > 0 THEN 1 ELSE 0 END as HAS_LOCAL_SPILL,
CASE WHEN qh.BYTES_SPILLED_TO_REMOTE_STORAGE > 0 THEN 1 ELSE 0 END as HAS_REMOTE_SPILL,
CASE WHEN qh.ROWS_PRODUCED = 0 AND qh.BYTES_SCANNED > 0 THEN 1 ELSE 0 END as IS_ZERO_RESULT,
CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%CROSS JOIN%' OR UPPER(qh.QUERY_TEXT) LIKE '%CARTESIAN%' THEN 1 ELSE 0 END as HAS_CARTESIAN_JOIN,
CASE WHEN qh.WAREHOUSE_SIZE = 'X-SMALL' AND qh.BYTES_SCANNED > 1000000000 THEN 1 ELSE 0 END as IS_OVER_PROVISIONED,
CASE WHEN HOUR(qh.START_TIME) < 6 OR HOUR(qh.START_TIME) > 22 THEN 1 ELSE 0 END as IS_OFF_HOURS,
CASE WHEN DAYOFWEEK(qh.START_TIME) IN (0, 6) THEN 1 ELSE 0 END as IS_WEEKEND,
-- Query Preview (first 200 chars)
LEFT(qh.QUERY_TEXT, 200) as QUERY_PREVIEW,
qh.QUERY_TAG,
qh.SESSION_ID,
qh.TRANSACTION_ID,
CURRENT_TIMESTAMP() as LAST_UPDATED
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh
LEFT JOIN SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY wm
ON qh.WAREHOUSE_ID = wm.WAREHOUSE_ID
AND DATE(qh.START_TIME) = DATE(wm.START_TIME)
WHERE qh.START_TIME >= DATEADD('day', -30, CURRENT_TIMESTAMP())
ORDER BY qh.START_TIME DESC;
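-- Example usage (a sketch): the issue flags make per-pattern drill-downs cheap, e.g. the
-- heaviest queries that spilled to remote storage in the last 30 days.
SELECT
QUERY_ID,
USER_NAME,
WAREHOUSE_NAME,
TOTAL_DURATION_SEC,
GB_SCANNED,
GB_SPILLED_REMOTE,
QUERY_PREVIEW
FROM ANALYTICS_DB.ANALYTICS_SCHEMA.QUERY_SUMMARY
WHERE HAS_REMOTE_SPILL = 1
ORDER BY GB_SPILLED_REMOTE DESC
LIMIT 50;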
-- 4. QUERY DETAILS TABLE (Full query text and metadata)
CREATE OR REPLACE TABLE ANALYTICS_DB.ANALYTICS_SCHEMA.QUERY_DETAILS AS
SELECT
qh.QUERY_ID,
qh.QUERY_TEXT,
qh.USER_NAME,
qh.ROLE_NAME,
qh.WAREHOUSE_NAME,
qh.DATABASE_NAME,
qh.SCHEMA_NAME,
qh.START_TIME,
qh.END_TIME,
qh.EXECUTION_STATUS,
qh.ERROR_CODE,
qh.ERROR_MESSAGE,
-- Complete Performance Metrics
qh.TOTAL_ELAPSED_TIME,
qh.COMPILATION_TIME,
qh.EXECUTION_TIME,
qh.QUEUED_PROVISIONING_TIME,
qh.QUEUED_REPAIR_TIME,
qh.QUEUED_OVERLOAD_TIME,
qh.TRANSACTION_BLOCKED_TIME,
qh.LIST_EXTERNAL_FILES_TIME,
qh.CHILD_QUERIES_WAIT_TIME,
qh.QUERY_RETRY_TIME,
qh.FAULT_HANDLING_TIME,
-- Complete Data Metrics
qh.BYTES_SCANNED,
qh.PERCENTAGE_SCANNED_FROM_CACHE,
qh.BYTES_WRITTEN,
qh.BYTES_WRITTEN_TO_RESULT,
qh.BYTES_READ_FROM_RESULT,
qh.ROWS_PRODUCED,
qh.ROWS_INSERTED,
qh.ROWS_UPDATED,
qh.ROWS_DELETED,
qh.ROWS_UNLOADED,
qh.BYTES_DELETED,
qh.PARTITIONS_SCANNED,
qh.PARTITIONS_TOTAL,
-- Complete Resource Usage
qh.BYTES_SPILLED_TO_LOCAL_STORAGE,
qh.BYTES_SPILLED_TO_REMOTE_STORAGE,
qh.BYTES_SENT_OVER_THE_NETWORK,
qh.OUTBOUND_DATA_TRANSFER_CLOUD,
qh.OUTBOUND_DATA_TRANSFER_REGION,
qh.OUTBOUND_DATA_TRANSFER_BYTES,
qh.INBOUND_DATA_TRANSFER_CLOUD,
qh.INBOUND_DATA_TRANSFER_REGION,
qh.INBOUND_DATA_TRANSFER_BYTES,
-- Credits and Costs
wm.CREDITS_USED as CREDITS_USED,
qh.CREDITS_USED_CLOUD_SERVICES,
qh.QUERY_ACCELERATION_BYTES_SCANNED,
qh.QUERY_ACCELERATION_PARTITIONS_SCANNED,
qh.QUERY_ACCELERATION_UPPER_LIMIT_SCALE_FACTOR,
-- External Functions
qh.EXTERNAL_FUNCTION_TOTAL_INVOCATIONS,
qh.EXTERNAL_FUNCTION_TOTAL_SENT_ROWS,
qh.EXTERNAL_FUNCTION_TOTAL_RECEIVED_ROWS,
qh.EXTERNAL_FUNCTION_TOTAL_SENT_BYTES,
qh.EXTERNAL_FUNCTION_TOTAL_RECEIVED_BYTES,
-- Query Metadata
qh.QUERY_TYPE,
qh.QUERY_TAG,
qh.QUERY_HASH,
qh.QUERY_HASH_VERSION,
qh.QUERY_PARAMETERIZED_HASH,
qh.QUERY_PARAMETERIZED_HASH_VERSION,
qh.WAREHOUSE_ID,
qh.WAREHOUSE_SIZE,
qh.WAREHOUSE_TYPE,
qh.CLUSTER_NUMBER,
qh.SESSION_ID,
qh.TRANSACTION_ID,
qh.ROLE_TYPE,
qh.USER_TYPE,
qh.IS_CLIENT_GENERATED_STATEMENT,
qh.QUERY_LOAD_PERCENT,
qh.RELEASE_VERSION,
CURRENT_TIMESTAMP() as LAST_UPDATED
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh
LEFT JOIN SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY wm
ON qh.WAREHOUSE_ID = wm.WAREHOUSE_ID
AND DATE(qh.START_TIME) = DATE(wm.START_TIME)
WHERE qh.START_TIME >= DATEADD('day', -30, CURRENT_TIMESTAMP())
ORDER BY qh.START_TIME DESC;
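-- Example usage (a sketch): join the summary flags to the full query text when a flagged
-- query needs to be reviewed or rewritten.
SELECT
s.QUERY_ID,
s.USER_NAME,
s.PERFORMANCE_BUCKET,
d.QUERY_TEXT
FROM ANALYTICS_DB.ANALYTICS_SCHEMA.QUERY_SUMMARY s
JOIN ANALYTICS_DB.ANALYTICS_SCHEMA.QUERY_DETAILS d USING (QUERY_ID)
WHERE s.IS_SELECT_STAR = 1
ORDER BY s.GB_SCANNED DESC
LIMIT 20;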
-- ==================== CLUSTERING FOR PERFORMANCE ====================
-- Snowflake standard tables do not support CREATE INDEX (secondary indexes exist only on hybrid tables),
-- so use clustering keys and search optimization instead. These only pay off once the tables grow large;
-- on a 30-day window they can usually be skipped.
ALTER TABLE ANALYTICS_DB.ANALYTICS_SCHEMA.WAREHOUSE_ANALYTICS CLUSTER BY (WAREHOUSE_NAME);
ALTER TABLE ANALYTICS_DB.ANALYTICS_SCHEMA.USER_ANALYTICS CLUSTER BY (USER_NAME);
ALTER TABLE ANALYTICS_DB.ANALYTICS_SCHEMA.QUERY_SUMMARY CLUSTER BY (WAREHOUSE_NAME, USER_NAME, START_TIME);
ALTER TABLE ANALYTICS_DB.ANALYTICS_SCHEMA.QUERY_DETAILS ADD SEARCH OPTIMIZATION ON EQUALITY(QUERY_ID);
-- ==================== REFRESH PROCEDURE ====================
CREATE OR REPLACE PROCEDURE ANALYTICS_DB.ANALYTICS_SCHEMA.REFRESH_ANALYTICS_TABLES()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
-- Refresh all analytics tables
CREATE OR REPLACE TABLE ANALYTICS_DB.ANALYTICS_SCHEMA.WAREHOUSE_ANALYTICS AS (
-- [Insert the warehouse analytics query here]
);
CREATE OR REPLACE TABLE ANALYTICS_DB.ANALYTICS_SCHEMA.USER_ANALYTICS AS (
-- [Insert the user analytics query here]
);
CREATE OR REPLACE TABLE ANALYTICS_DB.ANALYTICS_SCHEMA.QUERY_SUMMARY AS (
-- [Insert the query summary query here]
);
CREATE OR REPLACE TABLE ANALYTICS_DB.ANALYTICS_SCHEMA.QUERY_DETAILS AS (
-- [Insert the query details query here]
);
RETURN 'Analytics tables refreshed successfully at ' || CURRENT_TIMESTAMP();
END;
$$;
-- Schedule the refresh to run daily
CREATE OR REPLACE TASK ANALYTICS_DB.ANALYTICS_SCHEMA.REFRESH_ANALYTICS_TASK
WAREHOUSE = 'ANALYTICS_WH'
SCHEDULE = 'USING CRON 0 6 * * * UTC' -- Daily at 6 AM UTC
AS
CALL ANALYTICS_DB.ANALYTICS_SCHEMA.REFRESH_ANALYTICS_TABLES();
-- Start the task
ALTER TASK ANALYTICS_DB.ANALYTICS_SCHEMA.REFRESH_ANALYTICS_TASK RESUME;
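-- Optional verification (a sketch; names assume the task and procedure created above):
-- confirm the task is resumed, trigger a one-off run, and review recent executions.
SHOW TASKS LIKE 'REFRESH_ANALYTICS_TASK' IN SCHEMA ANALYTICS_DB.ANALYTICS_SCHEMA;
EXECUTE TASK ANALYTICS_DB.ANALYTICS_SCHEMA.REFRESH_ANALYTICS_TASK;
SELECT NAME, STATE, SCHEDULED_TIME, COMPLETED_TIME, ERROR_MESSAGE
FROM TABLE(ANALYTICS_DB.INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'REFRESH_ANALYTICS_TASK'))
ORDER BY SCHEDULED_TIME DESC
LIMIT 10;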
from flask import Flask, jsonify, request
from datetime import datetime, timedelta
import json
app = Flask(__name__)
class SnowflakeAnalytics:
def __init__(self, cursor):
self.cursor = cursor
def execute_query(self, query, params=None):
"""Execute query and return results as list of dictionaries"""
try:
if params:
self.cursor.execute(query, params)
else:
self.cursor.execute(query)
columns = [desc[0] for desc in self.cursor.description]
results = []
for row in self.cursor.fetchall():
results.append(dict(zip(columns, row)))
return results
except Exception as e:
return {"error": str(e)}
# ==================== MAIN WAREHOUSE SUMMARY ====================
def get_warehouse_summary(self, days_back=30):
"""Main warehouse summary with performance buckets and bad practices"""
query = f"""
WITH warehouse_stats AS (
SELECT
wh.WAREHOUSE_NAME,
COUNT(qh.QUERY_ID) as TOTAL_QUERIES,
ROUND(SUM(wh.CREDITS_USED), 2) as TOTAL_CREDITS_CONSUMED,
COUNT(DISTINCT DATE(qh.START_TIME)) as ACTIVE_DAYS,
ROUND(COUNT(DISTINCT HOUR(qh.START_TIME)) / NULLIF(COUNT(DISTINCT DATE(qh.START_TIME)), 0), 1) as ACTIVE_HOURS_PER_DAY,
-- Performance Buckets
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME <= 1000 THEN 1 ELSE 0 END) as QUERIES_0_TO_1_SEC,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 1000 AND qh.TOTAL_ELAPSED_TIME <= 10000 THEN 1 ELSE 0 END) as QUERIES_1_TO_10_SEC,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 10000 AND qh.TOTAL_ELAPSED_TIME <= 30000 THEN 1 ELSE 0 END) as QUERIES_10_TO_30_SEC,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 30000 AND qh.TOTAL_ELAPSED_TIME <= 60000 THEN 1 ELSE 0 END) as QUERIES_30_TO_60_SEC,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 60000 AND qh.TOTAL_ELAPSED_TIME <= 300000 THEN 1 ELSE 0 END) as QUERIES_1_TO_5_MIN,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME > 300000 THEN 1 ELSE 0 END) as QUERIES_5_MIN_PLUS,
-- Bad Practice Categories
SUM(CASE WHEN qh.WAREHOUSE_SIZE = 'X-SMALL' AND qh.BYTES_SCANNED > 1000000000 THEN 1 ELSE 0 END) as OVER_PROVISIONED_QUERIES,
SUM(CASE WHEN qh.BYTES_SPILLED_TO_LOCAL_STORAGE > 0 THEN 1 ELSE 0 END) as SPILLED_TO_LOCAL_QUERIES,
SUM(CASE WHEN qh.BYTES_SPILLED_TO_REMOTE_STORAGE > 0 THEN 1 ELSE 0 END) as SPILLED_TO_REMOTE_QUERIES,
SUM(CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%SELECT *%' THEN 1 ELSE 0 END) as SELECT_STAR_QUERIES,
SUM(CASE WHEN qh.PARTITIONS_TOTAL > 0 AND qh.PERCENTAGE_SCANNED_FROM_CACHE < 10 THEN 1 ELSE 0 END) as UNPARTITIONED_SCAN_QUERIES,
SUM(CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%CROSS JOIN%' OR UPPER(qh.QUERY_TEXT) LIKE '%CARTESIAN%' THEN 1 ELSE 0 END) as CARTESIAN_JOIN_QUERIES,
SUM(CASE WHEN qh.ROWS_PRODUCED = 0 AND qh.BYTES_SCANNED > 0 THEN 1 ELSE 0 END) as ZERO_RESULT_QUERIES,
SUM(CASE WHEN qh.EXECUTION_STATUS = 'FAIL' OR qh.EXECUTION_STATUS = 'CANCELLED' THEN 1 ELSE 0 END) as FAILED_CANCELLED_QUERIES,
-- Additional Anti-Patterns
SUM(CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%ORDER BY%' AND UPPER(qh.QUERY_TEXT) NOT LIKE '%LIMIT%' AND qh.ROWS_PRODUCED > 100000 THEN 1 ELSE 0 END) as UNLIMITED_ORDER_BY,
SUM(CASE WHEN UPPER(qh.QUERY_TEXT) LIKE '%DISTINCT%' AND qh.BYTES_SCANNED > 1000000000 THEN 1 ELSE 0 END) as EXPENSIVE_DISTINCTS,
SUM(CASE WHEN qh.COMPILATION_TIME > 5000 THEN 1 ELSE 0 END) as HIGH_COMPILE_TIME,
SUM(CASE WHEN DAYOFWEEK(qh.START_TIME) IN (0, 6) THEN 1 ELSE 0 END) as WEEKEND_QUERIES, -- Sunday = 0, Saturday = 6 under the default WEEK_START
SUM(CASE WHEN HOUR(qh.START_TIME) < 6 OR HOUR(qh.START_TIME) > 22 THEN 1 ELSE 0 END) as OFF_HOURS_USAGE,
SUM(CASE WHEN qh.TOTAL_ELAPSED_TIME < 100 AND wh.CREDITS_USED > 0.01 THEN 1 ELSE 0 END) as SMALL_QUERY_OVERHEAD
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh
JOIN SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY wh
ON qh.WAREHOUSE_ID = wh.WAREHOUSE_ID
AND DATE(qh.START_TIME) = DATE(wh.START_TIME)
WHERE qh.START_TIME >= DATEADD('day', -{days_back}, CURRENT_TIMESTAMP())
AND qh.WAREHOUSE_ID IS NOT NULL
GROUP BY wh.WAREHOUSE_NAME
)
SELECT * FROM warehouse_stats
ORDER BY TOTAL_CREDITS_CONSUMED DESC
"""
return self.execute_query(query)
# ==================== DRILL-DOWN: USERS BY WAREHOUSE ====================
def get_warehouse_users(self, warehouse_name, days_back=30):
"""Get users who ran queries on specific warehouse"""
query = f"""
SELECT
qh.USER_NAME,
COUNT(qh.QUERY_ID) as QUERY_COUNT,
ROUND(SUM(wh.CREDITS_USED), 2) as CREDITS_USED,
ROUND(AVG(qh.TOTAL_ELAPSED_TIME/1000), 2) as AVG_DURATION_SEC,
MAX(qh.START_TIME) as LAST_QUERY_TIME,
SUM(CASE WHEN qh.EXECUTION_STATUS = 'SUCCESS' THEN 1 ELSE 0 END) as SUCCESSFUL_QUERIES,
SUM(CASE WHEN qh.EXECUTION_STATUS != 'SUCCESS' THEN 1 ELSE 0 END) as FAILED_QUERIES
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh
JOIN SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY wh
ON qh.WAREHOUSE_ID = wh.WAREHOUSE_ID
AND DATE(qh.START_TIME) = DATE(wh.START_TIME)
WHERE wh.WAREHOUSE_NAME = %s
AND qh.START_TIME >= DATEADD('day', -{days_back}, CURRENT_TIMESTAMP())
GROUP BY qh.USER_NAME
ORDER BY CREDITS_USED DESC
"""
return self.execute_query(query, (warehouse_name,))
# ==================== DRILL-DOWN: PERFORMANCE BUCKET DETAILS ====================
def get_performance_bucket_queries(self, warehouse_name, bucket_type, days_back=30):
"""Get queries in specific performance bucket"""
time_conditions = {
'0_to_1_sec': 'qh.TOTAL_ELAPSED_TIME <= 1000',
'1_to_10_sec': 'qh.TOTAL_ELAPSED_TIME > 1000 AND qh.TOTAL_ELAPSED_TIME <= 10000',
'10_to_30_sec': 'qh.TOTAL_ELAPSED_TIME > 10000 AND qh.TOTAL_ELAPSED_TIME <= 30000',
'30_to_60_sec': 'qh.TOTAL_ELAPSED_TIME > 30000 AND qh.TOTAL_ELAPSED_TIME <= 60000',
'1_to_5_min': 'qh.TOTAL_ELAPSED_TIME > 60000 AND qh.TOTAL_ELAPSED_TIME <= 300000',
'5_min_plus': 'qh.TOTAL_ELAPSED_TIME > 300000'
}
condition = time_conditions.get(bucket_type, 'qh.TOTAL_ELAPSED_TIME > 0')
query = f"""
SELECT
qh.USER_NAME,
COUNT(qh.QUERY_ID) as QUERY_COUNT,
ROUND(AVG(qh.TOTAL_ELAPSED_TIME/1000), 2) as AVG_DURATION_SEC,
ROUND(SUM(qh.BYTES_SCANNED)/1024/1024/1024, 2) as TOTAL_GB_SCANNED,
ROUND(SUM(wh.CREDITS_USED), 2) as CREDITS_USED
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh
JOIN SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY wh
ON qh.WAREHOUSE_ID = wh.WAREHOUSE_ID
AND DATE(qh.START_TIME) = DATE(wh.START_TIME)
WHERE wh.WAREHOUSE_NAME = %s
AND qh.START_TIME >= DATEADD('day', -{days_back}, CURRENT_TIMESTAMP())
AND {condition}
GROUP BY qh.USER_NAME
ORDER BY QUERY_COUNT DESC
"""
return self.execute_query(query, (warehouse_name,))
# ==================== DRILL-DOWN: BAD PRACTICE DETAILS ====================
def get_bad_practice_queries(self, warehouse_name, practice_type, days_back=30):
"""Get queries with specific bad practices"""
practice_conditions = {
'over_provisioned': "qh.WAREHOUSE_SIZE = 'X-SMALL' AND qh.BYTES_SCANNED > 1000000000",
'spilled_local': "qh.BYTES_SPILLED_TO_LOCAL_STORAGE > 0",
'spilled_remote': "qh.BYTES_SPILLED_TO_REMOTE_STORAGE > 0",
'select_star': "UPPER(qh.QUERY_TEXT) LIKE '%SELECT *%'",
'unpartitioned_scan': "qh.PARTITIONS_TOTAL > 0 AND qh.PERCENTAGE_SCANNED_FROM_CACHE < 10",
'cartesian_join': "UPPER(qh.QUERY_TEXT) LIKE '%CROSS JOIN%' OR UPPER(qh.QUERY_TEXT) LIKE '%CARTESIAN%'",
'zero_result': "qh.ROWS_PRODUCED = 0 AND qh.BYTES_SCANNED > 0",
'failed_cancelled': "qh.EXECUTION_STATUS = 'FAIL' OR qh.EXECUTION_STATUS = 'CANCELLED'",
'unlimited_order_by': "UPPER(qh.QUERY_TEXT) LIKE '%ORDER BY%' AND UPPER(qh.QUERY_TEXT) NOT LIKE '%LIMIT%' AND qh.ROWS_PRODUCED > 100000",
'expensive_distinct': "UPPER(qh.QUERY_TEXT) LIKE '%DISTINCT%' AND qh.BYTES_SCANNED > 1000000000",
'high_compile_time': "qh.COMPILATION_TIME > 5000",
'weekend_queries': "DAYOFWEEK(qh.START_TIME) IN (0, 6)",
'off_hours': "HOUR(qh.START_TIME) < 6 OR HOUR(qh.START_TIME) > 22",
'small_query_overhead': "qh.TOTAL_ELAPSED_TIME < 100 AND wh.CREDITS_USED > 0.01"
}
condition = practice_conditions.get(practice_type, '1=1')
query = f"""
SELECT
qh.USER_NAME,
COUNT(qh.QUERY_ID) as ISSUE_COUNT,
ROUND(SUM(qh.BYTES_SCANNED)/1024/1024/1024, 2) as TOTAL_GB_SCANNED,
ROUND(AVG(qh.TOTAL_ELAPSED_TIME/1000), 2) as AVG_DURATION_SEC,
ROUND(SUM(wh.CREDITS_USED), 2) as WASTED_CREDITS,
MAX(qh.START_TIME) as LAST_OCCURRENCE
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh
JOIN SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY wh
ON qh.WAREHOUSE_ID = wh.WAREHOUSE_ID
AND DATE(qh.START_TIME) = DATE(wh.START_TIME)
WHERE wh.WAREHOUSE_NAME = %s
AND qh.START_TIME >= DATEADD('day', -{days_back}, CURRENT_TIMESTAMP())
AND {condition}
GROUP BY qh.USER_NAME
ORDER BY ISSUE_COUNT DESC
"""
return self.execute_query(query, (warehouse_name,))
# ==================== DRILL-DOWN: USER'S QUERIES ====================
def get_user_queries(self, warehouse_name, user_name, practice_type=None, days_back=30, limit=100):
"""Get specific user's queries with optional filtering by practice type"""
practice_conditions = {
'over_provisioned': "AND qh.WAREHOUSE_SIZE = 'X-SMALL' AND qh.BYTES_SCANNED > 1000000000",
'spilled_local': "AND qh.BYTES_SPILLED_TO_LOCAL_STORAGE > 0",
'spilled_remote': "AND qh.BYTES_SPILLED_TO_REMOTE_STORAGE > 0",
'select_star': "AND UPPER(qh.QUERY_TEXT) LIKE '%SELECT *%'",
'unpartitioned_scan': "AND qh.PARTITIONS_TOTAL > 0 AND qh.PERCENTAGE_SCANNED_FROM_CACHE < 10",
'cartesian_join': "AND (UPPER(qh.QUERY_TEXT) LIKE '%CROSS JOIN%' OR UPPER(qh.QUERY_TEXT) LIKE '%CARTESIAN%')",
'zero_result': "AND qh.ROWS_PRODUCED = 0 AND qh.BYTES_SCANNED > 0",
'failed_cancelled': "AND (qh.EXECUTION_STATUS = 'FAIL' OR qh.EXECUTION_STATUS = 'CANCELLED')"
}
additional_condition = practice_conditions.get(practice_type, '') if practice_type else ''
query = f"""
SELECT
qh.QUERY_ID,
qh.START_TIME,
qh.EXECUTION_STATUS,
ROUND(qh.TOTAL_ELAPSED_TIME/1000, 2) as DURATION_SEC,
ROUND(qh.BYTES_SCANNED/1024/1024/1024, 2) as GB_SCANNED,
qh.ROWS_PRODUCED,
qh.WAREHOUSE_SIZE,
ROUND(wh.CREDITS_USED, 4) as CREDITS_USED,
qh.ERROR_CODE,
qh.ERROR_MESSAGE,
LEFT(qh.QUERY_TEXT, 200) as QUERY_PREVIEW
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh
JOIN SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY wh
ON qh.WAREHOUSE_ID = wh.WAREHOUSE_ID
AND DATE(qh.START_TIME) = DATE(wh.START_TIME)
WHERE wh.WAREHOUSE_NAME = %s
AND qh.USER_NAME = %s
AND qh.START_TIME >= DATEADD('day', -{days_back}, CURRENT_TIMESTAMP())
{additional_condition}
ORDER BY qh.START_TIME DESC
LIMIT {limit}
"""
return self.execute_query(query, (warehouse_name, user_name))
# ==================== DRILL-DOWN: QUERY DETAILS ====================
def get_query_details(self, query_id):
"""Get complete details for a specific query"""
query = """
SELECT
qh.QUERY_ID,
qh.QUERY_TEXT,
qh.USER_NAME,
qh.ROLE_NAME,
qh.WAREHOUSE_NAME,
qh.WAREHOUSE_SIZE,
qh.DATABASE_NAME,
qh.SCHEMA_NAME,
qh.START_TIME,
qh.END_TIME,
qh.EXECUTION_STATUS,
qh.ERROR_CODE,
qh.ERROR_MESSAGE,
-- Performance Metrics
ROUND(qh.TOTAL_ELAPSED_TIME/1000, 2) as TOTAL_DURATION_SEC,
ROUND(qh.COMPILATION_TIME/1000, 2) as COMPILATION_SEC,
ROUND(qh.EXECUTION_TIME/1000, 2) as EXECUTION_SEC,
ROUND(qh.QUEUED_PROVISIONING_TIME/1000, 2) as QUEUED_PROVISIONING_SEC,
ROUND(qh.QUEUED_REPAIR_TIME/1000, 2) as QUEUED_REPAIR_SEC,
ROUND(qh.QUEUED_OVERLOAD_TIME/1000, 2) as QUEUED_OVERLOAD_SEC,
-- Data Metrics
ROUND(qh.BYTES_SCANNED/1024/1024/1024, 2) as GB_SCANNED,
ROUND(qh.BYTES_WRITTEN/1024/1024/1024, 2) as GB_WRITTEN,
qh.ROWS_PRODUCED,
qh.ROWS_INSERTED,
qh.ROWS_UPDATED,
qh.ROWS_DELETED,
qh.PARTITIONS_SCANNED,
qh.PARTITIONS_TOTAL,
qh.PERCENTAGE_SCANNED_FROM_CACHE,
-- Resource Usage
ROUND(qh.BYTES_SPILLED_TO_LOCAL_STORAGE/1024/1024/1024, 2) as GB_SPILLED_LOCAL,
ROUND(qh.BYTES_SPILLED_TO_REMOTE_STORAGE/1024/1024/1024, 2) as GB_SPILLED_REMOTE,
ROUND(qh.BYTES_SENT_OVER_THE_NETWORK/1024/1024/1024, 2) as GB_NETWORK,
qh.CREDITS_USED_CLOUD_SERVICES,
-- Query Analysis
qh.QUERY_TYPE,
qh.QUERY_TAG,
qh.TRANSACTION_ID,
qh.SESSION_ID
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh
WHERE qh.QUERY_ID = %s
"""
result = self.execute_query(query, (query_id,))
return result[0] if result else None
# ==================== FLASK API ENDPOINTS ====================
# Global variable to store cursor (will be injected)
analytics = None
def init_analytics(cursor):
"""Initialize analytics with cursor"""
global analytics
analytics = SnowflakeAnalytics(cursor)
@app.route('/api/warehouse-summary')
def warehouse_summary():
"""Main warehouse summary endpoint"""
days_back = request.args.get('days_back', 30, type=int)
result = analytics.get_warehouse_summary(days_back)
return jsonify(result)
@app.route('/api/warehouse/<warehouse_name>/users')
def warehouse_users(warehouse_name):
"""Get users for specific warehouse"""
days_back = request.args.get('days_back', 30, type=int)
result = analytics.get_warehouse_users(warehouse_name, days_back)
return jsonify(result)
@app.route('/api/warehouse/<warehouse_name>/performance/<bucket_type>')
def performance_bucket_queries(warehouse_name, bucket_type):
"""Get queries in performance bucket"""
days_back = request.args.get('days_back', 30, type=int)
result = analytics.get_performance_bucket_queries(warehouse_name, bucket_type, days_back)
return jsonify(result)
@app.route('/api/warehouse/<warehouse_name>/bad-practices/<practice_type>')
def bad_practice_queries(warehouse_name, practice_type):
"""Get queries with bad practices"""
days_back = request.args.get('days_back', 30, type=int)
result = analytics.get_bad_practice_queries(warehouse_name, practice_type, days_back)
return jsonify(result)
@app.route('/api/warehouse/<warehouse_name>/user/<user_name>/queries')
def user_queries(warehouse_name, user_name):
"""Get specific user's queries"""
days_back = request.args.get('days_back', 30, type=int)
practice_type = request.args.get('practice_type')
limit = request.args.get('limit', 100, type=int)
result = analytics.get_user_queries(warehouse_name, user_name, practice_type, days_back, limit)
return jsonify(result)
@app.route('/api/query/<query_id>/details')
def query_details(query_id):
"""Get complete query details"""
result = analytics.get_query_details(query_id)
return jsonify(result)
# ==================== UTILITY ENDPOINTS ====================
@app.route('/api/health')
def health_check():
"""Health check endpoint"""
return jsonify({"status": "healthy", "timestamp": datetime.now().isoformat()})
@app.route('/api/endpoints')
def list_endpoints():
"""List all available API endpoints"""
endpoints = {
"warehouse_summary": "/api/warehouse-summary?days_back=30",
"warehouse_users": "/api/warehouse/<warehouse_name>/users?days_back=30",
"performance_buckets": "/api/warehouse/<warehouse_name>/performance/<bucket_type>?days_back=30",
"bad_practices": "/api/warehouse/<warehouse_name>/bad-practices/<practice_type>?days_back=30",
"user_queries": "/api/warehouse/<warehouse_name>/user/<user_name>/queries?days_back=30&practice_type=<optional>&limit=100",
"query_details": "/api/query/<query_id>/details",
"performance_bucket_types": [
"0_to_1_sec", "1_to_10_sec", "10_to_30_sec",
"30_to_60_sec", "1_to_5_min", "5_min_plus"
],
"bad_practice_types": [
"over_provisioned", "spilled_local", "spilled_remote", "select_star",
"unpartitioned_scan", "cartesian_join", "zero_result", "failed_cancelled",
"unlimited_order_by", "expensive_distinct", "high_compile_time",
"weekend_queries", "off_hours", "small_query_overhead"
]
}
return jsonify(endpoints)
if __name__ == '__main__':
# Example usage:
# cursor = your_snowflake_cursor_here
# init_analytics(cursor)
# app.run(debug=True, host='0.0.0.0', port=5000)
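# One possible way to obtain the cursor (a sketch, assuming the snowflake-connector-python
# package and valid credentials; adjust the connection parameters to your environment):
#   import snowflake.connector
#   conn = snowflake.connector.connect(account="<account>", user="<user>", password="<password>")
#   init_analytics(conn.cursor())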
print("Snowflake Analytics API Server")
print("Initialize with: init_analytics(your_cursor)")
print("Available endpoints at: http://localhost:5000/api/endpoints")
app.run(debug=True, host='0.0.0.0', port=5000)