Dockerized Spark, Kafka, and Jupyter for YouTube Data Pipeline
Table of Contents
- Project Overview
- Prerequisites
- Project Structure
- Environment Configuration
- Kafka Configuration
- Spark Configuration
- Jupyter Configuration
- Airflow Configuration
- Docker Compose Implementation
- Deployment and Testing
- Integration Verification
Project Overview {#overview}
This guide details the implementation of a containerized data pipeline for extracting, processing, and analyzing YouTube data using industry-standard tools.
Architecture:
YouTube API → Airflow (Orchestration) → Kafka (Streaming) → Spark (Processing) → PostgreSQL (Storage) → Jupyter (Analysis)
Technology Stack:
- Docker & Docker Compose
- Apache Kafka (Message Streaming)
- Apache Spark (Data Processing)
- Jupyter Notebook (Analysis)
- Apache Airflow (Orchestration)
- PostgreSQL (Database)
Prerequisites {#prerequisites}
Required Software
# Verify Docker installation
docker --version
# Required: Docker version 20.x or higher
# Verify Docker Compose
docker-compose --version
# Required: docker-compose version 2.x or higher
System Requirements
- RAM: 8GB minimum (16GB recommended)
- Disk Space: 15GB available
- OS: Linux, macOS, or Windows with WSL2
Project Structure {#structure}
Directory Initialization
# Initialize project directory
mkdir youtube-pipeline
cd youtube-pipeline
# Create subdirectories
mkdir -p airflow/{dags,logs,plugins}
mkdir -p spark/scripts
mkdir -p jupyter/notebooks
mkdir -p youtube_extractor
mkdir -p certificates
mkdir -p data/postgres
# Initialize configuration files
touch docker-compose.yml
touch .env
touch airflow/Dockerfile
touch airflow/requirements.txt
touch airflow/dags/youtube_pipeline.py
touch spark/Dockerfile
touch spark/scripts/process_youtube_data.py
touch jupyter/Dockerfile
touch jupyter/notebooks/youtube_analysis.ipynb
touch youtube_extractor/Dockerfile
touch youtube_extractor/extractor.py
touch youtube_extractor/requirements.txt
Directory Structure:
youtube-pipeline/
├── docker-compose.yml
├── .env
├── airflow/
│ ├── Dockerfile
│ ├── dags/
│ │ └── youtube_pipeline.py
│ ├── logs/
│ ├── plugins/
│ └── requirements.txt
├── spark/
│ ├── Dockerfile
│ └── scripts/
│ └── process_youtube_data.py
├── jupyter/
│ ├── Dockerfile
│ └── notebooks/
│ └── youtube_analysis.ipynb
├── youtube_extractor/
│ ├── Dockerfile
│ ├── extractor.py
│ └── requirements.txt
├── certificates/
│ └── ca.pem
└── data/
└── postgres/
Environment Configuration {#step1}
Configuration File Setup
Create .env in the project root directory:
# YouTube API Configuration
YOUTUBE_API_KEY=your_youtube_api_key_here
# PostgreSQL Configuration
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres123
POSTGRES_DB=youtube_db
POSTGRES_PORT=5432
# Airflow Configuration
AIRFLOW_UID=50000
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://postgres:postgres123@postgres:5432/youtube_db
AIRFLOW__CORE__FERNET_KEY=your_fernet_key_here
AIRFLOW__CORE__LOAD_EXAMPLES=False
# Kafka Configuration
KAFKA_BROKER=kafka:9092
KAFKA_TOPIC_INPUT=youtube-data
KAFKA_TOPIC_OUTPUT=processed-data
# Spark Configuration
SPARK_MASTER_URL=spark://spark-master:7077
SPARK_WORKER_MEMORY=2G
SPARK_WORKER_CORES=2
# Jupyter Configuration
JUPYTER_TOKEN=spark123
Fernet Key Generation
python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
# Replace 'your_fernet_key_here' in .env with generated output
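Before building the stack, it can be worth confirming that no placeholder values remain in .env. The following optional sketch (a hypothetical helper, not part of the pipeline) reads the file and flags the your_youtube_api_key_here and your_fernet_key_here placeholders shown above:
# check_env.py -- optional sanity check for .env placeholders (illustrative helper)
from pathlib import Path

PLACEHOLDERS = {"your_youtube_api_key_here", "your_fernet_key_here"}

def check_env(path: str = ".env") -> bool:
    """Return True if every variable in .env has a non-placeholder value."""
    ok = True
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blank lines and comments
        key, value = line.split("=", 1)
        if not value or value in PLACEHOLDERS:
            print(f"WARNING: {key} still needs a real value")
            ok = False
    return ok

if __name__ == "__main__":
    check_env()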
Kafka Configuration {#step2}
Architecture Components
The Kafka implementation requires two primary components:
- Zookeeper: Manages cluster coordination and configuration
- Kafka Broker: Handles message distribution and storage
Network Configuration
- Port 9092: internal listener for inter-container communication (kafka:9092)
- Port 9093: external listener for host machine access (localhost:9093); see the connectivity check below
- Port 2181: Zookeeper coordination service
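With KAFKA_AUTO_CREATE_TOPICS_ENABLE set to 'true' in the Compose file later on, this listener layout can be smoke-tested once the broker is up. The script below is a rough, optional check, assuming kafka-python is installed on the host (the name kafka_smoke_test.py is just illustrative); it publishes one record through the host listener at localhost:9093 and reads it back. Code running inside the Compose network would use kafka:9092 instead.
# kafka_smoke_test.py -- illustrative round-trip check of the broker listeners
import json
from kafka import KafkaProducer, KafkaConsumer

BOOTSTRAP = "localhost:9093"   # host listener; containers use "kafka:9092"
TOPIC = "youtube-data"         # auto-created because auto.create.topics.enable is true

# Publish a single test record
producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send(TOPIC, value={"smoke_test": True})
producer.flush()
producer.close()

# Read it back; the consumer gives up after 5 seconds without new messages
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BOOTSTRAP,
    group_id="smoke-test",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for message in consumer:
    print(message.value)
consumer.close()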
Spark Configuration {#step3}
Dockerfile Implementation
Create spark/Dockerfile:
FROM bitnami/spark:3.5.0
USER root
# Install Python dependencies
RUN pip install --no-cache-dir \
kafka-python==2.0.2 \
pyspark==3.5.0 \
requests==2.31.0 \
pandas==2.1.4 \
psycopg2-binary==2.9.9
# Initialize directories
RUN mkdir -p /opt/spark-apps /opt/spark-data
# Configure permissions
RUN chmod -R 777 /opt/spark-apps /opt/spark-data
USER 1001
WORKDIR /opt/spark-apps
Data Processing Implementation
Create spark/scripts/process_youtube_data.py:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, count, avg, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType
import os
def create_spark_session():
"""Initialize Spark session with Kafka integration"""
spark = SparkSession.builder \
.appName("YouTubeDataProcessor") \
.master(os.getenv("SPARK_MASTER_URL", "spark://spark-master:7077")) \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
.config("spark.sql.streaming.checkpointLocation", "/opt/spark-data/checkpoint") \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
return spark
# Schema definition for YouTube data
youtube_schema = StructType([
StructField("channel_id", StringType(), True),
StructField("channel_title", StringType(), True),
StructField("subscriber_count", IntegerType(), True),
StructField("total_views", IntegerType(), True),
StructField("video_count", IntegerType(), True),
StructField("timestamp", TimestampType(), True)
])
def process_youtube_stream():
"""Process streaming YouTube data from Kafka"""
print("Initializing Spark Streaming Application...")
spark = create_spark_session()
# Configure Kafka source
kafka_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", os.getenv("KAFKA_BROKER", "kafka:9092")) \
.option("subscribe", os.getenv("KAFKA_TOPIC_INPUT", "youtube-data")) \
.option("startingOffsets", "latest") \
.load()
# Parse JSON payload
parsed_df = kafka_df.select(
from_json(col("value").cast("string"), youtube_schema).alias("data")
).select("data.*")
# Apply transformations
processed_df = parsed_df \
.withColumn("engagement_ratio", col("total_views") / col("subscriber_count")) \
.withColumn("avg_views_per_video", col("total_views") / col("video_count"))
# Windowed aggregations (5-minute tumbling window)
aggregated_df = processed_df \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(col("timestamp"), "5 minutes"),
col("channel_title")
) \
.agg(
count("*").alias("record_count"),
avg("subscriber_count").alias("avg_subscribers"),
spark_sum("total_views").alias("total_views_sum")
)
# Console output for monitoring
console_query = processed_df \
.writeStream \
.outputMode("append") \
.format("console") \
.option("truncate", "false") \
.start()
# Publish processed data to Kafka
kafka_output_query = processed_df \
.selectExpr("channel_id as key", "to_json(struct(*)) as value") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", os.getenv("KAFKA_BROKER", "kafka:9092")) \
.option("topic", os.getenv("KAFKA_TOPIC_OUTPUT", "processed-data")) \
.option("checkpointLocation", "/opt/spark-data/checkpoint/kafka-output") \
.outputMode("append") \
.start()
# Persist aggregated data to PostgreSQL
    def write_to_postgres(batch_df, batch_id):
        """Write each micro-batch to PostgreSQL, flattening the window struct to match the table schema"""
        if not batch_df.isEmpty():
            batch_df \
                .withColumn("window_start", col("window.start")) \
                .withColumn("window_end", col("window.end")) \
                .drop("window") \
                .write \
                .format("jdbc") \
                .option("url", f"jdbc:postgresql://postgres:5432/{os.getenv('POSTGRES_DB')}") \
                .option("dbtable", "spark_aggregations") \
                .option("user", os.getenv("POSTGRES_USER")) \
                .option("password", os.getenv("POSTGRES_PASSWORD")) \
                .option("driver", "org.postgresql.Driver") \
                .mode("append") \
                .save()
postgres_query = aggregated_df \
.writeStream \
.foreachBatch(write_to_postgres) \
.outputMode("update") \
.start()
print("Spark Streaming application operational")
print(f"Source topic: {os.getenv('KAFKA_TOPIC_INPUT', 'youtube-data')}")
print(f"Destination topic: {os.getenv('KAFKA_TOPIC_OUTPUT', 'processed-data')}")
spark.streams.awaitAnyTermination()
if __name__ == "__main__":
try:
process_youtube_stream()
except Exception as e:
print(f"Error in Spark application: {e}")
raise
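The derived engagement_ratio and avg_views_per_video columns can also be sanity-checked without Kafka by applying the same expressions to a static DataFrame. The following optional sketch uses made-up sample numbers and a local SparkSession, so it runs anywhere PySpark is installed (for example inside the spark-master container):
# transform_check.py -- illustrative local check of the derived columns (not part of the pipeline)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[1]").appName("TransformCheck").getOrCreate()

# One sample row mirroring the fields the transformations depend on
sample = spark.createDataFrame(
    [("UC_demo", "Demo Channel", 1000, 50000, 25)],
    ["channel_id", "channel_title", "subscriber_count", "total_views", "video_count"],
)

checked = (
    sample
    .withColumn("engagement_ratio", col("total_views") / col("subscriber_count"))
    .withColumn("avg_views_per_video", col("total_views") / col("video_count"))
)
checked.show(truncate=False)  # expect engagement_ratio = 50.0, avg_views_per_video = 2000.0

spark.stop()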
Jupyter Configuration {#step4}
Dockerfile Implementation
Create jupyter/Dockerfile:
FROM jupyter/pyspark-notebook:latest
USER root
# Install dependencies
RUN pip install --no-cache-dir \
kafka-python==2.0.2 \
psycopg2-binary==2.9.9 \
matplotlib==3.8.2 \
seaborn==0.13.0 \
plotly==5.18.0 \
pandas==2.1.4 \
sqlalchemy==2.0.25
# Install PostgreSQL client
RUN apt-get update && apt-get install -y \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# Initialize directories
RUN mkdir -p /home/jovyan/work /home/jovyan/data
# Configure permissions
RUN chown -R ${NB_UID}:${NB_GID} /home/jovyan/work /home/jovyan/data
USER ${NB_UID}
WORKDIR /home/jovyan/work
Analysis Notebook Template
Create jupyter/notebooks/youtube_analysis.ipynb:
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# YouTube Data Analysis\n",
"## Database Connectivity and Channel Metrics Analysis"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import psycopg2\n",
"import matplotlib.pyplot as plt\n",
"import seaborn as sns\n",
"from sqlalchemy import create_engine\n",
"\n",
"sns.set_style('whitegrid')\n",
"plt.rcParams['figure.figsize'] = (12, 6)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Establish database connection\n",
"engine = create_engine('postgresql://postgres:postgres123@postgres:5432/youtube_db')\n",
"\n",
"# Query channel statistics\n",
"query = \"SELECT * FROM channel_stats ORDER BY subscriber_count DESC\"\n",
"df = pd.read_sql(query, engine)\n",
"\n",
"print(f\"Retrieved {len(df)} channel records\")\n",
"df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Visualize subscriber distribution\n",
"plt.figure(figsize=(12, 6))\n",
"plt.bar(df['channel_title'], df['subscriber_count'])\n",
"plt.xlabel('Channel')\n",
"plt.ylabel('Subscribers')\n",
"plt.title('YouTube Channel Subscriber Distribution')\n",
"plt.xticks(rotation=45, ha='right')\n",
"plt.tight_layout()\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Calculate engagement metrics\n",
"df['avg_views_per_video'] = df['total_views'] / df['video_count']\n",
"df['engagement_ratio'] = df['total_views'] / df['subscriber_count']\n",
"\n",
"df[['channel_title', 'avg_views_per_video', 'engagement_ratio']].sort_values(\n",
" 'engagement_ratio', ascending=False\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.11.0"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
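The notebook above works entirely against PostgreSQL. Because the jupyter service is built from jupyter/pyspark-notebook and receives SPARK_MASTER_URL (see the Compose file below), a cell can also attach the kernel to the cluster for heavier processing. This is a minimal sketch and assumes the notebook image's Spark version is compatible with the cluster's 3.5.0:
# Optional notebook cell: attach this kernel to the standalone Spark cluster
import os
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("JupyterYouTubeAnalysis")
    .master(os.getenv("SPARK_MASTER_URL", "spark://spark-master:7077"))
    .getOrCreate()
)

print(spark.version)  # confirm the session is connected
spark.stop()          # release cluster resources when finished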
Airflow Configuration {#step5}
Dockerfile Implementation
Create airflow/Dockerfile:
FROM apache/airflow:2.8.1-python3.11
USER root
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
python3-dev \
&& rm -rf /var/lib/apt/lists/*
USER airflow
# Copy requirements
COPY requirements.txt /requirements.txt
# Install Python dependencies
RUN pip install --no-cache-dir -r /requirements.txt
# Initialize directories
RUN mkdir -p /opt/airflow/dags /opt/airflow/logs /opt/airflow/plugins
Requirements Specification
Create airflow/requirements.txt:
apache-airflow-providers-postgres==5.10.0
apache-airflow-providers-apache-kafka==1.3.0
kafka-python==2.0.2
requests==2.31.0
psycopg2-binary==2.9.9
google-api-python-client==2.111.0
Pipeline DAG Implementation
Create airflow/dags/youtube_pipeline.py:
from datetime import datetime, timedelta
import json
import requests
from kafka import KafkaProducer
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
# Target YouTube channels for monitoring
CHANNEL_IDS = [
"UC_x5XG1OV2P6uZZ5FSM9Ttw", # Google for Developers
"UCq-Fj5jknLsU6-M0R3PiHcA", # YouTube Creators
"UCBJycsmduvYEL83R_U4JriQ", # MKBHD
]
def extract_youtube_data(**context):
"""Extract YouTube channel data via YouTube Data API"""
import os
api_key = os.getenv('YOUTUBE_API_KEY')
if not api_key or api_key == 'your_youtube_api_key_here':
raise ValueError("YOUTUBE_API_KEY environment variable not configured")
print(f"Initiating extraction for {len(CHANNEL_IDS)} channels")
channel_data = []
for channel_id in CHANNEL_IDS:
try:
url = "https://www.googleapis.com/youtube/v3/channels"
params = {
'part': 'snippet,statistics',
'id': channel_id,
'key': api_key
}
            response = requests.get(url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()
if 'items' in data and data['items']:
channel_info = data['items'][0]
snippet = channel_info['snippet']
statistics = channel_info['statistics']
channel_record = {
'channel_id': channel_id,
'channel_title': snippet['title'],
'channel_description': snippet.get('description', '')[:500],
'channel_created_at': snippet['publishedAt'],
'total_views': int(statistics.get('viewCount', 0)),
'subscriber_count': int(statistics.get('subscriberCount', 0)),
'video_count': int(statistics.get('videoCount', 0)),
'timestamp': datetime.now().isoformat()
}
channel_data.append(channel_record)
print(f"Extracted: {snippet['title']}")
except Exception as e:
print(f"Extraction failed for channel {channel_id}: {e}")
context['task_instance'].xcom_push(key='channel_data', value=channel_data)
return f"Extraction complete: {len(channel_data)} channels processed"
def publish_to_kafka(**context):
"""Publish extracted data to Kafka message broker"""
import os
channel_data = context['task_instance'].xcom_pull(
task_ids='extract_youtube_data',
key='channel_data'
)
if not channel_data:
print("No data available for publishing")
return
producer = KafkaProducer(
bootstrap_servers=os.getenv('KAFKA_BROKER', 'kafka:9092'),
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
retries=3
)
topic = os.getenv('KAFKA_TOPIC_INPUT', 'youtube-data')
for channel in channel_data:
try:
future = producer.send(topic, value=channel)
future.get(timeout=10)
print(f"Published: {channel['channel_title']}")
except Exception as e:
print(f"Publishing failed: {e}")
producer.flush()
producer.close()
print(f"Published {len(channel_data)} records to topic '{topic}'")
def load_to_postgres(**context):
"""Persist data to PostgreSQL database"""
channel_data = context['task_instance'].xcom_pull(
task_ids='extract_youtube_data',
key='channel_data'
)
if not channel_data:
print("No data available for loading")
return
postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
insert_sql = """
INSERT INTO channel_stats
(channel_id, channel_title, channel_description, channel_created_at,
total_views, subscriber_count, video_count, processed_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
ON CONFLICT (channel_id) DO UPDATE SET
channel_title = EXCLUDED.channel_title,
total_views = EXCLUDED.total_views,
subscriber_count = EXCLUDED.subscriber_count,
video_count = EXCLUDED.video_count,
processed_at = CURRENT_TIMESTAMP
"""
for channel in channel_data:
postgres_hook.run(insert_sql, parameters=(
channel['channel_id'],
channel['channel_title'],
channel['channel_description'],
channel['channel_created_at'],
channel['total_views'],
channel['subscriber_count'],
channel['video_count']
))
print(f"Loaded {len(channel_data)} records to PostgreSQL")
with DAG(
'youtube_kafka_spark_pipeline',
default_args=default_args,
description='YouTube data pipeline with Kafka and Spark integration',
    schedule=timedelta(hours=6),
catchup=False,
tags=['youtube', 'kafka', 'spark'],
) as dag:
create_tables = PostgresOperator(
task_id='create_tables',
postgres_conn_id='postgres_default',
sql='''
CREATE TABLE IF NOT EXISTS channel_stats (
channel_id VARCHAR(255) PRIMARY KEY,
channel_title TEXT,
channel_description TEXT,
channel_created_at TIMESTAMP,
total_views BIGINT,
subscriber_count BIGINT,
video_count BIGINT,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS spark_aggregations (
window_start TIMESTAMP,
window_end TIMESTAMP,
channel_title TEXT,
record_count INTEGER,
avg_subscribers DOUBLE PRECISION,
total_views_sum BIGINT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
'''
)
    extract_data = PythonOperator(
        task_id='extract_youtube_data',
        python_callable=extract_youtube_data
    )
    publish_kafka = PythonOperator(
        task_id='publish_to_kafka',
        python_callable=publish_to_kafka
    )
    load_postgres = PythonOperator(
        task_id='load_to_postgres',
        python_callable=load_to_postgres
    )
create_tables >> extract_data >> [publish_kafka, load_postgres]
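Note that PostgresOperator and PostgresHook look up a connection with the ID postgres_default, which nothing earlier in the guide defines. One way to provide it (an assumption here, reusing the credentials from .env) is Airflow's AIRFLOW_CONN_<CONN_ID> environment-variable convention, for example by adding this line to the airflow service environment in docker-compose.yml:
- AIRFLOW_CONN_POSTGRES_DEFAULT=postgres://postgres:postgres123@postgres:5432/youtube_db
Alternatively, the connection can be created once inside the container with the airflow connections add CLI command.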
Docker Compose Implementation {#step6}
Complete Configuration
Create docker-compose.yml:
version: '3.8'
services:
postgres:
image: postgres:13
container_name: postgres
environment:
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: ${POSTGRES_DB}
ports:
- "5432:5432"
volumes:
- ./data/postgres:/var/lib/postgresql/data
networks:
- youtube-network
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 10s
timeout: 5s
retries: 5
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
networks:
- youtube-network
healthcheck:
test: ["CMD-SHELL", "echo ruok | nc localhost 2181"]
interval: 10s
timeout: 5s
retries: 5
kafka:
image: confluentinc/cp-kafka:7.5.0
container_name: kafka
depends_on:
zookeeper:
condition: service_healthy
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_NUM_PARTITIONS: 3
networks:
- youtube-network
healthcheck:
test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server localhost:9092"]
interval: 10s
timeout: 10s
retries: 5
spark-master:
build:
context: ./spark
dockerfile: Dockerfile
container_name: spark-master
environment:
- SPARK_MODE=master
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
- SPARK_MASTER_WEBUI_PORT=8080
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- POSTGRES_DB=${POSTGRES_DB}
- KAFKA_BROKER=${KAFKA_BROKER}
- KAFKA_TOPIC_INPUT=${KAFKA_TOPIC_INPUT}
- KAFKA_TOPIC_OUTPUT=${KAFKA_TOPIC_OUTPUT}
ports:
- "8080:8080"
- "7077:7077"
volumes:
- ./spark/scripts:/opt/spark-apps
- ./data:/opt/spark-data
networks:
- youtube-network
depends_on:
kafka:
condition: service_healthy
spark-worker-1:
build:
context: ./spark
dockerfile: Dockerfile
container_name: spark-worker-1
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark-master:7077
- SPARK_WORKER_MEMORY=${SPARK_WORKER_MEMORY}
- SPARK_WORKER_CORES=${SPARK_WORKER_CORES}
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
- SPARK_WORKER_WEBUI_PORT=8081
ports:
- "8081:8081"
volumes:
- ./spark/scripts:/opt/spark-apps
- ./data:/opt/spark-data
networks:
- youtube-network
depends_on:
- spark-master
spark-worker-2:
build:
context: ./spark
dockerfile: Dockerfile
container_name: spark-worker-2
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark-master:7077
- SPARK_WORKER_MEMORY=${SPARK_WORKER_MEMORY}
- SPARK_WORKER_CORES=${SPARK_WORKER_CORES}
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
- SPARK_WORKER_WEBUI_PORT=8082
ports:
- "8082:8082"
volumes:
- ./spark/scripts:/opt/spark-apps
- ./data:/opt/spark-data
networks:
- youtube-network
depends_on:
- spark-master
jupyter:
build:
context: ./jupyter
dockerfile: Dockerfile
container_name: jupyter
environment:
- JUPYTER_ENABLE_LAB=yes
- JUPYTER_TOKEN=${JUPYTER_TOKEN}
- SPARK_MASTER_URL=${SPARK_MASTER_URL}
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- POSTGRES_DB=${POSTGRES_DB}
ports:
- "8888:8888"
volumes:
- ./jupyter/notebooks:/home/jovyan/work
- ./data:/home/jovyan/data
networks:
- youtube-network
depends_on:
- postgres
- spark-master
command: start-notebook.sh --NotebookApp.token='${JUPYTER_TOKEN}'
airflow:
build:
context: ./airflow
dockerfile: Dockerfile
container_name: airflow
environment:
- AIRFLOW_UID=${AIRFLOW_UID}
- AIRFLOW__CORE__EXECUTOR=${AIRFLOW__CORE__EXECUTOR}
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=${AIRFLOW__CORE__SQL_ALCHEMY_CONN}
- AIRFLOW__CORE__FERNET_KEY=${AIRFLOW__CORE__FERNET_KEY}
- AIRFLOW__CORE__LOAD_EXAMPLES=${AIRFLOW__CORE__LOAD_EXAMPLES}
- AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=false
- YOUTUBE_API_KEY=${YOUTUBE_API_KEY}
- KAFKA_BROKER=${KAFKA_BROKER}
- KAFKA_TOPIC_INPUT=${KAFKA_TOPIC_INPUT}
- KAFKA_TOPIC_OUTPUT=${KAFKA_TOPIC_OUTPUT}
ports:
- "8085