Table of Contents
- Overview
- Architecture
- Prerequisites
- Phase 1: ROSA Cluster Setup
- Phase 2: AWS Glue Data Catalog Configuration
- Phase 3: S3 Data Lake Setup
- Phase 4: Apache Spark on OpenShift
- Phase 5: Apache Iceberg Integration
- Phase 6: Spark-Glue Catalog Integration
- Phase 7: Sample Data Pipelines
- Testing and Validation
- Resource Cleanup
- Troubleshooting
Overview
Project Purpose
This platform implements a modern data lakehouse architecture that achieves true separation of compute and storage. By running Apache Spark on OpenShift while leveraging AWS Glue Data Catalog for metadata management and S3 for storage (in Apache Iceberg format), organizations can scale compute independently, shut down clusters without data loss, and achieve significant cost optimization.
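As a quick preview of what this looks like in code, the sketch below builds a SparkSession whose only durable state is the Glue Catalog and an S3 warehouse path. It is illustrative only: the bucket name is a placeholder, and the required Iceberg/AWS jars and full configuration are set up in Phases 4-6.

from pyspark.sql import SparkSession

# Minimal sketch: all durable state lives in AWS Glue (metadata) and S3 (data),
# so this session can run on any freshly created cluster with the right jars.
spark = (
    SparkSession.builder
    .appName("lakehouse-preview")
    # Register an Iceberg catalog backed by the AWS Glue Data Catalog
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-lakehouse-bucket/warehouse")  # placeholder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)

# Any cluster configured this way sees the same tables and data.
spark.sql("SHOW TABLES IN glue_catalog.silver").show()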
Key Value Propositions
- Stateless Compute: Completely decouple compute from storage and metadata
- Cloud-Native Flexibility: Destroy and recreate compute clusters without losing data
- Cost Optimization: Pay for compute only when running jobs
- Unified Metadata: AWS Glue Catalog provides central metadata repository
- ACID Transactions: Apache Iceberg enables reliable data lake operations
- Performance at Scale: Run high-performance Spark jobs on Kubernetes
Solution Components
| Component | Purpose | Layer |
|---|---|---|
| ROSA | Managed OpenShift cluster for Spark compute | Compute |
| Apache Spark | Distributed data processing engine | Processing |
| Spark Operator | Kubernetes-native Spark job management | Orchestration |
| AWS Glue Data Catalog | Centralized metadata repository | Metadata |
| Amazon S3 | Object storage for data lake | Storage |
| Apache Iceberg | Table format with ACID guarantees | Data Format |
| AWS IAM | Authentication and authorization | Security |
Architecture
High-Level Architecture Diagram
┌──────────────────────────────────────────────────────────────────────┐
│ AWS Cloud │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ ROSA Cluster (VPC) │ │
│ │ ┌──────────────────────────────────────────────────────────┐ │ │
│ │ │ Spark Operator │ │ │
│ │ │ ┌────────────────┐ ┌──────────────────────────┐ │ │ │
│ │ │ │ SparkApplication│ │ Driver Pod │ │ │ │
│ │ │ │ CRD Controller │─────►│ - Coordinates job │ │ │ │
│ │ │ └────────────────┘ │ - Connects to Glue │ │ │ │
│ │ │ └──────────┬───────────────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ ┌──────────────────────────────────▼─────────────────┐ │ │ │
│ │ │ │ Executor Pods (Ephemeral) │ │ │ │
│ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ │
│ │ │ │ │Executor 1│ │Executor 2│ │Executor N│ │ │ │ │
│ │ │ │ │Read/Write│ │Read/Write│ │Read/Write│ │ │ │ │
│ │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │
│ │ │ └───────┼─────────────┼─────────────┼───────────────┘ │ │ │
│ │ └──────────┼─────────────┼─────────────┼─────────────────┘ │ │
│ └─────────────┼─────────────┼─────────────┼────────────────────┘ │
│ │ │ │ │
│ │ │ │ │
│ ┌─────────────▼─────────────▼─────────────▼────────────────────┐ │
│ │ Amazon S3 (Data Lake) │ │
│ │ ┌──────────────────────────────────────────────────────┐ │ │
│ │ │ Apache Iceberg Tables │ │ │
│ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │
│ │ │ │ Raw Data │ │ Curated │ │ Analytics │ │ │ │
│ │ │ │ (Bronze) │ │ (Silver) │ │ (Gold) │ │ │ │
│ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │
│ │ │ │ │ │
│ │ │ Iceberg Metadata Files: │ │ │
│ │ │ - Snapshots │ │ │
│ │ │ - Manifests │ │ │
│ │ │ - Data Files (Parquet) │ │ │
│ │ └──────────────────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ ▲ │
│ │ │
│ ┌──────────────────────────────┴───────────────────────────────┐ │
│ │ AWS Glue Data Catalog (Metadata) │ │
│ │ ┌────────────────────────────────────────────────────────┐ │ │
│ │ │ Databases & Tables Metadata │ │ │
│ │ │ - Schema definitions │ │ │
│ │ │ - Partitions info │ │ │
│ │ │ - Table properties (Iceberg config) │ │ │
│ │ │ - Column statistics │ │ │
│ │ └────────────────────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ AWS IAM │ │
│ │ - IRSA (IAM Roles for Service Accounts) │ │
│ │ - S3 bucket policies │ │
│ │ - Glue Catalog permissions │ │
│ └───────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────┘
Workflow
1. Data Ingestion: Raw data lands in S3 bronze layer
2. Spark Job Submission: Developer submits SparkApplication CR
3. Job Orchestration: Spark Operator creates driver pod
4. Resource Provisioning: Driver spawns executor pods dynamically
5. Metadata Discovery: Spark connects to Glue Catalog for table metadata
6. Data Processing: Executors read/write Iceberg tables from/to S3
7. Metadata Update: Glue Catalog automatically updated with new partitions/schemas
8. Job Completion: Executor pods terminate, freeing resources
9. Cluster Shutdown: ROSA cluster can be deleted without data loss
10. State Recovery: New cluster can access all data via Glue Catalog
Stateless Compute Demonstration
Traditional Approach:
- Local Hive Metastore tied to cluster
- Cluster deletion = metadata loss
- Requires persistent volumes and backups
Lakehouse Approach:
- Metadata in AWS Glue (managed, durable)
- Data in S3 (infinitely scalable)
- Compute fully ephemeral
- Result: Complete cluster rebuild in roughly 40 minutes (the ROSA provisioning time) with zero data loss
Prerequisites
Required Accounts and Subscriptions
- [ ] AWS Account with administrative access
- [ ] Red Hat Account with OpenShift subscription
- [ ] ROSA Enabled in your AWS account
- [ ] AWS Glue Access in your target region
Required Tools
Install the following CLI tools on your workstation:
# AWS CLI (v2)
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install
# ROSA CLI
wget https://mirror.openshift.com/pub/openshift-v4/clients/rosa/latest/rosa-linux.tar.gz
tar -xvf rosa-linux.tar.gz
sudo mv rosa /usr/local/bin/rosa
rosa version
# OpenShift CLI (oc)
wget https://mirror.openshift.com/pub/openshift-v4/clients/ocp/stable/openshift-client-linux.tar.gz
tar -xvf openshift-client-linux.tar.gz
sudo mv oc kubectl /usr/local/bin/
oc version
# Helm (v3)
curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
helm version
AWS Prerequisites
Service Quotas
# Check EC2 quotas for ROSA
aws service-quotas get-service-quota \
--service-code ec2 \
--quota-code L-1216C47A \
--region us-east-1
# Check S3 bucket quota
aws service-quotas get-service-quota \
--service-code s3 \
--quota-code L-DC2B2D3D \
--region us-east-1
IAM Permissions
Your AWS IAM user/role needs permissions for:
- EC2 (VPC, subnets, security groups)
- IAM (roles, policies)
- S3 (buckets, objects)
- Glue (databases, tables, catalog)
- CloudWatch (logs, metrics)
Knowledge Prerequisites
You should be familiar with:
- Apache Spark fundamentals (DataFrames, transformations, actions)
- Data engineering concepts (ETL, data lakes, partitioning)
- AWS fundamentals (S3, IAM)
- Kubernetes basics (pods, deployments, services)
- SQL and data modeling
Phase 1: ROSA Cluster Setup
Step 1.1: Configure AWS CLI
# Configure AWS credentials
aws configure
# Verify configuration
aws sts get-caller-identity
Step 1.2: Initialize ROSA
# Log in to Red Hat
rosa login
# Verify ROSA prerequisites
rosa verify quota
rosa verify permissions
# Initialize ROSA in your AWS account
rosa init
Step 1.3: Create ROSA Cluster
Create a ROSA cluster optimized for Spark workloads:
# Set environment variables
export CLUSTER_NAME="data-lakehouse"
export AWS_REGION="us-east-1"
export MACHINE_TYPE="m5.4xlarge"
export COMPUTE_NODES=3
# Create ROSA cluster (takes ~40 minutes)
rosa create cluster \
--cluster-name $CLUSTER_NAME \
--region $AWS_REGION \
--multi-az \
--compute-machine-type $MACHINE_TYPE \
--compute-nodes $COMPUTE_NODES \
--machine-cidr 10.0.0.0/16 \
--service-cidr 172.30.0.0/16 \
--pod-cidr 10.128.0.0/14 \
--host-prefix 23 \
--yes
Configuration Rationale:
- m5.4xlarge: 16 vCPUs, 64 GB RAM - suitable for Spark executors
- 3 nodes: Allows distributed Spark processing
- Multi-AZ: High availability for production workloads
Step 1.4: Monitor Cluster Creation
# Watch cluster installation progress
rosa logs install --cluster=$CLUSTER_NAME --watch
# Check cluster status
rosa describe cluster --cluster=$CLUSTER_NAME
Step 1.5: Create Admin User and Connect
# Create cluster admin user
rosa create admin --cluster=$CLUSTER_NAME
# Use the login command from output
oc login https://api.data-lakehouse.xxxx.p1.openshiftapps.com:6443 \
--username cluster-admin \
--password <your-password>
# Verify cluster access
oc cluster-info
oc get nodes
Step 1.6: Create Project Namespaces
# Create namespace for Spark workloads
oc new-project spark-jobs
# Create namespace for Spark operator
oc new-project spark-operator
Phase 2: AWS Glue Data Catalog Configuration
Step 2.1: Create Glue Database
# Create Glue database for lakehouse
aws glue create-database \
--database-input '{
"Name": "lakehouse",
"Description": "Data lakehouse with Iceberg tables"
}' \
--region $AWS_REGION
# Create additional databases for different layers
aws glue create-database \
--database-input '{
"Name": "bronze",
"Description": "Raw data landing zone"
}' \
--region $AWS_REGION
aws glue create-database \
--database-input '{
"Name": "silver",
"Description": "Curated and cleaned data"
}' \
--region $AWS_REGION
aws glue create-database \
--database-input '{
"Name": "gold",
"Description": "Analytics-ready aggregated data"
}' \
--region $AWS_REGION
# Verify database creation
aws glue get-databases --region $AWS_REGION
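If you prefer to script this step, the same databases can be created idempotently with boto3; a minimal sketch, assuming default AWS credentials and the us-east-1 region used throughout this guide:

import boto3
from botocore.exceptions import ClientError

glue = boto3.client("glue", region_name="us-east-1")  # match $AWS_REGION

databases = {
    "lakehouse": "Data lakehouse with Iceberg tables",
    "bronze": "Raw data landing zone",
    "silver": "Curated and cleaned data",
    "gold": "Analytics-ready aggregated data",
}

for name, description in databases.items():
    try:
        glue.create_database(DatabaseInput={"Name": name, "Description": description})
        print(f"Created database: {name}")
    except ClientError as err:
        # Safe to re-run: skip databases that already exist
        if err.response["Error"]["Code"] == "AlreadyExistsException":
            print(f"Database already exists: {name}")
        else:
            raise

print([db["Name"] for db in glue.get_databases()["DatabaseList"]])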
Step 2.2: Create IAM Role for Glue Catalog Access
# Get ROSA cluster OIDC provider
export OIDC_PROVIDER=$(rosa describe cluster -c $CLUSTER_NAME -o json | jq -r .aws.sts.oidc_endpoint_url | sed 's|https://||')
export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
# Create trust policy for Spark service account
cat > spark-glue-trust-policy.json <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"${OIDC_PROVIDER}:sub": "system:serviceaccount:spark-jobs:spark-sa"
}
}
}
]
}
EOF
# Create IAM role
export SPARK_ROLE_ARN=$(aws iam create-role \
--role-name SparkGlueCatalogRole \
--assume-role-policy-document file://spark-glue-trust-policy.json \
--query 'Role.Arn' \
--output text)
echo "Spark IAM Role ARN: $SPARK_ROLE_ARN"
Step 2.3: Create IAM Policy for Glue and S3 Access
# Create policy for Glue Catalog access
cat > spark-glue-policy.json <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"glue:GetDatabase",
"glue:GetDatabases",
"glue:GetTable",
"glue:GetTables",
"glue:GetPartition",
"glue:GetPartitions",
"glue:CreateTable",
"glue:UpdateTable",
"glue:DeleteTable",
"glue:BatchCreatePartition",
"glue:BatchDeletePartition",
"glue:BatchUpdatePartition",
"glue:CreatePartition",
"glue:DeletePartition",
"glue:UpdatePartition"
],
"Resource": [
"arn:aws:glue:${AWS_REGION}:${ACCOUNT_ID}:catalog",
"arn:aws:glue:${AWS_REGION}:${ACCOUNT_ID}:database/*",
"arn:aws:glue:${AWS_REGION}:${ACCOUNT_ID}:table/*/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::lakehouse-*",
"arn:aws:s3:::lakehouse-*/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:ListAllMyBuckets"
],
"Resource": "*"
}
]
}
EOF
# Create and attach policy
aws iam put-role-policy \
--role-name SparkGlueCatalogRole \
--policy-name GlueS3Access \
--policy-document file://spark-glue-policy.json
echo "IAM policy created and attached"
Phase 3: S3 Data Lake Setup
Step 3.1: Create S3 Buckets
# Create S3 bucket for data lake
export LAKEHOUSE_BUCKET="lakehouse-data-${ACCOUNT_ID}"
aws s3 mb s3://$LAKEHOUSE_BUCKET --region $AWS_REGION
# Enable versioning for data protection
aws s3api put-bucket-versioning \
--bucket $LAKEHOUSE_BUCKET \
--versioning-configuration Status=Enabled \
--region $AWS_REGION
# Create folder structure for medallion architecture
aws s3api put-object --bucket $LAKEHOUSE_BUCKET --key bronze/
aws s3api put-object --bucket $LAKEHOUSE_BUCKET --key silver/
aws s3api put-object --bucket $LAKEHOUSE_BUCKET --key gold/
aws s3api put-object --bucket $LAKEHOUSE_BUCKET --key warehouse/
echo "S3 Data Lake bucket created: s3://$LAKEHOUSE_BUCKET"
Step 3.2: Configure S3 Bucket Policies
# Create bucket policy for secure access
cat > lakehouse-bucket-policy.json <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowSparkAccess",
"Effect": "Allow",
"Principal": {
"AWS": "$SPARK_ROLE_ARN"
},
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::${LAKEHOUSE_BUCKET}",
"arn:aws:s3:::${LAKEHOUSE_BUCKET}/*"
]
}
]
}
EOF
# Apply bucket policy
aws s3api put-bucket-policy \
--bucket $LAKEHOUSE_BUCKET \
--policy file://lakehouse-bucket-policy.json
echo "Bucket policy applied"
Step 3.3: Upload Sample Data
# Create sample dataset
mkdir -p sample-data
cd sample-data
# Generate sample sales data
python3 <<PYTHON
import csv
import random
from datetime import datetime, timedelta
# Generate sample sales data
with open('sales_data.csv', 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['transaction_id', 'date', 'product', 'category', 'amount', 'quantity', 'region'])
products = ['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones']
categories = ['Electronics', 'Accessories']
regions = ['North', 'South', 'East', 'West']
base_date = datetime(2024, 1, 1)
for i in range(10000):
transaction_date = base_date + timedelta(days=random.randint(0, 365))
product = random.choice(products)
category = 'Electronics' if product in ['Laptop', 'Monitor'] else 'Accessories'
writer.writerow([
f'TXN{i:06d}',
transaction_date.strftime('%Y-%m-%d'),
product,
category,
round(random.uniform(10, 2000), 2),
random.randint(1, 10),
random.choice(regions)
])
print("Sample data generated: sales_data.csv")
PYTHON
# Upload to S3 bronze layer
aws s3 cp sales_data.csv s3://$LAKEHOUSE_BUCKET/bronze/sales/sales_data.csv
cd ..
echo "Sample data uploaded to S3"
Phase 4: Apache Spark on OpenShift
Step 4.1: Install Spark Operator
# Add Spark Operator Helm repository
helm repo add spark-operator https://kubeflow.github.io/spark-operator
helm repo update
# Install Spark Operator
helm install spark-operator spark-operator/spark-operator \
--namespace spark-operator \
--create-namespace \
--set webhook.enable=true \
--set sparkJobNamespace=spark-jobs
# Verify installation
kubectl get pods -n spark-operator
kubectl get crd | grep spark
Step 4.2: Create Service Account for Spark
# Create service account with IAM role annotation
cat <<EOF | oc apply -f -
apiVersion: v1
kind: ServiceAccount
metadata:
name: spark-sa
namespace: spark-jobs
annotations:
eks.amazonaws.com/role-arn: $SPARK_ROLE_ARN
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: spark-role
namespace: spark-jobs
rules:
- apiGroups: [""]
resources: ["pods", "services", "configmaps"]
verbs: ["create", "get", "list", "watch", "delete"]
- apiGroups: [""]
resources: ["pods/log"]
verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: spark-rolebinding
namespace: spark-jobs
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: spark-role
subjects:
- kind: ServiceAccount
name: spark-sa
namespace: spark-jobs
EOF
# Verify service account
oc get sa spark-sa -n spark-jobs -o yaml
Step 4.3: Create ConfigMap for Spark Configuration
# Create Spark configuration
cat <<EOF | oc apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
name: spark-config
namespace: spark-jobs
data:
spark-defaults.conf: |
spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider
spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.glue_catalog.warehouse=s3://${LAKEHOUSE_BUCKET}/warehouse
spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.eventLog.enabled=true
spark.eventLog.dir=s3a://${LAKEHOUSE_BUCKET}/spark-events
lakehouse.conf: |
LAKEHOUSE_BUCKET=${LAKEHOUSE_BUCKET}
AWS_REGION=${AWS_REGION}
GLUE_DATABASE=lakehouse
EOF
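To confirm these catalog settings actually reach a running driver (whether from this ConfigMap or from sparkConf on the SparkApplication in Phase 6), here is a small PySpark check you could drop into any job:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("config-check").getOrCreate()

# Keys defined above; each should resolve to a non-default value at runtime
keys = [
    "spark.sql.catalog.glue_catalog",
    "spark.sql.catalog.glue_catalog.warehouse",
    "spark.sql.catalog.glue_catalog.catalog-impl",
    "spark.sql.catalog.glue_catalog.io-impl",
    "spark.hadoop.fs.s3a.aws.credentials.provider",
]

for key in keys:
    # Returns "<unset>" instead of raising when a key is missing
    print(key, "=", spark.conf.get(key, "<unset>"))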
Phase 5: Apache Iceberg Integration
Step 5.1: Build Custom Spark Image with Iceberg
# Create directory for custom Spark image
mkdir -p spark-iceberg
cd spark-iceberg
# Create Dockerfile
cat > Dockerfile <<'DOCKERFILE'
# Official Apache Spark base image (provides /opt/entrypoint.sh and the spark user, UID 185)
FROM apache/spark:3.5.0
USER root
# Install AWS dependencies and Iceberg
RUN curl -L https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.4.2/iceberg-spark-runtime-3.5_2.12-1.4.2.jar \
-o /opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.4.2.jar
RUN curl -L https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar \
-o /opt/spark/jars/hadoop-aws-3.3.4.jar
RUN curl -L https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar \
-o /opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar
RUN curl -L https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.20.18/bundle-2.20.18.jar \
-o /opt/spark/jars/bundle-2.20.18.jar
RUN curl -L https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.20.18/url-connection-client-2.20.18.jar \
-o /opt/spark/jars/url-connection-client-2.20.18.jar
USER 185
ENTRYPOINT ["/opt/entrypoint.sh"]
DOCKERFILE
# Build and push to a container registry
# For this example, we'll use the OpenShift internal registry
oc create imagestream spark-iceberg -n spark-jobs
# Build image using OpenShift build
cat > BuildConfig.yaml <<EOF
apiVersion: build.openshift.io/v1
kind: BuildConfig
metadata:
name: spark-iceberg
namespace: spark-jobs
spec:
output:
to:
kind: ImageStreamTag
name: spark-iceberg:latest
source:
dockerfile: |
$(cat Dockerfile | sed 's/^/ /')
type: Dockerfile
strategy:
dockerStrategy: {}
type: Docker
EOF
oc apply -f BuildConfig.yaml
# Start build
oc start-build spark-iceberg -n spark-jobs --follow
# Get image reference
export SPARK_IMAGE=$(oc get is spark-iceberg -n spark-jobs -o jsonpath='{.status.dockerImageRepository}'):latest
cd ..
echo "Custom Spark image with Iceberg built: $SPARK_IMAGE"
Phase 6: Spark-Glue Catalog Integration
Step 6.1: Create Sample Spark Application
# Create PySpark script for data processing
mkdir -p spark-jobs
cd spark-jobs
cat > process_sales.py <<'PYTHON'
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, sum as _sum, avg, count
import sys
def main():
# Create Spark session with Iceberg and Glue Catalog
spark = SparkSession.builder \
.appName("ProcessSalesData") \
.config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.getOrCreate()
spark.sparkContext.setLogLevel("INFO")
    # Get the bucket name from the job arguments
bucket = sys.argv[1] if len(sys.argv) > 1 else "lakehouse-data"
print(f"Reading data from s3a://{bucket}/bronze/sales/")
# Read raw CSV data
df_raw = spark.read.csv(
f"s3a://{bucket}/bronze/sales/sales_data.csv",
header=True,
inferSchema=True
)
print(f"Raw data count: {df_raw.count()}")
df_raw.show(5)
# Create bronze table in Glue Catalog (if not exists)
df_raw.write \
.format("iceberg") \
.mode("overwrite") \
.option("path", f"s3a://{bucket}/warehouse/bronze.db/sales") \
.saveAsTable("glue_catalog.bronze.sales")
print("Bronze table created in Glue Catalog")
# Transform data for silver layer
df_silver = df_raw \
.withColumn("year", year(col("date"))) \
.withColumn("month", month(col("date"))) \
.filter(col("amount") > 0) \
.dropDuplicates(["transaction_id"])
# Write to silver layer
df_silver.write \
.format("iceberg") \
.mode("overwrite") \
.partitionBy("year", "month") \
.option("path", f"s3a://{bucket}/warehouse/silver.db/sales_clean") \
.saveAsTable("glue_catalog.silver.sales_clean")
print("Silver table created with partitioning")
# Create aggregated gold layer
df_gold = df_silver.groupBy("year", "month", "category", "region") \
.agg(
_sum("amount").alias("total_revenue"),
_sum("quantity").alias("total_quantity"),
avg("amount").alias("avg_transaction_value"),
count("transaction_id").alias("transaction_count")
)
# Write to gold layer
df_gold.write \
.format("iceberg") \
.mode("overwrite") \
.option("path", f"s3a://{bucket}/warehouse/gold.db/sales_summary") \
.saveAsTable("glue_catalog.gold.sales_summary")
print("Gold table created with aggregations")
# Show sample results
print("\n=== Bronze Layer Sample ===")
spark.sql("SELECT * FROM glue_catalog.bronze.sales LIMIT 5").show()
print("\n=== Silver Layer Sample ===")
spark.sql("SELECT * FROM glue_catalog.silver.sales_clean LIMIT 5").show()
print("\n=== Gold Layer Sample ===")
spark.sql("SELECT * FROM glue_catalog.gold.sales_summary ORDER BY total_revenue DESC LIMIT 10").show()
# Verify tables in Glue Catalog
print("\n=== Tables in Glue Catalog ===")
spark.sql("SHOW TABLES IN glue_catalog.bronze").show()
spark.sql("SHOW TABLES IN glue_catalog.silver").show()
spark.sql("SHOW TABLES IN glue_catalog.gold").show()
spark.stop()
if __name__ == "__main__":
main()
PYTHON
# Upload script to S3
aws s3 cp process_sales.py s3://$LAKEHOUSE_BUCKET/scripts/
cd ..
Step 6.2: Create SparkApplication Custom Resource
# Create SparkApplication manifest
cat <<EOF | oc apply -f -
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: process-sales-data
namespace: spark-jobs
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: $SPARK_IMAGE
imagePullPolicy: Always
mainApplicationFile: s3a://$LAKEHOUSE_BUCKET/scripts/process_sales.py
arguments:
- "$LAKEHOUSE_BUCKET"
sparkVersion: "3.5.0"
restartPolicy:
type: Never
driver:
cores: 1
coreLimit: "1200m"
memory: "2g"
labels:
version: "3.5.0"
serviceAccount: spark-sa
env:
- name: AWS_REGION
value: "$AWS_REGION"
- name: AWS_ROLE_ARN
value: "$SPARK_ROLE_ARN"
- name: AWS_WEB_IDENTITY_TOKEN_FILE
value: "/var/run/secrets/eks.amazonaws.com/serviceaccount/token"
volumeMounts:
- name: aws-iam-token
mountPath: /var/run/secrets/eks.amazonaws.com/serviceaccount
readOnly: true
executor:
cores: 2
instances: 3
memory: "4g"
labels:
version: "3.5.0"
env:
- name: AWS_REGION
value: "$AWS_REGION"
- name: AWS_ROLE_ARN
value: "$SPARK_ROLE_ARN"
- name: AWS_WEB_IDENTITY_TOKEN_FILE
value: "/var/run/secrets/eks.amazonaws.com/serviceaccount/token"
volumeMounts:
- name: aws-iam-token
mountPath: /var/run/secrets/eks.amazonaws.com/serviceaccount
readOnly: true
volumes:
- name: aws-iam-token
projected:
sources:
- serviceAccountToken:
audience: sts.amazonaws.com
expirationSeconds: 86400
path: token
sparkConf:
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
"spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
"spark.sql.catalog.glue_catalog": "org.apache.iceberg.spark.SparkCatalog"
"spark.sql.catalog.glue_catalog.warehouse": "s3a://$LAKEHOUSE_BUCKET/warehouse"
"spark.sql.catalog.glue_catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog"
"spark.sql.catalog.glue_catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
"spark.kubernetes.allocation.batch.size": "3"
EOF
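Beyond kubectl, the application's status can be polled programmatically; a hedged sketch using the Kubernetes Python client (the kubernetes pip package, which is an assumption here), with group/version matching the sparkoperator.k8s.io/v1beta2 CR above:

import time

from kubernetes import client, config

config.load_kube_config()  # uses your current oc/kubectl context
api = client.CustomObjectsApi()

while True:
    app = api.get_namespaced_custom_object(
        group="sparkoperator.k8s.io",
        version="v1beta2",
        namespace="spark-jobs",
        plural="sparkapplications",
        name="process-sales-data",
    )
    # The operator reports progress under status.applicationState.state
    state = app.get("status", {}).get("applicationState", {}).get("state", "UNKNOWN")
    print("Application state:", state)
    if state in ("COMPLETED", "FAILED"):
        break
    time.sleep(15)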
Phase 7: Sample Data Pipelines
Step 7.1: Create Incremental Processing Pipeline
# Create incremental processing script
cat > spark-jobs/incremental_pipeline.py <<'PYTHON'
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit
from datetime import datetime
import sys
def main():
spark = SparkSession.builder \
.appName("IncrementalPipeline") \
.getOrCreate()
bucket = sys.argv[1]
batch_date = sys.argv[2] if len(sys.argv) > 2 else datetime.now().strftime('%Y-%m-%d')
print(f"Processing incremental data for date: {batch_date}")
    # Read the existing silver table (gives a before/after record count)
    df_existing = spark.read \
        .format("iceberg") \
        .load("glue_catalog.silver.sales_clean")
    print(f"Existing silver record count: {df_existing.count()}")
# Read new data (simulate incremental load)
df_new = spark.read.csv(
f"s3a://{bucket}/bronze/sales/sales_data.csv",
header=True,
inferSchema=True
).filter(col("date") == batch_date) \
.withColumn("processed_timestamp", current_timestamp())
    # Append the new batch to the silver table
df_new.writeTo("glue_catalog.silver.sales_clean") \
.append()
print(f"Appended {df_new.count()} records to silver table")
# Update gold aggregations
df_updated = spark.read \
.format("iceberg") \
.load("glue_catalog.silver.sales_clean") \
.filter(col("date") == batch_date)
# Recalculate aggregations for affected partitions
from pyspark.sql.functions import year, month, sum as _sum, avg, count
df_agg = df_updated \
.withColumn("year", year(col("date"))) \
.withColumn("month", month(col("date"))) \
.groupBy("year", "month", "category", "region") \
.agg(
_sum("amount").alias("total_revenue"),
_sum("quantity").alias("total_quantity"),
avg("amount").alias("avg_transaction_value"),
count("transaction_id").alias("transaction_count")
)
    # Append the recalculated aggregations to the existing gold table.
    # Note: .using()/.tableProperty() only apply when creating a table, so they
    # are omitted here; for a true upsert, use Iceberg's MERGE INTO
    # (see the sketch after this step).
    df_agg.writeTo("glue_catalog.gold.sales_summary") \
        .append()
print("Gold table updated with incremental aggregations")
spark.stop()
if __name__ == "__main__":
main()
PYTHON
# Upload to S3
aws s3 cp spark-jobs/incremental_pipeline.py s3://$LAKEHOUSE_BUCKET/scripts/
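The pipeline above appends recalculated aggregates, which can leave duplicate rows for a reprocessed month. For a true upsert, Iceberg supports SQL MERGE INTO through the session extensions already configured; a hedged sketch that could replace the final append in incremental_pipeline.py (it assumes the df_agg DataFrame and glue_catalog names from that script):

# Sketch: upsert the recalculated aggregates instead of appending them.
df_agg.createOrReplaceTempView("agg_updates")

spark.sql("""
    MERGE INTO glue_catalog.gold.sales_summary AS target
    USING agg_updates AS source
    ON  target.year = source.year
    AND target.month = source.month
    AND target.category = source.category
    AND target.region = source.region
    WHEN MATCHED THEN UPDATE SET
        total_revenue = source.total_revenue,
        total_quantity = source.total_quantity,
        avg_transaction_value = source.avg_transaction_value,
        transaction_count = source.transaction_count
    WHEN NOT MATCHED THEN INSERT *
""")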
Step 7.2: Create Time Travel Query Example
# Create time travel demonstration script
cat > spark-jobs/time_travel.py <<'PYTHON'
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import sys
def main():
spark = SparkSession.builder \
.appName("IcebergTimeTravel") \
.getOrCreate()
bucket = sys.argv[1]
# Read current version
print("=== Current Version ===")
df_current = spark.read \
.format("iceberg") \
.load("glue_catalog.silver.sales_clean")
print(f"Current record count: {df_current.count()}")
df_current.show(5)
# Show table history
print("\n=== Table History ===")
spark.sql("SELECT * FROM glue_catalog.silver.sales_clean.history").show()
# Show table snapshots
print("\n=== Table Snapshots ===")
spark.sql("SELECT * FROM glue_catalog.silver.sales_clean.snapshots").show()
# Query specific snapshot (if exists)
snapshots = spark.sql("SELECT snapshot_id FROM glue_catalog.silver.sales_clean.snapshots ORDER BY committed_at LIMIT 1").collect()
if snapshots:
snapshot_id = snapshots[0][0]
print(f"\n=== Data at Snapshot {snapshot_id} ===")
df_snapshot = spark.read \
.format("iceberg") \
.option("snapshot-id", snapshot_id) \
.load("glue_catalog.silver.sales_clean")
print(f"Snapshot record count: {df_snapshot.count()}")
df_snapshot.show(5)
# Show table metadata
print("\n=== Table Metadata ===")
spark.sql("DESCRIBE EXTENDED glue_catalog.silver.sales_clean").show(100, False)
spark.stop()
if __name__ == "__main__":
main()
PYTHON
# Upload to S3
aws s3 cp spark-jobs/time_travel.py s3://$LAKEHOUSE_BUCKET/scripts/
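In addition to snapshot IDs, Iceberg supports time travel by timestamp, both in SQL and through read options. A short hedged sketch that could be appended to time_travel.py (use timestamps that fall after your table's first snapshot, otherwise Iceberg raises an error):

# SQL time travel by timestamp (Spark 3.3+ syntax)
spark.sql(
    "SELECT COUNT(*) AS record_count FROM glue_catalog.silver.sales_clean "
    "TIMESTAMP AS OF '2024-06-01 00:00:00'"
).show()

# DataFrame time travel by timestamp (milliseconds since the epoch)
df_asof = (
    spark.read.format("iceberg")
    .option("as-of-timestamp", "1717200000000")  # 2024-06-01T00:00:00Z
    .load("glue_catalog.silver.sales_clean")
)
print("Records as of timestamp:", df_asof.count())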
Testing and Validation
Test 1: Monitor Spark Application
# Check SparkApplication status
kubectl get sparkapplication -n spark-jobs
# Describe application
kubectl describe sparkapplication process-sales-data -n spark-jobs
# Watch driver pod logs
export DRIVER_POD=$(kubectl get pods -n spark-jobs -l spark-role=driver -o jsonpath='{.items[0].metadata.name}')
kubectl logs -f $DRIVER_POD -n spark-jobs
# Check executor pods
kubectl get pods -n spark-jobs -l spark-role=executor
Test 2: Verify Glue Catalog Tables
# List databases
aws glue get-databases --region $AWS_REGION
# List tables in bronze database
aws glue get-tables --database-name bronze --region $AWS_REGION
# Get table details
aws glue get-table --database-name silver --name sales_clean --region $AWS_REGION
# Check table location and format
aws glue get-table --database-name silver --name sales_clean --region $AWS_REGION \
--query 'Table.StorageDescriptor.Location'
Test 3: Verify Data in S3
# List warehouse contents
aws s3 ls s3://$LAKEHOUSE_BUCKET/warehouse/ --recursive --human-readable
# Check Iceberg metadata
aws s3 ls s3://$LAKEHOUSE_BUCKET/warehouse/silver.db/sales_clean/metadata/
# List data files
aws s3 ls s3://$LAKEHOUSE_BUCKET/warehouse/silver.db/sales_clean/data/
Test 4: Query Data with Athena
# Create Athena workgroup (optional)
aws athena create-work-group \
--name lakehouse-queries \
--configuration "ResultConfigurationUpdates={OutputLocation=s3://$LAKEHOUSE_BUCKET/athena-results/}" \
--region $AWS_REGION
# Query silver table using Athena
aws athena start-query-execution \
--query-string "SELECT * FROM silver.sales_clean LIMIT 10" \
--result-configuration "OutputLocation=s3://$LAKEHOUSE_BUCKET/athena-results/" \
--region $AWS_REGION
# Query gold aggregations
aws athena start-query-execution \
--query-string "SELECT category, region, SUM(total_revenue) as revenue FROM gold.sales_summary GROUP BY category, region ORDER BY revenue DESC" \
--result-configuration "OutputLocation=s3://$LAKEHOUSE_BUCKET/athena-results/" \
--region $AWS_REGION
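start-query-execution only returns a query ID, so you still have to poll for completion and fetch the output; a small boto3 sketch (the bucket placeholder should match $LAKEHOUSE_BUCKET):

import time

import boto3

athena = boto3.client("athena", region_name="us-east-1")  # match $AWS_REGION
bucket = "lakehouse-data-<ACCOUNT_ID>"  # placeholder: match $LAKEHOUSE_BUCKET

query_id = athena.start_query_execution(
    QueryString="SELECT * FROM silver.sales_clean LIMIT 10",
    ResultConfiguration={"OutputLocation": f"s3://{bucket}/athena-results/"},
)["QueryExecutionId"]

# Poll until the query reaches a terminal state
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

print("Query state:", state)
if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    for row in rows[:5]:  # the first row is the column header
        print([col.get("VarCharValue") for col in row["Data"]])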
Test 5: Stateless Compute Validation
# Step 1: Note current table state
echo "=== Before Cluster Deletion ==="
aws glue get-tables --database-name silver --region $AWS_REGION --query 'TableList[*].Name'
# Step 2: Delete ROSA cluster
echo "Deleting ROSA cluster..."
rosa delete cluster --cluster=$CLUSTER_NAME --yes
# Wait for deletion (or do this async)
# rosa logs uninstall --cluster=$CLUSTER_NAME --watch
# Step 3: Verify data persists in S3
echo "=== Data Still Exists in S3 ==="
aws s3 ls s3://$LAKEHOUSE_BUCKET/warehouse/ --recursive | wc -l
# Step 4: Verify metadata persists in Glue
echo "=== Metadata Still Exists in Glue ==="
aws glue get-tables --database-name silver --region $AWS_REGION --query 'TableList[*].Name'
# Step 5: Recreate cluster and verify access
# (Follow Phase 1 steps to recreate cluster)
# Then resubmit Spark job to prove data is accessible
echo "=== Stateless Compute Validated ==="
echo "All data and metadata persisted despite cluster deletion!"
Resource Cleanup
To avoid ongoing AWS charges, follow these steps to clean up all resources.
Step 1: Delete Spark Applications
# Delete all Spark applications
kubectl delete sparkapplication --all -n spark-jobs
# Wait for pods to terminate
kubectl get pods -n spark-jobs
Step 2: Delete Spark Operator
# Uninstall Spark Operator
helm uninstall spark-operator -n spark-operator
# Delete namespace
kubectl delete namespace spark-operator
kubectl delete namespace spark-jobs
Step 3: Delete ROSA Cluster
# Delete ROSA cluster
rosa delete cluster --cluster=$CLUSTER_NAME --yes
# Wait for deletion
rosa logs uninstall --cluster=$CLUSTER_NAME --watch
# Verify deletion
rosa list clusters
Step 4: Delete Glue Catalog Resources
# Delete tables from all databases
for db in bronze silver gold lakehouse; do
echo "Deleting tables from database: $db"
# Get table names
TABLES=$(aws glue get-tables --database-name $db --region $AWS_REGION --query 'TableList[*].Name' --output text)
# Delete each table
for table in $TABLES; do
echo " Deleting table: $table"
aws glue delete-table --database-name $db --name $table --region $AWS_REGION
done
# Delete database
echo "Deleting database: $db"
aws glue delete-database --name $db --region $AWS_REGION
done
echo "Glue Catalog resources deleted"
Step 5: Delete S3 Bucket
# Delete all objects in bucket
aws s3 rm s3://$LAKEHOUSE_BUCKET --recursive --region $AWS_REGION
# Delete bucket
aws s3 rb s3://$LAKEHOUSE_BUCKET --region $AWS_REGION
echo "S3 bucket deleted"
Step 6: Delete IAM Resources
# Delete IAM role policy
aws iam delete-role-policy \
--role-name SparkGlueCatalogRole \
--policy-name GlueS3Access
# Delete IAM role
aws iam delete-role --role-name SparkGlueCatalogRole
echo "IAM resources deleted"
Step 7: Clean Up Local Files
# Remove temporary files
rm -f spark-glue-trust-policy.json
rm -f spark-glue-policy.json
rm -f lakehouse-bucket-policy.json
rm -rf sample-data/
rm -rf spark-jobs/
rm -rf spark-iceberg/
echo "Local files cleaned up"
Verification
# Verify ROSA cluster is deleted
rosa list clusters
# Verify S3 bucket is deleted
aws s3 ls | grep lakehouse
# Verify Glue databases are deleted
aws glue get-databases --region $AWS_REGION | grep -E "bronze|silver|gold|lakehouse"
# Verify IAM role is deleted
aws iam get-role --role-name SparkGlueCatalogRole 2>&1 | grep NoSuchEntity
echo "Cleanup verification complete"
Troubleshooting
Issue: Spark Cannot Connect to Glue Catalog
Symptoms: Spark jobs fail with Glue Catalog connection errors
Solutions:
- Verify IAM role has Glue permissions
- Check service account annotation
- Verify AWS region configuration
- Check Glue Catalog connectivity
# Verify service account has IAM role
kubectl get sa spark-sa -n spark-jobs -o yaml | grep eks.amazonaws.com
# Test Glue access from pod
kubectl run aws-test --rm -it --image=amazon/aws-cli -n spark-jobs \
  --overrides='{"spec":{"serviceAccountName":"spark-sa"}}' -- \
  glue get-databases --region $AWS_REGION
# Check Spark configuration
kubectl get configmap spark-config -n spark-jobs -o yaml
Issue: S3 Access Denied Errors
Symptoms: Spark jobs fail with S3 403 Forbidden errors
Solutions:
- Verify IAM role has S3 permissions
- Check bucket policy
- Verify IRSA configuration
- Check S3 endpoint configuration
# Test S3 access from pod
kubectl run aws-test --rm -it --image=amazon/aws-cli -n spark-jobs \
  --overrides='{"spec":{"serviceAccountName":"spark-sa"}}' -- \
  s3 ls s3://$LAKEHOUSE_BUCKET/
# Check IAM role permissions
aws iam get-role-policy --role-name SparkGlueCatalogRole --policy-name GlueS3Access
# Verify bucket policy
aws s3api get-bucket-policy --bucket $LAKEHOUSE_BUCKET
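If the role policy and bucket policy both look right but S3 still returns 403, confirm which identity the pod's credentials actually resolve to. A hedged sketch you could run from a Python shell inside the driver pod (it assumes boto3 is available and relies on the projected web-identity token configured in the SparkApplication):

import boto3

# With IRSA working, the ARN below should be an assumed-role ARN for SparkGlueCatalogRole
identity = boto3.client("sts").get_caller_identity()
print("Account:", identity["Account"])
print("Caller ARN:", identity["Arn"])

# Then issue the same kind of call Spark makes (adjust the bucket name to yours)
s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket="lakehouse-data-<ACCOUNT_ID>", Prefix="warehouse/", MaxKeys=5)
for obj in response.get("Contents", []):
    print(obj["Key"])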
Issue: Iceberg Table Not Found
Symptoms: Queries fail with "Table not found" errors
Solutions:
- Verify table exists in Glue Catalog
- Check Spark Catalog configuration
- Verify warehouse location
- Check table format
# List tables in Glue
aws glue get-tables --database-name silver --region $AWS_REGION
# Check if table is Iceberg format
aws glue get-table --database-name silver --name sales_clean --region $AWS_REGION \
--query 'Table.Parameters."table_type"'
# Verify warehouse location
aws s3 ls s3://$LAKEHOUSE_BUCKET/warehouse/silver.db/
Issue: Spark Executors Not Starting
Symptoms: Driver pod runs but executors don't start
Solutions:
- Check resource availability
- Verify RBAC permissions
- Check image pull policy
- Review executor logs
# Check node resources
kubectl top nodes
# Check pending pods
kubectl get pods -n spark-jobs
# Describe pending executor pod
kubectl describe pod <executor-pod-name> -n spark-jobs
# Check events
kubectl get events -n spark-jobs --sort-by='.lastTimestamp'
Issue: Performance Issues
Symptoms: Spark jobs are slow
Solutions:
- Increase executor resources
- Adjust partition count
- Enable adaptive query execution
- Optimize Iceberg table layout
# Update SparkApplication with more resources
kubectl edit sparkapplication process-sales-data -n spark-jobs
# Check execution plan
# Add to Spark configuration:
# spark.sql.adaptive.enabled=true
# spark.sql.adaptive.coalescePartitions.enabled=true
# Compact Iceberg table
# Run in Spark:
# spark.sql("CALL glue_catalog.system.rewrite_data_files('silver.sales_clean')")
Debug Commands
# View all Spark applications
kubectl get sparkapplication -n spark-jobs
# Get application status
kubectl get sparkapplication process-sales-data -n spark-jobs -o yaml
# View driver logs
kubectl logs -n spark-jobs -l spark-role=driver
# View executor logs
kubectl logs -n spark-jobs -l spark-role=executor --tail=100
# Check Spark Operator logs
kubectl logs -n spark-operator deployment/spark-operator
# List all pods
kubectl get pods -n spark-jobs -o wide
# Check configmaps
kubectl get configmap -n spark-jobs
# View events
kubectl get events -n spark-jobs --sort-by='.lastTimestamp' | tail -20