Jannik Rebmann

Posted on • Originally published at jannikrebmann.de

Workbench for Apache NiFi data flows

This article presents the concept and implementation of a universal workbench for Apache NiFi data flows.

The workbench is intended to improve the quality of your data flows by increasing their:

  1. Testability - How do I test the specified functionality of my data flows?
  2. Extensibility - How do I ensure the functionality of my data flows after changes?
  3. Reliability - How do I define and test edge cases of my data flows?

The article is primarily aimed at advanced Apache NiFi users, but is also of interest to beginners who are in the process of learning basic development concepts.

Motivation

Following up on my last post Setup a secure Apache NiFi cluster in Kubernetes, I would now like to cover an important topic: the quality of Apache NiFi data flows.

Before I began using Apache NiFi, I took a close look at its tools and concepts. While I was excited about its potential, I found the standards and tools for developing data flows lacking. To be honest, I was expecting something like this:

The picture above is a partial screenshot of the unit tests of the Node.js project [gitex-flow](https://github.com/gitex-flow/gitex-flow-vscode) in Visual Studio Code.

The green lights give me a feeling of security and well-being. They signal that I can make adjustments and changes to the code without having to worry about breaking its functionality. Unit tests also give me an entry point to understand programs and develop new features.

Even if it is a bit challenging at first, test-driven development is a very stable approach and prevents problems before they occur. Especially when extensions and refactorings are required after the first releases, the test-driven approach is unbeatable in my eyes.

That doesn't mean that there aren't other great approaches. But unfortunately, I couldn't find any established approaches or best practices for developing data flows in Apache NiFi. Apart from a few articles on unit testing custom processors and using GenerateFlowFile processors for basic smoke testing, the number of articles about ensuring the functionality of data flows is small.

Working in an environment that's always changing, where adapting functionality and ensuring reliability are key, I realized I needed to develop my own structured approach.

That's when I came up with the idea of a Data Flow Workbench.

The basic idea is straightforward:
Simply encapsulate a data flow within a predefined and immutable context. Inputs and expected outputs remain the same, while only the data flow itself changes. This approach is similar to the principles of unit testing but needs to be tailored for data flow scenarios.
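
The parallel to unit testing may become clearer with a minimal sketch (illustrative pseudocode; all names are hypothetical):

# Classic unit test: the fixture is fixed, only the unit under test evolves.
def test_data_flow():
    actual = flow_under_test(fixed_input)  # the part that changes
    assert actual == expected_output       # the immutable contract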

Concept

Let's dive deeper into the concept of the Apache NiFi Workbench.

Apache NiFi data flows can be organized within process groups, which is similar to organizing code within functions. A process group has defined inputs and outputs (represented by input and output ports) that are connected by processing pipelines of process groups and processors. A FlowFile is the data that flows through the pipelines and consists of attributes and content.
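
Conceptually, a flow file can be pictured like this (a minimal sketch for illustration; NiFi's real FlowFile is a Java interface whose attributes and content live in internal repositories):

# Conceptual model of a flow file, for illustration only:
class ConceptualFlowFile:
    def __init__(self, attributes, content):
        self.attributes = attributes  # string key/value metadata, e.g. {"filename": "data.json"}
        self.content = content        # raw payload bytes, any file format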

Now you know everything you need about data flows to understand the following illustration, which shows the basic structure of the Workbench. The process group PG_main contains the data flow to be tested.

Apache NiFi Workbench Concept

Splitting the flow files into their attributes and contents as separate inputs allows them to be defined as separate files. The workbench requires the attributes as JSON, while the content can be in any file format. The flow files of all inputs are correlated via the assert.group.identifier attribute, which ensures that the correct parts are put together.

An example input can look like this:

|                                   | [input] Attributes | [input] Content | [input] Expected Attributes | [input] Expected Content |
| --------------------------------- | ------------------ | --------------- | --------------------------- | ------------------------ |
| Attribute assert.group.identifier | test               | test            | test                        | test                     |
| Content                           | { "done": false }  | Hello           | { "done": true }            | Hello world              |
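
On disk, such a test case could be organized as four small files, one per input (the layout and file names below are hypothetical):

testcase/
├── attributes.json           # { "done": false }  - attributes of the flow file to build
├── content.txt               # Hello              - content of the flow file to build
├── expected-attributes.json  # { "done": true }   - attributes the tested flow must produce
└── expected-content.txt      # Hello world        - content the tested flow must produce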

Workflow

As everywhere in software engineering, quality depends heavily on proper design. For this reason, a clear and modular structure of the data flows is strongly recommended. In particular, it makes sense to decouple the data sources and data targets from the processing logic.

---------------      -----------     ---------------
| Data Source |  --> | PG_main | --> | Data Target |
---------------      -----------     ---------------

If that's the scenario, the processing logic PG_main can be integrated into the workbench as an isolated module, allowing us to define simulated inputs along with their corresponding expected outputs.

However, the workbench truly unleashes its full potential when paired with the Apache NiFi Registry. Once changes are made to PG_main, they can be committed to version control and seamlessly integrated into production data flows.

Implementation

TL;DR: This section explains the implementation of the workbench in more detail.
If you would like to test and explore the workbench yourself, you can jump directly to the conclusion and download the workbench template.

The workbench consists of two core modules (implemented as process groups):

  1. Build FlowFile: Builds a FlowFile from given attributes and a specific content.
  2. Assert FlowFile: Compares the attributes and the content of two flow files; fails if they differ and passes if they are equal.

The implementation of the two core modules is described in more detail in the following two sections.

Process Group: Build FlowFile

This module merges the attributes with their corresponding content into a single FlowFile. This sounds simple in common programming languages, but it is tricky for data flows, as there is no defined order in which the files arrive.

NiFi provides a standard processor (MergeContent) for this purpose. The Create fragments processors ensure that the attributes required by the MergeContent processor are properly set on incoming flow files. After the merge, the MIME type is restored and some helper attributes are cleaned up.
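
For reference, the Defragment strategy of MergeContent correlates fragments via a handful of attributes. A minimal sketch of what the Create fragments steps have to provide (shown here in ExecuteScript style for illustration; the exact processor configuration may differ):

# Fragment attributes expected by MergeContent's Defragment strategy:
flowFile = session.putAllAttributes(flowFile, {
    'fragment.identifier': flowFile.getAttribute('assert.group.identifier'),
    'fragment.index': '1',   # position of this fragment within the group
    'fragment.count': '2'    # two fragments per group: attributes + content
})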

Module Build FlowFile

Everything is actually pretty straightforward, but you may have noticed that I skipped the JSON to attributes processor. Let's take a closer look. Unfortunately, there is no standard processor for extracting JSON from the flow file content into attributes, so it needs to be implemented.
This can be done with the ExecuteScript processor, which makes it possible to execute user-defined code on incoming flow files. There are various programming languages to choose from, and I decided to use Jython (Python running on the Java platform).

The following code retrieves a flow file, reads and parses its content as JSON, and adds the keys with their values as attributes.

import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback

# Reads the flow file content and parses it as JSON.
class JsonInputStreamCallback(InputStreamCallback):
    def __init__(self):
        self.dict = {}
    def process(self, inputStream):
        jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        self.dict = json.loads(jsonStr)

flowFile = session.get()
if flowFile != None:
    try:
        # Parse the JSON content of the incoming flow file.
        callback = JsonInputStreamCallback()
        session.read(flowFile, callback)
        # Create a successor flow file and add the parsed keys as attributes.
        newFlowFile = session.create(flowFile)
        newFlowFile = session.putAllAttributes(newFlowFile, callback.dict)
        session.remove(flowFile)
        session.transfer(newFlowFile, REL_SUCCESS)
    except Exception as e:
        log.error("An error occurred", e)
        session.transfer(flowFile, REL_FAILURE)
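Note that NiFi attribute values are plain strings. The script passes the parsed JSON values through as-is, which works for flat JSON objects; nested objects or non-string values may need to be stringified or flattened first.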

Process Group: Assert FlowFile

The second core module compares the actual flow file against the expected one. The flow reduces content comparison to attribute comparison: it first adds a hash of the file's content as an attribute and then compares all existing attributes.
There are some core attributes like uuid and filename (see CoreAttributes for the complete list) that are ignored in the comparison.
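
The hashing step can be covered by a standard processor such as CryptographicHashContent or by another small script. A minimal sketch of the idea, assuming the same ExecuteScript context as above (the attribute name content.hash is hypothetical):

from java.security import MessageDigest
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import InputStreamCallback

# Computes a SHA-256 digest of the content so that comparing contents
# reduces to comparing a single attribute.
class HashCallback(InputStreamCallback):
    def process(self, inputStream):
        digest = MessageDigest.getInstance('SHA-256').digest(IOUtils.toByteArray(inputStream))
        self.hex = ''.join('%02x' % (b & 0xff) for b in digest)

flowFile = session.get()
if flowFile != None:
    callback = HashCallback()
    session.read(flowFile, callback)
    flowFile = session.putAttribute(flowFile, 'content.hash', callback.hex)
    session.transfer(flowFile, REL_SUCCESS)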

Module Assert FlowFile

The heart of the data flow is the Assert FlowFile processor, which compares two flow files with each other. As Apache NiFi does not provide a standard processor for this either, this logic is also implemented using an ExecuteScript processor.

The script takes an actual flow file and tries to find the corresponding expected flow file to compare it with. If there is no matching file with the same assert.group.identifier attribute, the flow file is pushed back to the queue. Otherwise the attributes of both files are compared and asserted.

from java.util import HashMap
from org.apache.nifi.flowfile.attributes import CoreAttributes
from org.apache.nifi.processor import FlowFileFilter
from org.apache.nifi.processor.FlowFileFilter import FlowFileFilterResult

FILE_TYPE_KEY = 'assert.file.type'
GROUP_IDENTIFIER_KEY = 'assert.group.identifier'

# Accepts the first flow file of the given type (and, if provided, group identifier).
class FlowFileTypeFilter(FlowFileFilter):
    def __init__(self, type, identifier = None):
        self._type = type
        self._identifier = identifier
    def filter(self, flowFile):
        type = flowFile.getAttribute(FILE_TYPE_KEY)
        if type == self._type:
            if self._identifier:
                identifier = flowFile.getAttribute(GROUP_IDENTIFIER_KEY)
                if identifier == self._identifier:
                    return FlowFileFilterResult.ACCEPT_AND_TERMINATE
                else:
                    return FlowFileFilterResult.REJECT_AND_CONTINUE
            else:
                return FlowFileFilterResult.ACCEPT_AND_TERMINATE
        else:
            return FlowFileFilterResult.REJECT_AND_CONTINUE

# Fetches the next matching flow file from the incoming queue, or None.
def getFlowFile(type, identifier = None):
    flowFiles = session.get(FlowFileTypeFilter(type, identifier))
    if not flowFiles.isEmpty():
        return flowFiles[0]
    else:
        return None

# Returns the attributes relevant for comparison (helper and core attributes removed).
def getAttributes(flowFile):
    map = HashMap(flowFile.getAttributes())
    map.remove(FILE_TYPE_KEY)
    for attr in CoreAttributes.values():
        attrName = attr.key()
        if map.containsKey(attrName):
            map.remove(attrName)
    return map

# Compares the attributes of two flow files; returns a difference message or None.
def compare(actual, expected):
    actualAttrs = getAttributes(actual)
    expectedAttrs = getAttributes(expected)
    if len(actualAttrs) != len(expectedAttrs):
        return "The number of attributes differs"
    for key in actualAttrs:
        if expectedAttrs[key] != actualAttrs[key]:
            return 'Attribute "{}" differs (actual / expected): {} / {}'.format(
                key, actualAttrs[key], expectedAttrs[key])
    return None

# Main logic: pair each actual flow file with its expected counterpart and assert.
actual = getFlowFile("actual")
if actual != None:
    expected = getFlowFile(
        "expected", actual.getAttribute(GROUP_IDENTIFIER_KEY))
    if expected != None:
        try:
            diffMsg = compare(actual, expected)
            session.remove(expected)
            actual = session.removeAttribute(actual, FILE_TYPE_KEY)
            if diffMsg:
                actual = session.putAttribute(actual, 'assert_message', diffMsg)
                session.transfer(actual, REL_FAILURE)
            else:
                session.transfer(actual, REL_SUCCESS)
        except Exception as e:
            log.error('Something went wrong', e)
            session.rollback(True)
    else:
        session.rollback(True)
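A note on the rollback branches: session.rollback(True) returns the flow file to the incoming queue and penalizes it, so the processor retries the pairing a little later, once the matching expected flow file has arrived.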

Conclusion

The workbench presented in this article can be used universally for all isolated data flows in Apache NiFi. It is therefore a great addition to the development and testing process and makes a valuable contribution to increasing the quality of your data flows.

The attributes and the content of the flow files can be supplied to the workbench separately. This characteristic makes the concept easily extensible to a test suite for Apache NiFi, which I will present in a follow-up article.

If you want to look at the workbench in detail, feel free to download it as an Apache NiFi 1.25 template:

Download Workbench

Top comments (5)

oyeyemi owolabi

Thank you so much for this.
Sir, I am a newbie in NiFi. Do you have a plan to do a short video tutorial on this workbench for us newbies to learn hands-on?

Jannik Rebmann • Edited

Thank you for your interest. I have never created a tutorial video before. However, if any other readers would be interested in such a tutorial, I would be happy to give it a try 😊

oyeyemi owolabi

Thank you sir for the response.
I believe many would be happy to see such a video, because so many people have been asking on the NiFi Slack channel how to conduct testing on their NiFi flow pipelines.
I would like to get your permission to link this article on the NiFi Slack channel.

Regards.

Jannik Rebmann

Thank you for pointing out the Slack channel. I have now joined it. You definitely have my permission to share the article, but it is already linked in Slack. That's great 😊

Lavesh

Article is awesome.

Would be really keen to join a session.
Cheers!