DEV Community

137Foundry

How to Handle Late-Arriving Events in Apache Flink With Side Outputs

The biggest single-line bug in a Flink job is usually the watermark. The second biggest is what happens when an event arrives past the watermark. The defaults Flink ships with are sensible for the synthetic examples in the documentation but wrong for nearly every real-world integration.

This post walks through how to wire up Flink's side-output mechanism for late events, and how to feed the side output into a correction process that keeps downstream consumers honest.

What Flink does by default

A keyed Flink window with a tumbling window assigner emits its result when the watermark passes the window's end time. By default allowedLateness is zero, so any event that belongs to that window but arrives after the watermark has passed the window's end is dropped silently.

The behavior is documented in the Apache Flink windowing documentation, and the relevant API surface is allowedLateness and sideOutputLateData.

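The mental model that helps: the watermark is the pipeline's promise that "I have seen everything up to time T." allowedLateness is a grace period after the window's end during which the window state stays alive and late events are still accepted; with the default event-time trigger, each accepted late event makes the window fire again with an updated result. sideOutputLateData is the escape hatch for events arriving past the grace period.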
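The three outcomes in that mental model can be written down as a small decision function. This is a plain-Java sketch, not Flink code: the class and enum names are made up for illustration, the window offset is assumed to be zero, and the comparison mirrors the idea that a window is cleaned up once the watermark reaches its last timestamp plus the allowed lateness.

```java
import java.time.Duration;

class LatenessModel {
    enum Fate { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    // Start of the tumbling window that contains eventTimeMs (window offset assumed to be 0).
    static long windowStart(long eventTimeMs, long sizeMs) {
        return eventTimeMs - Math.floorMod(eventTimeMs, sizeMs);
    }

    static Fate classify(long eventTimeMs, long watermarkMs, Duration size, Duration allowedLateness) {
        long sizeMs = size.toMillis();
        // Window ends are exclusive, so the last timestamp inside the window is end - 1.
        long maxTimestamp = windowStart(eventTimeMs, sizeMs) + sizeMs - 1;
        long cleanupTime = maxTimestamp + allowedLateness.toMillis();

        if (cleanupTime <= watermarkMs) {
            return Fate.SIDE_OUTPUT;        // grace period over: dropped, or side output if configured
        }
        if (maxTimestamp <= watermarkMs) {
            return Fate.LATE_BUT_ACCEPTED;  // window already fired; the event triggers a late firing
        }
        return Fate.ON_TIME;
    }
}
```

For a fifteen-minute window with a five-minute grace period, an event stamped at minute 10 is on time while the watermark is below minute 15, accepted late until the watermark reaches roughly minute 20, and routed to the side output after that.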

Setting up the side output

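import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// The trailing {} creates an anonymous subclass so Flink can recover the generic type.
OutputTag<MyEvent> lateOutputTag = new OutputTag<MyEvent>("late-events") {};

// Time is the Flink 1.x windowing API (newer Flink releases take java.time.Duration here).
SingleOutputStreamOperator<Aggregate> mainResult = events
    .keyBy(MyEvent::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(15)))
    .allowedLateness(Time.minutes(5))    // grace period: window state is kept this long past the window end
    .sideOutputLateData(lateOutputTag)   // events past the grace period go here instead of being dropped
    .process(new MyAggregator());

// The side output is read from the operator that produced it.
DataStream<MyEvent> lateEvents = mainResult.getSideOutput(lateOutputTag);
lateEvents.addSink(new LateEventSink());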

The pattern: a fifteen-minute tumbling window, five-minute grace period, late events captured to a named side output and shipped to a sink. The LateEventSink should be durable; a Kafka topic or a database table both work.

Why allowedLateness alone is not enough

allowedLateness is necessary but not sufficient. It only widens the window's acceptance period to the value you set. Events arriving past that period are still dropped unless sideOutputLateData is also configured.

Setting allowedLateness to a very large value is a common workaround that does not hold up. Window state has to stay alive for the full lateness period, so a one-hour allowed lateness on a fifteen-minute window means Flink keeps each window's state for an hour past the window's end, which is up to one hour and fifteen minutes per window, for every key. That is a lot of state. The recommended pattern is to set allowedLateness to a tight value (matching the realistic late-event distribution observed in production) and use the side output to absorb the longer-tail late events.
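The state arithmetic is easy to check before picking a grace period. The sketch below is a rough estimate in plain Java, with hypothetical class and method names; it assumes tumbling windows and events that do not run far ahead of the watermark, and it counts windows rather than bytes.

```java
import java.time.Duration;

class WindowStateBudget {
    // Longest time a window's state can stay alive, measured from the window's start.
    static Duration retention(Duration size, Duration allowedLateness) {
        return size.plus(allowedLateness);
    }

    // Rough upper bound on simultaneously live tumbling windows per key:
    // ceil((size + allowedLateness) / size).
    static long maxLiveWindowsPerKey(Duration size, Duration allowedLateness) {
        long sizeMs = size.toMillis();
        return (sizeMs + allowedLateness.toMillis() + sizeMs - 1) / sizeMs;
    }
}
```

With a fifteen-minute window, a one-hour grace period gives a bound of five live windows per key, while a five-minute grace period gives two. Multiply by key cardinality and per-window state size to get a first estimate of what the state backend has to hold.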

What goes in the LateEventSink

The sink should persist enough context to reconstruct the affected window for correction.

For each late event, write a row with: the original event payload, the event's timestamp, the window key, the window's start time, the window's end time, and the time the late event was received by the pipeline.

That last field, the receive time, is what your correction job uses to do incremental processing. A correction job that runs every fifteen minutes only needs to look at side-output rows with receive_time newer than its last successful run.
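One detail worth sketching: the side output carries only the raw late event, not the window it missed, so the sink has to recompute the window bounds itself. The following plain-Java sketch shows one way to shape the row. The record and field names are illustrative, the payload is simplified to a string, and the window offset is assumed to be zero; the window size must match the one used by the window assigner.

```java
class LateEventRows {
    // One row per late event, holding the fields listed above.
    record LateEventRow(String payload, long eventTimeMs, String windowKey,
                        long windowStartMs, long windowEndMs, long receivedAtMs) {}

    // Recomputes the tumbling-window bounds from the event time, then builds the row.
    static LateEventRow toRow(String payload, long eventTimeMs, String windowKey,
                              long windowSizeMs, long receivedAtMs) {
        long start = eventTimeMs - Math.floorMod(eventTimeMs, windowSizeMs);
        return new LateEventRow(payload, eventTimeMs, windowKey,
                                start, start + windowSizeMs, receivedAtMs);
    }
}
```

Storing windowStartMs and windowEndMs explicitly, rather than deriving them at read time, keeps the correction job independent of any later change to the window size.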

The correction job

The correction job is a separate Flink job (or batch job) that reads the side output, recomputes the aggregate for affected windows, and writes corrected aggregates back to the target table.

The mechanics:

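# Pseudocode
last_run_ts = read_last_run_timestamp()
late_events = read_side_output(since=last_run_ts)
affected_windows = group_by_window_key_and_window_start(late_events)

for (window_key, window_start), events in affected_windows.items():
    original_events = read_original_events_in_window(window_key, window_start)
    corrected_aggregate = aggregate(original_events + events)
    # Deterministic version stamp: a rerun of the same batch writes the same row.
    emitted_at = max(e.receive_time for e in events)
    write_versioned_fact(window_key, window_start, corrected_aggregate, emitted_at)

# Advance the checkpoint only as far as this run actually read, not to now(),
# so rows that land in the sink while the job is running are not skipped.
if late_events:
    write_last_run_timestamp(max(e.receive_time for e in late_events))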

The key piece is the versioned-fact write: instead of mutating the original aggregate row, write a new row with a later emitted_at timestamp. Downstream consumers query the latest version per window key.
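The grouping step of the correction job can be sketched in plain Java. This is an in-memory illustration under stated assumptions, not the job itself: the record names are hypothetical, the aggregate is assumed to be a sum, and only the late contribution per window is computed (combining it with the original events is left to whatever store holds them). The version stamp is taken from the batch, here the latest receive time, rather than from the wall clock.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CorrectionPlanner {
    record LateRow(String windowKey, long windowStartMs, double amount, long receivedAtMs) {}
    record WindowId(String windowKey, long windowStartMs) {}
    record Correction(WindowId window, double lateSum, long emittedAtMs) {}

    // Groups rows received after lastRunMs by window and sums their amounts.
    // emittedAtMs is the latest receive time in each group, so a rerun repeats it.
    static List<Correction> plan(List<LateRow> sideOutputRows, long lastRunMs) {
        Map<WindowId, Correction> byWindow = new LinkedHashMap<>();
        for (LateRow row : sideOutputRows) {
            if (row.receivedAtMs() <= lastRunMs) {
                continue; // already handled by an earlier run
            }
            WindowId id = new WindowId(row.windowKey(), row.windowStartMs());
            byWindow.merge(id,
                new Correction(id, row.amount(), row.receivedAtMs()),
                (a, b) -> new Correction(id, a.lateSum() + b.lateSum(),
                                         Math.max(a.emittedAtMs(), b.emittedAtMs())));
        }
        return new ArrayList<>(byWindow.values());
    }
}
```

Because the output depends only on the input rows and the stored checkpoint, planning the same batch twice yields the same corrections, which is what makes a restart after a crash safe.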

Idempotent emit

The correction job has to be idempotent. If it crashes mid-run and restarts, it should not produce duplicate corrections.

The cleanest way to achieve idempotency is to make the target table's primary key (window_key, window_start, emitted_at), with emitted_at populated from a deterministic source rather than the wall clock, such as the timestamp of the latest event in the correction batch (for example, the greatest receive time among the side-output rows the batch read). Then re-running the same batch produces the same emitted_at and the same primary key, which collides on insert and either no-ops or updates.

For PostgreSQL targets, INSERT ... ON CONFLICT DO UPDATE handles this in one statement. For BigQuery or Snowflake, MERGE does the same.

If your target is a data lake using Apache Iceberg, the table format supports row-level updates and deletes, and engines such as Spark expose MERGE INTO on top of it. Writes go through Iceberg's merge-on-read or copy-on-write mode, depending on the read pattern.
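The effect of that primary key can be seen in a small in-memory model. The sketch below is a stand-in for the target table, not a database client: the class and method names are hypothetical, the aggregate is a single number, and a map put plays the role of ON CONFLICT DO UPDATE or MERGE.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class VersionedFactStore {
    // Mirrors the primary key (window_key, window_start, emitted_at).
    record FactKey(String windowKey, long windowStartMs, long emittedAtMs) {}

    private final Map<FactKey, Double> rows = new HashMap<>();

    // Upsert: writing the same key twice leaves exactly one row.
    void upsert(String windowKey, long windowStartMs, long emittedAtMs, double value) {
        rows.put(new FactKey(windowKey, windowStartMs, emittedAtMs), value);
    }

    int rowCount() {
        return rows.size();
    }

    // Latest version for a window: the row with the greatest emitted_at.
    Optional<Double> current(String windowKey, long windowStartMs) {
        return rows.entrySet().stream()
            .filter(e -> e.getKey().windowKey().equals(windowKey)
                      && e.getKey().windowStartMs() == windowStartMs)
            .max(Comparator.comparingLong(e -> e.getKey().emittedAtMs()))
            .map(Map.Entry::getValue);
    }
}
```

Replaying a correction with the same emitted_at overwrites its own row instead of adding a duplicate, while a genuinely newer correction adds a new version and leaves the old one in place for audit.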

The view that downstream consumers query

CREATE VIEW window_aggregates_current AS
SELECT window_key, window_start, value
FROM (
  SELECT
    window_key,
    window_start,
    value,
    emitted_at,
    ROW_NUMBER() OVER (
      PARTITION BY window_key, window_start
      ORDER BY emitted_at DESC
    ) AS rn
  FROM window_aggregates
) ranked
WHERE rn = 1;

This is the only thing the dashboard, the BI tool, or any downstream consumer should query. They see the latest version per window automatically. The underlying table keeps every version for audit and reconstruction.

Metrics to add

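numLateEvents is a counter you register through Flink's metrics API. Register it in an operator on the side-output stream (for example, a rich map function placed in front of LateEventSink) rather than inside the ProcessWindowFunction: events past the grace period are routed to the side output and never reach the window function. It increments every time a late event arrives. Tag it by window-key partition if your job has skewed partitions. The Prometheus docs cover the metric-collection layer once Flink is emitting these.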

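watermarkLag is the gap between current processing time and current watermark, in milliseconds. Flink reports the watermark itself through built-in operator metrics (currentInputWatermark and currentOutputWatermark), and some source connectors also report a watermarkLag gauge. If yours does not, derive the lag in your monitoring layer as wall-clock time minus the reported watermark.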

correctionsEmittedPerHour is a counter from the correction job. Spikes here correlate with upstream incidents.

Wire all three into Prometheus or whatever your stack uses. The point is to have a visible signal when late-event volume changes, so you can retune the grace period or chase down a producer regression.

Tradeoffs to acknowledge

The side-output pattern is not free.

State management costs more because allowedLateness keeps windows open longer. For high-cardinality keys, this can be significant. Watch your state size and adjust if it grows beyond your operational budget.

The correction job adds latency. Downstream consumers see the "almost final" aggregate when the window closes, and the corrected aggregate fifteen minutes to one hour later. For most business reporting this is acceptable; for sub-second reporting it is not.

The versioned-fact pattern adds storage cost because every correction is persisted. Old versions are usually compacted on a quarterly schedule.

These costs are usually worth paying. The alternative is silent drift, which costs more in trust and rework.

What this looks like at 137Foundry

137Foundry has retrofitted this pattern onto live pipelines for clients integrating Stripe webhooks, Salesforce event streams, and internal CDC streams to Snowflake. The typical retrofit is a one- to two-week project, not a full rewrite.

The longer article that frames this in context, including the versioned-fact pattern in more detail and the lambda-architecture alternative, is How to Handle Late-Arriving Data in a Streaming Integration Pipeline Without Corrupting Downstream Reports. The piece above is the Flink-specific recipe; the article is the design rationale.

A note on testing

The hardest part of validating late-event handling is constructing a realistic test case. Most unit tests for streaming jobs assume events arrive in order. The late-event path only fires when events arrive out of order.

The cleanest test pattern: build a fixture stream that emits events with deliberately scrambled event times, including some events past the watermark, and assert that the side output contains the late events and the corrected aggregate matches the expected total. Run the test against an embedded Flink mini-cluster.
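A fixture for that test can be very small. The sketch below is plain Java with hypothetical names and made-up amounts; it builds a stream in arrival order with one deliberate straggler, identifies which events arrive out of order, and computes the order-independent total that the corrected aggregate should reach. It does not run Flink; it only produces the inputs and expected values for a mini-cluster test.

```java
import java.util.ArrayList;
import java.util.List;

class LateEventFixture {
    record Event(String key, long eventTimeMs, double amount) {}

    static final long MINUTE = 60_000L;

    // Arrival order, not event-time order: the last element is a straggler for the first window.
    static List<Event> arrivalOrder() {
        return List.of(
            new Event("k", 1 * MINUTE, 10.0),
            new Event("k", 16 * MINUTE, 20.0),
            new Event("k", 31 * MINUTE, 30.0),
            new Event("k", 50 * MINUTE, 40.0),
            new Event("k", 2 * MINUTE, 5.0));
    }

    // Events whose event time is behind something that arrived earlier.
    static List<Event> outOfOrder(List<Event> arrivals) {
        List<Event> result = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (Event e : arrivals) {
            if (e.eventTimeMs() < maxSeen) {
                result.add(e);
            }
            maxSeen = Math.max(maxSeen, e.eventTimeMs());
        }
        return result;
    }

    // Order-independent total for one window: the value the corrected aggregate must reach.
    static double expectedTotal(List<Event> events, String key, long windowStartMs, long sizeMs) {
        return events.stream()
            .filter(e -> e.key().equals(key))
            .filter(e -> e.eventTimeMs() >= windowStartMs && e.eventTimeMs() < windowStartMs + sizeMs)
            .mapToDouble(Event::amount)
            .sum();
    }
}
```

With a fifteen-minute window and a five-minute grace period, and a watermark that tracks the highest event time seen, the minute-2 straggler arrives long after its window has been cleaned up, so the test would expect it in the side output and expect the corrected total for the first window to be 15.0 rather than the 10.0 the main output emitted.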

Without this test, the late-event handling path is essentially untested in CI, and regressions slip through unnoticed. With it, you can iterate on grace periods and side-output logic without fear.
