<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Alexandros Biratsis</title>
    <description>The latest articles on DEV Community by Alexandros Biratsis (@abiratsis).</description>
    <link>https://dev.to/abiratsis</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F356206%2Fe805b067-519d-4765-b592-ea8b2af6a0cd.jpeg</url>
      <title>DEV Community: Alexandros Biratsis</title>
      <link>https://dev.to/abiratsis</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/abiratsis"/>
    <language>en</language>
    <item>
      <title>Stopping Spark Structured Streaming jobs via external signals</title>
      <dc:creator>Alexandros Biratsis</dc:creator>
      <pubDate>Mon, 06 Apr 2026 14:05:44 +0000</pubDate>
      <link>https://dev.to/abiratsis/stopping-spark-structured-streaming-jobs-via-external-signals-39k4</link>
      <guid>https://dev.to/abiratsis/stopping-spark-structured-streaming-jobs-via-external-signals-39k4</guid>
      <description>&lt;h2&gt;
  
  
  The Problem
&lt;/h2&gt;

&lt;p&gt;If you've ever tried to stop a Spark Structured Streaming job from outside the application, you've run into the wall: &lt;code&gt;StreamingQuery.stop()&lt;/code&gt; requires a direct reference to the running &lt;code&gt;StreamingQuery&lt;/code&gt; object, which is only available inside the driver process.&lt;/p&gt;

&lt;p&gt;In managed environments like Databricks notebooks, Kubernetes pods, or YARN containers, that reference is simply not accessible from the outside. Your options are usually:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Use a vendor's API (vendor lock-in)&lt;/li&gt;
&lt;li&gt;Kill the cluster (brutal)&lt;/li&gt;
&lt;li&gt;Hack something with shared state (fragile)&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  The Idea
&lt;/h2&gt;

&lt;p&gt;Instead of calling &lt;code&gt;stop()&lt;/code&gt; directly, what if the job itself listened for an external signal and stopped gracefully when it arrived?&lt;/p&gt;

&lt;p&gt;That's exactly what &lt;a href="https://github.com/a-biratsis/stopstreaming" rel="noopener noreferrer"&gt;stopstreaming&lt;/a&gt; does. It adds a single extension method to any &lt;code&gt;StreamingQuery&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="nv"&gt;query&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;awaitExternalTermination&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;An async watcher (a Scala &lt;code&gt;Future&lt;/code&gt;) runs in the background monitoring a signal source. When the signal arrives, it calls &lt;code&gt;stop()&lt;/code&gt; internally. The streaming thread is never blocked, and you never need a handle to the &lt;code&gt;SparkSession&lt;/code&gt;.&lt;/p&gt;
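
&lt;p&gt;The watcher pattern itself is plain Scala. As a minimal, Spark-free sketch of the idea (illustrative names, not the library's actual code), a &lt;code&gt;Future&lt;/code&gt; waits for an external signal and then triggers the stop:&lt;br&gt;
&lt;/p&gt;

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Stand-in for StreamingQuery.stop(): just records that stop was requested.
val stopped = new AtomicBoolean(false)
def stopQuery(): Unit = stopped.set(true)

// The external signal source. In the library this would be a REST call
// or a deleted marker file; here it is a plain Promise.
val signal = Promise[Unit]()

// The async watcher: a Future that waits for the signal, then stops the query.
// Only the watcher thread blocks; the streaming thread is untouched.
val watcher = Future {
  Await.ready(signal.future, Duration.Inf)
  stopQuery()
}

signal.success(())                  // simulate the external stop signal
Await.ready(watcher, 10.seconds)    // watcher fires and "stops" the query
```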

&lt;p&gt;Internally it uses Spark's own &lt;code&gt;StreamingQueryListener&lt;/code&gt; API for event-driven termination detection — so there's no busy polling when Spark itself stops the query.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fiqemin5cryn5mear6o4l.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fiqemin5cryn5mear6o4l.png" alt=" " width="800" height="1054"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Backends / Watchers
&lt;/h2&gt;

&lt;h3&gt;
  
  
  REST
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F01r8gw00wjplcn5wl2c7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F01r8gw00wjplcn5wl2c7.png" alt=" " width="800" height="909"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Spins up a lightweight &lt;code&gt;HttpServer&lt;/code&gt; on the Spark driver. Multiple queries automatically share the same port, each registering its own path derived from &lt;code&gt;query.id&lt;/code&gt;. Running multiple servers on different ports is also supported.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.github.stopstreaming.extensions.StreamingQueryOps._&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.github.stopstreaming.extensions.conf.RestStopConfig&lt;/span&gt;

&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;query&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;spark&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;readStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt; &lt;span class="o"&gt;...&lt;/span&gt; &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;start&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;

&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;config&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;RestStopConfig&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;port&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;8558&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;// default: all interfaces, /stop path&lt;/span&gt;

&lt;span class="nv"&gt;query&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;awaitExternalTermination&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Stop from anywhere:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-X&lt;/span&gt; POST http://&amp;lt;driver-host&amp;gt;:8558/stop/&amp;lt;query-id&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;query-id&lt;/code&gt; is the UUID Spark assigns to each streaming query, visible in the Spark UI and logs.&lt;/p&gt;

&lt;p&gt;In the following example, multiple jobs run on the same driver sharing the same port:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;cfg&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;RestStopConfig&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;port&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;8558&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

&lt;span class="nv"&gt;queryA&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;awaitExternalTermination&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cfg&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;  &lt;span class="c1"&gt;// POST /stop/&amp;lt;id-of-A&amp;gt;&lt;/span&gt;
&lt;span class="nv"&gt;queryB&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;awaitExternalTermination&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cfg&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;  &lt;span class="c1"&gt;// POST /stop/&amp;lt;id-of-B&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
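
&lt;p&gt;For a sense of how little server machinery this needs, here is a self-contained sketch of a stop endpoint built on the JDK's built-in &lt;code&gt;HttpServer&lt;/code&gt; (illustrative only, not the library's actual implementation; port 0 asks the OS for a free port, whereas the library defaults to 8558):&lt;br&gt;
&lt;/p&gt;

```scala
import java.net.InetSocketAddress
import java.util.concurrent.CountDownLatch
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}

val stopRequested = new CountDownLatch(1)

// Bind to an OS-assigned free port on localhost.
val server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0)
server.createContext("/stop", new HttpHandler {
  override def handle(ex: HttpExchange): Unit = {
    ex.sendResponseHeaders(200, -1)   // empty 200 OK response
    ex.close()
    stopRequested.countDown()         // the watcher would call query.stop() here
  }
})
server.start()

// Simulate the external curl: POST to our own endpoint.
val port = server.getAddress.getPort
val conn = new java.net.URL(s"http://127.0.0.1:$port/stop")
  .openConnection().asInstanceOf[java.net.HttpURLConnection]
conn.setRequestMethod("POST")
conn.getResponseCode                  // fires the request
stopRequested.await()
server.stop(0)
```

&lt;p&gt;The library goes further, routing per-query paths derived from &lt;code&gt;query.id&lt;/code&gt; on a shared server, but the core mechanism is the same.&lt;/p&gt;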



&lt;h3&gt;
  
  
  FileSystem
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F53wwx1x8n4jy0tawk80m.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F53wwx1x8n4jy0tawk80m.png" alt=" " width="800" height="1557"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Creates a marker file at startup. Delete it and the query stops. Works on local FS or DBFS.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.github.stopstreaming.extensions.conf.&lt;/span&gt;&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="nc"&gt;FileSystemStopConfig&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;FsType&lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;config&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;FileSystemStopConfig&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;stopDir&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"dbfs:/tmp/streaming-stop"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;fsType&lt;/span&gt;  &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;FsType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;DBFS&lt;/span&gt;
&lt;span class="o"&gt;)&lt;/span&gt;

&lt;span class="nv"&gt;query&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;awaitExternalTermination&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Stop from a Databricks notebook cell:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="nv"&gt;dbutils&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;fs&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;rm&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"dbfs:/tmp/streaming-stop/&amp;lt;query-id&amp;gt;"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Or from a shell:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;rm&lt;/span&gt; /tmp/streaming-stop/&amp;lt;query-id&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
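
&lt;p&gt;The marker-file mechanism boils down to watching for a path to disappear. A simplified, Spark-free sketch of the idea (illustrative names, local filesystem only):&lt;br&gt;
&lt;/p&gt;

```scala
import java.nio.file.Files
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// At startup the watcher creates one marker file per query.
val marker = Files.createTempFile("streaming-stop-", ".marker")

// The watcher polls until the marker disappears, then stops the query.
val watcher = Future {
  while (Files.exists(marker)) Thread.sleep(100)
  // the real watcher would call query.stop() here
}

Files.delete(marker)                // the external "stop" action (rm, dbutils.fs.rm)
Await.ready(watcher, 30.seconds)
```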



&lt;h2&gt;
  
  
  Configuration
&lt;/h2&gt;

&lt;p&gt;Watchers can be configured either through HOCON files or programmatically, by building a config instance by hand.&lt;/p&gt;

&lt;p&gt;This is how to configure the REST watcher via &lt;code&gt;application.conf&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight hocon"&gt;&lt;code&gt;&lt;span class="nl"&gt;stopstreaming&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;backend&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"rest"&lt;/span&gt;&lt;span class="w"&gt;

  &lt;/span&gt;&lt;span class="nl"&gt;rest&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;host&lt;/span&gt;&lt;span class="w"&gt;      &lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"0.0.0.0"&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;port&lt;/span&gt;&lt;span class="w"&gt;      &lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;8558&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;stop-path&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"/stop"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then load it and pass it to the extension method:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;config&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nv"&gt;StopConfigLoader&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;load&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
&lt;span class="nv"&gt;query&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;awaitExternalTermination&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Or by creating your own config instance:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight scala"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.github.stopstreaming.extensions.conf.RestStopConfig&lt;/span&gt;

&lt;span class="k"&gt;val&lt;/span&gt; &lt;span class="nv"&gt;restConfig&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;RestStopConfig&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;host&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"127.0.0.1"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;port&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;9000&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;stopPath&lt;/span&gt; &lt;span class="k"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"/terminate"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

&lt;span class="nv"&gt;query&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="py"&gt;awaitExternalTermination&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;restConfig&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;.......&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Deployment
&lt;/h2&gt;

&lt;p&gt;You can deploy the library with the &lt;code&gt;spark-submit&lt;/code&gt; command, or on Databricks by attaching it to your cluster.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;spark-submit:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;spark-submit &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;--jars&lt;/span&gt; stopstreaminggracefully-assembly-0.1.jar &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;--conf&lt;/span&gt; &lt;span class="s2"&gt;"spark.driver.extraJavaOptions=--add-exports=jdk.httpserver/com.sun.net.httpserver=ALL-UNNAMED"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;--class&lt;/span&gt; com.example.MyApp &lt;span class="se"&gt;\&lt;/span&gt;
  my-app.jar
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Databricks:&lt;/strong&gt; Upload the fat JAR via the cluster Libraries UI, add the JVM flag in Spark config, and you're done.&lt;/p&gt;
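
&lt;p&gt;In other words, the same JVM flag from the &lt;code&gt;spark-submit&lt;/code&gt; example becomes one line in the cluster's Spark config:&lt;br&gt;
&lt;/p&gt;

```
spark.driver.extraJavaOptions --add-exports=jdk.httpserver/com.sun.net.httpserver=ALL-UNNAMED
```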

&lt;h2&gt;
  
  
  What's Coming
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Azure Service Bus watcher&lt;/strong&gt; - poll a Service Bus queue for a stop message; ideal for Azure-native workloads&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Kafka control topic watcher&lt;/strong&gt; - consume a dedicated Kafka topic; ideal for Kafka-heavy pipelines&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Stack
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Scala 2.13 / Apache Spark 4.1&lt;/li&gt;
&lt;li&gt;JDK built-in &lt;code&gt;HttpServer&lt;/code&gt; (no extra HTTP dependency for the REST backend)&lt;/li&gt;
&lt;li&gt;Typesafe Config for HOCON support&lt;/li&gt;
&lt;li&gt;SBT&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;I would love feedback from anyone running Structured Streaming in production on Databricks, Kubernetes, YARN, or similar environments. Let me know if you find this approach interesting, and how you stop your own streaming jobs.&lt;/p&gt;

&lt;p&gt;🔗 &lt;a href="https://github.com/a-biratsis/stopstreaming" rel="noopener noreferrer"&gt;GitHub repo&lt;/a&gt;&lt;/p&gt;

</description>
      <category>spark</category>
      <category>scala</category>
      <category>databricks</category>
      <category>streaming</category>
    </item>
  </channel>
</rss>
