DEV Community

Kevin Wallimann


Is Structured Streaming Exactly-Once? Well, it depends...

TLDR

  • Yes, but only for the file sink, not for the Kafka sink or the Foreach sink.
  • Whether you actually get exactly-once semantics depends on how you read from the sink.
  • If you read using a globbed path or read directly from a partition subdirectory, exactly-once semantics does not apply.

Exactly-once semantics depends on the reader

One of the key features of Spark Structured Streaming is its support for exactly-once semantics, meaning that no row will be missing or duplicated in the sink after recovery from failure.

According to the documentation, this guarantee is only available for the file sink, while the Kafka sink and the Foreach sink only support at-least-once semantics (https://spark.apache.org/docs/3.0.0/structured-streaming-programming-guide.html#output-sinks).

Let's demonstrate exactly-once semantics in a spark-shell. First, we'll write some streaming data to a destination. We add a literal column and partition by it, just for the sake of having a partition subdirectory. Finally, we repartition the dataframe, just to get multiple parquet files in the output.

scala> import org.apache.spark.sql.execution.streaming.MemoryStream
scala> import org.apache.spark.sql.streaming.Trigger
scala> import org.apache.spark.sql.functions._
scala> val input = MemoryStream[Int](1, spark.sqlContext)
scala> input.addData(1 to 100)
scala> val df = input.toDF().
     | withColumn("partition1", lit("value1")).
     | repartition(4)
scala> val query = df.writeStream.
     | partitionBy("partition1").
     | trigger(Trigger.Once).
     | option("checkpointLocation", "/tmp/checkpoint").
     | format(source="parquet").
     | start("/tmp/destination")

Once the query has terminated, we can count the values that were written:

scala> query.awaitTermination()
scala> spark.read.parquet("/tmp/destination").count
res1: Long = 100

As expected, we get 100 as the result.

We should now see 4 parquet files in the destination and one file in the metadata log, for example:

% ls -R /tmp/destination
_spark_metadata   partition1=value1

/tmp/destination/_spark_metadata:
0

/tmp/destination/partition1=value1:
part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000.snappy.parquet
part-00001-d2b67dae-3fe9-40ed-8e6a-75c4a36e8300.c000.snappy.parquet
part-00002-275dd640-4148-4947-96ca-3cad4feae215.c000.snappy.parquet
part-00003-bd18be1e-3906-4c49-905b-a9d1c37d3282.c000.snappy.parquet

The metadata log file should reference exactly the four files, like this:

% cat /tmp/destination/_spark_metadata/0
v1
{"path":"file:///tmp/destination/partition1=value1/part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000.snappy.parquet","size":498,"isDir":false,"modificationTime":1604655052000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/destination/partition1=value1/part-00001-d2b67dae-3fe9-40ed-8e6a-75c4a36e8300.c000.snappy.parquet","size":498,"isDir":false,"modificationTime":1604655052000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/destination/partition1=value1/part-00002-275dd640-4148-4947-96ca-3cad4feae215.c000.snappy.parquet","size":498,"isDir":false,"modificationTime":1604655052000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/destination/partition1=value1/part-00003-bd18be1e-3906-4c49-905b-a9d1c37d3282.c000.snappy.parquet","size":498,"isDir":false,"modificationTime":1604655052000,"blockReplication":1,"blockSize":33554432,"action":"add"}
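
To make the structure of the log more concrete, here is a minimal sketch in plain Scala (no Spark required) that extracts the paths from the lines of a single metadata log file. The names `MetadataLogSketch` and `committedPaths` and the shortened file names are made up for illustration. The sketch assumes the v1 layout shown above, pulls out the path with a regular expression rather than a JSON parser, and ignores the `action` field as well as any other files in the `_spark_metadata` directory.

```scala
import scala.util.matching.Regex

object MetadataLogSketch {
  // Matches the "path" field of one JSON entry, as seen in the log above.
  // Assumption: paths contain no escaped quotes; a JSON parser would be more robust.
  private val PathField: Regex = "\"path\":\"([^\"]+)\"".r

  /** Returns the paths listed in the lines of a single metadata log file. */
  def committedPaths(lines: Seq[String]): Seq[String] =
    lines
      .drop(1) // skip the version header, e.g. "v1"
      .flatMap(line => PathField.findFirstMatchIn(line).map(_.group(1)))

  def main(args: Array[String]): Unit = {
    // Hypothetical, shortened entries in the same shape as the real log
    val sample = Seq(
      "v1",
      """{"path":"file:///tmp/destination/partition1=value1/part-00000.parquet","size":498,"action":"add"}""",
      """{"path":"file:///tmp/destination/partition1=value1/part-00001.parquet","size":498,"action":"add"}"""
    )
    committedPaths(sample).foreach(println)
  }
}
```

Running `main` prints the two sample paths, one per line. To try it against the real file, the lines could be loaded with `scala.io.Source.fromFile("/tmp/destination/_spark_metadata/0").getLines().toList`.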

We can simulate a partial write by copying one of the four parquet files. Now, we have 5 parquet files in the destination:

% cd /tmp/destination/partition1=value1
% cp part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000.snappy.parquet part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000-copy.snappy.parquet
% ls /tmp/destination/partition1=value1
part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000-copy.snappy.parquet
part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000.snappy.parquet
part-00001-d2b67dae-3fe9-40ed-8e6a-75c4a36e8300.c000.snappy.parquet
part-00002-275dd640-4148-4947-96ca-3cad4feae215.c000.snappy.parquet
part-00003-bd18be1e-3906-4c49-905b-a9d1c37d3282.c000.snappy.parquet

Reading with globbed paths

Exactly-once semantics guarantees that we will still only read 100 rows. Let's check that:

scala> spark.read.parquet("/tmp/destination").count
res2: Long = 100

As expected, we get 100.

What about this query (notice the star)?

scala> spark.read.parquet("/tmp/destination/*").count
res3: Long = 125

Well, that's why exactly-once semantics depends on how you read. As shown above, the destination directory contains 5 parquet files. When reading without a globbed path, Spark consults the _spark_metadata directory (aka the metadata log) and only reads the parquet files that are listed there. That's not the case with globbed paths: the metadata log is not consulted, so exactly-once semantics does not apply and we read duplicated data, here the 25 extra rows of the copied file.
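
The gap between "files on disk" and "files in the metadata log" can also be made visible outside of Spark. The following is a rough sketch in plain Scala, not Spark's own implementation: given a directory and a set of committed file names, it reports the parquet files that are on disk but not committed. The names `UncommittedFilesSketch` and `uncommittedFiles` and the shortened file names are hypothetical. Comparing by bare file name is a simplifying assumption, since the real log stores full URIs.

```scala
import java.nio.file.{Files, Path}

object UncommittedFilesSketch {
  /** Names of parquet files in `dir` that are not in `committedFileNames`. */
  def uncommittedFiles(dir: Path, committedFileNames: Set[String]): Seq[String] = {
    // listFiles() returns null if `dir` is not a readable directory
    val files = Option(dir.toFile.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
    files
      .map(_.getName)
      .filter(_.endsWith(".parquet"))
      .filterNot(committedFileNames.contains)
      .sorted
  }

  def main(args: Array[String]): Unit = {
    // Recreate the situation from the walkthrough in a temporary directory
    val dir = Files.createTempDirectory("destination")
    Seq("part-00000.parquet", "part-00000-copy.parquet", "part-00001.parquet")
      .foreach(name => Files.createFile(dir.resolve(name)))

    // Hypothetical set of names taken from the metadata log
    val committed = Set("part-00000.parquet", "part-00001.parquet")

    uncommittedFiles(dir, committed).foreach(println) // prints part-00000-copy.parquet
  }
}
```

A reader that only looks at the directory listing sees all three files, whereas a reader that starts from the committed set skips the copy.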

Reading from partition subdirectory

What about filtering by the partition? All of our values are in the same partition, so we should count 100 elements when we filter for it:

scala> spark.read.parquet("/tmp/destination").filter("partition1='value1'").count
res4: Long = 100

And indeed, it works as expected. Now, in non-streaming Spark you could also read directly from the partition subdirectory and arrive at the same result. Does this work with the output of a streaming query as well?

scala> spark.read.parquet("/tmp/destination/partition1=value1").count
res5: Long = 125

No, for the same reason as above: Spark does not consider the metadata log when you read from a subdirectory, and therefore cannot determine whether any of the parquet files stem from partial writes and may contain duplicates.

Conclusion

As we have seen, exactly-once semantics is only guaranteed when the _spark_metadata directory is taken into account. This means that it depends on the reader whether or not exactly-once semantics applies. In the case of Spark, the metadata log is only considered when you read from the root directory, without globbed paths. Whether this behaviour is a bug or a feature is not entirely clear to me. In practice, parquet files from partial writes should occur only rarely since Spark 3.0, as a best-effort cleanup in case of task abortions has been implemented (https://issues.apache.org/jira/browse/SPARK-27210). However, it's important to stress that this is only a best-effort cleanup, not a guaranteed one.
