Kafka topics should not routinely be deleted and recreated, nor should their offsets be reset. Should it be necessary, care must be taken about how and when to update the offsets in Spark Structured Streaming's checkpoints, in order to avoid data loss.
Since such an offset reset happens outside of Spark, the offsets in the checkpoints are not automatically updated to reflect the change. This may cause unexpected behaviour, because the checkpointed offsets no longer fall into the range that actually exists on the topic.
When and how data loss may occur
Let's assume a Spark Structured Streaming query with a Once trigger (Trigger.Once) consumed 500 records from the Kafka topic test-topic.
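As a minimal sketch of the assumed setup (the bootstrap servers, output path and checkpoint location below are placeholders, not details from the original scenario), such a query could look roughly like this:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object KafkaTriggerOnceIngest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-trigger-once-ingest")
      .getOrCreate()

    // Kafka source: read everything that is currently available on test-topic
    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
      .option("subscribe", "test-topic")
      .option("startingOffsets", "earliest")
      .load()

    // File sink: consumed offsets are persisted under the checkpoint location,
    // written files are tracked in the sink's _spark_metadata folder
    val query = kafkaDf
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .writeStream
      .format("parquet")
      .option("path", "/data/test-topic")                       // placeholder
      .option("checkpointLocation", "/checkpoints/test-topic")  // {checkpoint-dir}
      .trigger(Trigger.Once())
      .start()

    query.awaitTermination()
  }
}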
The checkpointed offsets for microbatch 0 look like this:
{checkpoint-dir}/offsets/0:
{"test-topic":{"0":500}}
On the Kafka topic, the beginning offset is 0 and the end offset is 500.
So far, so good.
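To verify this, the topic's beginning and end offsets can be fetched outside of Spark, for example with a plain Kafka consumer (the bootstrap servers below are a placeholder):
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object ShowTopicOffsets {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder

    val consumer =
      new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
    val tp = new TopicPartition("test-topic", 0)
    val tps = Collections.singletonList(tp)

    // Earliest and latest offsets as currently reported by the brokers;
    // in the scenario above this would print beginning=0 and end=500.
    val beginning = consumer.beginningOffsets(tps).get(tp)
    val end = consumer.endOffsets(tps).get(tp)
    println(s"$tp: beginning=$beginning, end=$end")

    consumer.close()
  }
}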
New end offsets < checkpointed offset
Now, let's assume the Kafka topic has been deleted and recreated, and 300 messages have been produced, so that the new end offset on the topic is 300. If the Spark query is restarted, it will fail with the following error message:
java.lang.IllegalStateException: Partition test-topic-0's offset was changed from 500 to 300, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
Based on its checkpoint from batch 0, Spark would have expected messages with offsets 500 and higher, but only found lower offsets. Spark does not know if all messages have been consumed before the offset reset and by default assumes that messages could have been lost, failing with the above exception.
What happens if the query is rerun with failOnDataLoss=false?
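For reference, the option is set directly on the Kafka source; assuming the reader definition from the sketch above, it could look like this:
// Do not fail the query when checkpointed offsets are no longer
// available on the broker; the default is failOnDataLoss=true
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "test-topic")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load()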
This time, Spark only prints a warning:
WARN KafkaMicroBatchReader: Partition test-topic-0's offset was changed from 500 to 300, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "true".
In the checkpoint folder, a new file for batchId 1 is created, which contains:
{checkpoint-dir}/offsets/1:
{"test-topic":{"0":300}}
But how many records were consumed?
Zero.
22/07/13 10:29:59 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "5762b467-2d58-4b62-937b-427f99b38659",
"runId" : "fc98564a-f2f0-42be-91e2-6d1f97446372",
"name" : null,
"timestamp" : "2022-07-13T08:29:57.863Z",
"batchId" : 1,
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"addBatch" : 709,
"getBatch" : 45,
"queryPlanning" : 301,
"triggerExecution" : 1372
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[test]]",
"startOffset" : {
"test-topic" : {
"0" : 500
}
},
"endOffset" : {
"test-topic" : {
"0" : 300
}
},
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider@43bbf133"
}
}
As you can see from the log, Spark takes 500 as the start offset for this microbatch, i.e. the offset from the previous checkpoint rather than the current beginning offset of the topic-partition. From that point of view, it makes sense that no records are ingested, but the 300 new messages (offsets 0 to 299) are lost nonetheless.
New end offsets > checkpointed offset
It's also possible that the new end offset is greater than the offset in the latest checkpoint. For example, let's assume that the new end offset is 800. In this case, the user probably expects to ingest the 800 new records. However, if the Spark query is restarted, it will succeed but only ingest the 300 records with offsets 500 to 799; the first 500 new records (offsets 0 to 499) are silently skipped. The log may look like this:
22/07/13 16:39:46 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "5762b467-2d58-4b62-937b-427f99b38659",
"runId" : "1822bd34-269c-4df0-8dbe-b63e19df0e77",
"timestamp" : "2022-07-13T14:39:43.074Z",
"batchId" : 2,
"numInputRows" : 300,
"processedRowsPerSecond" : 96.32674030310815,
"durationMs" : {
"addBatch" : 2747,
"getBatch" : 7,
"getEndOffset" : 0,
"queryPlanning" : 315,
"setOffsetRange" : 331,
"triggerExecution" : 3892,
"walCommit" : 213
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[test]]",
"startOffset" : {
"test-topic" : {
"0" : 500
}
},
"endOffset" : {
"test-topic" : {
"0" : 800
}
},
"numInputRows" : 300,
"processedRowsPerSecond" : 96.32674030310815
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider@4554d7c4"
}
}
Spark does not even print a warning: it has no way of knowing that the offsets have been reset, so from its perspective it simply looks as if 300 new records had been added to the topic, without any offset reset happening in between.
How to avoid data loss
We have seen that in both cases, whether the new end offset is smaller or larger than the checkpointed offset, data loss may occur. Fundamentally, Spark cannot distinguish between offsets before and after a recreation of a topic, so especially for the latter case, where the new end offset on the topic-partition is larger than the latest offset in the checkpoint, there is no general solution. One could try to manually modify the latest offset in the checkpoint or delete the checkpoints altogether; however, this may require deleting the _spark_metadata folder in the case of a file sink.
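Before restarting a query against a reset topic, it can therefore help to compare the latest checkpointed offsets with the topic's current end offsets. A rough sketch of such a check is shown below; the paths, the batchId and the bootstrap servers are placeholders, and reading the files under {checkpoint-dir}/offsets directly relies on Spark's internal checkpoint layout, so treat it as a debugging aid rather than a stable interface:
import java.nio.file.{Files, Paths}
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, OffsetSpec}
import org.apache.kafka.common.TopicPartition

object CompareCheckpointWithTopic {
  def main(args: Array[String]): Unit = {
    // 1) Print the most recently checkpointed offsets. The file with the
    //    highest number under {checkpoint-dir}/offsets belongs to the
    //    latest batch; batchId 1 here is just an example.
    val latestOffsetFile = Paths.get("/checkpoints/test-topic/offsets/1") // placeholder
    println("Checkpointed offsets:")
    println(new String(Files.readAllBytes(latestOffsetFile), "UTF-8"))

    // 2) Fetch the topic's current end offset from the brokers.
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder
    val admin = AdminClient.create(props)
    val tp = new TopicPartition("test-topic", 0)
    val end = admin
      .listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
      .partitionResult(tp)
      .get()
      .offset()
    println(s"Current end offset of $tp: $end")
    admin.close()

    // If the checkpointed offset is larger than the current end offset,
    // the checkpoint no longer matches the topic and needs attention.
  }
}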
A special case is when the new end offset is 0. In that case, there can be no data loss, because there is no new data on the topic yet. A possible strategy for performing a topic deletion and recreation could therefore be the following (a sketch of how the topic reset itself could be scripted follows the list):
- Make sure all data has been ingested from the topic.
- Delete and recreate the topic.
- Restart the Spark Structured Streaming query that consumes from the topic. Spark will write a new checkpoint with offset 0.
- Only now start producing to the recreated topic.
- In the next microbatch, Spark will consume from offset 0.
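The topic reset in step 2 could be scripted with Kafka's AdminClient, for example as sketched below (bootstrap servers, partition count and replication factor are placeholders; making sure everything has been ingested beforehand, restarting the Spark query and resuming the producer are deliberately left out):
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

object RecreateTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder

    val admin = AdminClient.create(props)

    // Step 2: delete the topic ...
    admin.deleteTopics(Collections.singleton("test-topic")).all().get()

    // Topic deletion is asynchronous on the broker side; a real script
    // should poll listTopics() until the topic is gone instead of sleeping.
    Thread.sleep(5000)

    // ... and recreate it. Partition count and replication factor are
    // placeholders and should match the original topic configuration.
    val newTopic = new NewTopic("test-topic", 1, 1.toShort)
    admin.createTopics(Collections.singleton(newTopic)).all().get()

    admin.close()
  }
}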
Conclusion
- The offsets in Spark Structured Streaming's checkpoints are not automatically updated when an offset reset happens on a Kafka topic.
- Queries with Once triggers that are restarted periodically may be oblivious to an offset reset.
- The best way to keep Spark's offsets up-to-date is to restart the query before any new data has been published on the reset topic.