
Kohei Kawata


Azure IoT Edge Integration Test template - Part.2

Summary

This article follows Part.1 and shares important tips for the IoT Edge modules in azure-iot-edge-integration-test-template.


Overview

This sample template has six IoT Edge modules and two integration test data flows. In the first flow, the Azure Pipelines agent sends a direct method request and receives a weather report, and the weather report files are uploaded to Azure Blob Storage. In the second flow, a direct method request asks the edge device to download archived weather files.


IothubConnector

def module_termination_handler(signal, frame):
    print ("IoTHubClient sample stopped")
    stop_event.set()

stop_event = threading.Event()
signal.signal(signal.SIGTERM, module_termination_handler)
try:
    stop_event.wait()
except Exception as e:
    print("Unexpected error %s " % e)
    raise
finally:
    print("Shutting down client")
    module_client.shutdown()
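
The excerpt above blocks the main thread on a threading.Event until a SIGTERM arrives, then always runs the shutdown step. A minimal sketch of that pattern, without the IoT SDK, might look like the following. The helper names make_termination_handler and run_until_terminated are hypothetical, and the handler is invoked directly here instead of waiting for a real signal.

```python
import signal
import threading


def make_termination_handler(stop_event: threading.Event):
    """Return a signal handler that only sets the event (hypothetical helper)."""
    def handler(signum, frame):
        print("Termination signal received")
        stop_event.set()
    return handler


def run_until_terminated(shutdown, stop_event: threading.Event) -> None:
    """Block until stop_event is set, then always call shutdown()."""
    try:
        stop_event.wait()
    finally:
        shutdown()


# Simulated run: call the handler directly instead of sending a real SIGTERM.
# In a real module you would register it with:
#   signal.signal(signal.SIGTERM, handler)
calls = []
stop_event = threading.Event()
handler = make_termination_handler(stop_event)
handler(signal.SIGTERM, None)
run_until_terminated(lambda: calls.append("shutdown"), stop_event)
print(calls)
```

Keeping the handler down to a single stop_event.set() call leaves the actual cleanup in the finally block, where it runs in normal program flow rather than inside the signal handler.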
  • The direct method callback from IoT Hub is implemented in method_request_handler(). On receiving a direct method request, it sends a request message to WeatherObserver through Azure IoT Edge route communication.
async def method_request_handler(method_request):
    print('Direct Method: ', method_request.name, method_request.payload)
    if method_request.name in request_method_list:
        response_payload, response_status = await request_method_list[method_request.name](method_request.payload, module_client)
    else:
        response_payload = {"Response": "Direct method {} not defined".format(method_request.name)}
        response_status = 404

    method_response = MethodResponse.create_from_method_request(method_request, response_status, response_payload)
    await module_client.send_method_response(method_response)

module_client.on_method_request_received = method_request_handler
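  • Each entry in request_method_list is called with the payload and module_client, and returns a response payload and a status code. request_weather_report() below has that shape and forwards the payload to the reportRequest output.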
async def request_weather_report(payload: dict, module_client: IoTHubModuleClient):
    try:
        json_string = json.dumps(payload)
        message = Message(json_string)
        await module_client.send_message_to_output(message, 'reportRequest')
        return {"Response": "Send weather report request for {}".format(payload['city'])}, 200
    except Exception as e:
        print(e)
        return {"Response": "Invalid parameter"}, 400
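
The dispatch logic above can be exercised without an IoT Hub connection. The following is a rough sketch under a few assumptions: fake_send stands in for module_client.send_message_to_output, the dictionary key "request_weather_report" is a hypothetical direct method name, and, unlike the excerpt, the payload is validated before anything is sent.

```python
import asyncio
import json


async def request_weather_report(payload: dict, send) -> tuple:
    """Forward the payload to an output; `send` stands in for send_message_to_output."""
    try:
        city = payload["city"]  # a missing key or non-dict payload yields 400 below
        await send(json.dumps(payload), "reportRequest")
        return {"Response": f"Send weather report request for {city}"}, 200
    except (KeyError, TypeError):
        return {"Response": "Invalid parameter"}, 400


# Method name -> coroutine. The key is a hypothetical direct method name.
request_method_list = {"request_weather_report": request_weather_report}


async def dispatch(name: str, payload, send) -> tuple:
    """Look up the handler by name; unknown names get a 404 response."""
    handler = request_method_list.get(name)
    if handler is None:
        return {"Response": f"Direct method {name} not defined"}, 404
    return await handler(payload, send)


sent = []  # records (body, output_name) pairs instead of talking to edgeHub


async def fake_send(body: str, output_name: str) -> None:
    sent.append((body, output_name))


ok = asyncio.run(dispatch("request_weather_report", {"city": "Tokyo"}, fake_send))
bad = asyncio.run(dispatch("request_weather_report", {}, fake_send))
missing = asyncio.run(dispatch("unknown_method", {}, fake_send))
print(ok[1], bad[1], missing[1])  # prints: 200 400 404
```

A table of handlers keeps method_request_handler() unchanged when a new direct method is added; only a new dictionary entry is needed.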
  • The Azure IoT Edge route communication callback is implemented in receive_message_handler(). It receives messages from the WeatherObserver module, forwards them to IoT Hub as telemetry, and sends requests to the FileGenerator module through ROS2 topic communication.
async def receive_message_handler(message_received):
    if message_received.input_name == 'reportResponse':
        message_telemetry = Message(
            data=message_received.data.decode('utf-8'),
            content_encoding='utf-8',
            content_type='application/json',
        )
        await module_client.send_message_to_output(message_telemetry, 'telemetry')
        print("Weather report sent to IoT Hub")

        ros2_publisher_client.ros2_publisher(message_received.data.decode('utf-8'))
        print("ROS2 message sent to FileGenerator")

    elif message_received.input_name == 'updateResponse':
        message_telemetry = Message(
            data=message_received.data.decode('utf-8'),
            content_encoding='utf-8',
            content_type='application/json',
        )
        await module_client.send_message_to_output(message_telemetry, 'telemetry')
        print("Download status sent to IoT Hub")
    else:
        print("Message received on unknown input: {}".format(message_received.input_name))

module_client.on_message_received = receive_message_handler
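
The input and output names used in these handlers only work if matching routes are declared for edgeHub in the deployment manifest. As a hedged illustration, the route strings could be built as below. The output names reportRequest and telemetry come from the excerpts above, while the module names as deployed, the route names, and the WeatherObserver input name are assumptions, not values taken from the template.

```python
def module_route(src_module: str, output: str, dst_module: str, dst_input: str) -> str:
    """Route from one module's output to another module's input via edgeHub."""
    return (
        f"FROM /messages/modules/{src_module}/outputs/{output} "
        f'INTO BrokeredEndpoint("/modules/{dst_module}/inputs/{dst_input}")'
    )


def upstream_route(src_module: str, output: str) -> str:
    """Route from a module's output up to IoT Hub."""
    return f"FROM /messages/modules/{src_module}/outputs/{output} INTO $upstream"


routes = {
    # The WeatherObserver input name "reportRequest" is hypothetical.
    "connectorToObserver": module_route(
        "IothubConnector", "reportRequest", "WeatherObserver", "reportRequest"
    ),
    # Messages sent to the "telemetry" output go to IoT Hub.
    "connectorToIoTHub": upstream_route("IothubConnector", "telemetry"),
}

for name, route in routes.items():
    print(name, "=>", route)
```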

WeatherObserver

  • WeatherObserver is a .NET 6 console app written in C#. It is simple and independent of Azure IoT Edge and ROS2, so you can run it locally for debugging purposes.

  • The callback that processes requests from IothubConnector is registered with ioTHubModuleClient.SetInputMessageHandlerAsync().

await ioTHubModuleClient.SetInputMessageHandlerAsync(routeC2W, ParseRequests, ioTHubModuleClient).ConfigureAwait(false);

FileGenerator

  • FileGenerator is a module written in Python that receives a request from IothubConnector and generates weather report files on the edge host machine.

  • This module uses rclpy.spin() to keep the application running while it waits for requests.

rclpy.init(args=args)
file_generator = FileGenerator()
rclpy.spin(file_generator)
file_generator.destroy_node()
rclpy.shutdown()
  • The callback for ROS2 requests is registered through create_subscription(), which sets up the ROS2 topic subscription.
def __init__(self):
    super().__init__('file_generator')
    self.subscription = self.create_subscription(
        String,
        os.getenv('ROS_TOPIC_NAME'),
        self.listener_callback,
        10)
    self.subscription  # prevent unused variable warning

def listener_callback(self, msg):
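
One detail worth noting in the subscription above: os.getenv('ROS_TOPIC_NAME') returns None when the variable is not set, so the topic name would be missing at subscription time. A small sketch of a fail-fast lookup is shown below. The helper get_topic_name and the topic name weather_report are hypothetical and are not part of the template.

```python
import os


def get_topic_name(env_var: str = "ROS_TOPIC_NAME") -> str:
    """Return the topic name from the environment, failing fast when unset or blank."""
    topic = os.getenv(env_var, "").strip()
    if not topic:
        raise RuntimeError(f"Environment variable {env_var} must be set to a ROS2 topic name")
    return topic


# Hypothetical topic name, set here only so the sketch runs on its own.
os.environ["ROS_TOPIC_NAME"] = "weather_report"
print(get_topic_name())
```

The returned value could then be passed to create_subscription() in place of the direct os.getenv() call.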

FileUploader

  • FileUploader is a module written in .NET 6 C# that extracts the files generated by FileGenerator and sends them to LocalBlobStorage.

FileUpdater

  • The FileUpdater module is written in .NET 6 C# and downloads archive files from Azure Blob Storage.

  • It uses a SAS token that allows the module to access only a specific directory, so this system can work in multi-tenant and multi-edge-device environments. This SAS token is generated by ManifestGenerator, which is explained in Part.3.
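
To illustrate how a directory-scoped SAS token is typically used on the client side, here is a minimal sketch that appends a token to a blob URL. This is not the FileUpdater implementation, which is written in C#. The function name, account URL, container, path, and token values are all placeholders.

```python
from urllib.parse import quote


def build_blob_url(account_url: str, container: str, blob_path: str, sas_token: str) -> str:
    """Join the blob location and a SAS token into one request URL (hypothetical helper)."""
    sas = sas_token.lstrip("?")               # accept tokens with or without a leading "?"
    path = quote(f"{container}/{blob_path}")  # percent-encode spaces etc., keep "/"
    return f"{account_url.rstrip('/')}/{path}?{sas}"


url = build_blob_url(
    "https://example.blob.core.windows.net/",  # placeholder storage account
    "archive",                                  # placeholder container
    "device-01/weather 2022.zip",               # placeholder per-device directory and file
    "?sv=placeholder&sig=placeholder",          # placeholder token, not a real SAS
)
print(url)
```

Because the token is scoped to one directory, a request built this way for another device's directory would be rejected by the storage service.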

LocalBlobStorage

  • This sample template uses LocalBlobStorage, a module built by Microsoft and pulled from mcr.microsoft.com/azure-blob-storage:latest. The reason for using this prebuilt local blob storage module is that the Azure IoT Edge runtime and its default modules, EdgeHub and EdgeAgent, do not support file upload features.
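
Other modules on the same device reach the local blob module with a storage connection string. The sketch below builds one in the format I understand the Azure Blob Storage on IoT Edge documentation to describe, with port 11002 and the module name as host. Treat the format as an assumption to verify against the official documentation. The helper name, account name, and key are placeholders.

```python
def local_blob_connection_string(
    module_name: str, account_name: str, account_key: str, port: int = 11002
) -> str:
    """Build a connection string for a blob module on the same edge device (hypothetical helper)."""
    endpoint = f"http://{module_name}:{port}/{account_name}"
    return (
        "DefaultEndpointsProtocol=http;"
        f"BlobEndpoint={endpoint};"
        f"AccountName={account_name};"
        f"AccountKey={account_key};"
    )


# Placeholder values; a real account key is a 64-byte base64 string.
conn = local_blob_connection_string("LocalBlobStorage", "localaccount", "PLACEHOLDER_KEY")
print(conn)
```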
