<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Giannis Polyzos</title>
    <description>The latest articles on DEV Community by Giannis Polyzos (@ipolyzos).</description>
    <link>https://dev.to/ipolyzos</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1488620%2F8d42f7ef-618f-4c4f-8d47-4266c7ccaee6.jpeg</url>
      <title>DEV Community: Giannis Polyzos</title>
      <link>https://dev.to/ipolyzos</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/ipolyzos"/>
    <language>en</language>
    <item>
      <title>A Deep Dive Into Apache Flink Timers</title>
      <dc:creator>Giannis Polyzos</dc:creator>
      <pubDate>Tue, 04 Mar 2025 09:08:52 +0000</pubDate>
      <link>https://dev.to/ipolyzos/a-deep-dive-into-apache-flink-timers-6m4</link>
      <guid>https://dev.to/ipolyzos/a-deep-dive-into-apache-flink-timers-6m4</guid>
      <description>&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fb4mj2gg65bkk1cuaoxws.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fb4mj2gg65bkk1cuaoxws.png" alt="Flink Timers Overview" width="800" height="426"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Timers in Apache Flink are scheduled callbacks that can be set to trigger at a specific time in the future. &lt;/p&gt;

&lt;p&gt;Flink provides two main notions of time for stream processing: &lt;strong&gt;processing time&lt;/strong&gt; and &lt;strong&gt;event time&lt;/strong&gt;. Timers can be registered for either of these &lt;strong&gt;time domains&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;⌛&lt;strong&gt;Processing-time timers:&lt;/strong&gt; These timers use the system’s real clock (wall-clock time). A processing-time timer triggers when the machine’s current time reaches the specified timestamp. This is essentially "now" as measured by the processing node. Processing time is straightforward and low-latency (no need for watermark coordination), but it can be less deterministic since it depends on the clock of the machine processing the data.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;⏳&lt;strong&gt;Event-time timers:&lt;/strong&gt; These timers use the timestamps of events (the time when an event occurred, as embedded in the event). Flink’s event-time processing is driven by watermarks, special messages that flow with the data to indicate progress in event time. An event-time timer triggers when the operator’s watermark reaches or passes the timer’s timestamp. At that point Flink assumes that all events with a timestamp less than or equal to that time have arrived; anything that shows up afterwards counts as late. Event-time timers allow correct handling of out-of-order events and lateness, providing deterministic results based on event timestamps, regardless of processing delays.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
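
&lt;p&gt;The two firing rules above can be sketched in plain Java. This is a minimal model, assuming timestamps in epoch milliseconds; &lt;code&gt;TimerFiringSketch&lt;/code&gt; and its methods are illustrative names and not part of the Flink API.&lt;/p&gt;

```java
// Plain-Java model of the two firing rules; illustrative only, not Flink API.
final class TimerFiringSketch {

    // An event-time timer fires once the watermark has reached its timestamp.
    static boolean eventTimeTimerFires(long timerTimestamp, long currentWatermark) {
        return currentWatermark >= timerTimestamp;
    }

    // A processing-time timer fires once the wall clock has reached its timestamp.
    static boolean processingTimeTimerFires(long timerTimestamp, long wallClockMillis) {
        return wallClockMillis >= timerTimestamp;
    }

    public static void main(String[] args) {
        long timer = 60_000L;

        // The watermark is stalled just before the timer, so the event-time timer waits,
        // no matter how far the wall clock has advanced.
        System.out.println(eventTimeTimerFires(timer, 59_999L));       // prints false
        System.out.println(processingTimeTimerFires(timer, 90_000L));  // prints true

        // Once the watermark reaches the timer timestamp, the event-time timer fires.
        System.out.println(eventTimeTimerFires(timer, 60_000L));       // prints true
    }
}
```

&lt;p&gt;The stalled-watermark case is the one to keep in mind: if no events arrive, event time may not advance, while processing time always does.&lt;/p&gt;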

&lt;h3&gt;
  
  
  Key differences
&lt;/h3&gt;

&lt;p&gt;👉 &lt;strong&gt;Trigger mechanism:&lt;/strong&gt; A processing-time timer fires based on wall-clock time, whereas an event-time timer fires based on the advancement of watermarks (which in turn reflect event timestamps).&lt;/p&gt;

&lt;p&gt;👉 &lt;strong&gt;Use cases:&lt;/strong&gt; Event-time timers are ideal for time-dependent computations that need to respect event timestamps (for example, window aggregations, session gap detection, or complex event patterns). Processing-time timers are useful for tasks like emitting periodic updates, timing out idle sessions, or heartbeat mechanisms that rely on real-time passage.&lt;/p&gt;

&lt;p&gt;👉 &lt;strong&gt;Determinism:&lt;/strong&gt; Event-time processing yields consistent results under replay or different speeds (since event order and watermarks dictate the timing), whereas processing-time results can vary if the job is paused or slowed (because the wall clock keeps ticking).&lt;/p&gt;

&lt;p&gt;Flink also supports &lt;code&gt;ingestion time&lt;/code&gt; (the time events enter Flink) as a middle ground, but in modern Flink APIs, we typically choose between event time and processing time explicitly.&lt;/p&gt;

&lt;h3&gt;
  
  
  Stateful Processing and Timers in Flink
&lt;/h3&gt;

&lt;p&gt;Most Flink streaming applications are stateful, i.e., they remember information across events. Flink’s ProcessFunction gives you access to state and timers, making it a powerful “Swiss army knife” for custom streaming logic.&lt;/p&gt;

&lt;p&gt;When you use a process function on a keyed stream, such as &lt;code&gt;KeyedProcessFunction&lt;/code&gt;, Flink provides:&lt;/p&gt;

&lt;p&gt;✅ &lt;strong&gt;Keyed state:&lt;/strong&gt; You can declare state (e.g., ValueState, ListState) that is scoped to each key in a keyed stream. Only events with the same key share that state. The state for each key is kept locally in each task by the state backend and is periodically snapshotted to durable storage as part of a checkpoint, so it can be recovered after a failure.&lt;/p&gt;

&lt;p&gt;✅ &lt;strong&gt;TimerService:&lt;/strong&gt; The context in a &lt;code&gt;ProcessFunction&lt;/code&gt; provides a TimerService for registering timers. You can schedule a callback for a future time in event time or processing time (in milliseconds). For example, &lt;code&gt;ctx.timerService().registerEventTimeTimer(t)&lt;/code&gt; will schedule an &lt;code&gt;onTimer()&lt;/code&gt; call for that key when the event-time watermark reaches &lt;code&gt;t&lt;/code&gt;, and &lt;code&gt;registerProcessingTimeTimer(t)&lt;/code&gt; will trigger when the processing-time clock reaches &lt;code&gt;t&lt;/code&gt;. The &lt;code&gt;onTimer(...)&lt;/code&gt; callback is where you define what should happen when the timer fires.&lt;/p&gt;

&lt;p&gt;✅ &lt;strong&gt;Timer callbacks:&lt;/strong&gt; When a timer fires, Flink invokes the &lt;code&gt;onTimer()&lt;/code&gt; method in the &lt;code&gt;ProcessFunction&lt;/code&gt;. Within &lt;code&gt;onTimer()&lt;/code&gt;, you can determine the time domain (event or processing) via &lt;code&gt;ctx.timeDomain()&lt;/code&gt;, and emit results or update state.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Note:&lt;/strong&gt; The &lt;code&gt;onTimer&lt;/code&gt; callback, like &lt;code&gt;processElement&lt;/code&gt;, is executed with the keyed context, meaning you can read or update the keyed state as needed for that key. Flink &lt;strong&gt;synchronizes&lt;/strong&gt; calls to &lt;code&gt;processElement&lt;/code&gt; and &lt;code&gt;onTimer&lt;/code&gt;, so the two never run concurrently within an operator instance and you don’t need extra locking when accessing state.&lt;/p&gt;

&lt;p&gt;✅ &lt;strong&gt;State and timer persistence:&lt;/strong&gt; Both state and timers are part of Flink’s managed state and are included in snapshots, so timers are checkpointed along with state. If your job restarts from a checkpoint or savepoint, any timers that were set will be restored. Notably, if a timer was supposed to fire while the job was down (e.g., the checkpoint was taken before the timer timestamp), it is not lost: a processing-time timer whose timestamp has already passed fires immediately upon restore, and an event-time timer fires as soon as the watermark passes its timestamp again. This guarantees no timer is “missed” across failures. Timers are also automatically deduplicated by Flink: for each key and exact timestamp, only one timer exists. If you register another timer for the same timestamp (and same key), it will not create a duplicate, and the callback will execute only once. You can also cancel timers by their timestamp if needed (we’ll see an example of that).&lt;/p&gt;

&lt;p&gt;Now that we’ve covered the basics of timers and state, let’s apply them in a concrete example.&lt;/p&gt;

&lt;h3&gt;
  
  
  Example Use Case: Monitoring Sensor Readings
&lt;/h3&gt;

&lt;p&gt;Let's assume we have a stream of sensor readings from IoT sensors. Each reading has a sensor ID, a timestamp (when the reading was produced), and a temperature value. We want to monitor these readings for two purposes:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Periodic aggregation (event-time):&lt;/strong&gt; Continuously compute the average temperature of each sensor over 1-minute windows and output the result. Yes, we could do this with just a time window 🙃, but since the goal is to demonstrate timers, we will do this using an event-time timer (to trigger at the end of each window). This simulates custom windowing logic using timers and state.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Inactivity alert (processing-time):&lt;/strong&gt; Detect if a sensor stops sending data for a certain amount of time (e.g., 10 seconds) in real processing time, and raise an alert if so. This will be done with processing-time timers, which naturally fit the real-time notion of “no event received for X seconds”.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;Why use different timers for these?&lt;/strong&gt; The aggregation is aligned with event time because we want the window to consider the event timestamps &lt;strong&gt;and handle out-of-order events correctly based on watermarks&lt;/strong&gt;. The inactivity alert is about a real-time gap in data arrival, which is easier to measure with processing time, since if no events come, event time progress might stall, but processing time will still tick away. This should help understand both kinds of timers.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;The full source code can be found &lt;a href="https://github.com/polyzos/flink-blogpost-examples/tree/main/src/main/java/io/ipolyzos/timers" rel="noopener noreferrer"&gt;here&lt;/a&gt;, and if you are looking for production options, you can try &lt;a href="https://www.ververica.com/deployment/managed-service?gad_source=1&amp;amp;gclid=CjwKCAiAt4C-BhBcEiwA8Kp0CXbVnqQOuhs0W5hM6ZiNbi7YPX4HspdWznsyFZd6P-cwy9clTOPXEBoCF0gQAvD_BwE" rel="noopener noreferrer"&gt;Ververica Cloud&lt;/a&gt; for free.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;The core of our job lives in the &lt;a href="https://github.com/polyzos/flink-blogpost-examples/blob/main/src/main/java/io/ipolyzos/timers/SensorProcessorFn.java" rel="noopener noreferrer"&gt;KeyedProcessFunction&lt;/a&gt; that uses state and timers to realize our two goals: 1. average computation and 2. inactivity alert.&lt;/p&gt;

&lt;p&gt;For each sensor (key), we need to keep track of:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The running &lt;code&gt;sum&lt;/code&gt; of temperatures and &lt;code&gt;count&lt;/code&gt; of readings in the current window (to compute the average at the window end).&lt;/li&gt;
&lt;li&gt;The &lt;code&gt;end timestamp&lt;/code&gt; of the current window, so we know when to trigger output, and to detect when a new event falls into the next window.&lt;/li&gt;
&lt;li&gt;The &lt;code&gt;timestamp&lt;/code&gt; of the &lt;strong&gt;currently scheduled inactivity timer&lt;/strong&gt;, so we can cancel it if a new event arrives in time.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;We will use Flink’s ValueState to store these values.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;
    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;processElement&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SensorReading&lt;/span&gt; &lt;span class="n"&gt;sensorReading&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; 
                               &lt;span class="nc"&gt;KeyedProcessFunction&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SensorReading&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;.&lt;/span&gt;&lt;span class="na"&gt;Context&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                               &lt;span class="nc"&gt;Collector&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;collector&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// Extract the event timestamp of the current reading&lt;/span&gt;
        &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;eventTime&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sensorReading&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTimestamp&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="c1"&gt;// Boxed Long: value() returns null until a window has been opened for this key&lt;/span&gt;
        &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;currentWindowEnd&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;windowEndState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

        &lt;span class="c1"&gt;// 1) Event-Time Timer Logic for windowing (1-minute tumbling windows per sensor)&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;currentWindowEnd&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// This is the first event for this key or the first event after a window reset&lt;/span&gt;
            &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;windowStart&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;eventTime&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;eventTime&lt;/span&gt; &lt;span class="o"&gt;%&lt;/span&gt; &lt;span class="no"&gt;WINDOW_DURATION&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;windowEnd&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;windowStart&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="no"&gt;WINDOW_DURATION&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
            &lt;span class="c1"&gt;// register an event-time timer for end of the window&lt;/span&gt;
            &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;timerService&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;registerEventTimeTimer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowEnd&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

            &lt;span class="n"&gt;windowEndState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;update&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowEnd&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="n"&gt;sumState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;update&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sensorReading&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTemperature&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
            &lt;span class="n"&gt;countState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;update&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1L&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;eventTime&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;currentWindowEnd&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// Still within the current window&lt;/span&gt;
            &lt;span class="n"&gt;sumState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;update&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sumState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;sensorReading&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTemperature&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
            &lt;span class="n"&gt;countState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;update&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;countState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// The new event belongs to a next window (current window has ended)&lt;/span&gt;
            &lt;span class="c1"&gt;// Emit the result for the current window before resetting&lt;/span&gt;
            &lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="n"&gt;sum&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sumState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;countState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="n"&gt;avg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sum&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
            &lt;span class="n"&gt;collector&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                    &lt;span class="s"&gt;"Average temperature for sensor "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;sensorReading&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getSensorId&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; 
                            &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;" for window ending at "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;currentWindowEnd&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;" = "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;avg&lt;/span&gt;
            &lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="c1"&gt;// Clear the old window state and cancel the old timer&lt;/span&gt;
            &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;timerService&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;deleteEventTimeTimer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;currentWindowEnd&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

            &lt;span class="n"&gt;sumState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;clear&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="n"&gt;countState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;clear&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="n"&gt;windowEndState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;clear&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

            &lt;span class="c1"&gt;// Start a new window for the incoming event&lt;/span&gt;
            &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;windowStart&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;eventTime&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;eventTime&lt;/span&gt; &lt;span class="o"&gt;%&lt;/span&gt; &lt;span class="no"&gt;WINDOW_DURATION&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;windowEnd&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;windowStart&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="no"&gt;WINDOW_DURATION&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

            &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;timerService&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;registerEventTimeTimer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowEnd&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="n"&gt;windowEndState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;update&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowEnd&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="n"&gt;sumState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;update&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sensorReading&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTemperature&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
            &lt;span class="n"&gt;countState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;update&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1L&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="c1"&gt;// 2) Processing-Time Timer Logic for inactivity alert (10s of no events)&lt;/span&gt;
        &lt;span class="c1"&gt;//      Every time we get an event, schedule a processing-time timer X ms in the future.&lt;/span&gt;
        &lt;span class="c1"&gt;//      If a new event comes before that, cancel the previous timer and schedule a new one.&lt;/span&gt;
        &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;prevTimerTimestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;inactivityTimerState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;prevTimerTimestamp&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// Remove the old scheduled timer because we got a new event&lt;/span&gt;
            &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;timerService&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;deleteProcessingTimeTimer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;prevTimerTimestamp&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="c1"&gt;// Register a new processing-time timer for now + threshold&lt;/span&gt;
        &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;newTimerTimestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;timerService&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;currentProcessingTime&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="no"&gt;INACTIVITY_THRESHOLD&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;timerService&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;registerProcessingTimeTimer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;newTimerTimestamp&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="c1"&gt;// Store the new timer's timestamp in state&lt;/span&gt;
        &lt;span class="n"&gt;inactivityTimerState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;update&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;newTimerTimestamp&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let’s break down what’s happening here:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;If this is the first event for the sensor (i.e., &lt;code&gt;currentWindowEnd == null&lt;/code&gt; because we haven’t seen this key before or we cleared after a window), we determine the current window’s start and end. We align the window to minute boundaries using the timestamp modulo the window size. For example, if an event carries the timestamp 12:34:45.123 (stored as milliseconds since the epoch) and our window is 60,000 ms (1 minute), we align to &lt;code&gt;12:34:00.000&lt;/code&gt; as start and &lt;code&gt;12:35:00.000&lt;/code&gt; as end. We then &lt;strong&gt;register an event-time timer&lt;/strong&gt; for the window end. This means “when the watermark reaches windowEnd, call onTimer for this key.” We initialize the sum and count state with this first event’s values.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;If another event comes and its timestamp is still before the current window’s end (&lt;code&gt;eventTime &amp;lt; currentWindowEnd&lt;/code&gt;), it’s in the same window. We update the running sum and count in state. (We do not need to register a new timer because one is already set for the window end.)&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;If an event arrives that is at or beyond the &lt;strong&gt;current window’s end&lt;/strong&gt; (&lt;code&gt;eventTime &amp;gt;= currentWindowEnd&lt;/code&gt;), the previous window should be finished. This happens when an event from the next window reaches the operator before the watermark has passed the current window’s end, for example after a gap in the data or when events run ahead of the watermark. In this case, we manually close out the current window: calculate the average from &lt;code&gt;sumState&lt;/code&gt; and &lt;code&gt;countState&lt;/code&gt;, emit it, and clear the states. We also cancel the event-time timer for the old window since we’re handling it early; this prevents &lt;code&gt;onTimer()&lt;/code&gt; from later firing for a window we’ve already emitted. Then we start a new window: we determine the new window boundaries based on the current event’s timestamp, register a new event-time timer for the new window’s end, and set the state. In effect, we handle the window rollover immediately when we see an event from the next window. Because the check is &lt;code&gt;eventTime &amp;gt;= currentWindowEnd&lt;/code&gt;, an event exactly on a window boundary is counted in the next window. With in-order data and timely watermarks this branch might not trigger often, but it keeps the output correct when events arrive ahead of the watermark.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
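
&lt;p&gt;To make the alignment arithmetic concrete, here is a minimal plain-Java sketch of the window start and end computation. It has no Flink dependency; the class name &lt;code&gt;WindowAlignment&lt;/code&gt; is illustrative, and the sample timestamp assumes 12:34:45.123 on 1970-01-01 UTC so the numbers are easy to check by hand.&lt;/p&gt;

```java
// Plain-Java sketch of the window alignment arithmetic; no Flink dependency.
final class WindowAlignment {

    // Window size in milliseconds (1 minute), mirroring WINDOW_DURATION in the job.
    static final long WINDOW_DURATION = 60_000L;

    // Start of the tumbling window that contains eventTime (inclusive).
    static long windowStart(long eventTime) {
        return eventTime - (eventTime % WINDOW_DURATION);
    }

    // End of that window (exclusive); the event-time timer is registered for this value.
    static long windowEnd(long eventTime) {
        return windowStart(eventTime) + WINDOW_DURATION;
    }

    public static void main(String[] args) {
        // 12:34:45.123 on 1970-01-01 UTC, expressed as epoch milliseconds.
        long eventTime = 45_285_123L;
        System.out.println(windowStart(eventTime)); // prints 45240000 (12:34:00.000)
        System.out.println(windowEnd(eventTime));   // prints 45300000 (12:35:00.000)

        // An event exactly on the boundary belongs to the next window.
        System.out.println(windowStart(45_300_000L)); // prints 45300000
    }
}
```

&lt;p&gt;One caveat with this formula: Java’s &lt;code&gt;%&lt;/code&gt; operator returns a negative remainder for negative operands, so timestamps before the epoch would need &lt;code&gt;Math.floorMod&lt;/code&gt; instead. For sensor data with ordinary epoch timestamps this does not matter.&lt;/p&gt;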

&lt;p&gt;In the code, we use a &lt;code&gt;ValueState&amp;lt;Long&amp;gt;&lt;/code&gt; (inactivityTimerState) to remember the timestamp of the last scheduled processing-time timer for the key. On each event:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;We retrieve the previous timer timestamp (if any). If it exists, we cancel that timer. If the timer had already fired, the state would likely have been cleared; if it hasn’t fired yet, now it won’t because we canceled it.&lt;/li&gt;
&lt;li&gt;We then register a new processing-time timer for &lt;code&gt;currentProcessingTime + 10000&lt;/code&gt; ms (10 seconds in the future).&lt;/li&gt;
&lt;li&gt;We store this new timer’s timestamp in the state (so that a subsequent event can cancel it if needed).&lt;/li&gt;
&lt;/ol&gt;
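
&lt;p&gt;The reset-on-every-event pattern from these steps can be modelled in a few lines of plain Java. This is only a sketch under simplifying assumptions: a single sensor, a manually advanced clock, and a nullable field standing in for &lt;code&gt;inactivityTimerState&lt;/code&gt;. The class and method names are hypothetical and not part of Flink.&lt;/p&gt;

```java
// Plain-Java model of the reset-on-event inactivity timer; illustrative, not Flink API.
final class InactivityTimerSketch {

    // 10 seconds without events triggers the alert.
    static final long INACTIVITY_THRESHOLD = 10_000L;

    // Stands in for inactivityTimerState: null means no timer is scheduled.
    private Long scheduledTimer;

    // Called for every event: any pending timer is replaced by one at now + threshold.
    long onEvent(long nowMillis) {
        scheduledTimer = nowMillis + INACTIVITY_THRESHOLD; // the old timer is dropped
        return scheduledTimer;
    }

    // Called as the clock advances: returns true if the inactivity alert fires.
    boolean advanceClock(long nowMillis) {
        if (scheduledTimer == null) {
            return false;
        }
        if (nowMillis >= scheduledTimer) {
            scheduledTimer = null; // the timer fired, nothing is pending anymore
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        InactivityTimerSketch sensor = new InactivityTimerSketch();
        sensor.onEvent(0L);      // timer scheduled for 10000
        sensor.onEvent(4_000L);  // timer moved to 14000
        System.out.println(sensor.advanceClock(10_000L)); // prints false
        System.out.println(sensor.advanceClock(14_000L)); // prints true
        System.out.println(sensor.advanceClock(20_000L)); // prints false
    }
}
```

&lt;p&gt;The second event pushes the deadline from 10,000 to 14,000, which is why nothing fires at 10,000. In the real job, Flink’s timer service plays the role of &lt;code&gt;advanceClock&lt;/code&gt;.&lt;/p&gt;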

&lt;p&gt;The actual emission of the results happens in the &lt;code&gt;onTimer()&lt;/code&gt; method.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;  &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;onTimer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;KeyedProcessFunction&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SensorReading&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;.&lt;/span&gt;&lt;span class="na"&gt;OnTimerContext&lt;/span&gt; &lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Collector&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;out&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// Check the domain of the timer (event-time or processing-time)&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;timeDomain&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="nc"&gt;TimeDomain&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;EVENT_TIME&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// Event-time timer fired (window end reached)&lt;/span&gt;
            &lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="n"&gt;sum&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sumState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="n"&gt;sumState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="mf"&gt;0.0&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
            &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;countState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="n"&gt;countState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;0L&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;count&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
                &lt;span class="kt"&gt;double&lt;/span&gt; &lt;span class="n"&gt;avg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sum&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
                &lt;span class="n"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Average temperature for sensor "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getCurrentKey&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
                        &lt;span class="s"&gt;" for window ending at "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;timestamp&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;" = "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;avg&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="o"&gt;}&lt;/span&gt;
            &lt;span class="c1"&gt;// Clear window state after emitting result&lt;/span&gt;
            &lt;span class="n"&gt;sumState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;clear&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="n"&gt;countState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;clear&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
            &lt;span class="n"&gt;windowEndState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;clear&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;timeDomain&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="nc"&gt;TimeDomain&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;PROCESSING_TIME&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// Processing-time timer fired (inactivity threshold passed)&lt;/span&gt;
            &lt;span class="n"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"ALERT: Sensor "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getCurrentKey&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;" has been inactive for "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;INACTIVITY_THRESHOLD&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="mi"&gt;1000&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;" seconds"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
            &lt;span class="c1"&gt;// Clear the inactivity timer state (no active timer now for this key)&lt;/span&gt;
            &lt;span class="n"&gt;inactivityTimerState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;clear&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Running and Testing the Flink Job
&lt;/h3&gt;

&lt;p&gt;If you run the application you should see the following output:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;Average temperature &lt;span class="k"&gt;for &lt;/span&gt;sensor sensor_2 &lt;span class="k"&gt;for &lt;/span&gt;window ending at 60000 &lt;span class="o"&gt;=&lt;/span&gt; 30.0
Average temperature &lt;span class="k"&gt;for &lt;/span&gt;sensor sensor_1 &lt;span class="k"&gt;for &lt;/span&gt;window ending at 60000 &lt;span class="o"&gt;=&lt;/span&gt; 22.625
Average temperature &lt;span class="k"&gt;for &lt;/span&gt;sensor sensor_2 &lt;span class="k"&gt;for &lt;/span&gt;window ending at 120000 &lt;span class="o"&gt;=&lt;/span&gt; 32.0
ALERT: Sensor sensor_1 has been inactive &lt;span class="k"&gt;for &lt;/span&gt;10 seconds
ALERT: Sensor sensor_2 has been inactive &lt;span class="k"&gt;for &lt;/span&gt;10 seconds
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Note:&lt;/strong&gt; Seeing the above output requires some "cheating" 😊 Because the source is bounded, the program exits as soon as the input is consumed. We therefore need to add a delay so that 10 seconds of processing time can pass and the inactivity timers get a chance to fire; see &lt;a href="https://github.com/polyzos/flink-blogpost-examples/blob/main/src/main/java/io/ipolyzos/timers/SensorProcessorFn.java#L127" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;For testing, you can adjust the input events or durations to see different scenarios, such as a sensor that stops sending data (to quickly trigger the inactivity alert) or events that arrive slightly out of order. The WatermarkStrategy with 5 seconds of out-of-orderness means a window result is emitted only once the watermark passes the window end, that is, after an event with a timestamp more than 5 seconds past the window end has arrived (to allow for late arrivals). You can lower this bound for faster results if needed.&lt;/p&gt;
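
&lt;p&gt;To make the effect of the 5-second bound concrete, here is a minimal plain-Java sketch of the arithmetic (no Flink dependency). It assumes the watermark trails the highest event timestamp seen so far by the out-of-orderness bound plus one millisecond, which is how Flink's bounded-out-of-orderness generator behaves, and that the timer is registered at the window end. The class and method names are illustrative only.&lt;/p&gt;

```java
public class WatermarkDelaySketch {

    // Watermark after seeing maxTimestamp, assuming it trails the highest
    // event timestamp by the out-of-orderness bound plus 1 ms.
    static long watermarkFor(long maxTimestamp, long outOfOrdernessMs) {
        return maxTimestamp - outOfOrdernessMs - 1;
    }

    // An event-time timer fires once the watermark reaches its timestamp.
    static boolean timerFires(long timerTimestamp, long watermark) {
        return watermark >= timerTimestamp;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L;        // first window end from the output above
        long outOfOrdernessMs = 5_000L;  // the 5-second bound

        // Hypothetical event timestamps, used only for this illustration.
        long[] eventTimestamps = {59_000L, 64_000L, 65_000L, 65_001L};
        for (long ts : eventTimestamps) {
            long wm = watermarkFor(ts, outOfOrdernessMs);
            System.out.println("max event ts=" + ts + " watermark=" + wm
                    + " timer fires=" + timerFires(windowEnd, wm));
        }
    }
}
```

&lt;p&gt;Under these assumptions, only the last event (timestamp 65001) moves the watermark to 60000, so the window ending at 60000 is not emitted before an event that far past the window end shows up.&lt;/p&gt;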

&lt;h3&gt;
  
  
  Conclusion
&lt;/h3&gt;

&lt;p&gt;Apache Flink’s timers are really useful for building responsive, stateful streaming applications. They allow you to implement custom time-based logic that goes beyond the built-in window and time-based operators – for example, detecting complex patterns, implementing custom windowing, or monitoring inactivity as we did. &lt;/p&gt;

&lt;p&gt;Some key takeaways and best practices:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Decide between event time and processing time based on your use case. Use event time for correctness with respect to event timestamps and for handling out-of-order events (with watermarks), and processing time for real-time driven actions where slight nondeterminism is acceptable.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Key your stream before using timers, as timers are registered per key. This also helps isolate state per entity (such as a sensor) for scalability.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Clean up state when it’s no longer needed (we cleared our window state after use, and managed the cancelation of timers). This prevents state from growing indefinitely for inactive keys.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Remember that timers are part of the checkpointed state: they survive failures and, after recovery, fire if their time is due.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Monitor event-time progress (watermarks) when using event-time timers. If watermarks don’t advance (e.g., if sources become idle and you haven’t marked them as idle), event-time timers won’t fire. &lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;By leveraging timers and managed state, you can tackle a wide array of streaming scenarios that require remembering information and acting on time-based conditions. Apache Flink handles the heavy lifting of making this reliable and scalable under the hood. &lt;/p&gt;

&lt;p&gt;You made it and reached the end. I hope you enjoyed this 👋 and happy streaming 🌊&lt;/p&gt;

</description>
      <category>streaming</category>
      <category>streamprocessing</category>
      <category>eventdriven</category>
      <category>apacheflink</category>
    </item>
    <item>
      <title>Understanding Custom Triggers In Apache Flink</title>
      <dc:creator>Giannis Polyzos</dc:creator>
      <pubDate>Fri, 28 Feb 2025 08:22:24 +0000</pubDate>
      <link>https://dev.to/ipolyzos/understanding-custom-triggers-in-apache-flink-5c4m</link>
      <guid>https://dev.to/ipolyzos/understanding-custom-triggers-in-apache-flink-5c4m</guid>
      <description>&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F2d5534lr6pybbybt61cg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F2d5534lr6pybbybt61cg.png" alt="Flink Windows &amp;amp; Triggers" width="800" height="482"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Apache Flink’s window triggers determine &lt;strong&gt;when&lt;/strong&gt; a window’s computation is emitted (and optionally cleared) during stream processing. &lt;/p&gt;

&lt;p&gt;Every window in Flink has an associated trigger that evaluates each incoming element and any relevant timer events to decide whether to &lt;code&gt;FIRE&lt;/code&gt; (emit results), &lt;code&gt;PURGE&lt;/code&gt; (drop window contents), or both.&lt;/p&gt;

&lt;p&gt;In other words, the trigger monitors window conditions and tells Flink when to produce the aggregated output of that window. By default, Flink provides built-in triggers for common scenarios, but they have limitations, and this is where custom triggers become useful.&lt;/p&gt;
&lt;h3&gt;
  
  
  Built-in Triggers
&lt;/h3&gt;

&lt;p&gt;Flink comes with a few built-in triggers for standard behaviors:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;EventTimeTrigger:&lt;/strong&gt; fires when the event-time watermark passes the end of the window (i.e. the window closes in event time). This is the default for event-time windows and ensures results are emitted once the window’s end timestamp is reached according to the stream’s watermarks.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;ProcessingTimeTrigger:&lt;/strong&gt; fires when the processing-time clock reaches the end of the window. This is the default for processing-time windows.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;CountTrigger:&lt;/strong&gt; fires once the number of elements in the window reaches a specified count threshold.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;PurgingTrigger:&lt;/strong&gt; a wrapper that turns any other trigger into a purging trigger, meaning it will clear the window’s content whenever it fires.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;These built-in triggers cover the basic cases of time-driven or count-driven firing, but they operate independently. If you override a window’s trigger with one of these, you replace the default trigger rather than supplement it. For example, if you apply a &lt;code&gt;CountTrigger&lt;/code&gt; to an event-time window, the window will fire based on count only and ignore the event-time watermark. This means built-in triggers can’t be combined out of the box: you can’t directly get a window that fires on either a count or a time condition using only the provided triggers.&lt;/p&gt;

&lt;p&gt;In practice, complex scenarios often require custom triggers. Custom triggers let you define arbitrary conditions (or combinations of conditions) for firing windows, such as &lt;strong&gt;“fire on count or time, whichever comes first”&lt;/strong&gt; or &lt;strong&gt;“fire when a special event occurs.”&lt;/strong&gt; &lt;/p&gt;

&lt;p&gt;By writing a custom trigger, you can overcome the limitations of the built-ins and implement custom early results, session-specific logic, late firing policies, and more.&lt;/p&gt;
&lt;h3&gt;
  
  
  Creating Custom Triggers
&lt;/h3&gt;

&lt;p&gt;To implement a custom trigger in Flink (DataStream API), you create a class that extends the abstract &lt;code&gt;Trigger&amp;lt;T, W&amp;gt;&lt;/code&gt; class, where &lt;code&gt;T&lt;/code&gt; is the type of elements in the window and &lt;code&gt;W&lt;/code&gt; is the window type (e.g. &lt;code&gt;TimeWindow&lt;/code&gt;).&lt;/p&gt;

&lt;p&gt;In your subclass, you override several methods that Flink uses to drive the trigger’s behavior. You can find a full list and description of those methods &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/operators/windows/#triggers" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;A &lt;code&gt;TriggerContext&lt;/code&gt; is provided to these methods. It offers utilities to register and delete timers and to access partitioned state tied to the window. You can use Flink's state within triggers to track information like element counts or flags. For instance, if implementing a count-based trigger, you might keep a &lt;code&gt;ValueState&amp;lt;Integer&amp;gt;&lt;/code&gt; for the count of elements seen so far in the window.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Note:&lt;/strong&gt; Triggers must be serializable as they are part of the job graph sent to workers. Also, remember that returning &lt;code&gt;FIRE&lt;/code&gt; &lt;strong&gt;does not clear the window state&lt;/strong&gt; by default: the window remains and may accumulate more data, potentially firing again later. If you want the window to be cleared when it fires (emitting results only once), you should return &lt;code&gt;FIRE_AND_PURGE&lt;/code&gt; or use a &lt;code&gt;PurgingTrigger&lt;/code&gt; wrapper.&lt;/p&gt;
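
&lt;p&gt;The difference between the two results can be sketched without Flink. The following plain-Java toy model is only an illustration: &lt;code&gt;ToyWindow&lt;/code&gt; and its &lt;code&gt;Result&lt;/code&gt; enum are hypothetical stand-ins, not Flink classes, and the model tracks just the number of buffered elements.&lt;/p&gt;

```java
public class FireVsPurgeSketch {

    // Hypothetical stand-in for Flink's TriggerResult values.
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    // Toy window that only tracks how many elements it currently buffers.
    static class ToyWindow {
        int buffered = 0;

        void add() {
            buffered++;
        }

        // Applies a trigger result and returns the number of emitted
        // elements, or -1 when nothing is emitted.
        int apply(Result result) {
            if (result == Result.CONTINUE) {
                return -1;
            }
            int emitted = buffered;
            if (result == Result.FIRE_AND_PURGE) {
                buffered = 0; // purging drops the window contents
            }
            return emitted;
        }
    }

    public static void main(String[] args) {
        ToyWindow keep = new ToyWindow();
        ToyWindow purge = new ToyWindow();
        for (int i = 0; i != 5; i++) {
            keep.add();
            purge.add();
        }
        System.out.println("FIRE emits " + keep.apply(Result.FIRE));
        System.out.println("FIRE_AND_PURGE emits " + purge.apply(Result.FIRE_AND_PURGE));

        // One more element arrives in each window, then both fire again.
        keep.add();
        purge.add();
        System.out.println("after FIRE, the next firing emits " + keep.apply(Result.FIRE));
        System.out.println("after FIRE_AND_PURGE, the next firing emits " + purge.apply(Result.FIRE));
    }
}
```

&lt;p&gt;In this toy model, the window that fired with &lt;code&gt;FIRE&lt;/code&gt; emits 5 and then 6 elements, while the purged window emits 5 and then only the single element that arrived after the purge.&lt;/p&gt;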

&lt;p&gt;We’ll see examples of both behaviors below, so let's walk through 🚶 two real-world examples using custom triggers in Flink.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;The full source code can be found &lt;a href="https://github.com/polyzos/flink-blogpost-examples/tree/main/src/main/java/io/ipolyzos/triggers" rel="noopener noreferrer"&gt;here&lt;/a&gt;, and if you are looking for a production option, you can try &lt;a href="https://www.ververica.com/deployment/managed-service?gad_source=1&amp;amp;gclid=CjwKCAiAt4C-BhBcEiwA8Kp0CXbVnqQOuhs0W5hM6ZiNbi7YPX4HspdWznsyFZd6P-cwy9clTOPXEBoCF0gQAvD_BwE" rel="noopener noreferrer"&gt;Ververica Cloud&lt;/a&gt; for free.&lt;/p&gt;
&lt;/blockquote&gt;
&lt;h3&gt;
  
  
  Example 1: Tumbling Window and Count-Based Early Firing
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Scenario:&lt;/strong&gt; Imagine an e-commerce application tracking user events – e.g. &lt;code&gt;page views&lt;/code&gt;, &lt;code&gt;add-to-cart&lt;/code&gt;, &lt;code&gt;checkout initiations&lt;/code&gt; – as a stream. We want to compute some analytics per user in a tumbling time window (say, 1 hour windows per user). &lt;/p&gt;

&lt;p&gt;However, if a user is very active, we don’t want to wait until the end of the hour to get intermediate results. We decide that for each user’s window, the result should be emitted early after 5 events (we will use a small threshold for demonstration purposes) have been received in that window, rather than waiting the full hour. &lt;/p&gt;

&lt;p&gt;We will implement a custom trigger that tracks the count of elements in the window. It will &lt;strong&gt;fire when the count reaches 5, and also ensure the window will fire at the end of the time window&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;The important part is within the &lt;a href="https://github.com/polyzos/flink-blogpost-examples/blob/main/src/main/java/io/ipolyzos/triggers/tumbling/CustomCountTrigger.java" rel="noopener noreferrer"&gt;CustomCountTrigger&lt;/a&gt; class:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt; &lt;span class="nf"&gt;onElement&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;UserEvent&lt;/span&gt; &lt;span class="n"&gt;userEvent&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; 
                                   &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                   &lt;span class="nc"&gt;TimeWindow&lt;/span&gt; &lt;span class="n"&gt;timeWindow&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                   &lt;span class="nc"&gt;TriggerContext&lt;/span&gt; &lt;span class="n"&gt;triggerContext&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// Get or initialize the current count&lt;/span&gt;
        &lt;span class="nc"&gt;ValueState&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Integer&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;countState&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;triggerContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getPartitionedState&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;countStateDesc&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;Integer&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;countState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;count&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;count&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="c1"&gt;// Increment count for every element&lt;/span&gt;
        &lt;span class="n"&gt;count&lt;/span&gt; &lt;span class="o"&gt;+=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="n"&gt;countState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;update&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

        &lt;span class="c1"&gt;// If this is the first element, register an event-time timer for end-of-window&lt;/span&gt;
        &lt;span class="c1"&gt;// (Timers are set at window end timestamp, so when watermark passes window.end, onEventTime will fire)&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;count&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;windowEnd&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;timeWindow&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getEnd&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;  &lt;span class="c1"&gt;// end timestamp of this TimeWindow&lt;/span&gt;
            &lt;span class="n"&gt;triggerContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;registerEventTimeTimer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowEnd&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;

        &lt;span class="c1"&gt;// Check if we've reached 5 events in this window&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;count&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// Fire (emit the window) *now*, but do NOT purge (we return FIRE, not FIRE_AND_PURGE).&lt;/span&gt;
            &lt;span class="c1"&gt;// This means the window contents remain, and the window will possibly fire again at end-of-window.&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;FIRE&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// Not yet reached 5, so continue accumulating&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;CONTINUE&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt; &lt;span class="nf"&gt;onProcessingTime&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TimeWindow&lt;/span&gt; &lt;span class="n"&gt;timeWindow&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TriggerContext&lt;/span&gt; &lt;span class="n"&gt;triggerContext&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// We don't use processing-time timers in this trigger.&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;CONTINUE&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt; &lt;span class="nf"&gt;onEventTime&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TimeWindow&lt;/span&gt; &lt;span class="n"&gt;timeWindow&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TriggerContext&lt;/span&gt; &lt;span class="n"&gt;triggerContext&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// This is called when the event-time timer for the window fires (i.e., watermark reached window end).&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timestamp&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;timeWindow&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getEnd&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// Window end reached, so fire the window result.&lt;/span&gt;
            &lt;span class="c1"&gt;// We return FIRE_AND_PURGE to emit the result and clear the window state.&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;FIRE_AND_PURGE&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="c1"&gt;// If it's not the window-end timer (e.g., some other timer), we ignore.&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;CONTINUE&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;clear&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TimeWindow&lt;/span&gt; &lt;span class="n"&gt;timeWindow&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TriggerContext&lt;/span&gt; &lt;span class="n"&gt;triggerContext&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// Clean up the count state when the window is purged/closed.&lt;/span&gt;
        &lt;span class="nc"&gt;ValueState&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Integer&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;countState&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;triggerContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getPartitionedState&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;countStateDesc&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;countState&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;clear&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="c1"&gt;// We don't need to manually delete the event-time timer for window end – Flink does that when window closes.&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's break down what the trigger does:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;It registers an event-time timer for the end-of-window when the first element arrives (to ensure the window eventually fires at the end if the count trigger didn’t already fire) – similar to what Flink’s default EventTimeTrigger does internally.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;If the count reaches 5, it returns &lt;code&gt;TriggerResult.FIRE&lt;/code&gt;. We choose &lt;code&gt;FIRE&lt;/code&gt; (not &lt;code&gt;FIRE_AND_PURGE&lt;/code&gt;) so that the window’s contents are not thrown away; this means more elements can still accumulate for the final output. Note that, as written, the &lt;code&gt;count &amp;gt;= 5&lt;/code&gt; check stays true for every later element, so each element after the fifth also triggers an early firing. If you want a single early result, compare with &lt;code&gt;== 5&lt;/code&gt; instead; if you want an early firing every 5 events, reset the count state to 0 after firing.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;In &lt;code&gt;onEventTime()&lt;/code&gt;, when the watermark hits the window end timestamp, we return &lt;code&gt;FIRE_AND_PURGE&lt;/code&gt;. This emits the final window result and clears out the window state (the &lt;code&gt;clear()&lt;/code&gt; method will be called to drop the count state). We use &lt;code&gt;FIRE_AND_PURGE&lt;/code&gt; here because once the window end is reached, the window should close and not hold resources.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The &lt;code&gt;clear()&lt;/code&gt; method ensures we remove the count from state to avoid leaks. Flink will call &lt;code&gt;clear()&lt;/code&gt; after the window is purged (either via our final fire or if the window is disposed for any reason).&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;This custom trigger demonstrates combining a count condition with an event-time window trigger. &lt;strong&gt;Without a custom trigger, achieving “5 events or end-of-window” would not be possible with Flink’s built-ins alone.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;If you run the above example you should see the following output:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;User user_1 had 5 events &lt;span class="k"&gt;in &lt;/span&gt;window &lt;span class="o"&gt;[&lt;/span&gt;2025-02-01 00:00:00.0,2025-02-01 01:00:00.0&lt;span class="o"&gt;)&lt;/span&gt;
User user_1 had 6 events &lt;span class="k"&gt;in &lt;/span&gt;window &lt;span class="o"&gt;[&lt;/span&gt;2025-02-01 00:00:00.0,2025-02-01 01:00:00.0&lt;span class="o"&gt;)&lt;/span&gt;
User user_2 had 5 events &lt;span class="k"&gt;in &lt;/span&gt;window &lt;span class="o"&gt;[&lt;/span&gt;2025-02-01 00:00:00.0,2025-02-01 01:00:00.0&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



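&lt;p&gt;Notice that the window for &lt;code&gt;user_1&lt;/code&gt; fires once it has received 5 events and fires again after the sixth event: because we return &lt;code&gt;FIRE&lt;/code&gt; rather than &lt;code&gt;FIRE_AND_PURGE&lt;/code&gt;, the earlier elements are still in the window, and the &lt;code&gt;count &amp;gt;= 5&lt;/code&gt; check holds for every element from the fifth onward.&lt;/p&gt;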

&lt;h3&gt;
  
  
  Example 2: Session Inactivity With Event Signals
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Scenario:&lt;/strong&gt; Now let’s consider a session window example. In many applications, events are grouped by sessions (a period of user activity separated by inactivity). Flink’s session windows can close a window after a period of inactivity. However, sometimes a session might also end due to a specific event, for example when a user explicitly &lt;code&gt;logs out&lt;/code&gt; or completes a &lt;code&gt;checkout&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;In an e-commerce or login-based app, you may define a session to end either after 30 minutes of inactivity or when a &lt;strong&gt;“logout”&lt;/strong&gt; or &lt;strong&gt;“checkout complete”&lt;/strong&gt; event occurs. &lt;/p&gt;

&lt;p&gt;Flink’s built-in session windows don’t know about the semantics of a logout event; they only close based on the timeout. We can create a custom trigger to handle both cases: fire the window when a special session-end event is seen, or when the session times out with no activity.&lt;/p&gt;
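
&lt;p&gt;Before looking at the trigger itself, the intended grouping can be sketched in plain Java. This is only a model of the desired semantics, not of how Flink merges session windows internally: the class and method names are hypothetical, the event types other than &lt;code&gt;LOGOUT&lt;/code&gt; and &lt;code&gt;CHECKOUT_COMPLETE&lt;/code&gt; are made up for the illustration, and the input is assumed to be one user's events sorted by timestamp.&lt;/p&gt;

```java
public class SessionEndSketch {

    // Appends a closed session's size to the comma-separated result.
    private static void close(StringBuilder sizes, int size) {
        if (sizes.length() > 0) {
            sizes.append(',');
        }
        sizes.append(size);
    }

    // Returns the number of events in each session, comma-separated.
    // A session ends on a gap larger than gapMs or on a terminating event.
    static String sessionSizes(long[] timestamps, String[] types, long gapMs) {
        StringBuilder sizes = new StringBuilder();
        int current = 0;
        long lastTs = 0L;
        for (int i = 0; i != timestamps.length; i++) {
            // Inactivity: the gap since the previous event exceeds the timeout.
            if (current > 0) {
                if (timestamps[i] - lastTs > gapMs) {
                    close(sizes, current);
                    current = 0;
                }
            }
            current++;
            lastTs = timestamps[i];
            // Explicit signal: a terminating event closes the session at once.
            if ("LOGOUT".equals(types[i]) || "CHECKOUT_COMPLETE".equals(types[i])) {
                close(sizes, current);
                current = 0;
            }
        }
        if (current > 0) {
            close(sizes, current); // end of input closes the last open session
        }
        return sizes.toString();
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] timestamps = {0, 5 * minute, 10 * minute, 12 * minute, 50 * minute, 55 * minute};
        String[] types = {"PAGE_VIEW", "ADD_TO_CART", "LOGOUT", "PAGE_VIEW", "PAGE_VIEW", "PAGE_VIEW"};
        System.out.println(sessionSizes(timestamps, types, 30 * minute));
    }
}
```

&lt;p&gt;With a 30-minute gap, this sample input yields three sessions of sizes 3, 1 and 2: the first ends at the logout event, the second ends because of the 38-minute gap, and the third is closed at the end of the input.&lt;/p&gt;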

&lt;p&gt;The important part is within the &lt;a href="https://github.com/polyzos/flink-blogpost-examples/blob/main/src/main/java/io/ipolyzos/triggers/session/SessionEndTrigger.java" rel="noopener noreferrer"&gt;SessionEndTrigger&lt;/a&gt; class:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt; &lt;span class="nf"&gt;onElement&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;UserEvent&lt;/span&gt; &lt;span class="n"&gt;userEvent&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TimeWindow&lt;/span&gt; &lt;span class="n"&gt;timeWindow&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TriggerContext&lt;/span&gt; &lt;span class="n"&gt;triggerContext&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// Register an event-time timer for the end-of-window (session timeout)&lt;/span&gt;
        &lt;span class="n"&gt;triggerContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;registerEventTimeTimer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timeWindow&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getEnd&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="c1"&gt;// If this event is a session terminating event, fire and purge the window&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"LOGOUT"&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;equals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;userEvent&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getEventType&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="s"&gt;"CHECKOUT_COMPLETE"&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;equals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;userEvent&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getEventType&lt;/span&gt;&lt;span class="o"&gt;()))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;FIRE_AND_PURGE&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="c1"&gt;// Otherwise, continue&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;CONTINUE&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt; &lt;span class="nf"&gt;onProcessingTime&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TimeWindow&lt;/span&gt; &lt;span class="n"&gt;timeWindow&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TriggerContext&lt;/span&gt; &lt;span class="n"&gt;triggerContext&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// not used in this trigger&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;CONTINUE&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt; &lt;span class="nf"&gt;onEventTime&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TimeWindow&lt;/span&gt; &lt;span class="n"&gt;timeWindow&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TriggerContext&lt;/span&gt; &lt;span class="n"&gt;triggerContext&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timestamp&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;timeWindow&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getEnd&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// Session inactivity timeout reached, fire and purge the window&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;FIRE_AND_PURGE&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TriggerResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;CONTINUE&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;onMerge&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TimeWindow&lt;/span&gt; &lt;span class="n"&gt;window&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;OnMergeContext&lt;/span&gt; &lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// If windows merge, register a new timer for the new window end and let the old timers lapse.&lt;/span&gt;
        &lt;span class="c1"&gt;// (Flink will call onEventTime for the exact timestamps that occur; by re-registering the new end we ensure final firing.)&lt;/span&gt;
        &lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;registerEventTimeTimer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;window&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getEnd&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;boolean&lt;/span&gt; &lt;span class="nf"&gt;canMerge&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Override&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;clear&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TimeWindow&lt;/span&gt; &lt;span class="n"&gt;timeWindow&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TriggerContext&lt;/span&gt; &lt;span class="n"&gt;triggerContext&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;triggerContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;deleteEventTimeTimer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timeWindow&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;maxTimestamp&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's break this down, starting with what happens on each element:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;We register an event-time timer for the window’s current end. Registering the same timestamp again is harmless, and if the session window extends or merges, &lt;code&gt;timeWindow.getEnd()&lt;/code&gt; reflects the new end on the next trigger invocation.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;We then check the event’s type. If it is a designated end-of-session event (in this case, &lt;code&gt;"LOGOUT"&lt;/code&gt; or &lt;code&gt;"CHECKOUT_COMPLETE"&lt;/code&gt;), we immediately return &lt;code&gt;FIRE_AND_PURGE&lt;/code&gt;. This emits the window’s results and clears the window state right away, effectively closing the session as soon as that event is encountered.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;If it’s not an end signal, we return &lt;code&gt;CONTINUE&lt;/code&gt; to keep collecting events.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;In &lt;code&gt;onEventTime()&lt;/code&gt;, we check whether the firing timer’s timestamp equals the window’s end timestamp. If it does, the watermark has reached the session’s end (i.e., no new events arrived for 15 minutes), so we return &lt;code&gt;FIRE_AND_PURGE&lt;/code&gt; to close out the session. If the timer fired for any other timestamp, we ignore it with &lt;code&gt;CONTINUE&lt;/code&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;We don’t use &lt;code&gt;onProcessingTime()&lt;/code&gt; here, so it just continues.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The &lt;code&gt;clear()&lt;/code&gt; method only has to delete the window’s event-time timer, because the trigger keeps no state of its own. If we had used any &lt;code&gt;ValueState&lt;/code&gt; in the trigger, we’d clear it here as well.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
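
&lt;p&gt;To make the decision logic easier to follow in isolation, here is a minimal plain-Java sketch of the steps above. It does not use Flink’s API: &lt;code&gt;SessionEndModel&lt;/code&gt;, its &lt;code&gt;Result&lt;/code&gt; enum, and &lt;code&gt;GAP_MS&lt;/code&gt; are hypothetical names chosen for illustration, and the 15-minute gap is assumed from the session timeout mentioned above.&lt;/p&gt;

```java
/** Plain-Java model of the trigger's decisions; it does not use Flink's API. */
final class SessionEndModel {
    enum Result { CONTINUE, FIRE_AND_PURGE }

    // Assumed session gap, taken from the 15 minutes mentioned in the text.
    static final long GAP_MS = 15 * 60 * 1000L;

    // Mirrors onElement(): an end-of-session event closes the window immediately.
    static Result onElement(String eventType) {
        if ("LOGOUT".equals(eventType) || "CHECKOUT_COMPLETE".equals(eventType)) {
            return Result.FIRE_AND_PURGE;
        }
        return Result.CONTINUE;
    }

    // Mirrors onEventTime(): only a timer at the window end closes the session.
    static Result onEventTime(long timerTimestamp, long windowEnd) {
        return timerTimestamp == windowEnd ? Result.FIRE_AND_PURGE : Result.CONTINUE;
    }

    // A session window ends one gap after its most recent event.
    static long windowEnd(long lastEventTimestamp) {
        return lastEventTimestamp + GAP_MS;
    }

    public static void main(String[] args) {
        System.out.println(onElement("click"));        // CONTINUE
        System.out.println(onElement("LOGOUT"));       // FIRE_AND_PURGE
        long end = windowEnd(0L);                      // 900000
        System.out.println(onEventTime(end, end));     // FIRE_AND_PURGE
        System.out.println(onEventTime(end - 1, end)); // CONTINUE
    }
}
```

&lt;p&gt;In the real trigger, Flink supplies the window and the timer timestamps; the model only shows which inputs lead to which &lt;code&gt;TriggerResult&lt;/code&gt;.&lt;/p&gt;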

&lt;p&gt;If you run the code above, you should see the following output:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;Session &lt;span class="k"&gt;for &lt;/span&gt;user user_1 &lt;span class="o"&gt;[&lt;/span&gt;2025-02-01 00:20:00.0,2025-02-01 00:35:20.0&lt;span class="o"&gt;)&lt;/span&gt; -&amp;gt; Events: &lt;span class="o"&gt;[&lt;/span&gt;login, click, click, LOGOUT]
Session &lt;span class="k"&gt;for &lt;/span&gt;user user_2 &lt;span class="o"&gt;[&lt;/span&gt;2025-02-01 02:31:35.0,2025-02-01 02:48:19.0&lt;span class="o"&gt;)&lt;/span&gt; -&amp;gt; Events: &lt;span class="o"&gt;[&lt;/span&gt;login, click, CHECKOUT_COMPLETE]
Session &lt;span class="k"&gt;for &lt;/span&gt;user user_1 &lt;span class="o"&gt;[&lt;/span&gt;2025-02-01 01:20:00.0,2025-02-01 01:35:25.0&lt;span class="o"&gt;)&lt;/span&gt; -&amp;gt; Events: &lt;span class="o"&gt;[&lt;/span&gt;login, click, click]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Conclusion
&lt;/h3&gt;

&lt;p&gt;Custom triggers in Apache Flink provide a powerful mechanism to control window evaluation beyond the standard time or count policies. In this post, we saw that triggers decide when a window’s results are emitted and can fire multiple times or clear the window state. Flink’s built-in triggers (event-time, processing-time, count, etc.) cover basic needs, but they cannot be combined or tailored to complex conditions. By implementing a custom trigger, you can overcome these limitations, for example by firing early based on element count while &lt;strong&gt;still observing event-time boundaries&lt;/strong&gt;, or by ending a window when a &lt;strong&gt;domain-specific event occurs&lt;/strong&gt;.&lt;/p&gt;

&lt;h4&gt;
  
  
  Key takeaways and best practices:
&lt;/h4&gt;

&lt;p&gt;✅ &lt;strong&gt;Understand the Window Lifecycle:&lt;/strong&gt; A trigger that fires (returns &lt;code&gt;FIRE&lt;/code&gt;) does not close the window; the window can accumulate more data and fire again. To close it, use &lt;code&gt;FIRE_AND_PURGE&lt;/code&gt; or design the trigger to eventually purge (as we did at the end of time windows or session gaps).&lt;/p&gt;
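
&lt;p&gt;The difference between &lt;code&gt;FIRE&lt;/code&gt; and &lt;code&gt;FIRE_AND_PURGE&lt;/code&gt; can be sketched with a small plain-Java model. It is an illustration only, not Flink code: &lt;code&gt;WindowLifecycleModel&lt;/code&gt; and its methods are hypothetical, and the model tracks nothing but the buffered window contents.&lt;/p&gt;

```java
/** Plain-Java model of window contents under FIRE and FIRE_AND_PURGE (not Flink code). */
final class WindowLifecycleModel {
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final StringBuilder contents = new StringBuilder(); // buffered elements
    private String lastEmitted = "";                            // most recent emission

    // Buffer the element, then apply the trigger result to the window contents.
    void add(String element, Result result) {
        if (contents.length() != 0) {
            contents.append(',');
        }
        contents.append(element);
        if (result != Result.CONTINUE) {
            lastEmitted = contents.toString(); // FIRE and FIRE_AND_PURGE both emit
        }
        if (result == Result.FIRE_AND_PURGE) {
            contents.setLength(0);             // only a purge drops the contents
        }
    }

    String lastEmitted() { return lastEmitted; }

    String buffered() { return contents.toString(); }

    public static void main(String[] args) {
        WindowLifecycleModel window = new WindowLifecycleModel();
        window.add("a", Result.CONTINUE);
        window.add("b", Result.FIRE);
        System.out.println(window.lastEmitted());        // a,b
        window.add("c", Result.FIRE);
        System.out.println(window.lastEmitted());        // a,b,c
        window.add("d", Result.FIRE_AND_PURGE);
        System.out.println(window.lastEmitted());        // a,b,c,d
        System.out.println(window.buffered().isEmpty()); // true
    }
}
```

&lt;p&gt;After the two &lt;code&gt;FIRE&lt;/code&gt; results the earlier elements are still buffered and are emitted again; only the purge empties the window.&lt;/p&gt;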

&lt;p&gt;✅ &lt;strong&gt;Override Required Methods:&lt;/strong&gt; At minimum, override &lt;code&gt;onElement&lt;/code&gt;, &lt;code&gt;onEventTime&lt;/code&gt;, &lt;code&gt;onProcessingTime&lt;/code&gt;, and &lt;code&gt;clear&lt;/code&gt;. For merging windows (like sessions), also return &lt;code&gt;true&lt;/code&gt; from &lt;code&gt;canMerge&lt;/code&gt; and implement &lt;code&gt;onMerge&lt;/code&gt; so that timers and state are handled consistently.&lt;/p&gt;

&lt;p&gt;✅ &lt;strong&gt;Use TriggerContext for State and Timers:&lt;/strong&gt; If your logic needs to count events or track flags, use partitioned state via &lt;code&gt;TriggerContext.getPartitionedState(...)&lt;/code&gt;. Always clean up state and timers in &lt;code&gt;clear()&lt;/code&gt; to avoid leaks.&lt;/p&gt;
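
&lt;p&gt;As a rough illustration of count-based firing with cleanup, the plain-Java sketch below keeps a counter in an ordinary field. This is an assumption made to keep the example self-contained: in a real trigger the counter would live in partitioned state obtained from &lt;code&gt;TriggerContext&lt;/code&gt;, and &lt;code&gt;CountTriggerModel&lt;/code&gt; is a hypothetical name, not a Flink class.&lt;/p&gt;

```java
/** Plain-Java model of a counting trigger's state for one window (not Flink code). */
final class CountTriggerModel {
    enum Result { CONTINUE, FIRE }

    private final long maxCount;
    private long count; // stand-in for a per-window counter held in trigger state

    CountTriggerModel(long maxCount) {
        this.maxCount = maxCount;
    }

    // Count the element and fire once maxCount elements have been seen.
    Result onElement() {
        count++;
        if (count == maxCount) {
            count = 0; // start counting again for the next early firing
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    // Mirrors clear(): drop the state so nothing lingers after the window is gone.
    void clear() {
        count = 0;
    }

    long count() { return count; }

    public static void main(String[] args) {
        CountTriggerModel trigger = new CountTriggerModel(3);
        System.out.println(trigger.onElement()); // CONTINUE
        System.out.println(trigger.onElement()); // CONTINUE
        System.out.println(trigger.onElement()); // FIRE
        trigger.onElement();
        trigger.clear();
        System.out.println(trigger.count());     // 0
    }
}
```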

&lt;p&gt;✅ &lt;strong&gt;When to Use Custom Triggers:&lt;/strong&gt; Use them when built-in triggers don’t suffice, e.g., for combined conditions (time + count), irregular event-driven firing, early and incremental results, or avoiding default behaviors (one common use case is preventing late events from retriggering a window by writing a trigger that ignores late elements). If a simple built-in trigger meets the need, stick with it for simplicity. But as shown, custom triggers shine for implementing business rules directly in the streaming logic.&lt;/p&gt;

&lt;p&gt;You made it and reached the end. I hope you enjoyed this 👋 and happy streaming 🌊&lt;/p&gt;

</description>
      <category>streaming</category>
      <category>streamprocessing</category>
      <category>eventdriven</category>
      <category>apacheflink</category>
    </item>
  </channel>
</rss>
