DEV Community

Tech Community for Software AG Tech Community

Originally published at tech.forums.softwareag.com

Managing device alarms in Cumulocity IoT in a functional style

This post applies to Apama Streaming Analytics version 10.15.3 and above, and to Cumulocity IoT Streaming Analytics version 10.18 and above.

The task

The task we’re going to tackle in this post is this: given a list of devices in the form of an EPL sequence<ManagedObject>, find any active alarms for each device and then clear all of those alarms. Finally, we want to know how many alarms needed clearing and when they have all been cleared. Using just regular EPL constructs, you would need to:

  • Create some sequences to hold in-flight pending events:
sequence<integer> outstandingIds := new sequence<integer>;
sequence<Alarm> allAlarms := new sequence<Alarm>;

  • Iterate over the sequence of managed objects:
ManagedObject o;
for o in objects {
    // ... the per-device steps shown below go inside this loop ...
}

  • Send a FindAlarm request for each device and remember its request id:
integer reqId := Util.generateReqId();
send FindAlarm(reqId, { "source" : o.id }) to FindAlarm.SEND_CHANNEL;
outstandingIds.append(reqId);

  • Listen for all the FindAlarmResponse events and collect the alarms to be cleared:
monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);

on all FindAlarmResponse(reqId=reqId) as resp and not FindAlarmResponseAck(reqId=reqId) {
    allAlarms.append(resp.alarm);
}

  • Listen for each FindAlarmResponseAck event and record that we’ve received it:
on FindAlarmResponseAck(reqId=reqId) {
    outstandingIds.remove(outstandingIds.indexOf(reqId));
    // ... the completion check from the next step goes here ...
}

  • Once all of the acknowledgements have been received, clear the alarms and log how many were cleared:
if outstandingIds.size() = 0 then {
    monitor.unsubscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
    Alarm a;
    for a in allAlarms {
        a.status := "CLEARED";
        send a to Alarm.SEND_CHANNEL;
    }
    log "Cleared "+allAlarms.size().toString()+" alarms from "+objects.size().toString()+" devices" at INFO;
}

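Put together, the fragments above might be assembled roughly as follows. This is a sketch, not the shipped sample: the monitor name ClearAlarmsImperative and the clearAlarms action are made up for illustration, the event types are assumed to come from the com.apama.cumulocity package, and how the objects sequence is obtained is left out.

```epl
using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;
using com.apama.cumulocity.Util;

/** Hypothetical monitor assembling the imperative steps listed above. */
monitor ClearAlarmsImperative {
    // In-flight state shared by all of the listeners below.
    sequence<integer> outstandingIds;
    sequence<Alarm> allAlarms;

    action onload() {
        // Obtaining the list of devices is out of scope for this sketch;
        // call clearAlarms(objects) once you have it.
    }

    action clearAlarms(sequence<ManagedObject> objects) {
        monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);

        ManagedObject o;
        for o in objects {
            integer reqId := Util.generateReqId();
            outstandingIds.append(reqId);

            // Collect every alarm returned for this request until its ack arrives.
            on all FindAlarmResponse(reqId=reqId) as resp and not FindAlarmResponseAck(reqId=reqId) {
                allAlarms.append(resp.alarm);
            }

            // The ack marks this request as finished.
            on FindAlarmResponseAck(reqId=reqId) {
                outstandingIds.remove(outstandingIds.indexOf(reqId));
                if outstandingIds.size() = 0 then {
                    monitor.unsubscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
                    Alarm a;
                    for a in allAlarms {
                        a.status := "CLEARED";
                        send a to Alarm.SEND_CHANNEL;
                    }
                    log "Cleared "+allAlarms.size().toString()+" alarms from "+objects.size().toString()+" devices" at INFO;
                }
            }

            send FindAlarm(reqId, { "source" : o.id }) to FindAlarm.SEND_CHANNEL;
        }
    }
}
```

Note how the bookkeeping (two shared sequences, two listeners per device and a completion check) is spread across the code, so the overall intent is hard to see at a glance.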

Functional programming

The EPL Functional Library provides various functional operations, such as filter, map and reduce, which operate on EPL container types. There are also several functor actions and predicates which can help you use these operators. There are two APIs for accessing the functional operators: com.apama.functional.Fn and com.apama.functional.Functional.

Firstly, you can use the Fn type, which provides static functions which operate on containers, for example:

sequence<integer> evens := <sequence<integer>> Fn.filter(numbers, Fn.even);


Secondly, you can use the Functional type, which wraps your container and provides the functional operators as instance methods, each one returning a new Functional wrapping the result container. This allows chaining of multiple operators in a fluent style. For example, this code wraps the original sequence of numbers, filters out just the even numbers and then adds them up, yielding a single integer as the result:

integer evenSum := <integer> Functional(numbers).filter(Fn.even).reduce(Fn.sum);

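To try both styles side by side, the two one-liners above can be dropped into a small monitor. The sketch below assumes only the calls already shown; the monitor name FunctionalBasics and the contents of numbers are made up for illustration.

```epl
using com.apama.functional.Fn;
using com.apama.functional.Functional;

/** Hypothetical monitor comparing the static and fluent styles. */
monitor FunctionalBasics {
    action onload() {
        sequence<integer> numbers := [1, 2, 3, 4, 5, 6];

        // Static style: the result is returned untyped, so cast it back
        // to the container type you expect.
        sequence<integer> evens := <sequence<integer>> Fn.filter(numbers, Fn.even);

        // Fluent style: wrap once, chain the operators, cast the final result.
        integer evenSum := <integer> Functional(numbers).filter(Fn.even).reduce(Fn.sum);

        log "evens=" + evens.toString() + ", evenSum=" + evenSum.toString() at INFO;
    }
}
```

With this input, the even numbers are 2, 4 and 6, and their sum is 12. The casts are needed in both styles because the operators work on arbitrary container types.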

EPL is designed to handle streams of events, not just static containers, and the library provides functional-style ways of doing that too. For example, if you have a variable number of outstanding operations and need to wait for a completed event from each one before continuing, you could use:

Functional(sequenceIDs).waitForAllCompleted( "Completed", "id", onCompleted).onTimeout(TIMEOUTSECS, onTimeout);

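As a rough illustration of how that call might sit in a monitor, the sketch below defines a hypothetical Completed event with an id field and simulates three operations finishing by routing the events itself. The event, the monitor name, the timeout value and the two callback actions are all assumptions made for this example, and the callbacks are assumed to take no arguments.

```epl
using com.apama.functional.Functional;

/** Hypothetical completion event; "id" is the field matched against each value. */
event Completed {
    integer id;
}

/** Hypothetical monitor waiting for one Completed event per outstanding id. */
monitor WaitForAllExample {
    constant float TIMEOUTSECS := 5.0;

    action onload() {
        sequence<integer> sequenceIDs := [1, 2, 3];

        // Wait for a Completed event for every id, or give up after the timeout.
        any _ := Functional(sequenceIDs)
            .waitForAllCompleted(Completed.getName(), "id", onCompleted)
            .onTimeout(TIMEOUTSECS, onTimeout);

        // Simulate the outstanding operations finishing.
        integer id;
        for id in sequenceIDs {
            route Completed(id);
        }
    }

    action onCompleted() {
        log "All operations completed" at INFO;
    }

    action onTimeout() {
        log "Timed out waiting for operations to complete" at WARN;
    }
}
```

The point of the pattern is that no per-id listener or counter appears in your own code; the library does that bookkeeping.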

Partially binding functions

Functional programming, and APIs that rely on callbacks, often need to provide additional arguments to functions that will be invoked by the APIs. The EPL Functional Library allows you to partially bind arguments to actions and provide those as an argument to the API. This is similar to how a lambda would be used in other languages. For example:

Fn.map(["Bob", "Alice"], Fn.partial(Fn.concat, "Hello ")) // returns ["Hello Bob", "Hello Alice"]

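The same binding works with your own actions, not just the library's functors. The sketch below is a minimal illustration: the monitor name PartialExample and the greet action are hypothetical, and the results are kept as any values and printed with valueToString() to avoid assuming the exact container type that map returns.

```epl
using com.apama.functional.Fn;

/** Hypothetical monitor showing partial binding with a library functor and a user action. */
monitor PartialExample {
    /** Hypothetical two-argument action; the first argument will be bound in advance. */
    action greet(string greeting, string name) returns string {
        return greeting + name;
    }

    action onload() {
        sequence<string> names := ["Bob", "Alice"];

        // Bind the first argument of the library functor, as in the example above.
        any viaConcat := Fn.map(names, Fn.partial(Fn.concat, "Hello "));

        // Bind the first argument of our own action in the same way;
        // map then supplies each name as the remaining argument.
        any viaGreet := Fn.map(names, Fn.partial(greet, "Hello "));

        log viaConcat.valueToString() at INFO;
        log viaGreet.valueToString() at INFO;
    }
}
```

Arguments are bound from the left, so the action you bind should take its fixed arguments first and the per-element value last.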

Managing alarms functionally

We’re going to use all of the features above to implement the alarm clearing in a functional fashion. To do this we will write a single Functional chain, using a few simple helper actions. The main functional component looks like this:

monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);

sequence<Alarm> allAlarms := new sequence<Alarm>;
any _ := Functional(objects)
   .map(Fn.getEntry("id")) // get the id from each object.
   .map(sendFindAlarm) // send a find alarm for each, returning the ids.
   .map(Fn.partial(receiveAlarms, [allAlarms])) // listen for all those alarms.
   .waitForAllCompleted(FindAlarmResponseAck.getName(), "reqId", Fn.partial(clearAllAlarms, [allAlarms])); // clear all the alarms we found.


This can be read as follows:

  • Take my list of managed objects and get the id from each one.
  • For each id, call the sendFindAlarm functor, which sends a request and returns its request id.
  • For each request id, call receiveAlarms, which listens for all of the alarms from that object and builds up the allAlarms sequence.
  • Finally, wait until every request id has been acknowledged, then call clearAllAlarms with the built-up sequence of alarms.

The details live in the helper actions, but the functional style lets you see the overall logic in one compact place and drill down into the details only when you need to.

The sendFindAlarm action is very simple: it sends the request and returns its request id:

action sendFindAlarm(string source) returns integer
{
    return <integer> Fn.sendToChannel(FindAlarm.SEND_CHANNEL,
            FindAlarm(Util.generateReqId(), {"source": source.toString() }))
            .getEntry("reqId");
}


The receiveAlarms action uses another part of the Functional API:

action receiveAlarms(sequence<Alarm> allAlarms, integer reqId) returns integer
{
     any _ := Fn.getAllEvents(FindAlarmResponse.getName(), {"reqId":<any>reqId},
                     FindAlarmResponseAck.getName(), {"reqId":<any>reqId},
                     Fn.partial(appendSequence, [allAlarms]));
     return reqId;
}


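Here we’re asking the library to collect together all of the responses received before the acknowledgement and hand them to appendSequence, a small helper that is not shown in this post, with allAlarms already bound as its first argument.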

Finally, clearAllAlarms is also implemented as a functional call:

action clearAllAlarms(sequence<Alarm> allAlarms)
{
   monitor.unsubscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
   any _ := Functional(allAlarms)
                    .map(Fn.setEntry("status", "CLEARED"))
                    .map(Fn.partial(Fn.sendToChannel, Alarm.SEND_CHANNEL));
}


Further reading

The full sample is included among the samples shipped with the latest release of Apama. See also the full documentation on using functional operations in EPL and the API documentation.
