Kate Vu

Posted on • Originally published at Medium

Develop AWS Glue job interactive sessions locally using Jupyter Notebook

Building and testing AWS Glue scripts locally is more flexible and can speed up your development. In this blog, I walk through how to develop and test AWS Glue jobs locally using Jupyter Notebook.
For a full pipeline implementation with AWS CDK, refer to Automatic Trigger Data Pipeline with AWS using AWS CDK

Overview

When developing AWS Glue for Spark jobs, there are several approaches available:

  • AWS Glue console
  • Develop and test scripts locally using Jupyter Notebook or a Docker image

Which approach to choose depends on your workflow and team setup. However, developing locally often gives you more flexibility, faster testing, and easier debugging.

1. AWS Glue console:

To get started, simply open the AWS Glue console and navigate to ETL jobs.

AWS Glue Console

From there, you can choose from three options:

  • Visual editor: if you prefer less code or even no code, the visual editor is a great option.
  • Script editor: the script editor is more straightforward and requires less setup than an AWS Glue Studio notebook, in my opinion. However, when deploying with AWS CDK I ran into a significant challenge: we ended up manually calculating the asset_hash to make sure it is different for each deployment. So if you develop the job via the script editor and deploy the script as an S3 asset, make sure the change is isolated to your deployment (see the sketch after this list).
  • AWS Glue Studio notebook: provides an interactive development experience via a built-in notebook interface. You can develop and test the script interactively without running the whole job. When you are satisfied with the script, you can save and download it either as a .ipynb file or as a job script, and with a single click convert it into a Glue job. In the next section, we will explore how to develop and test the script using Jupyter Notebook.
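To illustrate the deployment note above, here is a minimal sketch (not the exact setup from the pipeline repo) of publishing the job script as an S3 asset with AWS CDK in Python and computing the asset_hash explicitly from the script contents, so a changed script always produces a new asset. The stack name and script path are hypothetical.

import hashlib
from pathlib import Path

import aws_cdk as cdk
from aws_cdk import aws_s3_assets as s3_assets
from constructs import Construct


class GlueJobScriptStack(cdk.Stack):
    """Hypothetical stack that publishes the Glue job script as an S3 asset."""

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        script_path = Path("glue/ingestion_job.py")  # hypothetical script location

        # Hash the script contents so the asset hash changes whenever the script does,
        # keeping the change isolated to this deployment.
        script_hash = hashlib.sha256(script_path.read_bytes()).hexdigest()

        script_asset = s3_assets.Asset(
            self,
            "GlueJobScript",
            path=str(script_path),
            asset_hash=script_hash,
        )
        # script_asset.s3_object_url can then be referenced as the Glue job's script location.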

2. Develop and test scripts locally

When developing AWS Glue for Spark jobs, you can develop and test your scripts locally using Jupyter Notebook or Docker before deploying them to AWS.
Below is a step-by-step walkthrough using Jupyter Notebook.
The Jupyter Notebook is an interactive environment for running code in the browser (Introduction to Machine Learning with Python, Andreas Müller & Sarah Guido). For more information, refer to What is the Jupyter Notebook?

Pricing consideration

Before getting started, let's check the pricing for interactive sessions.
“AWS Glue Studio Job Notebooks and Interactive Sessions: Suppose you use a notebook in AWS Glue Studio to interactively develop your ETL code. An Interactive Session has 5 DPU by default. The price of 1 DPU-hour is $0.44. If you keep the session running for 24 minutes, you will be billed for 5 DPUs * 0.4 hours * $0.44, or $0.88.” (https://aws.amazon.com/glue/pricing/)
To avoid unnecessary charges, remember to set timeout and terminate sessions when you are done testing.
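As a quick sanity check, you can reproduce the arithmetic from the quote above (prices as quoted; always verify them against the pricing page):

# Back-of-the-envelope cost estimate for an interactive session, using the quoted prices
dpus = 5                   # default DPUs for an interactive session
hours = 24 / 60            # session kept running for 24 minutes
price_per_dpu_hour = 0.44  # USD, as quoted above
print(f"Estimated cost: ${dpus * hours * price_per_dpu_hour:.2f}")  # -> Estimated cost: $0.88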

Setup

  • Install Jupyter and the AWS Glue interactive sessions Jupyter kernels
pip3 install --upgrade jupyter boto3 aws-glue-sessions
install-glue-kernels
  • Run Jupyter Notebook
jupyter notebook

This will launch the interface in your browser.
Alternatively, many IDEs (such as VSCode or Cursor) support Jupyter extensions, allowing you to run notebooks and view outputs directly within the IDE.
Once Jupyter is open, choose Glue PySpark as the kernel.

# Set region and assume provided role

%idle_timeout 20
%region ap-southeast-2
%iam_role <Replace with GlueIngestionGlueJobRoleARN>
# %additional_python_modules ipython
# %extra_py_files 
# %glue_version 5.0
# %worker_type G.1X
# %number_of_workers 5

# Verify identity
import json, boto3
print(json.dumps(boto3.client("sts").get_caller_identity(), indent=2))
  • Input arguments: For AWS Glue ETL jobs, arguments usually come from getResolvedOptions. You can simulate this locally by defining parameters as below; a sketch of how the deployed job reads the same values follows the cell.
# Parameters
# Adjust these for your local run; in Glue these come from getResolvedOptions
params = {
    "env_name": "kate",
    "input_bucket": "source-data",
    "output_bucket": "staging-data",
    "error_bucket": "error-staging-data",
    "file_path": "kate/default",
    # Single file for local test from your config
    "file_names": "2023_yellow_taxi_trip_data_light.csv",
    "sns_topic_arn": <sns arn>,
    "JOB_NAME": "local_ingestion_job",
}

params
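For comparison, here is a minimal sketch of how the deployed Glue job would read the same values from job arguments via getResolvedOptions (assuming the job is started with matching --env_name, --input_bucket, etc. arguments):

# In the deployed job, the notebook's `params` dict is replaced by job arguments
import sys
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "env_name",
        "input_bucket",
        "output_bucket",
        "error_bucket",
        "file_path",
        "file_names",
        "sns_topic_arn",
    ],
)
params = args  # already a dict with the same shape as the local `params` above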
  • Include extra .py files: Upload them to an S3 bucket and reference them with the %extra_py_files magic command. However, if you are actively working on these files, frequent updates can be tedious: you have to re-upload them every time you change a function. To make my life a bit easier, I include these modules directly in notebook cells and comment out their import lines in the main script. For example, I include the logging, configuration, and utility modules shown below.

Logging module
"""
Centralized logging configuration for AWS Data Pipeline
"""

import logging
import logging.config
from typing import Optional, Dict, Any
import json


def setup_logging(
    level: str = "INFO",
    log_format: Optional[str] = None,
    log_file: Optional[str] = None,
    environment: str = "dev",
) -> logging.Logger:
    """
    Set up centralized logging configuration.

    Args:
        level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        log_format: Custom log format string
        log_file: Path to log file (optional)
        environment: Environment name (dev, staging, prod)

    Returns:
        Configured logger instance
    """

    # Default format with structured information
    if log_format is None:
        log_format = (
            "%(asctime)s | %(levelname)-8s | %(name)-20s | "
            "%(funcName)-15s:%(lineno)-4d | %(message)s"
        )

    # Create handlers
    handlers = {
        "stdout": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "simple",
            "stream": "ext://sys.stdout",
        },
        "stderr": {
            "class": "logging.StreamHandler",
            "level": "ERROR",
            "formatter": "simple",
            "stream": "ext://sys.stderr",
        },
    }

    # Add file handler if specified
    if log_file:
        handlers["file"] = {
            "class": "logging.handlers.RotatingFileHandler",
            "level": level,
            "formatter": "detailed",
            "filename": log_file,
            "maxBytes": 10485760,  # 10MB
            "backupCount": 5,
            "encoding": "utf8",
        }

    # Logging configuration dictionary
    logging_config = {
        "version": 1,
        "disable_existing_loggers": False,  # Important to keep this False
        "formatters": {
            "detailed": {"format": log_format, "datefmt": "%Y-%m-%d %H:%M:%S"},
            "simple": {"format": "%(name)-20s - %(asctime)s - %(levelname)s - %(message)s"},
        },
        "handlers": handlers,
        "loggers": {
            # Root logger - Parent of all loggers
            "": {
                "level": level,
                "handlers": list(handlers.keys()),
            },
            # Base application logger
            "glue": {
                "level": level,
                "propagate": True,  # Allow propagation to root
            },
            # AWS SDK loggers (reduce noise)
            "boto3": {"level": "WARNING"},
            "botocore": {"level": "WARNING"},
            "urllib3": {"level": "WARNING"},
            # PySpark loggers
            "pyspark": {"level": "WARNING"},
            "py4j": {"level": "WARNING"},
        },
    }

    # Apply configuration
    logging.config.dictConfig(logging_config)

    # Get logger for the calling module
    logger = logging.getLogger(__name__)

    # Log configuration info
    logger.info(f"Logging configured - Level: {level}, Environment: {environment}")

    if log_file:
        logger.info(f"Log file: {log_file}")

    return logger


def get_logger(name: str) -> logging.Logger:
    """
    Get a logger instance for a specific module.

    Args:
        name: Logger name (usually __name__)

    Returns:
        Logger instance
    """
    return logging.getLogger(name)


def log_function_call(logger: logging.Logger, func_name: str, **kwargs):
    """
    Log function entry with parameters.

    Args:
        logger: Logger instance
        func_name: Function name
        **kwargs: Function parameters to log
    """
    params = {
        k: v for k, v in kwargs.items() if k not in ["password", "secret", "token"]
    }
    logger.debug(f"Entering {func_name} with params: {params}")


def log_performance(logger: logging.Logger, operation: str, duration: float, **metrics):
    """
    Log performance metrics.

    Args:
        logger: Logger instance
        operation: Operation name
        duration: Duration in seconds
        **metrics: Additional metrics to log
    """
    logger.info(f"Performance - {operation}: {duration:.3f}s")
    if metrics:
        for key, value in metrics.items():
            logger.info(f"  {key}: {value}")


def log_dataframe_info(logger: logging.Logger, df_name: str, df):
    """
    Log DataFrame information for debugging.

    Args:
        logger: Logger instance
        df_name: DataFrame name/description
        df: PySpark DataFrame
    """
    try:
        row_count = df.count()
        col_count = len(df.columns)
        logger.info(
            f"DataFrame '{df_name}' - Rows: {row_count:,}, Columns: {col_count}"
        )
        logger.debug(f"DataFrame '{df_name}' columns: {df.columns}")
    except Exception as e:
        logger.warning(f"Could not get DataFrame info for '{df_name}': {e}")


def log_s3_operation(
    logger: logging.Logger, operation: str, bucket: str, key: str, **kwargs
):
    """
    Log S3 operations with consistent format.

    Args:
        logger: Logger instance
        operation: S3 operation (read, write, delete, etc.)
        bucket: S3 bucket name
        key: S3 object key
        **kwargs: Additional operation details
    """
    details = f" | {kwargs}" if kwargs else ""
    logger.info(f"S3 {operation.upper()} - s3://{bucket}/{key}{details}")


def log_error_with_context(
    logger: logging.Logger, error: Exception, context: Dict[str, Any]
):
    """
    Log errors with additional context information.

    Args:
        logger: Logger instance
        error: Exception that occurred
        context: Additional context information
    """
    logger.error(f"Error: {str(error)}")
    logger.error(f"Context: {json.dumps(context, indent=2, default=str)}")
    logger.exception("Full traceback:")

Initialise the logger in a single cell and use it throughout your notebook. Additionally, give the logger some extra love so we can avoid ValueError: I/O operation on closed file.

# Initialize logging and config
setup_logging(level="INFO", environment=params["env_name"])
logger = get_logger('glue.notebook')
# Give the logger some extra love so that Jupyter Notebook, when working with PySpark, will not hit `ValueError: I/O operation on closed file`
# https://stackoverflow.com/questions/31599940/how-to-print-current-logging-configuration-used-by-the-python-logging-module
root = logging.getLogger()
root.handlers[0].stream.write = print

Config module

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Dict, Any

@dataclass
class JobConfig:
    """Configuration class for AWS Glue ETL jobs.

    This class centralizes all configuration parameters needed for Glue jobs,
    making it easier to manage and modify job settings.

    Attributes:
        env_name (str): Environment name (e.g., 'dev', 'prod')
        input_bucket (str): S3 bucket for input data
        output_bucket (str): S3 bucket for processed data
        error_bucket (str): S3 bucket for error files
        file_path (str): Base path in S3 buckets
        sns_topic_arn (str): ARN of SNS topic for notifications
        job_name (str): Name of the Glue job
        correlation_id (Optional[str]): Unique ID for tracing requests
        current_time (datetime): Timestamp for job execution
        spark_configs (Dict[str, Any]): Optional Spark configurations
    """

    # Required parameters
    env_name: str
    input_bucket: str
    output_bucket: str
    error_bucket: str
    file_path: str
    sns_topic_arn: str
    job_name: str

    # Optional parameters with defaults
    correlation_id: Optional[str] = None
    current_time: datetime = field(default_factory=datetime.utcnow)  # evaluated per instance, not once at class definition
    spark_configs: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        """Validate configuration after initialization."""
        if not all([self.env_name, self.input_bucket, self.output_bucket, 
                   self.error_bucket, self.sns_topic_arn]):
            raise ValueError("Missing required configuration parameters")

        # Set default Spark configurations if none provided
        if self.spark_configs is None:
            self.spark_configs = {
                "spark.sql.adaptive.enabled": "true",
                "spark.sql.adaptive.coalescePartitions.enabled": "true",
                "spark.sql.shuffle.partitions": "200",
                # "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
                "spark.sql.broadcastTimeout": "600"
            }

    def get_s3_paths(self, file_name: str) -> Dict[str, str]:
        """Generate S3 paths for a given file.

        Args:
            file_name (str): Name of the file being processed

        Returns:
            Dict[str, str]: Dictionary containing input, output, and error paths
        """
        return {
            "input": f"s3://{self.input_bucket}/{self.env_name}/staging_{file_name}/",
            "output": f"s3://{self.output_bucket}/{self.env_name}/transform_{file_name}/",
            "error": f"s3://{self.error_bucket}/{self.env_name}/error_{file_name}"
        }

    def generate_correlation_id(self, file_name: str) -> str:
        """Generate a unique correlation ID for request tracing.

        Args:
            file_name (str): Name of the file being processed

        Returns:
            str: Unique correlation ID
        """
        if not self.correlation_id:
            self.correlation_id = f"{self.env_name}-{file_name}-{int(datetime.utcnow().timestamp())}"
        return self.correlation_id
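As a quick usage example, the notebook params defined earlier can be wired into JobConfig like this (a sketch; the field names follow the params dict above):

# Build a JobConfig from the notebook `params` and inspect the generated S3 paths
config = JobConfig(
    env_name=params["env_name"],
    input_bucket=params["input_bucket"],
    output_bucket=params["output_bucket"],
    error_bucket=params["error_bucket"],
    file_path=params["file_path"],
    sns_topic_arn=params["sns_topic_arn"],
    job_name=params["JOB_NAME"],
)

paths = config.get_s3_paths("2023_yellow_taxi_trip_data_light.csv")
print(paths["input"])
print(paths["output"])
print(paths["error"])
print(config.generate_correlation_id("2023_yellow_taxi_trip_data_light.csv"))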

Utils module

import sys
import json
import time
from datetime import datetime
from pyspark.sql.functions import col
from pyspark.sql.types import NumericType
from botocore.exceptions import ClientError
import boto3
from tenacity import retry, stop_after_attempt, wait_exponential
from pyspark.sql import SparkSession

# from config import JobConfig
# from logging_config import (
#     get_logger,
#     log_function_call,
#     log_performance,
#     log_s3_operation,
#     log_error_with_context,
#     log_dataframe_info,
# )

# # Get module-specific logger
# logger = get_logger("glue.utils")


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def move_to_error_bucket(s3_client, source: dict, destination: dict):
    """Move file to error bucket with retry mechanism."""

    s3_client.copy_object(
        Bucket=destination["bucket"],
        CopySource={"Bucket": source["bucket"], "Key": source["key"]},
        Key=destination["key"],
    )


def notify_sns(sns_client, topic_arn, message):
    """Send a notification to an SNS topic."""
    log_function_call(logger, "notify_sns", topic_arn=topic_arn)
    try:
        sns_client.publish(TopicArn=topic_arn, Message=message)
        logger.info(
            "SNS notification sent successfully",
            extra={"topic_arn": topic_arn, "operation": "sns_publish"},
        )
    except ClientError as e:
        log_error_with_context(
            logger,
            e,
            {
                "operation": "sns_publish",
                "topic_arn": topic_arn,
                "message_length": len(message),
            },
        )
        sys.exit(1)


def check_files_exist(s3_client, bucket, env_name, file_path, file_names):
    """Check if files exist in S3 bucket."""

    log_function_call(
        logger,
        "check_files_exist",
        bucket=bucket,
        env_name=env_name,
        file_path=file_path,
        file_count=len(file_names),
    )

    for file_name in file_names:
        try:
            s3_path = f"{file_path}/{file_name}"
            s3_client.head_object(Bucket=bucket, Key=s3_path)
            log_s3_operation(logger, "check", bucket, s3_path, exists=True)
        except s3_client.exceptions.ClientError as e:
            log_error_with_context(
                logger,
                e,
                {
                    "operation": "check_file_exists",
                    "bucket": bucket,
                    "file_path": s3_path,
                    "env_name": env_name,
                },
            )
            sys.exit(1)


def delete_directory_in_s3(s3_client, bucket, directory_path):
    """Delete all files in S3 directory."""
    log_function_call(
        logger, "delete_directory_in_s3", bucket=bucket, directory_path=directory_path
    )

    try:
        objects = s3_client.list_objects_v2(Bucket=bucket, Prefix=directory_path)
        if "Contents" in objects:
            deleted_count = 0
            for obj in objects["Contents"]:
                s3_client.delete_object(Bucket=bucket, Key=obj["Key"])
                log_s3_operation(logger, "delete", bucket, obj["Key"])
                deleted_count += 1

            logger.info(
                "Directory cleanup completed",
                extra={
                    "bucket": bucket,
                    "directory": directory_path,
                    "files_deleted": deleted_count,
                },
            )
        else:
            logger.info(
                "No files to delete",
                extra={"bucket": bucket, "directory": directory_path},
            )
    except Exception as e:
        log_error_with_context(
            logger,
            e,
            {
                "operation": "delete_directory",
                "bucket": bucket,
                "directory_path": directory_path,
            },
        )
        sys.exit(1)


def save_quality_report(s3_client, bucket, env_name, file_name, quality_report):
    """Save data quality report to S3."""
    log_function_call(
        logger,
        "save_quality_report",
        bucket=bucket,
        env_name=env_name,
        file_name=file_name,
    )

    try:
        report_key = f"{env_name}/quality_reports/quality_report_{file_name}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"

        s3_client.put_object(
            Bucket=bucket,
            Key=report_key,
            Body=json.dumps(quality_report, indent=2),
            ContentType="application/json",
        )

        log_s3_operation(
            logger,
            "write",
            bucket,
            report_key,
            size=len(json.dumps(quality_report)),
            content_type="application/json",
        )
    except Exception as e:
        log_error_with_context(
            logger,
            e,
            {
                "operation": "save_quality_report",
                "bucket": bucket,
                "env_name": env_name,
                "file_name": file_name,
            },
        )


def validate_dataframe(df, validation_rules: list[dict]) -> tuple[bool, list[str]]:
    """Validate DataFrame against defined rules."""
    errors = []

    for rule in validation_rules:
        if rule["type"] == "not_empty" and df.isEmpty():
            errors.append("DataFrame is empty")
        elif rule["type"] == "required_columns":
            missing = [col for col in rule["columns"] if col not in df.columns]
            if missing:
                errors.append(f"Missing required columns: {missing}")

    return len(errors) == 0, errors


def get_aws_clients():
    """Create and return AWS clients."""
    return {
        "s3": boto3.client("s3"),
        "sns": boto3.client("sns"),
    }


def optimize_spark_session(spark: SparkSession, config: JobConfig) -> SparkSession:
    """Apply Spark optimizations based on config."""
    for key, value in config.spark_configs.items():
        spark.conf.set(key, value)

    # Set dynamic partition pruning
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

    return spark


def check_dependencies(config: JobConfig) -> bool:
    """Verify all required services and resources are available."""
    try:
        s3 = boto3.client("s3")
        sns = boto3.client("sns")

        # Check S3 buckets
        for bucket in [config.input_bucket, config.output_bucket, config.error_bucket]:
            s3.head_bucket(Bucket=bucket)

        # Check SNS topic
        sns.get_topic_attributes(TopicArn=config.sns_topic_arn)

        return True
    except Exception as e:
        logger.error(f"Dependency check failed: {str(e)}")
        return False


def perform_data_quality_checks(df, file_name):
    """Perform data quality checks on DataFrame."""
    log_function_call(logger, "perform_data_quality_checks", file_name=file_name)
    start_time = time.time()

    # Log initial DataFrame info
    log_dataframe_info(logger, f"input_dataframe_{file_name}", df)

    quality_report = {
        "file_name": file_name,
        "total_rows": df.count(),
        "total_columns": len(df.columns),
        "quality_issues": [],
        "quality_score": 100.0,
        "is_valid": True,
    }

    logger.info(f"Starting data quality checks for {file_name}")

    # Check 1: Empty dataset
    if quality_report["total_rows"] == 0:
        quality_report["quality_issues"].append("Dataset is empty")
        quality_report["quality_score"] -= 50
        quality_report["is_valid"] = False
        return quality_report

    # Check 2: Null value analysis
    null_counts = {}
    for column in df.columns:
        null_count = df.filter(col(column).isNull()).count()
        null_percentage = (null_count / quality_report["total_rows"]) * 100
        null_counts[column] = {
            "count": null_count,
            "percentage": round(null_percentage, 2),
        }

        # Flag columns with high null percentage
        if null_percentage > 50:
            quality_report["quality_issues"].append(
                f"Column '{column}' has {null_percentage:.2f}% null values"
            )
            quality_report["quality_score"] -= 10
        elif null_percentage > 25:
            quality_report["quality_issues"].append(
                f"Column '{column}' has {null_percentage:.2f}% null values (warning)"
            )
            quality_report["quality_score"] -= 5

    quality_report["null_analysis"] = null_counts

    # Check 3: Duplicate rows
    duplicate_count = quality_report["total_rows"] - df.dropDuplicates().count()
    if duplicate_count > 0:
        duplicate_percentage = (duplicate_count / quality_report["total_rows"]) * 100
        quality_report["quality_issues"].append(
            f"Found {duplicate_count} duplicate rows ({duplicate_percentage:.2f}%)"
        )
        quality_report["duplicate_count"] = duplicate_count
        if duplicate_percentage > 10:
            quality_report["quality_score"] -= 15
        else:
            quality_report["quality_score"] -= 5

    # Check 4: Data type consistency
    schema_issues = []
    for field in df.schema.fields:
        column_name = field.name
        expected_type = field.dataType

        # Check for mixed data types in string columns
        if str(expected_type) == "StringType":
            # Check if column contains only numeric values (might need to be numeric)
            try:
                numeric_count = df.filter(
                    col(column_name).rlike("^[0-9]+\\.?[0-9]*$")
                ).count()
                if numeric_count == quality_report["total_rows"]:
                    schema_issues.append(
                        f"Column '{column_name}' contains only numeric values but is string type"
                    )
            except:
                pass

    if schema_issues:
        quality_report["quality_issues"].extend(schema_issues)
        quality_report["quality_score"] -= len(schema_issues) * 3

    # Check 5: Column name validation
    column_issues = []
    for column in df.columns:
        # Check for spaces in column names
        if " " in column:
            column_issues.append(f"Column '{column}' contains spaces")
        # Check for special characters
        if not column.replace("_", "").replace("-", "").isalnum():
            column_issues.append(f"Column '{column}' contains special characters")

    if column_issues:
        quality_report["quality_issues"].extend(column_issues)
        quality_report["quality_score"] -= len(column_issues) * 2

    # Check 6: Numeric column validation
    numeric_columns = [
        f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)
    ]
    for column in numeric_columns:
        try:
            # Check for negative values where they might not be expected
            negative_count = df.filter(col(column) < 0).count()
            if negative_count > 0:
                quality_report["quality_issues"].append(
                    f"Column '{column}' has {negative_count} negative values"
                )

            # Check for extreme outliers (values beyond 3 standard deviations)
            stats = df.select(column).describe().collect()
            if len(stats) >= 3:  # mean, stddev available
                try:
                    mean_val = float(stats[1][1])  # mean
                    stddev_val = float(stats[2][1])  # stddev
                    if stddev_val > 0:
                        outlier_count = df.filter(
                            (col(column) > mean_val + 3 * stddev_val)
                            | (col(column) < mean_val - 3 * stddev_val)
                        ).count()
                        if outlier_count > 0:
                            outlier_percentage = (
                                outlier_count / quality_report["total_rows"]
                            ) * 100
                            if outlier_percentage > 5:
                                quality_report["quality_issues"].append(
                                    f"Column '{column}' has {outlier_count} potential outliers ({outlier_percentage:.2f}%)"
                                )
                except (ValueError, IndexError):
                    pass
        except Exception as e:
            logger.warning(
                f"Could not perform numeric validation on column '{column}': {e}"
            )

    # Final quality assessment
    if quality_report["quality_score"] < 70:
        quality_report["is_valid"] = False
    elif quality_report["quality_score"] < 85:
        quality_report["quality_issues"].append(
            "Data quality is below recommended threshold"
        )

    quality_report["quality_score"] = max(0, quality_report["quality_score"])

    logger.info(f"Data quality check completed for {file_name}")
    logger.info(f"Quality Score: {quality_report['quality_score']:.1f}/100")
    logger.info(f"Issues Found: {len(quality_report['quality_issues'])}")

    # Log performance metrics
    duration = time.time() - start_time
    log_performance(
        logger,
        "data_quality_check",
        duration,
        file_name=file_name,
        rows_processed=quality_report["total_rows"],
        issues_found=len(quality_report["quality_issues"]),
        quality_score=quality_report["quality_score"],
    )

    return quality_report


def track_job_metrics(metrics: dict) -> None:
    """Track job metrics for monitoring.

    Args:
        metrics (dict): Dictionary containing job metrics including:
            - duration: Total job duration
            - processed: Number of files processed
            - success_rate: Processing success rate
            - total_rows: Total number of rows processed
            - spark_metrics (optional): Spark-specific metrics
    """
    metric_data = {
        "processing_time": metrics.get("duration"),
        "files_processed": metrics.get("processed"),
        "success_rate": metrics.get("success_rate"),
        "total_rows": metrics.get("total_rows"),
    }

    logger.info("Job metrics", extra={"metrics": metric_data}) 
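Putting it together, here is a condensed sketch of a processing cell that wires these helpers together (the actual processing code in the full pipeline differs; `params`, `config`, and `logger` come from the earlier cells):

import time

from awsglue.context import GlueContext
from pyspark.context import SparkContext

# Spark/Glue session setup inside the interactive session
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

clients = get_aws_clients()
file_names = [params["file_names"]]  # single file for the local test

check_files_exist(clients["s3"], config.input_bucket, config.env_name,
                  config.file_path, file_names)

for index, file_name in enumerate(file_names, start=1):
    logger.info(f"Processing file {index}/{len(file_names)}: {file_name}")
    start = time.time()
    s3_paths = config.get_s3_paths(file_name)

    # Read the raw CSV from the input bucket
    df = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(f"s3://{config.input_bucket}/{config.file_path}/{file_name}")
    )

    quality_report = perform_data_quality_checks(df, file_name)
    save_quality_report(clients["s3"], config.output_bucket,
                        config.env_name, file_name, quality_report)

    if quality_report["is_valid"]:
        df.write.mode("overwrite").parquet(s3_paths["output"])
    else:
        notify_sns(clients["sns"], config.sns_topic_arn,
                   f"Data quality checks failed for {file_name}")

    log_performance(
        logger,
        f"process_file_{file_name}",
        time.time() - start,
        correlation_id=config.generate_correlation_id(file_name),
        rows_processed=quality_report["total_rows"],
    )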
  • Example execution log output: When running the job, your output might look like this:
glue.notebook        - 2025-11-07 11:37:25,573 - INFO - S3 CHECK - s3://***/2023_yellow_taxi_trip_data_light.csv | {'exists': True}

glue.notebook        - 2025-11-07 11:37:25,623 - INFO - No files to delete

glue.notebook        - 2025-11-07 11:37:25,668 - INFO - No files to delete

glue.notebook        - 2025-11-07 11:37:25,668 - INFO - Processing file 1/1: 2023_yellow_taxi_trip_data_light.csv

glue.notebook        - 2025-11-07 11:37:41,147 - INFO - DataFrame columns: ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee']

glue.notebook        - 2025-11-07 11:37:49,241 - INFO - Performance - process_file_2023_yellow_taxi_trip_data_light.csv: 23.257s

glue.notebook        - 2025-11-07 11:37:49,242 - INFO -   correlation_id: kate-2023_yellow_taxi_trip_data_light.csv-1762515445

glue.notebook        - 2025-11-07 11:37:49,242 - INFO -   rows_processed: 4

glue.notebook        - 2025-11-07 11:37:49,242 - INFO -   read_duration: 15.443103075027466

glue.notebook        - 2025-11-07 11:37:49,242 - INFO -   write_duration: 6.300567388534546

glue.notebook        - 2025-11-07 11:37:49,242 - INFO - Job completed successfully - Processed 1/1 files in 25.20s

glue.notebook        - 2025-11-07 11:37:49,242 - INFO - Job metrics

glue.notebook        - 2025-11-07 11:37:49,330 - INFO - Spark session stopped
%stop_session
Stopping session: ccc5b160-7f7e-4c93-ab19-1eb0966a796e
Stopped session.
  • At the end of your notebook, include the following command to stop the session and avoid unnecessary costs.
%stop_session

In the very first cell, configure an idle timeout so the session terminates after a certain period of inactivity, for example %idle_timeout 20. This setting makes sure your session is terminated after 20 minutes of inactivity, preventing unexpected billing.

Thoughts

  • Convenient for testing and debugging: you can rerun only modified cells instead of the entire job.
  • Supports developing and testing locally.
  • Managing extra .py files can be troublesome, especially if you update them frequently.
  • Additional setup might be required, for example simulating ETL job arguments.
