DEV Community

Cover image for Creating a Development Environment for Spark Structured Streaming, Kafka, and Prometheus
Nazli Ander
Nazli Ander

Posted on • Edited on

Creating a Development Environment for Spark Structured Streaming, Kafka, and Prometheus

Docker-compose allows us to simulate pretty complex programming setups in our local environments. It is very fun to test some hard-to-maintain technologies such as Kafka and Spark using Docker-compose.

A few months ago, I created a demo application while using Spark Structured Streaming, Kafka, and Prometheus within the same Docker-compose file. One can extend this list with an additional Grafana service. The codebase was in Python and I was ingesting live Crypto-currency prices into Kafka and consuming those through Spark Structured Streaming. In this write-up instead of talking about the Watermarks and Sinking types in Spark Structured Streaming, I will be only talking about the Docker-compose and how I set up my development environment using Spark, Kafka, Prometheus, and a Zookeeper. To have the whole codebase for my demo project, please refer to the Github repository.

Service Blocks

In the Docker-compose, I needed the following services to keep my streaming data producer and consumer live, at the same time monitor the ingestions into Kafka:

  • Spark standalone cluster: Consisting of one master and a worker code
    • Spark-master
    • Spark-worker
  • Zookeeper: A requirement for Kafka (soon it will not be a requirement) to maintain the brokers and topics. For instance, if a broker joins or dies, Zookeeper informs the cluster.
  • Kafka: A Message-oriented Middleware (MoM) for dealing with large streams of data. In this case, we have streams of crypto-currency prices.
  • Prometheus-JMX-Exporter: An exporter to connect Java Management Extensions (JMX) and translate into the language that Prometheus can understand. Remembering the Kafka is an example of a Java application, this will be a magic service that enables us to scrape Kafka metrics automatically.
  • Prometheus: Time-series database logging and modern alerting tool.

Spark Services

In the most basic setup for the standalone Spark cluster, we need one master and one worker node. You can use Docker-compose volumes for mounting folders. For Spark, perhaps the most common mounting reason is sharing the connectors (.jar files) or scripts.

For retrieving a Spark image from Docker Hub, as Big Data Europe has a very stable and extensive set of Spark Hadoop images, I preferred to use their images in my demo project. This prevented also some redundant work, like creating multiple Dockerfiles per Spark node.

I needed to take care of the Networking within the Docker-compose settings. Hence, I created a Bridge network with a custom naming as "crypto-network". The Bridge network enables us to run our standalone containers while communicating with each other. For more information about different network drivers in Docker containers, please refer to Docker documentation, very fun to read. While setting up I tried to give different forwarded host ports rather than using 8080 for the Web UI to prevent conflicts with JMX-Exporter. Besides, I wanted the worker nodes to be dependent on the master node to set up the order of container creations.

Lastly, following the BDE example, I override the SPARK_MASTER with environment variables. Here I am sharing the Spark component of the demo application.

---
version: "3.2"
services:

  spark-master:
    image: bde2020/spark-master:2.2.2-hadoop2.7
    container_name: spark-master
    networks:
      - crypto-network
    volumes:
      - ./connectors:/connectors
      - ./:/scripts/
    ports:
      - 8082:8080
      - 7077:7077
    environment:
      - INIT_DAEMON_STEP=false

  spark-worker-1:
    image: bde2020/spark-worker:2.2.2-hadoop2.7
    container_name: spark-worker-1
    networks:
      - crypto-network
    depends_on:
      - spark-master
    ports:
      - 8083:8081
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"


networks:
  crypto-network:
    driver: "bridge"

Enter fullscreen mode Exit fullscreen mode

You can start the services with:

docker-compose up
Enter fullscreen mode Exit fullscreen mode

Then you can see the Spark master node setup with:

docker exec -it spark-master bash
Enter fullscreen mode Exit fullscreen mode

Kafka Services

To run Kafka in a standalone mode, I needed Zookeeper and Kafka itself with some fancy environment variables. Basically, Kafka needs to find the Zookeeper client port and it needs to advertise the correct ports to Spark applications.

