Summary
This article follows Part 1 and shares important tips for the IoT Edge modules in azure-iot-edge-integration-test-template.
Overview
This sample template has six IoT Edge modules and two integration test data flows. In the first flow, the Azure Pipelines agent sends a direct method request and receives a weather report, and the weather report files are uploaded to Azure Blob Storage. In the second flow, a direct method request asks the edge device to download the archived weather files.
IothubConnector
This module depends on both IoTHubModuleClient and rclpy (the ROS Client Library for Python). You cannot run this module locally without a ROS2 environment setup or iotedgehubdev.
There are several options for keeping the application running while it waits for callbacks. IothubConnector uses a termination signal handler built on threading and signal. FileGenerator, on the other hand, uses rclpy.spin().
def module_termination_handler(signal, frame):
    print("IoTHubClient sample stopped")
    stop_event.set()

stop_event = threading.Event()
signal.signal(signal.SIGTERM, module_termination_handler)
try:
    stop_event.wait()
except Exception as e:
    print("Unexpected error %s " % e)
    raise
finally:
    print("Shutting down client")
    module_client.shutdown()
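The wait-for-signal pattern above can be tried without IoT Hub or ROS2. The following is a minimal sketch that uses only the standard library. `make_termination_handler`, `install_termination_handler`, and `run_until_stopped` are hypothetical helper names, and the `shutdown` callable stands in for `module_client.shutdown()`.

```python
import signal
import threading


def make_termination_handler(stop_event):
    """Return a signal handler that releases whoever waits on stop_event."""
    def handler(signum, frame):
        print("Termination signal received:", signum)
        stop_event.set()
    return handler


def install_termination_handler(stop_event):
    """Register the handler for SIGTERM (must be called from the main thread)."""
    handler = make_termination_handler(stop_event)
    signal.signal(signal.SIGTERM, handler)
    return handler


def run_until_stopped(stop_event, shutdown, timeout=None):
    """Block until stop_event is set, then always run the shutdown callable."""
    try:
        # Event.wait() returns True if the event was set, False on timeout.
        stopped = stop_event.wait(timeout)
    finally:
        print("Shutting down client")
        shutdown()
    return stopped
```

In a module, `install_termination_handler(stop_event)` would be called once at startup, and `run_until_stopped(stop_event, module_client.shutdown)` would be the last call in the main routine.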
- The direct method callback from IoT Hub is implemented with method_request_handler(). On receiving a direct method request, it sends a request message to WeatherObserver through Azure IoT Edge route communication.
async def method_request_handler(method_request):
    print('Direct Method: ', method_request.name, method_request.payload)
    if method_request.name in request_method_list:
        response_payload, response_status = await request_method_list[method_request.name](method_request.payload, module_client)
    else:
        response_payload = {"Response": "Direct method {} not defined".format(method_request.name)}
        response_status = 404
    method_response = MethodResponse.create_from_method_request(method_request, response_status, response_payload)
    await module_client.send_method_response(method_response)

module_client.on_method_request_received = method_request_handler

async def request_weather_report(payload: dict, module_client: IoTHubModuleClient):
    try:
        json_string = json.dumps(payload)
        message = Message(json_string)
        await module_client.send_message_to_output(message, 'reportRequest')
        return {"Response": "Send weather report request for {}".format(payload['city'])}, 200
    except Exception as e:
        print(e)
        return {"Response": "Invalid parameter"}, 400
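The dispatch logic above (look up the method name in a table, fall back to 404) can be exercised without the Azure IoT SDK. The sketch below is a simplified stand-in, not the template's code: `dispatch` is a hypothetical helper, and the `send` coroutine replaces `module_client.send_message_to_output()` so that the example runs on its own.

```python
import asyncio
import json


async def request_weather_report(payload, send):
    """Forward the request to the 'reportRequest' output and report the result."""
    try:
        city = payload["city"]
        await send(json.dumps(payload), "reportRequest")
        return {"Response": "Send weather report request for {}".format(city)}, 200
    except (KeyError, TypeError):
        # Missing 'city' key or a payload that is not a dict.
        return {"Response": "Invalid parameter"}, 400


async def dispatch(name, payload, handlers, send):
    """Look up a direct method by name; unknown names get a 404 response."""
    handler = handlers.get(name)
    if handler is None:
        return {"Response": "Direct method {} not defined".format(name)}, 404
    return await handler(payload, send)


async def print_send(body, output):
    """Stand-in for sending a message to a module output (assumption)."""
    print("send to", output, ":", body)


if __name__ == "__main__":
    table = {"request_weather_report": request_weather_report}
    print(asyncio.run(dispatch("request_weather_report", {"city": "Tokyo"}, table, print_send)))
```

Keeping the handlers in a dictionary means a new direct method only needs a new entry in the table, and the 404 branch stays in one place.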
- The Azure IoT Edge route communication callback is implemented with receive_message_handler(). It receives messages from the WeatherObserver module and sends requests to the FileGenerator module through ROS2 topic communication.
async def receive_message_handler(message_received):
    if message_received.input_name == 'reportResponse':
        message_telemetry = Message(
            data=message_received.data.decode('utf-8'),
            content_encoding='utf-8',
            content_type='application/json',
        )
        await module_client.send_message_to_output(message_telemetry, 'telemetry')
        print("Weather report sent to IoT Hub")
        ros2_publisher_client.ros2_publisher(message_received.data.decode('utf-8'))
        print("ROS2 message sent to FileGenerator")
    elif message_received.input_name == 'updateResponse':
        message_telemetry = Message(
            data=message_received.data.decode('utf-8'),
            content_encoding='utf-8',
            content_type='application/json',
        )
        await module_client.send_message_to_output(message_telemetry, 'telemetry')
        print("Download status sent to IoT Hub")
    else:
        print("Message received on unknown input: {}".format(message_received.input_name))

module_client.on_message_received = receive_message_handler
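Both known inputs in the handler above build the same telemetry message, and only `reportResponse` is also published to ROS2. As an illustration of that routing rule, the sketch below expresses it as a small lookup table that can be unit tested without the SDK or ROS2. `ROUTES` and `plan_forwarding` are hypothetical names and are not part of the template.

```python
# Input name -> (IoT Edge output name, whether to also publish to ROS2).
ROUTES = {
    "reportResponse": ("telemetry", True),
    "updateResponse": ("telemetry", False),
}


def plan_forwarding(input_name, data):
    """Describe what to do with a message, or return None for unknown inputs."""
    route = ROUTES.get(input_name)
    if route is None:
        return None
    output, publish_to_ros2 = route
    return {
        "output": output,
        "body": data.decode("utf-8"),
        "content_type": "application/json",
        "content_encoding": "utf-8",
        "publish_to_ros2": publish_to_ros2,
    }
```

A handler could then build the `Message` from the returned dictionary and call the ROS2 publisher only when `publish_to_ros2` is true.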
WeatherObserver
WeatherObserver is a .NET 6 console app written in C#. It is simple and independent of Azure IoT Edge and ROS2, so you can run it locally for debugging.
The callback that processes requests from IothubConnector is registered with ioTHubModuleClient.SetInputMessageHandlerAsync().
await ioTHubModuleClient.SetInputMessageHandlerAsync(routeC2W, ParseRequests, ioTHubModuleClient).ConfigureAwait(false);
FileGenerator
FileGenerator is a module written in Python that receives requests from IothubConnector and generates weather report files on the edge host machine. This module uses rclpy.spin() to keep the application running while it waits for requests.
rclpy.init(args=args)
file_generator = FileGenerator()
rclpy.spin(file_generator)
file_generator.destroy_node()
rclpy.shutdown()
- The callback for ROS2 requests is registered with create_subscription(), which subscribes to a ROS2 topic.
def __init__(self):
    super().__init__('file_generator')
    self.subscription = self.create_subscription(
        String,
        os.getenv('ROS_TOPIC_NAME'),
        self.listener_callback,
        10)
    self.subscription  # prevent unused variable warning

def listener_callback(self, msg):
    ...  # callback body omitted in this excerpt
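The body of listener_callback() is not shown in this article. As a rough illustration only, the sketch below shows one way a callback could turn the JSON string carried by a ROS2 message into a file on disk. The `city` field comes from the direct method payload shown earlier. The helper name `write_report`, the file naming scheme, and the output directory are assumptions, not the template's actual behavior.

```python
import json
import tempfile
from pathlib import Path


def write_report(message_data, output_dir):
    """Parse a JSON report string and store it as <city>.json in output_dir."""
    report = json.loads(message_data)
    # Fall back to a generic name when the report has no 'city' field.
    city = str(report.get("city", "unknown"))
    # Replace anything that is not a letter or digit to get a safe file name.
    safe_name = "".join(c if c.isalnum() else "_" for c in city)
    target = Path(output_dir) / "{}.json".format(safe_name)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(json.dumps(report, indent=2), encoding="utf-8")
    return target


if __name__ == "__main__":
    # Write one sample report into a throwaway directory.
    with tempfile.TemporaryDirectory() as tmp:
        path = write_report('{"city": "Tokyo", "temperature": 21}', tmp)
        print("wrote", path.name)
```

Inside a ROS2 node, a callback could call `write_report(msg.data, output_dir)`, since a `String` message carries its text in the `data` field.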
FileUploader
- FileUploader is a module written in .NET 6 C# that extracts the files generated by FileGenerator and sends them to LocalBlobStorage.
FileUpdater
The FileUpdater module is written in .NET 6 C# and downloads archive files from Azure Blob Storage.
It uses a SAS token that only allows the module to access a specific directory, so the system can work in multi-tenant and multi-edge-device environments. This SAS token is generated by ManifestGenerator, which is explained in Part 3.
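To make the idea of a directory-scoped SAS token concrete, here is a minimal sketch in Python (FileUpdater itself is written in C#). It only shows that a SAS token travels as the query string of the blob URL, plus a client-side guard that refuses paths outside the device's directory. The function name `build_blob_url`, the account, container, and directory names, and the token value are all placeholders. The real access restriction is enforced by Azure Storage through the token's scope, not by this check.

```python
from urllib.parse import quote


def build_blob_url(account_url, container, blob_path, sas_token, allowed_prefix):
    """Build a blob URL with the SAS token, limited to one directory prefix."""
    prefix = allowed_prefix.strip("/") + "/"
    path = blob_path.lstrip("/")
    # Refuse blobs outside the allowed directory, including '..' tricks.
    if not path.startswith(prefix) or ".." in path.split("/"):
        raise ValueError("{!r} is outside the directory {!r}".format(blob_path, allowed_prefix))
    # The SAS token is appended as the query string of the blob URL.
    base = account_url.rstrip("/")
    return "{}/{}/{}?{}".format(base, container, quote(path), sas_token.lstrip("?"))


if __name__ == "__main__":
    print(build_blob_url(
        "https://example.blob.core.windows.net",  # placeholder account URL
        "weather",                                 # placeholder container
        "device1/archive.zip",
        "sv=placeholder&sig=placeholder",          # placeholder SAS token
        "device1",
    ))
```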
LocalBlobStorage
- This sample template uses LocalBlobStorage, a module built by Microsoft and pulled from mcr.microsoft.com/azure-blob-storage:latest. The reason for using this prebuilt local blob storage module is that the Azure IoT Edge runtime and its default modules, including EdgeHub and EdgeAgent, do not support file uploading features.