In companies of various sizes, solutions often emerge that from time to time need to be revisited due to maintenance costs, performance issues, scalability, bug hot spots, and a number of other reasons. This cyclical evolution process sometimes requires refactoring the architecture.
At each such iteration, the goal is to lay a foundation with the potential to cover existing needs for the next few years while spending the minimally sufficient amount of resources.
It is worth noting that in practice, depending on the company’s policy, there are varying degrees of tolerance for introducing new solutions. Where this is harder, a simple, understandable, and at the same time effective approach is prioritized.
So, three years ago we built an MVP that needed a mechanism to run jobs on a cron. At the start, we had:
- API service in Scala + ZIO;
- NiFi v1.24;
- ClickHouse and PostgreSQL databases.
We had little NiFi expertise, and the job flow was not as trivial as, for example, moving a JSON from one place to another (for which NiFi is ideal). We needed to cleverly aggregate data along the way, and there was also a CD requirement — changes to job run settings had to be made via the database.
As a result, we wrote our own Scheduler on ZIO because it was faster and, as it seemed to us at the time, more reliable.
Databases
For clarity, ClickHouse contains databases for each consumer: foo
, bar
. Adding a new consumer means creating a database with an identical structure, and there is no centralized storage of consumer information. Each consumer has an identical DDL schema, including the job_settings
table, which contains information about job runs: cron
and job_type
. The Scheduler holds information about these consumers and, when starting, queries the appropriate database for the list of jobs.
In PostgreSQL, there is an entity called locks
, which is needed for distributed locking in our Scheduler.
First challenges
Almost immediately we ran into issues with the homegrown scheduler. They can be ranked from critical to inconvenient as follows:
- difficulty debugging in production;
- distributed locking issues — the scheduler runs in several instances and, despite transactions, every now and then we have to fix the locking mechanism;
- the need to improve the scheduling mechanism itself — for example, dynamically pulling new jobs into the schedule;
- a larger bug surface — as the service develops, the complexity of maintaining the entire project grows, increasing the human factor.
I think that’s enough to start looking for a new solution, but instead of “throw everything away and rewrite from scratch,” let’s think about how we could fix the situation without changing the approach:
- write more tests;
- allocate an orchestrator for task management;
- create an independent worker for each job type;
- set up communication between the orchestrator and workers via a message broker, or build a worker-pool with task distribution.
This will solve part of the problems but create new ones:
- managing the orchestrator and workers will become more complex and require additional effort;
- creating a separate worker for each job type is not cost-effective. Then we would have to group jobs and create group workers. Questions arise... By what principle should we group them? And won’t we face a code junkyard in the future that we’ll still have to deal with?
- writing a large number of tests does not always help avoid bugs, and often they actually slow down the process, with most of the time spent adjusting tests to the current solution. Tests are needed where they are truly needed, namely in sensitive areas. Predicting their necessity in the place where a bug will appear is the same as failing to account for a specific case in the tests themselves.
Phase one. Searching for a solution.
Before searching for a solution, let’s define the criteria that meet our requirements:
- the job launch process must be available out of the box;
- dynamic scheduling — when new jobs appear, they must be added to the schedule;
- transparent execution process — ease of debugging, since we sometimes have to do it in production;
- support for load distribution across nodes;
- moving away from mentioning consumers in code toward centralized descriptions of them;
- support for triggering jobs on demand;
- minimal impact on infrastructure — use existing technologies without involving DevOps.
The first thing that comes to mind in this situation is to move workflow construction to Prefect or Airflow, but among what we had available was only a NiFi cluster, which conceptually does not quite meet our requirements, but we will try to make something out of it.
Our solution will contain three top-level process groups: state manager
, task trigger
, task executor
.
State manager
We want to dynamically store consumer metadata collected from the consumer_foo
, consumer_bar
databases and avoid querying the DB for it each time we need the list of current records.
Accordingly, when NiFi restarts, we warm up the cache: once a minute we check whether an entry is present there, if not — we enrich it, and once an hour we refresh it.
Task trigger
The process group is run on the Primary Node (Scheduling Strategy: Primary node only)
to avoid races, and once a minute it fetches the cron from the job_settings
table for each consumer and compares it with the current time. If the time has not come — FlowFiles are skipped, otherwise they are sent to the input of the task executor process group.
The key point here is the ExecuteScript processor: it checks whether the cron matches the current time. For comparison, we use the cronutils library and the QUARTZ format familiar to NiFi.
import java.time.ZonedDateTime
import groovy.json.JsonSlurper
import com.cronutils.model.definition.CronDefinitionBuilder
import com.cronutils.parser.CronParser
import com.cronutils.model.time.ExecutionTime
import static com.cronutils.model.CronType.QUARTZ
def flowFile = session.get()
if(!flowFile) return
try {
// Read the FlowFile body, parse JSON, and extract the cron value
def cronExpression = ''
session.read(flowFile) { inputStream ->
try {
// Parse JSON
def json = new JsonSlurper().parseText(inputStream.getText('UTF-8'))
cronExpression = json.cron?.toString()
} catch (e) {
log.warn("Could not parse JSON from FlowFile content for ${flowFile.id}", e)
}
}
// If cron is empty or missing in JSON, route to failure
if(!cronExpression || cronExpression.trim().isEmpty()) {
log.warn("CRON expression is null or empty for FlowFile ${flowFile.id}. Routing to failure.")
session.transfer(flowFile, REL_FAILURE)
return
}
// We use QUARTZ, which matches NiFi's scheduler
def cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(QUARTZ)
def parser = new CronParser(cronDefinition)
// Parse cron from the FlowFile body
def quartzCron = parser.parse(cronExpression.trim())
// Create ExecutionTime, which can compute according to cron
def executionTime = ExecutionTime.forCron(quartzCron)
// Does current time match our cron?
if (executionTime.isMatch(ZonedDateTime.now())) {
session.transfer(flowFile, REL_SUCCESS)
} else {
session.transfer(flowFile, REL_FAILURE)
}
} catch(e) {
log.error("Failed to process cron for FlowFile ${flowFile.id}", e)
session.transfer(flowFile, REL_FAILURE)
}
It is important to note that the cron should ignore seconds (i.e., match the syntax “any second”), since the launch can occur in any second at the beginning of the minute, so we neglect this deviation.
Task executor
It routes jobs by the job_type
attribute to the corresponding Input Port of a process group.
Load distribution between cluster nodes occurs using the Round Robin
strategy in the split
queue settings after splitting the list of jobs into individual FlowFiles in the SplitJson processor.
Porting worker logic.
A task’s logic can be non-trivial, and even to implement a simple operation you may need several processors. This can partly be handled with Groovy scripts or custom processors.
The latter are more complicated: we currently run three such legacy processors and, when something happens to one of them, it is often a “black box.” For example, we recently faced property caching — for an unclear reason, NiFi started caching custom processor properties and would not reset them until the processor was recreated. Implementing multithreaded processing is also difficult — there are no built-in tools; you need to write thread-safe wrappers yourself. But even if, for some reason, Groovy scripts are not enough and you need a custom processor, it is advisable to stick to the rule: a processor should solve one specific task — this will greatly simplify debugging later.
There are situations where several processors can be replaced with a small script — in a large process group this greatly simplifies understanding the overall process.
Results of phase one.
So, this implementation has clear limitations:
- Data caching is too frequent; otherwise, when NiFi restarts with an empty cache, jobs will not start.
- It is impossible to separate the trigger mechanism from execution.
- It is impossible to start jobs by trigger from external services.
Among the advantages is the simplicity of implementation, which serves as a foundation for further modification.
It is also worth mentioning that logs here are collected from any nesting level through an Output Port and passed up to the top level, where they are processed by the corresponding processor.
Phase two. Using a message broker.
We can make the process less tightly coupled using message brokers; let’s consider Kafka as an example.
We add a topic with cleanup.policy=compact
to the state_manager
process group, which will contain just one key and be updated once an hour.
The task trigger now writes to the job queue topic instead of sending jobs to an Output Port.
The task executor reads a job from the topic and still routes it to process groups.
This already looks better. This approach allows us to:
- smoothly move to a neighboring NiFi during a version upgrade;
- trigger jobs from any service by pushing the required JSON to the topic;
- store dynamically changing consumer metadata in Kafka with access from other services, and synchronize the cache with the topic.
Using a message broker solves our task, but it does not meet the requirement of minimal impact on the infrastructure.
Phase three. Final implementation
Instead of a message broker, we can use:
-
ListenHTTP processor
. \ Pros: simplicity. \ Cons: you have to open additional ports in the container, which violates the requirement about impact on infrastructure. -
Remote Process Group + Remote Connections Ports
. \ Pros: minimal impact on infrastructure. \ Cons: some overhead, but within acceptable limits. At first glance this option suits us.
State manager.
To receive state from external services, we implement an asynchronous request to an input port and a separate request to receive a response. A trigger FlowFile is sent to the Input Port, which pulls data from the cache and sends it to an Output Port listened to by our service. All Input/Output ports that look outside must have the setting Receive from: Remote connections (site-to-site)
.
Task executor.
In our case, NiFi will send the job queue to itself and have an open port for remote connections (not a physical one, but an abstract one), therefore the receiving process group — task_executor
— must have an Input Port capable of remote connections. Now we can send a trigger to start a job from different NiFi instances or external services.
Task trigger
In the task trigger process group, we add a Remote Process Group
(RPG) and connect the outgoing FlowFile to it.
RPG allows exchanging queues between the Input/Output ports of different NiFi instances/clusters. In the RPG itself we specify a URL like https://host:port/nifi
.
If you deployed NiFi locally in a Docker container, you will most likely have something like: https://7865f4237219:8443/nifi
. When addressing localhost
, NiFi does not find the connection. The pod name also yields an error that the certificate CN contains the domain names localhost
and an alternative option, which is exactly what we need — in my case this is 7865f4237219
. If desired, you can issue your own certificates with the pod name in the CN and drop them into NiFi.
Done, our flow is fully assembled. Based on the requirements, let’s define which tasks we have solved:
- NiFi itself is responsible for launching and distributing jobs;
- the script that checks the match between cron and the current time provides dynamic scheduling;
- debugging has become easier due to transparency of the execution process in the UI;
- load distribution across nodes;
-
state_manager
is responsible for collecting and storing consumer descriptions; - the ability to start jobs from external services via Remote Process Group;
- minimal impact on infrastructure — we did not use additional tools, everything is implemented in NiFi without involving DevOps.
The implementation has assumptions that we ignored in our process:
- the need to write a wrapper around NiFi for external services;
- a small overhead in process groups compared to the architecture through a message broker.
Let’s consider this the price paid for the constraints.
Results
This is the exploration we ended up with. NiFi has many settings and knobs you can tweak to build the process you need. But it has two clear minor downsides — an outdated interface and the browser hanging when the UI is left open for a long time in a neighboring tab (at least in version 1.24; there is already 2.x, where this may have been fixed), and one tangible downside — the development and deployment of custom processors.
Whether or not to use NiFi depends on the specific conditions and tasks, but even within it you can do quite a lot.
All three examples are on my GitHub; I’ll be glad if they serve as a starting point for your tasks.
If the article saved you time, inspired your own solution, or you have faced a similar task, leave your feedback in the comments — it would be interesting to read.
I would also appreciate a rating for the article and if you visit my Journal, where I publish only what ends up in my bookmarks.
Top comments (0)