<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Vivek Burman</title>
    <description>The latest articles on DEV Community by Vivek Burman (@vivekburman).</description>
    <link>https://dev.to/vivekburman</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F400538%2Fd0ff1ed3-78ff-4bbb-8085-35d8ebfd81e5.jpeg</url>
      <title>DEV Community: Vivek Burman</title>
      <link>https://dev.to/vivekburman</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/vivekburman"/>
    <language>en</language>
    <item>
      <title>How to Data Engineer the ETLFunnel Way</title>
      <dc:creator>Vivek Burman</dc:creator>
      <pubDate>Sat, 01 Nov 2025 16:26:00 +0000</pubDate>
      <link>https://dev.to/vivekburman/how-to-data-engineer-the-etlfunnel-way-jee</link>
      <guid>https://dev.to/vivekburman/how-to-data-engineer-the-etlfunnel-way-jee</guid>
      <description>&lt;h2&gt;
  
  
  Part 4 — Mastering Pipeline Termination
&lt;/h2&gt;

&lt;h3&gt;
  
  
  When Your Pipeline Needs to Know When to Stop
&lt;/h3&gt;

&lt;p&gt;Picture this: You've built a beautiful streaming data pipeline. Data flows in smoothly, transformations happen like clockwork, and everything seems perfect. Then you check your cloud bill at the end of the month and realize your pipeline has been running 24/7, processing mostly empty queues during off-peak hours. Sound familiar?&lt;/p&gt;

&lt;p&gt;This is where &lt;strong&gt;Termination Rules&lt;/strong&gt; come in — one of &lt;strong&gt;ETLFunnel's&lt;/strong&gt; most powerful features for managing long-running and streaming pipelines.&lt;/p&gt;

&lt;h3&gt;
  
  
  The Problem: Infinite Pipelines in a Finite World
&lt;/h3&gt;

&lt;p&gt;Traditional batch pipelines have it easy. They process all available data and exit gracefully when done. But modern data engineering often involves:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Streaming pipelines&lt;/strong&gt; that continuously listen for new events&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Pub/Sub consumers&lt;/strong&gt; waiting for messages that might never come&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Event-driven workflows&lt;/strong&gt; that need to respond to sporadic triggers&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Real-time processors&lt;/strong&gt; that should adapt to business hours&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;These pipelines don't naturally terminate. They run forever — or until something crashes, resources run out, or someone manually kills them. That's wasteful, expensive, and frankly, inelegant.&lt;/p&gt;

&lt;h3&gt;
  
  
  Enter Termination Rules
&lt;/h3&gt;

&lt;p&gt;Termination Rules in ETLFunnel provide intelligent, configurable exit conditions for your pipelines. Think of them as the "smart shutdown" mechanism that knows when continuing is pointless or unproductive.&lt;/p&gt;

&lt;p&gt;Instead of letting your pipeline run indefinitely, you define clear conditions for when it should gracefully exit:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;TerminateRule&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleProps&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleTune&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;maxRecords&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="kt"&gt;uint64&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;10000&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;idleTimeout&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;5&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Minute&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;MaxRecords&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;    &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;maxRecords&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;IdleTimeout&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;   &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;idleTimeout&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;CheckInterval&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="m"&gt;10&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This simple rule says: "Stop after processing 10,000 records OR after 5 minutes of no activity — whichever comes first."&lt;/p&gt;

&lt;h3&gt;
  
  
  The Four Pillars of Termination
&lt;/h3&gt;

&lt;p&gt;ETLFunnel gives you four powerful mechanisms to control pipeline termination:&lt;/p&gt;

&lt;h3&gt;
  
  
  1. MaxRecords: The Volume Limit
&lt;/h3&gt;

&lt;p&gt;Perfect for: Development, testing, sampling large datasets&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight css"&gt;&lt;code&gt;&lt;span class="nt"&gt;maxRecords&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="nt"&gt;uint64&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="err"&gt;1000&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="nt"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="nt"&gt;models&lt;/span&gt;&lt;span class="nc"&gt;.TerminateRuleTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="py"&gt;MaxRecords&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="err"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;maxRecords&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;CheckInterval&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="m"&gt;5&lt;/span&gt; &lt;span class="err"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Use case&lt;/strong&gt;: You're testing a new transformation on production data. Instead of processing millions of records, you sample the first 1,000 to verify everything works correctly.&lt;/p&gt;

&lt;h3&gt;
  
  
  2. IdleTimeout: The Silence Detector
&lt;/h3&gt;

&lt;p&gt;Perfect for: Event-driven pipelines, off-peak optimization&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight lua"&gt;&lt;code&gt;&lt;span class="n"&gt;idleTimeout&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;10&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Minute&lt;/span&gt;
&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="err"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;IdleTimeout&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="err"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;idleTimeout&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;CheckInterval&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;30&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Use case&lt;/strong&gt;: Your pipeline consumes from a message queue. During nights and weekends, new messages are rare. Why keep the pipeline running? If nothing arrives for 10 minutes, shut it down gracefully.&lt;/p&gt;

&lt;h3&gt;
  
  
  3. MaxPipelineTime: The Duration Guard
&lt;/h3&gt;

&lt;p&gt;Perfect for: Cost control, scheduled batch windows&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight lua"&gt;&lt;code&gt;&lt;span class="n"&gt;maxPipelineTime&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Hour&lt;/span&gt;
&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="err"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;MaxPipelineTime&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="err"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;maxPipelineTime&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;CheckInterval&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;20&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Use case&lt;/strong&gt;: You have a 2-hour processing window during off-peak hours. Regardless of how much data is available, the pipeline must complete within this window to avoid impacting other systems.&lt;/p&gt;

&lt;h3&gt;
  
  
  4. UserDefinedCheckFunc: The Custom Logic
&lt;/h3&gt;

&lt;p&gt;Perfect for: Complex business rules, adaptive behavior&lt;/p&gt;

&lt;p&gt;This is where termination rules become truly powerful. You can implement any custom logic based on pipeline state:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="n"&gt;UserDefinedCheckFunc&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;checkProps&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CustomTerminateRuleCheckProps&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleActionTune&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="c"&gt;// Stop if error rate exceeds 15%&lt;/span&gt;
    &lt;span class="n"&gt;errorRate&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;getErrorRate&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;checkProps&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Ctx&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;errorRate&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="m"&gt;0.15&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;checkProps&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Warn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Terminating: High error rate"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;zap&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Float64&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"error_rate"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;errorRate&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleActionTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionStop&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;Reason&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"Error rate exceeded 15% threshold"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="c"&gt;// Stop outside business hours if idle for 2+ minutes&lt;/span&gt;
    &lt;span class="n"&gt;currentHour&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Hour&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;currentHour&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="m"&gt;9&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="n"&gt;currentHour&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="m"&gt;18&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;idleDuration&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Since&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;checkProps&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;LastMessageAt&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;idleDuration&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="m"&gt;2&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Minute&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleActionTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionStop&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;Reason&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"Outside business hours and idle"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleActionTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionContinue&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Real-World Scenarios
