TLDR
Like this:
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

dataFrame
  .withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))
Changing column nullability in Batch mode
For Spark in batch mode, one way to change column nullability is to create a new DataFrame with a new schema that has the desired nullability.
val schema = df.schema
// modify the StructField with name `cn`
val newSchema = StructType(schema.map {
  case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
  case y: StructField => y
})
// apply the new schema
df.sqlContext.createDataFrame(df.rdd, newSchema)
https://stackoverflow.com/a/33195510/13532243
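To make the snippet above concrete, here is a minimal, self-contained sketch of the batch approach. The column name "id" is a hypothetical example, and an existing SparkSession named spark (as in spark-shell) is assumed.

import org.apache.spark.sql.types.{StructField, StructType}

val df = spark.range(5).toDF("id")   // "id" starts out non-nullable
val newSchema = StructType(df.schema.map {
  case StructField(c, t, _, m) if c == "id" => StructField(c, t, nullable = true, m)
  case other => other
})
spark.createDataFrame(df.rdd, newSchema).printSchema()
// root
//  |-- id: long (nullable = true)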
However, this approach isn't supported for a Structured Streaming DataFrame; it fails with the following error.
Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
Make a column nullable in structured streaming
In the same Stack Overflow thread, another answer shows how to make a non-nullable column nullable, and this approach works for Structured Streaming queries.
import org.apache.spark.sql.functions.{col, lit, when}

dataframe.withColumn("col_name",
  when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))
https://stackoverflow.com/a/46119565/13532243
This is a neat trick: because the otherwise branch could (hypothetically) produce a null, Spark has to mark the column nullable, even though the column never actually contains a null value in practice.
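You can see the effect on the schema directly. Here is a minimal sketch, again assuming an existing SparkSession named spark and a hypothetical "id" column; spark.range produces a non-nullable column, which the trick relaxes.

import org.apache.spark.sql.functions.{col, lit, when}

val df = spark.range(5).toDF("id")
df.printSchema()   // |-- id: long (nullable = false)

val relaxed = df.withColumn("id",
  when(col("id").isNotNull, col("id")).otherwise(lit(null)))
relaxed.printSchema()   // |-- id: long (nullable = true)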
Make a column non-nullable in structured streaming
If you know that a nullable column in fact only contains non-null values, you may want to make that column non-nullable. Here's the trick with AssertNotNull again:
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

dataFrame
  .withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))
How does it work? Looking at its implementation (https://github.com/apache/spark/blob/3fdfce3120f307147244e5eaf46d61419a723d50/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala#L1591-L1628), the key is that AssertNotNull overrides nullable to always return false. That's how Spark determines the column to be non-nullable. Of course, if your column unexpectedly does contain null values, the query will fail with a NullPointerException.
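Both behaviors are easy to verify. A minimal sketch, assuming a spark-shell session where spark and spark.implicits._ are available, and a hypothetical "value" column:

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col
import spark.implicits._

val df = Seq(Some(1), Some(2)).toDF("value")   // Option[Int] => nullable = true
df.printSchema()   // |-- value: integer (nullable = true)

val strict = df.withColumn("value", new Column(AssertNotNull(col("value").expr)))
strict.printSchema()   // |-- value: integer (nullable = false)

// With an actual null present, the same query fails at runtime:
// Seq(Some(1), None).toDF("value")
//   .withColumn("value", new Column(AssertNotNull(col("value").expr)))
//   .collect()   // => NullPointerException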