
Transgate is an agent-based taskflow framework for Node.js

Toshimitsu Takahashi (tilfin)

I made Transgate, an agent-based taskflow framework for Node.js.

Why did I make it?

Because the various flows got tangled while I was writing a program to operate my home appliances. The program has to obtain temperature and humidity from a Dyson cool fan at regular intervals and save them in a database, and it has to handle messages coming from Google Home / Assistant + IFTTT and operate IRKit. I also wanted it to operate the air conditioner automatically via IRKit according to the temperature. How would you write all of that?

What is it?

Imagine baggage sorting at an airport. An agent receives items from a gate, processes them, and sends them to another gate. The agent does not know what is going on on the other side of the gate. When the agent receives an empty item, it finishes its work. That is the picture behind the framework's architecture.

The agent can receive items from a gate and send new items to another gate. An item is a simple Object. Each agent can concentrate on its own task, so even if the number of preceding or following processes increases or decreases, it keeps working as long as the schema of the item does not change. Because input and output are simple, unit testing is easy: the agent does not know what is behind a gate, so the input gate can be replaced with a stub and the output gate with a mock.

Actors in this framework

  • Gate is an endpoint of input/output, for example file storage, a database, a queue, or an API service.
  • Agent is a worker that processes items between input and output gates and knows nothing about what lies beyond those gates.
  • Item is the entity exchanged between gates, one per task target. It is an Object or JSON; null indicates the terminator.
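
To make the three roles concrete, here is a minimal sketch of the idea in plain JavaScript. It does not use Transgate itself: QueueGate, runAgent, collect and demo are hypothetical names invented for this illustration, and the real classes may behave differently in detail.

```javascript
// Hypothetical stand-ins for illustration only, not Transgate's real classes.

// An in-memory gate: send() enqueues an item, receive() resolves with the next one.
class QueueGate {
  constructor() {
    this.items = [];    // items waiting to be received
    this.waiters = [];  // resolvers of receive() calls that are still pending
  }

  async send(item) {
    const waiter = this.waiters.shift();
    if (waiter) waiter(item);
    else this.items.push(item);
  }

  receive() {
    if (this.items.length > 0) return Promise.resolve(this.items.shift());
    return new Promise((resolve) => this.waiters.push(resolve));
  }
}

// Runs one agent: receive an item, process it, send the result on.
// A null item is the terminator: it is forwarded and the loop ends.
async function runAgent(inGate, outGate, process) {
  for (;;) {
    const item = await inGate.receive();
    if (item === null) {
      await outGate.send(null);
      return;
    }
    const result = await process(item);
    // An undefined result means "nothing to send for this item".
    if (result !== undefined) await outGate.send(result);
  }
}

// Drains a gate into an array until the terminator arrives.
async function collect(gate) {
  const out = [];
  for (let item = await gate.receive(); item !== null; item = await gate.receive()) {
    out.push(item);
  }
  return out;
}

async function demo() {
  const input = new QueueGate();
  const output = new QueueGate();

  await input.send({ temp: 20.5 });
  await input.send({ temp: 16.0 });
  await input.send(null); // terminator

  // The agent only sees items; it has no idea what sits behind either gate.
  await runAgent(input, output, (item) => ({ ...item, cold: item.temp <= 17 }));
  return collect(output);
}
```

Calling demo() resolves to the two processed items, each with an added cold flag. The processing function never touches the gates directly, which is what keeps it easy to swap the gates out.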

An example

Let me explain through the home control program that prompted me to make this framework. By the way, this program runs as a daemon on a Raspberry Pi in my shoebox.

Flow diagram

[Figure: flow diagram of the home control program]

Main program (main.js)

const {
  Agent,
  HttpClientGate,
  HttpServerGate,
  IntervalGate,
  JointGate,
  StdoutGate,
  duplicator,
  mixer,
} = require('transgate');

const pino = require('pino')();
const config = require('konfig-yaml')();

const MongoGate = require('./lib/mongo_gate');
const IRKitGate = require('./lib/irkit_gate');

// Agent
const AnalysisCommander = require('./lib/analysis_commander');
const DysonCoolLinkRecorder = require('./lib/dyson/cool_link_recorder');
const EnvironmentalAnalyzer = require('./lib/environmental_analyzer');

// Gate
const slackGate = new HttpClientGate({ endpoint: config.slack.webhook_url });
const iftttGate = new HttpServerGate({ port: config.port });
const irkitGate = new IRKitGate(config.irkit.endpoint);
const intervalGate = new IntervalGate(60);
const mongoGate = new MongoGate(config.mongodb.endpoint, config.mongodb.collection);
const drToEaGate = new JointGate();

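(async () => {
  try {
    // Each agent is wired to its input gate and its output gate(s).
    await Agent.all(
      new AnalysisCommander(iftttGate, { irkitGate, slackGate }),
      // duplicator sends each item to both MongoDB and the joint gate
      new DysonCoolLinkRecorder(intervalGate, duplicator(mongoGate, drToEaGate)),
      new EnvironmentalAnalyzer(drToEaGate, { irkitGate, slackGate }),
    );
  } catch (err) {
    pino.error(err);
    // On failure, close the server and database gates.
    await iftttGate.close();
    await mongoGate.close();
  }

  // Stop the interval timer.
  intervalGate.clear();
})()
.catch(err => {
  pino.error(err);
});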

6 Gates

  • slackGate posts a text message to Slack. It needs no special implementation; it is simply an instance of HttpClientGate. The item JSON is { "text": "<text message>" }
  • iftttGate uses the JSON received from IFTTT's webhook as an item. The item JSON is { "target": "TV", "text": "<speaking words>" }
  • irkitGate sends instructions to an infrared transmitter that has an HTTP interface. The item JSON is { "command": "celling_light_off" }
  • intervalGate creates items at regular intervals. The item is { "time": <Date instance> }. In this case it triggers agent processing every minute.
  • mongoGate stores the items sent to it in the designated MongoDB collection.
  • drToEaGate is a joint that passes items from DysonCoolLinkRecorder (described later) to EnvironmentalAnalyzer.
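
For readers wondering what a custom output gate such as irkitGate might look like, the following is a rough sketch under assumptions. CommandGate and its injected transport function are hypothetical names for this illustration; they are not the actual lib/irkit_gate code, and the gate interface Transgate expects may differ.

```javascript
// Hypothetical sketch of an output-only gate in the spirit of irkitGate.
// `transport` is an assumed function (url, body) => Promise; a real gate
// would perform an HTTP POST at that point.
class CommandGate {
  constructor(endpoint, transport) {
    this.endpoint = endpoint;
    this.transport = transport;
  }

  // Items look like { command: 'aircon_on' }; null (the terminator) is ignored.
  async send(item) {
    if (item === null) return;
    if (typeof item.command !== 'string') {
      throw new TypeError('item.command must be a string');
    }
    await this.transport(this.endpoint, JSON.stringify({ command: item.command }));
  }
}

// Usage with a fake transport that only records what would have been posted.
async function demoCommandGate() {
  const posted = [];
  const fakeTransport = async (url, body) => { posted.push({ url, body }); };
  const gate = new CommandGate('http://irkit.example/messages', fakeTransport);

  await gate.send({ command: 'aircon_on' });
  await gate.send(null);
  return posted;
}
```

Injecting the transport keeps the gate itself thin: the only logic it holds is checking the item shape and serializing it.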

3 Agents

  • AnalysisCommander receives JSON from the IFTTT webhook as an item and determines, from the operation target and text, which infrared signal to send to IRKit. When it cannot interpret the wording, it posts to Slack.
  • DysonCoolLinkRecorder gets temperature and humidity from the Dyson PureCoolLink fan every minute and sends them through the duplicator to both the joint gate and MongoDB.
  • EnvironmentalAnalyzer receives the temperature through the joint gate and, when it crosses the threshold, requests IRKit to operate the air conditioner. Each automatic operation is recorded in Slack.
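
The duplicator used by DysonCoolLinkRecorder can be pictured roughly as follows. This is only a sketch of the idea: fanOut and recordingGate are hypothetical helpers written for this illustration, and Transgate's own duplicator may differ in its details.

```javascript
// Hypothetical sketch of the duplicator idea, not Transgate's implementation.
function fanOut(...gates) {
  return {
    // Send the same item (including the null terminator) to every gate.
    async send(item) {
      await Promise.all(gates.map((gate) => gate.send(item)));
    },
  };
}

// A trivial gate that remembers what it was sent, used here for the demo.
function recordingGate() {
  const sent = [];
  return { sent, async send(item) { sent.push(item); } };
}

async function demoFanOut() {
  const store = recordingGate(); // stands in for mongoGate
  const joint = recordingGate(); // stands in for drToEaGate
  const out = fanOut(store, joint);

  await out.send({ temp: 16.8, humidity: 41 });
  await out.send(null);
  return { store: store.sent, joint: joint.sent };
}
```

From the agent's point of view there is still a single output gate, so the recorder does not need to know that its measurements go to two places.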

Agent implementation

Create a subclass of Agent. In the main method, write the code that processes each received item and sends a new item to the specified gate. The before / after hook methods are used for initialization and for controlling (starting / stopping) another process, e.g. headless Chrome.

The following is an implementation example of EnvironmentalAnalyzer. When the room temperature drops to 17 degrees Celsius or below, it turns on the air conditioner.

const { Agent } = require('transgate');

module.exports =
class EnvironmentalAnalyzer extends Agent {
  // Initialization hook: reset the state.
  async before() {
    this._preTemp = null;       // temperature of the previous item
    this._airconAlive = false;  // whether the air conditioner was already turned on
  }

  async main(item, { irkitGate, slackGate }) {
    const curTemp = item.temp;

    // React only when the temperature falls from above 17 to 17 or below.
    if (this._preTemp && this._preTemp > 17 && curTemp <= 17) {
      if (!this._airconAlive) {
        await irkitGate.sendAll({ command: 'aircon_on' });
        this._airconAlive = true;
        await slackGate.send({ text: `Turn on aircon because temp is down to ${curTemp}` });
      }
    }

    this._preTemp = curTemp;
  }
}

The constructor and the input gate are hidden from the subclass so that the base class can implement the termination rule: when the agent receives null, it sends null to the next gate and ends itself.
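
Because the agent only talks to gates, its logic can be exercised with mock gates. The sketch below is an assumption-laden illustration: ThresholdLogic is a standalone copy of the threshold check without the Agent base class (so that it runs without Transgate installed), and mockGate is a hypothetical helper, not part of the library.

```javascript
// Standalone copy of the threshold logic from EnvironmentalAnalyzer, written
// without the Agent base class so that this sketch runs on its own.
class ThresholdLogic {
  constructor() {
    this._preTemp = null;
    this._airconAlive = false;
  }

  async main(item, { irkitGate, slackGate }) {
    const curTemp = item.temp;
    if (this._preTemp && this._preTemp > 17 && curTemp <= 17) {
      if (!this._airconAlive) {
        await irkitGate.sendAll({ command: 'aircon_on' });
        this._airconAlive = true;
        await slackGate.send({ text: `Turn on aircon because temp is down to ${curTemp}` });
      }
    }
    this._preTemp = curTemp;
  }
}

// Mock gate: records every item instead of talking to a device or service.
function mockGate() {
  const sent = [];
  const record = async (item) => { sent.push(item); };
  return { sent, send: record, sendAll: record };
}

async function testTurnsOnWhenCrossingThreshold() {
  const assert = require('assert');
  const irkitGate = mockGate();
  const slackGate = mockGate();
  const logic = new ThresholdLogic();

  // Feed temperatures as the input gate would: 18 -> 17 crosses the threshold,
  // 16 stays below it and must not trigger a second command.
  for (const temp of [18, 17, 16]) {
    await logic.main({ temp }, { irkitGate, slackGate });
  }

  assert.deepStrictEqual(irkitGate.sent, [{ command: 'aircon_on' }]);
  assert.deepStrictEqual(slackGate.sent, [
    { text: 'Turn on aircon because temp is down to 17' },
  ]);
  return { irkit: irkitGate.sent, slack: slackGate.sent };
}
```

No IRKit device or Slack webhook is needed to run this; the mocks stand in for both output gates.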

The features

  • Suitable for complex daemons and batch programs.
  • Not suitable for processing large volumes, because running the same agent in parallel is not assumed.
  • The gates and agents involved and the task flow of items are all defined in the main program, so the whole picture can be grasped from that file alone.
  • Agent processing can be written in a synchronous-looking style with async / await. Even when the number of agents grows, they do not become heavy the way thread-based workers do.
  • Because gates are easy to replace, it is easy to write unit tests for an agent and to check partial execution.

Anticipated questions and answers

Should every referenced service be a gate?

No. The flow between gates is limited to one way, and the agent does not know the detailed specification of a gate. In other words, you cannot send a request through a gate and get the response to it back. A loop is possible, but a round trip is not. Because the flow is stateless, what comes back is determined solely by the item that was sent out. A gate is either the part that triggers an agent or the part to which an agent sends its result.

How do I notify the originator when a series of flows is over?

A queue system, for example, needs a completion notice when the task is done. In such a case, you can carry the flow through by giving the item its context, and make the last gate responsible for sending the completion notification.
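
One way to picture this, as a sketch only: the item carries a context object through every step, and the final gate uses it to report completion. The names completionGate, resize, and the jobId field are hypothetical and chosen just for this illustration.

```javascript
// Hypothetical sketch: the item carries its context (here a job id) through
// every step, and the last gate reports completion to whoever started the job.
function completionGate(notify) {
  return {
    async send(item) {
      if (item === null) return; // terminator: nothing to report
      await notify({ jobId: item.context.jobId, status: 'done' });
    },
  };
}

// One processing step: adds a result but passes the context on untouched.
async function resize(item) {
  return { context: item.context, result: `resized ${item.file}` };
}

async function demoCompletion() {
  const notices = [];
  const lastGate = completionGate(async (notice) => { notices.push(notice); });

  const item = { context: { jobId: 'job-1' }, file: 'a.png' };
  await lastGate.send(await resize(item));
  return notices;
}
```

The intermediate steps never need to know who started the job; they only have to pass the context along unchanged.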

Should the logger be a gate?

If logs are outputs in their own right, they should go through a gate. You can then later replace that gate with a joint gate connected to another agent, which forwards the logs to a log analysis service.

How much logic can we include in the gate?

The gate should be as simple as possible. The agent is designed to be easy to test, but if you put logic in the gate itself, you cannot swap out the input / output destination to test that logic. However, logic that is simple and common across the project may be implemented in the gate. If it is complicated, you should make an agent for it and connect the agents with a joint gate.

I would be pleased if you are interested in Transgate.

Japanese version
