Kevin Wallimann
How to make a column non-nullable in Spark Structured Streaming

TLDR

Like this:

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

dataFrame
  .withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))

Changing column nullability in Batch mode

For Spark in Batch mode, one way to change column nullability is by creating a new dataframe with a new schema that has the desired nullability.

import org.apache.spark.sql.types.{StructField, StructType}

val schema = dataframe.schema
// modify the StructField with name `columnName`
val newSchema = StructType(schema.map {
  case StructField(c, t, _, m) if c.equals(columnName) =>
    StructField(c, t, nullable = nullable, m)
  case field: StructField => field
})
// apply the new schema
dataframe.sqlContext.createDataFrame(dataframe.rdd, newSchema)

https://stackoverflow.com/a/33195510/13532243

However, this approach isn't supported for a structured streaming dataframe, which fails with the following error.

Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)

Make a column nullable in structured streaming

In the same Stack Overflow thread, another answer shows how to make a non-nullable column nullable, which also works for Structured Streaming queries.

import org.apache.spark.sql.functions.{col, lit, when}

dataframe.withColumn("col_name",
  when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))

https://stackoverflow.com/a/46119565/13532243

This is a neat trick: because the otherwise branch could (hypothetically) produce a null, Spark has to mark the column as nullable, even though the column never contains a null value in practice.
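To see the effect, here is a minimal sketch (assuming a local SparkSession; the column name `value` is illustrative) that compares the schema's nullable flag before and after the trick:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, when}

val spark = SparkSession.builder().master("local[1]").appName("nullable-demo").getOrCreate()
import spark.implicits._

// a column built from primitive Ints is non-nullable
val df = Seq(1, 2, 3).toDF("value")
println(df.schema("value").nullable) // false

// the when/otherwise trick forces Spark to treat the column as nullable
val relaxed = df.withColumn("value",
  when(col("value").isNotNull, col("value")).otherwise(lit(null)))
println(relaxed.schema("value").nullable) // true
```

The data itself is unchanged; only the schema metadata that Spark infers for the column differs.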

Make a column non-nullable in structured streaming

If you know that a nullable column in fact contains only non-null values, you may want to mark that column non-nullable. Here's the trick with AssertNotNull again:

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

dataFrame
  .withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))

How does it work? Looking at its implementation (https://github.com/apache/spark/blob/3fdfce3120f307147244e5eaf46d61419a723d50/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala#L1591-L1628), the key is that AssertNotNull overrides nullable to always return false. That's how Spark determines the column to be non-nullable. Of course, if your column unexpectedly does contain null values, the query will fail with a NullPointerException.
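The schema change can be observed directly. A minimal sketch (assuming a local SparkSession; names are illustrative):

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[1]").appName("assert-not-null-demo").getOrCreate()
import spark.implicits._

// Option[Int] makes the column nullable, even though no value is actually null
val df = Seq(Some(1), Some(2)).toDF("value")
println(df.schema("value").nullable) // true

// wrapping the expression in AssertNotNull flips the nullable flag to false
val strict = df.withColumn("value", new Column(AssertNotNull(col("value").expr)))
println(strict.schema("value").nullable) // false
```

If the data did contain a null (e.g. `Seq(Some(1), None)`), evaluating the strict dataframe would fail at runtime rather than silently keep the null.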
