This tutorial explains how to build a data ingestion pipeline using three open-source tools: Apache Kafka, Flink, and CrateDB.
Kafka is the front line of the stack, used to queue messages received from (for example) IoT sensors and devices. CrateDB will query and store the data. And between CrateDB and Kafka, it lives Apache Flink, a data processing engine. These three tools are all distributed systems that provide elastic scaling, fault tolerance, high-throughput, and low-latency performance via parallel processing.
This tutorial uses Kafka 6.1.1, Apache Flink 1.12, and CrateDB 4.5.0 on a macOS system.
Preliminary notes
This guide references the example job published at https://github.com/crate/cratedb-flink-jobs. This example job brings together three software components: the Kafka connector for Flink, the JDBC connector for Flink, and the CrateDB JDBC driver. It uses a sample dataset including a subset of trip records completed in NYC taxis during 2017. Explore the repository for more insights into it.
This guide uses Docker with Docker Compose. Please, make sure you're using recent versions, like Docker 20.10.5 and Docker Compose 1.29.0, so the infrastructure sandbox can be invoked successfully.
Apart from Docker, this guide assumes you have Git, Homebrew, and Wget installed. If you don't have/don't want to install these components in your machine, you can always use alternatives, but the steps on this guide will follow more smoothly if you have them installed.
CrateDB uses a
mmapfs
directory by default to store its indices. In a real use case, the default operating system limits onmmap
counts are likely to be too low, which may result in out of memory exceptions. If you have to, you can increase the limits by running the commandsudo sysctl -w vm.max_map_count=262144
. To set this value permanently, update thevm.max_map_count
setting in/etc/sysctl.conf
. You can verify it after rebooting by runningsysctl vm.max_map_count
.This guide is built so you can run the complete suite of technologies locally. It does not address aspects like high-availability, fault-tolerance, or scalability.
Starting Kafka, Flink and CrateDB
The simplest possible way to setup and start all software components at once is to use Docker with Docker Compose. To do so, first set up a sandbox directory and navigate to it with your terminal:
# Set up a directory to host your containers
mkdir -p sandbox/kafka-flink-cratedb
# Navigate to it
cd ./sandbox/kafka-flink-cratedb/
Next, start Kafka, Flink and CrateDB through docker-compose
:
# Clone the repository cratedb-examples
git clone https://github.com/crate/cratedb-examples
# Navigate to the kafka-flink directory
cd cratedb-examples/spikes/kafka-flink
# Start the containers
docker compose up
Once everything is ready, you will be able to access both Flink and CrateDB through their graphical user interfaces.
To access the CrateDB UI, open http://localhost:4200/ in your browser:
To access the Apache Flink UI, open http://localhost:8081/ in your browser:
Communicating with Kafka through Kafkacat
To communicate with Kafka, you can use Kafkacat, a command-line tool that allows to produce and consume Kafka messages using a very simple syntax. It also allows you to view the topics' metadata.
You can install Kafkacat through Homebrew. To do so, use:
brew install kafkacat
For some quick examples of how to use Kafkacat, take a look at the commands below. If you want to dive deeper, this post summarizes the basic Kafkacat parameters very well.
# List all the Kafka topics and partitions
kafkacat -L -b localhost:9094
# Write a message
export MESSAGE="The quick brown fox jumps over the lazy dog"
# Publish message to the topic test
echo $MESSAGE | kafkacat -b localhost:9094 -P -t test
# Consume messages from the topic test
kafkacat -b localhost:9094 -C -t test -o end
Get some data
As we mentioned in the preliminary notes, in this guide we will be using a small subset (with 5000 records) of a public dataset that includes information about NYC taxi trips published by the NYC Taxi and Limousine commission.
To obtain the sample data, use:
# Acquire the NYC taxi dataset in JSON format
wget https://gist.github.com/kovrus/328ba1b041dfbd89e55967291ba6e074/raw/7818724cb64a5d283db7f815737c9e198a22bee4/nyc-yellow-taxi-2017.tar.gz
# Extract archive
tar -xvf nyc-yellow-taxi-2017.tar.gz
# Create a subset of the data (5000 records) for the purpose of this tutorial
cat nyc-yellow-taxi-2017.json | head -n 5000 > nyc-yellow-taxi-2017-subset.json
Create a Kafka topic
To create the Kafka topic that will eventually ingest the data we just downloaded, run the code below. It creates a topic called "rides".
docker run -it --network=scada-demo confluentinc/cp-kafka:6.1.1 \
kafka-topics --bootstrap-server kafka-broker:9092 --create --replication-factor 1 --partitions 1 --topic rides
Invoking the Flink job
Before setting up the Flink job, we need to create a table in CrateDB to host the data. We will call it taxi_rides
. Use this command:
docker run -it --network=scada-demo westonsteimel/httpie \
http "cratedb:4200/_sql?pretty" stmt='CREATE TABLE "taxi_rides" ("payload" OBJECT(DYNAMIC))'
The table will show in the CrateDB UI:
Now, it's time to invoke the Flink job that subcribes to the topic rides
. To do so, follow these steps:
# Acquire Flink job
VERSION=0.2
JARFILE="cratedb-flink-jobs-${VERSION}.jar"
wget https://github.com/crate/cratedb-flink-jobs/releases/download/${VERSION}/${JARFILE}
# Invoke Flink job
docker run -it --network=scada-demo --volume=$(pwd)/${JARFILE}:/${JARFILE} flink:1.12 \
flink run --jobmanager=flink-jobmanager:8081 /${JARFILE} \
--kafka.servers kafka-broker:9092 \
--kafka.topic rides \
--crate.hosts cratedb:5432 \
--crate.table taxi_rides
The running job will show in the Flink UI:
Publish data to the Kafka topic
Now that we have everything set up, you are ready to publish the NYC taxi data into Kafka. Run:
# Subscribe to the topic to receive messages
docker run -it --network=scada-demo edenhill/kafkacat:1.6.0 kafkacat -b kafka-broker -C -t rides -o end
# Publish data to the Kafka topic
cat nyc-yellow-taxi-2017-subset.json | docker run -i --network=scada-demo confluentinc/cp-kafka:6.1.1 \
kafka-console-producer --bootstrap-server kafka-broker:9092 --topic rides
You are all set! You can now query the data from the CrateDB UI. Keeping it simple:
Hope this was useful 🤙 See you around!
Questions? Ask the Crate.io engineers through our community page 💫
Top comments (4)
Let me please forward you to this repo then: github.com/crate/cratedb-examples/...
The readmes include these instructions with a bit more in-depth. You can also create a detailed issue on this repo for us to look up :) Thanks a lot in advance!
ERROR: The Compose file './docker-compose.yml' is invalid because:
Unsupported config option for services: 'kafka-zookeeper'
Unsupported config option for networks: 'scada-demo'
Hi @jainhemant163 , thanks for trying it out!
You might need to update your
docker-compose
version. If you rundocker-compose --version
and it shows <1.27.0, you'll have to upgrade to at least that version. Is that your case?Hii @carlos soto,
I have tried to replicate the same steps on my machine and even AWS EC2 Instances, but nothing worked out as of now.
Can you suggest me the which OS to use, and docker and docker-compose version to use in to replicate the same steps.