Building and testing AWS Glue scripts locally is more flexible and can speed up your development. In this blog, I walk through how to develop and test AWS Glue jobs locally using Jupyter Notebook.
For a full pipeline implementation with AWS CDK, refer to Automatic Trigger Data Pipeline with AWS using AWS CDK.
Overview
When developing AWS Glue for Spark jobs, there are several approaches available:
- AWS Glue console
- Develop and test scripts locally using a Jupyter notebook or a Docker image

Choosing an approach depends on your workflow and team setup. However, developing locally often gives you more flexibility, faster testing, and easier debugging.
1. AWS Glue console:
To get started, simply open the AWS Glue console and navigate to ETL jobs.
Here, you can choose from three options:
- Visual editor: if you prefer less code, or even no code, the visual editor is a great option.
- Script editor: the script editor is more straightforward and needs less setup than an AWS Glue Studio notebook, in my opinion. However, when deploying with AWS CDK, I encountered a significant challenge: we ended up manually calculating the asset_hash to make sure it was different for each deployment. So if you decide to develop the job via the script editor and deploy it as an S3 asset, make sure to isolate that change to your deployment only (see the sketch after this list).
- AWS Glue Studio notebook: provides an interactive development experience via a built-in notebook interface. You can develop and test the script interactively without running the whole job. When you are satisfied with the script, you can save and download it as a .ipynb file or a job script, and with a single click convert it into a Glue job. In the next section, we will explore how to develop and test the script using Jupyter Notebook.
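As a rough illustration of the CDK point above, the sketch below shows one way to pass an explicit asset_hash to an aws_s3_assets.Asset in a Python CDK stack, so you control when the Glue script asset is considered changed. This is not my exact deployment code; the path and construct ID are placeholders:

import hashlib
from aws_cdk import aws_s3_assets as s3_assets

script_path = "glue/ingestion_job.py"  # placeholder path to your Glue script

# Content-based hash; you could also mix in a build or deployment identifier
# if you need the hash to change on every deployment.
with open(script_path, "rb") as f:
    script_hash = hashlib.sha256(f.read()).hexdigest()

script_asset = s3_assets.Asset(
    self,                    # assumes this runs inside a Stack/Construct
    "GlueJobScriptAsset",    # placeholder construct ID
    path=script_path,
    asset_hash=script_hash,  # explicit hash instead of CDK's default
)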
2. Develop and test scripts locally
When developing AWS Glue for Spark jobs, you can develop and test your scripts locally using a Jupyter notebook or Docker before deploying them to AWS.
Below is a step-by-step walkthrough using Jupyter Notebook.
The Jupyter Notebook is an interactive environment for running code in the browser (Introduction to Machine Learning with Python, Andreas Müller and Sarah Guido). For more information, refer to What is the Jupyter Notebook?
Pricing consideration
Before getting started, let's check the pricing for our sessions.
AWS Glue Studio Job Notebooks and Interactive Sessions: "Suppose you use a notebook in AWS Glue Studio to interactively develop your ETL code. An Interactive Session has 5 DPU by default. The price of 1 DPU-hour is $0.44. If you keep the session running for 24 minutes, you will be billed for 5 DPUs * 0.4 hours * $0.44, or $0.88." (https://aws.amazon.com/glue/pricing/)
To avoid unnecessary charges, remember to set timeout and terminate sessions when you are done testing.
Setup
- Install Jupyter and the AWS Glue interactive sessions Jupyter kernels
pip3 install --upgrade jupyter boto3 aws-glue-sessions
install-glue-kernels
- Run Jupyter Notebook
jupyter notebook
This will launch the interface in your browser.
Alternatively, many IDEs (such as VSCode or Cursor) support Jupyter extensions, allowing you to run notebooks and view outputs directly within the IDE.
Once Jupyter is open, choose Glue PySpark as the kernel.
- Configure session credentials and region
In the first notebook cell, configure the session credentials and region, along with other preferences, using Jupyter magic commands. Refer to Configuring AWS Glue interactive sessions for Jupyter and AWS Glue Studio notebooks for more details.
# Set region and assume provided role
%idle_timeout 20
%region ap-southeast-2
%iam_role <Replace with GlueIngestionGlueJobRoleARN>
# %additional_python_modules ipython
# %extra_py_files
# %glue_version 5.0
# %worker_type G.1X
# %number_of_workers 5
# Verify identity
import json, boto3
print(json.dumps(boto3.client("sts").get_caller_identity(), indent=2))
- Input arguments
In AWS Glue ETL jobs, arguments usually come from getResolvedOptions. You can simulate this locally by defining the parameters as below; a sketch of the Glue-side equivalent follows the cell.
# Parameters
# Adjust these for your local run; in Glue these come from getResolvedOptions
params = {
"env_name": "kate",
"input_bucket": "source-data",
"output_bucket": "staging-data",
"error_bucket": "error-staging-data",
"file_path": "kate/default",
# Single file for local test from your config
"file_names": "2023_yellow_taxi_trip_data_light.csv",
"sns_topic_arn": <sns arn>,
"JOB_NAME": "local_ingestion_job",
}
params
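For comparison, in the deployed Glue job these values would typically be read from the job arguments via getResolvedOptions. A minimal sketch, assuming the job is configured with matching argument names:

import sys
from awsglue.utils import getResolvedOptions  # available in the Glue job runtime

args = getResolvedOptions(
    sys.argv,
    ["JOB_NAME", "env_name", "input_bucket", "output_bucket",
     "error_bucket", "file_path", "file_names", "sns_topic_arn"],
)
params = dict(args)  # the rest of the notebook code can stay unchanged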
- Include extra .py files
Upload them to an S3 bucket and reference them with the %extra_py_files magic command. However, if you are actively working on these files, frequent updates can be tedious: you have to re-upload them every time you change a function. To make my life a bit easier, I include these modules directly within notebook cells and comment out their import lines in the main script. For example, I include modules for logging, configuration, and utility functions.
Logging module
"""
Centralized logging configuration for AWS Data Pipeline
"""
import logging
import logging.config
from typing import Optional, Dict, Any
import json
def setup_logging(
level: str = "INFO",
log_format: Optional[str] = None,
log_file: Optional[str] = None,
environment: str = "dev",
) -> logging.Logger:
"""
Set up centralized logging configuration.
Args:
level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
log_format: Custom log format string
log_file: Path to log file (optional)
environment: Environment name (dev, staging, prod)
Returns:
Configured logger instance
"""
# Default format with structured information
if log_format is None:
log_format = (
"%(asctime)s | %(levelname)-8s | %(name)-20s | "
"%(funcName)-15s:%(lineno)-4d | %(message)s"
)
# Create handlers
handlers = {
"stdout": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
},
"stderr": {
"class": "logging.StreamHandler",
"level": "ERROR",
"formatter": "simple",
"stream": "ext://sys.stderr",
},
}
# Add file handler if specified
if log_file:
handlers["file"] = {
"class": "logging.handlers.RotatingFileHandler",
"level": level,
"formatter": "detailed",
"filename": log_file,
"maxBytes": 10485760, # 10MB
"backupCount": 5,
"encoding": "utf8",
}
# Logging configuration dictionary
logging_config = {
"version": 1,
"disable_existing_loggers": False, # Important to keep this False
"formatters": {
"detailed": {"format": log_format, "datefmt": "%Y-%m-%d %H:%M:%S"},
"simple": {"format": "%(name)-20s - %(asctime)s - %(levelname)s - %(message)s"},
},
"handlers": handlers,
"loggers": {
# Root logger - Parent of all loggers
"": {
"level": level,
"handlers": list(handlers.keys()),
},
# Base application logger
"glue": {
"level": level,
"propagate": True, # Allow propagation to root
},
# AWS SDK loggers (reduce noise)
"boto3": {"level": "WARNING"},
"botocore": {"level": "WARNING"},
"urllib3": {"level": "WARNING"},
# PySpark loggers
"pyspark": {"level": "WARNING"},
"py4j": {"level": "WARNING"},
},
}
# Apply configuration
logging.config.dictConfig(logging_config)
# Get logger for the calling module
logger = logging.getLogger(__name__)
# Log configuration info
logger.info(f"Logging configured - Level: {level}, Environment: {environment}")
if log_file:
logger.info(f"Log file: {log_file}")
return logger
def get_logger(name: str) -> logging.Logger:
"""
Get a logger instance for a specific module.
Args:
name: Logger name (usually __name__)
Returns:
Logger instance
"""
return logging.getLogger(name)
def log_function_call(logger: logging.Logger, func_name: str, **kwargs):
"""
Log function entry with parameters.
Args:
logger: Logger instance
func_name: Function name
**kwargs: Function parameters to log
"""
params = {
k: v for k, v in kwargs.items() if k not in ["password", "secret", "token"]
}
logger.debug(f"Entering {func_name} with params: {params}")
def log_performance(logger: logging.Logger, operation: str, duration: float, **metrics):
"""
Log performance metrics.
Args:
logger: Logger instance
operation: Operation name
duration: Duration in seconds
**metrics: Additional metrics to log
"""
logger.info(f"Performance - {operation}: {duration:.3f}s")
if metrics:
for key, value in metrics.items():
logger.info(f" {key}: {value}")
def log_dataframe_info(logger: logging.Logger, df_name: str, df):
"""
Log DataFrame information for debugging.
Args:
logger: Logger instance
df_name: DataFrame name/description
df: PySpark DataFrame
"""
try:
row_count = df.count()
col_count = len(df.columns)
logger.info(
f"DataFrame '{df_name}' - Rows: {row_count:,}, Columns: {col_count}"
)
logger.debug(f"DataFrame '{df_name}' columns: {df.columns}")
except Exception as e:
logger.warning(f"Could not get DataFrame info for '{df_name}': {e}")
def log_s3_operation(
logger: logging.Logger, operation: str, bucket: str, key: str, **kwargs
):
"""
Log S3 operations with consistent format.
Args:
logger: Logger instance
operation: S3 operation (read, write, delete, etc.)
bucket: S3 bucket name
key: S3 object key
**kwargs: Additional operation details
"""
details = f" | {kwargs}" if kwargs else ""
logger.info(f"S3 {operation.upper()} - s3://{bucket}/{key}{details}")
def log_error_with_context(
logger: logging.Logger, error: Exception, context: Dict[str, Any]
):
"""
Log errors with additional context information.
Args:
logger: Logger instance
error: Exception that occurred
context: Additional context information
"""
logger.error(f"Error: {str(error)}")
logger.error(f"Context: {json.dumps(context, indent=2, default=str)}")
logger.exception("Full traceback:")
Initialise the logger in a single cell and use it throughout your notebook. Additionally, give the logger some extra love so we can avoid ValueError: I/O operation on closed file.
# Initialize logging and config
setup_logging(level="INFO", environment=params["env_name"])
logger = get_logger('glue.notebook')
# Give the logger some extra love so Jupyter Notebook, when working with PySpark, will not hit `ValueError: I/O operation on closed file`
# https://stackoverflow.com/questions/31599940/how-to-print-current-logging-configuration-used-by-the-python-logging-module
root = logging.getLogger()
root.handlers[0].stream.write = print
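With the root logger patched, the helper functions defined above can be used from any cell. A small illustrative example (the step name is made up):

module_logger = get_logger("glue.utils")
log_function_call(module_logger, "example_step", env_name=params["env_name"])
module_logger.info("Logging is configured for the notebook")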
Config module
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Dict, Any
@dataclass
class JobConfig:
"""Configuration class for AWS Glue ETL jobs.
This class centralizes all configuration parameters needed for Glue jobs,
making it easier to manage and modify job settings.
Attributes:
env_name (str): Environment name (e.g., 'dev', 'prod')
input_bucket (str): S3 bucket for input data
output_bucket (str): S3 bucket for processed data
error_bucket (str): S3 bucket for error files
file_path (str): Base path in S3 buckets
sns_topic_arn (str): ARN of SNS topic for notifications
job_name (str): Name of the Glue job
correlation_id (Optional[str]): Unique ID for tracing requests
current_time (datetime): Timestamp for job execution
spark_configs (Dict[str, Any]): Optional Spark configurations
"""
# Required parameters
env_name: str
input_bucket: str
output_bucket: str
error_bucket: str
file_path: str
sns_topic_arn: str
job_name: str
# Optional parameters with defaults
correlation_id: Optional[str] = None
    current_time: datetime = field(default_factory=datetime.utcnow)
    spark_configs: Optional[Dict[str, Any]] = None
def __post_init__(self):
"""Validate configuration after initialization."""
if not all([self.env_name, self.input_bucket, self.output_bucket,
self.error_bucket, self.sns_topic_arn]):
raise ValueError("Missing required configuration parameters")
# Set default Spark configurations if none provided
if self.spark_configs is None:
self.spark_configs = {
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.sql.shuffle.partitions": "200",
# "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.broadcastTimeout": "600"
}
def get_s3_paths(self, file_name: str) -> Dict[str, str]:
"""Generate S3 paths for a given file.
Args:
file_name (str): Name of the file being processed
Returns:
Dict[str, str]: Dictionary containing input, output, and error paths
"""
return {
"input": f"s3://{self.input_bucket}/{self.env_name}/staging_{file_name}/",
"output": f"s3://{self.output_bucket}/{self.env_name}/transform_{file_name}/",
"error": f"s3://{self.error_bucket}/{self.env_name}/error_{file_name}"
}
def generate_correlation_id(self, file_name: str) -> str:
"""Generate a unique correlation ID for request tracing.
Args:
file_name (str): Name of the file being processed
Returns:
str: Unique correlation ID
"""
if not self.correlation_id:
self.correlation_id = f"{self.env_name}-{file_name}-{int(datetime.utcnow().timestamp())}"
return self.correlation_id
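In the notebook, the config object can then be built from the params cell defined earlier. A minimal sketch for the single-file case (field names match the params dictionary above):

config = JobConfig(
    env_name=params["env_name"],
    input_bucket=params["input_bucket"],
    output_bucket=params["output_bucket"],
    error_bucket=params["error_bucket"],
    file_path=params["file_path"],
    sns_topic_arn=params["sns_topic_arn"],
    job_name=params["JOB_NAME"],
)
print(config.get_s3_paths(params["file_names"]))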
Utils module
import sys
import json
import time
from datetime import datetime
from pyspark.sql.functions import col
from pyspark.sql.types import NumericType, StringType
from botocore.exceptions import ClientError
import boto3
from tenacity import retry, stop_after_attempt, wait_exponential
from pyspark.sql import SparkSession
# from config import JobConfig
# from logging_config import (
# get_logger,
# log_function_call,
# log_performance,
# log_s3_operation,
# log_error_with_context,
# log_dataframe_info,
# )
# # Get module-specific logger
# logger = get_logger("glue.utils")
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def move_to_error_bucket(s3_client, source: dict, destination: dict):
"""Move file to error bucket with retry mechanism."""
s3_client.copy_object(
Bucket=destination["bucket"],
CopySource={"Bucket": source["bucket"], "Key": source["key"]},
Key=destination["key"],
)
def notify_sns(sns_client, topic_arn, message):
"""Send a notification to an SNS topic."""
log_function_call(logger, "notify_sns", topic_arn=topic_arn)
try:
sns_client.publish(TopicArn=topic_arn, Message=message)
logger.info(
"SNS notification sent successfully",
extra={"topic_arn": topic_arn, "operation": "sns_publish"},
)
except ClientError as e:
log_error_with_context(
logger,
e,
{
"operation": "sns_publish",
"topic_arn": topic_arn,
"message_length": len(message),
},
)
sys.exit(1)
def check_files_exist(s3_client, bucket, env_name, file_path, file_names):
"""Check if files exist in S3 bucket."""
log_function_call(
logger,
"check_files_exist",
bucket=bucket,
env_name=env_name,
file_path=file_path,
file_count=len(file_names),
)
for file_name in file_names:
try:
s3_path = f"{file_path}/{file_name}"
s3_client.head_object(Bucket=bucket, Key=s3_path)
log_s3_operation(logger, "check", bucket, s3_path, exists=True)
except s3_client.exceptions.ClientError as e:
log_error_with_context(
logger,
e,
{
"operation": "check_file_exists",
"bucket": bucket,
"file_path": s3_path,
"env_name": env_name,
},
)
sys.exit(1)
def delete_directory_in_s3(s3_client, bucket, directory_path):
"""Delete all files in S3 directory."""
log_function_call(
logger, "delete_directory_in_s3", bucket=bucket, directory_path=directory_path
)
try:
objects = s3_client.list_objects_v2(Bucket=bucket, Prefix=directory_path)
if "Contents" in objects:
deleted_count = 0
for obj in objects["Contents"]:
s3_client.delete_object(Bucket=bucket, Key=obj["Key"])
log_s3_operation(logger, "delete", bucket, obj["Key"])
deleted_count += 1
logger.info(
"Directory cleanup completed",
extra={
"bucket": bucket,
"directory": directory_path,
"files_deleted": deleted_count,
},
)
else:
logger.info(
"No files to delete",
extra={"bucket": bucket, "directory": directory_path},
)
except Exception as e:
log_error_with_context(
logger,
e,
{
"operation": "delete_directory",
"bucket": bucket,
"directory_path": directory_path,
},
)
sys.exit(1)
def save_quality_report(s3_client, bucket, env_name, file_name, quality_report):
"""Save data quality report to S3."""
log_function_call(
logger,
"save_quality_report",
bucket=bucket,
env_name=env_name,
file_name=file_name,
)
try:
report_key = f"{env_name}/quality_reports/quality_report_{file_name}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
s3_client.put_object(
Bucket=bucket,
Key=report_key,
Body=json.dumps(quality_report, indent=2),
ContentType="application/json",
)
log_s3_operation(
logger,
"write",
bucket,
report_key,
size=len(json.dumps(quality_report)),
content_type="application/json",
)
except Exception as e:
log_error_with_context(
logger,
e,
{
"operation": "save_quality_report",
"bucket": bucket,
"env_name": env_name,
"file_name": file_name,
},
)
def validate_dataframe(df, validation_rules: list[dict]) -> tuple[bool, list[str]]:
"""Validate DataFrame against defined rules."""
errors = []
for rule in validation_rules:
if rule["type"] == "not_empty" and df.isEmpty():
errors.append("DataFrame is empty")
elif rule["type"] == "required_columns":
missing = [col for col in rule["columns"] if col not in df.columns]
if missing:
errors.append(f"Missing required columns: {missing}")
return len(errors) == 0, errors
def get_aws_clients():
"""Create and return AWS clients."""
return {
"s3": boto3.client("s3"),
"sns": boto3.client("sns"),
}
def optimize_spark_session(spark: SparkSession, config: JobConfig) -> SparkSession:
"""Apply Spark optimizations based on config."""
for key, value in config.spark_configs.items():
spark.conf.set(key, value)
# Set dynamic partition pruning
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
return spark
def check_dependencies(config: JobConfig) -> bool:
"""Verify all required services and resources are available."""
try:
s3 = boto3.client("s3")
sns = boto3.client("sns")
# Check S3 buckets
for bucket in [config.input_bucket, config.output_bucket, config.error_bucket]:
s3.head_bucket(Bucket=bucket)
# Check SNS topic
sns.get_topic_attributes(TopicArn=config.sns_topic_arn)
return True
except Exception as e:
logger.error(f"Dependency check failed: {str(e)}")
return False
def perform_data_quality_checks(df, file_name):
"""Perform data quality checks on DataFrame."""
log_function_call(logger, "perform_data_quality_checks", file_name=file_name)
start_time = time.time()
# Log initial DataFrame info
log_dataframe_info(logger, f"input_dataframe_{file_name}", df)
quality_report = {
"file_name": file_name,
"total_rows": df.count(),
"total_columns": len(df.columns),
"quality_issues": [],
"quality_score": 100.0,
"is_valid": True,
}
logger.info(f"Starting data quality checks for {file_name}")
# Check 1: Empty dataset
if quality_report["total_rows"] == 0:
quality_report["quality_issues"].append("Dataset is empty")
quality_report["quality_score"] -= 50
quality_report["is_valid"] = False
return quality_report
# Check 2: Null value analysis
null_counts = {}
for column in df.columns:
null_count = df.filter(col(column).isNull()).count()
null_percentage = (null_count / quality_report["total_rows"]) * 100
null_counts[column] = {
"count": null_count,
"percentage": round(null_percentage, 2),
}
# Flag columns with high null percentage
if null_percentage > 50:
quality_report["quality_issues"].append(
f"Column '{column}' has {null_percentage:.2f}% null values"
)
quality_report["quality_score"] -= 10
elif null_percentage > 25:
quality_report["quality_issues"].append(
f"Column '{column}' has {null_percentage:.2f}% null values (warning)"
)
quality_report["quality_score"] -= 5
quality_report["null_analysis"] = null_counts
# Check 3: Duplicate rows
duplicate_count = quality_report["total_rows"] - df.dropDuplicates().count()
if duplicate_count > 0:
duplicate_percentage = (duplicate_count / quality_report["total_rows"]) * 100
quality_report["quality_issues"].append(
f"Found {duplicate_count} duplicate rows ({duplicate_percentage:.2f}%)"
)
quality_report["duplicate_count"] = duplicate_count
if duplicate_percentage > 10:
quality_report["quality_score"] -= 15
else:
quality_report["quality_score"] -= 5
# Check 4: Data type consistency
schema_issues = []
for field in df.schema.fields:
column_name = field.name
expected_type = field.dataType
# Check for mixed data types in string columns
        if isinstance(expected_type, StringType):
# Check if column contains only numeric values (might need to be numeric)
try:
numeric_count = df.filter(
col(column_name).rlike("^[0-9]+\\.?[0-9]*$")
).count()
if numeric_count == quality_report["total_rows"]:
schema_issues.append(
f"Column '{column_name}' contains only numeric values but is string type"
)
except:
pass
if schema_issues:
quality_report["quality_issues"].extend(schema_issues)
quality_report["quality_score"] -= len(schema_issues) * 3
# Check 5: Column name validation
column_issues = []
for column in df.columns:
# Check for spaces in column names
if " " in column:
column_issues.append(f"Column '{column}' contains spaces")
# Check for special characters
if not column.replace("_", "").replace("-", "").isalnum():
column_issues.append(f"Column '{column}' contains special characters")
if column_issues:
quality_report["quality_issues"].extend(column_issues)
quality_report["quality_score"] -= len(column_issues) * 2
# Check 6: Numeric column validation
numeric_columns = [
f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)
]
for column in numeric_columns:
try:
# Check for negative values where they might not be expected
negative_count = df.filter(col(column) < 0).count()
if negative_count > 0:
quality_report["quality_issues"].append(
f"Column '{column}' has {negative_count} negative values"
)
# Check for extreme outliers (values beyond 3 standard deviations)
stats = df.select(column).describe().collect()
if len(stats) >= 3: # mean, stddev available
try:
mean_val = float(stats[1][1]) # mean
stddev_val = float(stats[2][1]) # stddev
if stddev_val > 0:
outlier_count = df.filter(
(col(column) > mean_val + 3 * stddev_val)
| (col(column) < mean_val - 3 * stddev_val)
).count()
if outlier_count > 0:
outlier_percentage = (
outlier_count / quality_report["total_rows"]
) * 100
if outlier_percentage > 5:
quality_report["quality_issues"].append(
f"Column '{column}' has {outlier_count} potential outliers ({outlier_percentage:.2f}%)"
)
except (ValueError, IndexError):
pass
except Exception as e:
logger.warning(
f"Could not perform numeric validation on column '{column}': {e}"
)
# Final quality assessment
if quality_report["quality_score"] < 70:
quality_report["is_valid"] = False
elif quality_report["quality_score"] < 85:
quality_report["quality_issues"].append(
"Data quality is below recommended threshold"
)
quality_report["quality_score"] = max(0, quality_report["quality_score"])
logger.info(f"Data quality check completed for {file_name}")
logger.info(f"Quality Score: {quality_report['quality_score']:.1f}/100")
logger.info(f"Issues Found: {len(quality_report['quality_issues'])}")
# Log performance metrics
duration = time.time() - start_time
log_performance(
logger,
"data_quality_check",
duration,
file_name=file_name,
rows_processed=quality_report["total_rows"],
issues_found=len(quality_report["quality_issues"]),
quality_score=quality_report["quality_score"],
)
return quality_report
def track_job_metrics(metrics: dict) -> None:
"""Track job metrics for monitoring.
Args:
metrics (dict): Dictionary containing job metrics including:
- duration: Total job duration
- processed: Number of files processed
- success_rate: Processing success rate
- total_rows: Total number of rows processed
- spark_metrics (optional): Spark-specific metrics
"""
metric_data = {
"processing_time": metrics.get("duration"),
"files_processed": metrics.get("processed"),
"success_rate": metrics.get("success_rate"),
"total_rows": metrics.get("total_rows"),
}
logger.info("Job metrics", extra={"metrics": metric_data})
- Example execution log output
When running the job, your output might look like this:
glue.notebook - 2025-11-07 11:37:25,573 - INFO - S3 CHECK - s3://***/2023_yellow_taxi_trip_data_light.csv | {'exists': True}
glue.notebook - 2025-11-07 11:37:25,623 - INFO - No files to delete
glue.notebook - 2025-11-07 11:37:25,668 - INFO - No files to delete
glue.notebook - 2025-11-07 11:37:25,668 - INFO - Processing file 1/1: 2023_yellow_taxi_trip_data_light.csv
glue.notebook - 2025-11-07 11:37:41,147 - INFO - DataFrame columns: ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee']
glue.notebook - 2025-11-07 11:37:49,241 - INFO - Performance - process_file_2023_yellow_taxi_trip_data_light.csv: 23.257s
glue.notebook - 2025-11-07 11:37:49,242 - INFO - correlation_id: kate-2023_yellow_taxi_trip_data_light.csv-1762515445
glue.notebook - 2025-11-07 11:37:49,242 - INFO - rows_processed: 4
glue.notebook - 2025-11-07 11:37:49,242 - INFO - read_duration: 15.443103075027466
glue.notebook - 2025-11-07 11:37:49,242 - INFO - write_duration: 6.300567388534546
glue.notebook - 2025-11-07 11:37:49,242 - INFO - Job completed successfully - Processed 1/1 files in 25.20s
glue.notebook - 2025-11-07 11:37:49,242 - INFO - Job metrics
glue.notebook - 2025-11-07 11:37:49,330 - INFO - Spark session stopped
%stop_session
Stopping session: ccc5b160-7f7e-4c93-ab19-1eb0966a796e
Stopped session.
- At the end of your notebook, include the following command to stop the session and avoid unnecessary costs.
%stop_session
In the very first cell, configure the idle timeout so the session terminates after a certain period of inactivity, for example %idle_timeout 20. This setting makes sure your session is terminated after 20 minutes of inactivity, protecting you from unexpected billing.
Thoughts
- Convenient for testing and debugging: you can rerun only the modified cells instead of the entire job.
- Supports developing and testing locally before deploying to AWS.
- Managing extra .py files can be troublesome, especially if you update them frequently.
- Additional setup may be required, for example simulating the ETL job arguments.

