<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Florian Hussonnois</title>
    <description>The latest articles on DEV Community by Florian Hussonnois (@fhussonnois).</description>
    <link>https://dev.to/fhussonnois</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F311682%2F3853263a-8226-4701-897d-056f88e70976.png</url>
      <title>DEV Community: Florian Hussonnois</title>
      <link>https://dev.to/fhussonnois</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/fhussonnois"/>
    <language>en</language>
    <item>
      <title>Streaming data into Kafka S01/E04 — Parsing log files using Grok Expressions</title>
      <dc:creator>Florian Hussonnois</dc:creator>
      <pubDate>Tue, 05 Jan 2021 14:36:21 +0000</pubDate>
      <link>https://dev.to/fhussonnois/streaming-data-into-kafka-s01-e04-loading-log-files-using-grok-expression-59i2</link>
      <guid>https://dev.to/fhussonnois/streaming-data-into-kafka-s01-e04-loading-log-files-using-grok-expression-59i2</guid>
      <description>&lt;p&gt;This is the fourth and last article of the series &lt;strong&gt;"Streaming data into Kafka"&lt;/strong&gt; series. In the first three articles, we saw how it's fairly easy to use Kafka Connect to load records from CSV, XML and JSON files into Apache Kafka without writing a single line of code. For doing this, we have used the &lt;a href="https://github.com/streamthoughts/kafka-connect-file-pulse"&gt;Kafka Connect FilePulse connector&lt;/a&gt; which packs with a lot of nice features to parse and transform data. &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://dev.to/fhussonnois/streaming-csv-data-into-kafka-46a5"&gt;Streaming data into Kafka S01/E01 - Loading CSV file&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/fhussonnois/streaming-data-into-kafka-s01-e02-loading-xml-file-529i"&gt;Streaming data into Kafka S01/E02 - Loading XML file&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/fhussonnois/streaming-data-into-kafka-s01-e03-loading-json-file-3d76"&gt;Streaming data into Kafka S01/E03 - Loading JSON file&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In this last article, we are going to see how to parse unstructured log files from an &lt;a href="https://www.nginx.com/"&gt;NGINX&lt;/a&gt; web server into structured data fields.&lt;/p&gt;

&lt;h1&gt;Kafka Connect File Pulse connector&lt;/h1&gt;

&lt;p&gt;&lt;em&gt;If you have already read the previous articles, you can go directly to the next section (i.e., Ingesting Data).&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;
  &lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--y2qnAMdN--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://github.com/streamthoughts/kafka-connect-file-pulse/raw/master/site/static/images/streamthoughts-connect-file-pule-logo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--y2qnAMdN--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://github.com/streamthoughts/kafka-connect-file-pulse/raw/master/site/static/images/streamthoughts-connect-file-pule-logo.png"&gt;&lt;/a&gt;
&lt;/p&gt;

&lt;p&gt;The &lt;a href="https://github.com/streamthoughts/kafka-connect-file-pulse"&gt;Kafka Connect FilePulse connector&lt;/a&gt; is a powerful source connector that makes it easy to parse, transform, and load data from the local file system into Apache Kafka. It offers built-in support for various file formats (e.g: CSV, XML, JSON, LOG4J, AVRO).&lt;/p&gt;

&lt;p&gt;For a broad overview of FilePulse, I suggest you read this article: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://medium.com/streamthoughts/kafka-connect-filepulse-one-connector-to-ingest-them-all-faed018a725c"&gt;Kafka Connect FilePulse - One Connector to Ingest them All!&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For more information, you can check out the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/"&gt;documentation here&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;How to use the connector&lt;/h2&gt;

&lt;p&gt;The easiest and fastest way to get started with the Kafka Connect FilePulse connector is to use the Docker image available on Docker Hub.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker pull streamthoughts/kafka-connect-file-pulse:1.6.3
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can download the &lt;code&gt;docker-compose.yml&lt;/code&gt; file available in the GitHub project repository to quickly start a Confluent Platform with Kafka Connect and the FilePulse connector pre-installed.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/v1.6.3/docker-compose.yml
&lt;span class="nv"&gt;$ &lt;/span&gt;docker-compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once all Docker containers are started, you can check that the connector is installed on the Kafka Connect worker accessible on &lt;code&gt;http://localhost:8083&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="nt"&gt;-s&lt;/span&gt; localhost:8083/connector-plugins|jq &lt;span class="s1"&gt;'.[].class'&lt;/span&gt;|egrep FilePulse

&lt;span class="s2"&gt;"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note: You can also install the connector either from the &lt;a href="https://github.com/streamthoughts/kafka-connect-file-pulse/releases"&gt;GitHub Releases page&lt;/a&gt; or from &lt;a href="https://www.confluent.io/hub/streamthoughts/kafka-connect-file-pulse"&gt;Confluent Hub&lt;/a&gt;.&lt;/p&gt;

&lt;h1&gt;Starting NGINX&lt;/h1&gt;

&lt;p&gt;First, let's start an &lt;a href="https://www.nginx.com/"&gt;NGINX&lt;/a&gt; instance to serve a single HTML page, using Docker.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Let's create some directories for the sake of this demo.
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;&lt;span class="nb"&gt;mkdir&lt;/span&gt; &lt;span class="nt"&gt;-p&lt;/span&gt; demo/content demo/logs
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;...and a simple HTML page called &lt;code&gt;index.html&lt;/code&gt;:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;&lt;span class="nb"&gt;cat&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;EOF&lt;/span&gt;&lt;span class="sh"&gt; &amp;gt; demo/content/index.html
&amp;lt;!DOCTYPE html&amp;gt;
&amp;lt;html&amp;gt;
  &amp;lt;head&amp;gt;
    &amp;lt;title&amp;gt;Hi!&amp;lt;/title&amp;gt;
  &amp;lt;/head&amp;gt;
  &amp;lt;body&amp;gt;
    &amp;lt;h1&amp;gt;Hello World - Kafka Connect FilePulse&amp;lt;/h1&amp;gt;
    &amp;lt;strong&amp;gt;You can add a Star to this repository to support us! Thank you: &amp;lt;a href="https://github.com/streamthoughts/kafka-connect-file-pulse"&amp;gt;GitHub&amp;lt;/a&amp;gt;&amp;lt;/strong&amp;gt;
  &amp;lt;/body&amp;gt;
&amp;lt;/html&amp;gt;
&lt;/span&gt;&lt;span class="no"&gt;EOF
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;Then, start the NGINX web server by running:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--name&lt;/span&gt; nginx &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;-p&lt;/span&gt; 8080:80 &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;-v&lt;/span&gt; &lt;span class="sb"&gt;`&lt;/span&gt;&lt;span class="nb"&gt;pwd&lt;/span&gt;&lt;span class="sb"&gt;`&lt;/span&gt;/demo/content:/usr/share/nginx/html:ro &lt;span class="nt"&gt;-d&lt;/span&gt; nginx
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;Check that the server is running properly:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="nt"&gt;-X&lt;/span&gt; GET http://localhost:8080
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;Finally, to simplify things for the rest of the article, we will redirect the &lt;em&gt;stderr&lt;/em&gt; and &lt;em&gt;stdout&lt;/em&gt; of the container to the &lt;code&gt;./demo/logs/nginx.log&lt;/code&gt; file.
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker logs &lt;span class="nt"&gt;-f&lt;/span&gt; nginx &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; ./demo/logs/nginx.log 2&amp;gt;&amp;amp;1 &amp;amp; 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h1&gt;Ingesting Data&lt;/h1&gt;

&lt;p&gt;First, let's stop the container running Kafka Connect that was started using docker-compose.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker stop connect &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; docker &lt;span class="nb"&gt;rm &lt;/span&gt;connect
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, start a new one with a mounted volume for accessing the &lt;em&gt;nginx.log&lt;/em&gt; file.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Create a file to define the environment variables that must be set for the Kafka Connect container.
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;&lt;span class="nb"&gt;cat&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;EOF&lt;/span&gt;&lt;span class="sh"&gt; &amp;gt; connect-file-pulse-env.list
CONNECT_BOOTSTRAP_SERVERS=localhost:9092
CONNECT_REST_ADVERTISED_HOST_NAME=connect
CONNECT_REST_PORT=8083
CONNECT_GROUP_ID=compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC=docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
CONNECT_OFFSET_STORAGE_TOPIC=docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
CONNECT_STATUS_STORAGE_TOPIC=docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
CONNECT_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://localhost:8081
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_ZOOKEEPER_CONNECT=localhost:2181
CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components/
&lt;/span&gt;&lt;span class="no"&gt;EOF
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;Start the container running the Kafka Connect FilePulse connector:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;-it&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
 &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host &lt;span class="se"&gt;\&lt;/span&gt;
 &lt;span class="nt"&gt;--name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;connect &lt;span class="se"&gt;\&lt;/span&gt;
 &lt;span class="nt"&gt;--env-file&lt;/span&gt; connect-file-pulse-env.list &lt;span class="se"&gt;\&lt;/span&gt;
 &lt;span class="nt"&gt;-v&lt;/span&gt; &lt;span class="sb"&gt;`&lt;/span&gt;&lt;span class="nb"&gt;pwd&lt;/span&gt;&lt;span class="sb"&gt;`&lt;/span&gt;/demo/logs:/tmp/connect-data &lt;span class="se"&gt;\&lt;/span&gt;
 streamthoughts/kafka-connect-file-pulse:latest
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, create a new connector with the following configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-sX&lt;/span&gt; PUT &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; http://localhost:8083/connectors/source-log-filepulse-01/config &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
     "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
     "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
     "fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
     "fs.scan.directory.path": "/tmp/connect-data",
     "fs.scan.interval.ms": "10000",
     "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
     "file.filter.regex.pattern":".*\\.log$",
     "internal.kafka.reporter.bootstrap.servers": "localhost:9092",
     "internal.kafka.reporter.topic": "connect-file-pulse-status",
     "offset.strategy": "name",
     "read.max.wait.ms": "900000",
     "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
     "topic": "connect-file-pulse-nginx-raw-logs",
     "tasks.max": 1
}'&lt;/span&gt; | jq
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, you can consume the Topic named &lt;code&gt;connect-file-pulse-nginx-raw-logs&lt;/code&gt; to verify that the connector has detected the &lt;em&gt;.log&lt;/em&gt; file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt; &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; connect-file-pulse-nginx-raw-logs &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o-1&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .payload.ConnectDefault
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(output)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"message"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"172.17.0.1 - - [05/Jan/2021:10:56:52 +0000] &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;GET / HTTP/1.1&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt; 200 306 &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;-&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt; &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;curl/7.58.0&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt; &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;-&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;Optionally, you can generate more access logs by running:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;&lt;span class="k"&gt;for &lt;/span&gt;i &lt;span class="k"&gt;in&lt;/span&gt; &lt;span class="si"&gt;$(&lt;/span&gt;&lt;span class="nb"&gt;seq &lt;/span&gt;0 100&lt;span class="si"&gt;)&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;do &lt;/span&gt;curl &lt;span class="nt"&gt;-sX&lt;/span&gt; GET http://localhost:8080 &lt;span class="o"&gt;&amp;gt;&lt;/span&gt;/dev/null&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="nb"&gt;sleep &lt;/span&gt;1 &lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;done&lt;/span&gt; &amp;amp;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Note: In the example above, we used &lt;a href="https://github.com/edenhill/kafkacat"&gt;kafkacat&lt;/a&gt; to consume the topic. The option &lt;code&gt;-o-1&lt;/code&gt; is used to consume only the latest message.&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;OK, let's take a moment to describe the configuration we've just used.&lt;/p&gt;

&lt;p&gt;First, the connector periodically scans the input directory &lt;code&gt;/tmp/connect-data&lt;/code&gt;, set through the &lt;code&gt;fs.scan.directory.path&lt;/code&gt; property, and looks for files matching the pattern &lt;code&gt;.*\.log$&lt;/code&gt;.&lt;/p&gt;
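&lt;p&gt;&lt;em&gt;As a quick sanity check, the file filter can be reproduced with a few lines of Python (a sketch using the standard &lt;code&gt;re&lt;/code&gt; module, not the connector's actual Java implementation):&lt;/em&gt;&lt;/p&gt;

```python
import re

# Same pattern as in the connector configuration (JSON escaping removed).
log_file_filter = re.compile(r".*\.log$")

candidates = ["nginx.log", "access.log", "nginx.log.1", "notes.txt"]
matched = [name for name in candidates if log_file_filter.match(name)]
print(matched)  # ['nginx.log', 'access.log']
```

&lt;p&gt;&lt;em&gt;Note that a rotated file such as &lt;code&gt;nginx.log.1&lt;/code&gt; is excluded because the pattern is anchored with &lt;code&gt;$&lt;/code&gt;.&lt;/em&gt;&lt;/p&gt;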

&lt;p&gt;Then, each file will be uniquely identified and tracked depending on the value of the &lt;code&gt;offset.strategy&lt;/code&gt; property. Here, the configuration specifies that a file must be identified by its &lt;code&gt;name&lt;/code&gt;. Alternatively, we could, for example, have chosen to use the &lt;code&gt;inode&lt;/code&gt; of the file as an identifier. Connect FilePulse supports multiple identifiers that can be combined (e.g: &lt;code&gt;name+hash&lt;/code&gt;).&lt;/p&gt;
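&lt;p&gt;&lt;em&gt;The difference between the two strategies can be illustrated with a small Python sketch (hypothetical code, not part of the connector): renaming a file changes its name but, on POSIX file systems, not its inode.&lt;/em&gt;&lt;/p&gt;

```python
import os
import tempfile

tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, "nginx.log")
open(path, "w").close()

inode_before = os.stat(path).st_ino

# Rotate the log file, as logrotate would do.
rotated = os.path.join(tmpdir, "nginx.log.1")
os.rename(path, rotated)
inode_after = os.stat(rotated).st_ino

# Identified by name, the rotated file would look brand new;
# identified by inode, it is still recognized as the same file.
print(inode_before == inode_after)  # True on POSIX file systems
```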

&lt;p&gt;In addition, the connector is configured to use the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/file-readers/#rowfileinputreader-default"&gt;&lt;code&gt;RowFileInputReader&lt;/code&gt;&lt;/a&gt; (see: &lt;code&gt;task.reader.class&lt;/code&gt;), which creates one Kafka record per line.&lt;/p&gt;

&lt;p&gt;One of the characteristics of the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/file-readers/#rowfileinputreader-default"&gt;&lt;code&gt;RowFileInputReader&lt;/code&gt;&lt;/a&gt; is that it does not complete the processing of a file immediately after reaching its end; instead, it waits up to a timeout for more bytes to be written to the file. This behavior is configured through the &lt;code&gt;read.max.wait.ms&lt;/code&gt; property. Here, the connector waits up to 15 minutes (900,000 ms) before finishing the file processing.&lt;/p&gt;
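&lt;p&gt;&lt;em&gt;To give an intuition of this tail-like behavior, here is a rough Python sketch (a toy reader, not the connector's implementation) that yields one record per line and keeps polling for new bytes until a &lt;code&gt;read.max.wait.ms&lt;/code&gt;-style timeout expires:&lt;/em&gt;&lt;/p&gt;

```python
import time

def read_rows(path, max_wait_ms=900_000, poll_interval_s=0.2):
    """Yield one record per complete line; after hitting EOF, keep
    polling until max_wait_ms elapses with no new data."""
    with open(path, "r") as f:
        deadline = None
        while True:
            pos = f.tell()
            line = f.readline()
            if line.endswith("\n"):
                deadline = None  # got a full row: reset the wait timer
                yield line.rstrip("\n")
                continue
            f.seek(pos)  # incomplete or no line yet: rewind and wait
            now = time.monotonic()
            if deadline is None:
                deadline = now + max_wait_ms / 1000.0
            if now > deadline:
                break  # no new bytes within the timeout
            time.sleep(poll_interval_s)
```

&lt;p&gt;&lt;em&gt;With a short timeout the reader simply consumes the existing lines and stops; with a long one it keeps following the file, much like &lt;code&gt;tail -f&lt;/code&gt;.&lt;/em&gt;&lt;/p&gt;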

&lt;h1&gt;Parsing Data using Grok Expressions&lt;/h1&gt;

&lt;p&gt;So far, we have been able to read the NGINX logs continuously. Each time a new line is added to the file, a new record containing a single text field called &lt;code&gt;message&lt;/code&gt; is sent to Kafka. But it would be preferable to parse each line in order to extract useful data and produce structured messages into Kafka.&lt;/p&gt;

&lt;p&gt;The &lt;a href="http://www.elasticsearch.org/overview/"&gt;Elastic/ELK Stack&lt;/a&gt;, and in particular the &lt;a href="https://www.elastic.co/logstash"&gt;Logstash&lt;/a&gt; solution, has popularised the use of Grok expressions to parse and transform unstructured data into meaningful fields. Grok sits on top of regular expressions (regex) to match relevant data using text patterns.&lt;/p&gt;

&lt;p&gt;Connect FilePulse brings the power of Grok expressions directly to Kafka Connect with the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/filters/#grokfilter"&gt;&lt;code&gt;GrokFilter&lt;/code&gt;&lt;/a&gt;, which under the hood uses the &lt;a href="https://github.com/jruby/joni"&gt;Joni&lt;/a&gt; library, a Java port of the &lt;a href="https://github.com/kkos/oniguruma"&gt;Oniguruma&lt;/a&gt; regex library. It also provides a lot of predefined and reusable grok patterns; see the documentation for the complete list.&lt;/p&gt;
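&lt;p&gt;&lt;em&gt;To make the idea concrete, here is a deliberately simplified Python sketch of what a Grok engine does: it expands &lt;code&gt;%{SYNTAX:SEMANTIC}&lt;/code&gt; references into capturing groups using a dictionary of predefined patterns. The patterns below are toy versions, and real implementations (including Joni) use named captures rather than positional groups:&lt;/em&gt;&lt;/p&gt;

```python
import re

# Toy versions of predefined grok patterns (the real IPORHOST, INT, ...
# definitions are more elaborate).
PATTERNS = {
    "IP": r"\d{1,3}(?:\.\d{1,3}){3}",
    "WORD": r"\w+",
    "INT": r"\d+",
}

def expand(grok):
    """Expand each %{SYNTAX:SEMANTIC} reference into a capturing group
    and return the compiled regex plus the field names, in order."""
    fields = []
    def repl(m):
        syntax, semantic = m.group(1), m.group(2)
        fields.append(semantic)
        return "(" + PATTERNS[syntax] + ")"
    regex = re.sub(r"%\{(\w+):(\w+)\}", repl, grok)
    return re.compile(regex), fields

pattern, fields = expand(r"%{IP:remote_addr} %{WORD:method} %{INT:status}")
m = pattern.match("172.17.0.1 GET 200")
print(dict(zip(fields, m.groups())))
# {'remote_addr': '172.17.0.1', 'method': 'GET', 'status': '200'}
```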

&lt;p&gt;Let's define a custom grok pattern to match lines of the NGINX access log file.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;&lt;span class="nb"&gt;cat&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;EOF&lt;/span&gt;&lt;span class="sh"&gt; &amp;gt; nginx
NGINX_ACCESS %{IPORHOST:remote_addr} - %{USERNAME:remote_user} &lt;/span&gt;&lt;span class="se"&gt;\[&lt;/span&gt;&lt;span class="sh"&gt;%{HTTPDATE:time_local}&lt;/span&gt;&lt;span class="se"&gt;\]&lt;/span&gt;&lt;span class="sh"&gt; &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="sh"&gt;%{DATA:request}&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="sh"&gt; %{INT:status} %{NUMBER:bytes_sent} &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="sh"&gt;%{DATA:http_referer}&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="sh"&gt; &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="sh"&gt;%{DATA:http_user_agent}&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="sh"&gt;
&lt;/span&gt;&lt;span class="no"&gt;EOF
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note: The syntax for a grok pattern is &lt;code&gt;%{SYNTAX:SEMANTIC}&lt;/code&gt; or &lt;code&gt;%{SYNTAX:SEMANTIC:TYPE}&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Then, we have to make the pattern available to the connector by copying the previously created &lt;code&gt;nginx&lt;/code&gt; file into the container:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;exec&lt;/span&gt; &lt;span class="nt"&gt;-it&lt;/span&gt; connect &lt;span class="nb"&gt;mkdir&lt;/span&gt; &lt;span class="nt"&gt;-p&lt;/span&gt; /tmp/grok-patterns
&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;cp &lt;/span&gt;nginx connect://tmp/grok-patterns/nginx
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;After that, we can create a new connector with the following configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-sX&lt;/span&gt; PUT &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; http://localhost:8083/connectors/source-log-filepulse-02/config &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
     "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
     "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
     "fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
     "fs.scan.directory.path": "/tmp/connect-data",
     "fs.scan.interval.ms": "10000",
     "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
     "file.filter.regex.pattern":".*\\.log$",
     "internal.kafka.reporter.bootstrap.servers": "localhost:9092",
     "internal.kafka.reporter.topic": "connect-file-pulse-status",
     "offset.strategy": "name",
     "read.max.wait.ms": "120000",
     "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
     "topic": "connect-file-pulse-nginx-parsed-logs",
     "tasks.max": 1,
     "filters": "ParseLog",
     "filters.ParseLog.type": "io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter",
     "filters.ParseLog.match": "%{NGINX_ACCESS}",
     "filters.ParseLog.overwrite": "message",
     "filters.ParseLog.source": "message",
     "filters.ParseLog.ignoreFailure": "true",
     "filters.ParseLog.patternsDir": "/tmp/grok-patterns"
}'&lt;/span&gt; | jq
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, let's consume the output Topic &lt;em&gt;connect-file-pulse-nginx-parsed-logs&lt;/em&gt; to observe the extracted fields.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt; &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; connect-file-pulse-nginx-parsed-logs &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o-1&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .payload.ConnectDefault
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;(output)&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"message"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"172.17.0.1 - - [05/Jan/2021:13:14:54 +0000] &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;GET / HTTP/1.1&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt; 200 306 &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;-&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt; &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;curl/7.58.0&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt; &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;-&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"remote_addr"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"172.17.0.1"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"remote_user"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"-"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"time_local"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"05/Jan/2021:13:14:54 +0000"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"request"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"GET / HTTP/1.1"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"status"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"200"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"bytes_sent"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"306"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"http_referer"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"-"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"http_user_agent"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"curl/7.58.0"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As before, you can optionally generate more access logs by running:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;&lt;span class="k"&gt;for &lt;/span&gt;i &lt;span class="k"&gt;in&lt;/span&gt; &lt;span class="si"&gt;$(&lt;/span&gt;&lt;span class="nb"&gt;seq &lt;/span&gt;0 100&lt;span class="si"&gt;)&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;do &lt;/span&gt;curl &lt;span class="nt"&gt;-sX&lt;/span&gt; GET http://localhost:8080 &lt;span class="o"&gt;&amp;gt;&lt;/span&gt;/dev/null&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="nb"&gt;sleep &lt;/span&gt;1 &lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;done&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Et voilà, it's as simple as that!&lt;/p&gt;

&lt;h1&gt;
  
  
  Using Grok Expression with Kafka Connect SMT
&lt;/h1&gt;

&lt;p&gt;In the previous example, we used the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/filters-chain-definition/"&gt;processing filter chain&lt;/a&gt; feature provided by Connect FilePulse.&lt;/p&gt;

&lt;p&gt;But Kafka Connect already ships with a mechanism called &lt;strong&gt;&lt;em&gt;Single Message Transforms&lt;/em&gt; (SMTs)&lt;/strong&gt; that was added in Apache Kafka 0.10 (&lt;a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect"&gt;KIP-66&lt;/a&gt;). SMTs can be used to modify the data of each record that flows through a Kafka Connect pipeline.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Good news!&lt;/strong&gt; We have externalized the work done with the &lt;code&gt;GrokFilter&lt;/code&gt; into a dedicated SMT: &lt;a href="https://github.com/streamthoughts/kafka-connect-transform-grok"&gt;Kafka Connect Grok Transformation&lt;/a&gt;.&lt;/p&gt;
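&lt;p&gt;To give an idea of how such an SMT plugs into a connector configuration, here is a minimal, hedged sketch. The transformation alias, class name, and &lt;code&gt;pattern&lt;/code&gt; property below are assumptions used for illustration; refer to the project's README for the actual class and configuration options:&lt;br&gt;
&lt;/p&gt;

```json
{
  "transforms": "ParseLog",
  "transforms.ParseLog.type": "io.streamthoughts.kafka.connect.transform.Grok",
  "transforms.ParseLog.pattern": "%{COMBINEDAPACHELOG}"
}
```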

&lt;h1&gt;
  
  
  Conclusion
&lt;/h1&gt;

&lt;p&gt;We have seen in this article that it is fairly easy to continuously read and parse log files. The Connect FilePulse connector ships with the &lt;code&gt;GrokFilter&lt;/code&gt; to parse unstructured data using Grok expressions, as you would with Logstash.&lt;/p&gt;

&lt;p&gt;More generally, the Connect FilePulse connector is a powerful solution that lets you easily manipulate your data before sending it into Apache Kafka. Please do not hesitate to share this article. If you like this project, add a ⭐ to the GitHub repository to support us. Thank you.&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>kafkaconnect</category>
      <category>grokparsing</category>
      <category>log</category>
    </item>
    <item>
      <title>Streaming data into Kafka S01/E03 - Loading JSON file</title>
      <dc:creator>Florian Hussonnois</dc:creator>
      <pubDate>Thu, 10 Sep 2020 12:36:00 +0000</pubDate>
      <link>https://dev.to/fhussonnois/streaming-data-into-kafka-s01-e03-loading-json-file-3d76</link>
      <guid>https://dev.to/fhussonnois/streaming-data-into-kafka-s01-e03-loading-json-file-3d76</guid>
<description>&lt;p&gt;This is the third article in the &lt;strong&gt;"Streaming data into Kafka"&lt;/strong&gt; series. In the first two, we saw how it's fairly easy to use Kafka Connect to load records from CSV and XML files into Apache Kafka without writing a single line of code. To do this, we used the &lt;a href="https://github.com/streamthoughts/kafka-connect-file-pulse" rel="noopener noreferrer"&gt;Kafka Connect FilePulse connector&lt;/a&gt;, which packs a lot of nice features to parse and transform data.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://dev.to/fhussonnois/streaming-csv-data-into-kafka-46a5"&gt;Streaming data into Kafka S01/E01 - Loading CSV file&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/fhussonnois/streaming-data-into-kafka-s01-e02-loading-xml-file-529i"&gt;Streaming data into Kafka S01/E02 - Loading XML file&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Now, let's see how to integrate JSON data, another file format that is widely used on most projects (and much more appreciated than XML for web-based applications).&lt;/p&gt;

&lt;h1&gt;
  
  
  Kafka Connect File Pulse connector
&lt;/h1&gt;

&lt;p&gt;&lt;em&gt;If you have already read the previous articles, go directly to the next section (i.e., Ingesting Data).&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;
  &lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fgithub.com%2Fstreamthoughts%2Fkafka-connect-file-pulse%2Fraw%2Fmaster%2Fsite%2Fstatic%2Fimages%2Fstreamthoughts-connect-file-pule-logo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fgithub.com%2Fstreamthoughts%2Fkafka-connect-file-pulse%2Fraw%2Fmaster%2Fsite%2Fstatic%2Fimages%2Fstreamthoughts-connect-file-pule-logo.png"&gt;&lt;/a&gt;
&lt;/p&gt;

&lt;p&gt;The &lt;a href="https://github.com/streamthoughts/kafka-connect-file-pulse" rel="noopener noreferrer"&gt;Kafka Connect FilePulse connector&lt;/a&gt; is a powerful source connector that makes it easy to parse, transform, and load data from the local file system into Apache Kafka. It offers built-in support for various file formats (e.g., CSV, XML, JSON, LOG4J, AVRO).&lt;/p&gt;

&lt;p&gt;For a broad overview of FilePulse, I suggest you read this article:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://medium.com/streamthoughts/kafka-connect-filepulse-one-connector-to-ingest-them-all-faed018a725c" rel="noopener noreferrer"&gt;Kafka Connect FilePulse - One Connector to Ingest them All!&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For more information, you can check out the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/" rel="noopener noreferrer"&gt;documentation here&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  How to use the connector
&lt;/h2&gt;

&lt;p&gt;The easiest and fastest way to get started with the Kafka Connect FilePulse connector is to use the Docker image available on Docker Hub.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker pull streamthoughts/kafka-connect-file-pulse:1.6.3
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can download the &lt;code&gt;docker-compose.yml&lt;/code&gt; file available on the GitHub project repository to quickly start a Confluent Platform with Kafka Connect and the FilePulse connector pre-installed.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/v1.6.3/docker-compose.yml
&lt;span class="nv"&gt;$ &lt;/span&gt;docker-compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once all Docker containers are started, you can check that the connector is installed on the Kafka Connect worker accessible on &lt;code&gt;http://localhost:8083&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="nt"&gt;-s&lt;/span&gt; localhost:8083/connector-plugins|jq &lt;span class="s1"&gt;'.[].class'&lt;/span&gt;|egrep FilePulse

&lt;span class="s2"&gt;"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note : You can also install the connector either from &lt;a href="https://github.com/streamthoughts/kafka-connect-file-pulse/releases" rel="noopener noreferrer"&gt;GitHub Releases Page&lt;/a&gt; or from &lt;a href="https://www.confluent.io/hub/streamthoughts/kafka-connect-file-pulse" rel="noopener noreferrer"&gt;Confluent Hub&lt;/a&gt;.&lt;/p&gt;

&lt;h1&gt;
  
  
  Ingesting Data
&lt;/h1&gt;

&lt;p&gt;To read a file containing a single JSON document, we will use the &lt;code&gt;BytesArrayInputReader&lt;/code&gt;. This reader creates a single record per source file. Each record produced by this reader has a single field of type &lt;code&gt;byte[]&lt;/code&gt; named &lt;code&gt;message&lt;/code&gt;. The &lt;code&gt;byte[]&lt;/code&gt; value is the full content of the source file (i.e., the JSON document).&lt;/p&gt;
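&lt;p&gt;Conceptually, the reader behaves like the following sketch (illustrative only, not the connector's actual code): one record per source file, with the whole raw content in a single bytes field named &lt;code&gt;message&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

```python
from pathlib import Path

def read_as_single_record(path):
    # One record per source file; the full raw content ends up
    # in a single bytes field named "message".
    return {"message": Path(path).read_bytes()}
```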

&lt;p&gt;Then, to parse this field, we will use the processing filter mechanism provided by the FilePulse connector and more particularly the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/filters/#jsonfilter" rel="noopener noreferrer"&gt;JSONFilter&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;So let's create the connector with this minimal configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; PUT &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt;  &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; http://localhost:8083/connectors/tracks-json-filepulse-00/config &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "topic":"tracks-filepulse-json-00",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "tasks.max": 1
    }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Note:&lt;/em&gt; The Connect FilePulse connector periodically scans the input directory that we set using the property &lt;code&gt;fs.scan.directory.path&lt;/code&gt;. Then, it looks for files matching the pattern &lt;code&gt;.*\\.json$&lt;/code&gt;. Each file is uniquely identified and tracked depending on the value of &lt;code&gt;offset.strategy&lt;/code&gt;. Here, the configuration specifies that a file is identified by its name.&lt;/p&gt;
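&lt;p&gt;The file-matching step itself is plain regular-expression matching. A quick sketch of what the pattern &lt;code&gt;.*\.json$&lt;/code&gt; selects (assuming, as the filter name suggests, the pattern is applied to each file name):&lt;br&gt;
&lt;/p&gt;

```python
import re

# The same pattern as in the connector configuration.
pattern = re.compile(r".*\.json$")

candidates = ["track.json", "notes.txt", "track.json.bak"]
matched = [name for name in candidates if pattern.match(name)]
# Only "track.json" ends with ".json", so only it is picked up.
```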

&lt;p&gt;Create a valid JSON file that looks like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;cat&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;&amp;lt;&amp;lt;EOF&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;&amp;gt;&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;track.json&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt; 
  &lt;/span&gt;&lt;span class="nl"&gt;"track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="nl"&gt;"title"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Star Wars (Main Theme)"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="nl"&gt;"artist"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"John Williams, London Symphony Orchestra"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="nl"&gt;"album"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Star Wars"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="nl"&gt;"duration"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"10:52"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt;EOF&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then copy this file from your host to the Docker container which runs the connector. You can run the following commands:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;// Create the target directory
&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;exec&lt;/span&gt; &lt;span class="nt"&gt;-it&lt;/span&gt; connect &lt;span class="nb"&gt;mkdir&lt;/span&gt; &lt;span class="nt"&gt;-p&lt;/span&gt; /tmp/kafka-connect/examples

// Copy host file to docker-container
&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;cp &lt;/span&gt;track.json connect://tmp/kafka-connect/examples/track-00.json
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, consume the topic named &lt;code&gt;tracks-filepulse-json-00&lt;/code&gt; and verify that the connector has detected and processed the JSON file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt; &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; tracks-filepulse-json-00 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o-1&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .payload.ConnectDefault
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(output)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"message"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"bytes"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"{ &lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s2"&gt;  &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;track&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;: {&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s2"&gt;     &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;title&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;Star Wars (Main Theme)&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;,&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s2"&gt;     &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;artist&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;John Williams, London Symphony Orchestra&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;,&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s2"&gt;     &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;album&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;Star Wars&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;,&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s2"&gt;     &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;duration&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;:&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;10:52&lt;/span&gt;&lt;span class="se"&gt;\"\n&lt;/span&gt;&lt;span class="s2"&gt; 
 }&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s2"&gt;}&lt;/span&gt;&lt;span class="se"&gt;\n&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"title"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Star Wars (Main Theme)"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"artist"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"John Williams, London Symphony Orchestra"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"album"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Star Wars"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"duration"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"10:52"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;Note: In the example above, we used &lt;a href="https://github.com/edenhill/kafkacat" rel="noopener noreferrer"&gt;kafkacat&lt;/a&gt; to consume the topic. The option &lt;code&gt;-o-1&lt;/code&gt; is used to consume only the latest message.&lt;/em&gt;&lt;/p&gt;

&lt;h1&gt;
  
  
  Excluding a Field
&lt;/h1&gt;

&lt;p&gt;The &lt;code&gt;JSONFilter&lt;/code&gt; does not automatically delete the original field containing the raw JSON string (i.e. the &lt;code&gt;message&lt;/code&gt;). If you do not want to keep this field, you can remove it using the &lt;code&gt;ExcludeFilter&lt;/code&gt; as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;curl&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;\&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="err"&gt;-i&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;-X&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;PUT&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;-H&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;\&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="err"&gt;-H&lt;/span&gt;&lt;span class="w"&gt;  &lt;/span&gt;&lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;http://localhost:&lt;/span&gt;&lt;span class="mi"&gt;8083&lt;/span&gt;&lt;span class="err"&gt;/connectors/tracks-json-filepulse&lt;/span&gt;&lt;span class="mi"&gt;-00&lt;/span&gt;&lt;span class="err"&gt;/config&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;\&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="err"&gt;-d&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;'&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"connector.class"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"fs.scan.directory.path"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"/tmp/kafka-connect/examples/"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"fs.scan.interval.ms"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"10000"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"fs.scan.filters"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"file.filter.regex.pattern"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;".*&lt;/span&gt;&lt;span class="se"&gt;\\&lt;/span&gt;&lt;span class="s2"&gt;.json$"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"task.reader.class"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"offset.strategy"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"topic"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"tracks-filepulse-json-01"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"internal.kafka.reporter.bootstrap.servers"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"broker:29092"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"internal.kafka.reporter.topic"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"connect-file-pulse-status"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"fs.cleanup.policy.class"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"filters"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"ParseJSON, ExcludeFieldMessage"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"filters.ParseJSON.type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"filters.ParseJSON.source"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"message"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"filters.ParseJSON.merge"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"true"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"filters.ExcludeFieldMessage.type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"filters.ExcludeFieldMessage.fields"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"message"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"tasks.max"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="err"&gt;'&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Copy the JSON file to the Docker container as previously:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;cp &lt;/span&gt;track.json &lt;span class="se"&gt;\&lt;/span&gt;
connect://tmp/kafka-connect/examples/track-01.json
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then consume the output topic &lt;code&gt;tracks-filepulse-json-01&lt;/code&gt; by running:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt; &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; tracks-filepulse-json-01 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o-1&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .payload.ConnectDefault
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(output)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"title"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Star Wars (Main Theme)"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"artist"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"John Williams, London Symphony Orchestra"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"album"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Star Wars"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"duration"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"10:52"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;That's it! We have successfully produced a clean structured message similar to the one contained in our input file.&lt;/p&gt;

&lt;p&gt;Now, let's go a step further.&lt;/p&gt;

&lt;h1&gt;
  
  
  Handling Null values
&lt;/h1&gt;

&lt;p&gt;Sometimes you may have to process JSON documents containing null values. With the configuration used so far, null values are simply ignored during serialization.&lt;/p&gt;

&lt;p&gt;The main reason for this is that the connector cannot infer the type of a field containing a &lt;code&gt;null&lt;/code&gt; value.&lt;/p&gt;

&lt;p&gt;However, we can combine the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/filters/#appendfilter" rel="noopener noreferrer"&gt;AppendFilter&lt;/a&gt; and the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/accessing-data-and-metadata/" rel="noopener noreferrer"&gt;Simple Connect Expression Language (SCEL)&lt;/a&gt; to both define the expected type of the field and set a default value when it is null.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Note: &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/accessing-data-and-metadata/"&gt;Simple Connect Expression Language (SCEL)&lt;/a&gt; is a basic expression language provided by the Connect FilePulse connector to access and manipulate record fields&lt;/em&gt;.&lt;/p&gt;
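&lt;p&gt;The heart of the configuration we are about to apply is a single SCEL expression. Reading it inside out: &lt;code&gt;nlv($value.track.rank, 0)&lt;/code&gt; returns the &lt;code&gt;rank&lt;/code&gt; field when it is non-null and the default &lt;code&gt;0&lt;/code&gt; otherwise, while &lt;code&gt;converts(..., 'INTEGER')&lt;/code&gt; casts the result so the field gets a concrete type:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{{ converts(nlv($value.track.rank, 0), 'INTEGER') }}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;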

&lt;p&gt;Let's update the connector configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; PUT &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt;  &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; http://localhost:8083/connectors/tracks-json-filepulse-02/config &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "topic":"tracks-filepulse-json-02",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON, ExcludeFieldMessage, SetDefaultRank",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
        "filters.ExcludeFieldMessage.fields":"message",
        "filters.SetDefaultRank.type":"io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
        "filters.SetDefaultRank.field":"$value.track.rank",
        "filters.SetDefaultRank.value":"{{ converts(nlv($value.track.rank, 0), '&lt;/span&gt;&lt;span class="se"&gt;\'&lt;/span&gt;&lt;span class="s1"&gt;'INTEGER'&lt;/span&gt;&lt;span class="se"&gt;\'&lt;/span&gt;&lt;span class="s1"&gt;') }}",
        "filters.SetDefaultRank.overwrite": "true",
        "tasks.max": 1
    }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Create a second JSON document with the following content:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;cat&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;&amp;lt;&amp;lt;EOF&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;&amp;gt;&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;track-with-&lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="err"&gt;.json&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt; 
  &lt;/span&gt;&lt;span class="nl"&gt;"track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="nl"&gt;"title"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Duel of the Fates"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="nl"&gt;"artist"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"John Williams, London Symphony Orchestra"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="nl"&gt;"album"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"Star Wars"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="nl"&gt;"duration"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="s2"&gt;"4:14"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
     &lt;/span&gt;&lt;span class="nl"&gt;"rank"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt;EOF&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Copy it to the Docker container as previously:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;cp &lt;/span&gt;track-with-null.json &lt;span class="se"&gt;\&lt;/span&gt;
connect://tmp/kafka-connect/examples/track-02.json
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next, consume the output topic &lt;code&gt;tracks-filepulse-json-02&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t tracks-filepulse-json-02 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(output)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"title"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Duel of the Fates"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"artist"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"John Williams, London Symphony Orchestra"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"album"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Star Wars"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"duration"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"4:14"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"rank"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"int"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You should get an output message containing the field &lt;code&gt;rank&lt;/code&gt; of type &lt;code&gt;int&lt;/code&gt;, initialized with the default value &lt;code&gt;0&lt;/code&gt;.&lt;/p&gt;

&lt;h1&gt;
  
  
  Splitting a JSON Array
&lt;/h1&gt;

&lt;p&gt;It's also common to have to process JSON files containing an array of records.&lt;/p&gt;

&lt;p&gt;To produce one record per element in the array, set the &lt;code&gt;explode.array&lt;/code&gt; property of the &lt;code&gt;JSONFilter&lt;/code&gt; to &lt;code&gt;true&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Let's update the connector configuration as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; PUT &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt;  &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; http://localhost:8083/connectors/tracks-json-filepulse-00/config &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.json$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
        "offset.strategy":"name",
        "topic":"tracks-filepulse-json-03",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
        "filters": "ParseJSON, ExcludeFieldMessage",
        "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
        "filters.ParseJSON.source":"message",
        "filters.ParseJSON.merge":"true",
        "filters.ParseJSON.explode.array":"true",
        "filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
        "filters.ExcludeFieldMessage.fields":"message",
        "tasks.max": 1
    }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Create a file containing two JSON objects:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="err"&gt;$&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;cat&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;&amp;lt;&amp;lt;EOF&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;&amp;gt;&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;tracks.json&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"title"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Star Wars (Main Theme)"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"artist"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt;  &lt;/span&gt;&lt;span class="s2"&gt;"John Williams, London Symphony Orchestra"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"album"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Star Wars"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"duration"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"10:52"&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"title"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt;  &lt;/span&gt;&lt;span class="s2"&gt;"Duel of the Fates"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"artist"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"John Williams, London Symphony Orchestra"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"album"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"duration"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"4:14"&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="err"&gt;EOF&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Copy it to the Docker container as previously.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;cp &lt;/span&gt;tracks.json &lt;span class="se"&gt;\&lt;/span&gt;
connect://tmp/kafka-connect/examples/tracks-00.json
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then consume the output topic &lt;code&gt;tracks-filepulse-json-03&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt; &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; tracks-filepulse-json-03 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o0&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .payload.ConnectDefault
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(output)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"title"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Star Wars (Main Theme)"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"artist"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"John Williams, London Symphony Orchestra"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"album"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Star Wars"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"duration"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"10:52"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"title"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Duel of the Fates"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"artist"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"John Williams, London Symphony Orchestra"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"album"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"duration"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"4:14"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And voilà! Now you know how to process JSON files with Kafka Connect.&lt;/p&gt;

&lt;h1&gt;
  
  
  Conclusion
&lt;/h1&gt;

&lt;p&gt;We have seen in this article that it is very easy to load records from JSON files into Apache Kafka without writing a single line of code using Kafka Connect. The Connect File Pulse connector is a very powerful solution that allows you to easily manipulate your data before loading it into Apache Kafka.&lt;/p&gt;

&lt;p&gt;Please share this article if you like this project. You can even add a ⭐ to the GitHub repository to support us.&lt;/p&gt;

&lt;p&gt;Thank you very much.&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>kafkaconnect</category>
      <category>json</category>
      <category>etl</category>
    </item>
    <item>
      <title>Streaming data into Kafka S01/E02 - Loading XML file</title>
      <dc:creator>Florian Hussonnois</dc:creator>
      <pubDate>Wed, 19 Aug 2020 11:26:06 +0000</pubDate>
      <link>https://dev.to/fhussonnois/streaming-data-into-kafka-s01-e02-loading-xml-file-529i</link>
      <guid>https://dev.to/fhussonnois/streaming-data-into-kafka-s01-e02-loading-xml-file-529i</guid>
      <description>&lt;p&gt;In the previous blog post &lt;strong&gt;&lt;a href="https://dev.to/fhussonnois/streaming-csv-data-into-kafka-46a5"&gt;Streaming data into Kafka S01/E01- Loading CSV file&lt;/a&gt;&lt;/strong&gt;, I've illustrated how it can be easy to integrate data into &lt;a href="https://kafka.apache.org"&gt;Apache Kafka&lt;/a&gt; using the &lt;a href="https://kafka.apache.org/documentation/#connect"&gt;Kafka Connect&lt;/a&gt; framework. &lt;/p&gt;

&lt;p&gt;In particular, we saw how to parse and transform CSV files to produce clean records into Kafka by using the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/"&gt;Kafka Connect FilePulse connector&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;XML (Extensible Markup Language) is another well-known data format. The XML format is often unpopular with developers because of its verbosity and complexity. However, it's still used by many organizations to make systems interact with each other.&lt;/p&gt;

&lt;p&gt;In this second article, we will see how to read records from XML files and load them into Kafka. To do this, we will once again use the Kafka Connect FilePulse connector, which offers native support for reading XML files.&lt;/p&gt;

&lt;h1&gt;
  
  
  Kafka Connect File Pulse connector
&lt;/h1&gt;

&lt;p&gt;
  &lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--y2qnAMdN--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://github.com/streamthoughts/kafka-connect-file-pulse/raw/master/site/static/images/streamthoughts-connect-file-pule-logo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--y2qnAMdN--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://github.com/streamthoughts/kafka-connect-file-pulse/raw/master/site/static/images/streamthoughts-connect-file-pule-logo.png"&gt;&lt;/a&gt;
&lt;/p&gt;

&lt;p&gt;The &lt;a href="https://github.com/streamthoughts/kafka-connect-file-pulse"&gt;Kafka Connect FilePulse connector&lt;/a&gt; is a powerful source connector that makes it easy to parse, transform, and load data from the local file system into Apache Kafka. It offers built-in support for various file formats (e.g: CSV, XML, JSON, LOG4J, AVRO).&lt;/p&gt;

&lt;p&gt;For a broad overview of FilePulse, I suggest you read these articles: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://medium.com/streamthoughts/kafka-connect-filepulse-one-connector-to-ingest-them-all-faed018a725c"&gt;Kafka Connect FilePulse - One Connector to Ingest them All!&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/fhussonnois/streaming-csv-data-into-kafka-46a5"&gt;Streaming data into Kafka S01/E01- Loading CSV file&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For more information, you can check out the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/"&gt;documentation here&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  How to use the connector
&lt;/h2&gt;

&lt;p&gt;The easiest and fastest way to get started with the Kafka Connect FilePulse connector is to use the Docker image available on Docker Hub.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker pull streamthoughts/kafka-connect-file-pulse:1.6.3
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can download the &lt;code&gt;docker-compose.yml&lt;/code&gt; file available on the GitHub project repository to quickly start a Confluent Platform with Kafka Connect and the FilePulse connector pre-installed.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/v1.6.3/docker-compose.yml
&lt;span class="nv"&gt;$ &lt;/span&gt;docker-compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once all Docker containers are started, you can check that the connector is installed on the Kafka Connect worker accessible on &lt;code&gt;http://localhost:8083&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="nt"&gt;-s&lt;/span&gt; localhost:8083/connector-plugins|jq &lt;span class="s1"&gt;'.[].class'&lt;/span&gt;|egrep FilePulse

&lt;span class="s2"&gt;"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note: you can also install the connector either from the &lt;a href="https://github.com/streamthoughts/kafka-connect-file-pulse/releases"&gt;GitHub Releases Page&lt;/a&gt; or from &lt;a href="https://www.confluent.io/hub/streamthoughts/kafka-connect-file-pulse"&gt;Confluent Hub&lt;/a&gt;.&lt;/p&gt;

&lt;h1&gt;
  
  
  Ingesting Data
&lt;/h1&gt;

&lt;p&gt;We will start by creating a first connector with the following configuration. It specifies that the connector's tasks must use the &lt;code&gt;XMLFileInputReader&lt;/code&gt; to read the files that will be scheduled by the connector.&lt;/p&gt;

&lt;p&gt;The connector will periodically scan the input directory that we set using the property &lt;code&gt;fs.scan.directory.path&lt;/code&gt;. Then, it will look for files matching the pattern &lt;code&gt;.*\\.xml$&lt;/code&gt;. Each file is uniquely identified and tracked depending on the value of the &lt;code&gt;offset.strategy&lt;/code&gt;. Here, the configuration specifies that a file is identified by its &lt;code&gt;name&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; PUT &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt;  &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; http://localhost:8083/connectors/source-xml-filepulse-00/config &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.xml$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
        "offset.strategy":"name",
        "topic":"playlists-filepulse-xml-00",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1
    }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
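&lt;p&gt;To make the matching rule concrete, here is a small Python sketch (not File Pulse code; the file names are made up) of how a filter like &lt;code&gt;RegexFileListFilter&lt;/code&gt; selects files with the pattern &lt;code&gt;.*\.xml$&lt;/code&gt;:&lt;/p&gt;

```python
import re

# The same pattern as file.filter.regex.pattern (JSON escaping removed).
pattern = re.compile(r".*\.xml$")

# Hypothetical directory listing.
candidates = ["playlists.xml", "playlists.csv", "notes.txt", "tracks.XML"]
accepted = [name for name in candidates if pattern.match(name)]

print(accepted)  # ['playlists.xml'] — matching is case-sensitive
```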



&lt;p&gt;Once the connector is created, you can check that it is properly started by executing:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
localhost:8083/connectors/source-xml-filepulse-00/status &lt;span class="se"&gt;\&lt;/span&gt;
| jq  &lt;span class="s1"&gt;'.connector.state'&lt;/span&gt;

&lt;span class="s2"&gt;"RUNNING"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, let's create an XML file called &lt;code&gt;playlists.xml&lt;/code&gt; with the following content:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight xml"&gt;&lt;code&gt;$ cat &lt;span class="err"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nt"&gt;&amp;lt;EOF&lt;/span&gt; &lt;span class="nt"&gt;&amp;gt;&lt;/span&gt; playlists.xml
&lt;span class="cp"&gt;&amp;lt;?xml version="1.0" encoding="UTF-8"?&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;playlists&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;playlist&lt;/span&gt; &lt;span class="na"&gt;name=&lt;/span&gt;&lt;span class="s"&gt;"BestOfStarWars"&lt;/span&gt;&lt;span class="nt"&gt;&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;track&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;title&amp;gt;&lt;/span&gt;Duel of the Fates&lt;span class="nt"&gt;&amp;lt;/title&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;artist&amp;gt;&lt;/span&gt;John Williams, London Symphony Orchestra&lt;span class="nt"&gt;&amp;lt;/artist&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;album&amp;gt;&lt;/span&gt;Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)&lt;span class="nt"&gt;&amp;lt;/album&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;duration&amp;gt;&lt;/span&gt;4:14&lt;span class="nt"&gt;&amp;lt;/duration&amp;gt;&lt;/span&gt;   
        &lt;span class="nt"&gt;&amp;lt;/track&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;track&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;title&amp;gt;&lt;/span&gt;Star Wars (Main Theme)&lt;span class="nt"&gt;&amp;lt;/title&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;artist&amp;gt;&lt;/span&gt;John Williams, London Symphony Orchestra&lt;span class="nt"&gt;&amp;lt;/artist&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;album&amp;gt;&lt;/span&gt;Star Wars: The Empire Strikes Back (Original Motion Picture Soundtrack)&lt;span class="nt"&gt;&amp;lt;/album&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;duration&amp;gt;&lt;/span&gt;10:52&lt;span class="nt"&gt;&amp;lt;/duration&amp;gt;&lt;/span&gt;  
        &lt;span class="nt"&gt;&amp;lt;/track&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;/playlist&amp;gt;&lt;/span&gt; 
&lt;span class="nt"&gt;&amp;lt;/playlists&amp;gt;&lt;/span&gt;
EOF
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, copy this file from your host to the Docker container which runs the connector. You can run the following commands:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;// Copy CSV file to docker-container
&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;exec&lt;/span&gt; &lt;span class="nt"&gt;-it&lt;/span&gt; connect &lt;span class="nb"&gt;mkdir&lt;/span&gt; &lt;span class="nt"&gt;-p&lt;/span&gt; /tmp/kafka-connect/examples

&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;cp &lt;/span&gt;playlists.xml connect://tmp/kafka-connect/examples/playlists-00.xml
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, you can consume the &lt;code&gt;playlists-filepulse-xml-00&lt;/code&gt; topic to verify that the connector has detected and processed the XML file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t playlists-filepulse-xml-00 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

{
  "playlists": {
    "Playlists": {
      "playlist": {
        "Playlist": {
          "name": {
            "string": "BestOfStarWars"
          },
          "track": {
            "array": [
              {
                "Track": {
                  "title": {
                    "string": "Duel of the Fates"
                  },
                  "artist": {
                    "string": "John Williams, London Symphony Orchestra"
                  },
                  "album": {
                    "string": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)"
                  },
                  "duration": {
                    "string": "4:14"
                  }
                }
              },
              {
                "Track": {
                  "title": {
                    "string": "Star Wars (Main Theme)"
                  },
                  "artist": {
                    "string": "John Williams, London Symphony Orchestra"
                  },
                  "album": {
                    "string": "Star Wars: The Empire Strikes Back (Original Motion Picture Soundtrack)"
                  },
                  "duration": {
                    "string": "10:52"
                  }
                }
              }
            ]
          }
        }
      }
    }
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note: In the example above, we are using &lt;a href="https://github.com/edenhill/kafkacat"&gt;kafkacat&lt;/a&gt; to consume messages. The option &lt;code&gt;-o-1&lt;/code&gt; is used to consume only the latest message.&lt;/p&gt;

&lt;p&gt;As you may have noticed in the example above, the Connect FilePulse connector has automatically inferred the schema from the input XML file.&lt;/p&gt;

&lt;p&gt;So, let's check the produced Avro schema by querying the Schema Registry HTTP endpoint:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="nt"&gt;-X&lt;/span&gt; GET &lt;span class="se"&gt;\&lt;/span&gt;
http://localhost:8081/subjects/playlists-filepulse-xml-00-value/versions/latest/schema &lt;span class="se"&gt;\&lt;/span&gt;
| jq
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h1&gt;
  
  
  Forcing Array-type
&lt;/h1&gt;

&lt;p&gt;Let's create a second XML file named &lt;code&gt;single-track-playlists.xml&lt;/code&gt; with the following content:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight xml"&gt;&lt;code&gt;$ cat &lt;span class="err"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nt"&gt;&amp;lt;EOF&lt;/span&gt; &lt;span class="nt"&gt;&amp;gt;&lt;/span&gt; single-track-playlists.xml
&lt;span class="cp"&gt;&amp;lt;?xml version="1.0" encoding="UTF-8"?&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;playlists&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;playlist&lt;/span&gt; &lt;span class="na"&gt;name=&lt;/span&gt;&lt;span class="s"&gt;"BestOfJWilliams"&lt;/span&gt;&lt;span class="nt"&gt;&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;track&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;title&amp;gt;&lt;/span&gt;Theme From Jurassic Park&lt;span class="nt"&gt;&amp;lt;/title&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;artist&amp;gt;&lt;/span&gt;John Williams, London Symphony Orchestra&lt;span class="nt"&gt;&amp;lt;/artist&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;album&amp;gt;&lt;/span&gt;Jurassic Park&lt;span class="nt"&gt;&amp;lt;/album&amp;gt;&lt;/span&gt;
            &lt;span class="nt"&gt;&amp;lt;duration&amp;gt;&lt;/span&gt;3:27&lt;span class="nt"&gt;&amp;lt;/duration&amp;gt;&lt;/span&gt;   
        &lt;span class="nt"&gt;&amp;lt;/track&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;/playlist&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;/playlists&amp;gt;&lt;/span&gt;
EOF
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then copy it to the Docker container as previously.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;cp &lt;/span&gt;single-track-playlists.xml &lt;span class="se"&gt;\&lt;/span&gt;
connect://tmp/kafka-connect/examples/playlists-01.xml
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, let's consume the topic &lt;code&gt;playlists-filepulse-xml-00&lt;/code&gt;. You will see that the field named &lt;code&gt;track&lt;/code&gt; is of type &lt;code&gt;Track&lt;/code&gt;, whereas in the first example it was of type array.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt; &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; playlists-filepulse-xml-00 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o-1&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .payload.ConnectDefault
&lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="s2"&gt;"playlists"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="s2"&gt;"Playlists"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="s2"&gt;"playlist"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="s2"&gt;"Playlist"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
          &lt;span class="s2"&gt;"name"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="s2"&gt;"string"&lt;/span&gt;: &lt;span class="s2"&gt;"BestOfJWilliams"&lt;/span&gt;
          &lt;span class="o"&gt;}&lt;/span&gt;,
          &lt;span class="s2"&gt;"track"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="s2"&gt;"Track"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
              &lt;span class="s2"&gt;"title"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="s2"&gt;"string"&lt;/span&gt;: &lt;span class="s2"&gt;"Theme From Jurassic Park"&lt;/span&gt;
              &lt;span class="o"&gt;}&lt;/span&gt;,
              &lt;span class="s2"&gt;"artist"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="s2"&gt;"string"&lt;/span&gt;: &lt;span class="s2"&gt;"John Williams, London Symphony Orchestra"&lt;/span&gt;
              &lt;span class="o"&gt;}&lt;/span&gt;,
              &lt;span class="s2"&gt;"album"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="s2"&gt;"string"&lt;/span&gt;: &lt;span class="s2"&gt;"Jurassic Park"&lt;/span&gt;
              &lt;span class="o"&gt;}&lt;/span&gt;,
              &lt;span class="s2"&gt;"duration"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="s2"&gt;"string"&lt;/span&gt;: &lt;span class="s2"&gt;"3:27"&lt;/span&gt;
              &lt;span class="o"&gt;}&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;
          &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
      &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is a common problem when trying to infer a data schema from an XML file. It's a bit difficult to identify that a field must be an array when only one element is present.&lt;/p&gt;

&lt;p&gt;To solve this problem, it's possible to configure the &lt;code&gt;XMLFileInputReader&lt;/code&gt; to force some fields to be of type array.&lt;/p&gt;
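&lt;p&gt;Conceptually, forcing a field to be an array is just a normalization step applied after parsing. A rough Python equivalent (an illustration, not the connector's actual implementation):&lt;/p&gt;

```python
def force_array(record, fields):
    """Coerce the named fields to lists, wrapping single values,
    similar in spirit to the force.array.on.fields option."""
    for key, value in record.items():
        if key in fields and not isinstance(value, list):
            record[key] = [value]
        elif isinstance(value, dict):
            force_array(value, fields)
    return record

# A playlist parsed from an XML file containing a single track element.
single = {"playlist": {"name": "BestOfJWilliams",
                       "track": {"title": "Theme From Jurassic Park"}}}
normalized = force_array(single, {"track"})
print(normalized["playlist"]["track"])  # [{'title': 'Theme From Jurassic Park'}]
```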

&lt;p&gt;Let's update the connector with the following configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; PUT &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt;  &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; http://localhost:8083/connectors/source-xml-filepulse-00/config &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.xml$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
        "force.array.on.fields": "track",
        "offset.strategy":"name",
        "topic":"playlists-filepulse-xml-00",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1
    }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, recopy the XML file under a new name so that it will be picked up by the connector.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;cp &lt;/span&gt;single-track-playlists.xml connect://tmp/kafka-connect/examples/playlist-03.xml
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, consume the output topic to observe the effect of the new configuration.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt; &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\ &lt;/span&gt;                      
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; playlists-filepulse-xml-00 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o-1&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .payload.ConnectDefault
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(output)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"playlists"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"Playlists"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"playlist"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"Playlist"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"BestOfJWilliams"&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="nl"&gt;"track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="nl"&gt;"array"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
              &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="nl"&gt;"Track"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                  &lt;/span&gt;&lt;span class="nl"&gt;"title"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Theme From Jurassic Park"&lt;/span&gt;&lt;span class="w"&gt;
                  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
                  &lt;/span&gt;&lt;span class="nl"&gt;"artist"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"John Williams, London Symphony Orchestra"&lt;/span&gt;&lt;span class="w"&gt;
                  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
                  &lt;/span&gt;&lt;span class="nl"&gt;"album"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Jurassic Park"&lt;/span&gt;&lt;span class="w"&gt;
                  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
                  &lt;/span&gt;&lt;span class="nl"&gt;"duration"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
                    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"3:27"&lt;/span&gt;&lt;span class="w"&gt;
                  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
                &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
              &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
            &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
          &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;track&lt;/code&gt; field is now an array field.&lt;/p&gt;

&lt;h1&gt;
  
  
  Splitting XML using XPath expression
&lt;/h1&gt;

&lt;p&gt;So far, we have only used the &lt;code&gt;XMLFileInputReader&lt;/code&gt; to read a single record from each XML file.&lt;/p&gt;

&lt;p&gt;But we could just as well produce one record for each &lt;code&gt;track&lt;/code&gt; element present in the input XML files. To do this, we will simply override the property &lt;code&gt;xpath.expression&lt;/code&gt;, whose default value is &lt;code&gt;/&lt;/code&gt;.&lt;/p&gt;
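&lt;p&gt;The effect of the XPath split can be previewed with Python's standard &lt;code&gt;xml.etree.ElementTree&lt;/code&gt; module (a simplified illustration; the connector's own XPath evaluation is richer):&lt;/p&gt;

```python
import xml.etree.ElementTree as ET

XML = """\
<playlists>
  <playlist name="BestOfStarWars">
    <track><title>Duel of the Fates</title><duration>4:14</duration></track>
    <track><title>Star Wars (Main Theme)</title><duration>10:52</duration></track>
  </playlist>
</playlists>"""

root = ET.fromstring(XML)
# Equivalent of xpath.expression=//playlist/track: one record per matching node.
records = [{child.tag: child.text for child in track}
           for track in root.findall(".//playlist/track")]
print(len(records))         # 2
print(records[0]["title"])  # Duel of the Fates
```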

&lt;p&gt;So, let's create a new connector with the following configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; PUT &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt;  &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; http://localhost:8083/connectors/source-xml-filepulse-01/config &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.xml$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
        "xpath.expression": "//playlist/track",
        "offset.strategy":"name",
        "topic":"tracks-filepulse-xml-00",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1
    }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, consume the messages from the output topic &lt;code&gt;tracks-filepulse-xml-00&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt; &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; tracks-filepulse-xml-00 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o-1&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .payload.ConnectDefault
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(output)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"title"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Theme From Jurassic Park"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"artist"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"John Williams, London Symphony Orchestra"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"album"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Jurassic Park"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"duration"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"3:27"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You should get one record per &lt;code&gt;track&lt;/code&gt;.&lt;/p&gt;

&lt;h1&gt;
  
  
  Extracting string values
&lt;/h1&gt;

&lt;p&gt;By default, the &lt;code&gt;XMLFileInputReader&lt;/code&gt; expects the XPath expression to return a result of type &lt;code&gt;NodeSet&lt;/code&gt;. &lt;/p&gt;

&lt;p&gt;If your goal is to extract a single text element from an input XML file, you will have to set the property &lt;code&gt;xpath.result.type=STRING&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;For example, if we configure a connector with the XPath expression &lt;code&gt;//playlist/track/title/text()&lt;/code&gt;, then the connector will produce messages of the form:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"message"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Theme From Jurassic Park"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
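&lt;p&gt;A quick Python approximation of this text extraction (ElementTree does not support the &lt;code&gt;text()&lt;/code&gt; step directly, so we read the &lt;code&gt;.text&lt;/code&gt; attribute of the matched nodes):&lt;/p&gt;

```python
import xml.etree.ElementTree as ET

XML = """\
<playlists>
  <playlist name="BestOfJWilliams">
    <track><title>Theme From Jurassic Park</title></track>
  </playlist>
</playlists>"""

root = ET.fromstring(XML)
# Approximates //playlist/track/title/text() with xpath.result.type=STRING.
titles = [node.text for node in root.findall(".//playlist/track/title")]
print(titles)  # ['Theme From Jurassic Park']
```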



&lt;p&gt;Then, you can use &lt;strong&gt;Single Message Transformations (SMT)&lt;/strong&gt; such as &lt;a href="https://docs.confluent.io/current/connect/transforms/extractfield.html#extractfield"&gt;ExtractField&lt;/a&gt; to replace the entire value with the extracted field. &lt;/p&gt;
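&lt;p&gt;What ExtractField does can be reduced to a one-liner: replace the whole record value with the value of one of its fields. Sketched in Python (the record shape mirrors the message above):&lt;/p&gt;

```python
def extract_field(value, field):
    """Replace the entire record value with one of its fields,
    in the spirit of the ExtractField$Value SMT."""
    return value[field]

record = {"message": "Theme From Jurassic Park"}
print(extract_field(record, "message"))  # Theme From Jurassic Park
```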

&lt;h1&gt;
  
  
  Renaming fields
&lt;/h1&gt;

&lt;p&gt;It's sometimes useful to be able to change the name of a field. This can be because you need to further contextualize a field or because you are not satisfied with the field names present in the XML input file.&lt;/p&gt;

&lt;p&gt;For example, we can rename the field &lt;code&gt;artist&lt;/code&gt; because it contains a comma-separated list of artists (not a single one).&lt;/p&gt;

&lt;p&gt;The File Pulse connector allows us to define complex pipelines to parse, transform, and enrich data through the use of processing &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/filters-chain-definition/"&gt;Filters&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Thus, to rename the field &lt;code&gt;artist&lt;/code&gt; into &lt;code&gt;artists&lt;/code&gt; we will use the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/filters/#renamefilter"&gt;RenameFilter&lt;/a&gt;.&lt;/p&gt;
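&lt;p&gt;Behaviourally, the filter is a simple key rename. A Python sketch (illustrative only, not the filter's actual code):&lt;/p&gt;

```python
def rename_field(record, field, target):
    """Rename a single field, mimicking RenameFilter's field/target options."""
    if field in record:
        record[target] = record.pop(field)
    return record

track = {"title": "Theme From Jurassic Park",
         "artist": "John Williams, London Symphony Orchestra"}
renamed = rename_field(track, "artist", "artists")
print(sorted(renamed))  # ['artists', 'title']
```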

&lt;p&gt;Let's create a new connector with this new configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; PUT &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt;  &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; http://localhost:8083/connectors/source-xml-filepulse-02/config &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.xml$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
        "force.array.on.fields": "track",
        "xpath.expression": "//playlist/track",
        "filters": "RenameArtist",
        "filters.RenameArtist.type": "io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter",
        "filters.RenameArtist.field": "artist",
        "filters.RenameArtist.target": "artists",
        "offset.strategy":"name",
        "topic":"tracks-filepulse-xml-02",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1
    }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next, consume the produced messages from the output topic &lt;code&gt;tracks-filepulse-xml-02&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt; &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; tracks-filepulse-xml-02 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o-1&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .payload.ConnectDefault
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(output)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"title"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Theme From Jurassic Park"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"album"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Jurassic Park"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"duration"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"3:27"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"artists"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"John Williams, London Symphony Orchestra"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h1&gt;
  
  
  Splitting fields into an array
&lt;/h1&gt;

&lt;p&gt;Finally, let's see how filters can be combined to build a complete processing pipeline. &lt;/p&gt;

&lt;p&gt;For this last example, we will split the "artists" field into an array using the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/filters/#splitfilter"&gt;&lt;code&gt;SplitFilter&lt;/code&gt;&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Let's create a final connector with the following configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; PUT &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt;  &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; http://localhost:8083/connectors/source-xml-filepulse-03/config &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.xml$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
        "force.array.on.fields": "track",
        "xpath.expression": "//playlist/track",
        "filters": "RenameArtists, SplitArtists",
        "filters.RenameArtists.type": "io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter",
        "filters.RenameArtists.field": "artist",
        "filters.RenameArtists.target": "artists",
        "filters.SplitArtists.type": "io.streamthoughts.kafka.connect.filepulse.filter.SplitFilter",
        "filters.SplitArtists.split": "artists",
        "filters.SplitArtists.separator": ",",
        "offset.strategy":"name",
        "topic":"tracks-filepulse-xml-03",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1
    }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, consume the output topic &lt;code&gt;tracks-filepulse-xml-03&lt;/code&gt; to observe the result of our filter chain:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt; &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; tracks-filepulse-xml-03 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o-1&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .payload.ConnectDefault
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;(output)&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"title"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Theme From Jurassic Park"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"album"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"Jurassic Park"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"duration"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"3:27"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"artists"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"array"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"John Williams"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"string"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;" London Symphony Orchestra"&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And voilà!&lt;/p&gt;

&lt;h1&gt;
  
  
  Conclusion
&lt;/h1&gt;

&lt;p&gt;In this article, we have seen how easy it can be to load records from XML files into Apache Kafka using Kafka Connect, without writing a single line of code. We have also seen that Connect File Pulse is a powerful connector that lets you easily manipulate your data before loading it into Apache Kafka. &lt;/p&gt;

&lt;p&gt;Please share this article if you like this project. You can even add a ⭐ to the GitHub repository to support us. &lt;/p&gt;

&lt;p&gt;Thank you very much.&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>kafkaconnect</category>
      <category>xml</category>
      <category>bigdata</category>
    </item>
    <item>
      <title>Streaming data into Kafka S01/E01 - Loading CSV file</title>
      <dc:creator>Florian Hussonnois</dc:creator>
      <pubDate>Wed, 12 Aug 2020 17:52:07 +0000</pubDate>
      <link>https://dev.to/fhussonnois/streaming-csv-data-into-kafka-46a5</link>
      <guid>https://dev.to/fhussonnois/streaming-csv-data-into-kafka-46a5</guid>
      <description>&lt;p&gt;Ingesting data files in Apache Kafka is a very common task. Among all the various file formats that we can find, CSV is probably the most popular one to move data between different systems. This is due to its simplicity and to the fact that it can be used to export or import data from one (small) database to another. &lt;/p&gt;

&lt;p&gt;A CSV file is nothing more than a text file (with a &lt;code&gt;.csv&lt;/code&gt; extension). Each line of the file represents a data record, and each record consists of one or more fields separated by a comma (or another separator).&lt;/p&gt;

&lt;p&gt;Here is an example excerpt:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;40;War;02:38;1983;U2;Rock
Acrobat;Achtung Baby;04:30;1991;U2;Rock
Sunday Bloody Sunday;War;04:39;1983;U2;Rock
With or Without You;The Joshua Tree;04:55;1987; U2;Rock
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In addition, a CSV file can contain a header line indicating the name of each field.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;title;album;duration;release;artist;type
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In this article, we will see how to integrate such a file into Apache Kafka. Of course, we are not going to reinvent the wheel: many open-source tools already exist for this task.&lt;/p&gt;

&lt;p&gt;We will use the &lt;a href="https://kafka.apache.org/documentation.html#connect"&gt;Kafka Connect&lt;/a&gt; framework, which is part of the &lt;a href="https://kafka.apache.org"&gt;Apache Kafka project&lt;/a&gt;. Kafka Connect has been designed to move data in and out of Kafka using connectors.&lt;/p&gt;

&lt;h1&gt;
  
  
  Kafka Connect File Pulse connector
&lt;/h1&gt;

&lt;p&gt;
  &lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--y2qnAMdN--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://github.com/streamthoughts/kafka-connect-file-pulse/raw/master/site/static/images/streamthoughts-connect-file-pule-logo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--y2qnAMdN--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://github.com/streamthoughts/kafka-connect-file-pulse/raw/master/site/static/images/streamthoughts-connect-file-pule-logo.png"&gt;&lt;/a&gt;
&lt;/p&gt;

&lt;p&gt;The &lt;a href="https://github.com/streamthoughts/kafka-connect-file-pulse"&gt;Kafka Connect File Pulse connector&lt;/a&gt; makes it easy to parse, transform, and stream data files into Kafka. It supports several file formats, but here we will focus on CSV. &lt;/p&gt;

&lt;p&gt;For a broad overview of FilePulse, I suggest you read this article: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://medium.com/streamthoughts/kafka-connect-filepulse-one-connector-to-ingest-them-all-faed018a725c"&gt;Kafka Connect FilePulse - One Connector to Ingest them All!&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For more information, you can check out the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/"&gt;documentation here&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  How to use the connector
&lt;/h2&gt;

&lt;p&gt;The easiest and fastest way to get started with the Connect FilePulse connector is to use the Docker image available on Docker Hub.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker pull streamthoughts/kafka-connect-file-pulse:1.6.3
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can download the &lt;code&gt;docker-compose.yml&lt;/code&gt; file available on the GitHub project repository to quickly start a Confluent Platform with Kafka Connect and the FilePulse connector pre-installed.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/v1.6.3/docker-compose.yml
$ docker-compose up -d
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once all Docker containers are started, you can check that the connector is installed on the Kafka Connect worker accessible on &lt;code&gt;http://localhost:8083&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="nt"&gt;-s&lt;/span&gt; localhost:8083/connector-plugins|jq &lt;span class="s1"&gt;'.[].class'&lt;/span&gt;|egrep FilePulse

&lt;span class="s2"&gt;"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can also install the connector either from &lt;a href="https://github.com/streamthoughts/kafka-connect-file-pulse/releases"&gt;GitHub Releases Page&lt;/a&gt; or from &lt;a href="https://www.confluent.io/hub/streamthoughts/kafka-connect-file-pulse"&gt;Confluent Hub&lt;/a&gt;.&lt;/p&gt;
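&lt;p&gt;For example, if the Confluent Hub CLI is available on your Connect worker, the installation boils down to a single command (the version number is shown for illustration only; remember to restart the worker afterwards):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;confluent-hub install streamthoughts/kafka-connect-file-pulse:1.6.3
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;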

&lt;h1&gt;
  
  
  Ingesting data
&lt;/h1&gt;

&lt;p&gt;Let's create a first connector instance:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; PUT &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt;  &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; http://localhost:8083/connectors/source-csv-filepulse-00/config &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.csv$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"musics-filepulse-csv-00",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1
    }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;You can check that the connector started properly by executing:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="nt"&gt;-s&lt;/span&gt; localhost:8083/connectors/source-csv-filepulse-00/status|jq  &lt;span class="s1"&gt;'.connector.state'&lt;/span&gt;

&lt;span class="s2"&gt;"RUNNING"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Right now, our connector is not processing any data. Let's copy a CSV file into the input directory &lt;code&gt;/tmp/kafka-connect/examples/&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;// Download CSV file
&lt;span class="nv"&gt;$ &lt;/span&gt;&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;GITHUB_REPO_MASTER&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master/
&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="nt"&gt;-sSL&lt;/span&gt; &lt;span class="nv"&gt;$GITHUB_REPO_MASTER&lt;/span&gt;/examples/quickstart-musics-dataset.csv &lt;span class="nt"&gt;-o&lt;/span&gt; musics-dataset.csv

// Copy CSV file to docker-container
&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;exec&lt;/span&gt; &lt;span class="nt"&gt;-it&lt;/span&gt; connect &lt;span class="nb"&gt;mkdir&lt;/span&gt; &lt;span class="nt"&gt;-p&lt;/span&gt; /tmp/kafka-connect/examples
&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;cp &lt;/span&gt;musics-dataset.csv connect://tmp/kafka-connect/examples/musics-dataset-00.csv
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's check whether we have some data in the output topic &lt;code&gt;musics-filepulse-csv-00&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt; &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; musics-filepulse-csv-00 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o-1&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .payload.ConnectDefault

&lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="s2"&gt;"message"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="s2"&gt;"string"&lt;/span&gt;: &lt;span class="s2"&gt;"Zoo Station;Achtung Baby;04:36;1991;U2;Rock"&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;,
  &lt;span class="s2"&gt;"headers"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="s2"&gt;"array"&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;
      &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="s2"&gt;"string"&lt;/span&gt;: &lt;span class="s2"&gt;"title;album;duration;release;artist;type"&lt;/span&gt;
      &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;]&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note: In the example above, we are using &lt;a href="https://github.com/edenhill/kafkacat"&gt;kafkacat&lt;/a&gt; to consume messages. The option &lt;code&gt;-o-1&lt;/code&gt; is used to consume only the latest message.&lt;/p&gt;

&lt;p&gt;As we can see, our topic contains one Avro message for each line of the CSV file. This is because we have configured our connector to use the &lt;code&gt;RowFileInputReader&lt;/code&gt; (see the &lt;code&gt;task.reader.class&lt;/code&gt; configuration).&lt;/p&gt;

&lt;p&gt;Each record contains two fields: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;code&gt;message&lt;/code&gt;: This field is of type &lt;code&gt;string&lt;/code&gt; and represents a single line of the input file.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;code&gt;headers&lt;/code&gt;: This field is of type &lt;code&gt;array&lt;/code&gt; and contains the first header line of the input file. This field is automatically added by the &lt;code&gt;RowFileInputReader&lt;/code&gt; because we have configured it with &lt;code&gt;skip.headers=1&lt;/code&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h1&gt;
  
  
  Headers
&lt;/h1&gt;

&lt;p&gt;The FilePulse connector adds headers to each record, containing metadata about the source file from which the data was extracted. This can be useful for debugging, but also for data lineage.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt;  &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; musics-filepulse-csv-00 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o-1&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .headers

&lt;span class="o"&gt;[&lt;/span&gt;
  &lt;span class="s2"&gt;"connect.file.name"&lt;/span&gt;,
  &lt;span class="s2"&gt;"musics-dataset-00.csv"&lt;/span&gt;,
  &lt;span class="s2"&gt;"connect.file.path"&lt;/span&gt;,
  &lt;span class="s2"&gt;"/tmp/kafka-connect/examples"&lt;/span&gt;,
  &lt;span class="s2"&gt;"connect.file.hash"&lt;/span&gt;,
  &lt;span class="s2"&gt;"1466679696"&lt;/span&gt;,
  &lt;span class="s2"&gt;"connect.file.size"&lt;/span&gt;,
  &lt;span class="s2"&gt;"6588"&lt;/span&gt;,
  &lt;span class="s2"&gt;"connect.file.lastModified"&lt;/span&gt;,
  &lt;span class="s2"&gt;"1597337097000"&lt;/span&gt;,
  &lt;span class="s2"&gt;"connect.hostname"&lt;/span&gt;,
  &lt;span class="s2"&gt;"57a2fb6213f9"&lt;/span&gt;
&lt;span class="o"&gt;]&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h1&gt;
  
  
  Parsing data
&lt;/h1&gt;

&lt;p&gt;The File Pulse connector allows us to define complex pipelines to parse, transform, and enrich data through the use of processing &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/filters-chain-definition/"&gt;Filters&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;You can use the built-in &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/filters/#delimitedrowfilter"&gt;DelimitedRowFilter&lt;/a&gt; to parse each line. Also, because the first line of the CSV file is a header, you can set the property &lt;code&gt;extractColumnName&lt;/code&gt; to name the record's fields based on the &lt;code&gt;headers&lt;/code&gt; field.&lt;/p&gt;

&lt;p&gt;Let's create a new connector with this new configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; PUT &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt;  &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; http://localhost:8083/connectors/source-csv-filepulse-01/config &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.csv$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"musics-filepulse-csv-01",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1,
        "filters":"ParseLine",
        "filters.ParseLine.extractColumnName": "headers",
        "filters.ParseLine.trimColumn": "true",
        "filters.ParseLine.separator": ";",
        "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter"
    }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, let's consume the topic &lt;code&gt;musics-filepulse-csv-01&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt; &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; musics-filepulse-csv-01 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o-1&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .payload.ConnectDefault

&lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="s2"&gt;"title"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="s2"&gt;"string"&lt;/span&gt;: &lt;span class="s2"&gt;"Zoo Station"&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;,
  &lt;span class="s2"&gt;"album"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="s2"&gt;"string"&lt;/span&gt;: &lt;span class="s2"&gt;"Achtung Baby"&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;,
  &lt;span class="s2"&gt;"duration"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="s2"&gt;"string"&lt;/span&gt;: &lt;span class="s2"&gt;"04:36"&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;,
  &lt;span class="s2"&gt;"release"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="s2"&gt;"string"&lt;/span&gt;: &lt;span class="s2"&gt;"1991"&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;,
  &lt;span class="s2"&gt;"artist"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="s2"&gt;"string"&lt;/span&gt;: &lt;span class="s2"&gt;"U2"&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;,
  &lt;span class="s2"&gt;"type"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="s2"&gt;"string"&lt;/span&gt;: &lt;span class="s2"&gt;"Rock"&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h1&gt;
  
  
  Filtering data
&lt;/h1&gt;

&lt;p&gt;Sometimes you may want to keep only the lines in a file whose fields have a particular value. For this article, let's imagine we need to keep only the songs from the band AC/DC. &lt;/p&gt;

&lt;p&gt;To do this, we will extend our filter chain with the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/filters/#dropfilter"&gt;&lt;code&gt;DropFilter&lt;/code&gt;&lt;/a&gt; and create a new connector (so that our file can be reprocessed):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; PUT &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt;  &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; http://localhost:8083/connectors/source-csv-filepulse-02/config &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.csv$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"musics-filepulse-csv-02",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1,
        "filters":"ParseLine,KeepACDC",
        "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
        "filters.ParseLine.extractColumnName": "headers",
        "filters.ParseLine.trimColumn": "true",
        "filters.ParseLine.separator": ";",
        "filters.KeepACDC.type":"io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
        "filters.KeepACDC.if":"{{ equals($value.artist, '&lt;/span&gt;&lt;span class="se"&gt;\'&lt;/span&gt;&lt;span class="s1"&gt;'AC/DC'&lt;/span&gt;&lt;span class="se"&gt;\'&lt;/span&gt;&lt;span class="s1"&gt;') }}",
        "filters.KeepACDC.invert":"true"
    }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The property &lt;code&gt;if&lt;/code&gt; is set with a &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/accessing-data-and-metadata/"&gt;Simple Connect Expression Language&lt;/a&gt; (SCEL) expression. SCEL is a basic expression language provided by the Connect FilePulse connector to access and manipulate record fields. &lt;/p&gt;
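&lt;p&gt;To make the &lt;code&gt;invert&lt;/code&gt; flag easier to reason about, here is a small Python sketch of the drop-filter semantics (a hypothetical illustration, not the connector's actual code): the filter drops records for which the condition is true, so with &lt;code&gt;invert&lt;/code&gt; set to &lt;code&gt;true&lt;/code&gt; it keeps only the matching records.&lt;/p&gt;

```python
# Hypothetical sketch of the DropFilter semantics (not the connector's code).
# The filter drops records whose condition evaluates to true; with
# invert=True the condition is negated, so matching records are kept.
def drop_filter(records, condition, invert=False):
    if invert:
        return [r for r in records if condition(r)]
    return [r for r in records if not condition(r)]

records = [
    {"artist": "AC/DC", "title": "Back In Black"},
    {"artist": "Metallica", "title": "One"},
]

# Equivalent of: "if": "{{ equals($value.artist, 'AC/DC') }}", "invert": "true"
kept = drop_filter(records, lambda r: r["artist"] == "AC/DC", invert=True)
print([r["artist"] for r in kept])  # ['AC/DC']
```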

&lt;p&gt;Finally, you can check that you get the expected result by executing:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt;  &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; musics-filepulse-csv-02 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o-1&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .payload.ConnectDefault.artist

&lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="s2"&gt;"string"&lt;/span&gt;: &lt;span class="s2"&gt;"AC/DC"&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h1&gt;
  
  
  Changing the field types
&lt;/h1&gt;

&lt;p&gt;You have probably noticed that at no time have we defined the type of our fields. By default, the connector assumes that all fields are of type string, and maybe you are happy with that. But in most cases, you will want to convert a few fields. For example, we can convert the field &lt;code&gt;release&lt;/code&gt; to be of type &lt;code&gt;Integer&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;For doing this, you can use the &lt;a href="https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/filters/#appendfilter"&gt;&lt;code&gt;AppendFilter&lt;/code&gt;&lt;/a&gt; with a Simple Connect Expression.&lt;/p&gt;

&lt;p&gt;Let's create our final configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-i&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; PUT &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Accept:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt;  &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; http://localhost:8083/connectors/source-csv-filepulse-03/config &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.csv$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"musics-filepulse-csv-03",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1,
        "filters":"ParseLine,KeepACDC,ReleaseToInt",
        "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
        "filters.ParseLine.extractColumnName": "headers",
        "filters.ParseLine.trimColumn": "true",
        "filters.ParseLine.separator": ";",
        "filters.KeepACDC.type":"io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
        "filters.KeepACDC.if":"{{ equals($value.artist, '&lt;/span&gt;&lt;span class="se"&gt;\'&lt;/span&gt;&lt;span class="s1"&gt;'AC/DC'&lt;/span&gt;&lt;span class="se"&gt;\'&lt;/span&gt;&lt;span class="s1"&gt;') }}",
        "filters.KeepACDC.invert":"true",
        "filters.ReleaseToInt.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
        "filters.ReleaseToInt.field": "$value.release",
        "filters.ReleaseToInt.value": "{{ converts($value.release, '&lt;/span&gt;&lt;span class="se"&gt;\'&lt;/span&gt;&lt;span class="s1"&gt;'INTEGER'&lt;/span&gt;&lt;span class="se"&gt;\'&lt;/span&gt;&lt;span class="s1"&gt;') }}",
        "filters.ReleaseToInt.overwrite": "true"
    }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, you can check the converted field:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker run &lt;span class="nt"&gt;--tty&lt;/span&gt; &lt;span class="nt"&gt;--network&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;host edenhill/kafkacat:1.6.0 kafkacat &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-b&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-t&lt;/span&gt; musics-filepulse-csv-03 &lt;span class="se"&gt;\&lt;/span&gt;
        &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-J&lt;/span&gt; &lt;span class="nt"&gt;-q&lt;/span&gt; &lt;span class="nt"&gt;-o-1&lt;/span&gt; &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;s &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nv"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;avro &lt;span class="nt"&gt;-r&lt;/span&gt; http://localhost:8081 | jq .payload.ConnectDefault.release

&lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="s2"&gt;"release"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="s2"&gt;"int"&lt;/span&gt;: 1980
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;AppendFilter&lt;/code&gt; is a very handy filter that allows us to quickly modify a record. For example, we could also use it to set the key of each record to be equal to the album name by adding the following configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;"filters.SetKey.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.SetKey.field": "$key",
"filters.SetKey.value": "{{ uppercase($value.album)}}"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note: For this article, we have used the filter chain mechanism provided by the Connect File Pulse connector. But it is also possible to use a Kafka Connect &lt;strong&gt;Single Message Transformation&lt;/strong&gt; (SMT) to perform the same task, using &lt;code&gt;org.apache.kafka.connect.transforms.Cast&lt;/code&gt;.&lt;/p&gt;
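&lt;p&gt;For instance, the &lt;code&gt;release&lt;/code&gt; conversion could be approximated with the built-in &lt;code&gt;Cast&lt;/code&gt; SMT by adding something like the following to the connector configuration (a sketch based on the standard Kafka Connect transform properties; the transform alias &lt;code&gt;CastRelease&lt;/code&gt; is arbitrary):&lt;/p&gt;

```
"transforms": "CastRelease",
"transforms.CastRelease.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.CastRelease.spec": "release:int32"
```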

&lt;h1&gt;
  
  
  Conclusion
&lt;/h1&gt;

&lt;p&gt;We have seen in this article that it is fairly easy to load a CSV file into Kafka without writing a single line of code using Kafka Connect. Also, the Connect File Pulse connector is a powerful solution that allows you to easily manipulate your data before sending it into Apache Kafka. Please do not hesitate to share this article. If you like this project then add a ⭐ to the &lt;a href="https://github.com/streamthoughts/kafka-connect-file-pulse"&gt;GitHub repository&lt;/a&gt; to support us. Thank you.&lt;/p&gt;

</description>
      <category>apachekafka</category>
      <category>kafkaconnect</category>
      <category>data</category>
      <category>csv</category>
    </item>
    <item>
      <title>Create Kafka Streams applications faster than ever before with Azkarra Streams</title>
      <dc:creator>Florian Hussonnois</dc:creator>
      <pubDate>Thu, 09 Jan 2020 16:02:06 +0000</pubDate>
      <link>https://dev.to/fhussonnois/create-kafka-streams-applications-faster-than-ever-before-via-azkarra-streams-3nng</link>
      <guid>https://dev.to/fhussonnois/create-kafka-streams-applications-faster-than-ever-before-via-azkarra-streams-3nng</guid>
      <description>&lt;p&gt;&lt;a href="https://kafka.apache.org/documentation/streams/"&gt;Kafka Streams&lt;/a&gt; is a powerful library for writing streaming applications and microservices on top of Apache Kafka in Java and Scala.&lt;/p&gt;

&lt;p&gt;When writing a Kafka Streams application, developers must not only define their topology, i.e. the sequence of operations to be applied to the consumed messages, but also the code needed to execute it.&lt;/p&gt;

&lt;p&gt;Furthermore, to write a production-ready application, you will have to know how to handle processing failures and bad records, and how to monitor and operate your instances. And, if you plan to expose some internal state using the Kafka Streams built-in feature called &lt;strong&gt;"Interactive Queries"&lt;/strong&gt;, you will also have to write code to provide access to your data (for example, via REST APIs).&lt;/p&gt;

&lt;p&gt;As a result, your application can quickly become cluttered with boilerplate code that has no direct business value but that you will have to maintain and duplicate in other projects.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://streamthoughts.github.io/azkarra-streams/"&gt;Azkarra Streams&lt;/a&gt; is an open-source, lightweight Java framework that makes it easy to develop and operate Kafka Streams applications (Azkarra is the Basque word for "&lt;em&gt;Fast&lt;/em&gt;").&lt;/p&gt;

&lt;h2&gt;
  
  
  Key Features
&lt;/h2&gt;

&lt;p&gt;Azkarra Streams provides a set of features to quickly debug and build production-ready Kafka Streams applications. This includes, among other things:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Lifecycle management of Kafka Streams instances&lt;/strong&gt; (no more KafkaStreams#start()).&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Easy externalization of Topology and KafkaStreams configurations&lt;/strong&gt; (using &lt;a href="https://github.com/lightbend/config"&gt;Typesafe Config&lt;/a&gt;).&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Embedded HTTP server for querying state store&lt;/strong&gt; (Undertow).&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;HTTP endpoint to monitor streams application metrics&lt;/strong&gt; (e.g : JSON, Prometheus).&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;Web UI for topologies visualization.&lt;/strong&gt;&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;Encryption and authentication with SSL or Basic Auth&lt;/strong&gt;, etc.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Getting Started
&lt;/h2&gt;

&lt;p&gt;Since Azkarra v0.5.0, one way to get started is to use the official Docker image (&lt;a href="https://hub.docker.com/r/streamthoughts/azkarra-streams-worker"&gt;streamthoughts/azkarra-streams-worker&lt;/a&gt;), which runs a standalone Azkarra worker able to execute one or more Kafka Streams applications. &lt;/p&gt;

&lt;p&gt;The Azkarra Worker follows the same mechanism as the Kafka Connect project: Kafka Streams topologies are provided as external components that can be started and stopped either via REST calls or through an embedded UI.&lt;/p&gt;

&lt;p&gt;Let's start an Azkarra worker instance and a single-node Kafka cluster using the &lt;code&gt;docker-compose.yml&lt;/code&gt; available in the &lt;a href="https://github.com/streamthoughts/azkarra-streams/blob/master/docker-compose.yml"&gt;GitHub repository&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;1) Run the following command to download the file and start the containers:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="nt"&gt;-s&lt;/span&gt; https://raw.githubusercontent.com/streamthoughts/azkarra-streams/master/docker-compose.yml &lt;span class="nt"&gt;--output&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
docker-compose.yml &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; docker-compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;2) Check that the Azkarra worker is up and running:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="nt"&gt;-sX&lt;/span&gt; GET http://localhost:8080 | jq
&lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="s2"&gt;"azkarraVersion"&lt;/span&gt;: &lt;span class="s2"&gt;"0.5.0"&lt;/span&gt;,
  &lt;span class="s2"&gt;"commitId"&lt;/span&gt;: &lt;span class="s2"&gt;"d2bc2fdc24e68eb143f4388960881974604093ca"&lt;/span&gt;,
  &lt;span class="s2"&gt;"branch"&lt;/span&gt;: &lt;span class="s2"&gt;"master"&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;3) Finally, the Azkarra Web UI is available at &lt;a href="http://localhost:8080/ui"&gt;http://localhost:8080/ui&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--zomTqHAt--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://thepracticaldev.s3.amazonaws.com/i/43hkrw5s4o7l9yw67me0.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--zomTqHAt--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://thepracticaldev.s3.amazonaws.com/i/43hkrw5s4o7l9yw67me0.png" alt="Azkarra WebUI - Overview"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;As we can see, for the moment our worker is doing absolutely nothing since we have not yet deployed a topology. So, let's write a simple Kafka Streams application.&lt;/p&gt;

&lt;h2&gt;
  
  
  Writing A First Kafka Streams Topology
&lt;/h2&gt;

&lt;p&gt;To demonstrate the use of the Azkarra API, we will rewrite the standard &lt;a href="https://kafka.apache.org/23/documentation/streams/quickstart"&gt;WordCountTopology&lt;/a&gt; example.&lt;/p&gt;

&lt;p&gt;First, let's create a simple Java project and add Azkarra Streams to your project's dependencies.&lt;/p&gt;

&lt;p&gt;For Maven (&lt;code&gt;pom.xml&lt;/code&gt;):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight xml"&gt;&lt;code&gt;&lt;span class="nt"&gt;&amp;lt;dependencies&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;dependency&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;org.apache.kafka&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;kafka-streams&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;2.4.0&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;scope&amp;gt;&lt;/span&gt;provided&lt;span class="nt"&gt;&amp;lt;/scope&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;/dependency&amp;gt;&lt;/span&gt;

    &lt;span class="nt"&gt;&amp;lt;dependency&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;io.streamthoughts&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;azkarra-streams&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;0.5.0&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt;
        &lt;span class="nt"&gt;&amp;lt;scope&amp;gt;&lt;/span&gt;provided&lt;span class="nt"&gt;&amp;lt;/scope&amp;gt;&lt;/span&gt;
    &lt;span class="nt"&gt;&amp;lt;/dependency&amp;gt;&lt;/span&gt;
&lt;span class="nt"&gt;&amp;lt;/dependencies&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Note that when using the Azkarra Worker, your project should never bundle libraries that are already provided by the worker's runtime (i.e. azkarra-*, kafka-streams), hence the &lt;code&gt;provided&lt;/code&gt; scope in the dependencies above.&lt;/p&gt;

&lt;p&gt;Secondly, let's define our Kafka Streams Topology by creating a new file &lt;code&gt;WordCountTopology.java&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;azkarra&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.streamthoughts.azkarra.api.annotations.*&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.streamthoughts.azkarra.api.streams.TopologyProvider&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.common.serialization.Serdes&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.streams.StreamsBuilder&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.streams.Topology&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.streams.kstream.*&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.Arrays&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Component&lt;/span&gt;
&lt;span class="nd"&gt;@TopologyInfo&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;description&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"WordCount topology example"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;WordCountTopology&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;TopologyProvider&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;Topology&lt;/span&gt; &lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;StreamsBuilder&lt;/span&gt; &lt;span class="n"&gt;builder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;StreamsBuilder&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="nc"&gt;KStream&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;textLines&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"streams-plaintext-input"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;textLines&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;flatMapValues&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;asList&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toLowerCase&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;split&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"\\W+"&lt;/span&gt;&lt;span class="o"&gt;)))&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;groupBy&lt;/span&gt;&lt;span class="o"&gt;((&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;count&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Materialized&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;as&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"WordCount"&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toStream&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;to&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="s"&gt;"streams-wordcount-output"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; 
             &lt;span class="nc"&gt;Produced&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Serdes&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;String&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="nc"&gt;Serdes&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="nf"&gt;version&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="s"&gt;"1.0"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;As you can see, we have implemented the &lt;code&gt;TopologyProvider&lt;/code&gt; interface to provide the &lt;code&gt;Topology&lt;/code&gt; object. Azkarra requires you to version each provided topology. This is useful, for example, for running multiple versions of the same topology or for automatically generating a meaningful &lt;code&gt;application.id&lt;/code&gt;.&lt;/p&gt;
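&lt;p&gt;As an illustration, a meaningful identifier can be derived from the topology name and version. The short Python sketch below shows one plausible scheme (hypothetical; Azkarra's actual naming rules may differ) that produces the &lt;code&gt;word-count-topology-1-0&lt;/code&gt; identifier used later in this article:&lt;/p&gt;

```python
import re

# Hypothetical sketch: derive an application.id from a topology class
# name and version. CamelCase becomes lower-dash-case and version dots
# become dashes. Azkarra's actual naming scheme may differ.
def application_id(class_name, version):
    dashed = re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", class_name)
    return dashed.lower() + "-" + version.replace(".", "-")

print(application_id("WordCountTopology", "1.0"))  # word-count-topology-1-0
```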

&lt;p&gt;The &lt;code&gt;@Component&lt;/code&gt; annotation is required so that Azkarra can detect this class.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;And that's all!&lt;/strong&gt; Azkarra is responsible for creating and managing the &lt;code&gt;KafkaStreams&lt;/code&gt; instance that runs the provided &lt;code&gt;Topology&lt;/code&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Deploying A Streams Topology
&lt;/h2&gt;

&lt;p&gt;Now, we need to make our &lt;code&gt;WordCountTopology&lt;/code&gt; available to the worker.&lt;/p&gt;

&lt;p&gt;For doing this, we have to package and install our component into one of the directories configured via the property &lt;code&gt;azkarra.component.paths&lt;/code&gt;. &lt;/p&gt;

&lt;p&gt;If you look at the &lt;code&gt;docker-compose.yml&lt;/code&gt; you will see that this property is set to &lt;code&gt;/tmp/azkarra/components&lt;/code&gt; using an environment variable.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;azkarra.component.paths&lt;/code&gt; property should define the list of locations (separated by a comma) from which the components will be scanned.&lt;/p&gt;
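&lt;p&gt;In Typesafe Config (HOCON) notation, which Azkarra uses for externalized configuration, setting this property might look like the following sketch (hypothetical snippet; the Docker image shown here sets the property through an environment variable instead):&lt;/p&gt;

```
# Comma-separated list of locations to scan for components
azkarra.component.paths = "/tmp/azkarra/components"
```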

&lt;p&gt;Each configured directory may contain:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;an uber JAR&lt;/strong&gt; containing all of the classes and third-party dependencies for the component (e.g., topology).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;a directory&lt;/strong&gt; containing all JARs for the component&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Usually, with Maven, you will use the &lt;code&gt;maven-assembly-plugin&lt;/code&gt; or &lt;code&gt;maven-shade-plugin&lt;/code&gt; to package your project as an uber JAR.&lt;/p&gt;

&lt;p&gt;After packaging your application, you can copy the &lt;code&gt;.jar&lt;/code&gt; into the local directory &lt;code&gt;/tmp/azkarra/components&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Then, restart the docker containers as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker-compose restart
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Now, you should be able to list the available topologies via the REST API:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-sX&lt;/span&gt; GET http://localhost:8080/api/v1/topologies | jq 
&lt;span class="o"&gt;[&lt;/span&gt;
  &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="s2"&gt;"name"&lt;/span&gt;: &lt;span class="s2"&gt;"azkarra.WordCountTopology"&lt;/span&gt;,
    &lt;span class="s2"&gt;"version"&lt;/span&gt;: &lt;span class="s2"&gt;"1.0"&lt;/span&gt;,
    &lt;span class="s2"&gt;"description"&lt;/span&gt;: &lt;span class="s2"&gt;"WordCount topology example"&lt;/span&gt;,
    &lt;span class="s2"&gt;"aliases"&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;
      &lt;span class="s2"&gt;"WordCount"&lt;/span&gt;,
      &lt;span class="s2"&gt;"WordCountTopology"&lt;/span&gt;
    &lt;span class="o"&gt;]&lt;/span&gt;,
    &lt;span class="s2"&gt;"config"&lt;/span&gt;: &lt;span class="o"&gt;{}&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;]&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Finally, let's start a new Kafka Streams instance by submitting the following JSON configuration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type:application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;-X&lt;/span&gt; POST http://localhost:8080/api/v1/streams &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;--data&lt;/span&gt; &lt;span class="s1"&gt;'{"type": "azkarra.WordCountTopology", "version": "1.0",  "env": "__default", "config": {} }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;In the command above, we are specifying the type and version of the topology to deploy and the target environment.&lt;/p&gt;

&lt;p&gt;Indeed, Azkarra has a concept of &lt;code&gt;StreamsExecutionEnvironment&lt;/code&gt; which acts as a container for executing streams instances. By default, an environment named &lt;code&gt;__default&lt;/code&gt; is created.&lt;/p&gt;

&lt;p&gt;Note that Azkarra will automatically create any source and sink topics defined by the topology (&lt;code&gt;azkarra.context.auto.create.topics.enable=true&lt;/code&gt;).&lt;/p&gt;

&lt;h2&gt;
  
  
  Exploring Azkarra Web UI
&lt;/h2&gt;

&lt;p&gt;Azkarra ships with an embedded Web UI that lets you get information about the running Kafka Streams applications.&lt;/p&gt;

&lt;p&gt;For example, you can:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Get details about the threads and tasks of a running streams instance:&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--Q2Qnfniy--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://thepracticaldev.s3.amazonaws.com/i/h2n8472xcuodzil83yyo.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--Q2Qnfniy--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://thepracticaldev.s3.amazonaws.com/i/h2n8472xcuodzil83yyo.png" alt="Azkarra WebUI Streams Overview"&gt;&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Visualize the streams topology DAG:&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--KYn5M3Xu--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://thepracticaldev.s3.amazonaws.com/i/xde1pi7zx4wksaf8a3ld.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--KYn5M3Xu--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://thepracticaldev.s3.amazonaws.com/i/xde1pi7zx4wksaf8a3ld.png" alt="Azkarra WebUI Streams DAG"&gt;&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;List the Kafka Streams metrics:&lt;br&gt;
&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--4P2aci7l--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://thepracticaldev.s3.amazonaws.com/i/gyxndyf61zn4xmg2h3cs.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--4P2aci7l--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://thepracticaldev.s3.amazonaws.com/i/gyxndyf61zn4xmg2h3cs.png" alt="Azkarra WebUI Streams Metrics"&gt;&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Furthermore, the Azkarra Web UI allows you to stop, restart and delete local streams instances.&lt;/p&gt;

&lt;h2&gt;
  
  
  Querying state stores
&lt;/h2&gt;

&lt;p&gt;Finally, Kafka Streams has a great built-in mechanism, &lt;strong&gt;Interactive Queries&lt;/strong&gt;, for querying the states materialized by streams applications, and Azkarra exposes it via REST API calls.&lt;/p&gt;

&lt;p&gt;Let's produce some messages as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;docker &lt;span class="nb"&gt;exec&lt;/span&gt; &lt;span class="nt"&gt;-it&lt;/span&gt; broker /usr/bin/kafka-console-producer &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;--topic&lt;/span&gt; streams-plaintext-input &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;--broker-list&lt;/span&gt; broker:9092

Azkarra Streams
WordCount
I Heart Logs   
Kafka Streams
Making Sense of Stream Processing
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The following is an example of querying the &lt;code&gt;WordCount&lt;/code&gt; state store:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-sX&lt;/span&gt; POST http://localhost:8080/api/v1/applications/word-count-topology-1-0/stores/WordCount &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;--data&lt;/span&gt; &lt;span class="s1"&gt;'{"query":{"get":{"key": "streams"}},"type":"key_value", "set_options":{}}'&lt;/span&gt; | jq
&lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="s2"&gt;"took"&lt;/span&gt;: 1,
  &lt;span class="s2"&gt;"timeout"&lt;/span&gt;: &lt;span class="nb"&gt;false&lt;/span&gt;,
  &lt;span class="s2"&gt;"server"&lt;/span&gt;: &lt;span class="s2"&gt;"azkarra:8080"&lt;/span&gt;,
  &lt;span class="s2"&gt;"status"&lt;/span&gt;: &lt;span class="s2"&gt;"SUCCESS"&lt;/span&gt;,
  &lt;span class="s2"&gt;"result"&lt;/span&gt;: &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="s2"&gt;"success"&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;
      &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="s2"&gt;"server"&lt;/span&gt;: &lt;span class="s2"&gt;"azkarra:8080"&lt;/span&gt;,
        &lt;span class="s2"&gt;"remote"&lt;/span&gt;: &lt;span class="nb"&gt;false&lt;/span&gt;,
        &lt;span class="s2"&gt;"records"&lt;/span&gt;: &lt;span class="o"&gt;[&lt;/span&gt;
          &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="s2"&gt;"key"&lt;/span&gt;: &lt;span class="s2"&gt;"streams"&lt;/span&gt;,
            &lt;span class="s2"&gt;"value"&lt;/span&gt;: 2
          &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="o"&gt;]&lt;/span&gt;
      &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;]&lt;/span&gt;,
    &lt;span class="s2"&gt;"total"&lt;/span&gt;: 1
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
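The same endpoint can also return every entry materialized in the store. The sketch below assumes that `all` is a supported query operation for `key_value` stores in your Azkarra version (check the interactive-query DSL in the documentation):

```shell
# Hypothetical sketch: list all key/value pairs from the WordCount store.
# The "all" operation is an assumption; verify it against your Azkarra release.
curl -sX POST http://localhost:8080/api/v1/applications/word-count-topology-1-0/stores/WordCount \
  --data '{"query":{"all":{}},"type":"key_value","set_options":{}}' | jq
```

As with the `get` query, the response lists the servers that answered and the records each one returned.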



&lt;p&gt;You can also query a state store directly through the Azkarra Web UI.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--4MfEL1ek--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://thepracticaldev.s3.amazonaws.com/i/th1xg5202g30sch8519w.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--4MfEL1ek--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://thepracticaldev.s3.amazonaws.com/i/th1xg5202g30sch8519w.png" alt="Azkarra Web UI - Interactive Query"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Going further
&lt;/h2&gt;

&lt;p&gt;If you want to read more about Azkarra Streams, the documentation is available on the project's &lt;a href="https://streamthoughts.github.io/azkarra-streams/"&gt;GitHub Pages site&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;The documentation includes a step-by-step getting-started guide that covers the basic concepts of Azkarra.&lt;/p&gt;

&lt;p&gt;The project also contains some &lt;a href="https://github.com/streamthoughts/azkarra-streams/tree/master/azkarra-examples"&gt;examples&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Azkarra Streams is an initiative to enrich the Kafka Streams ecosystem and facilitate its adoption by developers through a lightweight micro-framework.&lt;/p&gt;

&lt;p&gt;We hope this project will be well received by the open-source and Kafka communities. Azkarra is still evolving, and some features still need improvement.&lt;br&gt;
To support the Azkarra Streams project, please ⭐ the GitHub repository or tweet about it if this project helps you!&lt;/p&gt;

&lt;p&gt;Thank you very much!&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>kafkastreams</category>
      <category>opensource</category>
      <category>java</category>
    </item>
  </channel>
</rss>
