<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Kevin Wallimann</title>
    <description>The latest articles on DEV Community by Kevin Wallimann (@kevinwallimann).</description>
    <link>https://dev.to/kevinwallimann</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F429135%2F55d759e1-9b89-4fc5-9a67-d2fa89e85e8b.png</url>
      <title>DEV Community: Kevin Wallimann</title>
      <link>https://dev.to/kevinwallimann</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/kevinwallimann"/>
    <language>en</language>
    <item>
      <title>How to recover from a Kafka topic reset in Spark Structured Streaming</title>
      <dc:creator>Kevin Wallimann</dc:creator>
      <pubDate>Wed, 13 Jul 2022 20:40:43 +0000</pubDate>
      <link>https://dev.to/kevinwallimann/how-to-recover-from-a-kafka-topic-reset-in-spark-structured-streaming-3phd</link>
      <guid>https://dev.to/kevinwallimann/how-to-recover-from-a-kafka-topic-reset-in-spark-structured-streaming-3phd</guid>
      <description>&lt;p&gt;Kafka topics should not routinely be deleted and recreated or offsets reset. Should it be necessary, care must be taken how and when to update the offsets in Spark Structured Streaming's checkpoints, in order to avoid data loss.&lt;/p&gt;

&lt;p&gt;Since such an offset reset happens outside of Spark, the offsets in the checkpoints are not automatically updated to reflect the change. This may cause unexpected behaviour, because the offsets are no longer in the expected range.&lt;/p&gt;

&lt;h1&gt;
  
  
  When and how data loss may occur
&lt;/h1&gt;

&lt;p&gt;Let's assume a Spark Structured Streaming query with a once trigger (&lt;code&gt;Trigger.Once&lt;/code&gt;) has consumed 500 records from the Kafka topic &lt;code&gt;test-topic&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;The checkpointed offset file for micro-batch 0 contains the following:&lt;/p&gt;

&lt;p&gt;{checkpoint-dir}/offsets/0:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{"test-topic":{"0":500}}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;On the Kafka topic, the beginning offset is 0 and the end offset is 500.&lt;/p&gt;

&lt;p&gt;So far, so good.&lt;/p&gt;
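
&lt;p&gt;The behaviour in the scenarios below can be modelled in a few lines of Python. This is only an illustration of the bookkeeping, not Spark's actual implementation: the start offset comes from the latest checkpoint, while the end offset is read from the topic itself.&lt;/p&gt;

```python
# Illustrative model of how the Kafka source picks the next offset range
# after a restart (not Spark's actual code).
def next_offset_range(checkpointed_end, topic_end):
    start = checkpointed_end             # taken from {checkpoint-dir}/offsets/N
    records = max(0, topic_end - start)  # records the next micro-batch reads
    # If the topic's end offset dropped below the checkpointed offset,
    # offsets must have been reset; Spark flags this as possible data loss.
    possible_data_loss = start > topic_end
    return start, records, possible_data_loss

# Scenarios from this article: the checkpoint says 500.
print(next_offset_range(500, 300))  # topic reset to 300: 0 records, loss flagged
print(next_offset_range(500, 800))  # topic reset to 800: 300 records, no warning
```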

&lt;h2&gt;
  
  
  New end offset &amp;lt; checkpointed offset
&lt;/h2&gt;

&lt;p&gt;Now, let's assume the Kafka topic has been deleted and recreated, and 300 messages have been produced, such that the new end offset on the topic is 300. If the Spark query is restarted, it will fail with the following error message:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;java.lang.IllegalStateException: Partition test-topic-0's offset was changed from 500 to 300, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Based on its checkpoint from batch 0, Spark would have expected messages with offsets 500 and higher, but only found lower offsets. Spark does not know if all messages have been consumed before the offset reset and by default assumes that messages could have been lost, failing with the above exception.&lt;/p&gt;

&lt;p&gt;What happens if the query is rerun with &lt;code&gt;failOnDataLoss=false&lt;/code&gt;?&lt;/p&gt;

&lt;p&gt;This time, Spark only prints a warning&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;WARN KafkaMicroBatchReader: Partition test-topic-0's offset was changed from 500 to 300, some data may have been missed.

Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "true".
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the checkpoint folder, a new file for batchId 1 is created, which contains&lt;/p&gt;

&lt;p&gt;{checkpoint-dir}/offsets/1:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{"test-topic":{"0":300}}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;But how many records were consumed?&lt;/p&gt;

&lt;p&gt;Zero.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;22/07/13 10:29:59 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "5762b467-2d58-4b62-937b-427f99b38659",
  "runId" : "fc98564a-f2f0-42be-91e2-6d1f97446372",
  "name" : null,
  "timestamp" : "2022-07-13T08:29:57.863Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 709,
    "getBatch" : 45,
    "queryPlanning" : 301,
    "triggerExecution" : 1372
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[test]]",
    "startOffset" : {
      "test-topic" : {
        "0" : 500
      }
    },
    "endOffset" : {
      "test-topic" : {
        "0" : 300
      }
    },
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider@43bbf133"
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As you can see from the log, Spark takes 500 as the start offset for this micro-batch, i.e. the offset from the previous checkpoint rather than the topic-partition's current beginning offset. From that point of view, it makes sense that no record is ingested, but the 300 messages at offsets 0-299 are lost nonetheless.&lt;/p&gt;

&lt;h2&gt;
  
  
  New end offset &amp;gt; checkpointed offset
&lt;/h2&gt;

&lt;p&gt;It's also possible that the new end offset is greater than the offset in the latest checkpoint. For example, let's assume that the new end offset is 800. In this case, the user probably expects to ingest all 800 new records. However, if the Spark query is restarted, it will succeed but only ingest the 300 records at offsets 500-799. The log may look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;22/07/13 16:39:46 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "5762b467-2d58-4b62-937b-427f99b38659",
  "runId" : "1822bd34-269c-4df0-8dbe-b63e19df0e77",
  "timestamp" : "2022-07-13T14:39:43.074Z",
  "batchId" : 2,
  "numInputRows" : 300,
  "processedRowsPerSecond" : 96.32674030310815,
  "durationMs" : {
    "addBatch" : 2747,
    "getBatch" : 7,
    "getEndOffset" : 0,
    "queryPlanning" : 315,
    "setOffsetRange" : 331,
    "triggerExecution" : 3892,
    "walCommit" : 213
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[test]]",
    "startOffset" : {
      "test-topic" : {
        "0" : 500
      }
    },
    "endOffset" : {
      "test-topic" : {
        "0" : 800
      }
    },
    "numInputRows" : 300,
    "processedRowsPerSecond" : 96.32674030310815
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider@4554d7c4"
  }
}

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Spark will not even print a warning: from its perspective, nothing indicates that the offsets have been reset, and it looks as if 300 new records had simply been added to the topic.&lt;/p&gt;

&lt;h2&gt;
  
  
  How to avoid data loss
&lt;/h2&gt;

&lt;p&gt;We have seen that in both cases, whether the new end offset is smaller or larger than the checkpointed offset, data loss may occur. Fundamentally, Spark cannot distinguish between offsets from before and after the recreation of a topic, so especially in the latter case, where the new end offset on the topic-partition is larger than the latest offset in the checkpoint, there is no general solution. One could manually modify the latest offset in the checkpoint or delete the checkpoints altogether; however, this may also require deleting the &lt;code&gt;_spark_metadata&lt;/code&gt; folder in the case of a file sink.&lt;/p&gt;

&lt;p&gt;A special case occurs when the new end offset is 0. In that case, there can be no data loss, because no new data has been produced to the topic yet. A possible strategy for deleting and recreating a topic could therefore be:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Make sure all data has been ingested from the topic.&lt;/li&gt;
&lt;li&gt;Delete and recreate the topic.&lt;/li&gt;
&lt;li&gt;Restart the Spark Structured Streaming query that consumes from the topic. Spark will write a new checkpoint with offset 0.&lt;/li&gt;
&lt;li&gt;Only now start producing to the recreated topic.&lt;/li&gt;
&lt;li&gt;In the next microbatch, Spark will consume from offset 0.&lt;/li&gt;
&lt;/ol&gt;
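
&lt;p&gt;Sketched with the stock Kafka CLI tools, the procedure could look like this. The broker address and topic settings are assumptions; adapt them to your cluster:&lt;/p&gt;

```shell
# Sketch of the strategy above, assuming a broker at localhost:9092.

# 1. Verify the query has caught up, then stop all producers.

# 2. Delete and recreate the topic.
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test-topic
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic \
  --partitions 1 --replication-factor 1

# 3. Restart the Spark query now, while the topic is still empty:
#    it writes a new checkpoint with offset 0 for every partition.

# 4. Only then resume producing; the next micro-batch reads from offset 0.
```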

&lt;h1&gt;
  
  
  Conclusion
&lt;/h1&gt;

&lt;ul&gt;
&lt;li&gt;The offsets in Spark Structured Streaming's checkpoints are not automatically updated when an offset reset happens on a Kafka topic.&lt;/li&gt;
&lt;li&gt;Queries with once triggers that are restarted periodically may be oblivious to an offset reset.&lt;/li&gt;
&lt;li&gt;The best way to keep Spark's offsets up-to-date is to restart the query before any new data has been published on the reset topic.&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>kafka</category>
      <category>spark</category>
    </item>
    <item>
      <title>How to recover from a deleted _spark_metadata folder in Spark Structured Streaming</title>
      <dc:creator>Kevin Wallimann</dc:creator>
      <pubDate>Thu, 11 Mar 2021 15:30:28 +0000</pubDate>
      <link>https://dev.to/kevinwallimann/how-to-recover-from-a-deleted-sparkmetadata-folder-546j</link>
      <guid>https://dev.to/kevinwallimann/how-to-recover-from-a-deleted-sparkmetadata-folder-546j</guid>
      <description>&lt;p&gt;Warning: The described procedures have been tested on Spark 2.4.3 and 3.0.1, but otherwise not on all possible environments. Be mindful of what you're doing on your system. Having said that, I'd be grateful for any feedback if you find caveats.&lt;/p&gt;

&lt;h1&gt;
  
  
  Introduction
&lt;/h1&gt;

&lt;p&gt;Spark Structured Streaming guarantees exactly-once processing for file outputs. One element in maintaining that guarantee is a folder called &lt;code&gt;_spark_metadata&lt;/code&gt;, which is located in the output folder. The folder &lt;code&gt;_spark_metadata&lt;/code&gt; is also known as the "Metadata Log" and its files as "metadata log files". It may look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1
/tmp/destination/_spark_metadata/2
/tmp/destination/_spark_metadata/3
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;A metadata log file may look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;v1
{"path":"file:///tmp/destination/part-00000-5ee05bb5-3c65-4028-9c9e-dbc99f5fdbca.c000.snappy.parquet","size":3919,"isDir":false,"modificationTime":1615462080000,"blockReplication":1,"blockSize":33554432,"action":"add"}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When Spark writes a file to the output folder, it writes the absolute path of the added file to the metadata log file of the current micro-batch.&lt;/p&gt;

&lt;p&gt;If a partial write occurs, that filename will not be added to the metadata log, and that's how Spark can maintain exactly-once semantics.&lt;/p&gt;

&lt;p&gt;When Spark reads a file from the output folder, it only reads from files that are referenced in the metadata log. At least that's the idea. For more details on that topic, see &lt;a href="https://dev.to/kevinwallimann/is-structured-streaming-exactly-once-well-it-depends-noe"&gt;https://dev.to/kevinwallimann/is-structured-streaming-exactly-once-well-it-depends-noe&lt;/a&gt;&lt;/p&gt;
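
&lt;p&gt;To see which output files a query considers committed, a metadata log file can be parsed by hand. The following is a minimal Python sketch for the v1 format shown above; it is not the reader Spark itself uses (&lt;code&gt;CompactibleFileStreamLog&lt;/code&gt;):&lt;/p&gt;

```python
import json
import os
import tempfile

# Minimal reader for a v1 metadata log file: a "v1" header line followed
# by one JSON object per committed output file.
def referenced_files(log_path):
    with open(log_path) as f:
        header = f.readline().strip()
        assert header == "v1", "unexpected metadata log version"
        entries = [json.loads(line) for line in f if line.strip()]
    # Only files recorded with action "add" count as committed output.
    return [e["path"] for e in entries if e.get("action") == "add"]

# Demo with a synthetic log file (path and contents are made up):
log = os.path.join(tempfile.mkdtemp(), "0")
with open(log, "w") as f:
    f.write('v1\n{"path":"file:///tmp/destination/part-00000.snappy.parquet",'
            '"size":3919,"isDir":false,"action":"add"}\n')
print(referenced_files(log))
```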

&lt;h1&gt;
  
  
  Deleting the &lt;code&gt;_spark_metadata&lt;/code&gt; folder
&lt;/h1&gt;

&lt;p&gt;I hope it's clear by now that this folder should not be deleted. It should not be deleted!&lt;/p&gt;

&lt;p&gt;Anyway, let's see what happens if we delete it nonetheless.&lt;br&gt;
For this scenario, let's assume we have a structured streaming query, writing to a folder called &lt;code&gt;/tmp/destination&lt;/code&gt; and a checkpoint folder called &lt;code&gt;/tmp/checkpoint-location&lt;/code&gt;. After two micro-batches, the folder structure for the checkpoint-folder and the &lt;code&gt;_spark_metadata&lt;/code&gt; folder looks like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;/tmp/checkpoint-location/commits
/tmp/checkpoint-location/commits/0
/tmp/checkpoint-location/commits/1
/tmp/checkpoint-location/metadata
/tmp/checkpoint-location/offsets
/tmp/checkpoint-location/offsets/0
/tmp/checkpoint-location/offsets/1
/tmp/checkpoint-location/sources
/tmp/checkpoint-location/sources/0
/tmp/checkpoint-location/sources/0/0

/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, for some reason, the &lt;code&gt;_spark_metadata&lt;/code&gt; folder in the destination is deleted or moved, but the corresponding checkpoint folder is not.&lt;/p&gt;

&lt;p&gt;The following exception will be thrown sooner or later:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Caused by: java.lang.IllegalStateException: /tmp/destination/_spark_metadata/0 doesn't exist when compacting batch 9 (compactInterval: 10)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$3(CompactibleFileStreamLog.scala:187)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$2(CompactibleFileStreamLog.scala:185)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$2$adapted(CompactibleFileStreamLog.scala:183)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:74)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.$anonfun$compact$1(CompactibleFileStreamLog.scala:183)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:181)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:75)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:215)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Looking at the checkpoint folder, we see the following files&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;/tmp/checkpoint-location/commits
/tmp/checkpoint-location/commits/0
/tmp/checkpoint-location/commits/1
/tmp/checkpoint-location/commits/2
/tmp/checkpoint-location/commits/3
/tmp/checkpoint-location/commits/4
/tmp/checkpoint-location/commits/5
/tmp/checkpoint-location/commits/6
/tmp/checkpoint-location/commits/7
/tmp/checkpoint-location/commits/8
/tmp/checkpoint-location/metadata
/tmp/checkpoint-location/offsets
/tmp/checkpoint-location/offsets/0
/tmp/checkpoint-location/offsets/1
/tmp/checkpoint-location/offsets/2
/tmp/checkpoint-location/offsets/3
/tmp/checkpoint-location/offsets/4
/tmp/checkpoint-location/offsets/5
/tmp/checkpoint-location/offsets/6
/tmp/checkpoint-location/offsets/7
/tmp/checkpoint-location/offsets/8
/tmp/checkpoint-location/offsets/9
/tmp/checkpoint-location/sources
/tmp/checkpoint-location/sources/0
/tmp/checkpoint-location/sources/0/0
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Meanwhile, the destination folder contains&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;/tmp/destination/_spark_metadata/2
/tmp/destination/_spark_metadata/3
/tmp/destination/_spark_metadata/4
/tmp/destination/_spark_metadata/5
/tmp/destination/_spark_metadata/6
/tmp/destination/_spark_metadata/7
/tmp/destination/_spark_metadata/8
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As we can see, the &lt;code&gt;_spark_metadata&lt;/code&gt; folder is missing the files &lt;code&gt;0&lt;/code&gt; and &lt;code&gt;1&lt;/code&gt;, which were previously deleted.&lt;br&gt;
Instead of simply writing &lt;code&gt;/tmp/destination/_spark_metadata/9&lt;/code&gt;, Spark tries to concatenate the files &lt;code&gt;0&lt;/code&gt;, &lt;code&gt;1&lt;/code&gt;, ..., &lt;code&gt;8&lt;/code&gt; into a file called &lt;code&gt;9.compact&lt;/code&gt; to improve reading efficiency and avoid the small-files problem. This process is called log compaction. That's when the exception is thrown, because the files &lt;code&gt;0&lt;/code&gt; and &lt;code&gt;1&lt;/code&gt; unexpectedly don't exist. Log compaction doesn't happen in every micro-batch; its frequency is determined by the &lt;code&gt;compactInterval&lt;/code&gt; setting, which is 10 by default.&lt;/p&gt;
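
&lt;p&gt;The compaction schedule can be expressed in one line. The following Python snippet mirrors the rule used by Spark's &lt;code&gt;CompactibleFileStreamLog&lt;/code&gt;, shown here purely for illustration:&lt;/p&gt;

```python
# With the default compactInterval of 10, batches 9, 19, 29, ... are
# compaction batches, which matches "compacting batch 9" in the error above.
def is_compaction_batch(batch_id, compact_interval=10):
    return (batch_id + 1) % compact_interval == 0

print([b for b in range(30) if is_compaction_batch(b)])  # [9, 19, 29]
```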
&lt;h1&gt;
  
  
  How to fix the problem
&lt;/h1&gt;

&lt;p&gt;&lt;strong&gt;1. Restore the files of the removed &lt;code&gt;_spark_metadata&lt;/code&gt; folder&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;If the &lt;code&gt;_spark_metadata&lt;/code&gt; folder was only moved and its files can be recovered, move them back into the recreated &lt;code&gt;_spark_metadata&lt;/code&gt; folder. There should be no overlapping filenames.&lt;/p&gt;

&lt;p&gt;After restoring the files, the &lt;code&gt;_spark_metadata&lt;/code&gt; folder should look like this&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1
/tmp/destination/_spark_metadata/2
/tmp/destination/_spark_metadata/3
/tmp/destination/_spark_metadata/4
/tmp/destination/_spark_metadata/5
/tmp/destination/_spark_metadata/6
/tmp/destination/_spark_metadata/7
/tmp/destination/_spark_metadata/8
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, the query can be restarted and should finish without errors.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;2. Create dummy log files&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;If the metadata log files are irrecoverable, we could create dummy log files for the missing micro-batches.&lt;br&gt;
In our example, this could be done like this&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="k"&gt;for &lt;/span&gt;i &lt;span class="k"&gt;in&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;0..1&lt;span class="o"&gt;}&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;do &lt;/span&gt;&lt;span class="nb"&gt;echo &lt;/span&gt;v1 &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="s2"&gt;"/tmp/destination/_spark_metadata/&lt;/span&gt;&lt;span class="nv"&gt;$i&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;done&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;or on HDFS&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="k"&gt;for &lt;/span&gt;i &lt;span class="k"&gt;in&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;0..1&lt;span class="o"&gt;}&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;do &lt;/span&gt;&lt;span class="nb"&gt;echo &lt;/span&gt;v1 &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="s2"&gt;"/tmp/&lt;/span&gt;&lt;span class="nv"&gt;$i&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; hdfs dfs &lt;span class="nt"&gt;-copyFromLocal&lt;/span&gt; &lt;span class="s2"&gt;"/tmp/&lt;/span&gt;&lt;span class="nv"&gt;$i&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt; &lt;span class="s2"&gt;"/tmp/destination/_spark_metadata/&lt;/span&gt;&lt;span class="nv"&gt;$i&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;done&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This will create the files&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, the query can be restarted and should finish without errors.&lt;/p&gt;

&lt;p&gt;Note that the information from the metadata log files &lt;code&gt;0&lt;/code&gt; and &lt;code&gt;1&lt;/code&gt; is definitely lost, so the exactly-once guarantee no longer holds for micro-batches 0 and 1, and you need to address that problem separately. But at least the query can continue.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;3. Deferring compaction&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;If it's the middle of the night and you simply need the query to continue, or you have no write access to the filesystem, you can buy yourself some time by deferring the compaction. However, this does not address the root cause.&lt;/p&gt;

&lt;p&gt;By default, the &lt;code&gt;compactInterval&lt;/code&gt; is 10. You can increase it to e.g. 100 by restarting the query with this additional config&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;spark-submit &lt;span class="nt"&gt;--conf&lt;/span&gt; spark.sql.streaming.fileSink.log.compactInterval&lt;span class="o"&gt;=&lt;/span&gt;100 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The same exception will be thrown again at the next compaction batch, so this is really just a temporary fix to keep the query running for a few more micro-batches.&lt;/p&gt;

&lt;p&gt;Eventually, the missing log files have to be recreated.&lt;/p&gt;

</description>
      <category>spark</category>
    </item>
    <item>
      <title>Upgrading ABRiS from version 3 to version 4</title>
      <dc:creator>Kevin Wallimann</dc:creator>
      <pubDate>Tue, 15 Dec 2020 10:56:04 +0000</pubDate>
      <link>https://dev.to/kevinwallimann/upgrading-abris-from-version-3-to-version-4-4gl4</link>
      <guid>https://dev.to/kevinwallimann/upgrading-abris-from-version-3-to-version-4-4gl4</guid>
      <description>&lt;p&gt;With release &lt;a href="https://github.com/AbsaOSS/ABRiS/releases/tag/v4.0.1"&gt;v4.0.1&lt;/a&gt;, a new fluent API was introduced to ABRiS to reduce configuration errors and provide more type safety. While this change is a huge improvement going forward, it causes a breaking change for users migrating from version 3. This article walks you through an upgrade of some common use-cases of ABRiS.&lt;/p&gt;

&lt;p&gt;More information can be found on the &lt;a href="https://github.com/AbsaOSS/ABRiS"&gt;GitHub page&lt;/a&gt;. More usage examples can be found on the &lt;a href="https://github.com/AbsaOSS/ABRiS/tree/master/documentation"&gt;documentation pages&lt;/a&gt;. Documentation for version 3 can be found under &lt;a href="https://github.com/AbsaOSS/ABRiS/tree/branch-3.2"&gt;branch 3.2&lt;/a&gt;.&lt;/p&gt;

&lt;h1&gt;
  
  
  Reading records
&lt;/h1&gt;

&lt;p&gt;A common use-case is to read data from a topic with both key and value schema. In ABRiS 3, this could be done like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;keyConfig&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
  &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_SCHEMA_REGISTRY_TOPIC&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"example_topic"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_SCHEMA_REGISTRY_URL&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_KEY_SCHEMA_NAMING_STRATEGY&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"topic.name"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_KEY_SCHEMA_ID&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"latest"&lt;/span&gt;
&lt;span class="o"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;valueConfig&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
  &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_SCHEMA_REGISTRY_TOPIC&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"example_topic"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_SCHEMA_REGISTRY_URL&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_VALUE_SCHEMA_NAMING_STRATEGY&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"topic.record.name"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_VALUE_SCHEMA_ID&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"latest"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_VALUE_SCHEMA_NAME_FOR_RECORD_STRATEGY&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"record.name"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_VALUE_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"record.namespace"&lt;/span&gt;
&lt;span class="o"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;za.co.absa.abris.avro.functions.from_confluent_avro&lt;/span&gt;

&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;result&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;DataFrame&lt;/span&gt;  &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;select&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
    &lt;span class="nf"&gt;from_confluent_avro&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"key"&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt; &lt;span class="n"&gt;keyConfig&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;as&lt;/span&gt; &lt;span class="ss"&gt;'key&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nf"&gt;from_confluent_avro&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"value"&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt; &lt;span class="n"&gt;valueConfig&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;as&lt;/span&gt; &lt;span class="ss"&gt;'value&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With ABRiS 4, it looks like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;keyConfig&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;FromAvroConfig&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;AbrisConfig&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;fromConfluentAvro&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;downloadReaderSchemaByLatestVersion&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;andTopicNameStrategy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"topicName"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;isKey&lt;/span&gt;&lt;span class="k"&gt;=&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;usingSchemaRegistry&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;valueConfig&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;FromAvroConfig&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;AbrisConfig&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;fromConfluentAvro&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;downloadReaderSchemaByLatestVersion&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;andTopicRecordNameStrategy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"topicName"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"record.name"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"record.namespace"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;usingSchemaRegistry&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;za.co.absa.abris.avro.functions.from_avro&lt;/span&gt;
&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;result&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;DataFrame&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;select&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
   &lt;span class="nf"&gt;from_avro&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"key"&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt; &lt;span class="n"&gt;keyConfig&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;as&lt;/span&gt; &lt;span class="ss"&gt;'key&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
   &lt;span class="nf"&gt;from_avro&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;col&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"value"&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt; &lt;span class="n"&gt;valueConfig&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;as&lt;/span&gt; &lt;span class="ss"&gt;'value&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;First and foremost, a new object was introduced, &lt;code&gt;AbrisConfig&lt;/code&gt;. This is the entry point for the new fluent API.&lt;/p&gt;

&lt;p&gt;Second, the method &lt;code&gt;from_confluent_avro&lt;/code&gt; was removed and should be replaced with &lt;code&gt;from_avro&lt;/code&gt;. To use the Confluent format, specify &lt;code&gt;.fromConfluentAvro&lt;/code&gt; on &lt;code&gt;AbrisConfig&lt;/code&gt;. If you've been using plain vanilla Avro, choose &lt;code&gt;.fromSimpleAvro&lt;/code&gt; instead.&lt;/p&gt;

&lt;p&gt;Third, notice the second parameter of &lt;code&gt;.andTopicNameStrategy&lt;/code&gt;. The default value of &lt;code&gt;isKey&lt;/code&gt; is &lt;code&gt;false&lt;/code&gt;, which is correct for value schemas. For key schemas, however, &lt;code&gt;isKey&lt;/code&gt; must be set to &lt;code&gt;true&lt;/code&gt;.&lt;/p&gt;
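&lt;p&gt;For context, the Confluent format that &lt;code&gt;.fromConfluentAvro&lt;/code&gt; refers to is the Confluent wire format: each Kafka message is the Avro payload prefixed with a magic byte (0) and the 4-byte big-endian schema id. Here is a rough, framework-free Python sketch of that framing (for illustration only, not part of ABRiS):&lt;/p&gt;

```python
import struct

MAGIC_BYTE = 0

def wrap_confluent(schema_id, avro_payload):
    # Confluent wire format: 1 magic byte (0) + 4-byte big-endian schema id + Avro payload
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload

def unwrap_confluent(message):
    # Split a Confluent-framed message into (schema_id, avro_payload)
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("not a Confluent-framed message")
    return schema_id, message[5:]

framed = wrap_confluent(42, b"\x02\x06foo")
assert unwrap_confluent(framed) == (42, b"\x02\x06foo")
```

&lt;p&gt;This header is also why &lt;code&gt;.fromSimpleAvro&lt;/code&gt; exists as a separate option: plain Avro payloads carry no such prefix.&lt;/p&gt;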

&lt;h1&gt;
  
  
  Writing records
&lt;/h1&gt;

&lt;h2&gt;
  
  
  Using an existing schema
&lt;/h2&gt;

&lt;p&gt;In ABRiS 3, writing records providing an existing schema id could be done like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;writeAvro&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dataFrame&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;DataFrame&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;DataFrame&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;config&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
    &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_SCHEMA_REGISTRY_TOPIC&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"example_topic"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_SCHEMA_REGISTRY_URL&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_VALUE_SCHEMA_NAMING_STRATEGY&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"topic.record.name"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_VALUE_SCHEMA_ID&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"42"&lt;/span&gt;
  &lt;span class="o"&gt;)&lt;/span&gt;

  &lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;za.co.absa.abris.avro.functions.to_confluent_avro&lt;/span&gt;
  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;allColumns&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;struct&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;head&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;tail&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="k"&gt;_&lt;/span&gt;&lt;span class="kt"&gt;*&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;select&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;to_confluent_avro&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;allColumns&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;as&lt;/span&gt; &lt;span class="ss"&gt;'value&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In ABRiS 4, it's like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;writeAvro&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dataFrame&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;DataFrame&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;DataFrame&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;config&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;ToAvroConfig&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;AbrisConfig&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;toConfluentAvro&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;downloadSchemaById&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;42&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;usingSchemaRegistry&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

  &lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;za.co.absa.abris.avro.functions.to_avro&lt;/span&gt;

  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;allColumns&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;struct&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;head&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;tail&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="k"&gt;_&lt;/span&gt;&lt;span class="kt"&gt;*&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;select&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;to_avro&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;allColumns&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;as&lt;/span&gt; &lt;span class="ss"&gt;'value&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here again, the method &lt;code&gt;to_confluent_avro&lt;/code&gt; was removed; instead, configure &lt;code&gt;.toConfluentAvro&lt;/code&gt; on &lt;code&gt;AbrisConfig&lt;/code&gt; and use &lt;code&gt;to_avro&lt;/code&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Generating the schema from the records
&lt;/h2&gt;

&lt;p&gt;In ABRiS 3, it was incredibly easy (too easy!) to have ABRiS generate the schema from the records if you didn't provide one, like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;writeAvro&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dataFrame&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;DataFrame&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;DataFrame&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;config&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
    &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_SCHEMA_REGISTRY_TOPIC&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"example_topic"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_SCHEMA_REGISTRY_URL&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nv"&gt;SchemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;PARAM_VALUE_SCHEMA_NAMING_STRATEGY&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"topic.record.name"&lt;/span&gt;
  &lt;span class="o"&gt;)&lt;/span&gt;

  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;allColumns&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;struct&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;head&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;tail&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="k"&gt;_&lt;/span&gt;&lt;span class="kt"&gt;*&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;select&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;to_confluent_avro&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;allColumns&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;as&lt;/span&gt; &lt;span class="ss"&gt;'value&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Generating and registering the schema had to happen during the evaluation of the Spark expression, which was inefficient. Therefore, this functionality was removed in v4: the schema now needs to be registered before the evaluation phase, and its id passed to the ABRiS config.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.spark.sql.avro.SchemaConverters.toAvroType&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;za.co.absa.abris.avro.read.confluent.SchemaManagerFactory&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;za.co.absa.abris.avro.registry.SchemaSubject&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;writeAvro&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dataFrame&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;DataFrame&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="kt"&gt;DataFrame&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="c1"&gt;// generate schema&lt;/span&gt;
  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;allColumns&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;struct&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;c&lt;/span&gt; &lt;span class="k"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;c&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="k"&gt;_&lt;/span&gt;&lt;span class="kt"&gt;*&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;expression&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;allColumns&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;expr&lt;/span&gt;
  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;schema&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;toAvroType&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;expression&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;dataType&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;expression&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;nullable&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

  &lt;span class="c1"&gt;// register schema&lt;/span&gt;
  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;schemaRegistryClientConfig&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; 
&lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;AbrisConfig&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;SCHEMA_REGISTRY_URL&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;schemaManager&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;SchemaManagerFactory&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;create&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schemaRegistryClientConfig&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;subject&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;SchemaSubject&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;usingTopicNameStrategy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"topic"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;isKey&lt;/span&gt;&lt;span class="k"&gt;=&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;schemaId&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;schemaManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;register&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;subject&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

  &lt;span class="c1"&gt;// create config&lt;/span&gt;
  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;config&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;AbrisConfig&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;toConfluentAvro&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;downloadSchemaById&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schemaId&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;usingSchemaRegistry&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"http://localhost:8081"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

  &lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;allColumns&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;struct&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;head&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;tail&lt;/span&gt;&lt;span class="k"&gt;:&lt;/span&gt; &lt;span class="k"&gt;_&lt;/span&gt;&lt;span class="kt"&gt;*&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nv"&gt;dataFrame&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;select&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;to_avro&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;allColumns&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;as&lt;/span&gt; &lt;span class="ss"&gt;'value&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Notice that we used the topic name strategy in this example. &lt;code&gt;SchemaSubject&lt;/code&gt; also offers methods for the record name strategy (&lt;code&gt;.usingRecordNameStrategy&lt;/code&gt;) and the topic record name strategy (&lt;code&gt;.usingTopicRecordNameStrategy&lt;/code&gt;).&lt;/p&gt;
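&lt;p&gt;The chosen strategy determines the subject name under which the schema is registered and looked up in Schema Registry. A minimal Python sketch of the three naming conventions, following Confluent's documented rules (for illustration only):&lt;/p&gt;

```python
def topic_name_strategy(topic, is_key=False):
    # TopicNameStrategy: subject is derived from the topic alone
    return topic + ("-key" if is_key else "-value")

def record_name_strategy(record_name, record_namespace):
    # RecordNameStrategy: subject is the fully-qualified record name
    return record_namespace + "." + record_name

def topic_record_name_strategy(topic, record_name, record_namespace):
    # TopicRecordNameStrategy: topic plus fully-qualified record name
    return topic + "-" + record_namespace + "." + record_name

subject = topic_name_strategy("example_topic", is_key=True)  # "example_topic-key"
```

&lt;p&gt;This also makes clear why &lt;code&gt;isKey&lt;/code&gt; only matters for the topic name strategy: it is the only strategy whose subject name distinguishes keys from values.&lt;/p&gt;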

</description>
    </item>
    <item>
      <title>Is Structured Streaming Exactly-Once? Well, it depends...</title>
      <dc:creator>Kevin Wallimann</dc:creator>
      <pubDate>Fri, 06 Nov 2020 10:06:53 +0000</pubDate>
      <link>https://dev.to/kevinwallimann/is-structured-streaming-exactly-once-well-it-depends-noe</link>
      <guid>https://dev.to/kevinwallimann/is-structured-streaming-exactly-once-well-it-depends-noe</guid>
      <description>&lt;h1&gt;
  
  
  TLDR
&lt;/h1&gt;

&lt;ul&gt;
&lt;li&gt;Yes, but only for the file sink, not for the Kafka sink or the Foreach sink&lt;/li&gt;
&lt;li&gt;Whether you actually get exactly-once semantics depends on how you read from the sink&lt;/li&gt;
&lt;li&gt;If you read using a globbed path or directly from a partition subdirectory, exactly-once semantics does not apply&lt;/li&gt;
&lt;/ul&gt;

&lt;h1&gt;
  
  
  Exactly-once semantics depends on the reader
&lt;/h1&gt;

&lt;p&gt;One of the key features of Spark Structured Streaming is its support for exactly-once semantics, meaning that no row will be missing or duplicated in the sink after recovery from failure.&lt;/p&gt;

&lt;p&gt;As per the documentation, this feature is only available for the file sink, while the Kafka sink and the Foreach sink only support at-least-once semantics (&lt;a href="https://spark.apache.org/docs/3.0.0/structured-streaming-programming-guide.html#output-sinks"&gt;https://spark.apache.org/docs/3.0.0/structured-streaming-programming-guide.html#output-sinks&lt;/a&gt;).&lt;/p&gt;

&lt;p&gt;Let's demonstrate exactly-once semantics using the spark-shell:&lt;br&gt;
First, we'll write some streaming data to a destination. We add a literal column and partition by it, just for the sake of having a partition subdirectory. Finally, we repartition the dataframe to get multiple parquet files in the output.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;scala&amp;gt; import org.apache.spark.sql.execution.streaming.MemoryStream
scala&amp;gt; import org.apache.spark.sql.streaming.Trigger
scala&amp;gt; import org.apache.spark.sql.functions._
scala&amp;gt; val input = MemoryStream[Int](1, spark.sqlContext)
scala&amp;gt; input.addData(1 to 100)
scala&amp;gt; val df = input.toDF().
     | withColumn("partition1", lit("value1")).
     | repartition(4)
scala&amp;gt; val query = df.writeStream.
     | partitionBy("partition1").
     | trigger(Trigger.Once).
     | option("checkpointLocation", "/tmp/checkpoint").
     | format("parquet").
     | start("/tmp/destination")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We can go ahead and count those values&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;scala&amp;gt; query.awaitTermination()
scala&amp;gt; spark.read.parquet("/tmp/destination").count
res1: Long = 100
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As expected, we get 100 as the result.&lt;/p&gt;

&lt;p&gt;We should now see 4 parquet files in the destination and one file in the metadata log, like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;% ls -R /tmp/destination
_spark_metadata   partition1=value1

/tmp/destination/_spark_metadata:
0

/tmp/destination/partition1=value1:
part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000.snappy.parquet
part-00001-d2b67dae-3fe9-40ed-8e6a-75c4a36e8300.c000.snappy.parquet
part-00002-275dd640-4148-4947-96ca-3cad4feae215.c000.snappy.parquet
part-00003-bd18be1e-3906-4c49-905b-a9d1c37d3282.c000.snappy.parquet
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The metadata log file should reference exactly the four files, like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;% cat /tmp/destination/_spark_metadata/0
v1
{"path":"file:///tmp/destination/partition1=value1/part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000.snappy.parquet","size":498,"isDir":false,"modificationTime":1604655052000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/destination/partition1=value1/part-00001-d2b67dae-3fe9-40ed-8e6a-75c4a36e8300.c000.snappy.parquet","size":498,"isDir":false,"modificationTime":1604655052000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/destination/partition1=value1/part-00002-275dd640-4148-4947-96ca-3cad4feae215.c000.snappy.parquet","size":498,"isDir":false,"modificationTime":1604655052000,"blockReplication":1,"blockSize":33554432,"action":"add"}
{"path":"file:///tmp/destination/partition1=value1/part-00003-bd18be1e-3906-4c49-905b-a9d1c37d3282.c000.snappy.parquet","size":498,"isDir":false,"modificationTime":1604655052000,"blockReplication":1,"blockSize":33554432,"action":"add"}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We can simulate a partial write by copying one of the four parquet files. Now, there are 5 parquet files in the destination:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;% cd /tmp/destination/partition1=value1
% cp part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000.snappy.parquet part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000-copy.snappy.parquet
% ls /tmp/destination/partition1=value1
part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000-copy.snappy.parquet
part-00000-54c74e55-7cdb-44f0-9c6f-2db62e2901aa.c000.snappy.parquet
part-00001-d2b67dae-3fe9-40ed-8e6a-75c4a36e8300.c000.snappy.parquet
part-00002-275dd640-4148-4947-96ca-3cad4feae215.c000.snappy.parquet
part-00003-bd18be1e-3906-4c49-905b-a9d1c37d3282.c000.snappy.parquet
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Reading with globbed paths
&lt;/h2&gt;

&lt;p&gt;Exactly-once semantics guarantees that we will still only read 100 rows. Let's check that&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;scala&amp;gt; spark.read.parquet("/tmp/destination").count
res2: Long = 100
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As expected, we get 100. &lt;/p&gt;

&lt;p&gt;What about this query (notice the star)?&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;scala&amp;gt; spark.read.parquet("/tmp/destination/*").count
res3: Long = 125
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Well, this is why exactly-once semantics depends on how you read. As shown above, the destination directory really does contain 5 parquet files. When reading without a globbed path, Spark consults the _spark_metadata directory (the metadata log) and only reads the parquet files listed there. With a globbed path, the metadata log is not consulted, so exactly-once semantics does not apply and we read duplicated data.&lt;/p&gt;
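&lt;p&gt;To make this concrete, here is a rough Python sketch of the filtering a metadata-aware reader performs, based on the &lt;code&gt;_spark_metadata/0&lt;/code&gt; file shown above (a &lt;code&gt;v1&lt;/code&gt; header followed by one JSON entry per line). This is an illustration, not Spark's actual implementation:&lt;/p&gt;

```python
import json

def committed_files(metadata_log_text):
    # Parse one batch file of the metadata log: skip the "v1" version header,
    # then keep the path of every entry whose action is "add".
    lines = metadata_log_text.strip().splitlines()
    entries = [json.loads(line) for line in lines[1:]]
    return [e["path"] for e in entries if e.get("action") == "add"]

log = (
    'v1\n'
    '{"path":"file:///tmp/destination/partition1=value1/part-00000.parquet","action":"add"}\n'
)
assert committed_files(log) == ["file:///tmp/destination/partition1=value1/part-00000.parquet"]
```

&lt;p&gt;The copied file never appears in the log, so a reader that filters on the log ignores it, while a reader that merely lists the directory picks it up.&lt;/p&gt;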

&lt;h2&gt;
  
  
  Reading from partition subdirectory
&lt;/h2&gt;

&lt;p&gt;What about filtering by the partition? All of our values are in the same partition, so we should count 100 elements when we filter for it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;scala&amp;gt; spark.read.parquet("/tmp/destination").filter("partition1='value1'").count
res4: Long = 100
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And indeed, it works as expected. Now, in non-streaming Spark you could also read directly from the partition subdirectory and arrive at the same result. Does this work with streaming as well?&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;scala&amp;gt; spark.read.parquet("/tmp/destination/partition1=value1").count
res5: Long = 125
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;No. The reason is the same as above: Spark does not consult the metadata log when you read from a subdirectory, and therefore cannot determine whether any of the parquet files stem from partial writes and are possible duplicates.&lt;/p&gt;

&lt;h1&gt;
  
  
  Conclusion
&lt;/h1&gt;

&lt;p&gt;As we have seen, exactly-once semantics is only guaranteed when the _spark_metadata directory is consulted. In other words, it depends on the reader whether exactly-once semantics applies. In the case of Spark, the metadata log is only consulted when you read from the root directory, without globbed paths. Whether this behaviour is a bug or a feature is not entirely clear to me. In practice, parquet files from partial writes should occur only rarely since Spark 3.0, which added a best-effort cleanup on task abortion (&lt;a href="https://issues.apache.org/jira/browse/SPARK-27210"&gt;https://issues.apache.org/jira/browse/SPARK-27210&lt;/a&gt;). However, it's important to stress that this cleanup is best-effort, not guaranteed.&lt;/p&gt;

</description>
      <category>spark</category>
    </item>
    <item>
      <title>How to make a column non-nullable in Spark Structured Streaming</title>
      <dc:creator>Kevin Wallimann</dc:creator>
      <pubDate>Sat, 11 Jul 2020 10:06:30 +0000</pubDate>
      <link>https://dev.to/kevinwallimann/how-to-make-a-column-non-nullable-in-spark-structured-streaming-4b62</link>
      <guid>https://dev.to/kevinwallimann/how-to-make-a-column-non-nullable-in-spark-structured-streaming-4b62</guid>
      <description>&lt;h1&gt;
  
  
  TLDR
&lt;/h1&gt;

&lt;p&gt;Like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

dataFrame
  .withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h1&gt;
  
  
  Changing column nullability in Batch mode
&lt;/h1&gt;

&lt;p&gt;For Spark in Batch mode, one way to change column nullability is by creating a new dataframe with a new schema that has the desired nullability. &lt;/p&gt;

&lt;blockquote&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt; val schema = dataframe.schema
 // modify [[StructField] with name `cn`
 val newSchema = StructType(schema.map {
   case StructField( c, t, _, m) if c.equals(cn) 
        =&amp;gt;  StructField( c, t, nullable = nullable, m)
   case y: StructField =&amp;gt; y
 })
 // apply new schema
 df.sqlContext.createDataFrame( df.rdd, newSchema )
&lt;/code&gt;&lt;/pre&gt;

&lt;/blockquote&gt;

&lt;p&gt;&lt;a href="https://stackoverflow.com/a/33195510/13532243"&gt;https://stackoverflow.com/a/33195510/13532243&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;However, this approach is not supported for a Structured Streaming dataframe and fails with the following error:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h1&gt;
  
  
  Make a column nullable in structured streaming
&lt;/h1&gt;

&lt;p&gt;In the same stackoverflow thread, another answer shows how to make a non-nullable column nullable, and this approach works for Structured Streaming queries.&lt;/p&gt;

&lt;blockquote&gt;

&lt;pre class="highlight plaintext"&gt;&lt;code&gt;dataframe.withColumn("col_name", when(col("col_name").isNotNull,
  col("col_name")).otherwise(lit(null)))
&lt;/code&gt;&lt;/pre&gt;

&lt;/blockquote&gt;

&lt;p&gt;&lt;a href="https://stackoverflow.com/a/46119565/13532243"&gt;https://stackoverflow.com/a/46119565/13532243&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This is a neat trick: Spark has to account for the possibility that a value could be null and therefore marks the column nullable, even though the column may not contain any null values in practice.&lt;/p&gt;

&lt;h1&gt;
  
  
  Make a column non-nullable in structured streaming
&lt;/h1&gt;

&lt;p&gt;If you know that a nullable column in fact only contains non-nullable values, you may want to make that column non-nullable. Here's the trick with &lt;code&gt;AssertNotNull&lt;/code&gt; again:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import  org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

dataFrame
  .withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;How does it work? Looking at its implementation &lt;a href="https://github.com/apache/spark/blob/3fdfce3120f307147244e5eaf46d61419a723d50/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala#L1591-L1628"&gt;https://github.com/apache/spark/blob/3fdfce3120f307147244e5eaf46d61419a723d50/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala#L1591-L1628&lt;/a&gt;, the key is that &lt;code&gt;AssertNotNull&lt;/code&gt; overrides &lt;code&gt;nullable&lt;/code&gt; and always returns &lt;code&gt;false&lt;/code&gt;. That's how Spark determines this column to be non-nullable. Of course, if your column unexpectedly contains null values, the query will fail with a &lt;code&gt;NullPointerException&lt;/code&gt;.&lt;/p&gt;

</description>
      <category>spark</category>
    </item>
  </channel>
</rss>
