loading...
Microsoft Azure

Tutorial: How to Ingest data from Kafka into Azure Data Explorer

abhirockzz profile image Abhishek Gupta Updated on ・10 min read

This blog will cover data ingestion from Kafka to Azure Data Explorer (Kusto) using Kafka Connect.

Azure Data Explorer is a fast and scalable data exploration service that lets you collect, store, and analyze large volumes of data from any diverse sources, such as websites, applications, IoT devices, and more. Kafka Connect platform allows you to stream data between Apache Kafka and external systems in a scalable and reliable manner. The Kafka Connect Sink connector for Azure Data Explorer allows you to move data in Kafka topics to Azure Data Explorer tables which you can later query and analyze.

Here is the GitHub repo for this blog - https://github.com/abhirockzz/kafka-kusto-ingestion-tutorial

The goal is to get started quickly, so we will keep things simple and Docker-ize everything! This includes Kafka, Zookeeper, Kafka Connect worker and the event generator application - defined in docker-compose.yaml

Over the course of this tutorial, you will:

  • Get an overview of the individual components
  • Configure and setup Azure Data Explorer and install the connector
  • Run the end to end demo

If you're looking for a comprehensive coverage of data ingestion with Azure Data Explorer, Kafka and Kubernetes and like a hands-on learning experience, please check out this workshop! https://github.com/Azure/azure-kusto-labs

Pre-requisites

Overview

As previously mentioned, all the components are defined inside docker-compose.yaml file. Let's go over it bit by bit:

The Kafka and Zookeeper part is pretty straightforward - using the debezium images

  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092

The events-producer service is a simple application that sends Storm Events data to a Kafka topic. Storm Events data is a canonical example used throughout the Azure Data Explorer documentation (for example, check this Quickstart and the complete CSV file). The producer app uses the original CSV, but only includes selected fields (such as start and end time, state, source etc.) rather than the entire row (which has more than 20 columns). Here is the sample data:

2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer
2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9488,NEW YORK,Winter Weather,Department of Highways
2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9487,NEW YORK,Winter Weather,Department of Highways
...

The service component in Docker Compose is defined as such:

  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

The sink connector is where a lot of the magic happens! Let's explore it:

Kafka Sink Connector for Azure Data Explorer

Here is the kusto-connect service in docker compose file:

  kusto-connect:
    build:
      context: ./connector
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses

The container is built from a Dockerfile - this makes it easier for you to run it locally as opposed to pulling it from an external Docker registry

FROM debezium/connect:1.2
WORKDIR $KAFKA_HOME/connect
ARG KUSTO_KAFKA_SINK_VERSION
RUN curl -L -O https://github.com/Azure/kafka-sink-azure-kusto/releases/download/v$KUSTO_KAFKA_SINK_VERSION/kafka-sink-azure-kusto-$KUSTO_KAFKA_SINK_VERSION-jar-with-dependencies.jar

It's based on top of the Debezium Kafka Connect image. Simply download the Kusto Connector JAR (version 1.0.1 at the time of writing) and place it in the Kafka Connect plugins directory. That's it!

Here is what the sink connector configuration file looks like:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 50000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

The process of loading/importing data into a table in Azure Data Explorer is known as Ingestion. This is how the the connector operates as well.

Azure Data Explore documentation

Behind the scenes, it uses the following modules in the Java SDK for Azure Data Explorer

  • data: to connect, issue (control) commands and query data
  • ingest: to ingest data

At the time of writing, the data formats supported by the connector are: csv, json, txt, avro, apacheAvro, tsv, scsv, sohsv and psv. Data in the Kafka topics is written to files on disk. These are then sent to Azure Data Explorer based on the following connector configurations - when file has reached flush.size.bytes or the flush.interval.ms interval has passed.

The only exception to the above mechanism is the avro and apacheAvro data types which are handled as byte arrays

By "sent to Azure Data Explorer", what I really mean that the file is queued for Ingestion (using IngestClient.ingestFromFile)

Alright, lots of theory so far...

.. let's try it out!

Clone this repo:

git clone https://github.com/abhirockzz/kafka-kusto-ingestion-tutorial
cd kafka-kusto-ingestion-tutorial

Start off creating an Azure Data Explorer cluster and database using Azure Portal, Azure CLI or any of the client SDKs such as Python.

Once that's done, create a table (Storms) and respective mapping (Storms_CSV_Mapping):

.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)

.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'

Start containers and install the connector

Before installing the connector, we need to create a Service Principal in order for the connector to authenticate and connect to Azure Data Explorer service.

Use az ad sp create-for-rbac command:

az ad sp create-for-rbac -n "kusto-sp"

You will get a JSON response as such - please note down the appId, password and tenant as you will be using them in subsequent steps