&lt;/h3&gt;

&lt;h3&gt;
  
  
  Scenario 1: The Smart Streaming Consumer
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Challenge&lt;/strong&gt;: You're consuming from Kafka topics that have variable message rates. During business hours, thousands of messages per second. At night, maybe one every few minutes.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Solution&lt;/strong&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;TerminateRule&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleProps&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleTune&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;idleTimeout&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;15&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Minute&lt;/span&gt;
    &lt;span class="n"&gt;maxPipelineTime&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;6&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Hour&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;IdleTimeout&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;     &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;idleTimeout&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;MaxPipelineTime&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;maxPipelineTime&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;CheckInterval&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;   &lt;span class="m"&gt;30&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Result&lt;/strong&gt;: Pipeline runs as long as data flows. If messages stop for 15 minutes, it shuts down gracefully. You can restart it with a scheduled trigger or when new data arrives. No more 24/7 execution for sporadic workloads.&lt;/p&gt;

&lt;h3&gt;
  
  
  Scenario 2: The Cost-Conscious Development Pipeline
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Challenge&lt;/strong&gt;: Developers are testing pipeline changes against production-like data. Full dataset has 50 million records, but you only need to verify logic.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Solution&lt;/strong&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;TerminateRule&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleProps&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleTune&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;maxRecords&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="kt"&gt;uint64&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;5000&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;maxPipelineTime&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;10&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Minute&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;MaxRecords&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;      &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;maxRecords&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;MaxPipelineTime&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;maxPipelineTime&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;CheckInterval&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;   &lt;span class="m"&gt;5&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Result&lt;/strong&gt;: Each test run processes at most 5,000 records or runs for 10 minutes. Fast feedback, minimal compute costs.&lt;/p&gt;

&lt;h3&gt;
  
  
  Scenario 3: The Adaptive Business-Hours Pipeline
&lt;/h3&gt;

&lt;p&gt;&lt;strong&gt;Challenge&lt;/strong&gt;: You process user activity events, but your users are primarily in US timezones. Running full capacity globally is wasteful.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Solution&lt;/strong&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;TerminateRule&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleProps&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleTune&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;CheckInterval&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Minute&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;

        &lt;span class="n"&gt;UserDefinedCheckFunc&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;checkProps&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CustomTerminateRuleCheckProps&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleActionTune&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;hour&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;UTC&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Hour&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

            &lt;span class="c"&gt;// US business hours: 14:00-02:00 UTC (9am-9pm EST/PST range)&lt;/span&gt;
            &lt;span class="n"&gt;isBusinessHours&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;hour&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="m"&gt;14&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="n"&gt;hour&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="m"&gt;2&lt;/span&gt;

            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="n"&gt;isBusinessHours&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="c"&gt;// Outside business hours, be more aggressive&lt;/span&gt;
                &lt;span class="n"&gt;idleDuration&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Since&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;checkProps&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;LastMessageAt&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;idleDuration&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="m"&gt;5&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Minute&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleActionTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
                        &lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionStop&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                        &lt;span class="n"&gt;Reason&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"Outside US business hours with 5min idle"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
                &lt;span class="p"&gt;}&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;

            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleActionTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionContinue&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
        &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Result&lt;/strong&gt;: During business hours, pipeline tolerates longer idle periods. Outside business hours, it shuts down quickly if activity drops.&lt;/p&gt;

&lt;h3&gt;
  
  
  Best Practices: Getting Termination Right
&lt;/h3&gt;

&lt;h3&gt;
  
  
  1. Choose the Right CheckInterval
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;CheckInterval&lt;/code&gt; determines how frequently your termination conditions are evaluated. It's a balance:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Too short&lt;/strong&gt; (&amp;lt; 5 seconds): Unnecessary overhead checking conditions constantly&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Too long&lt;/strong&gt; (&amp;gt; 1 minute): Slow to respond, pipeline may run longer than needed&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Sweet spot&lt;/strong&gt;: 10–30 seconds for most use cases
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// For responsive termination
CheckInterval: 10 * time.Second
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// For lower overhead on stable pipelines
CheckInterval: 30 * time.Second
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  2. Always Log Termination Events
&lt;/h3&gt;

&lt;p&gt;Future you will thank present you for clear termination logging:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;checkProps.Logger.Info("Pipeline terminating",
    zap.String("reason", "idle timeout exceeded"),
    zap.Duration("idle_duration", idleDuration),
    zap.Uint64("total_processed", checkProps.TotalMessages),
    zap.Duration("total_runtime", time.Since(checkProps.StartTime)),
)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  3. Combine Multiple Conditions
&lt;/h3&gt;

&lt;p&gt;Don't rely on a single termination condition. Defense in depth:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="n"&gt;maxRecords&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="kt"&gt;uint64&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;1000000&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;idleTimeout&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;10&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Minute&lt;/span&gt;
&lt;span class="n"&gt;maxPipelineTime&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;4&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Hour&lt;/span&gt;

&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TerminateRuleTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;MaxRecords&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;      &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;maxRecords&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;      &lt;span class="c"&gt;// Safety: never process more than 1M&lt;/span&gt;
    &lt;span class="n"&gt;IdleTimeout&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;     &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;idleTimeout&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;     &lt;span class="c"&gt;// Efficiency: stop if quiet&lt;/span&gt;
    &lt;span class="n"&gt;MaxPipelineTime&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;maxPipelineTime&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c"&gt;// Control: absolute time limit&lt;/span&gt;
    &lt;span class="n"&gt;CheckInterval&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;   &lt;span class="m"&gt;20&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  4. Test in Development First
&lt;/h3&gt;

&lt;p&gt;Termination logic can be tricky. Test with different scenarios:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;What happens if no data arrives?&lt;/li&gt;
&lt;li&gt;What if data arrives continuously?&lt;/li&gt;
&lt;li&gt;What if there are brief idle periods?&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Use aggressive limits in development:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="c"&gt;// Development settings&lt;/span&gt;
&lt;span class="n"&gt;maxRecords&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="kt"&gt;uint64&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;100&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;idleTimeout&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Minute&lt;/span&gt;
&lt;span class="n"&gt;maxPipelineTime&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;5&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Minute&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  5. Monitor Termination Patterns
&lt;/h3&gt;

&lt;p&gt;Track why your pipelines terminate. If they're always hitting &lt;code&gt;MaxPipelineTime&lt;/code&gt;, maybe your processing is too slow. If they're always hitting &lt;code&gt;IdleTimeout&lt;/code&gt;, maybe your upstream data source has issues.&lt;/p&gt;

&lt;h3&gt;
  
  
  The Bottom Line
&lt;/h3&gt;

&lt;p&gt;Termination Rules are your pipeline's exit strategy. They transform potentially runaway processes into well-behaved, cost-effective, and maintainable systems.&lt;/p&gt;

&lt;p&gt;In a world where data never stops flowing, knowing when to stop processing is just as important as knowing how to process it. ETLFunnel's Termination Rules give you that control — simply, powerfully, and elegantly.&lt;/p&gt;

&lt;p&gt;Your cloud bill will thank you. Your operations team will thank you. And you'll sleep better knowing your pipelines won't run forever.&lt;/p&gt;




&lt;p&gt;&lt;strong&gt;&lt;em&gt;Ready to optimize your pipeline termination?&lt;/em&gt;&lt;/strong&gt; &lt;em&gt;Visit &lt;a href="https://etlfunnel.com/" rel="noopener noreferrer"&gt;etlfunnel.com&lt;/a&gt; today to sign up for a free trial of our SaaS platform and transform your data engineering workflows.&lt;/em&gt;&lt;/p&gt;

</description>
      <category>go</category>
      <category>datascience</category>
      <category>database</category>
      <category>cloud</category>
    </item>
    <item>
      <title>How to Data Engineer the ETLFunnel Way</title>
      <dc:creator>Vivek Burman</dc:creator>
      <pubDate>Sat, 01 Nov 2025 16:19:53 +0000</pubDate>
      <link>https://dev.to/vivekburman/how-to-data-engineer-the-etlfunnel-way-2g9d</link>
      <guid>https://dev.to/vivekburman/how-to-data-engineer-the-etlfunnel-way-2g9d</guid>
      <description>&lt;h1&gt;
  
  
  Part 3 — Checkpoint and Backlog: Building Visibility Into Your Data Pipeline
&lt;/h1&gt;

&lt;p&gt;In &lt;a href="https://dev.to/vivekburman/how-to-data-engineer-the-etlfunnel-way-4pg7"&gt;Part 1&lt;/a&gt;, we covered &lt;strong&gt;idempotency, retries, and recovery&lt;/strong&gt; — the foundational mechanisms that make pipelines resilient to failure.&lt;/p&gt;

&lt;p&gt;In &lt;a href="https://dev.to/vivekburman/how-to-data-engineer-the-etlfunnel-way-4k2g"&gt;Part 2&lt;/a&gt;, we explored &lt;strong&gt;dynamic orchestration&lt;/strong&gt; — how to scale ETL workloads intelligently based on available hardware and data volume.&lt;/p&gt;

&lt;p&gt;But here's the critical question we haven't answered yet:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;When your pipeline runs successfully, how do you track what actually committed?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;When it fails, where did the failures go, and how do you replay them?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;That's where &lt;strong&gt;checkpoint&lt;/strong&gt; and &lt;strong&gt;backlog&lt;/strong&gt; hooks come in — the twin pillars of pipeline observability and incident management.&lt;/p&gt;




&lt;h2&gt;
  
  
  The Problem
&lt;/h2&gt;

&lt;p&gt;You've built a robust ETL pipeline. It extracts data from Postgres, transforms it, and loads it into Elasticsearch. Everything works… until you need to answer these questions:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;"Which records were successfully committed in the last run?"&lt;/strong&gt;&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;"Did the pipeline skip any records during transformation?"&lt;/strong&gt;&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;"What happens when a destination write fails?"&lt;/strong&gt;&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;"Can I replay failed records without reprocessing the entire dataset?"&lt;/strong&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Without structured tracking, you're flying blind. Success and failure both happen in a black box. You have logs, sure — but logs are unstructured, hard to query, and don't give you a recovery path.&lt;/p&gt;

&lt;p&gt;This is where &lt;strong&gt;checkpoint&lt;/strong&gt; and &lt;strong&gt;backlog&lt;/strong&gt; hooks transform your pipeline from a fire-and-forget script into a fully auditable, recoverable data system.&lt;/p&gt;




&lt;h2&gt;
  
  
  The Concept: Checkpoint and Backlog Hooks
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Checkpoint hooks&lt;/strong&gt; fire automatically after every successful write to the destination. They answer the question: &lt;em&gt;"What just committed?"&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Backlog hooks&lt;/strong&gt; fire automatically when a write to the destination fails. They answer the question: &lt;em&gt;"What just failed, and how do I handle it?"&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;Together, they provide:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Data lineage tracking&lt;/strong&gt; — every committed record is logged with metadata.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Incident management&lt;/strong&gt; — every failed record is persisted to a structured dead-letter queue (DLQ) with failure context.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Replay capability&lt;/strong&gt; — backlogged records can be reprocessed as a separate flow, independent of the main pipeline.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Let's see how this works in practice.&lt;/p&gt;




&lt;h2&gt;
  
  
  The Implementation
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Checkpoint Hook: Tracking Success
&lt;/h3&gt;

&lt;p&gt;A checkpoint hook is invoked every time data is successfully written to the destination. It receives:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The committed record(s)&lt;/li&gt;
&lt;li&gt;Access to source, destination, and auxiliary databases&lt;/li&gt;
&lt;li&gt;The pipeline execution context&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Here's a real implementation:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;"etlfunnel/execution/models"&lt;/span&gt;
    &lt;span class="s"&gt;"etlfunnel/database/cast"&lt;/span&gt;
    &lt;span class="s"&gt;"encoding/json"&lt;/span&gt;
    &lt;span class="s"&gt;"time"&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;Checkpoint&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CheckpointProps&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CheckpointTune&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Ctx&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CheckpointTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionContinue&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Checkpoint triggered"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;zap&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"record_count"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt;

    &lt;span class="c"&gt;// Cast auxiliary MySQL connection for audit logging&lt;/span&gt;
    &lt;span class="n"&gt;mysqlConn&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;cast&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CastAsMySQLDBConnection&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AuxiliaryDBConnMap&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"mysql"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to cast MySQL connection"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;zap&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="c"&gt;// Process each committed record&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="k"&gt;range&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;auditEntry&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="k"&gt;map&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="k"&gt;interface&lt;/span&gt;&lt;span class="p"&gt;{}{&lt;/span&gt;
            &lt;span class="s"&gt;"pipeline_name"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;     &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetName&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
            &lt;span class="s"&gt;"record_id"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;         &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="s"&gt;"commit_timestamp"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;  &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;UTC&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
            &lt;span class="s"&gt;"source_table"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;      &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"_source_table"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="s"&gt;"destination_table"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"_destination_table"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="s"&gt;"processing_status"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"committed"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="n"&gt;recordJSON&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Marshal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="c"&gt;// Persist to audit log&lt;/span&gt;
        &lt;span class="n"&gt;query&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="s"&gt;`
            INSERT INTO pipeline_audit_log 
            (pipeline_name, record_id, commit_timestamp, record_data, processing_status)
            VALUES (?, ?, ?, ?, ?)
        `&lt;/span&gt;

        &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;mysqlConn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Exec&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;auditEntry&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"pipeline_name"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="n"&gt;auditEntry&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"record_id"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="n"&gt;auditEntry&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"commit_timestamp"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;recordJSON&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="n"&gt;auditEntry&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"processing_status"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to write audit log"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;zap&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="c"&gt;// Update pipeline statistics&lt;/span&gt;
    &lt;span class="n"&gt;updatePipelineStats&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;mysqlConn&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetName&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CheckpointTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionContinue&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;updatePipelineStats&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;conn&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Conn&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pipelineName&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;query&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="s"&gt;`
        INSERT INTO pipeline_stats (pipeline_name, last_commit_time, record_count)
        VALUES (?, ?, 1)
        ON DUPLICATE KEY UPDATE
        last_commit_time = VALUES(last_commit_time),
        record_count = record_count + 1
    `&lt;/span&gt;
    &lt;span class="n"&gt;conn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Exec&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pipelineName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;UTC&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;What does this give you?&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Full audit trail&lt;/strong&gt; — every committed record is logged with timestamp, source, destination, and full payload.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Pipeline metrics&lt;/strong&gt; — track throughput, last commit time, and record count per pipeline.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Event-driven workflows&lt;/strong&gt; — trigger alerts or downstream processes for specific record types (e.g., high-value transactions).&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Backlog Hook: Handling Failure
&lt;/h3&gt;

&lt;p&gt;A backlog hook is invoked when a write to the destination fails. It receives:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The failed record(s)&lt;/li&gt;
&lt;li&gt;Access to auxiliary databases for DLQ storage&lt;/li&gt;
&lt;li&gt;The pipeline execution context&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Here's how you implement it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;"etlfunnel/execution/models"&lt;/span&gt;
    &lt;span class="s"&gt;"etlfunnel/database/cast"&lt;/span&gt;
    &lt;span class="s"&gt;"encoding/json"&lt;/span&gt;
    &lt;span class="s"&gt;"time"&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;Backlog&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;BacklogProps&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;BacklogTune&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Ctx&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;BacklogTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionContinue&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Write failure detected"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;zap&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"failed_record_count"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt;

    &lt;span class="c"&gt;// Cast auxiliary MySQL connection for DLQ storage&lt;/span&gt;
    &lt;span class="n"&gt;mysqlConn&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;cast&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CastAsMySQLDBConnection&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AuxiliaryDBConnMap&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"mysql"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to cast MySQL connection for backlog"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;zap&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="c"&gt;// Process each failed record&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="k"&gt;range&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;recordJSON&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Marshal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="c"&gt;// Persist failed record to DLQ&lt;/span&gt;
        &lt;span class="n"&gt;query&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="s"&gt;`
            INSERT INTO failed_records 
            (pipeline_name, record_id, record_data, failure_timestamp, retry_count, status)
            VALUES (?, ?, ?, ?, 0, 'pending')
        `&lt;/span&gt;

        &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;mysqlConn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Exec&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetName&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
            &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
            &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;recordJSON&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;UTC&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
        &lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to store backlog record"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;zap&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; 
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="c"&gt;// Update failure statistics&lt;/span&gt;
    &lt;span class="n"&gt;updateFailureStats&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;mysqlConn&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetName&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

    &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Records successfully backlogged. Continuing pipeline."&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
        &lt;span class="n"&gt;zap&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"backlogged_count"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;BacklogTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionContinue&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; 
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;updateFailureStats&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;conn&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Conn&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pipelineName&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;failureCount&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;query&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="s"&gt;`
        INSERT INTO pipeline_failure_stats (pipeline_name, last_failure_time, failure_count)
        VALUES (?, ?, ?)
        ON DUPLICATE KEY UPDATE
        last_failure_time = VALUES(last_failure_time),
        failure_count = failure_count + VALUES(failure_count)
    `&lt;/span&gt;
    &lt;span class="n"&gt;conn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Exec&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pipelineName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;UTC&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;failureCount&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;What does this give you?&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Structured DLQ&lt;/strong&gt; — failed records are stored in a queryable table, not just scattered in logs.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Failure metrics&lt;/strong&gt; — track failure rates, last failure time, and failure count per pipeline.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Incident triage&lt;/strong&gt; — identify and prioritize critical failures for immediate attention.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Safe continuation&lt;/strong&gt; — the main pipeline continues running even when individual writes fail.&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  The Replay Flow: Turning Failures Into Recovery
&lt;/h2&gt;

&lt;p&gt;Here's where it all comes together.&lt;/p&gt;

&lt;p&gt;Failed records in the &lt;code&gt;failed_records&lt;/code&gt; table don't just sit there. You can build a &lt;strong&gt;separate replay flow&lt;/strong&gt; that:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Reads records from the DLQ (with &lt;code&gt;status = 'pending'&lt;/code&gt;)&lt;/li&gt;
&lt;li&gt;Applies the same transformations as the main pipeline&lt;/li&gt;
&lt;li&gt;Attempts to rewrite to the destination&lt;/li&gt;
&lt;li&gt;Updates the DLQ status to &lt;code&gt;'resolved'&lt;/code&gt; or &lt;code&gt;'failed_again'&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Tracks replay progress with its own checkpoints&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;This is a &lt;strong&gt;first-class recovery mechanism&lt;/strong&gt; — not an ad-hoc script, but a structured flow with the same guarantees as your main pipeline.&lt;/p&gt;




&lt;h2&gt;
  
  
  Why This Matters
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Checkpoint and backlog hooks transform your pipeline from a black box into a glass box.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;You get:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Full visibility&lt;/strong&gt; — every committed record is tracked. Every failed record is captured.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Auditable lineage&lt;/strong&gt; — trace any record from source to destination, with timestamps and metadata.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Deterministic recovery&lt;/strong&gt; — replay failed records without reprocessing the entire dataset.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Operational confidence&lt;/strong&gt; — know exactly what succeeded, what failed, and what's pending.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This is the difference between &lt;strong&gt;pipelines that move data&lt;/strong&gt; and &lt;strong&gt;pipelines that move data reliably&lt;/strong&gt;.&lt;/p&gt;




&lt;p&gt;&lt;em&gt;Ready to Build Auditable, Recoverable ETL Pipelines? Visit &lt;a href="https://etlfunnel.com/" rel="noopener noreferrer"&gt;etlfunnel.com&lt;/a&gt; today to sign up for a free trial of our SaaS platform and transform your data engineering workflows.&lt;/em&gt;&lt;/p&gt;

</description>
      <category>database</category>
      <category>go</category>
      <category>dataengineering</category>
      <category>datascience</category>
    </item>
    <item>
      <title>How to Data Engineer the ETLFunnel Way</title>
      <dc:creator>Vivek Burman</dc:creator>
      <pubDate>Sat, 01 Nov 2025 16:14:26 +0000</pubDate>
      <link>https://dev.to/vivekburman/how-to-data-engineer-the-etlfunnel-way-4k2g</link>
      <guid>https://dev.to/vivekburman/how-to-data-engineer-the-etlfunnel-way-4k2g</guid>
      <description>&lt;h1&gt;
  
  
  Part 2 — Dynamic Orchestration
&lt;/h1&gt;

&lt;p&gt;In &lt;a href="https://dev.to/vivekburman/how-to-data-engineer-the-etlfunnel-way-4pg7"&gt;&lt;strong&gt;Part 1&lt;/strong&gt;&lt;/a&gt;&lt;strong&gt;, **we discussed **reliability&lt;/strong&gt; — how idempotency, retries, and recovery make your ETL pipelines resilient.&lt;/p&gt;

&lt;p&gt;But once things start scaling, reliability alone isn't enough. You start asking:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;"How do I make sure every machine runs at its full potential?"&lt;br&gt;
"What if some datasets are 10× larger than others?"&lt;br&gt;
"Can my orchestration layer adapt automatically?"&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;That's the heart of today's topic:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Dynamic Orchestration&lt;/strong&gt; — tuning your ETL workload based on the machine and the data it's running on.&lt;/p&gt;




&lt;h2&gt;
  
  
  The Problem
&lt;/h2&gt;

&lt;p&gt;You've got 10 flows syncing data from Postgres to Elastic. They're running across multiple regions and machines:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight markdown"&gt;&lt;code&gt; Region | Machine Type       | Postgres Schemas | Data Volume (per schema)
 -----  | -----------------  | ---------------- | -----------------------
 US     | 32-core, 128GB RAM | 200+             | Medium                   
 EU     | 8-core, 32GB RAM   | 50               | Small                    
 APAC   | 16-core, 64GB RAM  | 100              | Very Large               
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If you run the same orchestration plan everywhere, you'll waste compute in the US region and choke the EU one.&lt;/p&gt;

&lt;p&gt;This is where orchestration hooks come in — &lt;strong&gt;to shape your execution plan dynamically&lt;/strong&gt; before any flow even starts.&lt;/p&gt;




&lt;h2&gt;
  
  
  The Concept: Orchestration Hooks
&lt;/h2&gt;

&lt;p&gt;An &lt;em&gt;Orchestration Hook&lt;/em&gt; is a pre-step that decides &lt;strong&gt;how many replicas of a flow or pipeline&lt;/strong&gt; should exist and &lt;strong&gt;how they should be distributed&lt;/strong&gt; based on:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Available hardware (CPU, cores, memory)&lt;/li&gt;
&lt;li&gt;Data volume (number of rows, partitions, tables)&lt;/li&gt;
&lt;li&gt;Connection characteristics (source latency, throughput)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;You can define hooks at two levels:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Flow Level&lt;/strong&gt; — to replicate entire flows&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Pipeline Level&lt;/strong&gt; — to replicate pipelines inside a flow&lt;/li&gt;
&lt;/ol&gt;




&lt;h2&gt;
  
  
  The Implementation
&lt;/h2&gt;

&lt;p&gt;Let's look at how this works in &lt;a href="https://etlfunnel.com" rel="noopener noreferrer"&gt;&lt;strong&gt;ETLFunnel&lt;/strong&gt;&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;ETLFunnel's engine defines an orchestration contract:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;OrchestratorEntityDef&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;Name&lt;/span&gt;               &lt;span class="kt"&gt;string&lt;/span&gt;
    &lt;span class="n"&gt;SourceDBConn&lt;/span&gt;       &lt;span class="n"&gt;IDatabaseEngine&lt;/span&gt;
    &lt;span class="n"&gt;DestDBConn&lt;/span&gt;         &lt;span class="n"&gt;IDatabaseEngine&lt;/span&gt;
    &lt;span class="n"&gt;AuxiliaryDBConnMap&lt;/span&gt; &lt;span class="k"&gt;map&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="n"&gt;IDatabaseEngine&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;OrchestratorProps&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;Entity&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="n"&gt;OrchestratorEntityDef&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;OrchestratorTune&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;ParentEntityName&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;
    &lt;span class="n"&gt;NewEntityName&lt;/span&gt;    &lt;span class="kt"&gt;string&lt;/span&gt;
    &lt;span class="n"&gt;ReplicaProps&lt;/span&gt;     &lt;span class="k"&gt;map&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="n"&gt;any&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The idea is simple: each entity represents a pipeline unit, and the &lt;strong&gt;orchestrator&lt;/strong&gt; decides how many replicas to create and what their tuning parameters should be.&lt;/p&gt;




&lt;h2&gt;
  
  
  Flow-level orchestration — scale by CPU
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;"etlfunnel/execution/models"&lt;/span&gt;
    &lt;span class="s"&gt;"fmt"&lt;/span&gt;
    &lt;span class="s"&gt;"runtime"&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;GetFlowOrchestration&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;OrchestratorProps&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;([]&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;OrchestratorTune&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="c"&gt;// Dynamically detect available CPU cores&lt;/span&gt;
    &lt;span class="n"&gt;numThreads&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;runtime&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NumCPU&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;replicas&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;OrchestratorTune&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;entity&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="k"&gt;range&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Entity&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;numThreads&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;++&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;replica&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;OrchestratorTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;ParentEntityName&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;entity&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;NewEntityName&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;    &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sprintf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"%s_core_%d"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;entity&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                &lt;span class="n"&gt;ReplicaProps&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="k"&gt;map&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="n"&gt;any&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="s"&gt;"replica_id"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;     &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="s"&gt;"thread_id"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;      &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="s"&gt;"total_replicas"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;numThreads&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="s"&gt;"cpu_optimized"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;  &lt;span class="no"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="p"&gt;},&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
            &lt;span class="n"&gt;replicas&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;replicas&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;replica&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;replicas&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is a &lt;strong&gt;hardware-aware&lt;/strong&gt; orchestration. Each pipeline replica maps to a CPU core, so your ETL workload scales automatically with the available cores on that node.&lt;/p&gt;




&lt;h2&gt;
  
  
  Pipeline-level orchestration — scale by data volume
&lt;/h2&gt;

&lt;p&gt;Let's take it a step further.&lt;/p&gt;

&lt;p&gt;You can use your &lt;code&gt;SourceDBConn&lt;/code&gt; (which implements &lt;code&gt;IDatabaseEngine&lt;/code&gt;) to introspect data size and partition the workload accordingly.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;package&lt;/span&gt; &lt;span class="n"&gt;client_orchestrator_pipeline&lt;/span&gt;
&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;"etlfunnel/execution/models"&lt;/span&gt;
    &lt;span class="s"&gt;"fmt"&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;GetPipelineOrchestration&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;OrchestratorProps&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;([]&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;OrchestratorTune&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"orchestrator props cannot be nil"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;replicas&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;OrchestratorTune&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;entity&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="k"&gt;range&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Entity&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="c"&gt;// Example: fetch table stats using your IDatabaseEngine&lt;/span&gt;
        &lt;span class="n"&gt;stats&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;entity&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;SourceDBConn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetTableStats&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"failed to fetch stats for %s: %v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;entity&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="c"&gt;// Split pipelines based on total row count&lt;/span&gt;
        &lt;span class="n"&gt;numReplicas&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;stats&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TotalRows&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="m"&gt;10&lt;/span&gt;&lt;span class="n"&gt;_000_000&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;numReplicas&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="m"&gt;4&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;stats&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TotalRows&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;&lt;span class="n"&gt;_000_000&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;numReplicas&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="m"&gt;2&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;numReplicas&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;++&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;replica&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;OrchestratorTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;ParentEntityName&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;entity&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;NewEntityName&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;    &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sprintf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"%s_partition_%d"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;entity&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                &lt;span class="n"&gt;ReplicaProps&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="k"&gt;map&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="n"&gt;any&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="s"&gt;"replica_id"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;     &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="s"&gt;"total_replicas"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;numReplicas&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="s"&gt;"partition_hint"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sprintf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"split_%d"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                    &lt;span class="s"&gt;"data_driven"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;    &lt;span class="no"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="p"&gt;},&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
            &lt;span class="n"&gt;replicas&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nb"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;replicas&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;replica&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;replicas&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now orchestration adapts not just to &lt;strong&gt;the machine&lt;/strong&gt;, but also to &lt;strong&gt;the data itself&lt;/strong&gt;. A pipeline that needs to move 100M rows automatically gets partitioned into multiple smaller replicas, each handling a subset.&lt;/p&gt;




&lt;h2&gt;
  
  
  Combined: Flow + Pipeline Orchestration
&lt;/h2&gt;

&lt;p&gt;In real-world setups, you can even &lt;strong&gt;chain both levels&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;For example:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Flow orchestration splits jobs by CPU capacity.&lt;/li&gt;
&lt;li&gt;Pipeline orchestration further divides heavy tables within each flow.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The result is a perfectly balanced plan — CPU-efficient, data-aware, and regionally optimized.&lt;/p&gt;




&lt;h2&gt;
  
  
  Why It Matters
&lt;/h2&gt;

&lt;p&gt;Dynamic orchestration gives you:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Predictable performance&lt;/strong&gt; across heterogeneous environments.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Elastic scalability&lt;/strong&gt; — every machine operates at full capacity.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Data-aware load balancing&lt;/strong&gt; — big datasets automatically spread across workers.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Unified code&lt;/strong&gt; — your flow definitions remain the same, only orchestration changes.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;You've effectively made your ETL system &lt;strong&gt;self-tuning&lt;/strong&gt; — a foundational step toward distributed dataflow intelligence.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Ready to build resilient, idempotent ETL pipelines that handle can scale to any environment? Visit &lt;a href="https://etlfunnel.com" rel="noopener noreferrer"&gt;etlfunnel.com&lt;/a&gt; today to sign up for a free trial of our SaaS platform and transform your data engineering workflows.&lt;/em&gt;&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>go</category>
      <category>database</category>
      <category>datascience</category>
    </item>
    <item>
      <title>How to Data Engineer the ETLFunnel Way</title>
      <dc:creator>Vivek Burman</dc:creator>
      <pubDate>Sat, 01 Nov 2025 16:07:49 +0000</pubDate>
      <link>https://dev.to/vivekburman/how-to-data-engineer-the-etlfunnel-way-4pg7</link>
      <guid>https://dev.to/vivekburman/how-to-data-engineer-the-etlfunnel-way-4pg7</guid>
      <description>&lt;h2&gt;
  
  
  Part 1 — Idempotency, Retry, and Recovery
&lt;/h2&gt;

&lt;p&gt;Modern data engineering isn't about moving data from A to B — it's about doing it &lt;em&gt;correctly&lt;/em&gt; every single time. The deeper a pipeline goes, the more fragile it becomes. A single failed batch can leave datasets half-written, cause duplicates downstream, or silently skip records. The hardest part isn't transformation logic — it's keeping the system consistent when things go wrong. At &lt;a href="https://etlfunnel.com" rel="noopener noreferrer"&gt;&lt;strong&gt;ETLFunnel&lt;/strong&gt;&lt;/a&gt;, we solve these challenges to ensure reliable, scalable data pipelines for developers and teams.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Problem: When Retry Becomes Corruption
&lt;/h2&gt;

&lt;p&gt;A failed task is easy to rerun, but doing so safely isn't.&lt;br&gt;
If a data step partially commits results before crashing, the next retry might reprocess the same data again — duplicating rows or overwriting the wrong partitions. Traditional batch pipelines (like simple cron + SQL scripts) have no built-in concept of &lt;strong&gt;idempotency&lt;/strong&gt; — they only know how to rerun.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Retries without idempotency create &lt;em&gt;corruption&lt;/em&gt;.&lt;br&gt;
Retries without structured state tracking create &lt;em&gt;blind spots&lt;/em&gt;.&lt;br&gt;
Retries without visibility create &lt;em&gt;operational debt&lt;/em&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Dead-letter queues (DLQs) are supposed to help here, but most implementations are crude: a folder of failed records, no schema, no context, no replay mechanism. They collect data but don't provide an actual recovery path.&lt;/p&gt;

&lt;p&gt;Data engineers end up writing retry scripts, marking rows manually, or purging partial results — a process that doesn't scale or audit well.&lt;/p&gt;


&lt;h2&gt;
  
  
  The Solution: Declarative Checkpoints and Backlogs
&lt;/h2&gt;

&lt;p&gt;The fix begins with a change in mindset: a pipeline must &lt;em&gt;remember&lt;/em&gt; where it was and &lt;em&gt;record&lt;/em&gt; what failed — consistently.&lt;/p&gt;

&lt;p&gt;Instead of treating idempotency as a postmortem patch, we treat it as a &lt;strong&gt;first-class citizen&lt;/strong&gt; of execution. Each step carries three key definitions:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Checkpoint Hook&lt;/strong&gt;— The checkpoint hook receives a &lt;code&gt;*models.CheckpointProps&lt;/code&gt;. Read the current context and records from that struct, resolve an auxiliary DB engine via &lt;code&gt;CastAsPostgresDBConnection(...)&lt;/code&gt;, and persist a compact checkpoint row (pipeline identifier + last-processed id + timestamp). The hook returns a &lt;code&gt;*models.CheckpointTune&lt;/code&gt; indicating whether the pipeline should continue or stop. The example below shows a conservative, idempotent upsert into a &lt;code&gt;pipeline_checkpoints&lt;/code&gt; table and logs useful events via the provided &lt;code&gt;ILoggerContract&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
 &lt;span class="s"&gt;"context"&lt;/span&gt;
 &lt;span class="s"&gt;"fmt"&lt;/span&gt;
 &lt;span class="s"&gt;"time"&lt;/span&gt;

 &lt;span class="s"&gt;"etlfunnel/execution/models"&lt;/span&gt;

 &lt;span class="s"&gt;"github.com/jackc/pgx/v5"&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c"&gt;// Checkpoint persists last processed ID to Postgres.&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;Checkpoint&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CheckpointProps&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CheckpointTune&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
 &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Ctx&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CheckpointTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionContinue&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
 &lt;span class="p"&gt;}&lt;/span&gt;

 &lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetContext&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
 &lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Logger&lt;/span&gt;

 &lt;span class="c"&gt;// Extract last ID&lt;/span&gt;
 &lt;span class="n"&gt;lastRec&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="m"&gt;1&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
 &lt;span class="n"&gt;lastIDRaw&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ok&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;lastRec&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
 &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="n"&gt;ok&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Warn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"No 'id' field; skipping persist"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CheckpointTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionContinue&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
 &lt;span class="p"&gt;}&lt;/span&gt;
 &lt;span class="n"&gt;lastIDStr&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sprintf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"%v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;lastIDRaw&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

 &lt;span class="c"&gt;// Resolve Postgres connection (auxiliary or fallback to dest)&lt;/span&gt;
 &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;IDatabaseEngine&lt;/span&gt;
 &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AuxiliaryDBConnMap&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ok&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AuxiliaryDBConnMap&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"checkpoint_db"&lt;/span&gt;&lt;span class="p"&gt;];&lt;/span&gt; &lt;span class="n"&gt;ok&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
   &lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
 &lt;span class="p"&gt;}&lt;/span&gt;
 &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DestDBConn&lt;/span&gt;
 &lt;span class="p"&gt;}&lt;/span&gt;
 &lt;span class="n"&gt;pgConn&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CastAsPostgresDBConnection&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;engine&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
 &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CheckpointTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionStop&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"cast failed: %w"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
 &lt;span class="p"&gt;}&lt;/span&gt;

 &lt;span class="c"&gt;// Ensure table and upsert&lt;/span&gt;
 &lt;span class="n"&gt;createSQL&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="s"&gt;`CREATE TABLE IF NOT EXISTS pipeline_checkpoints (
  pipeline_name TEXT PRIMARY KEY, last_id TEXT, updated_at TIMESTAMP WITH TIME ZONE
 )`&lt;/span&gt;
 &lt;span class="n"&gt;pgConn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Exec&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;createSQL&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

 &lt;span class="n"&gt;pipelineName&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;getPipelineName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Ctx&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="c"&gt;// Helper to derive name&lt;/span&gt;
 &lt;span class="n"&gt;upsertSQL&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="s"&gt;`INSERT INTO pipeline_checkpoints (pipeline_name, last_id, updated_at)
  VALUES ($1, $2, $3) ON CONFLICT (pipeline_name) DO UPDATE SET last_id = EXCLUDED.last_id, updated_at = EXCLUDED.updated_at`&lt;/span&gt;
 &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;pgConn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Exec&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;upsertSQL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pipelineName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;lastIDStr&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;UTC&lt;/span&gt;&lt;span class="p"&gt;());&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Persist failed"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CheckpointTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionStop&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
 &lt;span class="p"&gt;}&lt;/span&gt;

 &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sprintf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Checkpoint for %s at %s"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pipelineName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;lastIDStr&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
 &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CheckpointTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionContinue&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The checkpoint commits after a successful batch, ensuring that the next retry begins &lt;em&gt;exactly&lt;/em&gt; from the last consistent point.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Backlog Function&lt;/strong&gt; — following the same pattern as the checkpoint function. It uses the &lt;code&gt;*models.BacklogProps&lt;/code&gt; struct, persists failed records to an auxiliary Postgres DB, and returns a &lt;code&gt;*models.BacklogTune&lt;/code&gt; indicating whether to continue or stop the pipeline.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
 &lt;span class="s"&gt;"context"&lt;/span&gt;
 &lt;span class="s"&gt;"encoding/json"&lt;/span&gt;
 &lt;span class="s"&gt;"fmt"&lt;/span&gt;
 &lt;span class="s"&gt;"time"&lt;/span&gt;

 &lt;span class="s"&gt;"etlfunnel/execution/models"&lt;/span&gt;

 &lt;span class="s"&gt;"github.com/jackc/pgx/v5"&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c"&gt;// Backlog persists failed records to DLQ table.&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;Backlog&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;BacklogProps&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;BacklogTune&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
 &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Ctx&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;BacklogTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionContinue&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
 &lt;span class="p"&gt;}&lt;/span&gt;

 &lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetContext&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
 &lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Logger&lt;/span&gt;

 &lt;span class="c"&gt;// Resolve Postgres connection&lt;/span&gt;
 &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;IDatabaseEngine&lt;/span&gt;
 &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AuxiliaryDBConnMap&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ok&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AuxiliaryDBConnMap&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"backlog_db"&lt;/span&gt;&lt;span class="p"&gt;];&lt;/span&gt; &lt;span class="n"&gt;ok&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
   &lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
 &lt;span class="p"&gt;}&lt;/span&gt;
 &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DestDBConn&lt;/span&gt;
 &lt;span class="p"&gt;}&lt;/span&gt;
 &lt;span class="n"&gt;pgConn&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CastAsPostgresDBConnection&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;engine&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
 &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;BacklogTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionStop&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"cast failed: %w"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
 &lt;span class="p"&gt;}&lt;/span&gt;

 &lt;span class="c"&gt;// Ensure DLQ table&lt;/span&gt;
 &lt;span class="n"&gt;createSQL&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="s"&gt;`CREATE TABLE IF NOT EXISTS pipeline_backlog (
  id SERIAL PRIMARY KEY, pipeline_name TEXT, record_data JSONB, error_time TIMESTAMP WITH TIME ZONE DEFAULT now()
 )`&lt;/span&gt;
 &lt;span class="n"&gt;pgConn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Exec&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;createSQL&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

 &lt;span class="n"&gt;pipelineName&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;getPipelineName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Ctx&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
 &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;rec&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="k"&gt;range&lt;/span&gt; &lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
  &lt;span class="n"&gt;dataBytes&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Marshal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rec&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
   &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Warn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sprintf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Marshal failed: %v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
   &lt;span class="k"&gt;continue&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;

  &lt;span class="n"&gt;insertSQL&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="s"&gt;`INSERT INTO pipeline_backlog (pipeline_name, record_data, error_time) VALUES ($1, $2, $3)`&lt;/span&gt;
  &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;pgConn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Exec&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;insertSQL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pipelineName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dataBytes&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Now&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;UTC&lt;/span&gt;&lt;span class="p"&gt;());&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
   &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Insert failed"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
   &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;BacklogTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionStop&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
 &lt;span class="p"&gt;}&lt;/span&gt;

 &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sprintf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Backlog: %d records for %s"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;param&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;pipelineName&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
 &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;BacklogTune&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;Action&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;models&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ActionContinue&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Each backlog entry includes structured metadata: the step ID, failure reason, and serialized payload. Failed records are &lt;strong&gt;never retried automatically in the main pipeline&lt;/strong&gt;. Instead, they form the source for &lt;strong&gt;separate replay flows&lt;/strong&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Separate Replay Flow
