This article is part one of the two-part series on how to write a custom extension for Azure Function. In this article, we will look into how to create a custom trigger. In part two, we will look into how to create a custom binding. Combining two articles into one will make it very large and will difficult to read and understand.
In this series of articles, we will explore how to create NATS custom extension.
NATS messaging system
NATS is a simple, secure, and highly scalable messaging system. It is a perfect fit for Microservice architecture, IoT applications, cloud-native applications, etc.
NATS can run on a large cloud instance and also in low-end devices like edge devices or IoT devices. It is a CNCF project and integrates with Kubernetes and other modern systems.
Azure Function Custom Extension
We can use Azure WebJob SDK to develop Azure Functions custom extensions.
There are two types of extension, a Trigger, and Bindings. The trigger causes a function to run. In most of the cases, a trigger will have data associated with it. The Azure Function receives this trigger data as a parameter.
Binding is a method to connect other resources to the function. Input binding receives the data from the external source, and an output binding sends the data to the external source.
An Attribute class defines every Trigger and Binding. An Attribute class defines all the parameters and configuration values for the trigger or extension.
Attribute class is a crucial component in custom extension. When a consumer defines a trigger or binding, the system looks for a corresponding attribute class and initialize it. For example, if a consumer specifies a RabbitMQTrigger trigger, the system looks for RabbitMQTriggerAttribute class. When a consumer specifies a RabbitMQ binding, the system looks for RabbitMQAttribute. Similarly, for KafkaTrigger, the system looks for KafkaTriggerAttribute, for Kafka binding, the system looks for KafkaAttribute.
Custom Trigger
To create a custom Trigger, we need to:
- Define a class that extends from Attribute. This class represents our attribute class. We define all the parameters and configuration values for our trigger. In our case, we define connection string and NATS channels.
-
Define a class that implements the interface
IListener
. This class contains the logic to connect to our external event source and wait for events. In our case, we will connect to the NATS server and look for incoming messages. The IListener interface has the following functions:- StartAsync:- The system calls this function to start our listener. This function returns one Task object that completes when our listener successfully started.
- StopAsync:- The system calls this function to stop our listener. This function returns one Task object that completes when the listener completely stopped.
- Cancel:- The system calls this function to cancel any ongoing listen operation.
- Dispose:- IDisposable's dispose function.
-
Define a class that implements the interface
ITriggerBinding
. In this class, we create our listener and bind our trigger data. The ITriggerBinding interface has the following functions:- CreateListenerAsync:- The system calls this function to create a listener. This function returns a Task object that has our listener.
- BindAsync:- This function is called to bind a specified value using a binding context. When our listener receives an event, we try to execute the function, passing the event data. This event data is encapsulated in a
TriggeredFunctionData
class and send to the Azure Function. In theBindAsync
, we will bind this value to our corresponding data. This function returns aTriggerData
class.TriggerData
class accepts a class that implements anIValueBinder
interface and a read-only dictionary. We will revisit this later in this article. - ToParameterDescriptor:- The system calls this function to get a description of the binding.
-
Define a class that implements the interface
IValueBinder
. As I explained in theBindAsync
section, we are binding the trigger data to our data class using this class. TheIValueBinder
has three methods:- GetValueAsync:- Returns a task that has the value object.
- SetValueAsync: - Returns a task that completes when the object to our data class completes.
- ToInvokeString:- Returns object string.
-
Define a class that implements the interface
ITriggerBindingProvider
. This class is a provider class that returns a class that implements theITriggerBinding
interface. This class has the following function:- TryCreateAsync:- The system call this function to get a class that implements the
ITriggerBinding
interface. The system will pass aTriggerBindingProviderContext
class as a parameter. In this function, we check whether theTriggerBindingProviderContext
object contains our custom attribute. If theAttribute
is present, we will create TriggerBinding class and return a Task.
- TryCreateAsync:- The system call this function to get a class that implements the
-
Create a class that implements the interface
IExtensionConfigProvider
. TheIExtensionConfigProvider
defines an interface enabling third party extension to register. The interface has the following function:- Initialize:- In this function, we will register all our triggers and bindings.
-
And finally, we create a class that implements the interface
IWebJobStartup
. This interface defines the configuration actions to perform when the Function Host starts up. This interface has the following function:- Configure:- The system call this function when the function host initializes. In this function, we will add our custom extension.
So basically what happens is when the system starts, it searches for a class that implements IWebJobStartup
. When it found a class that implements the interface:
- The system calls the Configure method passing the
IWebJobsBuilder
object. We add the extension using theAddExtension
method using the class that implements theIExtensionConfigProvider
interface.- The system calls the Initialise method of the
IExtensionConfigProvider
passingExtensionConfigContext
as a parameter. Our implementation of the Initialize method adds the add the binding rule using theAddBindingRule
method of theExtensionConfigContext
, which returns aBindingRule
object. We call theBindToTrigger
method to add our trigger passingTriggerBindingProvider
as a parameter. - After that system calls the
TryCreateAsync
function of theTriggerBindingProvider
passing theTriggerBindingProviderContext
as a parameter, in thisTryCreateAsync
method, we check whether ourAttribute
class present or not. We create our class that implements theITriggerBinding
interface and return a Task that contains the object. - The system then calls the
CreateListenerAsync
method of our class that implements theITriggerBinding
interface passingListenerFactoryContext
object. In ourCreateListenerAsync
, we return a class that implements theIListener
interface. TheListenerFactoryContext
object contains a class that implements theITriggeredFunctionExecutor
interface. TheITriggeredFunctionExecutor
interface has a method calledTryExecuteAsync
. Using this method, we can execute the triggered function, passing the event data andCancellationToken
.
- The system calls the Initialise method of the
Creating NATS Custom Extension
As stated before, we are creating a NATS extension here. We will use Visual Studio for development. We will also use the NATS client library MyNatsClient. MyNatsClient is a handy library for .NET to connect to NATS.
Custom Extension is a Standard .NET Library. You need to add the following packages using NuGet. Open the NuGet manager and search for:
Microsoft.Azure.WebJobs.Extensions
MyNatsClient
Create Trigger
As I mentioned before, we need to create an Attribute
class first. Our attribute class NatsTriggerAttribute is as follows:
using System;
using Microsoft.Azure.WebJobs.Description;
namespace WebJobs.Extension.Nats
{
/// <summary>
/// <c>Attribute</c> class for Trigger
/// </summary>
[AttributeUsage(AttributeTargets.Parameter)]
[Binding]
public class NatsTriggerAttribute: Attribute
{
// <summary>
// Connection string in the form of nats://<username>:<password>@localhost
// </summary>
public string Connection { get; set; }
// Channel string
public string Channel { get; set; }
// <siummary>
// Helper method to get connection string from environment variables
// </summary>
internal string GetConnectionString()
{
return Environment.GetEnvironmentVariable(Connection);
}
}
}
The class has only two members, Connection
and Channel
. The Connection represents the NATS connection string, and the Channel represents the NATS channel to listen. There is a helper method, GetEnvironmentVariable
, also defined in the class, which will retrieve the connection string from the environment variable and return.
Next, we need to create a NatsListener
class. Here is our listener class:
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using MyNatsClient.Rx;
namespace WebJobs.Extension.Nats
{
/*
The NatsListner class
Implements the <c>IListener</c> interface. Contains the code to connect
to a NATS server and subscribe a Channel.
*/
public class NatsListener: IListener
{
private readonly ITriggeredFunctionExecutor _executor;
private readonly NatsTriggerContext _context;
/// <summary>
/// NatsListener constructor
/// </summary>
///
/// <param name="executor"><c>ITriggeredFunctionExecutor</c> instance</param>
/// <param name="context"><c>NatsTriggerContext</c> instance</param>
///
public NatsListener(ITriggeredFunctionExecutor executor, NatsTriggerContext context)
{
_executor = executor;
_context = context;
}
/// <summary>
/// Cancel any pending operation
/// </summary>
public void Cancel()
{
if (_context == null || _context.Client == null) return;
_context.Client.Disconnect();
}
/// <summary>
/// Dispose method
/// </summary>
public void Dispose()
{
_context.Client.Dispose();
}
/// <summary>
/// Start the listener asynchronously. Subscribe to NATS channel and
/// wait for message. When a message is received, execute the function
/// </summary>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>A Task returned from Subscribe method</returns>
public Task StartAsync(CancellationToken cancellationToken)
{
return _context.Client.Subscribe(_context.TriggerAttribute.Channel, stream => stream.Subscribe(msg => {
var triggerData = new TriggeredFunctionData
{
TriggerValue = msg.GetPayloadAsString()
};
var task = _executor.TryExecuteAsync(triggerData, CancellationToken.None);
task.Wait();
}));
}
/// <summary>
/// Stop current listening operation
/// </summary>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns></returns>
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.Run(() =>{
_context.Client.Disconnect();
});
}
}
}
As you can see, the Listener class receives two parameters, ITriggeredFunctionExecutor
and a NatsTriggerContext
instance.
We use the ITriggeredFunctionExecutor
instance to execute the triggered function when we receive a message.
The NatsTriggerContext
object has two member variables. The TriggerAttribute
variable is an object of our Attribute
class. The Client
variable is an object of the NatsClient
class, which is a wrapper class around the MyNatsClient
library. Here is context class:
namespace WebJobs.Extension.Nats
{
/// <summary>
/// Trigger context class
/// </summary>
public class NatsTriggerContext
{
/// <summary>
/// <c>Attribute</c> instance
/// </summary>
public NatsTriggerAttribute TriggerAttribute;
/// <summary>
/// <c>NatsClient</c> instance to connect and subscribe to NATS
/// </summary>
public NatsClient Client;
/// <summary>
/// Constructor
/// </summary>
/// <param name="attribute">Attribute instnace</param>
/// <param name="client">NatsClient instance</param>
public NatsTriggerContext(NatsTriggerAttribute attribute, NatsClient client)
{
this.TriggerAttribute = attribute;
this.Client = client;
}
}
}
The NatsListener
class uses the NatsClient
object to subscribe to a NATS channel. When we receive a message, we invoke the function using the ITriggeredFunctionExecutor
instance.
Next, we need to create TriggerBinding
and TriggerBindingProvider
class. There are relatively simple classes. The TryCreateAsync
in the NatsTriggerBindingProvider
class create NatsTriggerBinding
instance and return. One thing to note here is, creating the context class. We create the NatsTriggerContext
class instance by calling the CreateContext
method of the NatsExtensionConfigProvider
class. We will create the context class and pass it to the NatsTriggerBinding
object.
Next, we create the NatsExtensionConfigProvider
class. Inside the Initialize method, we create a rule using the AddBindingRule
method and bind the binding provider to it.
Another major thing to note here is that the INatsServiceFactory
instance we are receiving in the constructor. We will revisit this in the next section. Just remember that the system will pass this as a parameter.
Next and finally, we need to create a startup class. Our startup class looks like this:
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Hosting;
using WebJobs.Extension.Nats;
[assembly: WebJobsStartup(typeof(NatsBinding.Startup))]
namespace WebJobs.Extension.Nats
{
/// <summary>
/// Starup object
/// </summary>
public class NatsBinding
{
/// <summary>
/// IWebJobsStartup startup class
/// </summary>
public class Startup : IWebJobsStartup
{
public void Configure(IWebJobsBuilder builder)
{
// Add NATS extensions
builder.AddNatsExtension();
}
}
}
}
As I stated in the previous section, the IWebJobsStartup
interface has only one method, Configure
. The Configure method takes one parameter, an object of IWebJobsBuilder
implementation. The system will pass this parameter to our Configure
method.
You should have noticed the AddNatsExtension
function. This function is an extension function of IWebJobsBuilder
and is in the NatsWebJobsBuilderExtensions
class. The AddNatsExtension
is just a helper method. The NatsWebJobsBuilderExtensions
class looks like this.
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.DependencyInjection;
namespace WebJobs.Extension.Nats
{
/// <summary>
/// WebJobBuilder extension to add NATS extensions
/// </summary>
public static class NatsWebJobsBuilderExtensions
{
/// <summary>
/// Extension method to add our custom extensions
/// </summary>
/// <param name="builder"><c>IWebJobsBuilder</c> instance</param>
/// <returns><c>IWebJobsBuilder</c> instance</returns>
/// <exception>Throws ArgumentNullException if builder is null</exception>
public static IWebJobsBuilder AddNatsExtension(this IWebJobsBuilder builder)
{
if (builder == null)
{
throw new ArgumentNullException(nameof(builder));
}
builder.AddExtension<NatsExtensionConfigProvider>();
builder.Services.AddSingleton<INatsServiceFactory, NatsServiceFactory>();
return builder;
}
}
}
As you can see in this extension method, we are adding the extension using the AddExtension
method of the IWebJobsBuilder
. The AddExtesion
method takes one parameter, our IExtensionConfigProvier
instance. We are also adding a Singleton Service to the builder. The constructor of the IExtensionConfigProvider
instance will receive this server as a parameter.
We can now build the library. If everything goes well, you can see the DLL file in the BIN folder.
Create a sample to test the NATS Trigger
Now we need to create a sample function to test our trigger. Let' create a test Azure Function that uses our trigger. Our sample function looks like this:
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using WebJobs.Extension.Nats;
namespace NatsTrigger.Sample
{
public static class NatsTriggerSample
{
[FunctionName("NatsTriggerSample")]
public static void Run(
[NatsTrigger(Connection = "NatsConnection", Channel = "SampleChannel")] string message,
ILogger log)
{
log.LogInformation($"Message Received From SampleChannel {message}");
}
}
}
This function is straightforward, just log the message we are getting. The Connection
string is from the local.settings.json
file, and the Channel
is hard-coded.
Before running our function, we need to run the NATS server. Run the server with the following command
docker run -d --name nats-main -p 4222:4222 -p 6222:6222 -p 8222:8222 nats
Once it is running, let's start our function. I am using the following node.js
application to send a message to a NATS channel.
#!/usr/bin/env node
/* jslint node: true */
'use strict';
var nats = require('nats').connect("nats://<username>:<password>@localhost:4222");
var args = process.argv.slice(2)
nats.on('error', function(e) {
console.log('Error [' + nats.options.url + ']: ' + e);
process.exit();
});
var subject = args[0];
var msg = args[1];
if (!subject) {
console.log('Usage: node-pub <subject> [msg]');
process.exit();
}
nats.publish(subject, msg, function() {
console.log('Published [' + subject + '] : "' + msg + '"');
process.exit();
});
Publish a message to the Channel using the following command:
node.js publish.js SampleChannel "Aure Function and Nats are awesome."
If everything goes well, you can see the debug log from the function.
In the next part, we will look into how to create NATS bindings, till then Happy Coding!.
Top comments (0)