
Carlota Soto for CrateDB


Build a data ingestion pipeline using Kafka, Flink, and CrateDB

This tutorial explains how to build a data ingestion pipeline using three open-source tools: Apache Kafka, Flink, and CrateDB.

Kafka is the front line of the stack, queuing messages received from (for example) IoT sensors and devices. CrateDB stores the data and makes it queryable. Between Kafka and CrateDB sits Apache Flink, a data processing engine. All three tools are distributed systems that provide elastic scaling, fault tolerance, and high-throughput, low-latency performance via parallel processing.

This tutorial uses Kafka (Confluent Platform 6.1.1), Apache Flink 1.12, and CrateDB 4.5.0 on a macOS system.

Preliminary notes

  1. This guide references the example job published at https://github.com/crate/cratedb-flink-jobs. This example job brings together three software components: the Kafka connector for Flink, the JDBC connector for Flink, and the CrateDB JDBC driver. It uses a sample dataset consisting of a subset of NYC taxi trip records from 2017. Explore the repository for more details.

  2. This guide uses Docker with Docker Compose. Please make sure you're using recent versions, such as Docker 20.10.5 and Docker Compose 1.29.0, so the infrastructure sandbox can be started successfully.

  3. Apart from Docker, this guide assumes you have Git, Homebrew, and Wget installed. If you don't have these components on your machine, or don't want to install them, you can use alternatives, but the steps in this guide will run more smoothly if you have them installed.

  4. CrateDB uses a mmapfs directory by default to store its indices. In a real use case, the default operating system limits on mmap counts are likely to be too low, which may result in out-of-memory exceptions. If needed, you can increase the limit by running the command sudo sysctl -w vm.max_map_count=262144. To set this value permanently, update the vm.max_map_count setting in /etc/sysctl.conf; you can verify it after rebooting by running sysctl vm.max_map_count. The commands are repeated after this list for convenience.

  5. This guide is built so you can run the complete suite of technologies locally. It does not address aspects like high-availability, fault-tolerance, or scalability.
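For convenience, these are the commands from note 4 in one place (Linux; they require administrator privileges):

# Raise the mmap count limit for the current session
sudo sysctl -w vm.max_map_count=262144

# To persist the setting across reboots, add "vm.max_map_count=262144" to /etc/sysctl.conf

# Verify the current value
sysctl vm.max_map_count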

Starting Kafka, Flink and CrateDB

The simplest way to set up and start all software components at once is to use Docker with Docker Compose. To do so, first create a sandbox directory and navigate to it in your terminal:



# Set up a directory to host your containers
mkdir -p sandbox/kafka-flink-cratedb

# Navigate to it
cd ./sandbox/kafka-flink-cratedb/



Next, start Kafka, Flink, and CrateDB through Docker Compose:



# Clone the repository cratedb-examples
git clone https://github.com/crate/cratedb-examples

# Navigate to the kafka-flink directory 
cd cratedb-examples/spikes/kafka-flink

# Start the containers 
docker compose up



Once everything is ready, you will be able to access both Flink and CrateDB through their graphical user interfaces.

To access the CrateDB UI, open http://localhost:4200/ in your browser:

(Screenshot: the CrateDB Admin UI)

To access the Apache Flink UI, open http://localhost:8081/ in your browser:

(Screenshot: the Apache Flink dashboard)
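If you prefer the command line, you can also check that both services respond over HTTP. This is a minimal sketch assuming curl is available; it uses CrateDB's /_sql endpoint and the Flink REST API's /overview endpoint:

# Ask CrateDB to execute a trivial query via its HTTP endpoint
curl -sS -H "Content-Type: application/json" \
    -d '{"stmt": "SELECT 1"}' "http://localhost:4200/_sql?pretty"

# Ask the Flink JobManager for a cluster overview
curl -sS "http://localhost:8081/overview"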

Communicating with Kafka through Kafkacat

To communicate with Kafka, you can use Kafkacat, a command-line tool that allows you to produce and consume Kafka messages using a very simple syntax. It also lets you view topic metadata.

You can install Kafkacat through Homebrew. To do so, use:



brew install kafkacat



For some quick examples of how to use Kafkacat, take a look at the commands below. If you want to dive deeper, this post summarizes the basic Kafkacat parameters very well.



# List all the Kafka topics and partitions 
kafkacat -L -b localhost:9094

# Define a message to publish
export MESSAGE="The quick brown fox jumps over the lazy dog"

# Publish message to the topic test
echo $MESSAGE | kafkacat -b localhost:9094 -P -t test

# Consume messages from the topic test
kafkacat -b localhost:9094 -C -t test -o end



Get some data

As mentioned in the preliminary notes, this guide uses a small subset (5,000 records) of a public dataset of NYC taxi trips published by the NYC Taxi and Limousine Commission.

To obtain the sample data, use:



# Acquire the NYC taxi dataset in JSON format
wget https://gist.github.com/kovrus/328ba1b041dfbd89e55967291ba6e074/raw/7818724cb64a5d283db7f815737c9e198a22bee4/nyc-yellow-taxi-2017.tar.gz

# Extract archive
tar -xvf nyc-yellow-taxi-2017.tar.gz

# Create a subset of the data (5000 records) for the purpose of this tutorial
head -n 5000 nyc-yellow-taxi-2017.json > nyc-yellow-taxi-2017-subset.json


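Optionally, take a peek at the structure of a single record before ingesting it. The example below assumes the jq JSON processor is installed (for example via brew install jq):

# Pretty-print the first record of the subset
head -n 1 nyc-yellow-taxi-2017-subset.json | jq .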

Create a Kafka topic

To create the Kafka topic that will receive the data we just downloaded, run the command below. It creates a topic called "rides".



docker run -it --network=scada-demo confluentinc/cp-kafka:6.1.1 \
    kafka-topics --bootstrap-server kafka-broker:9092 --create --replication-factor 1 --partitions 1 --topic rides


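To confirm the topic exists, you can list the broker's topics, for example with the containerized Kafkacat image that is also used later in this guide:

# List topics and partitions on the broker from inside the Docker network
docker run -it --network=scada-demo edenhill/kafkacat:1.6.0 kafkacat -b kafka-broker -L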

Invoking the Flink job

Before setting up the Flink job, we need to create a table in CrateDB to host the data. We will call it taxi_rides. Use this command:



docker run -it --network=scada-demo westonsteimel/httpie \
    http "cratedb:4200/_sql?pretty" stmt='CREATE TABLE "taxi_rides" ("payload" OBJECT(DYNAMIC))'



The table will show in the CrateDB UI:

(Screenshot: the taxi_rides table in the CrateDB Admin UI)
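If you prefer to verify this from the command line rather than the UI, the same httpie container can query CrateDB's information_schema; this is an optional check:

# Confirm that the taxi_rides table exists
docker run -it --network=scada-demo westonsteimel/httpie \
    http "cratedb:4200/_sql?pretty" stmt="SELECT table_name FROM information_schema.tables WHERE table_name = 'taxi_rides'"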

Now, it's time to invoke the Flink job that subscribes to the topic rides. To do so, follow these steps:



# Acquire Flink job
VERSION=0.2
JARFILE="cratedb-flink-jobs-${VERSION}.jar"
wget https://github.com/crate/cratedb-flink-jobs/releases/download/${VERSION}/${JARFILE}

# Invoke Flink job
docker run -it --network=scada-demo --volume=$(pwd)/${JARFILE}:/${JARFILE} flink:1.12 \
    flink run --jobmanager=flink-jobmanager:8081 /${JARFILE} \
        --kafka.servers kafka-broker:9092 \
        --kafka.topic rides \
        --crate.hosts cratedb:5432 \
        --crate.table taxi_rides



The running job will show in the Flink UI:

(Screenshot: the running job in the Flink dashboard)
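You can also confirm the job is running from the command line. The sketch below reuses the flink:1.12 image and the --jobmanager option from the submission step; flink list prints the scheduled and running jobs:

# List the jobs known to the Flink JobManager
docker run -it --network=scada-demo flink:1.12 \
    flink list --jobmanager=flink-jobmanager:8081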

Publish data to the Kafka topic

Now that everything is set up, you are ready to publish the NYC taxi data to Kafka. Run:



# Subscribe to the topic to watch incoming messages (run this in a separate terminal; it keeps listening)
docker run -it --network=scada-demo edenhill/kafkacat:1.6.0 kafkacat -b kafka-broker -C -t rides -o end

# Publish data to the Kafka topic
cat nyc-yellow-taxi-2017-subset.json | docker run -i --network=scada-demo confluentinc/cp-kafka:6.1.1 \
    kafka-console-producer --bootstrap-server kafka-broker:9092 --topic rides



You are all set! You can now query the data from the CrateDB UI, for example with a simple SELECT:

(Screenshot: querying the taxi_rides table in the CrateDB Admin UI)
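The same check also works from the command line with the httpie pattern used earlier; a simple count should report up to 5,000 records once the producer has finished:

# Count the ingested records in CrateDB
docker run -it --network=scada-demo westonsteimel/httpie \
    http "cratedb:4200/_sql?pretty" stmt="SELECT COUNT(*) FROM taxi_rides"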

Hope this was useful 🤙 See you around!


Questions? Ask the Crate.io engineers through our community page 💫

Top comments (4)

Carlota Soto

Let me please forward you to this repo then: github.com/crate/cratedb-examples/...

The readmes include these instructions in a bit more depth. You can also create a detailed issue on that repo for us to look into :) Thanks a lot in advance!

Avid_Learner

ERROR: The Compose file './docker-compose.yml' is invalid because:
Unsupported config option for services: 'kafka-zookeeper'
Unsupported config option for networks: 'scada-demo'

Carlota Soto • Edited

Hi @jainhemant163 , thanks for trying it out!

You might need to update your docker-compose version. If you run docker-compose --version and it shows <1.27.0, you'll have to upgrade to at least that version. Is that your case?

Avid_Learner

Hi @carlotasoto,
I have tried to replicate the same steps on my machine and even on AWS EC2 instances, but nothing has worked so far.
Could you suggest which OS, Docker version, and docker-compose version to use to replicate the steps?