Chad Dower

Building a Streaming Data Pipeline with Kafka and Spark: Real-Time Analytics Implementation Guide

Why Kafka + Spark for Real-Time Analytics

The combination of Apache Kafka and Apache Spark has become the de facto standard for building streaming data pipelines. Here's why this matters for your projects:

Key benefits:

  • Scalability: Handle millions of events per second across distributed clusters
  • Fault tolerance: Automatic recovery from failures with no data loss
  • Flexibility: Process streams with SQL, DataFrames, or custom functions
  • Low latency: Sub-second processing from ingestion to insights
  • Exactly-once semantics: Replayable offsets plus checkpointing give duplicate-free results (with an idempotent or transactional sink)

Prerequisites

Before we dive in, make sure you have:

  • Java 8 or 11 installed
  • Python 3.7+ or Scala 2.12 (we'll use Python for examples)
  • Docker and Docker Compose for running Kafka
  • Basic understanding of distributed systems concepts
  • Familiarity with DataFrames (Pandas or Spark)
  • 8GB RAM minimum (16GB recommended)

Setting Up Your Streaming Infrastructure

Let's start by creating our development environment with Kafka and Spark. We'll use Docker Compose to simplify the setup.

Step 1: Create the Docker Compose Configuration

Create a file named docker-compose.yml:

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

Pro Tip: The Kafka UI at http://localhost:8080 provides valuable insights into your topics, consumer groups, and message flow—essential for debugging streaming applications.

Step 2: Install PySpark and Dependencies

Create a requirements.txt file:

pyspark==3.4.0
kafka-python==2.0.2
python-snappy==0.6.1  # for the producer's snappy compression (needs the system snappy library)
pandas==2.0.3
numpy==1.24.3

Install the dependencies:

pip install -r requirements.txt

Step 3: Start Your Infrastructure

docker-compose up -d

Wait about 30 seconds for all services to initialize. You can verify everything is running:

docker-compose ps
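You can also confirm the broker is reachable from Python. Here's an optional sanity check using kafka-python (already in requirements.txt), assuming the default localhost:9092 mapping from the Compose file:

# check_kafka.py - optional connectivity check for the broker started above
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
print("Connected. Existing topics:", admin.list_topics())
admin.close()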

Understanding the Streaming Architecture

Before writing code, let's understand what we're building:

The Data Flow

Data Sources → Kafka Topics → Spark Streaming → Processing → Output Sinks
     ↑              ↓                ↓                ↓            ↓
  Producers    Partitions      Micro-batches    Transformations  Storage

Key Components Explained

Kafka Topics: Think of these as distributed, append-only logs that store your streaming data. Each topic is partitioned for parallelism and replicated for fault tolerance.

Spark Structured Streaming: Treats streaming data as an unbounded table that continuously grows. New data arrives as new rows appended to this virtual table.

Checkpointing: Spark periodically saves processing state to recover from failures without data loss or duplication.
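To make that concrete, here's a minimal sketch of where the checkpoint location is declared; the DataFrame name and paths are placeholders, not part of the pipeline we build below:

# checkpoint_sketch.py - illustration only; "events_df" and the paths are placeholders
query = events_df.writeStream \
    .format("parquet") \
    .option("path", "/tmp/output/events") \
    .option("checkpointLocation", "/tmp/checkpoint/events") \
    .start()

# If the job crashes and restarts with the same checkpointLocation,
# Spark resumes from the last committed Kafka offsets and restores any
# aggregation state instead of reprocessing the whole stream.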

Building the Data Producer

Let's create a Python producer that simulates e-commerce events:

# producer.py
import json
import random
import time
from datetime import datetime
from kafka import KafkaProducer
from kafka.errors import KafkaError

class EcommerceEventProducer:
    """Simulates e-commerce events for our streaming pipeline"""

    def __init__(self, bootstrap_servers='localhost:9092'):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            # Ensure all replicas acknowledge
            acks='all',
            # Retry failed sends
            retries=5,
            # Enable compression for better throughput
            # (snappy needs the python-snappy package from requirements.txt)
            compression_type='snappy'
        )

        # Sample data for simulation
        self.products = [
            {'id': 'LAPTOP01', 'name': 'Gaming Laptop', 'category': 'Electronics', 'price': 1299.99},
            {'id': 'PHONE01', 'name': 'Smartphone Pro', 'category': 'Electronics', 'price': 899.99},
            {'id': 'BOOK01', 'name': 'Python for Data Science', 'category': 'Books', 'price': 49.99},
            {'id': 'SHIRT01', 'name': 'Cotton T-Shirt', 'category': 'Clothing', 'price': 29.99},
            {'id': 'COFFEE01', 'name': 'Premium Coffee Beans', 'category': 'Food', 'price': 24.99}
        ]

        self.event_types = ['page_view', 'add_to_cart', 'purchase', 'remove_from_cart']
        self.user_ids = [f'user_{i:04d}' for i in range(1, 101)]

    def generate_event(self):
        """Generate a realistic e-commerce event"""
        product = random.choice(self.products)
        event_type = random.choice(self.event_types)

        event = {
            'event_id': f'evt_{datetime.now().timestamp()}_{random.randint(1000, 9999)}',
            'event_type': event_type,
            'timestamp': datetime.now().isoformat(),
            'user_id': random.choice(self.user_ids),
            'product_id': product['id'],
            'product_name': product['name'],
            'category': product['category'],
            'price': product['price'],
            'quantity': random.randint(1, 3) if event_type in ['add_to_cart', 'purchase'] else 0,
            'session_id': f'session_{random.randint(10000, 99999)}',
            'device_type': random.choice(['mobile', 'desktop', 'tablet']),
            'country': random.choice(['US', 'UK', 'DE', 'FR', 'JP'])
        }

        # Add revenue for purchase events
        if event_type == 'purchase':
            event['revenue'] = event['price'] * event['quantity']

        return event

    def send_events(self, topic='ecommerce-events', events_per_second=10):
        """Send events to Kafka at specified rate"""
        print(f"Starting event producer - sending {events_per_second} events/second to '{topic}'")

        try:
            while True:
                event = self.generate_event()

                # Use user_id as key for partitioning
                future = self.producer.send(
                    topic=topic,
                    key=event['user_id'],
                    value=event
                )

                # Block for 'synchronous' sends (optional)
                try:
                    record_metadata = future.get(timeout=10)
                    print(f"Sent: {event['event_type']} for {event['user_id']} "
                          f"to partition {record_metadata.partition}")
                except KafkaError as e:
                    print(f"Failed to send event: {e}")

                time.sleep(1.0 / events_per_second)

        except KeyboardInterrupt:
            print("Shutting down producer...")
        finally:
            self.producer.flush()
            self.producer.close()

if __name__ == "__main__":
    # Create Kafka topic first
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin_client = KafkaAdminClient(
        bootstrap_servers="localhost:9092",
        client_id='ecommerce-admin'
    )

    topic = NewTopic(
        name='ecommerce-events',
        num_partitions=3,
        replication_factor=1
    )

    try:
        admin_client.create_topics(new_topics=[topic], validate_only=False)
        print("Topic 'ecommerce-events' created successfully")
    except TopicAlreadyExistsError:
        print("Topic 'ecommerce-events' already exists")

    # Start producing events
    producer = EcommerceEventProducer()
    producer.send_events(events_per_second=5)

💡 Note: The producer uses the user_id as a partitioning key, ensuring all events for a user go to the same partition—crucial for maintaining order and enabling stateful processing.
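If you want to see that guarantee for yourself, a quick ad-hoc check with kafka-python (a hypothetical helper, not part of the pipeline) is to consume a batch of events and confirm each user maps to a single partition:

# partition_check.py - hypothetical ad-hoc check against the local broker
from collections import defaultdict
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'ecommerce-events',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000  # stop iterating once no new messages arrive
)

partitions_by_user = defaultdict(set)
for message in consumer:
    user_id = message.key.decode('utf-8') if message.key else None
    partitions_by_user[user_id].add(message.partition)
consumer.close()

# With key-based partitioning, every user should map to exactly one partition
spread = {u: p for u, p in partitions_by_user.items() if len(p) > 1}
print(f"Users spread across multiple partitions: {spread or 'none'}")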

Creating the Spark Streaming Application

Now let's build the core streaming application using PySpark's Structured Streaming API:

# streaming_pipeline.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class StreamingPipeline:
    """Real-time analytics pipeline for e-commerce events"""

    def __init__(self, app_name="EcommerceStreaming"):
        # The Kafka source lives in a separate connector; pull it in here (or pass
        # --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 to spark-submit)
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.jars.packages",
                    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .config("spark.streaming.stopGracefullyOnShutdown", "true") \
            .config("spark.sql.shuffle.partitions", "3") \
            .getOrCreate()

        self.spark.sparkContext.setLogLevel("WARN")

        # Define the schema for our events
        self.event_schema = StructType([
            StructField("event_id", StringType(), False),
            StructField("event_type", StringType(), False),
            StructField("timestamp", StringType(), False),
            StructField("user_id", StringType(), False),
            StructField("product_id", StringType(), False),
            StructField("product_name", StringType(), True),
            StructField("category", StringType(), True),
            StructField("price", DoubleType(), True),
            StructField("quantity", IntegerType(), True),
            StructField("session_id", StringType(), True),
            StructField("device_type", StringType(), True),
            StructField("country", StringType(), True),
            StructField("revenue", DoubleType(), True)
        ])

    def read_from_kafka(self, topic="ecommerce-events", bootstrap_servers="localhost:9092"):
        """Create streaming DataFrame from Kafka"""
        return self.spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", bootstrap_servers) \
            .option("subscribe", topic) \
            .option("startingOffsets", "latest") \
            .option("maxOffsetsPerTrigger", 1000) \
            .option("failOnDataLoss", "false") \
            .load()

    def parse_events(self, df):
        """Parse JSON events and add processing time"""
        return df.select(
            col("key").cast("string").alias("key"),
            from_json(col("value").cast("string"), self.event_schema).alias("data"),
            col("timestamp").alias("kafka_timestamp")
        ).select(
            "key",
            "data.*",
            "kafka_timestamp"
        ).withColumn(
            "event_time", 
            to_timestamp(col("timestamp"))
        ).withColumn(
            "processing_time",
            current_timestamp()
        )

    def calculate_metrics(self, df):
        """Calculate real-time business metrics"""

        # Revenue by category (windowed aggregation)
        revenue_by_category = df \
            .filter(col("event_type") == "purchase") \
            .withWatermark("event_time", "1 minute") \
            .groupBy(
                window(col("event_time"), "1 minute", "30 seconds"),
                col("category")
            ).agg(
                sum("revenue").alias("total_revenue"),
                count("event_id").alias("purchase_count"),
                avg("revenue").alias("avg_order_value")
            ).select(
                col("window.start").alias("window_start"),
                col("window.end").alias("window_end"),
                "category",
                "total_revenue",
                "purchase_count",
                "avg_order_value"
            )

        return revenue_by_category

    def detect_high_value_users(self, df):
        """Identify users with high purchase activity"""

        return df \
            .filter(col("event_type") == "purchase") \
            .withWatermark("event_time", "5 minutes") \
            .groupBy(
                window(col("event_time"), "5 minutes", "1 minute"),
                col("user_id")
            ).agg(
                sum("revenue").alias("total_spent"),
                count("event_id").alias("purchase_count"),
                collect_list("product_name").alias("products_purchased")
            ).filter(
                col("total_spent") > 100
            ).select(
                col("window.start").alias("window_start"),
                "user_id",
                "total_spent",
                "purchase_count",
                "products_purchased"
            )

    def calculate_conversion_funnel(self, df):
        """Track conversion funnel metrics"""

        funnel_metrics = df \
            .withWatermark("event_time", "10 minutes") \
            .groupBy(
                window(col("event_time"), "5 minutes", "1 minute")
            ).agg(
                countDistinct(when(col("event_type") == "page_view", col("user_id"))).alias("unique_visitors"),
                countDistinct(when(col("event_type") == "add_to_cart", col("user_id"))).alias("cart_users"),
                countDistinct(when(col("event_type") == "purchase", col("user_id"))).alias("purchasers")
            ).select(
                col("window.start").alias("window_start"),
                col("window.end").alias("window_end"),
                "unique_visitors",
                "cart_users",
                "purchasers",
                (col("cart_users") / col("unique_visitors") * 100).alias("cart_conversion_rate"),
                (col("purchasers") / col("cart_users") * 100).alias("purchase_conversion_rate")
            )

        return funnel_metrics

    def write_to_console(self, df, query_name):
        """Output stream to console for debugging"""
        return df.writeStream \
            .outputMode("update") \
            .format("console") \
            .option("truncate", False) \
            .trigger(processingTime="10 seconds") \
            .queryName(query_name) \
            .start()

    def write_to_kafka(self, df, topic, query_name):
        """Write processed data back to Kafka"""
        return df.selectExpr("to_json(struct(*)) AS value") \
            .writeStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("topic", topic) \
            .option("checkpointLocation", f"/tmp/checkpoint/{query_name}") \
            .outputMode("update") \
            .trigger(processingTime="10 seconds") \
            .queryName(query_name) \
            .start()

    def run_pipeline(self):
        """Execute the complete streaming pipeline"""
        logger.info("Starting streaming pipeline...")

        # Read from Kafka
        raw_stream = self.read_from_kafka()

        # Parse events
        parsed_stream = self.parse_events(raw_stream)

        # Branch 1: Revenue metrics
        revenue_metrics = self.calculate_metrics(parsed_stream)
        query1 = self.write_to_console(revenue_metrics, "revenue_metrics")

        # Branch 2: High-value user detection
        high_value_users = self.detect_high_value_users(parsed_stream)
        query2 = self.write_to_console(high_value_users, "high_value_users")

        # Branch 3: Conversion funnel
        funnel_metrics = self.calculate_conversion_funnel(parsed_stream)
        query3 = self.write_to_console(funnel_metrics, "conversion_funnel")

        # Block until any query terminates (all three keep running in the meantime)
        self.spark.streams.awaitAnyTermination()

if __name__ == "__main__":
    pipeline = StreamingPipeline()
    pipeline.run_pipeline()

⚠️ Warning: Not setting watermarks properly can lead to unbounded state growth. Always define watermarks before windowed aggregations to tell Spark how late data can arrive.
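Here's the pattern in isolation, as a minimal sketch that assumes the parsed stream from parse_events(): the watermark is declared on the event-time column before the windowed groupBy, which lets Spark finalize old windows and evict their state:

# watermark_sketch.py - pattern only; parsed_stream comes from parse_events() above
from pyspark.sql.functions import col, count, window

events_per_minute = parsed_stream \
    .withWatermark("event_time", "2 minutes") \
    .groupBy(window(col("event_time"), "1 minute")) \
    .agg(count("*").alias("events"))

# Windows whose end falls more than 2 minutes behind the latest observed
# event_time are finalized and their state is dropped, keeping state bounded.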

Advanced Stream Processing Patterns

Stateful Operations with State Store

Let's sketch session tracking to understand user behavior patterns. In PySpark (3.4+) the entry point for arbitrary stateful processing is applyInPandasWithState (flatMapGroupsWithState is the Scala/Java Dataset API), and the update logic below follows that pattern:

# session_tracking.py
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.streaming.state import GroupStateTimeout

def track_user_sessions(df):
    """
    Track user sessions with stateful processing
    Sessions timeout after 30 minutes of inactivity
    """

    # Define the state schema
    state_schema = StructType([
        StructField("user_id", StringType(), False),
        StructField("session_start", TimestampType(), False),
        StructField("last_activity", TimestampType(), False),
        StructField("event_count", IntegerType(), False),
        StructField("total_revenue", DoubleType(), True),
        StructField("products_viewed", ArrayType(StringType()), True)
    ])

    def update_session_state(key, values, state):
        """
        Per-user update function for arbitrary stateful processing
        key: tuple containing the grouping key, i.e. (user_id,)
        values: iterator of events for this user (treated here as row-like
                objects with event_time / event_type / product_id / revenue)
        state: GroupState holding the current session tuple for this user
        """
        import datetime

        user_id = key[0]
        events_list = list(values)

        if not events_list:
            return []

        # Sort events by timestamp
        events_list.sort(key=lambda x: x.event_time)

        # Get or initialize state (state.get returns a tuple in state_schema field order)
        if state.exists:
            (_, session_start, last_activity,
             event_count, total_revenue, products_viewed) = state.get
            total_revenue = total_revenue or 0
            products_viewed = list(products_viewed or [])
        else:
            session_start = events_list[0].event_time
            last_activity = events_list[0].event_time
            event_count = 0
            total_revenue = 0
            products_viewed = []

        # Process new events
        session_events = []
        for event in events_list:
            # Check for session timeout (30 minutes)
            time_diff = (event.event_time - last_activity).total_seconds()

            if time_diff > 1800:  # 30 minutes
                # End current session
                session_events.append({
                    'user_id': user_id,
                    'session_start': session_start,
                    'session_end': last_activity,
                    'duration_seconds': (last_activity - session_start).total_seconds(),
                    'event_count': event_count,
                    'total_revenue': total_revenue,
                    'unique_products': len(set(products_viewed)),
                    'session_type': 'completed'
                })

                # Start new session
                session_start = event.event_time
                event_count = 0
                total_revenue = 0
                products_viewed = []

            # Update session metrics
            event_count += 1
            last_activity = event.event_time

            if event.event_type == 'page_view':
                products_viewed.append(event.product_id)
            elif event.event_type == 'purchase':
                total_revenue += event.revenue or 0

        # Update state
        new_state = (
            user_id,
            session_start,
            last_activity,
            event_count,
            total_revenue,
            products_viewed
        )
        state.update(new_state)

        # Set timeout for state cleanup
        state.setTimeoutDuration(1800000)  # 30 minutes in milliseconds

        return session_events

    # Apply stateful processing.
    # flatMapGroupsWithState is the Scala/Java Dataset API; PySpark (3.4+) exposes the
    # same capability as applyInPandasWithState, whose function receives iterators of
    # pandas DataFrames - the row-by-row sketch above would need to iterate those frames.
    output_schema = ("user_id string, session_start timestamp, session_end timestamp, "
                     "duration_seconds double, event_count int, total_revenue double, "
                     "unique_products int, session_type string")

    session_df = df \
        .groupBy("user_id") \
        .applyInPandasWithState(
            func=update_session_state,
            outputStructType=output_schema,
            stateStructType=state_schema,
            outputMode="append",
            timeoutConf=GroupStateTimeout.ProcessingTimeTimeout
        )

    return session_df
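As a rough usage sketch (assuming the parsed stream from streaming_pipeline.py, and keeping in mind the pandas adaptation noted in the comments above), the session stream can be wired to a sink like any other query:

# session_usage.py - illustrative wiring; parsed_stream comes from StreamingPipeline.parse_events()
from session_tracking import track_user_sessions

sessions = track_user_sessions(parsed_stream)

session_query = sessions.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .option("checkpointLocation", "/tmp/checkpoint/user_sessions") \
    .start()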

Stream-to-Stream Joins

Join multiple streams to correlate events:

def correlate_user_behavior(clicks_df, purchases_df):
    """
    Join click events with purchase events to analyze behavior
    """

    # Add watermarks to both streams
    clicks_with_watermark = clicks_df \
        .filter(col("event_type") == "page_view") \
        .withWatermark("event_time", "10 minutes") \
        .select(
            col("user_id").alias("click_user_id"),
            col("product_id").alias("clicked_product"),
            col("event_time").alias("click_time"),
            col("device_type")
        )

    purchases_with_watermark = purchases_df \
        .filter(col("event_type") == "purchase") \
        .withWatermark("event_time", "10 minutes") \
        .select(
            col("user_id").alias("purchase_user_id"),
            col("product_id").alias("purchased_product"),
            col("event_time").alias("purchase_time"),
            col("revenue")
        )

    # Join streams within time window
    correlated = clicks_with_watermark.join(
        purchases_with_watermark,
        expr("""
            click_user_id = purchase_user_id AND
            clicked_product = purchased_product AND
            click_time < purchase_time AND
            purchase_time <= click_time + interval 30 minutes
        """),
        "inner"
    ).select(
        "click_user_id",
        "clicked_product",
        "click_time",
        "purchase_time",
        (col("purchase_time").cast("long") - col("click_time").cast("long")).alias("time_to_purchase_seconds"),
        "device_type",
        "revenue"
    )

    return correlated

Best Practice: When joining streams, always define watermarks on both streams to prevent unbounded state accumulation and enable automatic state cleanup.
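As a usage sketch, you can feed the same parsed stream in as both arguments, since the function filters each side by event_type internally; stream-stream inner joins run in append mode:

# join_usage.py - illustrative wiring; parsed_stream comes from StreamingPipeline.parse_events()
correlated = correlate_user_behavior(parsed_stream, parsed_stream)

join_query = correlated.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .option("checkpointLocation", "/tmp/checkpoint/click_to_purchase") \
    .start()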

Performance Optimization and Tuning

Optimizing Kafka Configuration

# optimized_kafka_config.py
kafka_read_config = {
    # Parallelism settings
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "ecommerce-events",
    "startingOffsets": "latest",

    # Performance tuning
    "maxOffsetsPerTrigger": 10000,  # Limit records per micro-batch
    "kafkaConsumer.pollTimeoutMs": 512,  # Reduce poll timeout for lower latency
    "fetchOffset.numRetries": 3,
    "fetchOffset.retryIntervalMs": 10,

    # Consumer configuration ("kafka."-prefixed options pass through to the Kafka consumer)
    "kafka.group.id": "spark-streaming-app",
    "kafka.session.timeout.ms": 30000,
    "kafka.max.poll.records": 500,
    "kafka.fetch.min.bytes": 1024,
    "kafka.fetch.max.wait.ms": 500,
}

optimized_stream = spark.readStream \
    .format("kafka") \
    .options(**kafka_read_config) \
    .load()

Spark Streaming Optimizations

# spark_optimizations.py
from pyspark.sql import SparkSession

def create_optimized_spark_session():
    """Create Spark session with optimized configurations"""

    return SparkSession.builder \
        .appName("OptimizedStreaming") \
        .config("spark.sql.streaming.stateStore.rocksdb.changelog.enabled", "true") \
        .config("spark.sql.streaming.stateStore.providerClass", 
                "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.sql.adaptive.skewJoin.enabled", "true") \
        .config("spark.streaming.backpressure.enabled", "true") \
        .config("spark.streaming.kafka.maxRatePerPartition", "1000") \
        .config("spark.sql.shuffle.partitions", "200") \
        .config("spark.default.parallelism", "200") \
        .config("spark.sql.streaming.metricsEnabled", "true") \
        .config("spark.eventLog.enabled", "true") \
        .config("spark.eventLog.dir", "/tmp/spark-events") \
        .getOrCreate()

💡 Note: The RocksDB state store provider performs far better than the default in-memory state store for large stateful workloads, especially with millions of keys. Also, ingest rate limiting in Structured Streaming is set per source via maxOffsetsPerTrigger; the spark.streaming.* backpressure settings apply only to the legacy DStream API, which is why they're not included above.

Monitoring and Observability

Creating a Monitoring Dashboard

# monitoring.py
from pyspark.sql.streaming import StreamingQueryListener

class StreamingMetricsListener(StreamingQueryListener):
    """Custom listener for collecting streaming metrics"""

    def __init__(self):
        self.metrics = []

    def onQueryStarted(self, event):
        """Called when a query starts"""
        print(f"Query started: {event.name} at {event.timestamp}")

    def onQueryProgress(self, event):
        """Called when there's progress in the query"""
        progress = event.progress

        metrics = {
            "timestamp": event.timestamp,
            "batchId": progress.batchId,
            "inputRowsPerSecond": progress.inputRowsPerSecond,
            "processedRowsPerSecond": progress.processedRowsPerSecond,
            "durationMs": progress.durationMs,
            "sources": []
        }

        for source in progress.sources:
            metrics["sources"].append({
                "description": source.description,
                "startOffset": source.startOffset,
                "endOffset": source.endOffset,
                "numInputRows": source.numInputRows
            })

        # Extract batch duration details (durationMs is a dict of phase -> milliseconds)
        batch_duration = progress.durationMs or {}
        metrics["addBatch"] = batch_duration.get("addBatch", 0)
        metrics["getBatch"] = batch_duration.get("getBatch", 0)
        metrics["commitOffsets"] = batch_duration.get("commitOffsets", 0)

        self.metrics.append(metrics)

        # Log key metrics
        print(f"Batch {progress.batchId}: "
              f"Input rate: {progress.inputRowsPerSecond:.2f} rows/sec, "
              f"Processing rate: {progress.processedRowsPerSecond:.2f} rows/sec")

        # Alert on slow batches
        if progress.durationMs and progress.durationMs.get("addBatch", 0) > 1000:
            print(f"⚠️ SLOW BATCH DETECTED: {progress.durationMs.get('addBatch')}ms")

    def onQueryTerminated(self, event):
        """Called when a query terminates"""
        print(f"Query terminated: {event.id}")
        if event.exception:
            print(f"Exception: {event.exception}")

# Register the listener on the session from spark_optimizations.py
from spark_optimizations import create_optimized_spark_session

spark = create_optimized_spark_session()
spark.streams.addListener(StreamingMetricsListener())
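If you'd rather not register a listener, every active query also exposes its latest progress as a plain dict you can poll. A minimal sketch, assuming the pipeline above is running and spark is its session:

# progress_polling.py - lightweight alternative to a listener; assumes the pipeline is running
import time

while True:  # Ctrl+C to stop
    for query in spark.streams.active:
        progress = query.lastProgress  # dict with the same fields the listener receives
        if progress:
            print(f"{query.name}: batch {progress['batchId']}, "
                  f"{progress.get('processedRowsPerSecond', 0):.2f} rows/sec processed")
    time.sleep(10)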

Conclusion

Congratulations! You've built a working real-time analytics pipeline using Apache Kafka and Spark. The same architecture scales out across a cluster to handle far larger event volumes, and with tighter trigger intervals it can deliver insights within seconds of ingestion.

Let's recap what we've accomplished:

  • Set up a complete Kafka and Spark streaming environment
  • Created a realistic data producer with proper partitioning
  • Built a multi-branch streaming application for real-time analytics
  • Implemented stateful processing for session tracking
  • Optimized performance with proper configuration
  • Added monitoring for production readiness

Next Steps:

  1. Extend the pipeline with machine learning models for real-time recommendations
  2. Add a visualization layer with Grafana or Kibana
  3. Implement schema evolution strategies with Avro or Protobuf
  4. Deploy to a production environment using Kubernetes



Found this helpful? Leave a comment below with your streaming use cases or questions!
