The FilePulse connector from Florian Hussonnois is a really useful connector for Kafka Connect which enables you to ingest flat files (CSV, JSON, XML, etc) into Kafka. You can read more about it in its overview here. Other connectors for ingesting CSV data include kafka-connect-spooldir (which I wrote about previously) and kafka-connect-fs.
Here I’ll show how to use it to stream CSV data into a topic in Confluent Cloud. You can apply the same config pattern to any other secured Kafka cluster.
Run your Kafka Connect worker. Whilst Confluent Cloud offers many different managed connectors, FilePulse isn't yet available on it, so you'll need to run your own Kafka Connect worker connecting to Confluent Cloud, which is what I did here.
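For reference, here's a minimal sketch of the security settings such a worker needs in its properties so that it (and the producers/consumers it runs for its tasks) can reach Confluent Cloud. The CCLOUD_HOST / CCLOUD_API_KEY / CCLOUD_API_SECRET values are placeholders, the same ones substituted into the connector config below; the usual group.id, internal topic, and converter settings are omitted here.
# Worker connection to the Confluent Cloud broker
bootstrap.servers=CCLOUD_HOST:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.endpoint.identification.algorithm=https
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="CCLOUD_API_KEY" password="CCLOUD_API_SECRET";
# The producers and consumers that Connect runs for its tasks need the same credentials
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="CCLOUD_API_KEY" password="CCLOUD_API_SECRET";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="CCLOUD_API_KEY" password="CCLOUD_API_SECRET";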
You need to pre-create the topic(s) to which the connector is going to write, and also the internal reporting topic that the connector uses.
ccloud kafka topic create --config cleanup.policy=compact,retention.ms=-1,retention.bytes=-1 connect-file-pulse-status
ccloud kafka topic create target_csv_topic
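Before creating the connector you can sanity-check that both topics exist, for example with:
ccloud kafka topic list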
Create the connector. This has several points which are worth explaining.
This is what I used to specify a given CSV filename in a given folder:
"fs.scan.filters" : "io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter", "fs.scan.directory.path" : "/data/ingest/locations/", "fs.scan.interval.ms" : "10000", "file.filter.regex.pattern" : "openraildata-talk-carl-partridge-ukrail_locations.csv",
The broker security config needs specifying in the connector (not just the worker). The FilePulse connector uses its own topics to hold information about the file ingest (these are different from the topics that the Kafka Connect distributed workers use), and it needs some configuration for the producer and consumer to connect to the Kafka broker. This config is prefixed with internal.kafka.reporter. The two standard configuration items are easy enough and apply regardless of whether your Kafka cluster is secured:
"internal.kafka.reporter.bootstrap.servers" "internal.kafka.reporter.topic"
But you also need to specify the security config:
"internal.kafka.reporter.producer.security.protocol" "internal.kafka.reporter.producer.ssl.endpoint.identification.algorithm" "internal.kafka.reporter.producer.sasl.mechanism" "internal.kafka.reporter.producer.sasl.jaas.config" "internal.kafka.reporter.consumer.security.protocol" "internal.kafka.reporter.consumer.ssl.endpoint.identification.algorithm" "internal.kafka.reporter.consumer.sasl.mechanism" "internal.kafka.reporter.consumer.sasl.jaas.config"
The connector can infer the schema of the CSV file, which is neat. Do this by setting:
"filters.ParseLine.type" : "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter", "filters.ParseLine.extractColumnName": "headers", "filters.ParseLine.trimColumn" : "true", "filters.ParseLine.separator" : ",",
You can specify a value from the payload to set as the message key, which is very useful. Here I'm using the field called location_id:
"filters.setKey.type" : "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter", "filters.setKey.field" : "$key", "filters.setKey.value" : "$value.location_id",
Here’s the full connector configuration REST call. Substitute the CSV details and broker config (CCLOUD_HOST, CCLOUD_API_KEY, CCLOUD_API_SECRET).
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-01/config \
-d '{
"connector.class" : "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"task.reader.class" : "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
"fs.scan.filters" : "io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"fs.scan.directory.path" : "/data/ingest/locations/",
"fs.scan.interval.ms" : "10000",
"file.filter.regex.pattern" : "openraildata-talk-carl-partridge-ukrail_locations.csv",
"offset.strategy" : "name",
"skip.headers" : "1",
"topic" : "ukrail-locations",
"fs.cleanup.policy.class" : "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max" : 1,
"filters" : "ParseLine,setKey",
"filters.ParseLine.type" : "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
"filters.ParseLine.extractColumnName": "headers",
"filters.ParseLine.trimColumn" : "true",
"filters.ParseLine.separator" : ",",
"filters.setKey.type" : "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.setKey.field" : "$key",
"filters.setKey.value" : "$value.location_id",
"internal.kafka.reporter.bootstrap.servers" : "CCLOUD_HOST:9092",
"internal.kafka.reporter.topic" : "connect-file-pulse-status",
"internal.kafka.reporter.producer.security.protocol" : "SASL_SSL",
"internal.kafka.reporter.producer.ssl.endpoint.identification.algorithm": "https",
"internal.kafka.reporter.producer.sasl.mechanism" : "PLAIN",
"internal.kafka.reporter.producer.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_API_KEY\" password=\"CCLOUD_API_SECRET\";",
"internal.kafka.reporter.producer.request.timeout.ms" : "20000",
"internal.kafka.reporter.producer.retry.backoff.ms" : "500",
"internal.kafka.reporter.consumer.security.protocol" : "SASL_SSL",
"internal.kafka.reporter.consumer.ssl.endpoint.identification.algorithm": "https",
"internal.kafka.reporter.consumer.sasl.mechanism" : "PLAIN",
"internal.kafka.reporter.consumer.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CCLOUD_API_KEY\" password=\"CCLOUD_API_SECRET\";",
"internal.kafka.reporter.consumer.request.timeout.ms" : "20000",
"internal.kafka.reporter.consumer.retry.backoff.ms" : "500"
}'
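Once it's created, a quick check of the connector's status via the Kafka Connect REST API should show the connector and its task as RUNNING (jq is optional, just for pretty-printing):
curl -s http://localhost:8083/connectors/source-csv-01/status | jq '.'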
The connector is really powerful, and all the options can make it a bit daunting to get started with - but it’s worth persevering 😄 (and the documentation is comprehensive).