import streamlit as st
import pandas as pd
import numpy as np # Currently unused; kept for potential utility now that random mock-data generation has been removed
import plotly.express as px
from datetime import datetime, timedelta
# Import Snowpark Session for Snowflake Native App
from snowflake.snowpark.context import get_active_session
# --- Page Configuration ---
# Sets the page layout to wide and gives the app a title
st.set_page_config(layout="wide", page_title="Snowflake FinOps Dashboard")
# --- Helper Functions (Updated for Real Data with Snowpark) ---
# In a real Snowflake Native App, these functions execute Snowflake queries
# using the Snowpark Session object to fetch actual cost and performance data.
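# Note: ACCOUNT_USAGE views are populated with some latency (up to ~45 minutes for
# QUERY_HISTORY, longer for some other views), so the figures shown here are near-real-time
# at best. To avoid re-running these queries on every Streamlit rerun, the fetch functions
# could be wrapped with Streamlit's data cache -- a minimal sketch (assumes a 10-minute TTL
# is acceptable; the leading underscore keeps the unhashable Snowpark session out of the
# cache key):
#
#   @st.cache_data(ttl=600)
#   def cached_daily_credits(_session, time_range, start_date=None, end_date=None):
#       return get_daily_credit_consumption(_session, time_range, start_date, end_date)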
def get_daily_credit_consumption(session, time_range="Last 30 Days", start_date=None, end_date=None):
"""
Fetches daily credit consumption data from Snowflake.
    This queries SNOWFLAKE.ACCOUNT_USAGE.METERING_DAILY_HISTORY (one row per service type per day).
"""
if session is None:
st.warning("Snowpark session not available. Cannot fetch real credit consumption data.")
return pd.DataFrame({'DATE': [], 'CREDITS': []})
if time_range == "Custom" and start_date and end_date:
# Ensure dates are in YYYY-MM-DD format for SQL query
start_date_str = start_date.isoformat()
end_date_str = end_date.isoformat()
else:
today = datetime.now().date()
if time_range == "Last 7 Days":
start_date_str = (today - timedelta(days=7)).isoformat()
elif time_range == "Last 30 Days":
start_date_str = (today - timedelta(days=30)).isoformat()
else: # Defaulting to last 90 days if no custom range is selected
start_date_str = (today - timedelta(days=90)).isoformat()
end_date_str = today.isoformat()
query = f"""
    SELECT
        USAGE_DATE::DATE AS DATE,
        SUM(CREDITS_USED) AS CREDITS
    FROM
        SNOWFLAKE.ACCOUNT_USAGE.METERING_DAILY_HISTORY
WHERE
USAGE_DATE BETWEEN '{start_date_str}' AND '{end_date_str}'
GROUP BY
USAGE_DATE
ORDER BY
USAGE_DATE;
"""
try:
df = session.sql(query).to_pandas()
except Exception as e:
st.error(f"Error fetching daily credit consumption: {e}")
return pd.DataFrame({'DATE': [], 'CREDITS': []})
if df.empty:
st.warning("No credit consumption data found for the selected range.")
return df
def get_top_n_users_roles(session, cost_type="All Credits", n=10):
"""
Fetches top users/roles by spend from Snowflake.
This query aggregates query costs by user and role.
    Note: Direct per-query credit cost is complex, so this uses a simplified elapsed-time * rate estimate.
    The cost_type argument is currently informational only; the query does not filter by it.
"""
if session is None:
st.warning("Snowpark session not available. Cannot fetch real top users/roles data.")
return pd.DataFrame(columns=['USER', 'ROLE', 'COST', '% SHARE'])
# Simplified cost calculation based on elapsed time and warehouse size.
# For a more precise cost, you'd need to join with billing data or use a more complex model.
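    # For reference (assumption: standard warehouses), published compute rates are roughly
    # 1 credit/hour for X-Small and double with each size step, so e.g. MEDIUM is about
    # 4 credits/hour ~= 0.0011 credits/second. The constants below follow that doubling
    # pattern but are illustrative estimates, not billing-grade figures.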
query = f"""
SELECT
qh.USER_NAME AS USER,
qh.ROLE_NAME AS ROLE,
-- Simplified cost calculation: total_elapsed_time * (credits_per_second_for_size)
SUM(qh.TOTAL_ELAPSED_TIME / 1000 *
CASE qh.WAREHOUSE_SIZE
WHEN 'X-SMALL' THEN 0.0003 -- Example credit rate per second
WHEN 'SMALL' THEN 0.0006
WHEN 'MEDIUM' THEN 0.0012
WHEN 'LARGE' THEN 0.0024
WHEN 'X-LARGE' THEN 0.0048
WHEN '2X-LARGE' THEN 0.0096
WHEN '3X-LARGE' THEN 0.0192
WHEN '4X-LARGE' THEN 0.0384
WHEN '5X-LARGE' THEN 0.0768
WHEN '6X-LARGE' THEN 0.1536
ELSE 0.0003 -- Default for unknown sizes
END
) AS COST
FROM
SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY qh
WHERE
qh.START_TIME >= DATEADD(day, -30, CURRENT_TIMESTAMP()) -- Last 30 days
AND qh.WAREHOUSE_SIZE IS NOT NULL
GROUP BY
1, 2
ORDER BY
COST DESC
LIMIT {n};
"""
try:
df = session.sql(query).to_pandas()
except Exception as e:
st.error(f"Error fetching top users/roles data: {e}")
return pd.DataFrame(columns=['USER', 'ROLE', 'COST', '% SHARE'])
if not df.empty:
df['% SHARE'] = (df['COST'] / df['COST'].sum()) * 100
df = df.sort_values(by='COST', ascending=False).head(n)
else:
st.warning("No top users/roles data found.")
df = pd.DataFrame(columns=['USER', 'ROLE', 'COST', '% SHARE'])
return df
def get_runaway_queries(session):
"""
Detects runaway queries based on high duration thresholds from Snowflake.
Estimates cost for detected queries.
"""
if session is None:
return []
# Get recent running queries that exceed a duration threshold
query = f"""
SELECT
QUERY_ID,
USER_NAME AS USER,
TOTAL_ELAPSED_TIME / 1000 AS DURATION_SECONDS,
EXECUTION_STATUS,
QUERY_TEXT AS SQL_SNIPPET,
WAREHOUSE_SIZE
FROM
SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE
TOTAL_ELAPSED_TIME > 180000 -- Example: queries running longer than 3 minutes (180 seconds)
AND EXECUTION_STATUS = 'RUNNING' -- Only look for currently running queries
AND START_TIME >= DATEADD(hour, -1, CURRENT_TIMESTAMP()) -- Check queries started in the last hour
ORDER BY
TOTAL_ELAPSED_TIME DESC
LIMIT 5;
"""
try:
df = session.sql(query).to_pandas()
except Exception as e:
st.error(f"Error fetching runaway queries: {e}")
return []
if not df.empty:
# Estimate cost for runaway queries based on size and duration
def estimate_cost(row):
size_to_credit_rate = {
'X-SMALL': 0.0003, 'SMALL': 0.0006, 'MEDIUM': 0.0012, 'LARGE': 0.0024,
'X-LARGE': 0.0048, '2X-LARGE': 0.0096, '3X-LARGE': 0.0192,
'4X-LARGE': 0.0384, '5X-LARGE': 0.0768, '6X-LARGE': 0.1536
}
rate = size_to_credit_rate.get(row['WAREHOUSE_SIZE'], 0.0003)
return row['DURATION_SECONDS'] * rate
df['COST'] = df.apply(estimate_cost, axis=1)
# Rename columns to match expected dictionary keys in the UI
df.rename(columns={
'USER': 'user', 'DURATION_SECONDS': 'duration_seconds',
'COST': 'cost', 'QUERY_ID': 'query_id', 'SQL_SNIPPET': 'sql_snippet'
}, inplace=True)
return df[['query_id', 'cost', 'user', 'sql_snippet']].to_dict(orient='records')
return []
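# Caveat for get_runaway_queries(): ACCOUNT_USAGE.QUERY_HISTORY lags real time, so genuinely
# in-flight queries are more reliably found via the INFORMATION_SCHEMA table function --
# a minimal sketch (assumes a database is in context and the role can see the sessions):
#
#   running_df = session.sql("""
#       SELECT QUERY_ID, USER_NAME, TOTAL_ELAPSED_TIME / 1000 AS DURATION_SECONDS
#       FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY(RESULT_LIMIT => 1000))
#       WHERE EXECUTION_STATUS = 'RUNNING'
#       ORDER BY TOTAL_ELAPSED_TIME DESC
#   """).to_pandas()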
def get_long_running_queries(session):
"""
Identifies long-running queries (not necessarily runaway, but inefficient)
that completed recently from Snowflake.
"""
if session is None:
return []
query = """
SELECT
QUERY_ID,
USER_NAME AS USER,
TOTAL_ELAPSED_TIME / 1000 AS DURATION_SECONDS,
QUERY_TEXT AS SQL_SNIPPET,
WAREHOUSE_SIZE,
START_TIME
FROM
SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE
TOTAL_ELAPSED_TIME > 60000 -- Queries longer than 60 seconds (1 minute)
AND TOTAL_ELAPSED_TIME < 180000 -- But less than runaway threshold (3 minutes)
AND START_TIME >= DATEADD(day, -7, CURRENT_TIMESTAMP()) -- Last 7 days
AND EXECUTION_STATUS = 'SUCCESS' -- Only completed queries
ORDER BY
TOTAL_ELAPSED_TIME DESC
LIMIT 5;
"""
try:
df = session.sql(query).to_pandas()
except Exception as e:
st.error(f"Error fetching long-running queries: {e}")
return []
if not df.empty:
# Estimate cost similarly to runaway queries for consistency
def estimate_cost(row):
size_to_credit_rate = {
'X-SMALL': 0.0003, 'SMALL': 0.0006, 'MEDIUM': 0.0012, 'LARGE': 0.0024,
                'X-LARGE': 0.0048, '2X-LARGE': 0.0096, '3X-LARGE': 0.0192,
'4X-LARGE': 0.0384, '5X-LARGE': 0.0768, '6X-LARGE': 0.1536
}
rate = size_to_credit_rate.get(row['WAREHOUSE_SIZE'], 0.0003)
return row['DURATION_SECONDS'] * rate
df['COST'] = df.apply(estimate_cost, axis=1)
df.rename(columns={
'USER': 'user', 'DURATION_SECONDS': 'duration_seconds',
'COST': 'cost', 'QUERY_ID': 'query_id', 'SQL_SNIPPET': 'sql_snippet'
}, inplace=True)
return df[['query_id', 'cost', 'user', 'sql_snippet', 'duration_seconds']].to_dict(orient='records')
return []
def get_over_provisioned_warehouses(session):
"""
Identifies over-provisioned warehouses based on low utilization from Snowflake.
"""
if session is None:
return []
query = """
    WITH WarehouseActivity AS (
        SELECT
            WAREHOUSE_NAME,
            -- Each row in WAREHOUSE_METERING_HISTORY covers (at most) one hour of activity,
            -- so counting distinct hours approximates how long the warehouse was running.
            COUNT(DISTINCT DATE_TRUNC('hour', START_TIME)) AS ACTIVE_HOURS,
            SUM(CREDITS_USED) AS TOTAL_CREDITS_USED
        FROM
            SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
        WHERE
            START_TIME >= DATEADD(day, -7, CURRENT_TIMESTAMP()) -- Last 7 days of data
        GROUP BY
            1
    )
    SELECT
        wa.WAREHOUSE_NAME AS NAME,
        wa.TOTAL_CREDITS_USED AS COST,
        (wa.ACTIVE_HOURS / (7 * 24.0)) * 100 AS UTILIZATION_PERCENT, -- Active hours / hours in the 7-day window
        wh.WAREHOUSE_SIZE AS SIZE
    FROM
        WarehouseActivity wa
    JOIN
        SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSES wh ON wa.WAREHOUSE_NAME = wh.WAREHOUSE_NAME
    WHERE
        (wa.ACTIVE_HOURS / (7 * 24.0)) * 100 < 30 -- Example: warehouses busy less than 30% of the time
    ORDER BY
        COST DESC;
"""
try:
df = session.sql(query).to_pandas()
except Exception as e:
st.error(f"Error fetching over-provisioned warehouses: {e}")
return []
if not df.empty:
df['UTILIZATION_PERCENT'] = df['UTILIZATION_PERCENT'].round(2)
return df.to_dict(orient='records')
return []
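# A follow-up action for a warehouse flagged above could be issued through the same session --
# a minimal sketch (warehouse name and target settings are illustrative; requires a role with
# the MODIFY privilege on the warehouse):
#
#   session.sql("ALTER WAREHOUSE MY_WH SET WAREHOUSE_SIZE = 'SMALL' AUTO_SUSPEND = 60").collect()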
def get_stale_tables(session):
"""
Identifies large tables that haven't been accessed for a long time (stale tables).
"""
if session is None:
return []
query = """
SELECT
TABLE_CATALOG || '.' || TABLE_SCHEMA || '.' || TABLE_NAME AS TABLE_FULL_NAME,
TABLE_OWNER,
LAST_ALTERED,
ROW_COUNT,
BYTES / (1024*1024*1024) AS SIZE_GB -- Convert bytes to GB
-- Consider joining with ACCESS_HISTORY for last access, but it's often delayed/expensive
FROM
        SNOWFLAKE.ACCOUNT_USAGE.TABLES -- TABLES exposes TABLE_OWNER, LAST_ALTERED, ROW_COUNT, and BYTES; TABLE_STORAGE_METRICS does not
WHERE
DELETED IS NULL -- Only active tables
AND TABLE_SCHEMA NOT IN ('INFORMATION_SCHEMA', 'PUBLIC', 'SNOWFLAKE') -- Exclude system/common schemas
AND LAST_ALTERED < DATEADD(month, -6, CURRENT_TIMESTAMP()) -- Not altered in last 6 months
AND BYTES > (1024*1024*1024*10) -- Only tables larger than 10GB
ORDER BY
BYTES DESC
LIMIT 5;
"""
try:
df = session.sql(query).to_pandas()
except Exception as e:
st.error(f"Error fetching stale tables: {e}")
return []
if not df.empty:
df['SIZE_GB'] = df['SIZE_GB'].round(2)
return df.to_dict(orient='records')
return []
def get_large_temp_tables(session):
"""
Identifies large transient/temporary tables that might indicate inefficient ETL patterns.
This is conceptual as direct 'temporary' table usage is not easily aggregated in ACCOUNT_USAGE.
Focuses on tables with short lifespans or specific naming conventions.
"""
if session is None:
return []
# This query is more illustrative. Identifying 'temporary' or 'transient' tables and their
# impact often requires custom logging or more advanced metadata analysis.
query = """
SELECT
        TABLE_CATALOG AS DATABASE_NAME,
        TABLE_SCHEMA AS SCHEMA_NAME,
TABLE_NAME,
TABLE_TYPE,
BYTES / (1024*1024*1024) AS SIZE_GB,
CREATED
FROM
SNOWFLAKE.ACCOUNT_USAGE.TABLES
WHERE
        (TABLE_TYPE = 'TEMPORARY TABLE' OR IS_TRANSIENT = 'YES') -- transient tables are flagged via IS_TRANSIENT rather than TABLE_TYPE
AND CREATED >= DATEADD(day, -7, CURRENT_TIMESTAMP()) -- Created in last 7 days
AND BYTES > (1024*1024*1024) -- Larger than 1GB
ORDER BY
SIZE_GB DESC
LIMIT 5;
"""
try:
df = session.sql(query).to_pandas()
except Exception as e:
st.error(f"Error fetching large temporary/transient tables: {e}")
return []
if not df.empty:
df['SIZE_GB'] = df['SIZE_GB'].round(2)
return df.to_dict(orient='records')
return []
def get_caching_opportunities(session):
"""
Detects missed caching opportunities by analyzing query patterns from Snowflake.
This analyzes SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY.
"""
if session is None:
return []
query = """
SELECT
QUERY_TEXT AS PATTERN,
COUNT(*) AS FREQUENCY,
AVG(TOTAL_ELAPSED_TIME) AS AVG_DURATION_MS
FROM
SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE
START_TIME >= DATEADD(day, -7, CURRENT_TIMESTAMP())
AND BYTES_SCANNED > 0 -- Queries that actually scanned data
        AND BYTES_READ_FROM_RESULT = 0 -- Not served from the result cache
AND QUERY_TYPE = 'SELECT'
AND (
-- Look for common patterns that might benefit from caching/materialized views
CONTAINS(LOWER(QUERY_TEXT), 'count(*)') OR
CONTAINS(LOWER(QUERY_TEXT), 'group by') OR
CONTAINS(LOWER(QUERY_TEXT), 'join')
)
GROUP BY
PATTERN
HAVING
FREQUENCY > 5 -- More than 5 repetitions
AND AVG_DURATION_MS > 1000 -- Average duration > 1 second (to focus on non-trivial queries)
ORDER BY
FREQUENCY DESC, AVG_DURATION_MS DESC
LIMIT 3;
"""
try:
df = session.sql(query).to_pandas()
except Exception as e:
st.error(f"Error fetching caching opportunities: {e}")
return []
if not df.empty:
opportunities = []
for index, row in df.iterrows():
tip = ""
if 'count(*)' in row['PATTERN'].lower():
tip = "Consider using `APPROX_COUNT_DISTINCT` for approximations or creating a materialized view for frequently aggregated counts."
elif 'group by' in row['PATTERN'].lower():
tip = "If this aggregation is common and stable, consider a materialized view to pre-compute the results."
elif 'join' in row['PATTERN'].lower():
tip = "Review join keys and table clustering. Consider a materialized view for frequently joined static datasets."
else:
tip = "Analyze query structure for common patterns suitable for caching or view creation (e.g., repeated SELECTs)."
opportunities.append({"pattern": row['PATTERN'][:100] + ("..." if len(row['PATTERN']) > 100 else ""), "tip": tip}) # Truncate long patterns
return opportunities
return []
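# A typical remediation for a repeated, stable aggregation flagged above is a materialized
# view -- a minimal sketch (database/schema/table names are hypothetical; note that Snowflake
# materialized views only support single-table aggregations and require Enterprise Edition):
#
#   session.sql("""
#       CREATE MATERIALIZED VIEW analytics.reporting.daily_order_counts AS
#       SELECT order_date, COUNT(*) AS order_count
#       FROM analytics.reporting.orders
#       GROUP BY order_date
#   """).collect()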
def get_inefficient_serverless_usage(session):
"""
(Placeholder) Identifies potential inefficiencies in serverless compute usage (e.g., Snowpipe, Tasks, Hybrid Tables).
Direct detailed cost breakdown for individual serverless components can be complex via ACCOUNT_USAGE alone.
This function illustrates the intent.
"""
if session is None:
return []
# Example: Looking for frequently executing tasks with long durations or high credit consumption (conceptual)
# A more robust solution might involve parsing TASK_HISTORY and correlating with costs.
query = """
    SELECT
        'TASK' AS TYPE,
        NAME,
        SCHEMA_NAME,
        DATABASE_NAME,
        -- TASK_HISTORY has no elapsed-time column, so derive it from the run timestamps
        SUM(DATEDIFF('millisecond', QUERY_START_TIME, COMPLETED_TIME)) AS TOTAL_EXECUTION_TIME_MS
    FROM
        SNOWFLAKE.ACCOUNT_USAGE.TASK_HISTORY
    WHERE
        STATE = 'SUCCEEDED'
        AND COMPLETED_TIME >= DATEADD(day, -7, CURRENT_TIMESTAMP())
    GROUP BY 1, 2, 3, 4
    HAVING SUM(DATEDIFF('millisecond', QUERY_START_TIME, COMPLETED_TIME)) > 60000 -- Tasks with more than 1 minute of total run time this week
    ORDER BY TOTAL_EXECUTION_TIME_MS DESC
    LIMIT 3;
"""
try:
df = session.sql(query).to_pandas()
except Exception as e:
st.error(f"Error fetching serverless usage data: {e}")
return []
if not df.empty:
recommendations = []
for index, row in df.iterrows():
recommendations.append({
"type": row['TYPE'],
"name": row['NAME'],
"tip": f"Review {row['TYPE']} '{row['NAME']}' in {row['DATABASE_NAME']}.{row['SCHEMA_NAME']} due to long execution time ({row['TOTAL_EXECUTION_TIME_MS']/1000:.0f}s). Consider optimizing its SQL or scheduling."
})
return recommendations
return []
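# Where actual serverless credit figures are needed, dedicated ACCOUNT_USAGE views can be
# queried directly, e.g. PIPE_USAGE_HISTORY for Snowpipe and SERVERLESS_TASK_HISTORY for
# serverless tasks -- a minimal sketch for Snowpipe credits over the last 7 days:
#
#   pipe_credits_df = session.sql("""
#       SELECT PIPE_NAME, SUM(CREDITS_USED) AS CREDITS
#       FROM SNOWFLAKE.ACCOUNT_USAGE.PIPE_USAGE_HISTORY
#       WHERE START_TIME >= DATEADD(day, -7, CURRENT_TIMESTAMP())
#       GROUP BY PIPE_NAME
#       ORDER BY CREDITS DESC
#   """).to_pandas()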
# --- Get the Snowpark Session ---
# In a Snowflake Native App, the Streamlit app is automatically provided with a Snowpark Session.
# You can retrieve it using get_active_session().
session = None
try:
session = get_active_session()
except Exception as e:
st.error(f"Could not get active Snowpark session. Ensure this is run in a Snowflake Native App context. Error: {e}")
st.info("The dashboard will display empty data since no active Snowflake session is available.")
# session remains None, triggering empty data returns from functions
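# For local development outside a Native App, a Snowpark session can instead be created
# explicitly -- a minimal sketch (all connection parameters are placeholders):
#
#   from snowflake.snowpark import Session
#   session = Session.builder.configs({
#       "account": "<account_identifier>", "user": "<user>", "password": "<password>",
#       "role": "<role>", "warehouse": "<warehouse>",
#   }).create()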
# --- Custom CSS for Styling the Dashboard ---
# This CSS makes the dashboard look professional, applies colors to alerts,
# and ensures responsiveness.
st.markdown("""
<style>
/* Base app styling */
.stApp {
background-color: #F0F2F6; /* Light grey background for the entire app */
font-family: 'Inter', sans-serif; /* Apply Inter font globally */
color: #333333; /* Default text color */
}
/* Header and Title Styling */
h1, h2, h3, h4, h5, h6 {
color: #2C3E50; /* Darker grey for all headings */
font-family: 'Inter', sans-serif;
}
/* Card container styling */
.card {
border-radius: 10px;
padding: 20px;
margin-bottom: 20px;
box-shadow: 0 4px 8px 0 rgba(0,0,0,0.1); /* Subtle shadow for depth */
transition: 0.3s; /* Smooth transition on hover */
}
.card:hover {
box-shadow: 0 8px 16px 0 rgba(0,0,0,0.2); /* Enhanced shadow on hover */
}
/* Specific color-coded card styles */
.card-red {
background-color: #F8D7DA; /* Light red background */
border-left: 5px solid #DC3545; /* Red left border for emphasis */
color: #DC3545; /* Red text for primary info */
}
.card-orange {
background-color: #FFF3CD; /* Light orange background */
border-left: 5px solid #FFC107; /* Orange left border */
color: #FFC107; /* Orange text */
}
.card-blue {
background-color: #D1ECF1; /* Light blue background */
border-left: 5px solid #17A2B8; /* Blue left border */
color: #17A2B8; /* Blue text */
}
/* Card content styling */
.card-title {
font-size: 1.2em;
font-weight: bold;
margin-bottom: 10px;
color: #2C3E50; /* Dark heading for card titles */
}
.card-content {
font-size: 0.9em;
margin-bottom: 5px;
color: #555555; /* Slightly darker text for content within cards */
}
.card-content strong {
color: #000000; /* Black for key information like IDs, Costs, Users */
}
/* Streamlit Button Styling (targets actual st.button elements) */
div.stButton > button {
background-color: #007BFF; /* Primary blue button color */
color: white;
border-radius: 8px; /* More rounded corners for buttons */
border: none;
padding: 10px 20px;
font-size: 0.9em;
cursor: pointer;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
transition: background-color 0.2s, box-shadow 0.2s;
width: 100%; /* Make buttons fill the width of their container */
margin-top: 15px; /* Space above buttons */
}
div.stButton > button:hover {
background-color: #0056b3; /* Darker blue on hover */
box-shadow: 0 4px 8px rgba(0,0,0,0.2);
}
/* Chart container styling (Plotly charts will inherit this) */
.chart-container {
background-color: white;
border-radius: 10px;
box-shadow: 0 4px 8px 0 rgba(0,0,0,0.1);
padding: 20px;
margin-bottom: 30px;
}
/* Horizontal Rule for section separation */
hr {
border: 0;
height: 1px;
background-image: linear-gradient(to right, rgba(0, 0, 0, 0), rgba(0, 0, 0, 0.15), rgba(0, 0, 0, 0));
margin: 40px 0; /* Add more vertical space around separators */
}
/* Ensure responsiveness for columns - targets Streamlit's internal column divs */
/* Note: These classes might vary slightly with Streamlit updates, but generally target flex containers */
.st-emotion-cache-h5rpjc, .st-emotion-cache-1r6hotw { /* These are common class names for columns */
display: flex;
flex-wrap: wrap; /* Allows columns to wrap on smaller screens */
gap: 20px; /* Space between columns */
}
.st-emotion-cache-1r6hotw > div { /* Target individual column divs to ensure padding/margin */
flex: 1; /* Distribute space equally initially */
min-width: 280px; /* Minimum width before wrapping to next line */
}
/* Specific styling for dataframe */
.stDataFrame {
border-radius: 10px;
        box-shadow: 0 4px 8px 0 rgba(0,0,0,0.1);
padding: 10px;
}
</style>
""", unsafe_allow_html=True) # Allow Streamlit to render custom HTML/CSS
# --- Dashboard Title and Description ---
st.title("💰 Snowflake FinOps & Performance Optimization Dashboard")
st.markdown("A comprehensive overview of your Snowflake account costs and performance, designed for immediate action and long-term optimization.")
# --- Global Filters ---
st.header("🔍 Global Filters")
# Use st.columns to lay out filters horizontally for a clean look
filter_col1, filter_col2, filter_col3 = st.columns([1, 1, 3]) # Adjust column widths for better spacing
with filter_col1:
time_range_options = ["Last 7 Days", "Last 30 Days", "Custom"]
selected_time_range = st.selectbox(
"Select Time Range",
time_range_options,
help="Choose a predefined time range or select 'Custom' to specify dates."
)
with filter_col2:
cost_type_options = ["All Credits", "Compute", "Storage", "Data Transfer"]
selected_cost_type = st.selectbox(
"Select Cost Type",
cost_type_options,
help="Filter cost data by specific types (e.g., Compute, Storage)."
)
with filter_col3:
start_date, end_date = None, None
if selected_time_range == "Custom":
today = datetime.now().date()
# Default custom range to last 30 days
start_date = st.date_input("Start Date", value=today - timedelta(days=30), max_value=today)
end_date = st.date_input("End Date", value=today, max_value=today)
if start_date and end_date and start_date > end_date:
st.error("Error: End date must be after start date.")
start_date, end_date = None, None
else:
st.empty() # Ensures space is not taken if custom range is not selected
st.markdown("---") # Section Separator for visual clarity
# --- Immediate Action Zone ---
st.header("🚨 Immediate Action Zone")
st.markdown("High-urgency alerts and actionable recommendations for cost and performance optimization, categorized by Snowflake resource type.")
# --- Fetch all real alerts/warnings/recommendations ---
runaway_queries = get_runaway_queries(session)
long_running_queries = get_long_running_queries(session)
over_provisioned_warehouses = get_over_provisioned_warehouses(session)
stale_tables = get_stale_tables(session)
large_temp_tables = get_large_temp_tables(session)
caching_opportunities = get_caching_opportunities(session)
inefficient_serverless_usage = get_inefficient_serverless_usage(session)
# --- Section: Compute Optimization ---
st.subheader("🖥️ Compute Optimization Actions")
compute_cols = st.columns(2)
with compute_cols[0]: # Left column for Critical Alerts
st.markdown("##### π΄ Critical Alerts")
if runaway_queries:
for i, query in enumerate(runaway_queries):
st.markdown(f"""
<div class="card card-red">
<div class="card-title">RUNAWAY QUERY DETECTED!</div>
<div class="card-content">
<strong>Query ID:</strong> {query['query_id']}<br>
<strong>Estimated Cost:</strong> ${query['cost']:.2f} Credits<br>
<strong>User:</strong> {query['user']}<br>
<strong>SQL Snippet:</strong> `{query['sql_snippet'][:50]}...`
</div>
</div>
""", unsafe_allow_html=True)
if st.button(
f"Abort Query {query['query_id']}",
key=f"abort_btn_{query['query_id']}_{i}",
help=f"Immediately aborts the runaway query {query['query_id']} to save costs."
):
                st.toast(f"Initiating abort for query {query['query_id']}...", icon="🚫")
if session:
try:
                        # Assuming 'your_app_schema.abort_query_sp' is a stored procedure shipped with the app
                        # that cancels the query (e.g. via SYSTEM$CANCEL_QUERY) and returns a status string.
                        abort_status = session.call("your_app_schema.abort_query_sp", query['query_id'])
                        if abort_status == "SUCCESS":
                            st.success(f"Query {query['query_id']} aborted successfully.")
                        else:
                            st.error(f"Failed to abort query {query['query_id']}: {abort_status}")
except Exception as e:
st.error(f"Error calling SPCS for abort: {e}")
else:
st.error("Snowpark Session not available. Cannot abort query.")
else:
st.info("No critical compute alerts detected.")
with compute_cols[1]: # Right column for Warnings and Recommendations
st.markdown("##### π Warnings")
if over_provisioned_warehouses:
for i, warehouse in enumerate(over_provisioned_warehouses):
st.markdown(f"""
<div class="card card-orange">
<div class="card-title">Over-Provisioned Warehouse</div>
<div class="card-content">
<strong>Name:</strong> {warehouse['NAME']}<br>
<strong>Estimated Monthly Cost:</strong> ${warehouse['COST']:.2f} Credits<br>
<strong>Utilization:</strong> {warehouse['UTILIZATION_PERCENT']:.2f}% ({warehouse['SIZE']} size)
</div>
</div>
""", unsafe_allow_html=True)
if st.button(
f"Get Rec for {warehouse['NAME']}",
key=f"rec_wh_btn_{warehouse['NAME']}_{i}",
help=f"Provides recommendations for optimizing {warehouse['NAME']} based on its utilization."
):
if warehouse['UTILIZATION_PERCENT'] < 10:
st.write(f"**Recommendation for {warehouse['NAME']}:** This warehouse has very low utilization. Consider resizing significantly (e.g., to S, XS) or setting auto-suspend to 60 seconds.")
elif warehouse['UTILIZATION_PERCENT'] < 30:
st.write(f"**Recommendation for {warehouse['NAME']}:** Consider resizing to a smaller warehouse (e.g., MEDIUM) or setting auto-suspend to 60 seconds due to low utilization ({warehouse['UTILIZATION_PERCENT']:.2f}%).")
else:
st.write(f"**Recommendation for {warehouse['NAME']}:** Utilization is fair. Review query patterns to see if there are bursty workloads that could benefit from auto-scaling.")
else:
st.info("No over-provisioned warehouses detected.")
st.markdown("##### π΅ Performance Recommendations")
if long_running_queries:
st.markdown("**Long-Running Queries**")
for i, query in enumerate(long_running_queries):
st.markdown(f"""
<div class="card card-blue">
<div class="card-title">Long-Running Query</div>
<div class="card-content">
<strong>Query ID:</strong> {query['query_id']}<br>
<strong>User:</strong> {query['user']}<br>
<strong>Duration:</strong> {query['duration_seconds']:.0f} seconds<br>
<strong>SQL Snippet:</strong> `{query['sql_snippet'][:50]}...`
</div>
</div>
""", unsafe_allow_html=True)
st.info(f"**Tip:** Analyze query plan for `{query['query_id']}`. Look for large scans, expensive joins, or missing clustering/indexing opportunities.")
if caching_opportunities:
st.markdown("**Missed Caching Opportunities**")
for i, opportunity in enumerate(caching_opportunities):
st.markdown(f"""
<div class="card card-blue">
<div class="card-title">Missed Caching Opportunity</div>
<div class="card-content">
<strong>Pattern:</strong> {opportunity['pattern']}<br>
<strong>Education Tip:</strong> {opportunity['tip']}
</div>
</div>
""", unsafe_allow_html=True)
if not (long_running_queries or caching_opportunities):
st.info("No immediate performance recommendations for compute detected.")
st.markdown("---")
# --- Section: Storage Optimization ---
st.subheader("🗄️ Storage Optimization Actions")
storage_cols = st.columns(2)
with storage_cols[0]:
st.markdown("##### π Storage Warnings")
if stale_tables:
st.markdown("**Large Stale Tables**")
for i, table in enumerate(stale_tables):
st.markdown(f"""
<div class="card card-orange">
<div class="card-title">Stale Table Detected</div>
<div class="card-content">
<strong>Table:</strong> {table['TABLE_FULL_NAME']}<br>
<strong>Size:</strong> {table['SIZE_GB']:.2f} GB<br>
<strong>Last Altered:</strong> {table['LAST_ALTERED'].strftime('%Y-%m-%d')}<br>
<strong>Owner:</strong> {table['TABLE_OWNER']}
</div>
</div>
""", unsafe_allow_html=True)
st.info(f"**Recommendation:** Investigate if `{table['TABLE_FULL_NAME']}` is still needed. Consider archiving, dropping, or moving to cheaper storage if not actively used.")
else:
st.info("No large stale tables detected.")
with storage_cols[1]:
st.markdown("##### π΅ Storage Recommendations")
if large_temp_tables:
st.markdown("**Large Temporary/Transient Tables**")
for i, table in enumerate(large_temp_tables):
st.markdown(f"""
<div class="card card-blue">
<div class="card-title">Large Temporary Table</div>
<div class="card-content">
<strong>Table:</strong> {table['DATABASE_NAME']}.{table['SCHEMA_NAME']}.{table['TABLE_NAME']}<br>
<strong>Type:</strong> {table['TABLE_TYPE']}<br>
<strong>Size:</strong> {table['SIZE_GB']:.2f} GB<br>
<strong>Created:</strong> {table['CREATED'].strftime('%Y-%m-%d %H:%M')}
</div>
</div>
""", unsafe_allow_html=True)
st.info(f"**Recommendation:** High volume of large temporary/transient tables could indicate inefficient ETL or data processing patterns. Review the process creating `{table['TABLE_NAME']}` for optimization.")
else:
st.info("No significant temporary/transient table issues found.")
st.markdown("---")
# --- Section: Serverless Compute Optimization (Conceptual) ---
st.subheader("⚙️ Serverless Compute Optimization Actions (Snowpipe, Tasks, Hybrid Tables)")
serverless_cols = st.columns(1) # Single column for serverless for now
with serverless_cols[0]:
st.markdown("##### π΅ Serverless Recommendations")
if inefficient_serverless_usage:
for i, rec in enumerate(inefficient_serverless_usage):
st.markdown(f"""
<div class="card card-blue">
<div class="card-title">Serverless Usage Opportunity</div>
<div class="card-content">
<strong>Type:</strong> {rec['type']}<br>
<strong>Name:</strong> {rec['name']}<br>
<strong>Tip:</strong> {rec['tip']}
</div>
</div>
""", unsafe_allow_html=True)
else:
st.info("No specific serverless compute optimization opportunities identified at this time. This section's insights depend on detailed analysis of Snowpipe, Tasks, and Hybrid Table usage.")
st.markdown("---")
# --- Spend Trends Section ---
st.header("📈 Spend Trends Analysis")
# Daily/Weekly Credit Consumption Line Chart
st.subheader("Daily Credit Consumption Over Time")
# Pass the selected date range to the data function
daily_credits_df = get_daily_credit_consumption(session, selected_time_range, start_date, end_date)
# Create an interactive line chart using Plotly Express
fig_credits = px.line(
daily_credits_df,
x='DATE', # Ensure column name matches the SQL query result
y='CREDITS', # Ensure column name matches the SQL query result
title=f"Daily Credit Consumption ({selected_time_range})",
labels={'CREDITS': 'Credits Consumed', 'DATE': 'Date'},
height=450,
template="plotly_white" # Use a clean template
)
# Customize chart appearance for better readability and aesthetics
fig_credits.update_layout(
hovermode="x unified", # Shows all y-values at a given x-point on hover
plot_bgcolor='rgba(0,0,0,0)', # Transparent plot background
paper_bgcolor='rgba(0,0,0,0)', # Transparent paper background
margin=dict(l=20, r=20, t=50, b=20), # Adjust margins
font=dict(family="Inter, sans-serif", color="#333333"),
title_font_size=20
)
fig_credits.update_xaxes(showgrid=False, title_font_size=14, tickfont_size=12)
fig_credits.update_yaxes(showgrid=True, gridcolor='#e0e0e0', title_font_size=14, tickfont_size=12)
# Display the Plotly chart
st.plotly_chart(fig_credits, use_container_width=True)
st.markdown("---") # Separator
# Top N Users/Roles by Spend Bar Chart and Table
st.subheader(f"Top Users/Roles by {selected_cost_type} Spend")
top_n_df = get_top_n_users_roles(session, selected_cost_type)
# Use columns for bar chart and details table side-by-side
top_spend_chart_col, top_spend_table_col = st.columns([2, 1])
with top_spend_chart_col:
# Create an interactive bar chart for top users/roles
fig_top_users = px.bar(
top_n_df,
x='USER', # Ensure column name matches the SQL query result
y='COST', # Ensure column name matches the SQL query result
color='ROLE', # Ensure column name matches the SQL query result
title=f"Top 10 Users by {selected_cost_type} Spend",
labels={'COST': 'Cost (Credits)', 'USER': 'User'},
height=450,
template="plotly_white"
)
fig_top_users.update_layout(
plot_bgcolor='rgba(0,0,0,0)',
paper_bgcolor='rgba(0,0,0,0)',
margin=dict(l=20, r=20, t=50, b=20),
font=dict(family="Inter", color="#333333"),
title_font_size=20
)
fig_top_users.update_xaxes(tickangle=45, showgrid=False, title_font_size=14, tickfont_size=12)
fig_top_users.update_yaxes(showgrid=True, gridcolor='#e0e0e0', title_font_size=14, tickfont_size=12)
st.plotly_chart(fig_top_users, use_container_width=True)
with top_spend_table_col:
st.markdown("#### Spend Details")
# Display the top users data in a sortable dataframe
st.dataframe(top_n_df, use_container_width=True, hide_index=True)
    st.markdown("💡 *Future enhancement: Clicking on a user will open a deep-dive view into their specific Snowflake activity and costs.*")
# Placeholder for Deep-Dive View (example of how it might be implemented)
# selected_user_for_dive = st.selectbox(
# "Select a User for Deep Dive",
# [""] + top_n_df['USER'].tolist(), # Note: Changed to 'USER' to match SQL result
# index=0, # Default to empty
# help="Choose a user to see their detailed cost breakdown and query history."
# )
# if selected_user_for_dive:
# st.subheader(f"Deep Dive: {selected_user_for_dive}")
# st.info(f"Displaying detailed query history, warehouse usage, and cost breakdown for **{selected_user_for_dive}** here.")
# # Example data for deep dive (would be fetched via SPCS call)
# # st.dataframe(pd.DataFrame({"Query": ["Q1", "Q2"], "Cost": [10, 20], "Warehouse": ["WH1", "WH2"]}))
# --- Footer ---
st.markdown("---")
st.markdown("""
<div style="text-align: center; color: #777777; font-size: 0.8em; margin-top: 30px;">
    Developed as a Snowflake Native App using Streamlit and Snowpark.
<br>
This version attempts to source data directly from Snowflake Account Usage views.
</div>
""", unsafe_allow_html=True)