DEV Community

SameX
SameX

Posted on

多进程协同的实时数据采集与共享系统

本文旨在深入探讨 HarmonyOS IPC Kit 中的进程间通信(IPC)机制,基于实际开发实践实现多进程数据采集与共享系统的设计与开发。本文主要面向开发者,分享数据采集系统架构设计和代码实现,并通过案例讲解如何在 HarmonyOS 中高效处理多进程数据通信。

1. 案例背景与需求分析

在物联网和大数据时代,实时数据采集成为许多应用的核心功能,例如智能家居的多传感器采集、工业设备状态监控等场景。在这些系统中,采集进程通常运行在不同进程空间内,采集的传感器数据需要被整合到一个进程中实时处理,因此需要一种高效的数据共享和通信机制。

目标需求

  • 多进程数据采集:不同采集进程收集来自多个传感器的数据。
  • 实时数据共享:数据采集进程将采集数据传递给数据整合进程进行实时更新。
  • 高效的数据传输:在进程间传输较大的数据量时不影响系统性能。
  • 资源管理:能够在采集进程意外终止时,清理资源并触发数据恢复操作。

涉及技术

  • IPC Kit:通过Client-Server模式实现多进程数据通信。
  • 共享内存:在多个进程间传输较大数据量时,使用共享内存避免数据复制开销。
  • 异步调用与多线程管理:异步调用使进程不会因数据传输而阻塞。
  • 进程消亡通知:远端消亡通知机制用于监控进程状态,确保资源安全回收。

2. 系统架构设计

在本系统中,采集数据的进程(采集进程)作为 IPC 客户端,而负责数据整合的进程(整合进程)作为 IPC 服务端。通过共享内存和 IPC Kit,数据可以在进程间快速传输。架构分为以下模块:

  1. 采集进程模块:每个采集进程配置独立的IPC通信接口,将采集到的数据传递至整合进程。
  2. 整合进程模块:整合所有采集进程的数据,通过共享内存进行数据管理与实时处理。
  3. 远端对象消亡通知:通过 registerDeathRecipient 监控进程状态,处理异常退出的采集进程。

3. 数据采集进程的多线程异步通信

数据采集进程负责从传感器收集数据,并通过异步方式将数据传输给整合进程。每个采集进程作为IPC客户端,需要实现如下通信流程:

  1. 配置 IPC 客户端接口:通过 IPC Kit 创建进程代理(Proxy),发送数据至整合进程。
  2. 异步调用避免阻塞:每个采集进程独立运行数据采集和传输的异步线程,以提高效率。
  3. 数据封装与发送:将数据封装到 IPCParcel 中,异步发送至整合进程。

示例代码

以下为采集进程中的异步数据发送代码示例,采集的数据通过 SendRequest 异步发送到整合进程。

#include <IPCKit/ipc_kit.h>
#include <thread>
#include <mutex>
#include <condition_variable>

class DataCollector {
public:
    DataCollector(OHIPCRemoteProxy* proxy) : proxy_(proxy) {}

    void CollectAndSendData(int sensorData) {
        std::thread([this, sensorData]() {
            OHIPCParcel *data = OH_IPCParcel_Create();
            if (data != nullptr) {
                OH_IPCParcel_WriteInt32(data, sensorData);  // 写入传感器数据
                SendDataAsync(data);
                OH_IPCParcel_Destroy(data);
            }
        }).detach();
    }

private:
    void SendDataAsync(OHIPCParcel* data) {
        OH_IPC_MessageOption option = { OH_IPC_REQUEST_MODE_ASYNC, 0 };
        OHIPCParcel *reply = OH_IPCParcel_Create();
        int result = OH_IPCRemoteProxy_SendRequest(proxy_, 1, data, reply, &option);

        if (result == OH_IPC_SUCCESS) {
            printf("Data sent successfully!\n");
        } else {
            printf("Failed to send data!\n");
        }

        OH_IPCParcel_Destroy(reply);
    }

    OHIPCRemoteProxy* proxy_;
};
Enter fullscreen mode Exit fullscreen mode

4. 数据整合进程的数据管理与同步