To run this setting I used the Confluent images. Here, I am sharing the Kafka related services block. A Confluent image already allows us to set up:

  • Kafka topics by using the environment variables:
    • KAFKA_CREATE_TOPICS: Topic names to be created
    • KAFKA_AUTO_CREATE_TOPICS_ENABLE: Self-explaining perhaps
    • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: Self-explaining perhaps
  • Connection to Zookeeper using the environment variable KAFKA_ZOOKEEPER_CONNECT
  • With KAFKA_BROKER_ID giving a custom broker id for a particular node
  • Advertising the correct ports for the docker network internal services or external connections:
    • KAFKA_INTER_BROKER_LISTENER_NAME: Listener name for the setup
    • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: Listener setup with mapping
    • KAFKA_ADVERTISED_LISTENERS: Listener setup for internal and external networking. This is a bit tricky, so if I consume or produce any message in the internal Docker network, with the example below I need to connect to kafka:29092. From outside of Docker, I can use a consumer or producer via localhost:9092. For more information, here is an awesome explanation.
---
version: "3.2"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    networks:
      - crypto-network
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka
    container_name: kafka
    depends_on:
      - zookeeper
    networks:
      - crypto-network
    ports:
      - 9092:9092
      - 30001:30001
    environment:
      KAFKA_CREATE_TOPICS: crypto_raw,crypto_latest_trends,crypto_moving_average
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

networks:
  crypto-network:
    driver: "bridge"
Enter fullscreen mode Exit fullscreen mode

Prometheus Services

In this project, I wanted to scrape Kafka's logs automatically. Hence, apart from the Prometheus service itself, I needed to also use the JMX-Exporter. And I realized that it is the coolest kid in a Docker-compose.

For both Prometheus and it JMX-Exporter, I needed to use custom Dockerfiles as they require some templates to be aware of each other. I used a separate ./tools/ folder to keep my monitoring related settings. And within the ./tools/prometheus-jmx-exporter, I had a confd folder to make use of and configure Docker containers at run-time. Here the file structure is as follows:

.
├── prometheus
│   ├── Dockerfile
│   └── prometheus.yml
└── prometheus-jmx-exporter
    ├── Dockerfile
    ├── confd
    │   ├── conf.d
    │   │   ├── kafka.yml.toml
    │   │   └── start-jmx-scraper.sh.toml
    │   └── templates
    │       ├── kafka.yml.tmpl
    │       └── start-jmx-scraper.sh.tmpl
    └── entrypoint.sh
Enter fullscreen mode Exit fullscreen mode

Let's start with the Prometheus image as it is more straightforward. We need to use a custom Dockerfile to get the config with custom scraper settings.

The Dockerfile will be:

FROM prom/prometheus:v2.8.1

ADD ./prometheus.yml /etc/prometheus/prometheus.yml

CMD [ "--config.file=/etc/prometheus/prometheus.yml","--web.enable-admin-api" ]
Enter fullscreen mode Exit fullscreen mode

And the prometheus.yml would be pointing the following, with a scrape interval of 5 seconds. In prometheus.yml, Prometheus targets a service called kafka-jmx-exporter with port 8080. Hence, in the Docker-compose, I should be using the same container name for JMX-Exporter as the targeted service.

global:
  scrape_interval:     5s
  evaluation_interval: 5s

scrape_configs:
  - job_name: 'kafka'
    scrape_interval: 5s
    static_configs:
      - targets: ['kafka-jmx-exporter:8080']
Enter fullscreen mode Exit fullscreen mode

To create the JMX-Exporter image, I needed more tweaks. Let's start with the Dockerfile. The image for the JMX-Exporter uses a base image from Java. Then downloads from Maven repository JMX Prometheus .jar and writes to a file with the name /opt/jmx_prometheus_httpserver/jmx_prometheus_httpserver.jar. Next it downloads the Confd and stores in /usr/local/bin/confd, gives execute permissions. Lastly, it copies the entrypoint into /opt/entrypoint.sh.

FROM java:8

RUN mkdir /opt/jmx_prometheus_httpserver && wget 'https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_httpserver/0.11.0/jmx_prometheus_httpserver-0.11.0-jar-with-dependencies.jar' -O /opt/jmx_prometheus_httpserver/jmx_prometheus_httpserver.jar

ADD https://github.com/kelseyhightower/confd/releases/download/v0.16.0/confd-0.16.0-linux-amd64 /usr/local/bin/confd
COPY confd /etc/confd
RUN chmod +x /usr/local/bin/confd

