Tech Community for Software AG Tech Community

Posted on • Originally published at tech.forums.softwareag.com on

An Introduction to our new messaging tools

Product versions - Version 11.0 preview or later

Introduction

In this article, I will introduce the messaging tools that were added as part of the preview version of Service Designer 11.0, released in December 2023.

The new tools complement our existing messaging services to recognize the changing landscape that IoT and the increasing multitude of different messaging providers bring to integration. More and more of our customers are facing a situation where there is more than one messaging platform or protocol to integrate with, and the volume and the type of events are changing massively.

event-streaming-diagram

Our new messaging tools are designed to work natively with any number of different platforms and offer both traditional transactional event processing and a new event streaming model. The latter pattern is designed to cope with large volumes of events and to combine them as part of the processing.

Message processing versus Event Streaming

Traditional message processing assumes that individual messages represent critical events, such as an Order or Invoice, that must be processed as part of a safe, reliable transaction, and that the clients are enterprise-level applications.

However, with the rise of IoT and event-driven architectures, the volume of events, the types of event, and the range of producers are all far greater. The events themselves might also hold little value individually; the value lies in the overall trends or patterns that they reveal. This new pattern requires a very different messaging platform and a very different processing model in order to handle the required volumes.

Plethora of messaging platforms

As stated, the messaging landscape is now much more varied and imposes a greater range of requirements than it did historically. Many different messaging solutions have arisen to meet these requirements, but few are optimized to cater to all of them. As an example, IoT devices need an efficient, low-energy protocol to send events, and the receiving messaging hub must be able to scale to manage thousands of clients with minimal overhead. At the same time, SaaS applications are becoming more and more event-driven and often use a messaging solution provided by the SaaS platform itself; these tend to be designed for low volume, high latency, and secure connectivity. Developers need to be able to easily integrate and process events from both kinds of solution.

Messaging reboot

Currently, webMethods offers you both native messaging with Universal Messaging and JMS messaging, which allows you to use any JMS messaging platform of your choice. Neither solution is designed with event streaming in mind, and JMS often lacks access to the features that distinguish modern messaging solutions.

With this in mind, we decided to introduce a whole new messaging framework that gives you an agnostic method for sending and receiving events from the messaging platform of your choice, and that adds a processing mode to make processing event streams easier. The new implementation is provided as a modular package called WmStreaming that will soon be available via our new package registry. You can read more about this in the article Our new delivery channel, webMethods Package Manager.

The new messaging framework has been designed to be agnostic about the messaging platform through the use of a plug-in for each provider. The current version is bundled with a plug-in for Kafka, but we will be providing plug-ins for other solutions as well as documenting the plug-in SDK to allow developers to integrate natively with any messaging platform of their choice.

Prerequisites

You can follow this article without any prerequisites, but if you want to experiment and follow the steps below to create your own event triggers, you will need to install the following components locally.

Steps to follow

In the following section, we will examine how you can use the new streaming package bundled with Service Designer to natively integrate with Kafka and process large volumes of events using our new event trigger.

Download the preview version of Service Designer 11.0

Download and install Service Designer 11.0 on your computer. It is already bundled with the new streaming package and the Kafka plug-in.

Spin up a local Kafka environment

Use the following docker-compose template to spin up your own Kafka platform:

version: '3'
services:
  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-ui:
    container_name: kafkaui
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - 8080:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
    volumes:
      - /Users/johncarter/Docker/volumes/kafka/dynamic_config.yaml 


Paste the template above into a file called docker-compose.yml and then run the following command from the same directory:

$ docker compose up
...
kafkaui | 2024-01-15 10:53:06,158 INFO [parallel-1] o.a.k.c.u.AppInfoParser: Kafka version: 3.3.1
kafkaui | 2024-01-15 10:53:06,158 INFO [parallel-1] o.a.k.c.u.AppInfoParser: Kafka commitId: e23c59d00e687ff5
kafkaui | 2024-01-15 10:53:06,158 INFO [parallel-1] o.a.k.c.u.AppInfoParser: Kafka startTimeMs: 1705315986158
kafkaui | 2024-01-15 10:53:06,426 DEBUG [parallel-1] c.p.k.u.s.ClustersStatisticsScheduler: Metrics updated for cluster: default
kafkaui | 2024-01-15 10:53:36,050 DEBUG [parallel-2] c.p.k.u.s.ClustersStatisticsScheduler: Start getting metrics for kafkaCluster: default
kafkaui | 2024-01-15 10:53:36,147 DEBUG [parallel-1] c.p.k.u.s.ClustersStatisticsScheduler: Metrics updated for cluster: default
kafkaui | 2024-01-15 10:54:06,020 DEBUG [parallel-2] c.p.k.u.s.ClustersStatisticsScheduler: Start getting metrics for kafkaCluster: default
kafkaui | 2024-01-15 10:54:06,107 DEBUG [parallel-2] c.p.k.u.s.ClustersStatisticsScheduler: Metrics updated for cluster: default
kafkaui | 2024-01-15 10:54:36,015 DEBUG [parallel-2] c.p.k.u.s.ClustersStatisticsScheduler: Start getting metrics for kafkaCluster: default
kafkaui | 2024-01-15 10:54:36,130 DEBUG [parallel-1] c.p.k.u.s.ClustersStatisticsScheduler: Metrics updated for cluster: default
kafkaui | 2024-01-15 10:55:06,018 DEBUG [parallel-2] c.p.k.u.s.ClustersStatisticsScheduler: Start getting metrics for kafkaCluster: default
kafkaui | 2024-01-15 10:55:06,135 DEBUG [parallel-1] c.p.k.u.s.ClustersStatisticsScheduler: Metrics updated for cluster: default


Verify that it is up and running by accessing the Kafka console and adding your local cluster e.g.

kafka-ui

Click on the “Configure new cluster” button and simply provide an alias name, e.g. “default”, and a single bootstrap server with the value “kafka” and the port “9092”.

We will need a topic to publish events to. You can create one in multiple ways; for our purposes we will use the web interface. Click on the topics link and then the “+ Add a Topic” button. Give it the name temperature, set the number of partitions to 1, and that’s it.

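Alternatively, you could use the following command from within the Kafka container. Note that in the Confluent image used above, the script is available on the path as kafka-topics, without the ./ prefix or the .sh suffix.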

./kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --partitions 1 --replication-factor 1 \
    --topic temperature


Install the Kafka client libraries

To establish a connection with Kafka, you need to add the Kafka client JAR file and its dependencies to the Integration Server’s class path. Copy the following JAR files from Apache Kafka’s libs folder into your package’s jars folder, e.g. /IntegrationServer/packages/JcKafkaExample/code/jars/static

  • kafka-clients-*.jar
  • lz4-java-*.jar (required for compression)
  • snappy-java-*.jar (required for compression)

The Kafka client also uses "slf4j-api-*.jar", but there is no need to copy it because it is already included in the common JAR distribution of the Integration Server. You can obtain these JARs from either the “libs” folder of your installed Kafka server or the “libs” folder of a downloaded binary distribution. For example, visit Apache Kafka, pick your desired version, and choose the file from the binary downloads section. Once you open the downloaded file, go to the “libs” folder and copy the JARs from there.

Create a new package to manage your producer and streamer

Create a package to manage both our producer and consumer with the following structure:

package-structure

The folders are

  • jc.kafka.example - root folder
  • jc.kafka.example.common - Assets common to both the producer and consumer such as the event structure
  • jc.kafka.example.producer - This is where we will emit events and send them to Kafka
  • jc.kafka.example.consumer - We will create our trigger here to consume the events emitted by the producer

You can of course replace jc with your own initials. Note that in a real use case it would not be recommended to use the word kafka, as the whole point of the new framework is to be provider agnostic, i.e. our solution could easily be used with any event streaming or messaging platform by simply swapping in the correct plug-in and libraries and updating the connection.

Also, drop the downloaded jar files into the code/jars/static folder of this new package. The static folder won’t exist so you will need to create this yourself before copying.

Creating the Document type

Most events will have some kind of structure so it makes sense to model that structure with a Document in webMethods. The transformation between documents and the events is managed implicitly for events formatted as either JSON or XML.

Create a document type in the documents folder called Temperature with 2 attributes as below:

Temperature

The attributes:

  • timestamp - Object with java wrapper attribute set to java.util.Date
  • temperature - Object with java wrapper attribute set to java.lang.Double

Creating the connection

We now need to establish a connection between our integration runtime and Kafka. Open the admin UI of the local Integration Server in a browser and go to localhost:5555 → Streaming → Provider settings → Create Connection Alias, as below:

connection

Make sure the alias name is unique. I would recommend including the namespace of the package so that this remains the case when the package is later imported into another runtime. Make sure to select Kafka as the provider type, and select your package as well.

The provider URI will be ‘localhost’ and ‘29092’ if you didn’t make any changes to the docker-compose.yml file that was provided. Replace the client prefix with something unique but more legible and set the publish mode to synchronous. This is more reliable but implies more overhead for the producer, which in our case doesn’t matter.

Save your changes and then verify that the connection is enabled correctly. Any errors will show up in the event-streaming.log file under wmServiceDesigner/IntegrationServer/logs.

Creating the event specification

The event specification is a means of formalizing the event structure so that it does not have to be known implicitly. As already mentioned, event streaming can involve complex JSON or XML structures or, in the case of very simple events, a single typed value. Creating event specifications allows developers to share the details of the event structure and collaborate more easily. Open the admin UI of the local Integration runtime in a browser and go to localhost:5555 → Streaming → Event specifications → Create event specification, then name it temperature as below:

event-spec

The key will be a simple string and the value a JSON string as specified by our earlier created Document Type. The translation of the document from JSON or XML will be done automatically when sending or receiving events.
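To make the key/value split concrete, here is a minimal Python sketch of what an event with a string key and a JSON value could look like on the wire. It is an illustration only and not the WmStreaming implementation: the function names are hypothetical, and the ISO 8601 timestamp format is an assumption, since the article does not state how java.util.Date is rendered in JSON.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, Tuple


def encode_event(key: str, timestamp: datetime, temperature: float) -> Tuple[bytes, bytes]:
    """Return (key, value) as bytes; the value is a JSON object."""
    value = {
        # ISO 8601 is an assumption; the real date format is not stated in the article.
        "timestamp": timestamp.isoformat(),
        "temperature": temperature,
    }
    return key.encode("utf-8"), json.dumps(value).encode("utf-8")


def decode_event(key: bytes, value: bytes) -> Tuple[str, Dict[str, Any]]:
    """Reverse of encode_event: bytes back to a string key and a typed document."""
    doc = json.loads(value.decode("utf-8"))
    doc["timestamp"] = datetime.fromisoformat(doc["timestamp"])
    return key.decode("utf-8"), doc


if __name__ == "__main__":
    ts = datetime(2024, 1, 17, 9, 50, 36, tzinfo=timezone.utc)
    raw_key, raw_value = encode_event("sensor-1", ts, 12.9)
    print(raw_value.decode("utf-8"))
    print(decode_event(raw_key, raw_value))
```

In the framework itself this translation is handled for you by the event specification; the sketch only shows the shape of the data involved.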

Emitting events via a producer

We can easily publish events to Kafka via the service “pub.streaming:send” provided in the new WmStreaming package. The service is also designed to take an array so that you can emit a collection of events together.

From Service Designer → right-click on the producer folder → New → Flow Service, give it the name “emitTemperatures”, and format the signature as follows:

emit-temperatures

Make sure that the attribute “temperatures” is based on a “Document Reference List”. Then, in the implementation, add a loop to map /temperatures to /events. Add a map step inside the loop to map each temperature to a value object inside events, and also set the key to %keyPrefix%-%$iteration% using variable substitution.

producer-map
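For readers who prefer to see the loop in code form, the mapping described above can be sketched in Python as follows. This is only an analogy for the flow service, not something you would deploy: build_events and its parameter names are hypothetical, and the 1-based counter is an assumption about how $iteration behaves.

```python
from typing import Any, Dict, List


def build_events(temperatures: List[Dict[str, Any]], key_prefix: str) -> List[Dict[str, Any]]:
    """Map each temperature document to an event with a generated key."""
    events = []
    # start=1 mirrors a 1-based loop counter (assumed behavior of $iteration).
    for iteration, temperature in enumerate(temperatures, start=1):
        events.append({
            "key": f"{key_prefix}-{iteration}",  # equivalent of %keyPrefix%-%$iteration%
            "value": temperature,                # the document becomes the event value
        })
    return events


if __name__ == "__main__":
    readings = [{"temperature": 11.1}, {"temperature": 9.5}]
    for event in build_events(readings, "temp"):
        print(event)
```

The resulting list corresponds to the events array that is then passed to the send service in one call.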

Once done, add an invoke step for the service “pub.streaming:send”. The events should map automatically to the input; you need only specify the connectionAliasName and eventSpecificationName. They should match the names of the connection alias and event specification that you created earlier.

Run the service and it should execute without error. Of course, nothing much will happen as we haven’t yet built any consumer to take advantage of these events.

Creating a trigger to consume events

To consume the above events we need to use our new event trigger. Right-click on the consumer folder → New → Event Trigger and name it collectTemperatures with the following settings

The new trigger is very powerful and has two variations:

  • Transactional - Nearly identical in functionality to our existing UM based trigger, only that this one can be used with any messaging provider for which we have a plug-in.
  • Event streaming - Consumes multiple events and allows you to easily filter and aggregate them together so that they can be consumed more easily as a stream.

The steps to complete your trigger are:

  1. Select the connection alias.
  2. Indicate what events you want to process and how via the Receive and decode section.
  3. Set the trigger type in the process section.
  4. Implement multiple processing options on the stream such as filtering, transforming or calculating.
  5. Pass the processed stream onto a flow service or forward to a different queue/topic.
  6. Tune trigger performance via the properties section.

Select the connection & event specification

The connection is the alias that you created earlier in the admin UI. You should not reference a connection in a different package; keep your package self-contained unless you like the dependency nightmare that ensues. Remember, packages should be easily deployable and ideally be 100% operational after deployment.

connection-spec

The trigger needs to know which topic or queue it will consume events from, and how the events are structured, so that they can be filtered and aggregated easily. That is why we created the event specification earlier; it needs to be selected in the “receive and decode” section.

Processing the stream

The next step is to choose how you want to process events. You can choose between:

  1. Process a single event - A single event will be retrieved from the queue or topic.
  2. Process a count-based window of events - You specify the maximum number of events to fetch and forward to the processing section. If there are not enough events in the queue or topic, the trigger falls back to time-based behavior and uses the value from “max waiting time”.
  3. Process a time-based window of events - This will fetch all events from the queue or topic since the last interval.

Option (1) is almost identical in behavior to a traditional UM or JMS trigger and allows you to process individual transactions as required. Note that the calculate processing option is disabled in this case as it makes no sense.

Options (2) and (3) are new processing behaviors aimed at event streaming, where you need to efficiently process large volumes of transient events, aggregate them, and easily perform calculations. Be aware that they will attempt to fetch as many events as your configuration dictates. For count-based windows the upper limit is known. For time-based windows, however, there is potentially no known upper limit, so you must choose a reasonable interval in relation to the volume of events.
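To illustrate why a time-based window has no fixed upper limit, here is a small Python sketch that groups timestamped readings into fixed-length intervals. It is a simplified model and not the trigger's actual implementation; the function name and the (epoch seconds, temperature) tuple layout are assumptions made for the example.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def time_windows(events: List[Tuple[float, float]], interval_s: float) -> Dict[int, List[float]]:
    """Group (epoch_seconds, temperature) pairs into consecutive fixed-length windows."""
    windows: Dict[int, List[float]] = defaultdict(list)
    for ts, temperature in events:
        # Every event whose timestamp falls in the same interval lands in the same window.
        windows[int(ts // interval_s)].append(temperature)
    return dict(windows)


if __name__ == "__main__":
    # A quiet period followed by a burst: same interval, very different window sizes.
    quiet = [(float(t), 12.0) for t in range(10)]        # 1 event per second
    burst = [(10 + i / 100, 12.0) for i in range(1000)]  # 100 events per second
    for window, values in sorted(time_windows(quiet + burst, 10.0).items()):
        print(f"window {window}: {len(values)} events")
```

With the same 10 second interval, the window size follows the arrival rate, so the interval should be sized against the busiest period you expect rather than the average.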

In the example below I have picked the “count-based” window option, where a maximum of 100 events will be fetched at a time, or fewer after 10 seconds if fewer than 100 events are available in the queue or topic.

process-type
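The count-based behavior, with its fallback to a maximum waiting time, can be modeled in a few lines of Python. This is a sketch of the general technique under stated assumptions, not the trigger's code: fetch_window, max_events and max_wait_s are hypothetical names, and an in-memory queue.Queue stands in for the topic.

```python
import queue
import time
from typing import Any, List


def fetch_window(source: "queue.Queue[Any]", max_events: int, max_wait_s: float) -> List[Any]:
    """Collect up to max_events items, giving up once max_wait_s has elapsed."""
    window: List[Any] = []
    deadline = time.monotonic() + max_wait_s
    while len(window) < max_events:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # waiting time used up: return a partial window
        try:
            window.append(source.get(timeout=remaining))
        except queue.Empty:
            break  # nothing more arrived before the deadline
    return window


if __name__ == "__main__":
    topic: "queue.Queue[int]" = queue.Queue()
    for i in range(250):
        topic.put(i)
    # Two full windows, then a partial one after the wait expires.
    for _ in range(3):
        print(len(fetch_window(topic, max_events=100, max_wait_s=0.2)))
```

The upper bound on work per invocation is therefore always max_events, which is what makes the count-based option easier to size than the time-based one.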

These 100 events will then be streamed into the processing section, where you can choose what to do with each event, namely:

  • filter - Exclude certain events from the stream based on criteria specified here, using the same format as a flow branch statement e.g. %/stream/value/temperature% > 10.0
  • transform - Allow the event structure and attributes to be mapped via a generated flow service, importantly all following steps will then have to operate based on the new structure
  • calculate - Aggregate numerical data from the events, such as calculating averages, etc.
  • collect - group data into silos

calculate-1

For instance, the following example showcases a use case where we average out the temperature, whilst excluding all temperatures under 10.0 degrees, before finally invoking a flow service with the results.

filter
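In code terms, the filter and calculate steps amount to something like the following Python sketch. It is an analogy under assumptions rather than the trigger's implementation: the function name and the dictionary layout of each event are hypothetical, and the result fields are named after those that appear in the stream_calculations output.

```python
from typing import Dict, List, Optional


def filter_and_calculate(events: List[Dict[str, float]], threshold: float = 10.0) -> Optional[Dict[str, float]]:
    """Keep events above the threshold, then aggregate the temperatures that remain."""
    # Same condition as the filter expression: temperature > 10.0
    kept = [e["temperature"] for e in events if e["temperature"] > threshold]
    if not kept:
        return None  # nothing survived the filter, so there is nothing to aggregate
    return {
        "count": len(kept),
        "average": sum(kept) / len(kept),
        "min": min(kept),
    }


if __name__ == "__main__":
    window = [{"temperature": t} for t in (9.0, 11.0, 13.0, 10.0)]
    print(filter_and_calculate(window))
```

Note that only the aggregate is returned; the individual readings are discarded once the calculation is done.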

Invoking the stream target

The last step is to either forward the final result to a queue or topic of your choice or invoke a generated flow service called the stream_target, as shown below:

stream-target

The service is auto-generated and includes the signature so that the transformations and calculations that you have built in the process section can be easily referenced. Any changes you make to this service should be retained if you later change the trigger. However, it is good practice to minimize the code that you include in this generated service and instead delegate it to your own services.

For simplicity’s sake, you can add a trace pipeline step to the generated flow service and then run the ‘emitTemperatures’ service that you created earlier. You should see something similar to the following in your server log.

2024-01-17 09:50:36 CET [ISP.0090.0001C] (tid=275) --- START tracePipeline [17/01/2024, 09:50] ---
2024-01-17 09:50:36 CET [ISP.0090.0008C] (tid=275) 0 stream_calculations[0] {[Lcom.wm.data.IData;} =>
2024-01-17 09:50:36 CET [ISP.0090.0008C] (tid=275) 1 element {java.lang.String} = '/stream/value/temperature'
2024-01-17 09:50:36 CET [ISP.0090.0008C] (tid=275) 1 count {java.lang.String} = '6'
2024-01-17 09:50:36 CET [ISP.0090.0008C] (tid=275) 1 average {java.lang.String} = '12.9'
2024-01-17 09:50:36 CET [ISP.0090.0008C] (tid=275) 1 min {java.lang.String} = '11.1'
2024-01-17 09:50:36 CET [ISP.0090.0002C] (tid=275) --- END tracePipeline ---


The first thing you will notice is that you no longer receive the actual events when using a calculate step in your trigger, only the final calculations, such as the count and the average, which exclude any events where the temperature was less than 10 degrees.

Next steps

You can download the example package JcKafkaExample if you don’t want to write the code yourself. Please remember that, as of today, you can only install this package in the preview version of Service Designer 11.0.

We will soon provide the new event streaming feature as a standalone package WmStreaming available from our package registry using wpm.

Useful links | Relevant resources

JcKafkaExample - Demo package.

webMethods Package Manager - Tech article on our new delivery channel for packages.
