Robin Moffatt

Originally published at rmoff.net

Skipping bad records in a Kafka Connect sink connector

The Kafka Connect framework provides generic error handling and dead-letter queue capabilities, but these are only available for problems with [de]serialisation and Single Message Transforms. When it comes to errors that a connector may encounter doing the actual pull or put of data from the source/target system, it's down to the connector itself to implement logic around that. For example, the Elasticsearch sink connector provides a configuration property (behavior.on.malformed.documents) that can be set so that a single bad record won't halt the pipeline. Others, such as the JDBC Sink connector, don't provide this yet. That means that if you hit this problem, you need to unblock the pipeline yourself, and one way to do that is to manually move the consumer offset past the bad message.

TL;DR: You can use kafka-consumer-groups --reset-offsets --to-offset <x> to manually move the connector past a bad message.
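
By way of illustration, the framework-level error handling mentioned above is driven by the errors.* properties on the connector itself. Here's a sketch of the kind of thing you might add to a sink connector's configuration; the dead-letter queue topic name (dlq_foo) and the replication factor of 1 are just placeholder values for a dev environment. Note again that these settings only cover converter and Single Message Transform errors, which is why they won't help with the JDBC constraint violation we're about to hit:

      "errors.tolerance": "all",
      "errors.log.enable": "true",
      "errors.log.include.messages": "true",
      "errors.deadletterqueue.topic.name": "dlq_foo",
      "errors.deadletterqueue.topic.replication.factor": "1",
      "errors.deadletterqueue.context.headers.enable": "true"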

In this article I'll use the JDBC sink connector as an example, but the concept (changing the consumer group offset) applies to other sink connectors too.

Try it out!

Create the connector

curl -X PUT http://localhost:8083/connectors/sink_postgres_foo_00/config -H "Content-Type: application/json" -d '{
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url": "jdbc:postgresql://postgres:5432/",
      "connection.user": "postgres",
      "connection.password": "postgres",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "true",
      "tasks.max": "1",
      "topics": "foo",
      "auto.create": "true",
      "auto.evolve":"true",
      "pk.mode":"none"          
    }'
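
Before sending any data, you can check that the connector was created and its task is running by hitting the per-connector status endpoint (jq is optional here, but makes the output easier to read):

curl -s http://localhost:8083/connectors/sink_postgres_foo_00/status | jq '.'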

Send a message to the topic

kafkacat -b localhost:9092 -t foo -P <<EOF
{ "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "c1" }, { "type": "string", "optional": false, "field": "c2" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10000, "c2": "bar", "create_ts": 1501834166000, "update_ts": 1501834166000 } }
EOF

Confirm that the data’s on the topic:

$ kafkacat -b localhost:9092 -t foo -C -f 'Topic: %t\nPartition: %p\nOffset: %o\nKey: %k\nPayload: %S bytes: %s\n--\n'

Topic: foo
Partition: 0
Offset: 0
Key:
Payload: 543 bytes: { "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "c1" }, { "type": "string", "optional": false, "field": "c2" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10000, "c2": "bar", "create_ts": 1501834166000, "update_ts": 1501834166000 } }
--

Check the connector status:

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
             jq '. | to_entries[] | [.value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
             column -s : -t| sed 's/\"//g'| sort
sink | sink_postgres_foo_00 | RUNNING | RUNNING | io.confluent.connect.jdbc.JdbcSinkConnector

Confirm there’s data in the target DB:

postgres=# \dt
        List of relations
 Schema | Name | Type  |  Owner
--------+------+-------+----------
 public | foo  | table | postgres
(1 row)

postgres-# \d foo
                            Table "public.foo"
  Column   |            Type             | Collation | Nullable | Default
-----------+-----------------------------+-----------+----------+---------
 update_ts | timestamp without time zone |           | not null |
 create_ts | timestamp without time zone |           | not null |
 c1        | integer                     |           | not null |
 c2        | text                        |           | not null |

postgres=# select * from "foo";
      update_ts      |      create_ts      |  c1   | c2
---------------------+---------------------+-------+-----
 2017-08-04 08:09:26 | 2017-08-04 08:09:26 | 10000 | bar
(1 row)
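
As an aside, if you're running this stack under Docker Compose (which the postgres hostname in the connection URL suggests), you can run the same query non-interactively too. The service/container name postgres below is an assumption about your environment, so adjust it to match your own setup:

docker exec postgres psql -U postgres -c 'SELECT * FROM "foo";'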

Here comes the problem

Let’s send another message to the topic, but omit one of the fields (c2):

kafkacat -b localhost:9092 -t foo -P <<EOF
{ "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "c1" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10000, "create_ts": 1501834166000, "update_ts": 1501834166000 } }
EOF

If we look at the status of the connector we can see that the task has FAILED:

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
             jq '. | to_entries[] | [.value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
             column -s : -t| sed 's/\"//g'| sort
sink | sink_postgres_foo_00 | RUNNING | FAILED | io.confluent.connect.jdbc.JdbcSinkConnector

And the Kafka Connect worker log shows a problem:

[2019-10-15 08:30:34,412] ERROR [sink_postgres_foo_00|task-0] WorkerSinkTask{id=sink_postgres_foo_00-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "foo"("c1","create_ts","update_ts") VALUES(10000,'2017-08-04 08:09:26+00','2017-08-04 08:09:26+00') was aborted: ERROR: null value in column "c2" violates not-null constraint
  Detail: Failing row contains (2017-08-04 08:09:26, 2017-08-04 08:09:26, 10000, null). Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: null value in column "c2" violates not-null constraint
  Detail: Failing row contains (2017-08-04 08:09:26, 2017-08-04 08:09:26, 10000, null).
org.postgresql.util.PSQLException: ERROR: null value in column "c2" violates not-null constraint
  Detail: Failing row contains (2017-08-04 08:09:26, 2017-08-04 08:09:26, 10000, null).
   at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
   ... 10 more
[2019-10-15 08:30:34,413] ERROR [sink_postgres_foo_00|task-0] WorkerSinkTask{id=sink_postgres_foo_00-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)

Distilling this down gives us:

Batch entry 0 INSERT INTO "foo"("c1","create_ts","update_ts") VALUES(10000,'2017-08-04 08:09:26+00','2017-08-04 08:09:26+00') was aborted: 
ERROR: null value in column "c2" violates not-null constraint

Because we omitted field c2 from our payload, and it’s NOT NULL in the target schema, the message cannot be written, and the Connect sink task aborts.
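
Incidentally, you don't need access to the worker log to see this error: a failed task's stack trace is also exposed in the trace field of the Connect REST API's status endpoint. For example:

curl -s http://localhost:8083/connectors/sink_postgres_foo_00/status | jq -r '.tasks[0].trace'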

What if we send another, valid, message to the topic?

kafkacat -b localhost:9092 -t foo -P <<EOF
{ "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "c1" }, { "type": "string", "optional": false, "field": "c2" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10001, "c2": "bar2", "create_ts": 1501834166000, "update_ts": 1501834166000 } }
EOF

Restart the connector’s failed task:

curl -X POST http://localhost:8083/connectors/sink_postgres_foo_00/tasks/0/restart

It’s up…

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
             jq '. | to_entries[] | [.value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
             column -s : -t| sed 's/\"//g'| sort
sink | sink_postgres_foo_00 | RUNNING | RUNNING | io.confluent.connect.jdbc.JdbcSinkConnector

but soon…it’s down

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
             jq '. | to_entries[] | [.value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
             column -s : -t| sed 's/\"//g'| sort
sink | sink_postgres_foo_00 | RUNNING | FAILED | io.confluent.connect.jdbc.JdbcSinkConnector

The worker log shows the same error as before - ERROR: null value in column "c2" violates not-null constraint.

Of the three messages on the topic, we've got a 'poison pill' which has broken our pipeline 😿. Each time we restart the connector, it will start from where it got to last time and so fall over again, regardless of how many 'good' messages may come after it. The connector only commits the offset for a message once it has successfully written it to the target, which makes sense from a data integrity point of view, but it does land us with this problem here.

What to do?

There are a few options:

  1. If we were using Avro then it would be harder to break things, because schema compatibility can be enforced and bad messages would be rejected when being produced onto the topic.

  2. We could write a stream processing job to take the source topic foo, write all valid messages from it to a new topic (e.g. foo_good), and hook our JDBC sink up to that instead (there's a rough sketch of this idea below).

  3. Use the consumer group mechanism to skip just the bad message for the connector.

Which you use depends on how the problem arose. One-off problems can be addressed by option #3, but it's very manual and could be error-prone if you're not careful. Option #2 is appropriate if you're dealing with third parties and have ongoing data quality issues. Option #1, using Avro, is always a good idea, regardless!
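
To make option #2 a little more concrete, here's a very rough sketch of the idea using kafkacat and jq rather than a proper stream processing job. It's a one-off batch copy, not a continuously running pipeline, and the filter condition and the foo_good topic name are just illustrative: it reads everything currently on foo, keeps only the messages whose payload contains a non-null c2, and produces those to foo_good:

kafkacat -b localhost:9092 -t foo -C -e -q | \
  jq -c 'select(.payload.c2 != null)' | \
  kafkacat -b localhost:9092 -t foo_good -P

You'd then point the sink connector's topics configuration at foo_good instead of foo.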

Manually skipping bad messages

Each sink connector in Kafka Connect has its own consumer group, with its offsets persisted in Kafka itself (pretty clever, right?). This is also why, if you delete a connector and recreate it with the same name, you'll find that it starts from where the previous instance got to.

You can view consumer groups using the kafka-consumer-groups command:

$ kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --list
connect-sink_postgres_00
_confluent-ksql-confluent_rmoff_01query_CSAS_JDBC_POSTGRES_TRANSACTIONS_GBP_2
_confluent-ksql-confluent_rmoff_01query_CSAS_JDBC_POSTGRES_TRANSACTIONS_NO_CUSTOMERID_1
connect-sink_postgres_foo_00
connect-SINK_ES_04
_confluent-ksql-confluent_rmoff_01transient_2925897355317205962_1571058964212
_confluent-controlcenter-5-4-0-1
connect-SINK_ES_03
_confluent-controlcenter-5-4-0-1-command
connect-SINK_ES_02
connect-SINK_ES_01

There are various groups there, but we're interested in the one with a connect- prefix that matches our connector name, connect-sink_postgres_foo_00:

$ kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --describe \
    --group connect-sink_postgres_foo_00

Consumer group 'connect-sink_postgres_foo_00' has no active members.

GROUP                        TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-sink_postgres_foo_00 foo   0         1              3              2   -           -    -

You can see from this that the current offset is 1, and there are two more messages to be read (one of which is the 'poison-pill').

kafkacat is a fantastic tool for this kind of debugging, because we can directly relate offsets with the messages themselves:

$ kafkacat -b localhost:9092 -t foo -C -f 'Offset: %o\nPayload: %s\n--\n'
Offset: 0
Payload: { "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "c1" }, { "type": "string", "optional": false, "field": "c2" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10000, "c2": "bar", "create_ts": 1501834166000, "update_ts": 1501834166000 } }
--
Offset: 1
Payload: { "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "c1" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10000, "create_ts": 1501834166000, "update_ts": 1501834166000 } }
--
Offset: 2
Payload: { "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "c1" }, { "type": "string", "optional": false, "field": "c2" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10001, "c2": "bar2", "create_ts": 1501834166000, "update_ts": 1501834166000 } }
--
% Reached end of topic foo [0] at offset 3

So offset 0 holds the good message which Connect has already read, which is why the current offset is 1. When the connector restarts after its failure it will pick up at offset 1, which is the 'bad' message. The end of the topic is currently offset 3, i.e. the position after the third message, which sits at offset 2 (offsets are zero-based).
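
As an aside, on a topic with more than a handful of messages you wouldn't want to dump the whole thing; kafkacat's -o and -c flags let you read just the message at a given offset, so you could home in on the suspect one directly:

kafkacat -b localhost:9092 -t foo -C -o 1 -c 1 -f 'Offset: %o\nPayload: %s\n--\n'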

What we want to do is tell Kafka Connect to resume from the next-good message, which we can see from kafkacat above is at offset 2.
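
Two things worth noting before running the reset. First, it only works because the consumer group has no active members, which is the case here since the sink task has failed. Second, you can preview what the command is going to do by running it with --dry-run:

kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --group connect-sink_postgres_foo_00 \
    --reset-offsets \
    --topic foo \
    --to-offset 2 \
    --dry-run

Once you're happy with what it's going to change, run the same command with --execute: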

kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --group connect-sink_postgres_foo_00 \
    --reset-offsets \
    --topic foo \
    --to-offset 2 \
    --execute

GROUP                        TOPIC PARTITION NEW-OFFSET
connect-sink_postgres_foo_00 foo   0         2

Now we can restart the failed task:

curl -X POST http://localhost:8083/connectors/sink_postgres_foo_00/tasks/0/restart

and this time the connector stays running:

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
             jq '. | to_entries[] | [.value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
             column -s : -t| sed 's/\"//g'| sort
sink | sink_postgres_foo_00 | RUNNING | RUNNING | io.confluent.connect.jdbc.JdbcSinkConnector

and in Postgres we get the new row of data (the bad message is skipped, and its data is lost to us):

postgres=# select * from "foo";
      update_ts      |      create_ts      |  c1   |  c2
---------------------+---------------------+-------+------
 2017-08-04 08:09:26 | 2017-08-04 08:09:26 | 10000 | bar
 2017-08-04 08:09:26 | 2017-08-04 08:09:26 | 10001 | bar2
(2 rows)
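
As a final check, you can re-run the consumer group describe from earlier; the CURRENT-OFFSET for the connector's group should now have caught up with the LOG-END-OFFSET of the topic:

kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --describe \
    --group connect-sink_postgres_foo_00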