COPY entrypoint.sh /opt/entrypoint.sh
ENTRYPOINT ["/opt/entrypoint.sh"]
Enter fullscreen mode Exit fullscreen mode

In the entrypoint.sh, I had only the execution of Confd, then running the start-jmx-scraper.sh. Hence, after the Confd sets up the source and destination files for both Kafka and JMX Scrapers with .toml, we run the downloaded jmx_prometheus_httpserver.jar file. The entrypoint.sh looks like this:

#!/bin/bash
/usr/local/bin/confd -onetime -backend env
/opt/start-jmx-scraper.sh
Enter fullscreen mode Exit fullscreen mode

And the start-jmx-scraper.shis as follows, the environment variables in Docker-compose define each of the key (JMX_PORT, JMX_HOST, HTTP_PORT, JMX_EXPORTER_CONFIG_FILE) mentioned in the command:

#!/bin/bash
java \
    -Dcom.sun.management.jmxremote.ssl=false \
    -Djava.rmi.server.hostname={{ getv "/jmx/host" }} \
    -Dcom.sun.management.jmxremote.authenticate=false \
    -Dcom.sun.management.jmxremote.port={{ getv "/jmx/port" }} \
    -jar /opt/jmx_prometheus_httpserver/jmx_prometheus_httpserver.jar \
    {{ getv "/http/port" }} \
    /opt/jmx_prometheus_httpserver/{{ getv "/jmx/exporter/config/file" }}
Enter fullscreen mode Exit fullscreen mode

With the given custom Docker images for Prometheus automatically scraping Kafka, the full Docker-compose file for the demo project is as follows:

---
version: "3.2"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    networks:
      - crypto-network
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka
    container_name: kafka
    depends_on:
      - zookeeper
    networks:
      - crypto-network
    ports:
      - 9092:9092
      - 30001:30001
    environment:
      KAFKA_CREATE_TOPICS: crypto_raw,crypto_latest_trends,crypto_moving_average
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      KAFKA_JMX_PORT: 30001
      KAFKA_JMX_HOSTNAME: kafka

  kafka-jmx-exporter:
    build: ./tools/prometheus-jmx-exporter
    container_name: jmx-exporter
    ports:
      - 8080:8080
    links:
      - kafka
    networks:
      - crypto-network
    environment:
      JMX_PORT: 30001
      JMX_HOST: kafka
      HTTP_PORT: 8080
      JMX_EXPORTER_CONFIG_FILE: kafka.yml

  prometheus:
    build: ./tools/prometheus
    container_name: prometheus
    networks:
      - crypto-network
    ports:
      - 9090:9090

  spark-master:
    image: bde2020/spark-master:2.2.2-hadoop2.7
    container_name: spark-master
    networks:
      - crypto-network
    volumes:
      - ./connectors:/connectors
      - ./:/scripts/
    ports:
      - 8082:8080
      - 7077:7077
    environment:
      - INIT_DAEMON_STEP=setup_spark

  spark-worker-1:
    image: bde2020/spark-worker:2.2.2-hadoop2.7
    container_name: spark-worker-1
    networks:
      - crypto-network
    depends_on:
      - spark-master
    ports:
      - 8083:8081
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"

  producer:
    build:
      context: .
      dockerfile: ./Dockerfile.producer
    container_name: producer
    depends_on:
      - kafka
    networks:
      - crypto-network

networks:
  crypto-network:
    driver: "bridge"
Enter fullscreen mode Exit fullscreen mode

As the Docker-compose contains an additional Producer service when we run the following, we can test our Kafka topic messages per minute by checking the <IP_LOCAL>:9000:

docker-compose up
Enter fullscreen mode Exit fullscreen mode

Here the output of the Prometheus UI will be as follows:

Prometheus Web UI Example

Last Words

This was a demo project that I made for studying Watermarks and Windowing functions in Streaming Data Processing. Therefore I needed to create a custom producer for Kafka, and consume those using Spark Structured Streaming. Although the development phase of the project was super fun, I also enjoyed creating this pretty long Docker-compose example.

In case more detail is needed, I am sharing the Github repository.

Top comments (0)