This is the fourth and last article of the series "Streaming data into Kafka" series. In the first three articles, we saw how it's fairly easy to use Kafka Connect to load records from CSV, XML and JSON files into Apache Kafka without writing a single line of code. For doing this, we have used the Kafka Connect FilePulse connector which packs with a lot of nice features to parse and transform data.
- Streaming data into Kafka S01/E01 - Loading CSV file
- Streaming data into Kafka S01/E02 - Loading XML file
- Streaming data into Kafka S01/E03 - Loading JSON file
In this last article, we are going to see how to parse unstructured logs files from an NGINX web server into structured data fields.
Kafka Connect File Pulse connector
If you have already read the previous articles go directly to the next section (i.e Ingesting Data).
The Kafka Connect FilePulse connector is a powerful source connector that makes it easy to parse, transform, and load data from the local file system into Apache Kafka. It offers built-in support for various file formats (e.g: CSV, XML, JSON, LOG4J, AVRO).
For a broad overview of FilePulse, I suggest you read this article:
For more information, you can check-out the documentation here.
How to use the connector
The easiest and fastest way to get started with the Kafka Connect FilePulse connector is to use the Docker image available on Docker Hub.
$ docker pull streamthoughts/kafka-connect-file-pulse:1.6.3
You can download the docker-compose.yml
file available on the GitHub project repository to quickly start a Confluent Platform with Kafka Connect and the FilePulse connector pre-installed.
$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/v1.6.3/docker-compose.yml
$ docker-compose up -d
Once all Docker containers are started, you can check that the connector is installed on the Kafka Connect worker accessible on http://localhost:8083
.
$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse
"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
Note: You can also install the connector either from GitHub Releases Page or from Confluent Hub.
Starting NGINX
First, let's start an NGINX instance to serve a single HTML page, using Docker.
- Let's create some directories for the sake of this demo.
$ mkdir -p demo/content demo/logs
- ...and a simple HTML page called
ìndex.html
:
$ cat <<EOF > demo/content/index.html
<!DOCTYPE html>
<html>
<head>
<title>Hi!</title>
</head>
<body>
<h1>Hello World - Kafka Connect FilePulse</h1>
<strong>You can add a Star to this repository to support us! Thank You<a href="https://github.com/streamthoughts/kafka-connect-file-pulse">GitHub</a></strong>
</body>
</html>
EOF
- Then, start the NGINX web server, by running:
$ docker run --name nginx \
-p 8080:80 \
-v `pwd`/demo/content:/usr/share/nginx/html:ro -d nginx
- Check that the server is running properly:
$ curl -X GET http://localhost:8080
- Finally, and to simplify things for the rest of the article, we will redirect stderr and stdout of the container to the
./demo/logs/nginx.log
file.
$ docker logs -f nginx > ./demo/logs/nginx.log 2>&1 &
Ingesting Data
First, let's stop the container running Kafka Connect that was started using docker-compose.
$ docker stop connect && docker rm connect
Then, start a new one with a mounted volume for accessing the nginx.log file.
- Create a file to define the environment variables that must be set for Kafka Connect container.
$ cat <<EOF > connect-file-pulse-env.list
CONNECT_BOOTSTRAP_SERVERS=localhost:9092
CONNECT_REST_ADVERTISED_HOST_NAME=connect
CONNECT_REST_PORT=8083
CONNECT_GROUP_ID=compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC=docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
CONNECT_OFFSET_STORAGE_TOPIC=docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
CONNECT_STATUS_STORAGE_TOPIC=docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
CONNECT_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://localhost:8081
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_ZOOKEEPER_CONNECT=localhost:2181
CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components/
EOF
- Start the container running the Kafka Connect FilePulse connector:
$ docker run -it \
--network=host \
--name=connect \
--env-file connect-file-pulse-env.list \
-v `pwd`/demo/logs:/tmp/connect-data \
streamthoughts/kafka-connect-file-pulse:latest
Then, create a new connector with the below configuration:
$ curl \
-sX PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-log-filepulse-01/config \
-d '{
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
"fs.scan.directory.path": "/tmp/connect-data",
"fs.scan.interval.ms": "10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.log$",
"internal.kafka.reporter.bootstrap.servers": "localhost:9092",
"internal.kafka.reporter.topic": "connect-file-pulse-status",
"offset.strategy": "name",
"read.max.wait.ms": "900000",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
"topic": "connect-file-pulse-nginx-raw-logs",
"tasks.max": 1
}' | jq
Finally, you can consume the Topic named connect-file-pulse-nginx-raw-logs
to verify that the connector has detected the .log file:
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t connect-file-pulse-nginx-raw-logs \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"message": {
"string": "172.17.0.1 - - [05/Jan/2021:10:56:52 +0000] \"GET / HTTP/1.1\" 200 306 \"-\" \"curl/7.58.0\" \"-\""
}
}
- Eventually, you can generate more access logs by running:
$ for i in $(seq 0 100); do curl -sX GET http://localhost:8080 >/dev/null; sleep 1 ; done &
Note: In the example above, we have used kafkacat to consume the topics. The option -o-1
is used to only consume the latest message
Ok, let's take a moment to describe the configuration we've just used.
First, the connector will periodically scan the input directory /tmp/connect-data
that we set through the property fs.scan.directory.path
and will lookup for files matching the pattern .*\\.log$
.
Then, each file will be uniquely identified and tracked depending on the value of the offset.strategy
property. Here, the configuration specifies that a file must be identified by its name
. Alternatively, we could, for example, have chosen to use the inode
of the file as an identifier. Connect FilePulse supports multiple identifiers that can be combined (e.g: name+hash
).
In addition, the connector is configured to use the RowFileInputReader
(see: task.reader.class
) that allows creating one Kafka record per line.
One of the characteristics of the RowFileInputReader
is that will not immediately complete the processing after hitting the end of the file but will wait until reaching a timeout for more bytes to be written to the file. This behavior is configured through the read.max.wait.ms
property. Here we are waiting 15 minutes before finishing the file processing.
Parsing Data using Grok Expressions
So far, we have been able to read the NGINX logs continuously. Each time a new line is added to the file a new record is sent to Kafka containing a single text field called message
. But, it would be preferable to be able to parse each line to extract useful data and to produce structured messages into Kafka.
The Elastic/ELK Stack and in particular the Logstash solution has popularised the use of Grok expressions to parse and transform unstructured data into meaningful fields. Grok sits on top of Regular Expression (regex) to match relevant data using text patterns.
Connect FilePulse brings the power of Grok Expression directly to Kafka Connect with the GrokFilter
that under the hood uses the Joni library the Java port of Oniguruma regexp library. It also provides a lot of predefined and reusable grok patterns. See the complete list of patterns.
Let's define a custom grok pattern to match lines of the NGINX access log file.
$ cat <<EOF > nginx
NGINX_ACCESS %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] \"%{DATA:request}\" %{INT:status} %{NUMBER:bytes_sent} \"%{DATA:http_referer}\" \"%{DATA:http_user_agent}\"
EOF
Note: The syntax for a grok pattern is %{SYNTAX:SEMANTIC} or %{SYNTAX:SEMANTIC:TYPE}.
Then, we have to make the pattern available to the connector by copying the nginx
file, previously created, into the container :
$ docker exec -it connect mkdir -p /tmp/grok-patterns
$ docker cp nginx connect://tmp/grok-patterns/nginx
After that, we can create a new connector with the following configuration :
$ curl \
-sX PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-log-filepulse-02/config \
-d '{
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
"fs.scan.directory.path": "/tmp/connect-data",
"fs.scan.interval.ms": "10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.log$",
"internal.kafka.reporter.bootstrap.servers": "localhost:9092",
"internal.kafka.reporter.topic": "connect-file-pulse-status",
"offset.strategy": "name",
"read.max.wait.ms": "120000",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
"topic": "connect-file-pulse-nginx-parsed-logs",
"tasks.max": 1,
"filters": "ParseLog",
"filters.ParseLog.type": "io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter",
"filters.ParseLog.match": "%{NGINX_ACCESS}",
"filters.ParseLog.overwrite": "message",
"filters.ParseLog.source": "message",
"filters.ParseLog.ignoreFailure": "true",
"filters.ParseLog.patternsDir": "/tmp/grok-patterns"
}' | jq
Finally, let's consume the output Topic connect-file-pulse-nginx-parsed-logs to observe the extracted fields.
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t connect-file-pulse-nginx-parsed-logs \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"message": {
"string": "172.17.0.1 - - [05/Jan/2021:13:14:54 +0000] \"GET / HTTP/1.1\" 200 306 \"-\" \"curl/7.58.0\" \"-\""
},
"remote_addr": {
"string": "172.17.0.1"
},
"remote_user": {
"string": "-"
},
"time_local": {
"string": "05/Jan/2021:13:14:54 +0000"
},
"request": {
"string": "GET / HTTP/1.1"
},
"status": {
"string": "200"
},
"bytes_sent": {
"string": "306"
},
"http_referer": {
"string": "-"
},
"http_user_agent": {
"string": "curl/7.58.0"
}
}
As previously, you can Eventually generate more access logs by running:
$ for i in $(seq 0 100); do curl -sX GET http://localhost:8080 >/dev/null; sleep 1 ; done
Et voilà, It's as simple as that!
Using Grok Expression with Kafka Connect SMT
In the previous example, we use the processing filter chain feature provided by Connect FilePulse.
But, Kafka Connect already ships with a mechanism called Single Message Transforms (SMTs) that was added in Apache Kafka 0.10 (KIP-66). SMTs can be used to modify the data of each record that flow through Kafka Connect pipeline.
Good news ! We have externalized the work done with the GrokFilter
to a dedicated SMT called: Kafka Connect Grok Transformation.
Conclusion
We have seen in this article that it is fairly easy to continuously read and parse log files. The Connect File Pulse connector ships with the GrokFilter
to parse unstructured data using Grok expressions as you would have done with Logstash.
More generally, Connect File Pulse connector is a powerful solution that allows you to easily manipulate your data before sending it into Apache Kafka. Please do not hesitate to share this article. If you like this project then add a ⭐ to the GitHub repository to support us. Thank you.
Top comments (2)
Hi Florian,
Really cool feature ! I have one question, is it possible to add a k/v during the parsing process or this need to be done after, separately ?
Thanks !
Hi,
If your need is to add a simple K/V field this can be done using the AppendFilter provided by FilePulse. FilePulse allows you to chain multiple processing filters similar to SMTs.
I hope I've answered to your question