from flask import Flask, jsonify, request
from flask_cors import CORS
import pandas as pd
import numpy as np
import json
import os
from datetime import datetime
import logging
from typing import Optional
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
CORS(app)
class SnowflakeDataManager:
"""Manages Snowflake connections and data caching"""
def __init__(self):
self.cache_dir = "cache"
self.ensure_cache_dir()
def ensure_cache_dir(self):
"""Create cache directory if it doesn't exist"""
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir)
def execute_query(self, cursor, query: str) -> pd.DataFrame:
"""Execute query and return DataFrame"""
try:
cursor.execute(query)
results = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
df = pd.DataFrame(results, columns=columns)
logger.info(f"Query executed successfully. Rows returned: {len(df)}")
return df
except Exception as e:
logger.error(f"Error executing query: {str(e)}")
raise
def save_to_cache(self, data: pd.DataFrame, filename: str):
"""Save DataFrame to multiple formats for caching"""
base_path = os.path.join(self.cache_dir, filename)
try:
# Save as CSV
data.to_csv(f"{base_path}.csv", index=False)
# Save as JSON
data.to_json(f"{base_path}.json", orient='records', date_format='iso')
            # Save as Parquet (most efficient; requires pyarrow or fastparquet)
data.to_parquet(f"{base_path}.parquet", index=False)
logger.info(f"Data cached successfully: {filename}")
except Exception as e:
logger.error(f"Error saving cache: {str(e)}")
raise
def load_from_cache(self, filename: str, format_type: str = 'parquet') -> Optional[pd.DataFrame]:
"""Load DataFrame from cache"""
cache_path = os.path.join(self.cache_dir, f"{filename}.{format_type}")
try:
if os.path.exists(cache_path):
if format_type == 'parquet':
return pd.read_parquet(cache_path)
elif format_type == 'json':
return pd.read_json(cache_path)
elif format_type == 'csv':
return pd.read_csv(cache_path)
return None
except Exception as e:
logger.error(f"Error loading cache: {str(e)}")
return None
def is_cache_valid(self, filename: str, hours: int = 24) -> bool:
"""Check if cache is still valid based on timestamp"""
cache_path = os.path.join(self.cache_dir, f"{filename}.parquet")
if not os.path.exists(cache_path):
return False
file_time = datetime.fromtimestamp(os.path.getmtime(cache_path))
time_diff = datetime.now() - file_time
return time_diff.total_seconds() < (hours * 3600)
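
# Illustrative sketch (never called by the app itself): how SnowflakeDataManager's
# cache layers fit together. Assumes you already hold a live Snowflake cursor;
# the view name is the same one the endpoints below query.
def _example_cache_roundtrip(cursor):
    manager = SnowflakeDataManager()
    if not manager.is_cache_valid('warehouse_analytics', hours=24):
        # Cache is missing or older than 24h: refetch and persist in all formats
        df = manager.execute_query(
            cursor, "SELECT * FROM WAREHOUSE_ANALYTICS_DASHBOARD_with_queries")
        manager.save_to_cache(df, 'warehouse_analytics')
    # Parquet is the default (and most efficient) cache format
    return manager.load_from_cache('warehouse_analytics')
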
# Global data manager instance
data_manager = SnowflakeDataManager()
# Global variables to store data
warehouse_data = None
query_summary_data = None
query_details_data = None
@app.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint"""
return jsonify({
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'cache_status': {
'warehouse': warehouse_data is not None,
'query_summary': query_summary_data is not None,
'query_details': query_details_data is not None
}
})
@app.route('/initialize', methods=['POST'])
def initialize_data():
"""Initialize data from Snowflake using provided cursor"""
global warehouse_data, query_summary_data, query_details_data
try:
        # NOTE: a live Snowflake cursor cannot be serialized into JSON, so a
        # real cursor will never arrive here over HTTP. Prefer calling
        # initialize_data_direct(cursor) from the process that owns the cursor.
        cursor = request.json.get('cursor')
        if not cursor:
            return jsonify({'error': 'Snowflake cursor required'}), 400
# Check cache first
if (data_manager.is_cache_valid('warehouse_analytics') and
data_manager.is_cache_valid('query_history_summary') and
data_manager.is_cache_valid('query_details_complete')):
warehouse_data = data_manager.load_from_cache('warehouse_analytics')
query_summary_data = data_manager.load_from_cache('query_history_summary')
query_details_data = data_manager.load_from_cache('query_details_complete')
logger.info("Data loaded from cache")
return jsonify({
'status': 'success',
'source': 'cache',
'warehouse_rows': len(warehouse_data),
'query_summary_rows': len(query_summary_data),
'query_details_rows': len(query_details_data)
})
# Execute queries to get fresh data
logger.info("Fetching fresh data from Snowflake...")
# Query 1: Warehouse Analytics Dashboard
warehouse_query = "SELECT * FROM WAREHOUSE_ANALYTICS_DASHBOARD_with_queries"
warehouse_data = data_manager.execute_query(cursor, warehouse_query)
data_manager.save_to_cache(warehouse_data, 'warehouse_analytics')
# Query 2: Query History Summary
summary_query = "SELECT * FROM QUERY_HISTORY_SUMMARY"
query_summary_data = data_manager.execute_query(cursor, summary_query)
data_manager.save_to_cache(query_summary_data, 'query_history_summary')
# Query 3: Query Details Complete
details_query = "SELECT * FROM QUERY_DETAILS_COMPLETE"
query_details_data = data_manager.execute_query(cursor, details_query)
data_manager.save_to_cache(query_details_data, 'query_details_complete')
logger.info("All data fetched and cached successfully")
return jsonify({
'status': 'success',
'source': 'database',
'warehouse_rows': len(warehouse_data),
'query_summary_rows': len(query_summary_data),
'query_details_rows': len(query_details_data)
})
except Exception as e:
logger.error(f"Error initializing data: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/initialize-with-cursor', methods=['POST'])
def initialize_with_direct_cursor():
    """Placeholder: a live cursor object cannot be sent over HTTP"""
    # Call initialize_data_direct(cursor) from the Python process that owns
    # the Snowflake cursor (see the usage script below).
    return jsonify({'message': 'Use initialize_data_direct function instead'})
def initialize_data_direct(cursor):
"""Direct initialization function - call this from your Python script"""
global warehouse_data, query_summary_data, query_details_data
try:
# Check cache first
if (data_manager.is_cache_valid('warehouse_analytics') and
data_manager.is_cache_valid('query_history_summary') and
data_manager.is_cache_valid('query_details_complete')):
warehouse_data = data_manager.load_from_cache('warehouse_analytics')
query_summary_data = data_manager.load_from_cache('query_history_summary')
query_details_data = data_manager.load_from_cache('query_details_complete')
logger.info("Data loaded from cache")
return True
# Execute queries to get fresh data
logger.info("Fetching fresh data from Snowflake...")
# Query 1: Warehouse Analytics Dashboard
warehouse_query = "SELECT * FROM WAREHOUSE_ANALYTICS_DASHBOARD_with_queries"
warehouse_data = data_manager.execute_query(cursor, warehouse_query)
data_manager.save_to_cache(warehouse_data, 'warehouse_analytics')
# Query 2: Query History Summary
summary_query = "SELECT * FROM QUERY_HISTORY_SUMMARY"
query_summary_data = data_manager.execute_query(cursor, summary_query)
data_manager.save_to_cache(query_summary_data, 'query_history_summary')
# Query 3: Query Details Complete
details_query = "SELECT * FROM QUERY_DETAILS_COMPLETE"
query_details_data = data_manager.execute_query(cursor, details_query)
data_manager.save_to_cache(query_details_data, 'query_details_complete')
logger.info("All data fetched and cached successfully")
return True
except Exception as e:
logger.error(f"Error initializing data: {str(e)}")
return False
@app.route('/warehouses', methods=['GET'])
def get_warehouses():
"""Get all warehouse data"""
global warehouse_data
if warehouse_data is None:
return jsonify({'error': 'Data not initialized. Call /initialize first'}), 400
try:
# Convert to dict and handle JSON serialization
result = []
for _, row in warehouse_data.iterrows():
row_dict = row.to_dict()
            # Handle QUERY_IDS column (JSON string or dict)
            if 'QUERY_IDS' in row_dict:
                query_ids_value = row_dict['QUERY_IDS']
                if isinstance(query_ids_value, str):
                    try:
                        row_dict['QUERY_IDS'] = json.loads(query_ids_value)
                    except (json.JSONDecodeError, TypeError):
                        row_dict['QUERY_IDS'] = {}
                elif not isinstance(query_ids_value, dict):
                    # NaN/None or any other non-dict value
                    row_dict['QUERY_IDS'] = {}
            # Handle datetime, NaN, and numpy scalar values
            for key, value in row_dict.items():
                if key == 'QUERY_IDS':
                    continue  # already normalized to a dict above
                if pd.isna(value):
                    row_dict[key] = None
                elif isinstance(value, pd.Timestamp):
                    row_dict[key] = value.isoformat()
                elif isinstance(value, (np.integer, np.floating)):
                    # numpy scalars are not JSON-serializable; convert to native Python
                    row_dict[key] = value.item()
result.append(row_dict)
return jsonify({
'status': 'success',
'count': len(result),
'data': result
})
except Exception as e:
logger.error(f"Error getting warehouses: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/queries/by-warehouse/<warehouse_id>/<metric_type>', methods=['GET'])
def get_queries_by_warehouse_metric(warehouse_id, metric_type):
"""Get queries for specific warehouse and metric type"""
global warehouse_data, query_summary_data
if warehouse_data is None or query_summary_data is None:
return jsonify({'error': 'Data not initialized. Call /initialize first'}), 400
try:
# Find the warehouse
warehouse_row = warehouse_data[warehouse_data['WAREHOUSE_ID'] == warehouse_id]
if warehouse_row.empty:
return jsonify({'error': 'Warehouse not found'}), 404
# Get query IDs for the metric type
        query_ids_json = warehouse_row.iloc[0]['QUERY_IDS']
        if isinstance(query_ids_json, str):
            try:
                query_ids_data = json.loads(query_ids_json)
            except (json.JSONDecodeError, TypeError):
                query_ids_data = {}
        elif isinstance(query_ids_json, dict):
            query_ids_data = query_ids_json
        else:
            # NaN/None from the DataFrame (NaN is truthy, so check the type)
            query_ids_data = {}
# Map metric types to QUERY_IDS keys
metric_mapping = {
'1-10-seconds': '1-10_sec_ids',
'10-20-seconds': '10-20_sec_ids',
'20-60-seconds': '20-60_sec_ids',
'1-3-minutes': '1-3_min_ids',
'3-5-minutes': '3-5_min_ids',
'5-plus-minutes': '5_plus_min_ids',
'queued-1-2-minutes': 'queued_1-2_min_ids',
'queued-2-5-minutes': 'queued_2-5_min_ids',
'queued-5-10-minutes': 'queued_5-10_min_ids',
'queued-10-20-minutes': 'queued_10-20_min_ids',
'queued-20-plus-minutes': 'queued_20_plus_min_ids',
'spilled-local': 'spilled_local_ids',
'spilled-remote': 'spilled_remote_ids',
'failed-queries': 'failed_queries_ids',
'successful-queries': 'successful_queries_ids',
'running-queries': 'running_queries_ids'
}
query_ids_key = metric_mapping.get(metric_type)
if not query_ids_key:
return jsonify({'error': 'Invalid metric type'}), 400
query_ids = query_ids_data.get(query_ids_key, [])
# Filter out null values
query_ids = [qid for qid in query_ids if qid is not None]
if not query_ids:
return jsonify({
'status': 'success',
'warehouse_id': warehouse_id,
'metric_type': metric_type,
'count': 0,
'data': []
})
# Get queries from summary data
filtered_queries = query_summary_data[query_summary_data['QUERY_ID'].isin(query_ids)]
# Group by user
grouped_data = {}
for _, query in filtered_queries.iterrows():
user_name = query['USER_NAME']
if user_name not in grouped_data:
grouped_data[user_name] = []
query_dict = query.to_dict()
            # Handle datetime, NaN, and numpy scalar values
            for key, value in query_dict.items():
                if pd.isna(value):
                    query_dict[key] = None
                elif isinstance(value, pd.Timestamp):
                    query_dict[key] = value.isoformat()
                elif isinstance(value, (np.integer, np.floating)):
                    query_dict[key] = value.item()
grouped_data[user_name].append(query_dict)
return jsonify({
'status': 'success',
'warehouse_id': warehouse_id,
'metric_type': metric_type,
'count': len(filtered_queries),
'users': len(grouped_data),
'data': grouped_data
})
except Exception as e:
logger.error(f"Error getting queries by warehouse metric: {str(e)}")
return jsonify({'error': str(e)}), 500
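
# Illustrative request/response for the endpoint above (field values are
# hypothetical; the shape mirrors the jsonify call):
#
#   GET /queries/by-warehouse/WH123/1-10-seconds
#   {
#       "status": "success",
#       "warehouse_id": "WH123",
#       "metric_type": "1-10-seconds",
#       "count": 2,
#       "users": 1,
#       "data": {"ALICE": [{"QUERY_ID": "...", "USER_NAME": "ALICE", ...}]}
#   }
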
@app.route('/queries/by-user/<warehouse_id>/<metric_type>/<user_name>', methods=['GET'])
def get_queries_by_user(warehouse_id, metric_type, user_name):
"""Get queries for specific user in warehouse metric"""
global warehouse_data, query_summary_data
if warehouse_data is None or query_summary_data is None:
return jsonify({'error': 'Data not initialized. Call /initialize first'}), 400
try:
        # Reuse the warehouse/metric handler; its error paths return
        # (response, status) tuples rather than bare Response objects
        response_data = get_queries_by_warehouse_metric(warehouse_id, metric_type)
        if isinstance(response_data, tuple):
            return response_data
        if response_data.status_code != 200:
            return response_data
        response_json = response_data.get_json()
        user_queries = response_json['data'].get(user_name, [])
return jsonify({
'status': 'success',
'warehouse_id': warehouse_id,
'metric_type': metric_type,
'user_name': user_name,
'count': len(user_queries),
'data': user_queries
})
except Exception as e:
logger.error(f"Error getting queries by user: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/query/details/<query_id>', methods=['GET'])
def get_query_details(query_id):
"""Get detailed information for a specific query"""
global query_details_data
if query_details_data is None:
return jsonify({'error': 'Data not initialized. Call /initialize first'}), 400
try:
# Find the specific query
query_detail = query_details_data[query_details_data['QUERY_ID'] == query_id]
if query_detail.empty:
return jsonify({'error': 'Query not found'}), 404
# Convert to dict
result = query_detail.iloc[0].to_dict()
        # Handle datetime, NaN, and numpy scalar values
        for key, value in result.items():
            if pd.isna(value):
                result[key] = None
            elif isinstance(value, pd.Timestamp):
                result[key] = value.isoformat()
            elif isinstance(value, (np.integer, np.floating)):
                result[key] = value.item()
return jsonify({
'status': 'success',
'query_id': query_id,
'data': result
})
except Exception as e:
logger.error(f"Error getting query details: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/queries/summary', methods=['GET'])
def get_query_summary():
"""Get query summary with optional filters"""
global query_summary_data
if query_summary_data is None:
return jsonify({'error': 'Data not initialized. Call /initialize first'}), 400
try:
# Get query parameters for filtering
warehouse_id = request.args.get('warehouse_id')
user_name = request.args.get('user_name')
execution_status = request.args.get('execution_status')
duration_bucket = request.args.get('duration_bucket')
filtered_data = query_summary_data.copy()
# Apply filters
if warehouse_id:
filtered_data = filtered_data[filtered_data['WAREHOUSE_ID'] == warehouse_id]
if user_name:
filtered_data = filtered_data[filtered_data['USER_NAME'] == user_name]
if execution_status:
filtered_data = filtered_data[filtered_data['EXECUTION_STATUS'] == execution_status]
if duration_bucket:
filtered_data = filtered_data[filtered_data['DURATION_BUCKET'] == duration_bucket]
# Convert to list of dicts
result = []
for _, row in filtered_data.iterrows():
row_dict = row.to_dict()
            # Handle datetime, NaN, and numpy scalar values
            for key, value in row_dict.items():
                if pd.isna(value):
                    row_dict[key] = None
                elif isinstance(value, pd.Timestamp):
                    row_dict[key] = value.isoformat()
                elif isinstance(value, (np.integer, np.floating)):
                    row_dict[key] = value.item()
result.append(row_dict)
return jsonify({
'status': 'success',
'count': len(result),
'filters': {
'warehouse_id': warehouse_id,
'user_name': user_name,
'execution_status': execution_status,
'duration_bucket': duration_bucket
},
'data': result
})
except Exception as e:
logger.error(f"Error getting query summary: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/cache/refresh', methods=['POST'])
def refresh_cache():
"""Force refresh cache (requires cursor to be provided)"""
try:
        # As with /initialize, a real cursor cannot arrive via JSON; prefer
        # refresh_cache_direct(cursor) from the process that owns the cursor.
        cursor = request.json.get('cursor')
        if not cursor:
            return jsonify({'error': 'Snowflake cursor required'}), 400
# Force refresh by calling initialize without cache check
global warehouse_data, query_summary_data, query_details_data
# Execute queries to get fresh data
logger.info("Force refreshing data from Snowflake...")
warehouse_query = "SELECT * FROM WAREHOUSE_ANALYTICS_DASHBOARD_with_queries"
warehouse_data = data_manager.execute_query(cursor, warehouse_query)
data_manager.save_to_cache(warehouse_data, 'warehouse_analytics')
summary_query = "SELECT * FROM QUERY_HISTORY_SUMMARY"
query_summary_data = data_manager.execute_query(cursor, summary_query)
data_manager.save_to_cache(query_summary_data, 'query_history_summary')
details_query = "SELECT * FROM QUERY_DETAILS_COMPLETE"
query_details_data = data_manager.execute_query(cursor, details_query)
data_manager.save_to_cache(query_details_data, 'query_details_complete')
logger.info("Cache refreshed successfully")
return jsonify({
'status': 'success',
'message': 'Cache refreshed successfully',
'warehouse_rows': len(warehouse_data),
'query_summary_rows': len(query_summary_data),
'query_details_rows': len(query_details_data)
})
except Exception as e:
logger.error(f"Error refreshing cache: {str(e)}")
return jsonify({'error': str(e)}), 500
def refresh_cache_direct(cursor):
"""Direct cache refresh function - call this from your Python script"""
global warehouse_data, query_summary_data, query_details_data
try:
logger.info("Force refreshing data from Snowflake...")
warehouse_query = "SELECT * FROM WAREHOUSE_ANALYTICS_DASHBOARD_with_queries"
warehouse_data = data_manager.execute_query(cursor, warehouse_query)
data_manager.save_to_cache(warehouse_data, 'warehouse_analytics')
summary_query = "SELECT * FROM QUERY_HISTORY_SUMMARY"
query_summary_data = data_manager.execute_query(cursor, summary_query)
data_manager.save_to_cache(query_summary_data, 'query_history_summary')
details_query = "SELECT * FROM QUERY_DETAILS_COMPLETE"
query_details_data = data_manager.execute_query(cursor, details_query)
data_manager.save_to_cache(query_details_data, 'query_details_complete')
logger.info("Cache refreshed successfully")
return True
except Exception as e:
logger.error(f"Error refreshing cache: {str(e)}")
return False
if __name__ == '__main__':
# Load cached data on startup
try:
warehouse_data = data_manager.load_from_cache('warehouse_analytics')
query_summary_data = data_manager.load_from_cache('query_history_summary')
query_details_data = data_manager.load_from_cache('query_details_complete')
if warehouse_data is not None:
logger.info("Cached data loaded on startup")
else:
logger.info("No cached data found. Call /initialize to load data.")
except Exception as e:
logger.error(f"Error loading cached data on startup: {str(e)}")
app.run(debug=True, host='0.0.0.0', port=5000)
# Usage example for your Python script:
"""
# In your main Python script where you have the Snowflake cursor:
from flask_app import initialize_data_direct, refresh_cache_direct
# Assuming you have a Snowflake cursor
cursor = your_snowflake_connection.cursor()
# Initialize data
if initialize_data_direct(cursor):
print("Data initialized successfully")
# Start the Flask app in a separate thread or process
# Then your React app can call the API endpoints
# To refresh data later:
# refresh_cache_direct(cursor)
else:
print("Failed to initialize data")
"""
# usage_script.py
"""
Usage script showing how to integrate the Flask backend with your Snowflake cursor
"""
import snowflake.connector
import threading
import time
from flask_app import app, initialize_data_direct, refresh_cache_direct
def run_flask_app():
"""Run Flask app in a separate thread"""
app.run(debug=False, host='0.0.0.0', port=5000, use_reloader=False)
def main():
"""Main function to demonstrate usage"""
# Your Snowflake connection parameters
snowflake_config = {
'user': 'your_username',
'password': 'your_password',
'account': 'your_account',
'warehouse': 'your_warehouse',
'database': 'your_database',
'schema': 'your_schema'
}
try:
# Establish Snowflake connection
print("Connecting to Snowflake...")
conn = snowflake.connector.connect(**snowflake_config)
cursor = conn.cursor()
print("Connected successfully!")
# Initialize data using direct function
print("Initializing data from Snowflake tables...")
if initialize_data_direct(cursor):
print("β Data initialized successfully!")
print("Cache files created in './cache/' directory")
# Start Flask app in background thread
print("Starting Flask API server...")
flask_thread = threading.Thread(target=run_flask_app)
flask_thread.daemon = True
flask_thread.start()
# Wait a moment for Flask to start
time.sleep(2)
print("π Flask API server running on http://localhost:5000")
print("\n" + "="*50)
print("Available API Endpoints:")
print("="*50)
print("GET /health - Health check")
print("GET /warehouses - Get all warehouses")
print("GET /queries/by-warehouse/<id>/<metric> - Get queries by warehouse metric")
print("GET /queries/by-user/<wh>/<metric>/<user> - Get queries by user")
print("GET /query/details/<query_id> - Get detailed query info")
print("GET /queries/summary - Get query summary with filters")
print("="*50)
print("\nExample URLs:")
print("- http://localhost:5000/health")
print("- http://localhost:5000/warehouses")
print("- http://localhost:5000/queries/by-warehouse/WH123/1-10-seconds")
print("- http://localhost:5000/query/details/QUERY123")
print("\nπ‘ Your React app can now call these endpoints!")
# Keep the script running
print("\nPress Ctrl+C to stop...")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nπ Shutting down...")
else:
print("β Failed to initialize data")
except Exception as e:
print(f"β Error: {str(e)}")
finally:
# Close connections
if 'cursor' in locals():
cursor.close()
if 'conn' in locals():
conn.close()
print("Database connections closed")
if __name__ == "__main__":
main()
# Alternative: If you want to refresh data periodically
def setup_periodic_refresh(cursor, interval_hours=6):
"""Setup periodic data refresh"""
def refresh_job():
while True:
time.sleep(interval_hours * 3600) # Convert hours to seconds
print(f"Refreshing data... (every {interval_hours} hours)")
if refresh_cache_direct(cursor):
print("β Data refreshed successfully")
else:
print("β Data refresh failed")
refresh_thread = threading.Thread(target=refresh_job)
refresh_thread.daemon = True
refresh_thread.start()
print(f"π Periodic refresh scheduled every {interval_hours} hours")
# Quick test function
def test_api_endpoints():
"""Test function to verify API endpoints"""
    import requests
base_url = "http://localhost:5000"
try:
# Test health endpoint
response = requests.get(f"{base_url}/health")
print("Health Check:", response.json())
# Test warehouses endpoint
response = requests.get(f"{base_url}/warehouses")
data = response.json()
print(f"Warehouses: Found {data['count']} warehouses")
if data['count'] > 0:
# Get first warehouse for testing
first_warehouse = data['data'][0]
warehouse_id = first_warehouse['WAREHOUSE_ID']
# Test queries by warehouse metric
response = requests.get(f"{base_url}/queries/by-warehouse/{warehouse_id}/1-10-seconds")
queries_data = response.json()
print(f"Queries for {warehouse_id} (1-10 seconds): {queries_data['count']} queries")
except requests.exceptions.RequestException as e:
print(f"API test failed: {e}")
print("Make sure the Flask server is running")
# Example for testing without starting the full server
def quick_test():
"""Quick test without starting server"""
# Your cursor initialization here
cursor = None # Replace with your actual cursor
if cursor and initialize_data_direct(cursor):
print("Data loaded successfully!")
print("You can now start the Flask server separately")
else:
print("Failed to load data")