Alexandros Biratsis

Stopping Spark Structured Streaming jobs via external signals

The Problem

If you've ever tried to stop a Spark Structured Streaming job from outside the application, you've run into the wall: StreamingQuery.stop() requires a direct reference to the running StreamingQuery object, which is only available inside the driver process.

In managed environments like Databricks notebooks, Kubernetes pods, or YARN containers, that reference is simply not accessible from the outside. Your options are usually:

  • Use a vendor's API (vendor lock-in)
  • Kill the cluster (brutal)
  • Hack something with shared state (fragile)

The Idea

Instead of calling stop() directly, what if the job itself listened for an external signal and stopped gracefully when it arrived?

That's exactly what stopstreaming does. It adds a single extension method to any StreamingQuery:

query.awaitExternalTermination(config)

An async watcher (a Scala Future) runs in the background monitoring a signal source. When the signal arrives, it calls stop() internally. The streaming thread is never blocked, and you never need a handle to the SparkSession.

Internally it uses Spark's own StreamingQueryListener API for event-driven termination detection — so there's no busy polling when Spark itself stops the query.
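The shape of that pattern is easy to picture in plain Scala. The sketch below is illustrative only — `FakeQuery` and the helper are made-up stand-ins, not the library's API — but it shows the core idea: a Future that completes on an external signal (or on a listener event when Spark terminates the query itself) and then stops the query, without ever blocking the streaming thread.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object TerminationSketch {
  // Hypothetical minimal stand-in for StreamingQuery, just to show the shape.
  final class FakeQuery {
    @volatile var stopped = false
    def stop(): Unit = stopped = true
  }

  // Complete `signal` from any source (a REST call, a deleted marker file, a
  // StreamingQueryListener event) and the query is stopped asynchronously.
  def awaitExternalTermination(query: FakeQuery, signal: Future[Unit]): Future[Unit] =
    signal.map(_ => if (!query.stopped) query.stop())

  def main(args: Array[String]): Unit = {
    val q      = new FakeQuery
    val signal = Promise[Unit]()
    val done   = awaitExternalTermination(q, signal.future)

    signal.success(())            // the external stop signal arrives
    Await.result(done, 5.seconds)
    assert(q.stopped)
    println("query stopped")
  }
}
```

The key design point is that nothing ever blocks on the streaming query itself: the watcher only reacts to the completion of a Future.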

Backends / Watchers

REST

Spins up a lightweight HttpServer on the Spark driver. Multiple queries automatically share the same port, each registering its own path derived from query.id. Running multiple servers on different ports is also supported.

import io.github.stopstreaming.extensions.StreamingQueryOps._
import io.github.stopstreaming.extensions.conf.RestStopConfig

val query = spark.readStream. ... .start()

val config = RestStopConfig(port = 8558) // default: all interfaces, /stop path

query.awaitExternalTermination(config)

Stop from anywhere:

curl -X POST http://<driver-host>:8558/stop/<query-id>

The query-id is the UUID Spark assigns to each streaming query, visible in the Spark UI and logs.

In the following example, multiple jobs run on the same driver sharing the same port:

val cfg = RestStopConfig(port = 8558)

queryA.awaitExternalTermination(cfg)  // POST /stop/<id-of-A>
queryB.awaitExternalTermination(cfg)  // POST /stop/<id-of-B>
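Routing per-query stop paths needs nothing more than the JDK's built-in HttpServer. Here is an illustrative sketch (not the library's actual code; the handler and the `abc-123` id are invented) of how a single server can map POST /stop/&lt;query-id&gt; to a stop action:

```scala
import com.sun.net.httpserver.{HttpExchange, HttpServer}
import java.net.InetSocketAddress
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object RestSketch {
  @volatile var stoppedId: Option[String] = None

  def main(args: Array[String]): Unit = {
    // Bind to an ephemeral port; a real backend would use the configured port.
    val server = HttpServer.create(new InetSocketAddress(0), 0)
    server.createContext("/stop", (exchange: HttpExchange) => {
      // The path looks like /stop/<query-id>; take the last segment as the id.
      val id = exchange.getRequestURI.getPath.stripPrefix("/stop/")
      stoppedId = Some(id)        // a real handler would call query.stop()
      val body = s"stopping $id".getBytes("UTF-8")
      exchange.sendResponseHeaders(200, body.length)
      exchange.getResponseBody.write(body)
      exchange.close()
    })
    server.start()
    val port = server.getAddress.getPort

    // Simulate the curl call from outside.
    val client  = HttpClient.newHttpClient()
    val request = HttpRequest
      .newBuilder(URI.create(s"http://localhost:$port/stop/abc-123"))
      .POST(HttpRequest.BodyPublishers.noBody())
      .build()
    val response = client.send(request, HttpResponse.BodyHandlers.ofString())

    server.stop(0)
    assert(response.statusCode == 200)
    assert(stoppedId.contains("abc-123"))
    println(response.body)
  }
}
```

Because each query registers its own path under a shared context, one server instance can serve any number of queries on the same port.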

FileSystem

Creates a marker file at startup. Delete it and the query stops. Works on local FS or DBFS.

import io.github.stopstreaming.extensions.conf.{FileSystemStopConfig, FsType}

val config = FileSystemStopConfig(
  stopDir = "dbfs:/tmp/streaming-stop",
  fsType  = FsType.DBFS
)

query.awaitExternalTermination(config)

Stop from a Databricks notebook cell:

dbutils.fs.rm("dbfs:/tmp/streaming-stop/<query-id>")

Or from a shell:

rm /tmp/streaming-stop/<query-id>
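The marker-file mechanism is simple enough to sketch in plain Scala. This is an illustrative toy, not the library's code — `watchMarker` and the callback are made up, and a real watcher would call `query.stop()` and pick its polling strategy per filesystem:

```scala
import java.nio.file.{Files, Path}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object MarkerFileSketch {
  // Create a marker file, then poll until someone deletes it.
  // The real watcher would call query.stop(); we invoke a callback instead.
  def watchMarker(marker: Path, pollMillis: Long)(onDeleted: () => Unit): Future[Unit] =
    Future {
      Files.createFile(marker)
      while (Files.exists(marker)) Thread.sleep(pollMillis)
      onDeleted()
    }

  def main(args: Array[String]): Unit = {
    val marker = Files.createTempDirectory("streaming-stop").resolve("query-id")
    @volatile var stopped = false

    val watcher = watchMarker(marker, pollMillis = 20)(() => stopped = true)

    // Wait for the watcher to create the marker, then delete it: the "stop" signal.
    while (!Files.exists(marker)) Thread.sleep(10)
    Files.delete(marker)

    Await.result(watcher, 5.seconds)
    assert(stopped)
    println("marker deleted, query would be stopped")
  }
}
```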

Configuration

Watchers can be configured either declaratively through HOCON or programmatically with custom config instances.

This is how to configure the REST watcher via application.conf:

stopstreaming {
  backend = "rest"

  rest {
    host      = "0.0.0.0"
    port      = 8558
    stop-path = "/stop"
  }
}

Then load it in code:

val config = StopConfigLoader.load()
query.awaitExternalTermination(config)

Or by creating your own config instance:

import io.github.stopstreaming.extensions.conf.RestStopConfig

val restConfig = RestStopConfig(host = "127.0.0.1", port = 9000, stopPath = "/terminate")

query.awaitExternalTermination(restConfig)

Deployment

You can deploy the library with the spark-submit command, or in Databricks by attaching it to your cluster.

spark-submit:

spark-submit \
  --jars stopstreaminggracefully-assembly-0.1.jar \
  --conf "spark.driver.extraJavaOptions=--add-exports=jdk.httpserver/com.sun.net.httpserver=ALL-UNNAMED" \
  --class com.example.MyApp \
  my-app.jar

Databricks: Upload the fat JAR via the cluster Libraries UI, add the JVM flag in Spark config, and you're done.

What's Coming

  • Azure Service Bus watcher - poll a Service Bus queue for a stop message; ideal for Azure-native workloads
  • Kafka control topic watcher - consume a dedicated Kafka topic; ideal for Kafka-heavy pipelines

Stack

  • Scala 2.13 / Apache Spark 4.1
  • JDK built-in HttpServer (no extra HTTP dependency for the REST backend)
  • Typesafe Config for HOCON support
  • SBT

I would love feedback from anyone running Structured Streaming in production on Databricks, Kubernetes, YARN, or similar environments. Let me know if you find this approach interesting and how you stop your own streaming jobs.

🔗 GitHub repo
