Robin Moffatt

Originally published at rmoff.net

A quick and dirty way to monitor data arriving on Kafka

I’ve been poking around recently with capturing Wi-Fi packet data and streaming it into Apache Kafka, from where I’m processing and analysing it. Kafka itself is rock-solid - because I’m using ☁️Confluent Cloud and someone else worries about provisioning it, scaling it, and keeping it running for me. But whilst Kafka works just great, my side of the setup—tshark running on a Raspberry Pi—is less than stable. For whatever reason it sometimes stalls and I have to restart the Raspberry Pi and restart the capture process.

I’m sure there’s a reason and I’m sure if I spent long enough looking and debugging I’d find it - but for now the time invested in this is better spent just knowing when I need to go and bounce it. I don’t mind losing a few minutes of data, or even a few hours, but this kind of gap of a week makes me sad:

kib01

So what I want is a very simple way to look at the most recent timestamp on a Kafka message, and send me an alert if it’s outside a threshold.

alert01

Herewith a very dirty hacky way to do just this…

Parts List

  • kafkacat polls the most recent message on the topic

  • jq extracts the timestamp

  • bash compares the timestamp to our given threshold and also handles errors in calling kafkacat

  • curl makes a REST call to Telegram to send the status message

Setting up Telegram

Create a new Telegram bot per the instructions.

telegram01

Make a note of the access token because you’ll need this later.

What do all hack projects need? A geeky avatar of course. It may be a dirty hack, but it’s going to be a well-presented one ;-)

telegram02

To get our bot to work we need to start it first, so click on its link from Botfather (or just start a Telegram chat with it directly). This will send it the /start command:

telegram04

Getting the recipient’s Chat ID

We need to get the ID of the recipient of the messages that our bot is going to send. This can either be a direct message to you, or you can set up a group (which other real people can be members of and see the same message from the bot). If you want to use a group then make sure you start the bot (/start per above) and then invite it to the group. Using a group is also more convenient because you can create multiple alert routes with a single bot, instead of having to create a new bot for each purpose.

telegram03

Having started the bot, and optionally added it to a group and sent a message to the group, now invoke the getUpdates API:

curl -s https://api.telegram.org/bot<bot access token>/getUpdates

Replace <bot access token> with (you guessed it!) the bot access token that the Botfather gave you above. The API is a bit funky here - note that the bot prefix is a hardcoded part of the URL and should not be changed - you append your bot access token to it. So if Botfather gave you an access token of 99999:XXXXX you would invoke:

curl -s https://api.telegram.org/bot99999:XXXXX/getUpdates

From this you’ll get one, or more, messages that the bot has received. This might just be the single /start that you invoked, or it could also be group messages if you’ve added it to one. Regardless, identify the message instance corresponding to the recipient that you want for the bot and make a note of the chat.id value. Here it’s -468250841:

{
  "message_id": 3,
  "from": {
    "id": 218419044,
    "is_bot": false,
    "first_name": "Robin",
    "last_name": "Moffatt",
    "username": "rmoff",
    "language_code": "en"
  },
  "chat": {
    "id": -468250841,
    "title": "pcap ingest monitoring",
    "type": "group",
    "all_members_are_administrators": true
  },
  "date": 1586894082,
  "group_chat_created": true
}


You can use jq to return just the chat ID and associated recipient information too. Here it shows the group chat message quoted above, plus the DM that I sent the bot previously (/start).

$ curl -s https://api.telegram.org/bot99999:XXXXX/getUpdates | jq -c '.result[].message.chat | [.id , .title, .username]'

[218419044,null,"rmoff"]
[-468250841,"pcap ingest monitoring",null]

However you do it, you should now have:

  • Bot access token (e.g. 99999:XXXXX)

  • Chat ID (e.g. -468250841)

Sending a test message

Let’s send a test message! We’ll use the sendMessage API to do this:

curl -s -X POST https://api.telegram.org/bot<BOT ACCESS TOKEN>/sendMessage \
    -d chat_id=<CHAT ID> \
    -d text="Did you ever play tic-tac-toe?"

and as if by magic…

It works!

Getting the latest message from Kafka

As long-time readers of my blog will know, one of my favourite tools in my Kafka toolbox is kafkacat. Here we’ll not assume that it’s installed, and instead run it using Docker. We’re also going to connect to Confluent Cloud.

docker run --rm edenhill/kafkacat:1.5.0 kafkacat \
  -b $CCLOUD_BROKER_HOST \
  -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
  -X sasl.username="$CCLOUD_API_KEY" -X sasl.password="$CCLOUD_API_SECRET" \
  -X api.version.request=true \
  -C -c1 -o -1 -u -f %T -t pcap 

Aside from the broker details (-b) and various authentication and security settings (all the -X parameters), what we’re doing here is:

  • -C run as a consumer, and consume one message (-c1)

  • -o -1 read from the penultimate offset, i.e. one back from the end of the log, which is where the most recent message sits

  • -u don’t buffer output

  • -t read from the pcap topic

  • -f %T tells kafkacat just to return the timestamp from the Kafka message’s metadata

We’re going to compare this timestamp to our threshold, which is ten minutes ago, from date:

docker run --rm ubuntu date --date '-10 min' "+%s"
1586992072

WHY would you invoke date using Docker? Because date is one of those delightful *nix commands which has a different implementation across Linux, macOS etc, with options that are completely incompatible between them - so this way at least it works. I told you this was a dirty hack…
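If you’d rather not spin up a container just to do arithmetic on a timestamp, here is a minimal sketch of an alternative. It is not what the script in this post uses. It assumes that your local date accepts +%s (GNU and BSD/macOS date both do as far as I know, but treat that as an assumption), and it does the subtraction in the shell instead. minutes_ago is a made-up helper name.

```shell
# Print the epoch time (in seconds) for "N minutes ago".
# Assumption: the local `date` supports the +%s format.
# minutes_ago is a hypothetical helper name.
minutes_ago() {
    # $1 = number of minutes to subtract from the current time
    echo $(( $(date +%s) - $1 * 60 ))
}

ten_minutes_ago=$(minutes_ago 10)
echo "$ten_minutes_ago"
```

Turning an epoch value back into a readable date (date -d @…) is still a GNU-only option, so the Docker trick remains handy for that part.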

Note that the timestamp that’s returned from kafkacat is the unix epoch in milliseconds, whilst date is in seconds. No problem. Let’s continue this dirty hack by just truncating the last three digits!

➜ echo 1586993170473
1586993170473

➜ echo 1586993170473|sed 's/[0-9][0-9][0-9]$//g'
1586993170
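The same truncation can also be written as integer division in the shell, which says "milliseconds to seconds" a little more directly than chopping characters. This is only a sketch: ms_to_s is a hypothetical helper name, and it assumes the input is a non-empty integer, so any check for an empty value would have to happen before calling it.

```shell
# Convert an epoch timestamp in milliseconds to whole seconds.
# ms_to_s is a hypothetical helper name; it assumes $1 is a non-empty integer.
ms_to_s() {
    # Integer division drops the millisecond part
    echo $(( $1 / 1000 ))
}

ms_to_s 1586993170473
```

One difference from the sed version: a value with fewer than three digits comes back as 0 here, whereas sed would pass it through unchanged.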

So we can get the timestamp of the latest Kafka message, and the local timestamp (minus a threshold) - now to compare them. That’s easy enough with a bit of shell scripting. First we store the Kafka timestamp in a variable:

latest_ts=$(docker run --rm edenhill/kafkacat:1.5.0 kafkacat -b $CCLOUD_BROKER_HOST -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN -X sasl.username="$CCLOUD_API_KEY" -X sasl.password="$CCLOUD_API_SECRET" -X api.version.request=true -C -c1 -o -1 -t pcap -u -f %T| sed 's/[0-9][0-9][0-9]$//g' )

Then we store the timestamp against which we want to compare it:

ten_minutes_ago=$(docker run --rm ubuntu date --date '-10 min' "+%s")

Finally we compare the two:

if [ "$latest_ts" -lt "$ten_minutes_ago" ]; then
        echo "Last Kafka message was received over ten minutes ago"
fi
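A note on the test itself: [ is an ordinary command, so it needs a space either side of its arguments, and quoting the variables stops an empty value from turning the test into a syntax error. Here is a small sketch of the comparison wrapped up as a function, using fixed sample values instead of a live Kafka call. is_stale is a hypothetical name, and both arguments are assumed to be epoch seconds.

```shell
# Succeed (exit status 0) when the latest timestamp is older than the threshold.
# is_stale is a hypothetical helper; $1 and $2 are epoch seconds.
is_stale() {
    # $1 = latest message timestamp, $2 = threshold timestamp
    [ "$1" -lt "$2" ]
}

# Sample values only: the first timestamp is earlier than the second
if is_stale 1586991000 1586992072; then
    echo "Last Kafka message was received over ten minutes ago"
fi
```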

Putting it all together

Now we take the logic from above to determine if Kafka ingest has stalled, and combine it with the Telegram REST API that we explored above.

telegram06

#!/bin/bash
#
# @rmoff 16 April 2020
#
# -------------

# .env should look like: 
# CCLOUD_BROKER_HOST=xxxxxx
# CCLOUD_API_KEY=xxxxxx
# CCLOUD_API_SECRET=xxxxxx
# TELEGRAM_BOT_TOKEN=xxxx

source .env
CHAT_ID=-468250841

#---------

echo 'Now : ' $(docker run --rm ubuntu date)
ten_minutes_ago=$(docker run --rm ubuntu date --date '-10 min' "+%s")
echo 'Ten minutes ago : ' $(docker run --rm ubuntu date -d @$ten_minutes_ago)

latest_ts=$(docker run --rm edenhill/kafkacat:1.5.0 kafkacat -b $CCLOUD_BROKER_HOST \
    -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
    -X sasl.username="$CCLOUD_API_KEY" -X sasl.password="$CCLOUD_API_SECRET" \
    -X api.version.request=true \
    -C -c1 -o -1 -t my_kafka_topic -u -f %T| sed 's/[0-9][0-9][0-9]$//g' )

if [ -z "$latest_ts" ]; then
    echo "TS is empty"
    echo '{"chat_id": "'$CHAT_ID'", "text": "❌my_kafka_topic ingest check failed. Latest ingest time is empty", "disable_notification": false}' |\
    curl -s -X POST \
         -H 'Content-Type: application/json' \
         -d @- \
         https://api.telegram.org/bot$TELEGRAM_BOT_TOKEN/sendMessage | jq '.'
else
    echo 'Latest timestamp : ' $(docker run --rm ubuntu date -d @$latest_ts)

    if [ "$latest_ts" -lt "$ten_minutes_ago" ]; then
        echo "Ingest has stalled"
        echo '{"chat_id": "'$CHAT_ID'", "text": "❌my_kafka_topic ingest has stalled. Latest ingest time is '"$(docker run --rm ubuntu date -d @$latest_ts)"'", "disable_notification": false}' |\
        curl -s -X POST \
             -H 'Content-Type: application/json' \
             -d @- \
             https://api.telegram.org/bot$TELEGRAM_BOT_TOKEN/sendMessage | jq '.'
    else
        echo '{"chat_id": "'$CHAT_ID'", "text": "✅my_kafka_topic ingest looks good. Latest ingest time is '"$(docker run --rm ubuntu date -d @$latest_ts)"'", "disable_notification": true}' |\
        curl -s -X POST \
             -H 'Content-Type: application/json' \
             -d @- \
             https://api.telegram.org/bot$TELEGRAM_BOT_TOKEN/sendMessage | jq '.'
    fi
fi
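The three Telegram calls in the script differ only in the message text and the disable_notification flag, so one way to tidy it up is to pull the repeated part into a function. What follows is a sketch under stated assumptions, not the script as originally run. build_payload and send_telegram are hypothetical names, and the payload builder does no JSON escaping, so it assumes the message text contains no double quotes, backslashes or newlines.

```shell
# build_payload and send_telegram are hypothetical helper names.

# Build the JSON body for sendMessage.
# Assumption: the text has no double quotes, backslashes or newlines,
# because nothing here escapes them.
build_payload() {
    # $1 = chat id, $2 = message text, $3 = true|false for disable_notification
    printf '{"chat_id": "%s", "text": "%s", "disable_notification": %s}' "$1" "$2" "$3"
}

# POST the payload to the same sendMessage endpoint that the script uses.
# Relies on CHAT_ID and TELEGRAM_BOT_TOKEN being set, as they are in the script.
send_telegram() {
    # $1 = message text, $2 = true|false for disable_notification
    build_payload "$CHAT_ID" "$1" "$2" |
        curl -s -X POST \
             -H 'Content-Type: application/json' \
             -d @- \
             "https://api.telegram.org/bot${TELEGRAM_BOT_TOKEN}/sendMessage"
}

# Show the payload without sending anything
build_payload -468250841 "my_kafka_topic ingest has stalled" false
```

With that in place, each branch of the script would shrink to a single call along the lines of send_telegram "my_kafka_topic ingest has stalled" false.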

Complaints?

Please send all complaints to /dev/null ;-)
