This article presents the concept and implementation of a universal workbench for Apache NiFi data flows.
The workbench is intended to improve the quality of your data flows by increasing their:
- Testability - How do I test the specified functionality of my data flows?
- Extensibility - How do I ensure the functionality of my data flows after changes?
- Reliability - How do I define and test the edge cases of my data flows?
The article is primarily aimed at advanced Apache NiFi users, but is also of interest to beginners who are in the process of learning basic development concepts.
Motivation
Following up on my last post, Setup a secure Apache NiFi cluster in Kubernetes, I would now like to cover an important topic: the quality of Apache NiFi data flows.
Before I began using Apache NiFi, I took a close look at its tools and concepts. While I was excited about its potential, I found the standards and tools for developing data flows lacking. To be honest, I was expecting something like this:
The green lights give me a feeling of security and well-being. They signal that I can make adjustments and changes to the code without having to worry about breaking its functionality. Unit tests also give me an entry point to understand programs and develop new features.
Even if it is a bit challenging at first, test-driven development is a very stable approach and prevents problems before they occur. Especially when extensions and refactorings are required after the first releases, the test-driven approach is unbeatable in my eyes.
That doesn't mean that there aren't other great approaches. But unfortunately, I couldn't find any established approaches or best practices for developing data flows in Apache NiFi. Apart from a few articles on unit testing custom processors and using `GenerateFlowFile` processors for basic smoke testing, little has been written about securing the functionality of data flows.
Working in an environment that's always changing, where adapting functionality and ensuring reliability are key, I realized I needed to develop my own structured approach.
That's when I came up with the idea of a Data Flow Workbench.
The basic idea is straightforward:
Simply encapsulate a data flow within a predefined and immutable context. Inputs and expected outputs remain the same, while only the data flow itself changes. This approach is similar to the principles of unit testing but needs to be tailored for data flow scenarios.
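To make the analogy concrete, here is roughly what the same idea looks like as a classic unit test. This is only an illustrative sketch: `run_flow` is a hypothetical stand-in for the data flow under test, and the sample values mirror the example used later in this article.

```python
import unittest


def run_flow(attributes, content):
    # Hypothetical stand-in for the data flow under test. In the workbench,
    # this role is played by the process group containing the real flow.
    out = dict(attributes)
    out["done"] = "true"
    return out, content + " world"


class DataFlowTest(unittest.TestCase):
    def test_flow(self):
        # Predefined, immutable inputs: attributes and content of a flow file
        out_attrs, out_content = run_flow({"done": "false"}, "Hello")
        # Predefined, immutable expectations on the outputs
        self.assertEqual({"done": "true"}, out_attrs)
        self.assertEqual("Hello world", out_content)


if __name__ == "__main__":
    unittest.main()
```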
Concept
Let's dive deeper into the concept of the Apache NiFi Workbench.
Apache NiFi data flows can be organized within process groups, which is similar to organizing code within functions. A `ProcessGroup` has defined inputs and outputs (represented by input and output ports) that are connected by processing pipelines of process groups and processors. A `FlowFile` is the data that flows through the pipelines; it consists of `attributes` and a `content`.
Now you know everything you need to know about data flows in order to understand the following illustration, which shows the basic structure of the workbench. The process group `PG_main` contains the data flow to be tested.
Splitting the flow files into their attributes and contents as separate inputs allows them to be defined as separate files. The workbench requires the attributes as JSON, while the content can be in any file format. The flow files of all inputs are related via the `assert.group.identifier` attribute, which ensures that the correct parts are put together.
An example input can look like this:
| | [input] Attributes | [input] Content | [input] Expected Attributes | [input] Expected Content |
|---|---|---|---|---|
| [attribute] assert.group.identifier | test | test | test | test |
| [content] | `{ "done": false }` | Hello | `{ "done": true }` | Hello world |
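Read row by row: from these four input files the workbench builds two flow files, the actual one that is sent through `PG_main` and the expected one that the result is compared against. Conceptually, using the values from the table:

```python
# Conceptual sketch of the two flow files built from the example inputs.
# The attributes JSON becomes the attribute map of the flow file, and the
# assert.group.identifier attribute ties the four input files together.
actual_input = {
    "attributes": {"assert.group.identifier": "test", "done": "false"},
    "content": "Hello",
}
expected_output = {
    "attributes": {"assert.group.identifier": "test", "done": "true"},
    "content": "Hello world",
}
```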
Workflow
As everywhere in software engineering, the quality of the software depends heavily on a proper design. For this reason, a clear and modular structure of the data flows is strongly recommended. In particular, it makes sense to decouple the data sources and data targets from the processing logic.
```
---------------     -----------     ---------------
| Data Source | --> | PG_main | --> | Data Target |
---------------     -----------     ---------------
```
If that's the scenario, the processing logic `PG_main` can be integrated into the workbench as an isolated module, allowing us to define simulated inputs along with their corresponding expected outputs.
However, the workbench truly unleashes its full potential when paired with the Apache NiFi Registry. Once changes are made to `PG_main`, they can be committed to version management and seamlessly integrated into the productive data flows.
Implementation
TL;DR
In this section, the implementation of the workbench is explained in more detail. If you'd like to test and explore the workbench yourself, you can jump directly to the conclusion to download the workbench template.
The workbench consists of two core modules (implemented as process groups):
- `Build FlowFile`: Builds a `FlowFile` from given `attributes` and a specific `content`.
- `Assert FlowFile`: Compares the `attributes` and the `content` of two flow files; it fails if they are different and passes if they are equal.
The implementation of the two core modules is described in more detail in the following two sections.
Process Group: Build FlowFile
This module merges the `attributes` with their corresponding `content` into a single `FlowFile`. This sounds simple in common programming languages, but it is tricky for data flows, as there is no defined order in which the files arrive.
NiFi provides a standard processor (`MergeContent`) for this purpose. The `Create fragments` processors ensure that the required attributes of the `MergeContent` processor are properly set on incoming flow files (see the sketch below). After the merge, the MIME type is restored and some helper attributes are cleaned up.
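For reference, `MergeContent` in its Defragment merge strategy correlates flow files via the `fragment.*` attributes. The sketch below shows the kind of values the `Create fragments` processors have to provide; the concrete index assignment is my assumption, not taken from the template:

```python
# Hypothetical sketch of the attributes set by the "Create fragments"
# processors so that MergeContent (Merge Strategy: Defragment) can pair
# an attributes part with its content part.
attributes_part = {
    "fragment.identifier": "${assert.group.identifier}",  # correlates the pair
    "fragment.index": "1",                                # attributes come first
    "fragment.count": "2",                                # a pair has two fragments
}
content_part = {
    "fragment.identifier": "${assert.group.identifier}",
    "fragment.index": "2",
    "fragment.count": "2",
}
```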
Everything is actually pretty straightforward, but you may have noticed that I skipped the `JSON to attributes` processor. Let's take a closer look. Unfortunately, there is no standard processor for extracting JSON from the flow file content into attributes, so it needs to be implemented.
This can be done with the `ExecuteScript` processor, which makes it possible to execute user-defined code on incoming flow files. There are various programming languages to choose from, and I decided to use Jython (Python running on the Java platform).
The following code retrieves a flow file, reads and parses its content as JSON and adds the keys with their values as attributes.
```python
import json

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback


class JsonInputStreamCallback(InputStreamCallback):
    def __init__(self):
        self.dict = {}

    def process(self, inputStream):
        # Read the whole content and parse it as JSON
        jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        self.dict = json.loads(jsonStr)


flowFile = session.get()
if flowFile is not None:
    try:
        callback = JsonInputStreamCallback()
        session.read(flowFile, callback)
        # NiFi attributes are always strings, so stringify all JSON values
        attributes = dict((str(k), str(v)) for k, v in callback.dict.items())
        # Create a child flow file, add the parsed entries as attributes
        # and drop the original
        newFlowFile = session.create(flowFile)
        newFlowFile = session.putAllAttributes(newFlowFile, attributes)
        session.remove(flowFile)
        session.transfer(newFlowFile, REL_SUCCESS)
    except Exception as e:
        log.error("An error occurred", e)
        session.transfer(flowFile, REL_FAILURE)
```
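Applied to the example from the concept section, the key `done` from the content `{ "done": false }` thus ends up as an attribute on the newly created flow file. Note that NiFi attributes are always strings, which is why the script stringifies all JSON values.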
Process Group: Assert FlowFile
The second core module compares the actual flow file against the expected one. The flow reduces the content comparison to an attribute comparison: it first adds a hash of the file's content as an attribute and then compares all existing attributes.
Some core attributes like `uuid` and `filename` (see CoreAttributes for the complete list) are ignored in the comparison.
The heart of the data flow is the `Assert FlowFile` processor, which compares two flow files with each other. As Apache NiFi does not provide a standard processor for this either, the logic is again implemented using an `ExecuteScript` processor.
The script tries to find a pair of corresponding actual and expected flow files to compare. If there is no matching file with the same `assert.group.identifier` attribute, the flow file is pushed back to the queue. Otherwise, the attributes of both files are compared and asserted.
```python
from java.util import HashMap
from org.apache.nifi.flowfile.attributes import CoreAttributes
from org.apache.nifi.processor import FlowFileFilter
from org.apache.nifi.processor.FlowFileFilter import FlowFileFilterResult

FILE_TYPE_KEY = 'assert.file.type'
GROUP_IDENTIFIER_KEY = 'assert.group.identifier'


class FlowFileTypeFilter(FlowFileFilter):
    # Accepts the first queued flow file of the given type (and, if set,
    # the given group identifier); all other flow files stay in the queue.
    def __init__(self, type, identifier=None):
        self._type = type
        self._identifier = identifier

    def filter(self, flowFile):
        type = flowFile.getAttribute(FILE_TYPE_KEY)
        if type != self._type:
            return FlowFileFilterResult.REJECT_AND_CONTINUE
        if self._identifier:
            identifier = flowFile.getAttribute(GROUP_IDENTIFIER_KEY)
            if identifier != self._identifier:
                return FlowFileFilterResult.REJECT_AND_CONTINUE
        return FlowFileFilterResult.ACCEPT_AND_TERMINATE


def getFlowFile(type, identifier=None):
    # Selectively pull a matching flow file from the incoming queue
    flowFiles = session.get(FlowFileTypeFilter(type, identifier))
    if not flowFiles.isEmpty():
        return flowFiles[0]
    return None


def getAttributes(flowFile):
    # Copy the attributes, dropping the helper attribute and the NiFi core
    # attributes (uuid, filename, ...), which are ignored in the comparison
    map = HashMap(flowFile.getAttributes())
    map.remove(FILE_TYPE_KEY)
    for attr in CoreAttributes.values():
        map.remove(attr.key())
    return map


def compare(actual, expected):
    # Returns None if all attributes are equal, otherwise a diff message
    actualAttrs = getAttributes(actual)
    expectedAttrs = getAttributes(expected)
    if len(actualAttrs) != len(expectedAttrs):
        return "The number of attributes differs"
    for key in actualAttrs:
        if expectedAttrs[key] != actualAttrs[key]:
            return 'Attribute "{}" differs (actual / expected): {} / {}'.format(
                key, actualAttrs[key], expectedAttrs[key])
    return None


actual = getFlowFile("actual")
if actual is not None:
    expected = getFlowFile(
        "expected", actual.getAttribute(GROUP_IDENTIFIER_KEY))
    if expected is not None:
        try:
            diffMsg = compare(actual, expected)
            session.remove(expected)
            actual = session.removeAttribute(actual, FILE_TYPE_KEY)
            if diffMsg:
                actual = session.putAttribute(actual, 'assert_message', diffMsg)
                session.transfer(actual, REL_FAILURE)
            else:
                session.transfer(actual, REL_SUCCESS)
        except Exception as e:
            log.error('An error occurred while asserting the flow files', e)
            session.rollback(True)
    else:
        # No matching counterpart yet: push the flow file back to the queue
        session.rollback(True)
```
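Rolling back with `session.rollback(True)` penalizes the flow file and leaves it in the incoming queue, so an actual flow file simply waits there until its expected counterpart (or vice versa) arrives and the comparison can be carried out on a later run.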
Conclusion
The workbench presented in this article can be used universally for all isolated data flows in Apache NiFi. It therefore offers a great extension for the development and testing of data flows and makes a valuable contribution to increasing their quality.
The attributes and content of the flow files are provided separately to the workbench. This characteristic makes the concept easy to extend into a test suite for Apache NiFi, which I will present in a follow-up article.
If you want to look at the workbench in detail, feel free to download it as an Apache NiFi 1.25 template: