TLDR
- Yes, but only for the file sink; neither the Kafka sink nor the Foreach sink supports it
- Whether you actually get exactly-once semantics depends on how you read from the sink
- If you read using a globbed path or read directly from a partition subdirectory, exactly-once semantics is not applied
Exactly-once semantics depends on the reader
One of the key features of Spark Structured Streaming is its support for exactly-once semantics, meaning that no row will be missing or duplicated in the sink after recovery from failure.
According to the documentation, this feature is only available for the file sink, while the Kafka sink and the Foreach sink only support at-least-once semantics (https://spark.apache.org/docs/3.0.0/structured-streaming-programming-guide.html#output-sinks).
Let's demonstrate exactly-once semantics using a spark-shell:
First, we'll write some streaming data to a destination. We add a literal column and partition by it just for the sake of having a partition subdirectory. Finally, we repartition the DataFrame to get multiple parquet files in the output.
scala> import org.apache.spark.sql.execution.streaming.MemoryStream
scala> import org.apache.spark.sql.streaming.Trigger
scala> import org.apache.spark.sql.functions._
scala> val input = MemoryStream[Int](1, spark.sqlContext)
scala> input.addData(1 to 100)
scala> val df = input.toDF().
| withColumn("partition1", lit("value1")).
| repartition(4)
scala> val query = df.writeStream.
|   partitionBy("partition1").
| trigger(Trigger.Once).
| option("checkpointLocation", "/tmp/checkpoint").
|   format("parquet").
| start("/tmp/destination")
We wait for the query to finish and then count the rows in the destination:
scala> query.awaitTermination()
scala> spark.read.parquet("/tmp/destination").count
res1: Long = 100
As expected, we get 100 as the result.
We should now see four parquet files in the destination and one file in the metadata log:
% ls -R /tmp/destination
_spark_metadata partition1=value1
/tmp/destination/_spark_metadata:
0
/tmp/destination/partition1=value1:
part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000.snappy.parquet
part-00001-d2b67dae-3fe9-40ed-8e6a-75c4a36e8300.c000.snappy.parquet
part-00002-275dd640-4148-4947-96ca-3cad4feae215.c000.snappy.parquet
part-00003-bd18be1e-3906-4c49-905b-a9d1c37d3282.c000.snappy.parquet
The metadata log file should reference exactly the four files, like this:
% cat /tmp/destination/_spark_metadata/0
v1
{"path":"file:///tmp/destination/partition1=value1/part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000.snappy.parquet","size":498,"isDir":false,"modificationTime":1604655052000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/destination/partition1=value1/part-00001-d2b67dae-3fe9-40ed-8e6a-75c4a36e8300.c000.snappy.parquet","size":498,"isDir":false,"modificationTime":1604655052000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/destination/partition1=value1/part-00002-275dd640-4148-4947-96ca-3cad4feae215.c000.snappy.parquet","size":498,"isDir":false,"modificationTime":1604655052000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/destination/partition1=value1/part-00003-bd18be1e-3906-4c49-905b-a9d1c37d3282.c000.snappy.parquet","size":498,"isDir":false,"modificationTime":1604655052000,"blockReplication":1,"blockSize":33554432,"action":"add"}
We can simulate a partial write by copying one of the four parquet files. Now we have five parquet files in the destination:
% cd /tmp/destination/partition1=value1
% cp part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000.snappy.parquet part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000-copy.snappy.parquet
% ls /tmp/destination/partition1=value1
part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000-copy.snappy.parquet
part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000.snappy.parquet
part-00001-d2b67dae-3fe9-40ed-8e6a-75c4a36e8300.c000.snappy.parquet
part-00002-275dd640-4148-4947-96ca-3cad4feae215.c000.snappy.parquet
part-00003-bd18be1e-3906-4c49-905b-a9d1c37d3282.c000.snappy.parquet
Reading with globbed paths
Exactly-once semantics guarantees that we will still read only 100 rows. Let's check that:
scala> spark.read.parquet("/tmp/destination").count
res2: Long = 100
As expected, we get 100.
What about this query (note the star)?
scala> spark.read.parquet("/tmp/destination/*").count
res3: Long = 125
Well, that's why exactly-once semantics depends on how you read. As shown above, the destination directory now contains five parquet files. When reading from the root path without a glob, Spark consults the _spark_metadata directory (the metadata log) and reads only the parquet files listed there. With a globbed path, the metadata log is not consulted, so exactly-once semantics does not apply and we read duplicated data.
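To see which files a metadata-aware read will pick up, we can parse the log ourselves. Here is a minimal sketch that can be pasted into the same spark-shell session; it hand-rolls the parsing with get_json_object instead of using Spark's internal classes and assumes the /tmp/destination path from above:

import org.apache.spark.sql.functions.{col, get_json_object}

// Each non-header line of _spark_metadata/0 is a JSON entry describing one committed file.
val committedPaths = spark.read.
  text("/tmp/destination/_spark_metadata").
  filter(col("value").startsWith("{")).                         // skip the "v1" version header
  select(get_json_object(col("value"), "$.path").as("path")).
  collect().
  map(_.getString(0))

committedPaths.length   // should be 4: only these files are visible to a metadata-aware read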
Reading from a partition subdirectory
What about filtering by the partition? All of our values are in the same partition, so we should count 100 elements when we filter for it:
scala> spark.read.parquet("/tmp/destination").filter("partition1='value1'").count
res4: Long = 100
And indeed, it works as expected. With a non-streaming (batch) write you could also read directly from the partition subdirectory and arrive at the same result. Does this work for our streaming output as well?
scala> spark.read.parquet("/tmp/destination/partition1=value1").count
res5: Long = 125
No. The reason is the same as above: Spark does not consult the metadata log when you read from a subdirectory, so it cannot tell whether any of the parquet files come from partial writes and are duplicates.
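If you do need to restrict a read to a single partition while still respecting the metadata log, one workaround is to pass only the committed file paths to the reader. A minimal sketch, reusing the committedPaths array from the sketch above and the partition value from this example:

// Keep only the committed files that belong to the partition we are interested in.
val partitionFiles = committedPaths.filter(_.contains("partition1=value1"))

// Reading the committed files directly ignores the copied file, so the count should be 100 again.
// Note: the partition column itself is not inferred when reading leaf files like this.
spark.read.parquet(partitionFiles: _*).count()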
Conclusion
As we have seen, exactly-once semantics is only guaranteed when the _spark_metadata directory is consulted, so whether it applies depends on the reader. Spark only consults the metadata log when you read from the root directory, without globbed paths. Whether this behaviour is a bug or a feature is not entirely clear to me. In practice, parquet files from partial writes should occur only rarely since Spark 3.0, which added a best-effort cleanup after task abortions (https://issues.apache.org/jira/browse/SPARK-27210). However, it is important to stress that this cleanup is only best-effort, not guaranteed.
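If you suspect leftovers from partial writes, one way to spot them is to compare the actual directory listing against the metadata log. Again just a sketch, reusing committedPaths from above and comparing file names only to sidestep file:// URI differences:

import org.apache.hadoop.fs.Path

// List the parquet files actually present in the partition directory.
val dir = new Path("/tmp/destination/partition1=value1")
val fs = dir.getFileSystem(spark.sparkContext.hadoopConfiguration)
val onDisk = fs.listStatus(dir).
  map(_.getPath.getName).
  filter(_.endsWith(".parquet")).
  toSet

// Anything on disk that the metadata log does not reference is a candidate leftover from a partial write.
val orphans = onDisk -- committedPaths.map(p => new Path(p).getName).toSet
orphans.foreach(println)   // should print only the copied file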