整合进程作为服务端,需要接收来自多个采集进程的数据,并进行统一的存储和管理。为提高数据处理效率,整合进程使用共享内存(匿名共享内存)来存储并同步各采集进程传来的数据。同时,为了监控采集进程的状态,整合进程实现了远端对象消亡通知。

数据整合流程

  1. 接收采集进程数据:整合进程通过 IPC Kit 接收采集进程的数据请求。
  2. 共享内存存储与数据同步:使用共享内存实现数据共享,减少内存复制开销。
  3. 注册消亡通知回调:设置 registerDeathRecipient,在采集进程意外退出时,清理相应资源并记录日志。

示例代码

以下代码展示整合进程如何接收数据并存储至共享内存,同时监控采集进程的消亡状态:

#include <IPCKit/ipc_kit.h>
#include <AbilityKit/native_child_process.h>
#include <hilog/log.h>
#include <unordered_map>
#include <mutex>

class DataAggregator {
public:
    DataAggregator() {
        remoteStub_ = OH_IPCRemoteStub_Create("DATA_AGGREGATOR", OnRequestReceived, nullptr, this);
        OH_IPCRemoteStub_AddDeathRecipient(remoteStub_, &deathRecipient_);
    }

    static int OnRequestReceived(uint32_t code, const OHIPCParcel* data, OHIPCParcel* reply, void* userData) {
        int sensorData = 0;
        if (OH_IPCParcel_ReadInt32(data, &sensorData) == OH_IPC_SUCCESS) {
            auto* aggregator = static_cast<DataAggregator*>(userData);
            aggregator->StoreData(sensorData);
        }
        return OH_IPC_SUCCESS;
    }

    void StoreData(int data) {
        std::lock_guard<std::mutex> lock(dataMutex_);
        // 假设 sharedDataBuffer_ 是共享内存映射的地址
        sharedDataBuffer_.emplace_back(data);  
    }

    void RegisterDeathRecipient() {
        deathRecipient_.onRemoteDied = [](void* userData) {
            auto* aggregator = static_cast<DataAggregator*>(userData);
            aggregator->HandleProcessDeath();
        };
        OH_IPCRemoteStub_RegisterDeathRecipient(remoteStub_, &deathRecipient_);
    }

    void HandleProcessDeath() {
        // 处理采集进程消亡的资源清理操作
        printf("采集进程消亡,清理资源!\n");
    }

private:
    OHIPCRemoteStub* remoteStub_;
    std::vector<int> sharedDataBuffer_;
    std::mutex dataMutex_;
    OHIPCDeathRecipient deathRecipient_;
};
Enter fullscreen mode Exit fullscreen mode

5. 进程消亡通知机制的实现

在数据采集系统中,进程消亡会导致数据更新的中断,因此整合进程需要及时感知采集进程的状态。当采集进程意外终止时,消亡通知机制将触发资源清理和报警操作,确保系统的稳定性。

通过在 DataAggregator 中注册 DeathRecipient 对象来实现对采集进程的监控。当检测到远端采集进程消亡,整合进程即可执行相应的清理逻辑并更新系统状态。


6. 总结

本文展示了如何基于HarmonyOS的IPC Kit开发多进程实时数据采集与共享系统,应用了IPC Kit中的Client-Server通信机制、共享内存与消亡通知机制。该系统能够高效采集和整合传感器数据,并及时管理进程消亡状态,确保数据采集系统的稳定性。

通过该案例,我们可以掌握进程间通信的异步调用、共享内存传输大数据及远端对象消亡通知的实现方式,为多进程实时数据共享系统的开发提供实用参考。

Image of Datadog

Master Mobile Monitoring for iOS Apps

Monitor your app’s health with real-time insights into crash-free rates, start times, and more. Optimize performance and prevent user churn by addressing critical issues like app hangs, and ANRs. Learn how to keep your iOS app running smoothly across all devices by downloading this eBook.

Get The eBook

Top comments (0)

Qodo Takeover

Introducing Qodo Gen 1.0: Transform Your Workflow with Agentic AI

Rather than just generating snippets, our agents understand your entire project context, can make decisions, use tools, and carry out tasks autonomously.

Read full post

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay