<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Maksim</title>
    <description>The latest articles on DEV Community by Maksim (@mbessarab).</description>
    <link>https://dev.to/mbessarab</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3067359%2F239c8d4a-1518-4441-905d-04f8c9a3b85b.png</url>
      <title>DEV Community: Maksim</title>
      <link>https://dev.to/mbessarab</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/mbessarab"/>
    <language>en</language>
    <item>
      <title>Building a Task Manager with Apache NiFi: From Custom Scheduler to Distributed Workflows</title>
      <dc:creator>Maksim</dc:creator>
      <pubDate>Fri, 26 Sep 2025 07:08:00 +0000</pubDate>
      <link>https://dev.to/mbessarab/building-a-task-manager-with-apache-nifi-from-custom-scheduler-to-distributed-workflows-4d66</link>
      <guid>https://dev.to/mbessarab/building-a-task-manager-with-apache-nifi-from-custom-scheduler-to-distributed-workflows-4d66</guid>
      <description>&lt;p&gt;In companies of any size, solutions emerge that eventually need to be revisited because of maintenance costs, performance or scalability problems, bug hot spots, and a number of other reasons. This cyclical evolution sometimes requires refactoring the architecture.&lt;/p&gt;

&lt;p&gt;At each such iteration, the goal is to lay a foundation that can cover existing needs for the next few years while spending only as many resources as necessary.&lt;/p&gt;

&lt;p&gt;In practice, companies differ in how much tolerance their policies leave for introducing new solutions. Where that tolerance is low, priority goes to an approach that is simple, understandable, and still effective.&lt;/p&gt;

&lt;p&gt;So, three years ago we built an MVP that needed a mechanism to run jobs on a cron schedule. At the start, we had:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;API service in Scala + ZIO;&lt;/li&gt;
&lt;li&gt;NiFi v1.24;&lt;/li&gt;
&lt;li&gt;ClickHouse and PostgreSQL databases.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;We had little NiFi expertise, and the job flow was less trivial than, say, moving JSON from one place to another (for which NiFi is ideal). We needed to aggregate data along the way in non-trivial ways, and there was also a CD requirement: changes to job run settings had to be made via the database.&lt;br&gt;
As a result, we wrote our own Scheduler on ZIO because it was faster and, as it seemed to us at the time, more reliable.&lt;/p&gt;
&lt;h3&gt;
  
  
  Databases
&lt;/h3&gt;

&lt;p&gt;For clarity, ClickHouse contains databases for each consumer: &lt;code&gt;foo&lt;/code&gt;, &lt;code&gt;bar&lt;/code&gt;. Adding a new consumer means creating a database with an identical structure, and there is no centralized storage of consumer information. Each consumer has an identical DDL schema, including the &lt;code&gt;job_settings&lt;/code&gt; table, which contains information about job runs: &lt;code&gt;cron&lt;/code&gt; and &lt;code&gt;job_type&lt;/code&gt;. The Scheduler holds information about these consumers and, when starting, queries the appropriate database for the list of jobs.&lt;br&gt;
In PostgreSQL, there is an entity called &lt;code&gt;locks&lt;/code&gt;, which is needed for distributed locking in our Scheduler.&lt;/p&gt;
&lt;h2&gt;
  
  
  First challenges
&lt;/h2&gt;

&lt;p&gt;Almost immediately we ran into issues with the homegrown scheduler. They can be ranked from critical to inconvenient as follows:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;difficulty debugging in production;&lt;/li&gt;
&lt;li&gt;distributed locking issues — the scheduler runs in several instances and, despite transactions, every now and then we have to fix the locking mechanism;&lt;/li&gt;
&lt;li&gt;the need to improve the scheduling mechanism itself — for example, dynamically pulling new jobs into the schedule;&lt;/li&gt;
&lt;li&gt;a larger bug surface: as the service develops, the whole project becomes harder to maintain, which leaves more room for human error.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;I think that’s enough to start looking for a new solution, but instead of “throw everything away and rewrite from scratch,” let’s think about how we could fix the situation without changing the approach:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;write more tests;&lt;/li&gt;
&lt;li&gt;allocate an orchestrator for task management;&lt;/li&gt;
&lt;li&gt;create an independent worker for each job type;&lt;/li&gt;
&lt;li&gt;set up communication between the orchestrator and workers via a message broker, or build a worker-pool with task distribution.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This would solve some of the problems but create new ones:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;managing the orchestrator and workers will become more complex and require additional effort;&lt;/li&gt;
&lt;li&gt;creating a separate worker for each job type is not cost-effective. Then we would have to group jobs and create group workers. Questions arise... By what principle should we group them? And won’t we face a code junkyard in the future that we’ll still have to deal with?&lt;/li&gt;
&lt;li&gt;writing a large number of tests does not always prevent bugs and often slows development down, with most of the time spent adjusting tests to the current solution. Tests belong where they are truly needed, namely in sensitive areas. Predicting in advance where a bug will appear is as unreliable as trying to cover every specific case in the tests themselves.&lt;/li&gt;
&lt;/ul&gt;
&lt;h2&gt;
  
  
  Phase one. Searching for a solution
&lt;/h2&gt;

&lt;p&gt;Before searching for a solution, let’s define the criteria it must meet:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;the job launch process must be available out of the box;&lt;/li&gt;
&lt;li&gt;dynamic scheduling — when new jobs appear, they must be added to the schedule;&lt;/li&gt;
&lt;li&gt;transparent execution process — ease of debugging, since we sometimes have to do it in production;&lt;/li&gt;
&lt;li&gt;support for load distribution across nodes;&lt;/li&gt;
&lt;li&gt;moving away from mentioning consumers in code toward centralized descriptions of them;&lt;/li&gt;
&lt;li&gt;support for triggering jobs on demand;&lt;/li&gt;
&lt;li&gt;minimal impact on infrastructure — use existing technologies without involving DevOps.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The first thing that comes to mind in this situation is to move workflow construction to Prefect or Airflow. However, the only tool we had available was a NiFi cluster. Conceptually it does not quite meet our requirements, but we will try to make something out of it.&lt;/p&gt;

&lt;p&gt;Our solution will contain three top-level process groups: &lt;code&gt;state manager&lt;/code&gt;, &lt;code&gt;task trigger&lt;/code&gt;, &lt;code&gt;task executor&lt;/code&gt;.&lt;br&gt;
&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fc5zyixztezad2cuipp3k.webp" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fc5zyixztezad2cuipp3k.webp" alt="Overall NiFi process group diagram" width="800" height="500"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h3&gt;
  
  
  State manager
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4ux5wegcqf0jzyzvf4da.webp" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4ux5wegcqf0jzyzvf4da.webp" alt="State manager. Version 1." width="800" height="547"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We want to cache consumer metadata collected from the &lt;code&gt;consumer_foo&lt;/code&gt;, &lt;code&gt;consumer_bar&lt;/code&gt; databases so that we do not query the DB every time we need the current list of records.&lt;br&gt;
Accordingly, the cache is warmed up after a NiFi restart: once a minute we check whether the entry is present and populate it if it is missing, and once an hour we refresh it.&lt;/p&gt;
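&lt;p&gt;The cadence above can be modelled in a few lines of Groovy. This is only a sketch of the logic, not the NiFi flow itself: in NiFi the same steps are assembled from processors, and the &lt;code&gt;ConsumerCache&lt;/code&gt; class, its &lt;code&gt;loader&lt;/code&gt; closure and the sample timestamps are hypothetical names introduced for illustration.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight groovy"&gt;&lt;code&gt;import java.time.Duration
import java.time.Instant

// Hypothetical in-memory stand-in for the cache entry with consumer metadata
class ConsumerCache {
    Closure loader                      // assumed to query the consumer databases
    Duration ttl = Duration.ofHours(1)  // "once an hour we refresh it"
    List consumers = null
    Instant loadedAt = null

    // Called once a minute: load if the entry is missing, reload if it is older than ttl
    List tick(Instant now) {
        boolean missing = (consumers == null)
        boolean stale = loadedAt != null &amp;amp;&amp;amp; Duration.between(loadedAt, now) &amp;gt;= ttl
        if (missing || stale) {
            consumers = loader.call()
            loadedAt = now
        }
        return consumers
    }
}

int loads = 0
def cache = new ConsumerCache(loader: { loads++; ['foo', 'bar'] })
def start = Instant.parse('2025-01-01T00:00:00Z')

cache.tick(start)                               // empty after restart: loads
cache.tick(start.plus(Duration.ofMinutes(1)))   // entry present and fresh: no load
cache.tick(start.plus(Duration.ofMinutes(61)))  // older than an hour: reloads
println "loads = ${loads}"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Run as a plain Groovy script, this prints &lt;code&gt;loads = 2&lt;/code&gt;: one load for the empty cache and one for the hourly refresh.&lt;/p&gt;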
&lt;h3&gt;
  
  
  Task trigger
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxt0cuiapbqwkx8j0668t.webp" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxt0cuiapbqwkx8j0668t.webp" alt="Task trigger. Version 1." width="800" height="639"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The process group runs on the primary node only (processor setting &lt;code&gt;Execution: Primary node&lt;/code&gt;) to avoid races. Once a minute it fetches the cron from the &lt;code&gt;job_settings&lt;/code&gt; table for each consumer and compares it with the current time. If the time has not come, the FlowFile is skipped; otherwise it is sent to the input of the task executor process group.&lt;br&gt;
The key point here is the ExecuteScript processor: it checks whether the cron matches the current time. For comparison, we use the cronutils library and the QUARTZ format familiar to NiFi.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight groovy"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.time.ZonedDateTime&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;groovy.json.JsonSlurper&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.cronutils.model.definition.CronDefinitionBuilder&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.cronutils.parser.CronParser&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.cronutils.model.time.ExecutionTime&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;static&lt;/span&gt; &lt;span class="n"&gt;com&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;cronutils&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;model&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;CronType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;QUARTZ&lt;/span&gt; 

&lt;span class="kt"&gt;def&lt;/span&gt; &lt;span class="n"&gt;flowFile&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
&lt;span class="k"&gt;if&lt;/span&gt;&lt;span class="o"&gt;(!&lt;/span&gt;&lt;span class="n"&gt;flowFile&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="k"&gt;return&lt;/span&gt;

&lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="c1"&gt;// Read the FlowFile body, parse JSON, and extract the cron value&lt;/span&gt;
    &lt;span class="kt"&gt;def&lt;/span&gt; &lt;span class="n"&gt;cronExpression&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;''&lt;/span&gt;

    &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;read&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;flowFile&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt; &lt;span class="n"&gt;inputStream&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// Parse JSON&lt;/span&gt;
            &lt;span class="kt"&gt;def&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="n"&gt;JsonSlurper&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;parseText&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;inputStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getText&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'UTF-8'&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
            &lt;span class="n"&gt;cronExpression&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;cron&lt;/span&gt;&lt;span class="o"&gt;?.&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;warn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"Could not parse JSON from FlowFile content for ${flowFile.id}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="c1"&gt;// If cron is empty or missing in JSON, route to failure&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt;&lt;span class="o"&gt;(!&lt;/span&gt;&lt;span class="n"&gt;cronExpression&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="n"&gt;cronExpression&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;trim&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;isEmpty&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;warn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"CRON expression is null or empty for FlowFile ${flowFile.id}. Routing to failure."&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;transfer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;flowFile&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;REL_FAILURE&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="c1"&gt;// We use QUARTZ, which matches NiFi's scheduler&lt;/span&gt;
    &lt;span class="kt"&gt;def&lt;/span&gt; &lt;span class="n"&gt;cronDefinition&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;CronDefinitionBuilder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;instanceDefinitionFor&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;QUARTZ&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

    &lt;span class="kt"&gt;def&lt;/span&gt; &lt;span class="n"&gt;parser&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="n"&gt;CronParser&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cronDefinition&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;// Parse cron from the FlowFile body&lt;/span&gt;
    &lt;span class="kt"&gt;def&lt;/span&gt; &lt;span class="n"&gt;quartzCron&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;parser&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;parse&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cronExpression&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;trim&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;

    &lt;span class="c1"&gt;// Create ExecutionTime, which can compute according to cron&lt;/span&gt;
    &lt;span class="kt"&gt;def&lt;/span&gt; &lt;span class="n"&gt;executionTime&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;ExecutionTime&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;forCron&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;quartzCron&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;// Does current time match our cron?&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;executionTime&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;isMatch&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ZonedDateTime&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;now&lt;/span&gt;&lt;span class="o"&gt;()))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;transfer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;flowFile&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;REL_SUCCESS&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// Not due this minute: skip the FlowFile by routing it to failure&lt;/span&gt;
        &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;transfer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;flowFile&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;REL_FAILURE&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;error&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;"Failed to process cron for FlowFile ${flowFile.id}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;transfer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;flowFile&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;REL_FAILURE&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt; 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note that the cron expression should ignore seconds, i.e. use the “any second” wildcard in the seconds field (for example &lt;code&gt;* 0 3 * * ?&lt;/code&gt; rather than &lt;code&gt;0 0 3 * * ?&lt;/code&gt;). The check can run at any second near the start of the minute, and we accept this deviation.&lt;/p&gt;
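&lt;p&gt;A small standalone sketch of why the seconds field matters. It reuses the cronutils calls from the script above; the &lt;code&gt;matches&lt;/code&gt; helper, the sample timestamp and the &lt;code&gt;@Grab&lt;/code&gt; coordinates with their version are assumptions made for illustration (inside NiFi the library would be supplied to ExecuteScript instead). The last line shows a possible alternative, which is not what the flow above does: truncating the timestamp to the minute so that a conventional &lt;code&gt;0&lt;/code&gt; in the seconds field can still match.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight groovy"&gt;&lt;code&gt;// Assumed coordinates and version; adjust to the cron-utils build you actually use
@Grab('com.cronutils:cron-utils:9.2.1')
import com.cronutils.model.CronType
import com.cronutils.model.definition.CronDefinitionBuilder
import com.cronutils.model.time.ExecutionTime
import com.cronutils.parser.CronParser
import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit

// Hypothetical helper: does the Quartz expression match the given moment?
boolean matches(String expression, ZonedDateTime moment) {
    def parser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ))
    return ExecutionTime.forCron(parser.parse(expression)).isMatch(moment)
}

// The once-a-minute check fired 17 seconds into 03:00 (made-up timestamp)
def firedAt = ZonedDateTime.of(2025, 9, 26, 3, 0, 17, 0, ZoneOffset.UTC)

// Seconds wildcard: any second of 03:00 qualifies
println matches('* 0 3 * * ?', firedAt)
// Seconds pinned to 0: only 03:00:00 itself qualifies
println matches('0 0 3 * * ?', firedAt)
// Alternative: compare against the start of the current minute instead
println matches('0 0 3 * * ?', firedAt.truncatedTo(ChronoUnit.MINUTES))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Keeping the wildcard in the stored expression, as the article does, means the script needs no extra time arithmetic; the truncation variant moves that responsibility into the script.&lt;/p&gt;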

&lt;h3&gt;
  
  
  Task executor
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fv7pdmkzfrq8itvs2ouvj.webp" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fv7pdmkzfrq8itvs2ouvj.webp" alt="Task executor. Version 1." width="800" height="488"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The task executor routes jobs by the &lt;code&gt;job_type&lt;/code&gt; attribute to the Input Port of the corresponding process group.&lt;br&gt;
The SplitJson processor splits the list of jobs into individual FlowFiles, and the &lt;code&gt;Round Robin&lt;/code&gt; strategy in the settings of the &lt;code&gt;split&lt;/code&gt; queue that follows it distributes the load between cluster nodes.&lt;/p&gt;

&lt;h3&gt;
  
  
  Porting worker logic
&lt;/h3&gt;

&lt;p&gt;A task’s logic can be non-trivial, and even to implement a simple operation you may need several processors. This can partly be handled with Groovy scripts or custom processors.&lt;/p&gt;

&lt;p&gt;The latter are more complicated: we currently run three such legacy processors, and when something goes wrong with one of them, it is often a “black box.” For example, we recently ran into property caching: for an unclear reason, NiFi started caching custom processor properties and would not reset them until the processor was recreated. Implementing multithreaded processing is also difficult, because there are no built-in tools and you need to write thread-safe wrappers yourself. Even if Groovy scripts are not enough and you do need a custom processor, it is advisable to stick to one rule: a processor should solve one specific task. This greatly simplifies debugging later.&lt;/p&gt;

&lt;p&gt;There are situations where several processors can be replaced with a small script — in a large process group this greatly simplifies understanding the overall process.&lt;/p&gt;

&lt;h3&gt;
  
  
  Results of phase one
&lt;/h3&gt;

&lt;p&gt;So, this implementation has clear limitations:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The cache has to be checked too frequently; otherwise, after NiFi restarts with an empty cache, jobs will not start.&lt;/li&gt;
&lt;li&gt;It is impossible to separate the trigger mechanism from execution.&lt;/li&gt;
&lt;li&gt;It is impossible to start jobs by trigger from external services.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Among the advantages is the simplicity of implementation, which serves as a foundation for further modification.&lt;/p&gt;

&lt;p&gt;It is also worth mentioning that logs here are collected from any nesting level through an Output Port and passed up to the top level, where they are processed by the corresponding processor.&lt;/p&gt;

&lt;h2&gt;
  
  
  Phase two. Using a message broker
&lt;/h2&gt;

&lt;p&gt;We can make the process less tightly coupled using message brokers; let’s consider Kafka as an example.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnrpisejaykxo0tdlejnq.webp" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fnrpisejaykxo0tdlejnq.webp" alt="State manager. Version 2." width="800" height="478"&gt;&lt;/a&gt;&lt;br&gt;
For the &lt;code&gt;state_manager&lt;/code&gt; process group we add a topic with &lt;code&gt;cleanup.policy=compact&lt;/code&gt;; the topic contains just one key and is updated once an hour.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fc5f6lxen0xxt84ducq8w.webp" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fc5f6lxen0xxt84ducq8w.webp" alt="Task trigger. Version 2." width="800" height="658"&gt;&lt;/a&gt;&lt;br&gt;
The &lt;strong&gt;task trigger&lt;/strong&gt; now writes to the job queue topic instead of sending jobs to an Output Port.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fyk7te2j8d0nt3onpfd7z.webp" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fyk7te2j8d0nt3onpfd7z.webp" alt="Task executor. Version 2." width="800" height="498"&gt;&lt;/a&gt;&lt;br&gt;
The &lt;strong&gt;task executor&lt;/strong&gt; reads a job from the topic and still routes it to process groups.&lt;/p&gt;

&lt;p&gt;This already looks better. This approach allows us to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;migrate smoothly to a neighboring NiFi instance during a version upgrade;&lt;/li&gt;
&lt;li&gt;trigger jobs from any service by pushing the required JSON to the topic;&lt;/li&gt;
&lt;li&gt;store dynamically changing consumer metadata in Kafka with access from other services, and synchronize the cache with the topic.&lt;/li&gt;
&lt;/ul&gt;
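
&lt;p&gt;For illustration, a job message pushed to the topic might be built like this. Only the &lt;code&gt;cron&lt;/code&gt; and &lt;code&gt;job_type&lt;/code&gt; names come from the flow described above; the &lt;code&gt;consumer&lt;/code&gt; field and all sample values are assumptions, since the exact payload depends on your flow. Publishing to Kafka itself is left out.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight groovy"&gt;&lt;code&gt;import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical job description; field values are made up
def job = [
    consumer: 'foo',          // assumed field: which consumer the job belongs to
    job_type: 'aggregate',    // used by the task executor for routing
    cron    : '* 0 3 * * ?'   // Quartz format with the "any second" wildcard
]

// Serialize the job to the JSON string a producer would send
def payload = JsonOutput.toJson(job)
println payload

// Round trip: what a script on the consuming side would read back
def parsed = new JsonSlurper().parseText(payload)
assert parsed.job_type == 'aggregate'
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;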

&lt;p&gt;Using a message broker solves our task, but it does not meet the requirement of minimal impact on the infrastructure.&lt;/p&gt;

&lt;h2&gt;
  
  
  Phase three. Final implementation
&lt;/h2&gt;

&lt;p&gt;Instead of a message broker, we can use:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;ListenHTTP processor&lt;/code&gt;.&lt;br&gt;
&lt;strong&gt;Pros&lt;/strong&gt;: simplicity.&lt;br&gt;
&lt;strong&gt;Cons&lt;/strong&gt;: you have to open additional ports in the container, which violates the requirement about impact on infrastructure.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;Remote Process Group + Remote Connections Ports&lt;/code&gt;.&lt;br&gt;
&lt;strong&gt;Pros&lt;/strong&gt;: minimal impact on infrastructure.&lt;br&gt;
&lt;strong&gt;Cons&lt;/strong&gt;: some overhead, but within acceptable limits. At first glance this option suits us.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  State manager
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F66trafuji32jx6h3g6b8.webp" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F66trafuji32jx6h3g6b8.webp" alt="State manager. Version 3." width="800" height="337"&gt;&lt;/a&gt;&lt;br&gt;
To let external services read the state, we implement an asynchronous request to an Input Port and a separate request to receive the response. A trigger FlowFile is sent to the Input Port; the flow then pulls data from the cache and sends it to an Output Port that our service listens to. All Input/Output ports that face outside must be configured for remote connections: &lt;code&gt;Receive from: Remote connections (site-to-site)&lt;/code&gt; on Input Ports and the corresponding &lt;code&gt;Send to&lt;/code&gt; setting on Output Ports.&lt;/p&gt;

&lt;h3&gt;
  
  
  Task executor
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffbcqhospga31fxdb3u7j.webp" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ffbcqhospga31fxdb3u7j.webp" alt="Task executor. Version 3." width="800" height="491"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In our case, NiFi sends the job queue to itself and exposes a port for remote connections (a logical NiFi port, not a physical one). The receiving process group, &lt;code&gt;task_executor&lt;/code&gt;, must therefore have an Input Port that accepts remote connections. Now we can send a trigger to start a job from different NiFi instances or external services.&lt;/p&gt;

&lt;h3&gt;
  
  
  Task trigger
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fyqzuv6u7d68soyzcedqs.webp" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fyqzuv6u7d68soyzcedqs.webp" alt="Task trigger. Version 3." width="800" height="623"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In the task trigger process group, we add a &lt;code&gt;Remote Process Group&lt;/code&gt; (RPG) and connect the outgoing queue to it.&lt;br&gt;
An RPG transfers FlowFiles between the Input/Output ports of different NiFi instances or clusters. In the RPG itself we specify a URL like &lt;code&gt;https://host:port/nifi&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;If you deployed NiFi locally in a Docker container, the URL will most likely look like &lt;code&gt;https://7865f4237219:8443/nifi&lt;/code&gt;. When addressing &lt;code&gt;localhost&lt;/code&gt;, NiFi does not find the connection. The pod name yields a certificate error, and the error message lists the names in the certificate: &lt;code&gt;localhost&lt;/code&gt; and one alternative name. That alternative name is exactly what we need; in my case it is &lt;code&gt;7865f4237219&lt;/code&gt;. If desired, you can issue your own certificates with the pod name in the CN and add them to NiFi.&lt;/p&gt;

&lt;p&gt;Done, our flow is fully assembled. Let’s go back to the requirements and check which of them we have met:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;NiFi itself is responsible for launching and distributing jobs;&lt;/li&gt;
&lt;li&gt;the script that checks the match between cron and the current time provides dynamic scheduling;&lt;/li&gt;
&lt;li&gt;debugging has become easier due to transparency of the execution process in the UI;&lt;/li&gt;
&lt;li&gt;load distribution across nodes;&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;state_manager&lt;/code&gt; is responsible for collecting and storing consumer descriptions;&lt;/li&gt;
&lt;li&gt;the ability to start jobs from external services via Remote Process Group;&lt;/li&gt;
&lt;li&gt;minimal impact on infrastructure — we did not use additional tools, everything is implemented in NiFi without involving DevOps.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The implementation has trade-offs that we chose to accept:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;the need to write a wrapper around NiFi for external services;&lt;/li&gt;
&lt;li&gt;a small overhead in process groups compared to the message-broker architecture.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Let’s consider this the price paid for the constraints.&lt;/p&gt;

&lt;h2&gt;
  
  
  Results
&lt;/h2&gt;

&lt;p&gt;This is where our exploration ended up. NiFi has many settings and knobs you can tweak to build the process you need. It has two minor downsides: an outdated interface, and the browser hanging when the UI is left open for a long time in a background tab (at least in version 1.24; 2.x is already out and may have fixed this). It also has one tangible downside: developing and deploying custom processors.&lt;br&gt;
Whether or not to use NiFi depends on the specific conditions and tasks, but even within it you can do quite a lot.&lt;/p&gt;

&lt;p&gt;All three examples are on my &lt;a href="https://github.com/mbessarab/nifi_templates" rel="noopener noreferrer"&gt;GitHub&lt;/a&gt;; I’ll be glad if they serve as a starting point for your tasks.&lt;/p&gt;

&lt;p&gt;If the article saved you time, inspired your own solution, or you have faced a similar task, leave your feedback in the comments — it would be interesting to read.&lt;/p&gt;

&lt;p&gt;I would also appreciate it if you rated the article and visited my &lt;a href="https://mbessarab.ru/" rel="noopener noreferrer"&gt;Journal&lt;/a&gt;, where I publish only what ends up in my bookmarks.&lt;/p&gt;

</description>
      <category>architecture</category>
      <category>automation</category>
      <category>dataengineering</category>
    </item>
  </channel>
</rss>
