For whatever reason, CSV still exists as a ubiquitous data interchange format. It doesn’t get much simpler: chuck some plaintext with fields separated by commas into a file and stick .csv
on the end. If you’re feeling helpful you can include a header row with field names in.
order_id,customer_id,order_total_usd,make,model,delivery_city,delivery_company,delivery_address
1,535,190899.73,Dodge,Ram Wagon B350,Sheffield,DuBuque LLC,2810 Northland Avenue
2,671,33245.53,Volkswagen,Cabriolet,Edinburgh,Bechtelar-VonRueden,1 Macpherson Crossing
In this article we’ll see how to load this CSV data into Kafka, without even needing to write any code
Importantly, we’re not going to reinvent the wheel by trying to write some code to do it ourselves - Kafka Connect (which is part of Apache Kafka) already exists to do all of this for us; we just need the appropriate connector.
Schemas?
Yeah, schemas. CSV files might not care about them much, but the users of your data in Kafka will. Ideally we want a way to define the schema of the data that we ingest so that it can be stored and read by anyone who wants to use the data. To understand why this is such a big deal check out:
Streaming Microservices: Contracts & Compatibility (InfoQ talk)
Confluent Platform Now Supports Protobuf, JSON Schema, and Custom Formats (blog)
If you are going to define a schema at ingest (and I hope you do), use Avro, Protobuf, or JSON Schema, as described here.
Kafka Connect SpoolDir connector
The Kafka Connect SpoolDir connector supports various flatfile formats, including CSV. Get it from Confluent Hub, and check out the docs here. Once you’ve installed it in your Kafka Connect worker make sure you restart the worker for it to pick it up. You can check by running:
$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'SpoolDir'
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector"
Loading data from CSV into Kafka and applying a schema
If you have a header row with field names you can take advantage of these to define the schema at ingestion time (which is a good idea).
Create the connector:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config \
-d '{
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"topic": "orders_spooldir_00",
"input.path": "/data/unprocessed",
"finished.path": "/data/processed",
"error.path": "/data/error",
"input.file.pattern": ".*\\.csv",
"schema.generation.enabled":"true",
"csv.first.row.as.header":"true"
}'
Now head over to a Kafka consumer and observe our data. Here I’m using kafkacat cos it’s great :)
$ docker exec kafkacat \
kafkacat -b kafka:29092 -t orders_spooldir_00 \
-C -o-1 -J \
-s key=s -s value=avro -r http://schema-registry:8081 | \
jq '.payload'
{
"order_id": {
"string": "500"
},
"customer_id": {
"string": "424"
},
"order_total_usd": {
"string": "160312.42"
},
"make": {
"string": "Chevrolet"
},
"model": {
"string": "Suburban 1500"
},
"delivery_city": {
"string": "London"
},
"delivery_company": {
"string": "Predovic LLC"
},
"delivery_address": {
"string": "2 Sundown Drive"
}
}
What’s more, in the header of the Kafka message is the metadata from the file itself:
$ docker exec kafkacat \
kafkacat -b kafka:29092 -t orders_spooldir_00 \
-C -o-1 -J \
-s key=s -s value=avro -r http://schema-registry:8081 | \
jq '.headers'
[
"file.name",
"orders.csv",
"file.path",
"/data/unprocessed/orders.csv",
"file.length",
"39102",
"file.offset",
"501",
"file.last.modified",
"2020-06-17T13:33:50.000Z"
]
Setting the message key
Assuming you have header row to provide field names, you can set schema.generation.key.fields
to the name of the field(s) you’d like to use for the Kafka message key. If you’re running this after the first example above remember that the connector relocates your file so you need to move it back to the input.path
location for it to be processed again.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-01/config \
-d '{
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"topic": "orders_spooldir_01",
"input.path": "/data/unprocessed",
"finished.path": "/data/processed",
"error.path": "/data/error",
"input.file.pattern": ".*\\.csv",
"schema.generation.enabled":"true",
"schema.generation.key.fields":"order_id",
"csv.first.row.as.header":"true"
}'
The resulting Kafka message has the order_id
set as the message key:
docker exec kafkacat \
kafkacat -b kafka:29092 -t orders_spooldir_01 -o-1 \
-C -J \
-s key=s -s value=avro -r http://schema-registry:8081 | \
jq '{"key":.key,"payload": .payload}'
{
"key": "Struct{order_id=3}",
"payload": {
"order_id": {
"string": "3"
},
"customer_id": {
"string": "695"
},
"order_total_usd": {
"string": "155664.90"
},
"make": {
"string": "Toyota"
},
"model": {
"string": "Avalon"
},
"delivery_city": {
"string": "Brighton"
},
"delivery_company": {
"string": "Jacobs, Ebert and Dooley"
},
"delivery_address": {
"string": "4 Loomis Crossing"
}
}
}
Changing the schema field types
The connector does a fair job at setting the schema, but maybe you want to override it. You can declare the whole thing upfront using the value.schema
configuration, but perhaps you are happy with it inferring the whole schema except for a couple of fields. Here you can use Single Message Transform to munge it:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-02/config \
-d '{
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"topic": "orders_spooldir_02",
"input.path": "/data/unprocessed",
"finished.path": "/data/processed",
"error.path": "/data/error",
"input.file.pattern": ".*\\.csv",
"schema.generation.enabled":"true",
"schema.generation.key.fields":"order_id",
"csv.first.row.as.header":"true",
"transforms":"castTypes",
"transforms.castTypes.type":"org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec":"order_id:int32,customer_id:int32,order_total_usd:float32"
}'
If you go and look at the schema that’s been created and stored in the Schema Registry you can see the field data types have been set as specified:
➜ curl --silent --location --request GET 'http://localhost:8081/subjects/orders_spooldir_02-value/versions/latest' |jq '.schema|fromjson'
{
"type": "record", "name": "Value", "namespace": "com.github.jcustenborder.kafka.connect.model",
"fields": [
{ "name": "order_id", "type": ["null", "int"], "default": null },
{ "name": "customer_id", "type": ["null", "int"], "default": null },
{ "name": "order_total_usd", "type": ["null", "float"], "default": null },
{ "name": "make", "type": ["null", "string"], "default": null },
{ "name": "model", "type": ["null", "string"], "default": null },
{ "name": "delivery_city", "type": ["null", "string"], "default": null },
{ "name": "delivery_company", "type": ["null", "string"], "default": null },
{ "name": "delivery_address", "type": ["null", "string"], "default": null }
],
"connect.name": "com.github.jcustenborder.kafka.connect.model.Value"
}
Just gimme the plain text! 😢
All of this schemas seems like a bunch of fuss really, doesn’t it? Well not really. But, if you absolutely must just have CSV in your Kafka topic then here’s how. Note that we’re using a different connector class and we’re using org.apache.kafka.connect.storage.StringConverter
to write the values. If you want to learn more about serialisers and converters see here.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-03/config \
-d '{
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"topic": "orders_spooldir_03",
"input.path": "/data/unprocessed",
"finished.path": "/data/processed",
"error.path": "/data/error",
"input.file.pattern": ".*\\.csv"
}'
The result? Just CSV.
➜ docker exec kafkacat \
kafkacat -b kafka:29092 -t orders_spooldir_03 -o-5 -C -u -q
496,456,80466.80,Volkswagen,Touareg,Leeds,Hilpert-Williamson,96 Stang Junction
497,210,57743.67,Dodge,Neon,London,Christiansen Group,7442 Algoma Hill
498,88,211171.02,Nissan,370Z,York,"King, Yundt and Skiles",3 1st Plaza
499,343,126072.73,Chevrolet,Camaro,Sheffield,"Schiller, Ankunding and Schumm",8920 Hoffman Place
500,424,160312.42,Chevrolet,Suburban 1500,London,Predovic LLC,2 Sundown Drive
Side-bar: Schemas in action
So we’ve read some CSV data into Kafka. That’s not the end of its journey. It’s going to be used for something! Let’s do that.
Here’s ksqlDB, in which we declare the orders topic we wrote to with a schema as a stream:
ksql> CREATE STREAM ORDERS_02 WITH (KAFKA_TOPIC='orders_spooldir_02',VALUE_FORMAT='AVRO');
Message
----------------
Stream created
----------------
Having done that—and because there’s a schema that was created at ingestion time—we can see all of the fields available to us:
ksql> DESCRIBE ORDERS_02;
Name : ORDERS_02
Field | Type
-------------------------------------------
ROWKEY | VARCHAR(STRING) (key)
ORDER_ID | INTEGER
CUSTOMER_ID | INTEGER
ORDER_TOTAL_USD | DOUBLE
MAKE | VARCHAR(STRING)
MODEL | VARCHAR(STRING)
DELIVERY_CITY | VARCHAR(STRING)
DELIVERY_COMPANY | VARCHAR(STRING)
DELIVERY_ADDRESS | VARCHAR(STRING)
-------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>
and run queries against the data that’s in Kafka:
ksql> SELECT DELIVERY_CITY, COUNT(*) AS ORDER_COUNT, MAX(CAST(ORDER_TOTAL_USD AS DECIMAL(9,2))) AS BIGGEST_ORDER_USD FROM ORDERS_02 GROUP BY DELIVERY_CITY EMIT CHANGES;
+---------------+-------------+---------------------+
|DELIVERY_CITY |ORDER_COUNT |BIGGEST_ORDER_USD |
+---------------+-------------+---------------------+
|Bradford |13 |189924.47 |
|Edinburgh |13 |199502.66 |
|Bristol |16 |213830.34 |
|Sheffield |74 |216233.98 |
|London |160 |219736.06 |
What about our data that we just ingested into a different topic as straight-up CSV? Because, like, schemas aren’t important?
ksql> CREATE STREAM ORDERS_03 WITH (KAFKA_TOPIC='orders_spooldir_03',VALUE_FORMAT='DELIMITED');
No columns supplied.
Yeah, no columns supplied. No schema, no bueno. If you want to work with the data, whether to query in SQL, stream to a data lake, or do anything else with—at some point you’re going to have to declare that schema. Hence why CSV, as a schemaless-serialisation method, is a bad way to exchange data between systems.
If you really want to use your CSV data in ksqlDB, you can, you just need to enter the schema—which is error prone and tedious. You enter it each time to use the data, every other consumer of the data enters it each time too. Declaring it once at ingest and it being available for all to use makes a lot more sense.
Regex and JSON
If you’re using the REST API to submit configuration you might hit up against errors sending regex values within the JSON. For example, if you want to set input.file.pattern
to .*\.csv
and you put that in your JSON literally:
"input.file.pattern": ".*\.csv",
You’ll get this error back if you submit it as inline data with curl
:
com.fasterxml.jackson.core.JsonParseException: Unrecognized character escape '.' (code 46) at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 7, column: 36]
THe solution is to escape the escape character (the backslash):
"input.file.pattern": ".*\\.csv",
Streaming CSV data from Kafka to a database (or anywhere else…)
Since you’ve got a schema to the data, you can easily sink it to a database, such as Postgres:
curl -X PUT http://localhost:8083/connectors/sink-postgres-orders-00/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://postgres:5432/",
"connection.user": "postgres",
"connection.password": "postgres",
"tasks.max": "1",
"topics": "orders_spooldir_02",
"auto.create": "true",
"auto.evolve":"true",
"pk.mode":"record_value",
"pk.fields":"order_id",
"insert.mode": "upsert",
"table.name.format":"orders"
}'
postgres=# \dt
List of relations
Schema | Name | Type | Owner
--------+--------+-------+----------
public | orders | table | postgres
(1 row)
postgres=# \d orders;
Table "public.orders"
Column | Type | Collation | Nullable | Default
------------------+---------+-----------+----------+---------
order_id | integer | | not null |
customer_id | integer | | |
order_total_usd | real | | |
make | text | | |
model | text | | |
delivery_city | text | | |
delivery_company | text | | |
delivery_address | text | | |
Indexes:
"orders_pkey" PRIMARY KEY, btree (order_id)
postgres=# SELECT * FROM orders FETCH FIRST 10 ROWS ONLY;
order_id | customer_id | order_total_usd | make | model | delivery_city | delivery_company | delivery_address
----------+-------------+-----------------+------------+----------------+---------------+--------------------------+--------------------------
1 | 535 | 190899.73 | Dodge | Ram Wagon B350 | Sheffield | DuBuque LLC | 2810 Northland Avenue
2 | 671 | 33245.53 | Volkswagen | Cabriolet | Edinburgh | Bechtelar-VonRueden | 1 Macpherson Crossing
3 | 695 | 155664.9 | Toyota | Avalon | Brighton | Jacobs, Ebert and Dooley | 4 Loomis Crossing
4 | 366 | 149012.9 | Hyundai | Santa Fe | Leeds | Kiehn Group | 538 Burning Wood Alley
5 | 175 | 63274.18 | Kia | Sportage | Leeds | Miller-Hudson | 6 Kennedy Court
6 | 37 | 97790.04 | BMW | 3 Series | Bristol | Price Group | 21611 Morning Trail
7 | 644 | 76240.84 | Mazda | MPV | Leeds | Kihn and Sons | 9 Susan Street
8 | 973 | 216233.98 | Hyundai | Elantra | Sheffield | Feeney, Howe and Koss | 07671 Hazelcrest Terrace
9 | 463 | 162589.1 | Chrysler | Grand Voyager | York | Fay, Murazik and Schumm | 42080 Pawling Circle
10 | 863 | 111208.24 | Ford | Laser | Leeds | Boehm, Mohr and Doyle | 0919 International Trail
(10 rows)
To learn more about writing data from Kafka to a database see this tutorial.
For more tutorials on Kafka Connect see 🎥 this playlist.
Try it out!
All the code for this article is on GitHub, and you just need Docker and Docker Compose to spin it up and give it a try. The commandline examples quoted below are based on the Docker environment.
To spin it up, clone the repository, change to the correct folder, and launch the stack:
git clone https://github.com/confluentinc/demo-scene.git
cd csv-to-kafka
docker-compose up -d
Wait for Kafka Connect to launch and then off you go!
bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200] ; do
echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'
The examples in this article are based on the data
folder mapped to /data
on the Kafka Connect worker.
Top comments (0)