Original Japanese article: Organizing Trigger Patterns for S3 × AWS Lambda × Glue Python Shell
Introduction
I'm Aki, an AWS Community Builder (@jitepengin).
In my previous articles, I introduced lightweight ETL using Glue Python Shell.
In this article, I’ll organize two patterns for triggering Glue Python Shell when a file is placed in S3, and explain the reasoning behind calling Glue via Lambda.
Why Use Lambda to Trigger Glue Python Shell
While it is possible to implement the entire ETL process in a Lambda function triggered by S3 events, Lambda is constrained by its 15-minute maximum execution time and its memory limits.
By using Lambda only as the trigger, keeping lightweight preprocessing and integration with other services in Lambda, and running the main ETL in Glue Python Shell, you get both flexible service integration and long-running processing.
For more details on when to use Lambda vs Glue Python Shell, check out my previous article:
AWS Lambda and AWS Glue Python Shell in the Context of Lightweight ETL
Trigger Patterns
The two patterns covered in this article are:
- Lambda + start_job_run (Direct Job Execution)
- Lambda + start_workflow_run (Workflow Execution)
Other patterns exist, such as S3 → EventBridge → Step Functions → Glue Python Shell, but we’ll focus on these two simpler approaches.
Pattern 1: Lambda + start_job_run (Direct Job Execution)
In this pattern, Lambda receives the S3 file placement event and directly triggers a Glue Python Shell job using start_job_run.
This was the setup used in my previous article.
Characteristics:
- High flexibility: Lambda can integrate with many services.
- Error handling and retries need consideration: Glue Python Shell supports job-level retries, but retries of the trigger itself must be handled on the Lambda side (see the sketch after this list).
- If flow control grows more complex, you may end up putting EventBridge and Step Functions in front of Lambda anyway.
- For simple cases Lambda alone might suffice, so weigh Lambda's flexibility against Glue Python Shell's support for long-running processing.
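As noted above, Glue can retry a failed job run, but a failed start_job_run call itself has to be retried in Lambda. A minimal sketch of what that could look like; the retry policy and the error codes handled here are illustrative assumptions, not part of the original setup:

import time
import boto3
from botocore.exceptions import ClientError

glue = boto3.client("glue")


def start_job_with_retry(job_name, s3_input_path, max_attempts=3):
    """Start a Glue job, retrying only errors that are worth retrying."""
    for attempt in range(1, max_attempts + 1):
        try:
            response = glue.start_job_run(
                JobName=job_name,
                Arguments={"--s3_input": s3_input_path},
            )
            return response["JobRunId"]
        except ClientError as e:
            code = e.response["Error"]["Code"]
            retryable = code in ("ConcurrentRunsExceededException", "ThrottlingException")
            if retryable and attempt < max_attempts:
                time.sleep(2 ** attempt)  # simple exponential backoff
                continue
            # Re-raise so the failure surfaces in Lambda itself.
            raise

Anything that still fails is re-raised, so the built-in retries of the asynchronous S3 invocation and an on-failure destination or DLQ can take over from there.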
Lambda Sample Code
Set the target job name and the parameters for start_job_run; here we pass the S3 path of the uploaded file. Note that Glue job arguments are passed with a leading "--", while getResolvedOptions on the job side receives them without the prefix. The Lambda execution role also needs permission to call glue:StartJobRun.
import boto3

def lambda_handler(event, context):
    glue = boto3.client("glue")

    s3_bucket = event['Records'][0]['s3']['bucket']['name']
    s3_object_key = event['Records'][0]['s3']['object']['key']
    s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"

    response = glue.start_job_run(
        JobName="YOUR_TARGET_JOB_NAME",
        Arguments={
            "--s3_input": s3_input_path
        }
    )

    print(f"Glue Job started: {response['JobRunId']}")
    return response
Setup Steps
- Create a Glue Python Shell job with the script below. (If you prefer to create the job programmatically, see the boto3 sketch after the script.)
import boto3
import sys
import os

from awsglue.utils import getResolvedOptions


def get_job_parameters():
    try:
        required_args = ['s3_input']
        args = getResolvedOptions(sys.argv, required_args)

        s3_file_path = args['s3_input']
        print(f"s3_input: {s3_file_path}")
        return s3_file_path
    except Exception as e:
        print(f"parameters error: {e}")
        raise


def _to_pyarrow_table(result):
    """
    Compatibility helper to extract a pyarrow.Table from a chDB query_result.
    """
    import chdb

    if hasattr(chdb, "to_arrowTable"):
        return chdb.to_arrowTable(result)
    if hasattr(result, "to_pyarrow"):
        return result.to_pyarrow()
    if hasattr(result, "to_arrow"):
        return result.to_arrow()

    raise RuntimeError(
        "Cannot convert chdb query_result to pyarrow.Table. "
        f"Available attributes: {sorted(dir(result))[:200]}"
    )


def normalize_arrow_for_iceberg(table):
    """
    Normalize Arrow schema for Iceberg compatibility.
    - timestamptz -> timestamp
    """
    import pyarrow as pa

    new_fields = []
    new_columns = []

    for field, column in zip(table.schema, table.columns):
        # timestamp with timezone -> timestamp
        if pa.types.is_timestamp(field.type) and field.type.tz is not None:
            new_type = pa.timestamp(field.type.unit)
            new_fields.append(pa.field(field.name, new_type, field.nullable))
            new_columns.append(column.cast(new_type))
        else:
            new_fields.append(field)
            new_columns.append(column)

    new_schema = pa.schema(new_fields)
    return pa.Table.from_arrays(new_columns, schema=new_schema)


def read_parquet_with_chdb(s3_input):
    """
    Read Parquet file from S3 using chDB.
    """
    import chdb

    if s3_input.startswith("s3://"):
        bucket, key = s3_input.replace("s3://", "").split("/", 1)
        s3_url = f"https://{bucket}.s3.ap-northeast-1.amazonaws.com/{key}"
    else:
        s3_url = s3_input

    print(f"Reading data from S3: {s3_url}")

    query = f"""
        SELECT *
        FROM s3('{s3_url}', 'Parquet')
        WHERE VendorID = 1
    """
    result = chdb.query(query, "Arrow")
    arrow_table = _to_pyarrow_table(result)

    print("Original schema:")
    print(arrow_table.schema)

    # Normalize schema for Iceberg compatibility
    arrow_table = normalize_arrow_for_iceberg(arrow_table)

    print("Normalized schema:")
    print(arrow_table.schema)
    print(f"Rows: {arrow_table.num_rows:,}")

    return arrow_table


def write_iceberg_table(arrow_table):
    """
    Write Arrow table to Iceberg table using PyIceberg.
    """
    try:
        print("Writing started...")
        from pyiceberg.catalog import load_catalog

        catalog_config = {
            "type": "glue",
            "warehouse": "s3://your-bucket/your-warehouse/",  # Adjust to your environment.
            "region": "ap-northeast-1",
        }
        catalog = load_catalog("glue_catalog", **catalog_config)

        table_identifier = "icebergdb.yellow_tripdata"
        table = catalog.load_table(table_identifier)

        print(f"Target data to write: {arrow_table.num_rows:,} rows")
        table.append(arrow_table)
        return True
    except Exception as e:
        print(f"Writing error: {e}")
        import traceback
        traceback.print_exc()
        return False


def main():
    try:
        import chdb
        import pyiceberg

        # Read input parameter
        s3_input = get_job_parameters()

        # Read data with chDB
        arrow_tbl = read_parquet_with_chdb(s3_input)
        print(f"Data read success: {arrow_tbl.num_rows:,} rows")

        # Write to Iceberg table
        if write_iceberg_table(arrow_tbl):
            print("\nWriting fully successful!")
        else:
            print("Writing failed")
    except Exception as e:
        print(f"Main error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
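The script above imports chdb and pyiceberg, which are not preinstalled in the Glue Python Shell environment; one common way to bring them in is the --additional-python-modules job parameter. If you would rather create the job with boto3 than in the console, a rough sketch under those assumptions (the job name, role, script location, and module list are placeholders, so adjust them to your environment) could look like this:

import boto3

glue = boto3.client("glue")

# Hypothetical job definition; names, role, and script path are placeholders.
glue.create_job(
    Name="YOUR_TARGET_JOB_NAME",
    Role="YOUR_GLUE_JOB_ROLE",
    Command={
        "Name": "pythonshell",
        "PythonVersion": "3.9",
        "ScriptLocation": "s3://your-bucket/scripts/lightweight_etl.py",
    },
    MaxCapacity=0.0625,  # 1/16 DPU; use 1.0 if the job needs more memory
    DefaultArguments={
        # Install the extra libraries used by the script when the job starts.
        "--additional-python-modules": "chdb,pyiceberg[glue]",
    },
)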
Execution Result
Pattern 2: Lambda + start_workflow_run (Workflow Execution)
In this pattern, Lambda receives the S3 file event and triggers a Glue Workflow using start_workflow_run.
The Workflow then runs the Glue Python Shell jobs.
Characteristics:
- Combines Lambda flexibility with Glue Workflow flow control.
- Error handling requires a clear division of responsibility between Lambda (starting the workflow) and the Workflow (orchestrating the jobs).
- Preprocessing and postprocessing jobs can be added to the Workflow later without changing the trigger.
Lambda Sample Code
The main differences from Pattern 1: the S3 path is passed as a workflow run property instead of a job argument (run property keys have no "--" prefix), and the Lambda execution role needs permission to call glue:StartWorkflowRun.
import boto3

def lambda_handler(event, context):
    glue = boto3.client("glue")

    s3_bucket = event['Records'][0]['s3']['bucket']['name']
    s3_object_key = event['Records'][0]['s3']['object']['key']
    s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"

    response = glue.start_workflow_run(
        Name="YOUR_TARGET_WORKFLOW_NAME",
        # Run property keys are plain names (no "--" prefix); the jobs read this
        # same key back via get_workflow_run_properties.
        RunProperties={'s3_input': s3_input_path}
    )

    print(f"Glue Workflow started: {response['RunId']}")
    return response
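Since start_workflow_run returns a RunId, you can also check the overall run status from outside, for example in a monitoring script or another Lambda. A small sketch with placeholder names; the RunId would be the one returned above:

import boto3

glue = boto3.client("glue")

# Hypothetical status check against a workflow run started earlier.
run = glue.get_workflow_run(
    Name="YOUR_TARGET_WORKFLOW_NAME",
    RunId="YOUR_WORKFLOW_RUN_ID",
)["Run"]

print(run["Status"])  # e.g. RUNNING, COMPLETED, ERROR
stats = run["Statistics"]
print(f"{stats['SucceededActions']} of {stats['TotalActions']} actions succeeded")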
Setup Steps
- Create the Glue Python Shell jobs. Unlike Pattern 1, each job reads the S3 path from the Workflow's run properties via get_workflow_run_properties, using the WORKFLOW_NAME and WORKFLOW_RUN_ID arguments that Glue passes to jobs started from a workflow. (A boto3 sketch for chaining the two jobs into a Workflow follows the second script.)
1st Job
import boto3
import sys
import os

from awsglue.utils import getResolvedOptions


def get_job_parameters():
    # Jobs started from a Workflow receive WORKFLOW_NAME and WORKFLOW_RUN_ID as
    # arguments; use them to fetch the run properties set by the Lambda trigger.
    args = getResolvedOptions(sys.argv, ['WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])

    glue = boto3.client('glue')
    resp = glue.get_workflow_run_properties(
        Name=args['WORKFLOW_NAME'],
        RunId=args['WORKFLOW_RUN_ID']
    )

    s3_input = resp['RunProperties'].get('s3_input')
    if not s3_input:
        raise ValueError("s3_input Not Found")

    print(f"s3_input: {s3_input}")
    return s3_input


def _to_pyarrow_table(result):
    """
    Compatibility helper to extract a pyarrow.Table from a chDB query_result.
    """
    import chdb

    if hasattr(chdb, "to_arrowTable"):
        return chdb.to_arrowTable(result)
    if hasattr(result, "to_pyarrow"):
        return result.to_pyarrow()
    if hasattr(result, "to_arrow"):
        return result.to_arrow()

    raise RuntimeError(
        "Cannot convert chdb query_result to pyarrow.Table. "
        f"Available attributes: {sorted(dir(result))[:200]}"
    )


def normalize_arrow_for_iceberg(table):
    """
    Normalize Arrow schema for Iceberg compatibility.
    - timestamptz -> timestamp
    """
    import pyarrow as pa

    new_fields = []
    new_columns = []

    for field, column in zip(table.schema, table.columns):
        # timestamp with timezone -> timestamp
        if pa.types.is_timestamp(field.type) and field.type.tz is not None:
            new_type = pa.timestamp(field.type.unit)
            new_fields.append(pa.field(field.name, new_type, field.nullable))
            new_columns.append(column.cast(new_type))
        else:
            new_fields.append(field)
            new_columns.append(column)

    new_schema = pa.schema(new_fields)
    return pa.Table.from_arrays(new_columns, schema=new_schema)


def read_parquet_with_chdb(s3_input):
    """
    Read Parquet file from S3 using chDB.
    """
    import chdb

    if s3_input.startswith("s3://"):
        bucket, key = s3_input.replace("s3://", "").split("/", 1)
        s3_url = f"https://{bucket}.s3.ap-northeast-1.amazonaws.com/{key}"
    else:
        s3_url = s3_input

    print(f"Reading data from S3: {s3_url}")

    query = f"""
        SELECT *
        FROM s3('{s3_url}', 'Parquet')
        WHERE VendorID = 1
    """
    result = chdb.query(query, "Arrow")
    arrow_table = _to_pyarrow_table(result)

    print("Original schema:")
    print(arrow_table.schema)

    # Normalize schema for Iceberg compatibility
    arrow_table = normalize_arrow_for_iceberg(arrow_table)

    print("Normalized schema:")
    print(arrow_table.schema)
    print(f"Rows: {arrow_table.num_rows:,}")

    return arrow_table


def write_iceberg_table(arrow_table):
    """
    Write Arrow table to Iceberg table using PyIceberg.
    """
    try:
        print("Writing started...")
        from pyiceberg.catalog import load_catalog

        catalog_config = {
            "type": "glue",
            "warehouse": "s3://your-bucket/your-warehouse/",  # Adjust to your environment.
            "region": "ap-northeast-1",
        }
        catalog = load_catalog("glue_catalog", **catalog_config)

        table_identifier = "icebergdb.yellow_tripdata"
        table = catalog.load_table(table_identifier)

        print(f"Target data to write: {arrow_table.num_rows:,} rows")
        table.append(arrow_table)
        return True
    except Exception as e:
        print(f"Writing error: {e}")
        import traceback
        traceback.print_exc()
        return False


def main():
    try:
        import chdb
        import pyiceberg

        # Read input parameter
        s3_input = get_job_parameters()

        # Read data with chDB
        arrow_tbl = read_parquet_with_chdb(s3_input)
        print(f"Data read success: {arrow_tbl.num_rows:,} rows")

        # Write to Iceberg table
        if write_iceberg_table(arrow_tbl):
            print("\nWriting fully successful!")
        else:
            print("Writing failed")
    except Exception as e:
        print(f"Main error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
2nd Job
import boto3
import sys
import os

from awsglue.utils import getResolvedOptions


def get_job_parameters():
    # Same as the 1st job: fetch the S3 path from the workflow run properties.
    args = getResolvedOptions(sys.argv, ['WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])

    glue = boto3.client('glue')
    resp = glue.get_workflow_run_properties(
        Name=args['WORKFLOW_NAME'],
        RunId=args['WORKFLOW_RUN_ID']
    )

    s3_input = resp['RunProperties'].get('s3_input')
    if not s3_input:
        raise ValueError("s3_input Not Found")

    print(f"s3_input: {s3_input}")
    return s3_input


def _to_pyarrow_table(result):
    """
    Compatibility helper to extract a pyarrow.Table from a chDB query_result.
    """
    import chdb

    if hasattr(chdb, "to_arrowTable"):
        return chdb.to_arrowTable(result)
    if hasattr(result, "to_pyarrow"):
        return result.to_pyarrow()
    if hasattr(result, "to_arrow"):
        return result.to_arrow()

    raise RuntimeError(
        "Cannot convert chdb query_result to pyarrow.Table. "
        f"Available attributes: {sorted(dir(result))[:200]}"
    )


def normalize_arrow_for_iceberg(table):
    """
    Normalize Arrow schema for Iceberg compatibility.
    - timestamptz -> timestamp
    """
    import pyarrow as pa

    new_fields = []
    new_columns = []

    for field, column in zip(table.schema, table.columns):
        # timestamp with timezone -> timestamp
        if pa.types.is_timestamp(field.type) and field.type.tz is not None:
            new_type = pa.timestamp(field.type.unit)
            new_fields.append(pa.field(field.name, new_type, field.nullable))
            new_columns.append(column.cast(new_type))
        else:
            new_fields.append(field)
            new_columns.append(column)

    new_schema = pa.schema(new_fields)
    return pa.Table.from_arrays(new_columns, schema=new_schema)


def read_parquet_with_chdb(s3_input):
    """
    Read Parquet file from S3 using chDB.
    """
    import chdb

    if s3_input.startswith("s3://"):
        bucket, key = s3_input.replace("s3://", "").split("/", 1)
        s3_url = f"https://{bucket}.s3.ap-northeast-1.amazonaws.com/{key}"
    else:
        s3_url = s3_input

    print(f"Reading data from S3: {s3_url}")

    query = f"""
        SELECT *
        FROM s3('{s3_url}', 'Parquet')
        WHERE VendorID = 2
    """
    result = chdb.query(query, "Arrow")
    arrow_table = _to_pyarrow_table(result)

    print("Original schema:")
    print(arrow_table.schema)

    # Normalize schema for Iceberg compatibility
    arrow_table = normalize_arrow_for_iceberg(arrow_table)

    print("Normalized schema:")
    print(arrow_table.schema)
    print(f"Rows: {arrow_table.num_rows:,}")

    return arrow_table


def write_iceberg_table(arrow_table):
    """
    Write Arrow table to Iceberg table using PyIceberg.
    """
    try:
        print("Writing started...")
        from pyiceberg.catalog import load_catalog

        catalog_config = {
            "type": "glue",
            "warehouse": "s3://your-bucket/your-warehouse/",  # Adjust to your environment.
            "region": "ap-northeast-1",
        }
        catalog = load_catalog("glue_catalog", **catalog_config)

        table_identifier = "icebergdb.yellow_tripdata"
        table = catalog.load_table(table_identifier)

        print(f"Target data to write: {arrow_table.num_rows:,} rows")
        table.append(arrow_table)
        return True
    except Exception as e:
        print(f"Writing error: {e}")
        import traceback
        traceback.print_exc()
        return False


def main():
    try:
        import chdb
        import pyiceberg

        # Read input parameter
        s3_input = get_job_parameters()

        # Read data with chDB
        arrow_tbl = read_parquet_with_chdb(s3_input)
        print(f"Data read success: {arrow_tbl.num_rows:,} rows")

        # Write to Iceberg table
        if write_iceberg_table(arrow_tbl):
            print("\nWriting fully successful!")
        else:
            print("Writing failed")
    except Exception as e:
        print(f"Main error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
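Both jobs then need to be registered in the Workflow and chained with triggers. The console works fine for this, but if you prefer to script it, a rough boto3 sketch of an equivalent setup could look like the following. The workflow, trigger, and job names are placeholders; it assumes the two jobs above already exist and, as one possible wiring, runs the second job only after the first succeeds:

import boto3

glue = boto3.client("glue")

WORKFLOW = "YOUR_TARGET_WORKFLOW_NAME"

# The workflow that Lambda starts with start_workflow_run.
glue.create_workflow(Name=WORKFLOW, Description="S3-triggered lightweight ETL")

# Entry point: an on-demand trigger that starts the 1st job.
glue.create_trigger(
    Name="start-first-job",
    WorkflowName=WORKFLOW,
    Type="ON_DEMAND",
    Actions=[{"JobName": "YOUR_FIRST_JOB_NAME"}],
)

# Dependency: start the 2nd job only after the 1st job succeeds.
glue.create_trigger(
    Name="start-second-job",
    WorkflowName=WORKFLOW,
    Type="CONDITIONAL",
    StartOnCreation=True,
    Predicate={
        "Conditions": [
            {
                "LogicalOperator": "EQUALS",
                "JobName": "YOUR_FIRST_JOB_NAME",
                "State": "SUCCEEDED",
            }
        ]
    },
    Actions=[{"JobName": "YOUR_SECOND_JOB_NAME"}],
)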
Execution Result
Comparing Workflow Execution vs Direct Job Execution
Workflow Execution Pros
- Dependency management between multiple jobs (order, branching, parallel control)
- Unified parameter management (Run Properties)
- Centralized monitoring and management (stop/restart on error, unified execution history)
Workflow Execution Cons
- Complex setup (triggers and dependencies)
- Overhead (management cost even for single jobs, longer startup)
- Harder debugging (difficult to test individual jobs)
Direct Job Execution Pros
- Simple setup and immediate execution
- Flexible parameters, independent scheduling, easier debugging
- Cost-efficient: run only when needed, low management overhead
Direct Job Execution Cons
- Difficult dependency management (manual order control, error handling)
- Dispersed parameter management (per-job configuration)
- Dispersed monitoring (hard to get overall picture)
When to Choose Workflow vs Direct Job
Use Workflow When:
- Multiple jobs are chained (e.g., data ingestion → transformation → validation → output)
- Shared parameters across jobs
- Unified monitoring needed
- Regular batch processing
Example:
S3 → Lambda → Glue Workflow
├── Job1: Data ingestion & preprocessing
├── Job2: Data transformation
└── Job3: Validation & output
Use Direct Job When:
- Single-job processing is sufficient
- Real-time processing needed
- Flexible individual control required
- Prototype or testing phase
Example:
S3 → Lambda → Glue Job (single)
Conclusion
We introduced two patterns for triggering Glue Python Shell via S3 events.
When using Lambda to trigger Glue:
- For single-job processing → Lambda + Job
- For multiple jobs / flow control → Lambda + Workflow
Glue Python Shell may seem like a niche service compared to Glue Spark, EMR, or Lambda, but it is cost-effective, supports long-running processing, and has no dependency on Spark.
Combining it with chDB or DuckDB can boost efficiency, and PyIceberg makes Iceberg integration straightforward.
While this article focused on S3-triggered jobs, Glue Python Shell can also be used as a general-purpose long-running job environment.
I hope this helps you design your ETL workflows and data platforms more effectively.