&lt;/h2&gt;

&lt;p&gt;Once records are persisted in the backlog, they can be processed as a &lt;strong&gt;dedicated flow&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;This replay flow reads records from the DLQ or auxiliary database.&lt;/li&gt;
&lt;li&gt;It applies the same transformations and business logic as the original pipeline.&lt;/li&gt;
&lt;li&gt;It can also include additional validation, sampling, or recovery logic.&lt;/li&gt;
&lt;li&gt;Progress in the replay flow is tracked independently using its own checkpoints, providing &lt;strong&gt;auditable, deterministic recovery&lt;/strong&gt;.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;By separating normal execution from replay, you avoid mixing failure handling with live processing, simplify debugging, and maintain idempotent guarantees across both flows. &lt;a href="https://etlfunnel.com" rel="noopener noreferrer"&gt;&lt;strong&gt;ETLFunnel&lt;/strong&gt;&lt;/a&gt; automates this replay capability to make recovery seamless for on-premise and cloud deployments.&lt;/p&gt;




&lt;h2&gt;
  
  
  Why This Works
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Idempotency by Design:&lt;/strong&gt; Checkpoints persist logical progress, not just job status.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Structured Recovery:&lt;/strong&gt; Backlogs store typed errors, making them queryable and auditable.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Replay as a First-Class Flow:&lt;/strong&gt; Recovery is explicit and traceable, avoiding accidental reprocessing in the main pipeline.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Reviewable Operations:&lt;/strong&gt; Failures aren't opaque; they're captured with metadata and visible through structured logs or UI.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This approach ensures that every record — successful or failed — has a deterministic, auditable path, while keeping the main pipeline clean and focused.&lt;/p&gt;




&lt;p&gt;&lt;em&gt;Ready to build resilient, idempotent ETL pipelines that handle failures gracefully? Visit &lt;a href="https://etlfunnel.com" rel="noopener noreferrer"&gt;etlfunnel.com&lt;/a&gt; today to sign up for a free trial of our SaaS platform and transform your data engineering workflows.&lt;/em&gt;&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>datascience</category>
      <category>etl</category>
      <category>elt</category>
    </item>
    <item>
      <title>This is  TESt</title>
      <dc:creator>Vivek Burman</dc:creator>
      <pubDate>Tue, 02 Jun 2020 16:36:56 +0000</pubDate>
      <link>https://dev.to/vivekburman/this-is-test-4fm3</link>
      <guid>https://dev.to/vivekburman/this-is-test-4fm3</guid>
      <description>&lt;p&gt;I am a big proponent of always thinking about scale and optimization when you are writing code, no matter how big or small of a product you are working on. A lot of people like to argue that there is no reason to optimize when you don't have to and that is wastes time. Yes, sometimes it does take a little bit of extra time, but down the road that little bit of time can save you a huge headache.&lt;/p&gt;

&lt;p&gt;I'm not talking about rearchitecting a whole feature, I'm talking about the little things you can do while you are coding that I guarantee will make a difference in the future. In this post, I will walk through an example of a small optimization I made recently at DEV with this pull request and how we could of avoided creating that pull request altogether.&lt;/p&gt;

&lt;p&gt;The Most Popular DEV endpoint&lt;/p&gt;

</description>
    </item>
  </channel>
</rss>
