Warning: The described procedures have been tested on Spark 2.4.3 and 3.0.1, but not on every possible environment. Be mindful of what you're doing on your system. Having said that, I'd be grateful for any feedback if you run into caveats.
Introduction
Spark Structured Streaming guarantees exactly-once processing for file outputs. One element in maintaining that guarantee is a folder called _spark_metadata, which is located in the output folder. The _spark_metadata folder is also known as the "Metadata Log" and its files as "metadata log files". It may look like this:
/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1
/tmp/destination/_spark_metadata/2
/tmp/destination/_spark_metadata/3
A metadata log file may look like this:
v1
{"path":"file:///tmp/destination/part-00000-5ee05bb5-3c65-4028-9c9e-dbc99f5fdbca.c000.snappy.parquet","size":3919,"isDir":false,"modificationTime":1615462080000,"blockReplication":1,"blockSize":33554432,"action":"add"}
When Spark writes a file to the output folder, it writes the absolute path of the added file to the metadata log file of the current micro-batch.
If a partial write occurs, that filename will not be added to the metadata log, and that's how Spark can maintain exactly-once semantics.
When Spark reads a file from the output folder, it only reads from files that are referenced in the metadata log. At least that's the idea. For more details on that topic, see https://dev.to/kevinwallimann/is-structured-streaming-exactly-once-well-it-depends-noe
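For illustration, this is how a downstream batch job would read the output folder. A minimal sketch, assuming a SparkSession is available as spark and the example paths from above:
// Because /tmp/destination contains a _spark_metadata folder, Spark builds the file
// listing from the metadata log instead of listing the directory, so only files
// committed by completed micro-batches are returned.
val committed = spark.read.parquet("/tmp/destination")
committed.show()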
Deleting the _spark_metadata folder
I hope it's clear by now that this folder should not be deleted. It should not be deleted!
Anyway, let's see what happens if we delete it nonetheless.
For this scenario, let's assume we have a structured streaming query writing to a folder called /tmp/destination and using a checkpoint folder called /tmp/checkpoint-location. After two micro-batches, the folder structure of the checkpoint folder and the _spark_metadata folder looks like this (a sketch of such a query follows the listing):
/tmp/checkpoint-location/commits
/tmp/checkpoint-location/commits/0
/tmp/checkpoint-location/commits/1
/tmp/checkpoint-location/metadata
/tmp/checkpoint-location/offsets
/tmp/checkpoint-location/offsets/0
/tmp/checkpoint-location/offsets/1
/tmp/checkpoint-location/sources
/tmp/checkpoint-location/sources/0
/tmp/checkpoint-location/sources/0/0
/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1
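A query producing this layout could look roughly like the following sketch. The input folder /tmp/source and the one-column schema are assumptions for illustration; the destination and checkpoint paths are the ones from the example.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder()
  .appName("file-sink-example")
  .getOrCreate()

// Assumption: a file source reading CSV files from the hypothetical folder /tmp/source.
val schema = StructType(Seq(StructField("value", StringType)))
val input = spark.readStream
  .schema(schema)
  .csv("/tmp/source")

// The parquet file sink maintains /tmp/destination/_spark_metadata, while the
// checkpoint folder tracks offsets, commits and the file source's own log (sources/0).
val query = input.writeStream
  .format("parquet")
  .option("path", "/tmp/destination")
  .option("checkpointLocation", "/tmp/checkpoint-location")
  .start()

query.awaitTermination()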
Now, for some reason, the _spark_metadata folder in the destination is deleted or moved, but not the corresponding checkpoint folder. Sooner or later, the following exception will be thrown:
Caused by: java.lang.IllegalStateException: /tmp/destination/_spark_metadata/0 doesn't exist when compacting batch 9 (compactInterval: 10)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$3(CompactibleFileStreamLog.scala:187)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$2(CompactibleFileStreamLog.scala:185)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$2$adapted(CompactibleFileStreamLog.scala:183)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:74)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$1(CompactibleFileStreamLog.scala:183)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:181)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:75)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:215)
Looking at the checkpoint folder, we see the following files:
/tmp/checkpoint-location/commits
/tmp/checkpoint-location/commits/0
/tmp/checkpoint-location/commits/1
/tmp/checkpoint-location/commits/2
/tmp/checkpoint-location/commits/3
/tmp/checkpoint-location/commits/4
/tmp/checkpoint-location/commits/5
/tmp/checkpoint-location/commits/6
/tmp/checkpoint-location/commits/7
/tmp/checkpoint-location/commits/8
/tmp/checkpoint-location/metadata
/tmp/checkpoint-location/offsets
/tmp/checkpoint-location/offsets/0
/tmp/checkpoint-location/offsets/1
/tmp/checkpoint-location/offsets/2
/tmp/checkpoint-location/offsets/3
/tmp/checkpoint-location/offsets/4
/tmp/checkpoint-location/offsets/5
/tmp/checkpoint-location/offsets/6
/tmp/checkpoint-location/offsets/7
/tmp/checkpoint-location/offsets/8
/tmp/checkpoint-location/offsets/9
/tmp/checkpoint-location/sources
/tmp/checkpoint-location/sources/0
/tmp/checkpoint-location/sources/0/0
Meanwhile, the destination folder contains:
/tmp/destination/_spark_metadata/2
/tmp/destination/_spark_metadata/3
/tmp/destination/_spark_metadata/4
/tmp/destination/_spark_metadata/5
/tmp/destination/_spark_metadata/6
/tmp/destination/_spark_metadata/7
/tmp/destination/_spark_metadata/8
As we can see, the _spark_metadata folder is missing the files 0 and 1 that were previously deleted. Instead of simply writing /tmp/destination/_spark_metadata/9, Spark tries to concatenate the files 0, 1, ..., 8 into a file called 9.compact to improve reading efficiency and to avoid the small-files problem. This process is called log compaction. That's when the exception is thrown, because the files 0 and 1 unexpectedly don't exist. Log compaction doesn't happen in every micro-batch; its frequency is determined by the compactInterval, which is 10 by default.
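The rule for when a micro-batch triggers a compaction can be sketched as follows. This is a simplified illustration of the behaviour described above, not a copy of Spark's internals:
// With the default compactInterval of 10, batches 9, 19, 29, ... are compaction
// batches: the sink merges all earlier log files into "<batchId>.compact".
def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
  (batchId + 1) % compactInterval == 0

assert(isCompactionBatch(9, 10))   // batch 9 compacts files 0 to 8 into 9.compact
assert(!isCompactionBatch(8, 10))  // batch 8 just writes a normal log file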
How to fix the problem
1. Restore the files of the removed _spark_metadata folder
If the deleted _spark_metadata folder has only been moved and can still be recovered, restore its files: move the files of the old _spark_metadata folder back into the new _spark_metadata folder. There should be no overlapping filenames.
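For example, on HDFS the move could be scripted with the Hadoop FileSystem API. A rough sketch, assuming the old log files were moved to the hypothetical path /tmp/backup/_spark_metadata:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val backup = new Path("/tmp/backup/_spark_metadata")   // assumption: where the old folder was moved to
val target = new Path("/tmp/destination/_spark_metadata")

// Move every backed-up metadata log file into the live _spark_metadata folder.
fs.listStatus(backup).filter(_.isFile).foreach { status =>
  fs.rename(status.getPath, new Path(target, status.getPath.getName))
}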
After restoring the files, the _spark_metadata folder should look like this:
/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1
/tmp/destination/_spark_metadata/2
/tmp/destination/_spark_metadata/3
/tmp/destination/_spark_metadata/4
/tmp/destination/_spark_metadata/5
/tmp/destination/_spark_metadata/6
/tmp/destination/_spark_metadata/7
/tmp/destination/_spark_metadata/8
Now, the query can be restarted and should finish without errors.
2. Create dummy log files
If the metadata log files are irrecoverable, we could create dummy log files for the missing micro-batches.
In our example, this could be done like this:
for i in {0..1}; do echo v1 > "/tmp/destination/_spark_metadata/$i"; done
or on HDFS:
for i in {0..1}; do echo v1 > "/tmp/$i"; hdfs dfs -copyFromLocal "/tmp/$i" "/tmp/destination/_spark_metadata/$i"; done
This will create the files:
/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1
Now, the query can be restarted and should finish without errors.
Note that the information from the metadata log files 0 and 1 is definitely lost. Hence, the exactly-once guarantee is lost for micro-batches 0 and 1, and you need to address this problem separately, but at least the query can continue.
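To assess what exactly was lost, you can compare the data files on disk with the entries in the surviving metadata log files. A rough sketch for the local-filesystem example above, assuming plain (uncompacted) v1 log files; it simply extracts the "path" field from each log entry:
import java.io.File
import scala.io.Source

val metadataDir = new File("/tmp/destination/_spark_metadata")
val destinationDir = new File("/tmp/destination")

// Paths referenced by the surviving metadata log files: each file starts with a
// version line ("v1") followed by one JSON entry per line.
val referenced: Set[String] = metadataDir.listFiles()
  .filter(f => f.isFile && !f.getName.endsWith(".crc"))
  .flatMap { file =>
    Source.fromFile(file).getLines()
      .filter(_.startsWith("{"))
      .flatMap("\"path\":\"([^\"]+)\"".r.findFirstMatchIn(_).map(_.group(1)))
  }
  .toSet

// Parquet files that are physically present in the destination folder.
val present: Set[String] = destinationDir.listFiles()
  .filter(f => f.isFile && f.getName.endsWith(".parquet"))
  .map(f => "file://" + f.getAbsolutePath)
  .toSet

// Files written by the lost micro-batches: on disk, but not referenced by any log entry.
(present -- referenced).foreach(println)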
3. Defer compaction
If it's the middle of the night and you simply need the query to continue, or you have no write access to the filesystem, you can buy yourself some time by deferring the compaction. However, this does not address the root cause.
By default, the compactInterval is 10. You can increase it, e.g. to 100, by restarting the query with this additional config:
spark-submit --conf spark.sql.streaming.fileSink.log.compactInterval=100
The same exception will be thrown in 100 micro-batches, so this is really just a very temporary fix to keep the query running for a few more micro-batches.
Eventually, the missing log files have to be recreated.
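If you start the query programmatically rather than via spark-submit, the same setting can presumably be applied on the SparkSession before the query is restarted:
import org.apache.spark.sql.SparkSession

// Presumably equivalent to the --conf flag above: set the interval on the session
// before the streaming query is (re)started.
val spark = SparkSession.builder()
  .appName("defer-compaction")
  .config("spark.sql.streaming.fileSink.log.compactInterval", "100")
  .getOrCreate()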
Top comments (3)
Great article. Two cents I would like to add:
Any of the methods mentioned here only removes/defers the error for the Spark producer job (the one writing data to S3). But any consumer job that wants to read the data already written to S3 will still face one of the issues mentioned below:
1. If you create the blank 0 file
2. If you don't create the blank file:
Hi @gupash
Thanks for your comment.
Indeed, if you create a blank 0 file, it will throw the error you posted. However, the dummy log file that I described in the article contains the string "v1". In that case, no error should be thrown on the reader's side. Maybe I could have pointed out this fact more clearly.
I was missing files 0 through 5, so I just copied file 6, renamed the copies to 0 through 5, and that worked.