{
  "appId": "fe7280c7-5705-4789-b17f-71a472340429",
  "displayName": "kusto-sp",
  "name": "http://kusto-sp",
  "password": "29c719dd-f2b3-46de-b71c-4004fb6116ee",
  "tenant": "42f988bf-86f1-42af-91ab-2d7cd011db42"
}

Add permissions to your database

Provide appropriate role to the Service principal you just created. To assign the admin role, follow this guide to use the Azure portal or use the following command in your Data Explorer cluster

.add database <database name> admins  ('aadapp=<service principal AppID>;<service principal TenantID>') 'AAD App'

Start the containers:

docker-compose up

The producer application will start sending events to the storm-events topic. You should see logs similar to:

....
events-producer_1  | sent message to partition 0 offset 0
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
events-producer_1  | 
events-producer_1  | sent message to partition 0 offset 1
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
events-producer_1  | 
events-producer_1  | sent message to partition 0 offset 2
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23357,WISCONSIN,Winter Storm,COOP Observer
events-producer_1  | 
events-producer_1  | sent message to partition 0 offset 3
events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9494,NEW YORK,Winter Weather,Department of Highways
events-producer_1  | 
events-producer_1  | sent message to partition 0 offset 4
events-producer_1  | 2020/08/20 16:51:35 event  2007-01-01 00:00:00.0000000,2007-01-01 06:00:00.0000000,9488,NEW YORK,Winter Weather,Department of Highways
....

We can now install the sink connector to consume these events and ingest them into Azure Data Explorer

Replace the values for following attributes in adx-sink-config.json: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (the database name) and kusto.url

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 50000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

In a different terminnal, keep a track of the connector service logs:

docker-compose logs -f | grep kusto-connect

Install the connector:

curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors

//check status
curl http://localhost:8083/connectors/storm/status

The connector should spring into action. Meanwhile in the other terminal, you should see logs similar to:

kusto-connect_1    | INFO   ||  Refreshing Ingestion Resources   [com.microsoft.azure.kusto.ingest.ResourceManager]
kusto-connect_1    | INFO   ||  Kusto ingestion: file (/tmp/kusto-sink-connector-0a8a9fa2-9e4b-414d-bae1-5d01f3969522/kafka_storm-events_0_0.csv.gz) of size (9192) at current offset (93)   [com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter]
kusto-connect_1    | INFO   ||  WorkerSinkTask{id=storm-0} Committing offsets asynchronously using sequence number 1: {storm-events-0=OffsetAndMetadata{offset=94, leaderEpoch=null, metadata=''}}   [org.apache.kafka.connect.runtime.WorkerSinkTask]
ct.runtime.WorkerSinkTask]
kusto-connect_1    | INFO   ||  Kusto ingestion: file (/tmp/kusto-sink-connector-0a8a9fa2-9e4b-414d-bae1-5d01f3969522/kafka_storm-events_0_94.csv.gz) of size (1864) at current offset (111)   [com.microsoft.azure.kusto.kafka.connect.sink.TopicPartitionWriter]
kusto-connect_1    | INFO   ||  WorkerSinkTask{id=storm-0} Committing offsets asynchronously using sequence number 2: {storm-events-0=OffsetAndMetadata{offset=112, leaderEpoch=null, metadata=''}}   [org.apache.kafka.connect.runtime.WorkerSinkTask]
....

Wait for sometime before data ends up in the Storms table. To confirm, check the row count and confirm that there are no failures in the ingestion process:

Storms | count

. show ingestion failures

Once there is some data, try out a few queries. To see all the records:

Storms

Use where and project to filter specific data

Storms
| where EventType == 'Drought' and State == 'TEXAS'
| project StartTime, EndTime, Source, EventId

Use the summarize operator

Storms
| summarize event_count=count() by State
| where event_count > 10
| project State, event_count
| render columnchart

These are just few examples. Dig into the Kusto Query Language documentation or explore tutorials about how to ingest JSON formatted sample data into Azure Data Explorer, using scalar operators, timecharts etc.

If you want to re-start from scratch, simply stop the containers (docker-compose down -v), delete (.drop table Storms) and re-create the Storms table (along with the mapping) and re-start containers (docker-compose up)

Clean up

To delete the Azure Data Explorer cluster/database, use az cluster delete or az kusto database delete

az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>

That's a wrap!

I hope this helps you get started building data ingestion pipelines from Kafka to Azure Data Explorer using the Kafka Connect sink connector. This is not the only way to ingest data into Azure Data Explorer (of course!). You're welcome to explore the documentation and explore other techniques such as One-click Ingestion, using Event Grid, IoT Hub and much more!

Until next time, Happy Exploring!

Posted on by:

abhirockzz profile

Abhishek Gupta

@abhirockzz

Currently working with Kafka, Databases, Azure, Kubernetes and related open source projects | Confluent Community Catalyst (for Kafka)

Microsoft Azure

Any language. Any platform.

Discussion

markdown guide