Lagat Josiah

Complete Guide: Dockerizing Spark, Kafka, and Jupyter for YouTube Pipeline


Table of Contents

  1. Project Overview
  2. Prerequisites
  3. Project Structure
  4. Environment Configuration
  5. Kafka Configuration
  6. Spark Configuration
  7. Jupyter Configuration
  8. Airflow Configuration
  9. Docker Compose Implementation
  10. Deployment and Testing
  11. Integration Verification

Project Overview {#overview}

This guide details the implementation of a containerized data pipeline for extracting, processing, and analyzing YouTube data using industry-standard tools.

Architecture:

YouTube API → Airflow (Orchestration) → Kafka (Streaming) → Spark (Processing) → PostgreSQL (Storage) → Jupyter (Analysis)

Technology Stack:

  • Docker & Docker Compose
  • Apache Kafka (Message Streaming)
  • Apache Spark (Data Processing)
  • Jupyter Notebook (Analysis)
  • Apache Airflow (Orchestration)
  • PostgreSQL (Database)

Prerequisites {#prerequisites}

Required Software

# Verify Docker installation
docker --version
# Required: Docker version 20.x or higher

# Verify Docker Compose
docker-compose --version
# Required: docker-compose version 2.x or higher

System Requirements

  • RAM: 8GB minimum (16GB recommended)
  • Disk Space: 15GB available
  • OS: Linux, macOS, or Windows with WSL2

Project Structure {#structure}

Directory Initialization

# Initialize project directory
mkdir youtube-pipeline
cd youtube-pipeline

# Create subdirectories
mkdir -p airflow/{dags,logs,plugins}
mkdir -p spark/scripts
mkdir -p jupyter/notebooks
mkdir -p youtube_extractor
mkdir -p certificates
mkdir -p data/postgres

# Initialize configuration files
touch docker-compose.yml
touch .env
touch airflow/Dockerfile
touch airflow/requirements.txt
touch airflow/dags/youtube_pipeline.py
touch spark/Dockerfile
touch spark/scripts/process_youtube_data.py
touch jupyter/Dockerfile
touch jupyter/notebooks/youtube_analysis.ipynb
touch youtube_extractor/Dockerfile
touch youtube_extractor/extractor.py
touch youtube_extractor/requirements.txt

Directory Structure:

youtube-pipeline/
├── docker-compose.yml
├── .env
├── airflow/
│   ├── Dockerfile
│   ├── dags/
│   │   └── youtube_pipeline.py
│   ├── logs/
│   ├── plugins/
│   └── requirements.txt
├── spark/
│   ├── Dockerfile
│   └── scripts/
│       └── process_youtube_data.py
├── jupyter/
│   ├── Dockerfile
│   └── notebooks/
│       └── youtube_analysis.ipynb
├── youtube_extractor/
│   ├── Dockerfile
│   ├── extractor.py
│   └── requirements.txt
├── certificates/
│   └── ca.pem
└── data/
    └── postgres/

Environment Configuration {#step1}

Configuration File Setup

Create .env in the project root directory:

# YouTube API Configuration
YOUTUBE_API_KEY=your_youtube_api_key_here

# PostgreSQL Configuration
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres123
POSTGRES_DB=youtube_db
POSTGRES_PORT=5432

# Airflow Configuration
AIRFLOW_UID=50000
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://postgres:postgres123@postgres:5432/youtube_db
AIRFLOW__CORE__FERNET_KEY=your_fernet_key_here
AIRFLOW__CORE__LOAD_EXAMPLES=False

# Kafka Configuration
KAFKA_BROKER=kafka:9092
KAFKA_TOPIC_INPUT=youtube-data
KAFKA_TOPIC_OUTPUT=processed-data

# Spark Configuration
SPARK_MASTER_URL=spark://spark-master:7077
SPARK_WORKER_MEMORY=2G
SPARK_WORKER_CORES=2

# Jupyter Configuration
JUPYTER_TOKEN=spark123

Fernet Key Generation

python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
# Replace 'your_fernet_key_here' in .env with generated output
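
Airflow uses this key to encrypt sensitive values, such as connection passwords and variables, before storing them in its metadata database. A small illustration of the symmetric encryption involved, using the same cryptography package as the command above:

from cryptography.fernet import Fernet

# Generate a key (what the one-liner above prints) and round-trip a sample secret
key = Fernet.generate_key()
cipher = Fernet(key)

token = cipher.encrypt(b"postgres123")   # ciphertext of the kind Airflow stores
print(cipher.decrypt(token).decode())    # -> postgres123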

Kafka Configuration {#step2}

Architecture Components

The Kafka implementation requires two primary components:

  1. Zookeeper: Manages cluster coordination and configuration
  2. Kafka Broker: Handles message distribution and storage

Network Configuration

  • Port 9092: Inter-container communication
  • Port 9093: Host machine access
  • Zookeeper Port 2181: Coordination service
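
In practice the two listeners determine which address a client should use: services on the Docker network (Airflow, Spark, Jupyter) connect to kafka:9092, while anything running on the host connects to localhost:9093. A minimal kafka-python producer sketch targeting the internal listener (the library is pinned in the Spark, Jupyter, and Airflow images built later in this guide):

import json
from kafka import KafkaProducer

# Inside a container on the pipeline network, use the internal PLAINTEXT listener
producer = KafkaProducer(
    bootstrap_servers="kafka:9092",              # use "localhost:9093" from the host instead
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Send a throwaway record to the input topic as a connectivity check
producer.send("youtube-data", value={"connectivity_check": True})
producer.flush()
producer.close()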

Spark Configuration {#step3}

Dockerfile Implementation

Create spark/Dockerfile:

FROM bitnami/spark:3.5.0

USER root

# Install Python dependencies
RUN pip install --no-cache-dir \
    kafka-python==2.0.2 \
    pyspark==3.5.0 \
    requests==2.31.0 \
    pandas==2.1.4 \
    psycopg2-binary==2.9.9

# Initialize directories
RUN mkdir -p /opt/spark-apps /opt/spark-data

# Configure permissions
RUN chmod -R 777 /opt/spark-apps /opt/spark-data

USER 1001

WORKDIR /opt/spark-apps

Data Processing Implementation

Create spark/scripts/process_youtube_data.py:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, count, avg, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import os

def create_spark_session():
    """Initialize Spark session with Kafka integration"""
    spark = SparkSession.builder \
        .appName("YouTubeDataProcessor") \
        .master(os.getenv("SPARK_MASTER_URL", "spark://spark-master:7077")) \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
        .config("spark.sql.streaming.checkpointLocation", "/opt/spark-data/checkpoint") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("WARN")
    return spark

# Schema definition for YouTube data
youtube_schema = StructType([
    StructField("channel_id", StringType(), True),
    StructField("channel_title", StringType(), True),
    StructField("subscriber_count", IntegerType(), True),
    StructField("total_views", IntegerType(), True),
    StructField("video_count", IntegerType(), True),
    StructField("timestamp", TimestampType(), True)
])

def process_youtube_stream():
    """Process streaming YouTube data from Kafka"""

    print("Initializing Spark Streaming Application...")
    spark = create_spark_session()

    # Configure Kafka source
    kafka_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", os.getenv("KAFKA_BROKER", "kafka:9092")) \
        .option("subscribe", os.getenv("KAFKA_TOPIC_INPUT", "youtube-data")) \
        .option("startingOffsets", "latest") \
        .load()

    # Parse JSON payload
    parsed_df = kafka_df.select(
        from_json(col("value").cast("string"), youtube_schema).alias("data")
    ).select("data.*")

    # Apply transformations
    processed_df = parsed_df \
        .withColumn("engagement_ratio", col("total_views") / col("subscriber_count")) \
        .withColumn("avg_views_per_video", col("total_views") / col("video_count"))

    # Windowed aggregations (5-minute tumbling window)
    aggregated_df = processed_df \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(col("timestamp"), "5 minutes"),
            col("channel_title")
        ) \
        .agg(
            count("*").alias("record_count"),
            avg("subscriber_count").alias("avg_subscribers"),
            spark_sum("total_views").alias("total_views_sum")
        )

    # Console output for monitoring
    console_query = processed_df \
        .writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", "false") \
        .start()

    # Publish processed data to Kafka
    kafka_output_query = processed_df \
        .selectExpr("channel_id as key", "to_json(struct(*)) as value") \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", os.getenv("KAFKA_BROKER", "kafka:9092")) \
        .option("topic", os.getenv("KAFKA_TOPIC_OUTPUT", "processed-data")) \
        .option("checkpointLocation", "/opt/spark-data/checkpoint/kafka-output") \
        .outputMode("append") \
        .start()

    # Persist aggregated data to PostgreSQL
    def write_to_postgres(batch_df, batch_id):
        """Write each micro-batch of aggregates to PostgreSQL"""
        if not batch_df.isEmpty():
            # Flatten the window struct into window_start/window_end so the
            # columns line up with the spark_aggregations table
            flattened_df = batch_df.select(
                col("window.start").alias("window_start"),
                col("window.end").alias("window_end"),
                "channel_title",
                "record_count",
                "avg_subscribers",
                "total_views_sum"
            )
            flattened_df.write \
                .format("jdbc") \
                .option("url", f"jdbc:postgresql://postgres:5432/{os.getenv('POSTGRES_DB')}") \
                .option("dbtable", "spark_aggregations") \
                .option("user", os.getenv("POSTGRES_USER")) \
                .option("password", os.getenv("POSTGRES_PASSWORD")) \
                .option("driver", "org.postgresql.Driver") \
                .mode("append") \
                .save()

    postgres_query = aggregated_df \
        .writeStream \
        .foreachBatch(write_to_postgres) \
        .outputMode("update") \
        .start()

    print("Spark Streaming application operational")
    print(f"Source topic: {os.getenv('KAFKA_TOPIC_INPUT', 'youtube-data')}")
    print(f"Destination topic: {os.getenv('KAFKA_TOPIC_OUTPUT', 'processed-data')}")

    spark.streams.awaitAnyTermination()

if __name__ == "__main__":
    try:
        process_youtube_stream()
    except Exception as e:
        print(f"Error in Spark application: {e}")
        raise
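
Once the script is running against the cluster, a quick way to confirm the full Kafka round trip is to tail the output topic from the host machine. A minimal kafka-python consumer sketch, assuming kafka-python is installed locally and using the host-facing listener on port 9093:

import json
from kafka import KafkaConsumer

# Tail the enriched records that the Spark job publishes to processed-data
consumer = KafkaConsumer(
    "processed-data",
    bootstrap_servers="localhost:9093",          # host-facing PLAINTEXT_HOST listener
    auto_offset_reset="latest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    record = message.value
    print(record.get("channel_title"), record.get("engagement_ratio"))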

Jupyter Configuration {#step4}

Dockerfile Implementation

Create jupyter/Dockerfile:

FROM jupyter/pyspark-notebook:latest

USER root

# Install dependencies
RUN pip install --no-cache-dir \
    kafka-python==2.0.2 \
    psycopg2-binary==2.9.9 \
    matplotlib==3.8.2 \
    seaborn==0.13.0 \
    plotly==5.18.0 \
    pandas==2.1.4 \
    sqlalchemy==2.0.25

# Install PostgreSQL client
RUN apt-get update && apt-get install -y \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Initialize directories
RUN mkdir -p /home/jovyan/work /home/jovyan/data

# Configure permissions
RUN chown -R ${NB_UID}:${NB_GID} /home/jovyan/work /home/jovyan/data

USER ${NB_UID}

WORKDIR /home/jovyan/work

Analysis Notebook Template

Create jupyter/notebooks/youtube_analysis.ipynb:

{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# YouTube Data Analysis\n",
    "## Database Connectivity and Channel Metrics Analysis"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import psycopg2\n",
    "import matplotlib.pyplot as plt\n",
    "import seaborn as sns\n",
    "from sqlalchemy import create_engine\n",
    "\n",
    "sns.set_style('whitegrid')\n",
    "plt.rcParams['figure.figsize'] = (12, 6)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Establish database connection\n",
    "engine = create_engine('postgresql://postgres:postgres123@postgres:5432/youtube_db')\n",
    "\n",
    "# Query channel statistics\n",
    "query = \"SELECT * FROM channel_stats ORDER BY subscriber_count DESC\"\n",
    "df = pd.read_sql(query, engine)\n",
    "\n",
    "print(f\"Retrieved {len(df)} channel records\")\n",
    "df.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Visualize subscriber distribution\n",
    "plt.figure(figsize=(12, 6))\n",
    "plt.bar(df['channel_title'], df['subscriber_count'])\n",
    "plt.xlabel('Channel')\n",
    "plt.ylabel('Subscribers')\n",
    "plt.title('YouTube Channel Subscriber Distribution')\n",
    "plt.xticks(rotation=45, ha='right')\n",
    "plt.tight_layout()\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Calculate engagement metrics\n",
    "df['avg_views_per_video'] = df['total_views'] / df['video_count']\n",
    "df['engagement_ratio'] = df['total_views'] / df['subscriber_count']\n",
    "\n",
    "df[['channel_title', 'avg_views_per_video', 'engagement_ratio']].sort_values(\n",
    "    'engagement_ratio', ascending=False\n",
    ")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": "3.11.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
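
The notebook above works entirely through PostgreSQL, but the Jupyter container also receives SPARK_MASTER_URL, so a notebook can attach to the cluster directly. A minimal connectivity check, with the caveat that the PySpark version bundled in jupyter/pyspark-notebook:latest must match the Spark 3.5.0 cluster for the session to start cleanly:

import os
from pyspark.sql import SparkSession

# Attach to the standalone cluster defined in docker-compose
spark = (
    SparkSession.builder
    .appName("JupyterClusterCheck")
    .master(os.getenv("SPARK_MASTER_URL", "spark://spark-master:7077"))
    .getOrCreate()
)

# A trivial distributed job confirms the notebook can reach the master and workers
print(spark.range(1_000_000).count())
spark.stop()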

Airflow Configuration {#step5}

Dockerfile Implementation

Create airflow/Dockerfile:

FROM apache/airflow:2.8.1-python3.11

USER root

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    python3-dev \
    && rm -rf /var/lib/apt/lists/*

USER airflow

# Copy requirements
COPY requirements.txt /requirements.txt

# Install Python dependencies
RUN pip install --no-cache-dir -r /requirements.txt

# Initialize directories
RUN mkdir -p /opt/airflow/dags /opt/airflow/logs /opt/airflow/plugins

Requirements Specification

Create airflow/requirements.txt:

apache-airflow-providers-postgres==5.10.0
apache-airflow-providers-apache-kafka==1.3.0
kafka-python==2.0.2
requests==2.31.0
psycopg2-binary==2.9.9
google-api-python-client==2.111.0

Pipeline DAG Implementation

Create airflow/dags/youtube_pipeline.py:

from datetime import datetime, timedelta
import json
import requests
from kafka import KafkaProducer
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

# Target YouTube channels for monitoring
CHANNEL_IDS = [
    "UC_x5XG1OV2P6uZZ5FSM9Ttw",  # Google for Developers
    "UCq-Fj5jknLsU6-M0R3PiHcA",  # YouTube Creators
    "UCBJycsmduvYEL83R_U4JriQ",  # MKBHD
]

def extract_youtube_data(**context):
    """Extract YouTube channel data via YouTube Data API"""
    import os

    api_key = os.getenv('YOUTUBE_API_KEY')
    if not api_key or api_key == 'your_youtube_api_key_here':
        raise ValueError("YOUTUBE_API_KEY environment variable not configured")

    print(f"Initiating extraction for {len(CHANNEL_IDS)} channels")

    channel_data = []

    for channel_id in CHANNEL_IDS:
        try:
            url = "https://www.googleapis.com/youtube/v3/channels"
            params = {
                'part': 'snippet,statistics',
                'id': channel_id,
                'key': api_key
            }

            response = requests.get(url, params=params, timeout=30)
            data = response.json()

            if 'items' in data and data['items']:
                channel_info = data['items'][0]
                snippet = channel_info['snippet']
                statistics = channel_info['statistics']

                channel_record = {
                    'channel_id': channel_id,
                    'channel_title': snippet['title'],
                    'channel_description': snippet.get('description', '')[:500],
                    'channel_created_at': snippet['publishedAt'],
                    'total_views': int(statistics.get('viewCount', 0)),
                    'subscriber_count': int(statistics.get('subscriberCount', 0)),
                    'video_count': int(statistics.get('videoCount', 0)),
                    'timestamp': datetime.now().isoformat()
                }

                channel_data.append(channel_record)
                print(f"Extracted: {snippet['title']}")

        except Exception as e:
            print(f"Extraction failed for channel {channel_id}: {e}")

    context['task_instance'].xcom_push(key='channel_data', value=channel_data)
    return f"Extraction complete: {len(channel_data)} channels processed"

def publish_to_kafka(**context):
    """Publish extracted data to Kafka message broker"""
    import os

    channel_data = context['task_instance'].xcom_pull(
        task_ids='extract_youtube_data',
        key='channel_data'
    )

    if not channel_data:
        print("No data available for publishing")
        return

    producer = KafkaProducer(
        bootstrap_servers=os.getenv('KAFKA_BROKER', 'kafka:9092'),
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        retries=3
    )

    topic = os.getenv('KAFKA_TOPIC_INPUT', 'youtube-data')

    for channel in channel_data:
        try:
            future = producer.send(topic, value=channel)
            future.get(timeout=10)
            print(f"Published: {channel['channel_title']}")
        except Exception as e:
            print(f"Publishing failed: {e}")

    producer.flush()
    producer.close()

    print(f"Published {len(channel_data)} records to topic '{topic}'")

def load_to_postgres(**context):
    """Persist data to PostgreSQL database"""
    channel_data = context['task_instance'].xcom_pull(
        task_ids='extract_youtube_data',
        key='channel_data'
    )

    if not channel_data:
        print("No data available for loading")
        return

    postgres_hook = PostgresHook(postgres_conn_id='postgres_default')

    insert_sql = """
        INSERT INTO channel_stats 
        (channel_id, channel_title, channel_description, channel_created_at, 
         total_views, subscriber_count, video_count, processed_at)
        VALUES (%s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
        ON CONFLICT (channel_id) DO UPDATE SET
            channel_title = EXCLUDED.channel_title,
            total_views = EXCLUDED.total_views,
            subscriber_count = EXCLUDED.subscriber_count,
            video_count = EXCLUDED.video_count,
            processed_at = CURRENT_TIMESTAMP
    """

    for channel in channel_data:
        postgres_hook.run(insert_sql, parameters=(
            channel['channel_id'],
            channel['channel_title'],
            channel['channel_description'],
            channel['channel_created_at'],
            channel['total_views'],
            channel['subscriber_count'],
            channel['video_count']
        ))

    print(f"Loaded {len(channel_data)} records to PostgreSQL")

with DAG(
    'youtube_kafka_spark_pipeline',
    default_args=default_args,
    description='YouTube data pipeline with Kafka and Spark integration',
    schedule_interval=timedelta(hours=6),
    catchup=False,
    tags=['youtube', 'kafka', 'spark'],
) as dag:

    create_tables = PostgresOperator(
        task_id='create_tables',
        postgres_conn_id='postgres_default',
        sql='''
            CREATE TABLE IF NOT EXISTS channel_stats (
                channel_id VARCHAR(255) PRIMARY KEY,
                channel_title TEXT,
                channel_description TEXT,
                channel_created_at TIMESTAMP,
                total_views BIGINT,
                subscriber_count BIGINT,
                video_count BIGINT,
                processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE IF NOT EXISTS spark_aggregations (
                window_start TIMESTAMP,
                window_end TIMESTAMP,
                channel_title TEXT,
                record_count INTEGER,
                avg_subscribers DOUBLE PRECISION,
                total_views_sum BIGINT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        '''
    )

    extract_data = PythonOperator(
        task_id='extract_youtube_data',
        python_callable=extract_youtube_data
    )

    publish_kafka = PythonOperator(
        task_id='publish_to_kafka',
        python_callable=publish_to_kafka
    )

    load_postgres = PythonOperator(
        task_id='load_to_postgres',
        python_callable=load_to_postgres
    )

    create_tables >> extract_data >> [publish_kafka, load_postgres]
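
The DAG fails fast if YOUTUBE_API_KEY is missing, but it is still worth validating the key itself before unpausing the pipeline. A minimal check you could run inside the Airflow container, hitting the same channels endpoint the extract task uses:

import os
import requests

# A 200 response with a non-empty 'items' list confirms the key and quota are usable
response = requests.get(
    "https://www.googleapis.com/youtube/v3/channels",
    params={
        "part": "statistics",
        "id": "UC_x5XG1OV2P6uZZ5FSM9Ttw",    # Google for Developers
        "key": os.getenv("YOUTUBE_API_KEY"),
    },
    timeout=30,
)
response.raise_for_status()
print(response.json().get("items", [{}])[0].get("statistics"))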

Docker Compose Implementation {#step6}

Complete Configuration

Create docker-compose.yml:


version: '3.8'

services:
  postgres:
    image: postgres:13
    container_name: postgres
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DB}
    ports:
      - "5432:5432"
    volumes:
      - ./data/postgres:/var/lib/postgresql/data
    networks:
      - youtube-network
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    networks:
      - youtube-network
    healthcheck:
      test: ["CMD-SHELL", "echo ruok | nc localhost 2181"]
      interval: 10s
      timeout: 5s
      retries: 5

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_NUM_PARTITIONS: 3
    networks:
      - youtube-network
    healthcheck:
      test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server localhost:9092"]
      interval: 10s
      timeout: 10s
      retries: 5

  spark-master:
    build:
      context: ./spark
      dockerfile: Dockerfile
    container_name: spark-master
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_MASTER_WEBUI_PORT=8080
      - POSTGRES_USER=${POSTGRES_USER}
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
      - POSTGRES_DB=${POSTGRES_DB}
      - KAFKA_BROKER=${KAFKA_BROKER}
      - KAFKA_TOPIC_INPUT=${KAFKA_TOPIC_INPUT}
      - KAFKA_TOPIC_OUTPUT=${KAFKA_TOPIC_OUTPUT}
    ports:
      - "8080:8080"
      - "7077:7077"
    volumes:
      - ./spark/scripts:/opt/spark-apps
      - ./data:/opt/spark-data
    networks:
      - youtube-network
    depends_on:
      kafka:
        condition: service_healthy

  spark-worker-1:
    build:
      context: ./spark
      dockerfile: Dockerfile
    container_name: spark-worker-1
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=${SPARK_WORKER_MEMORY}
      - SPARK_WORKER_CORES=${SPARK_WORKER_CORES}
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_WORKER_WEBUI_PORT=8081
    ports:
      - "8081:8081"
    volumes:
      - ./spark/scripts:/opt/spark-apps
      - ./data:/opt/spark-data
    networks:
      - youtube-network
    depends_on:
      - spark-master

  spark-worker-2:
    build:
      context: ./spark
      dockerfile: Dockerfile
    container_name: spark-worker-2
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=${SPARK_WORKER_MEMORY}
      - SPARK_WORKER_CORES=${SPARK_WORKER_CORES}
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_WORKER_WEBUI_PORT=8082
    ports:
      - "8082:8082"
    volumes:
      - ./spark/scripts:/opt/spark-apps
      - ./data:/opt/spark-data
    networks:
      - youtube-network
    depends_on:
      - spark-master

  jupyter:
    build:
      context: ./jupyter
      dockerfile: Dockerfile
    container_name: jupyter
    environment:
      - JUPYTER_ENABLE_LAB=yes
      - JUPYTER_TOKEN=${JUPYTER_TOKEN}
      - SPARK_MASTER_URL=${SPARK_MASTER_URL}
      - POSTGRES_USER=${POSTGRES_USER}
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
      - POSTGRES_DB=${POSTGRES_DB}
    ports:
      - "8888:8888"
    volumes:
      - ./jupyter/notebooks:/home/jovyan/work
      - ./data:/home/jovyan/data
    networks:
      - youtube-network
    depends_on:
      - postgres
      - spark-master
    command: start-notebook.sh --NotebookApp.token='${JUPYTER_TOKEN}'

  airflow:
    build:
      context: ./airflow
      dockerfile: Dockerfile
    container_name: airflow
    environment:
      - AIRFLOW_UID=${AIRFLOW_UID}
      - AIRFLOW__CORE__EXECUTOR=${AIRFLOW__CORE__EXECUTOR}
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=${AIRFLOW__CORE__SQL_ALCHEMY_CONN}
      - AIRFLOW__CORE__FERNET_KEY=${AIRFLOW__CORE__FERNET_KEY}
      - AIRFLOW__CORE__LOAD_EXAMPLES=${AIRFLOW__CORE__LOAD_EXAMPLES}
      - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=false
      - YOUTUBE_API_KEY=${YOUTUBE_API_KEY}
      - KAFKA_BROKER=${KAFKA_BROKER}
      - KAFKA_TOPIC_INPUT=${KAFKA_TOPIC_INPUT}
      - KAFKA_TOPIC_OUTPUT=${KAFKA_TOPIC_OUTPUT}
    ports:
      - "8085
Enter fullscreen mode Exit fullscreen mode
