DEV Community

Kevin Wallimann

How to make a column non-nullable in Spark Structured Streaming


Like this:

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

dataframe.withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))

Changing column nullability in Batch mode

For Spark in Batch mode, one way to change column nullability is by creating a new dataframe with a new schema that has the desired nullability.

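 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types.{StructField, StructType}

 // returns a copy of `df` in which column `cn` has the given nullability
 def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
   val schema = df.schema
   // modify the StructField with name `cn`, keep all other fields unchanged
   val newSchema = StructType(schema.map {
     case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
     case y: StructField => y
   })
   // apply new schema
   df.sqlContext.createDataFrame(df.rdd, newSchema)
 }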

However, this approach isn't supported for a Structured Streaming DataFrame. It relies on converting the DataFrame to an RDD, which fails with the following error.

Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
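
The failure can be reproduced without starting a query. The following is a minimal sketch, assuming a local Spark 2.x or 3.x session and using the built-in `rate` source as a stand-in for a real stream. The object name `StreamingRddFailure` and the helper `rddAccessFails` are hypothetical. It only illustrates that accessing `.rdd`, which the batch approach relies on, is rejected for a streaming DataFrame.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Try

object StreamingRddFailure {
  // Hypothetical helper: true if converting the DataFrame to an RDD throws
  def rddAccessFails(df: DataFrame): Boolean = Try(df.rdd).isFailure

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("streaming-rdd-failure-sketch")
      .getOrCreate()

    // The built-in "rate" source stands in for a real streaming source
    val stream = spark.readStream.format("rate").load()

    println(s"isStreaming: ${stream.isStreaming}")
    println(s"rdd access fails: ${rddAccessFails(stream)}")

    spark.stop()
  }
}
```

Checking `isStreaming` first is one way to decide whether the schema-rewriting approach is applicable to a given DataFrame.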

Make a column nullable in structured streaming

In the same Stack Overflow thread, another answer shows how to make a non-nullable column nullable, which also works for Structured Streaming queries.

import org.apache.spark.sql.functions.{col, lit, when}
dataframe.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))

This is a neat trick: Spark has to account for the (hypothetical) possibility that the value could be null, so it marks the column as nullable, even though the column doesn't contain any null values in practice.
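
The effect on the schema can be checked without starting the stream. The following is a minimal sketch, assuming a local Spark session and assuming the expression is completed with `.otherwise(lit(null))`. The object name `NullableColumnSketch`, the helper `makeNullable`, and the column name `constant` are hypothetical; the `rate` source and a literal column merely stand in for a real stream with a non-nullable column.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, when}

object NullableColumnSketch {
  // Hypothetical helper: applies the when/otherwise trick to one column
  def makeNullable(df: DataFrame, columnName: String): DataFrame =
    df.withColumn(
      columnName,
      when(col(columnName).isNotNull, col(columnName)).otherwise(lit(null)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("nullable-column-sketch")
      .getOrCreate()

    // A non-null literal column is reported as non-nullable in the schema
    val stream = spark.readStream.format("rate").load()
      .withColumn("constant", lit(1L))
    val relaxed = makeNullable(stream, "constant")

    println(s"nullable before: ${stream.schema("constant").nullable}")
    println(s"nullable after:  ${relaxed.schema("constant").nullable}")

    spark.stop()
  }
}
```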

Make a column non-nullable in structured streaming

If you know that a nullable column in fact only contains non-null values, you may want to make that column non-nullable. Here's the trick with AssertNotNull again:

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

dataframe.withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))

How does it work? Looking at its implementation, the key is that AssertNotNull overrides nullable and always returns false. That's how Spark determines this column to be non-nullable. Of course, if your column unexpectedly contains null values, the query will fail with a NullPointerException.
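
To see the schema change on a streaming DataFrame, here is a minimal sketch. It assumes a local Spark 2.x or 3.x session, where `Column` has a public constructor that accepts a Catalyst expression (later versions may need a different way to wrap the expression). The object name `NonNullableColumnSketch` and the helper `makeNonNullable` are hypothetical, and the built-in `rate` source stands in for a real stream.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

object NonNullableColumnSketch {
  // Hypothetical helper: wraps the column in AssertNotNull, whose
  // `nullable` is always false, so the resulting schema field is non-nullable
  def makeNonNullable(df: DataFrame, columnName: String): DataFrame =
    df.withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("non-nullable-column-sketch")
      .getOrCreate()

    // The "rate" source emits the columns `timestamp` and `value`
    val stream = spark.readStream.format("rate").load()
    val strict = makeNonNullable(stream, "value")

    println(s"nullable before: ${stream.schema("value").nullable}")
    println(s"nullable after:  ${strict.schema("value").nullable}")

    spark.stop()
  }
}
```

The schema is inspected on the analyzed plan, so no streaming query has to be started. The null check itself only runs once the query processes data.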
