Armaan Khan

Posted on

best

from flask import Flask, jsonify, request
from flask_cors import CORS
import snowflake.connector
import pandas as pd
import numpy as np
import json
import os
from datetime import datetime
import logging
from typing import Optional

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
CORS(app)

class SnowflakeDataManager:
    """Manages Snowflake connections and data caching"""

    def __init__(self):
        self.cache_dir = "cache"
        self.ensure_cache_dir()

    def ensure_cache_dir(self):
        """Create the cache directory if it doesn't exist"""
        os.makedirs(self.cache_dir, exist_ok=True)

    def execute_query(self, cursor, query: str) -> pd.DataFrame:
        """Execute query and return DataFrame"""
        try:
            cursor.execute(query)
            results = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
            df = pd.DataFrame(results, columns=columns)
            logger.info(f"Query executed successfully. Rows returned: {len(df)}")
            return df
        except Exception as e:
            logger.error(f"Error executing query: {str(e)}")
            raise

    def save_to_cache(self, data: pd.DataFrame, filename: str):
        """Save DataFrame to multiple formats for caching"""
        base_path = os.path.join(self.cache_dir, filename)

        try:
            # Save as CSV
            data.to_csv(f"{base_path}.csv", index=False)

            # Save as JSON
            data.to_json(f"{base_path}.json", orient='records', date_format='iso')

            # Save as Parquet (most compact and fastest to reload; requires a
            # Parquet engine such as pyarrow to be installed)
            data.to_parquet(f"{base_path}.parquet", index=False)

            logger.info(f"Data cached successfully: {filename}")
        except Exception as e:
            logger.error(f"Error saving cache: {str(e)}")
            raise

    def load_from_cache(self, filename: str, format_type: str = 'parquet') -> Optional[pd.DataFrame]:
        """Load DataFrame from cache"""
        cache_path = os.path.join(self.cache_dir, f"{filename}.{format_type}")

        try:
            if os.path.exists(cache_path):
                if format_type == 'parquet':
                    return pd.read_parquet(cache_path)
                elif format_type == 'json':
                    return pd.read_json(cache_path)
                elif format_type == 'csv':
                    return pd.read_csv(cache_path)
            return None
        except Exception as e:
            logger.error(f"Error loading cache: {str(e)}")
            return None

    def is_cache_valid(self, filename: str, hours: int = 24) -> bool:
        """Check if cache is still valid based on timestamp"""
        cache_path = os.path.join(self.cache_dir, f"{filename}.parquet")

        if not os.path.exists(cache_path):
            return False

        file_time = datetime.fromtimestamp(os.path.getmtime(cache_path))
        time_diff = datetime.now() - file_time

        return time_diff.total_seconds() < (hours * 3600)
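
# Standalone usage sketch for the cache manager (illustrative only; the real
# cache keys used by the endpoints below are 'warehouse_analytics',
# 'query_history_summary', and 'query_details_complete'):
#
#   dm = SnowflakeDataManager()
#   if not dm.is_cache_valid('warehouse_analytics', hours=24):
#       df = dm.execute_query(cursor, "SELECT ...")  # assumes an open cursor
#       dm.save_to_cache(df, 'warehouse_analytics')
#   df = dm.load_from_cache('warehouse_analytics')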

# Global data manager instance
data_manager = SnowflakeDataManager()

# Global variables to store data
warehouse_data = None
query_summary_data = None
query_details_data = None

@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'cache_status': {
            'warehouse': warehouse_data is not None,
            'query_summary': query_summary_data is not None,
            'query_details': query_details_data is not None
        }
    })

@app.route('/initialize', methods=['POST'])
def initialize_data():
    """Initialize data from Snowflake using provided cursor"""
    global warehouse_data, query_summary_data, query_details_data

    try:
        # NOTE: a live cursor object cannot be serialized into a JSON request
        # body, so this only works for in-process calls. For HTTP clients, see
        # the credentials-based endpoint below or initialize_data_direct()
        cursor = request.json.get('cursor')

        if not cursor:
            return jsonify({'error': 'Snowflake cursor required'}), 400

        # Check cache first
        if (data_manager.is_cache_valid('warehouse_analytics') and 
            data_manager.is_cache_valid('query_history_summary') and 
            data_manager.is_cache_valid('query_details_complete')):

            warehouse_data = data_manager.load_from_cache('warehouse_analytics')
            query_summary_data = data_manager.load_from_cache('query_history_summary')
            query_details_data = data_manager.load_from_cache('query_details_complete')

            logger.info("Data loaded from cache")
            return jsonify({
                'status': 'success',
                'source': 'cache',
                'warehouse_rows': len(warehouse_data),
                'query_summary_rows': len(query_summary_data),
                'query_details_rows': len(query_details_data)
            })

        # Execute queries to get fresh data
        logger.info("Fetching fresh data from Snowflake...")

        # Query 1: Warehouse Analytics Dashboard
        warehouse_query = "SELECT * FROM WAREHOUSE_ANALYTICS_DASHBOARD_with_queries"
        warehouse_data = data_manager.execute_query(cursor, warehouse_query)
        data_manager.save_to_cache(warehouse_data, 'warehouse_analytics')

        # Query 2: Query History Summary
        summary_query = "SELECT * FROM QUERY_HISTORY_SUMMARY"
        query_summary_data = data_manager.execute_query(cursor, summary_query)
        data_manager.save_to_cache(query_summary_data, 'query_history_summary')

        # Query 3: Query Details Complete
        details_query = "SELECT * FROM QUERY_DETAILS_COMPLETE"
        query_details_data = data_manager.execute_query(cursor, details_query)
        data_manager.save_to_cache(query_details_data, 'query_details_complete')

        logger.info("All data fetched and cached successfully")

        return jsonify({
            'status': 'success',
            'source': 'database',
            'warehouse_rows': len(warehouse_data),
            'query_summary_rows': len(query_summary_data),
            'query_details_rows': len(query_details_data)
        })

    except Exception as e:
        logger.error(f"Error initializing data: {str(e)}")
        return jsonify({'error': str(e)}), 500

@app.route('/initialize-with-cursor', methods=['POST'])
def initialize_with_direct_cursor():
    """Placeholder endpoint: cursor objects can't cross an HTTP boundary"""
    # When you already hold a cursor in Python, skip HTTP entirely and call
    # initialize_data_direct(cursor) below
    return jsonify({'message': 'Use the initialize_data_direct function instead'})
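
# A hedged alternative (an assumption, not part of the original design): since
# a live cursor can't be serialized into JSON, accept connection *parameters*
# instead and build the cursor server-side. The endpoint name and payload shape
# here are illustrative; only expose this on a trusted network, since it
# receives credentials.
@app.route('/initialize-with-credentials', methods=['POST'])
def initialize_with_credentials():
    params = request.json or {}
    required = ('user', 'password', 'account', 'warehouse', 'database', 'schema')
    missing = [k for k in required if k not in params]
    if missing:
        return jsonify({'error': f'Missing connection parameters: {missing}'}), 400

    conn = snowflake.connector.connect(**{k: params[k] for k in required})
    try:
        ok = initialize_data_direct(conn.cursor())  # defined just below
    finally:
        conn.close()

    if ok:
        return jsonify({'status': 'success'})
    return jsonify({'error': 'Initialization failed'}), 500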

def initialize_data_direct(cursor):
    """Direct initialization function - call this from your Python script"""
    global warehouse_data, query_summary_data, query_details_data

    try:
        # Check cache first
        if (data_manager.is_cache_valid('warehouse_analytics') and 
            data_manager.is_cache_valid('query_history_summary') and 
            data_manager.is_cache_valid('query_details_complete')):

            warehouse_data = data_manager.load_from_cache('warehouse_analytics')
            query_summary_data = data_manager.load_from_cache('query_history_summary')
            query_details_data = data_manager.load_from_cache('query_details_complete')

            logger.info("Data loaded from cache")
            return True

        # Execute queries to get fresh data
        logger.info("Fetching fresh data from Snowflake...")

        # Query 1: Warehouse Analytics Dashboard
        warehouse_query = "SELECT * FROM WAREHOUSE_ANALYTICS_DASHBOARD_with_queries"
        warehouse_data = data_manager.execute_query(cursor, warehouse_query)
        data_manager.save_to_cache(warehouse_data, 'warehouse_analytics')

        # Query 2: Query History Summary
        summary_query = "SELECT * FROM QUERY_HISTORY_SUMMARY"
        query_summary_data = data_manager.execute_query(cursor, summary_query)
        data_manager.save_to_cache(query_summary_data, 'query_history_summary')

        # Query 3: Query Details Complete
        details_query = "SELECT * FROM QUERY_DETAILS_COMPLETE"
        query_details_data = data_manager.execute_query(cursor, details_query)
        data_manager.save_to_cache(query_details_data, 'query_details_complete')

        logger.info("All data fetched and cached successfully")
        return True

    except Exception as e:
        logger.error(f"Error initializing data: {str(e)}")
        return False

@app.route('/warehouses', methods=['GET'])
def get_warehouses():
    """Get all warehouse data"""
    global warehouse_data

    if warehouse_data is None:
        return jsonify({'error': 'Data not initialized. Call /initialize first'}), 400

    try:
        # Convert to dict and handle JSON serialization
        result = []
        for _, row in warehouse_data.iterrows():
            row_dict = row.to_dict()

            # Handle the QUERY_IDS column (a JSON object, possibly stored as a string)
            if 'QUERY_IDS' in row_dict:
                raw_ids = row_dict['QUERY_IDS']
                if isinstance(raw_ids, str):
                    try:
                        row_dict['QUERY_IDS'] = json.loads(raw_ids)
                    except json.JSONDecodeError:
                        row_dict['QUERY_IDS'] = {}
                elif not isinstance(raw_ids, dict):
                    # Covers None/NaN and any other unexpected type
                    row_dict['QUERY_IDS'] = {}

            # Handle datetime, NaN, and numpy scalar values for JSON serialization
            for key, value in row_dict.items():
                if isinstance(value, (dict, list)):
                    continue  # already JSON-safe (e.g. the parsed QUERY_IDS)
                if pd.isna(value):
                    row_dict[key] = None
                elif isinstance(value, pd.Timestamp):
                    row_dict[key] = value.isoformat()
                elif isinstance(value, (np.integer, np.floating)):
                    # pd.Int64Dtype/pd.Float64Dtype are dtype classes, not value
                    # types; numpy scalars are what actually appear here
                    row_dict[key] = value.item()

            result.append(row_dict)

        return jsonify({
            'status': 'success',
            'count': len(result),
            'data': result
        })

    except Exception as e:
        logger.error(f"Error getting warehouses: {str(e)}")
        return jsonify({'error': str(e)}), 500

@app.route('/queries/by-warehouse/<warehouse_id>/<metric_type>', methods=['GET'])
def get_queries_by_warehouse_metric(warehouse_id, metric_type):
    """Get queries for specific warehouse and metric type"""
    global warehouse_data, query_summary_data

    if warehouse_data is None or query_summary_data is None:
        return jsonify({'error': 'Data not initialized. Call /initialize first'}), 400

    try:
        # Find the warehouse
        warehouse_row = warehouse_data[warehouse_data['WAREHOUSE_ID'] == warehouse_id]
        if warehouse_row.empty:
            return jsonify({'error': 'Warehouse not found'}), 404

        # Get query IDs for the metric type
        query_ids_json = warehouse_row.iloc[0]['QUERY_IDS']
        if isinstance(query_ids_json, str):
            query_ids_data = json.loads(query_ids_json)
        elif isinstance(query_ids_json, dict):
            query_ids_data = query_ids_json
        else:
            # NaN/None or any other unexpected type (note: NaN is truthy, so a
            # plain `if query_ids_json` check would let it slip through)
            query_ids_data = {}

        # Map metric types to QUERY_IDS keys
        metric_mapping = {
            '1-10-seconds': '1-10_sec_ids',
            '10-20-seconds': '10-20_sec_ids',
            '20-60-seconds': '20-60_sec_ids',
            '1-3-minutes': '1-3_min_ids',
            '3-5-minutes': '3-5_min_ids',
            '5-plus-minutes': '5_plus_min_ids',
            'queued-1-2-minutes': 'queued_1-2_min_ids',
            'queued-2-5-minutes': 'queued_2-5_min_ids',
            'queued-5-10-minutes': 'queued_5-10_min_ids',
            'queued-10-20-minutes': 'queued_10-20_min_ids',
            'queued-20-plus-minutes': 'queued_20_plus_min_ids',
            'spilled-local': 'spilled_local_ids',
            'spilled-remote': 'spilled_remote_ids',
            'failed-queries': 'failed_queries_ids',
            'successful-queries': 'successful_queries_ids',
            'running-queries': 'running_queries_ids'
        }
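
        # e.g. GET /queries/by-warehouse/WH123/spilled-local reads the
        # 'spilled_local_ids' list from that warehouse's QUERY_IDS object
        # (the warehouse ID here is illustrative)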

        query_ids_key = metric_mapping.get(metric_type)
        if not query_ids_key:
            return jsonify({'error': 'Invalid metric type'}), 400

        query_ids = query_ids_data.get(query_ids_key, [])
        # Filter out null values
        query_ids = [qid for qid in query_ids if qid is not None]

        if not query_ids:
            return jsonify({
                'status': 'success',
                'warehouse_id': warehouse_id,
                'metric_type': metric_type,
                'count': 0,
                'data': []
            })

        # Get queries from summary data
        filtered_queries = query_summary_data[query_summary_data['QUERY_ID'].isin(query_ids)]

        # Group by user
        grouped_data = {}
        for _, query in filtered_queries.iterrows():
            user_name = query['USER_NAME']
            if user_name not in grouped_data:
                grouped_data[user_name] = []

            query_dict = query.to_dict()
            # Handle datetime and NaN values
            for key, value in query_dict.items():
                if pd.isna(value):
                    query_dict[key] = None
                elif isinstance(value, pd.Timestamp):
                    query_dict[key] = value.isoformat()
                elif isinstance(value, (np.integer, np.floating)):
                    query_dict[key] = value.item()

            grouped_data[user_name].append(query_dict)

        return jsonify({
            'status': 'success',
            'warehouse_id': warehouse_id,
            'metric_type': metric_type,
            'count': len(filtered_queries),
            'users': len(grouped_data),
            'data': grouped_data
        })

    except Exception as e:
        logger.error(f"Error getting queries by warehouse metric: {str(e)}")
        return jsonify({'error': str(e)}), 500

@app.route('/queries/by-user/<warehouse_id>/<metric_type>/<user_name>', methods=['GET'])
def get_queries_by_user(warehouse_id, metric_type, user_name):
    """Get queries for specific user in warehouse metric"""
    global warehouse_data, query_summary_data

    if warehouse_data is None or query_summary_data is None:
        return jsonify({'error': 'Data not initialized. Call /initialize first'}), 400

    try:
        # Reuse the warehouse/metric view; Flask views can return a
        # (response, status) tuple on error, so pass those straight through
        # instead of reading .status_code, which a tuple doesn't have
        response = get_queries_by_warehouse_metric(warehouse_id, metric_type)
        if isinstance(response, tuple):
            return response

        response_json = response.get_json()
        user_queries = response_json['data'].get(user_name, [])

        return jsonify({
            'status': 'success',
            'warehouse_id': warehouse_id,
            'metric_type': metric_type,
            'user_name': user_name,
            'count': len(user_queries),
            'data': user_queries
        })

    except Exception as e:
        logger.error(f"Error getting queries by user: {str(e)}")
        return jsonify({'error': str(e)}), 500

@app.route('/query/details/<query_id>', methods=['GET'])
def get_query_details(query_id):
    """Get detailed information for a specific query"""
    global query_details_data

    if query_details_data is None:
        return jsonify({'error': 'Data not initialized. Call /initialize first'}), 400

    try:
        # Find the specific query
        query_detail = query_details_data[query_details_data['QUERY_ID'] == query_id]

        if query_detail.empty:
            return jsonify({'error': 'Query not found'}), 404

        # Convert to dict
        result = query_detail.iloc[0].to_dict()

        # Handle datetime and NaN values
        for key, value in result.items():
            if pd.isna(value):
                result[key] = None
            elif isinstance(value, pd.Timestamp):
                result[key] = value.isoformat()
            elif isinstance(value, (np.integer, np.floating)):
                result[key] = value.item()

        return jsonify({
            'status': 'success',
            'query_id': query_id,
            'data': result
        })

    except Exception as e:
        logger.error(f"Error getting query details: {str(e)}")
        return jsonify({'error': str(e)}), 500

@app.route('/queries/summary', methods=['GET'])
def get_query_summary():
    """Get query summary with optional filters"""
    global query_summary_data

    if query_summary_data is None:
        return jsonify({'error': 'Data not initialized. Call /initialize first'}), 400

    try:
        # Get query parameters for filtering
        warehouse_id = request.args.get('warehouse_id')
        user_name = request.args.get('user_name')
        execution_status = request.args.get('execution_status')
        duration_bucket = request.args.get('duration_bucket')
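
        # e.g. GET /queries/summary?warehouse_id=WH123&execution_status=FAILED
        # (parameter values are illustrative)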

        filtered_data = query_summary_data.copy()

        # Apply filters
        if warehouse_id:
            filtered_data = filtered_data[filtered_data['WAREHOUSE_ID'] == warehouse_id]

        if user_name:
            filtered_data = filtered_data[filtered_data['USER_NAME'] == user_name]

        if execution_status:
            filtered_data = filtered_data[filtered_data['EXECUTION_STATUS'] == execution_status]

        if duration_bucket:
            filtered_data = filtered_data[filtered_data['DURATION_BUCKET'] == duration_bucket]

        # Convert to list of dicts
        result = []
        for _, row in filtered_data.iterrows():
            row_dict = row.to_dict()

            # Handle datetime and NaN values
            for key, value in row_dict.items():
                if pd.isna(value):
                    row_dict[key] = None
                elif isinstance(value, pd.Timestamp):
                    row_dict[key] = value.isoformat()

            result.append(row_dict)

        return jsonify({
            'status': 'success',
            'count': len(result),
            'filters': {
                'warehouse_id': warehouse_id,
                'user_name': user_name,
                'execution_status': execution_status,
                'duration_bucket': duration_bucket
            },
            'data': result
        })

    except Exception as e:
        logger.error(f"Error getting query summary: {str(e)}")
        return jsonify({'error': str(e)}), 500

@app.route('/cache/refresh', methods=['POST'])
def refresh_cache():
    """Force refresh cache (requires cursor to be provided)"""
    try:
        # Same caveat as /initialize: a live cursor can't arrive via JSON;
        # prefer refresh_cache_direct() below when calling from Python
        cursor = request.json.get('cursor')
        if not cursor:
            return jsonify({'error': 'Snowflake cursor required'}), 400

        # Force refresh by calling initialize without cache check
        global warehouse_data, query_summary_data, query_details_data

        # Execute queries to get fresh data
        logger.info("Force refreshing data from Snowflake...")

        warehouse_query = "SELECT * FROM WAREHOUSE_ANALYTICS_DASHBOARD_with_queries"
        warehouse_data = data_manager.execute_query(cursor, warehouse_query)
        data_manager.save_to_cache(warehouse_data, 'warehouse_analytics')

        summary_query = "SELECT * FROM QUERY_HISTORY_SUMMARY"
        query_summary_data = data_manager.execute_query(cursor, summary_query)
        data_manager.save_to_cache(query_summary_data, 'query_history_summary')

        details_query = "SELECT * FROM QUERY_DETAILS_COMPLETE"
        query_details_data = data_manager.execute_query(cursor, details_query)
        data_manager.save_to_cache(query_details_data, 'query_details_complete')

        logger.info("Cache refreshed successfully")

        return jsonify({
            'status': 'success',
            'message': 'Cache refreshed successfully',
            'warehouse_rows': len(warehouse_data),
            'query_summary_rows': len(query_summary_data),
            'query_details_rows': len(query_details_data)
        })

    except Exception as e:
        logger.error(f"Error refreshing cache: {str(e)}")
        return jsonify({'error': str(e)}), 500

def refresh_cache_direct(cursor):
    """Direct cache refresh function - call this from your Python script"""
    global warehouse_data, query_summary_data, query_details_data

    try:
        logger.info("Force refreshing data from Snowflake...")

        warehouse_query = "SELECT * FROM WAREHOUSE_ANALYTICS_DASHBOARD_with_queries"
        warehouse_data = data_manager.execute_query(cursor, warehouse_query)
        data_manager.save_to_cache(warehouse_data, 'warehouse_analytics')

        summary_query = "SELECT * FROM QUERY_HISTORY_SUMMARY"
        query_summary_data = data_manager.execute_query(cursor, summary_query)
        data_manager.save_to_cache(query_summary_data, 'query_history_summary')

        details_query = "SELECT * FROM QUERY_DETAILS_COMPLETE"
        query_details_data = data_manager.execute_query(cursor, details_query)
        data_manager.save_to_cache(query_details_data, 'query_details_complete')

        logger.info("Cache refreshed successfully")
        return True

    except Exception as e:
        logger.error(f"Error refreshing cache: {str(e)}")
        return False

if __name__ == '__main__':
    # Load cached data on startup
    try:
        warehouse_data = data_manager.load_from_cache('warehouse_analytics')
        query_summary_data = data_manager.load_from_cache('query_history_summary')
        query_details_data = data_manager.load_from_cache('query_details_complete')

        if warehouse_data is not None:
            logger.info("Cached data loaded on startup")
        else:
            logger.info("No cached data found. Call /initialize to load data.")

    except Exception as e:
        logger.error(f"Error loading cached data on startup: {str(e)}")

    app.run(debug=True, host='0.0.0.0', port=5000)

# Usage example for your Python script:
"""
# In your main Python script where you have the Snowflake cursor:

from flask_app import initialize_data_direct, refresh_cache_direct

# Assuming you have a Snowflake cursor
cursor = your_snowflake_connection.cursor()

# Initialize data
if initialize_data_direct(cursor):
    print("Data initialized successfully")

    # Start the Flask app in a separate thread or process
    # Then your React app can call the API endpoints

    # To refresh data later:
    # refresh_cache_direct(cursor)
else:
    print("Failed to initialize data")
"""
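Dependency note: these scripts import Flask, flask-cors, the Snowflake Python connector, pandas, and requests, and the Parquet cache assumes a Parquet engine such as pyarrow is installed. Something like the following should cover everything:

pip install flask flask-cors snowflake-connector-python pandas pyarrow requests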
# usage_script.py
"""
Usage script showing how to integrate the Flask backend with your Snowflake cursor
"""

import snowflake.connector
import threading
import time
from flask_app import app, initialize_data_direct, refresh_cache_direct

def run_flask_app():
    """Run Flask app in a separate thread"""
    app.run(debug=False, host='0.0.0.0', port=5000, use_reloader=False)

def main():
    """Main function to demonstrate usage"""

    # Your Snowflake connection parameters
    snowflake_config = {
        'user': 'your_username',
        'password': 'your_password',
        'account': 'your_account',
        'warehouse': 'your_warehouse',
        'database': 'your_database',
        'schema': 'your_schema'
    }

    try:
        # Establish Snowflake connection
        print("Connecting to Snowflake...")
        conn = snowflake.connector.connect(**snowflake_config)
        cursor = conn.cursor()
        print("Connected successfully!")

        # Initialize data using direct function
        print("Initializing data from Snowflake tables...")
        if initialize_data_direct(cursor):
            print("βœ… Data initialized successfully!")
            print("Cache files created in './cache/' directory")

            # Start Flask app in background thread
            print("Starting Flask API server...")
            flask_thread = threading.Thread(target=run_flask_app)
            flask_thread.daemon = True
            flask_thread.start()

            # Wait a moment for Flask to start
            time.sleep(2)
            print("πŸš€ Flask API server running on http://localhost:5000")

            print("\n" + "="*50)
            print("Available API Endpoints:")
            print("="*50)
            print("GET  /health                           - Health check")
            print("GET  /warehouses                      - Get all warehouses")
            print("GET  /queries/by-warehouse/<id>/<metric> - Get queries by warehouse metric")
            print("GET  /queries/by-user/<wh>/<metric>/<user> - Get queries by user")
            print("GET  /query/details/<query_id>        - Get detailed query info")
            print("GET  /queries/summary                 - Get query summary with filters")
            print("="*50)

            print("\nExample URLs:")
            print("- http://localhost:5000/health")
            print("- http://localhost:5000/warehouses")
            print("- http://localhost:5000/queries/by-warehouse/WH123/1-10-seconds")
            print("- http://localhost:5000/query/details/QUERY123")
            print("\nπŸ’‘ Your React app can now call these endpoints!")

            # Keep the script running
            print("\nPress Ctrl+C to stop...")
            try:
                while True:
                    time.sleep(1)
            except KeyboardInterrupt:
                print("\nπŸ‘‹ Shutting down...")

        else:
            print("❌ Failed to initialize data")

    except Exception as e:
        print(f"❌ Error: {str(e)}")

    finally:
        # Close connections
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals():
            conn.close()
        print("Database connections closed")

if __name__ == "__main__":
    main()

# Alternative: If you want to refresh data periodically
def setup_periodic_refresh(cursor, interval_hours=6):
    """Setup periodic data refresh"""
    def refresh_job():
        while True:
            time.sleep(interval_hours * 3600)  # Convert hours to seconds
            print(f"Refreshing data... (every {interval_hours} hours)")
            if refresh_cache_direct(cursor):
                print("βœ… Data refreshed successfully")
            else:
                print("❌ Data refresh failed")

    refresh_thread = threading.Thread(target=refresh_job)
    refresh_thread.daemon = True
    refresh_thread.start()
    print(f"πŸ“… Periodic refresh scheduled every {interval_hours} hours")

# Quick test function
def test_api_endpoints():
    """Test function to verify API endpoints"""
    import requests

    base_url = "http://localhost:5000"

    try:
        # Test health endpoint
        response = requests.get(f"{base_url}/health")
        print("Health Check:", response.json())

        # Test warehouses endpoint
        response = requests.get(f"{base_url}/warehouses")
        data = response.json()
        print(f"Warehouses: Found {data['count']} warehouses")

        if data['count'] > 0:
            # Get first warehouse for testing
            first_warehouse = data['data'][0]
            warehouse_id = first_warehouse['WAREHOUSE_ID']

            # Test queries by warehouse metric
            response = requests.get(f"{base_url}/queries/by-warehouse/{warehouse_id}/1-10-seconds")
            queries_data = response.json()
            print(f"Queries for {warehouse_id} (1-10 seconds): {queries_data['count']} queries")

    except requests.exceptions.RequestException as e:
        print(f"API test failed: {e}")
        print("Make sure the Flask server is running")

# Example for testing without starting the full server
def quick_test():
    """Quick test without starting server"""
    # Your cursor initialization here
    cursor = None  # Replace with your actual cursor

    if cursor and initialize_data_direct(cursor):
        print("Data loaded successfully!")
        print("You can now start the Flask server separately")
    else:
        print("Failed to load data")
