DEV Community

SameX
SameX

Posted on

鸿蒙案例实践:高并发数据采集系统的设计与实现

本文旨在深入探讨华为鸿蒙HarmonyOS Next系统(截止目前API12)的技术细节,基于实际开发实践进行总结。

主要作为技术分享与交流载体,难免错漏,欢迎各位同仁提出宝贵意见和问题,以便共同进步。

本文为原创内容,任何形式的转载必须注明出处及原作者。

1. 系统架构与需求分析

背景

在物联网和大数据的时代,许多应用场景需要从多个传感器中并发采集数据,这类系统通常面对 I/O 密集型任务。高效的数据采集系统不仅要保证高并发处理的性能,还需要处理数据的一致性与安全性,确保在多线程环境中不会发生数据竞争问题。

需求

  • 系统需要同时从多个传感器并发采集数据。
  • 使用多线程模型来处理 I/O 密集型任务,以提高系统的响应速度。
  • 确保数据采集的安全性和一致性,避免数据竞争。
  • 使用异常处理和重试机制来应对数据传输失败的情况。
  • 系统扩展性良好,能够支持更多传感器的添加和管理。

功能需求

  • 并发采集传感器数据。
  • 数据的实时传输与状态反馈。
  • 确保数据一致性与系统稳定性。

2. TaskPool 并发任务的管理与执行

TaskPool 概述

在 ArkTS 中,TaskPool 提供了一种高效的多线程并发任务调度机制。通过 TaskPool,开发者可以创建并发任务并让这些任务在后台线程执行。TaskPool 的优势在于,它能够让开发者专注于任务的执行逻辑,而无需关心线程的管理与生命周期。

任务管理与调度

在多传感器数据采集中,传感器的数据采集任务是 I/O 密集型的。因此,我们可以通过 TaskPool 将每个传感器的数据采集任务放入不同的线程中执行,提升任务并发处理能力。

TaskPool 数据采集任务示例:

import { taskpool } from '@kit.ArkTS';

// 模拟从传感器采集数据的并发任务
@Concurrent
async function collectSensorData(sensorId: string): Promise<string> {
  // 模拟 I/O 操作:从传感器采集数据
  console.log(`采集传感器 ${sensorId} 的数据中...`);
  await delay(1000);  // 模拟延迟
  return `传感器 ${sensorId} 数据`;
}

// 模拟延迟函数
function delay(ms: number) {
  return new Promise(resolve => setTimeout(resolve, ms));
}
Enter fullscreen mode Exit fullscreen mode

3. Sendable 数据传输机制在并发中的应用

Sendable 概述

在 ArkTS 中,Sendable 数据是能够在并发实例间进行传输的安全数据类型。当我们在多线程环境下传输数据时,可以使用 Sendable 来保证数据在不同线程之间传输时的安全性和一致性。Sendable 数据可以通过引用传递和拷贝传递两种方式进行传输。

数据传输的设计

对于每个传感器采集到的数据,我们可以使用 Sendable 数据结构,将其安全地传输到主线程,避免在数据传递过程中发生并发冲突。

Sendable 数据传输示例:

import { taskpool } from '@kit.ArkTS';

// 定义 Sendable 数据类
@Sendable
class SensorData {
  constructor(public sensorId: string, public data: string) {}
}

// 采集传感器数据并将数据传递给主线程
@Concurrent
async function collectSensorData(sensorId: string): Promise<SensorData> {
  // 模拟 I/O 操作:采集数据
  const data = await delayAndCollect(sensorId);
  return new SensorData(sensorId, data);
}

// 模拟 I/O 任务中的数据采集
async function delayAndCollect(sensorId: string): Promise<string> {
  await delay(1000);  // 模拟 I/O 延迟
  return `传感器 ${sensorId} 数据`;
}
Enter fullscreen mode Exit fullscreen mode

4. I/O 密集型任务优化

I/O 密集型任务的特点

I/O 密集型任务主要包括文件读写、网络请求、数据库访问等操作。它们的特点是计算量较小,但 I/O 操作需要消耗大量时间。为了提升性能,我们可以使用异步任务以及异步锁来管理资源,避免线程之间的资源竞争。

异步锁的应用

异步锁(AsyncLock)可以用于保护并发任务中的共享资源,防止不同线程同时访问或修改同一数据导致的数据竞争问题。在数据采集中,我们可以使用异步锁确保在写入数据时不会发生冲突。

异步锁使用示例:

import { ArkTSUtils } from '@kit.ArkTS';

// 定义异步锁
const lock = new ArkTSUtils.locks.AsyncLock();

@Concurrent
async function writeDataWithLock(sensorData: SensorData): Promise<void> {
  await lock.lockAsync(() => {
    // 模拟写入操作
    console.log(`写入 ${sensorData.sensorId} 的数据:${sensorData.data}`);
  });
}
Enter fullscreen mode Exit fullscreen mode

5. 异常处理与重试机制

在并发任务中,传感器数据采集可能会因为网络、硬件等原因导致失败。因此,我们需要设计异常处理和重试机制,确保在数据采集失败时进行重试或提供其他解决方案。

异常处理与重试示例:

@Concurrent
async function collectSensorDataWithRetry(sensorId: string, retries = 3): Promise<SensorData | null> {
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      const data = await delayAndCollect(sensorId);
      return new SensorData(sensorId, data);
    } catch (error) {
      console.error(`采集 ${sensorId} 数据失败,尝试 ${attempt} 次...`);
      if (attempt === retries) {
        console.error(`最终采集失败:${sensorId}`);
        return null;  // 返回 null 表示采集失败
      }
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

通过这种重试机制,我们可以确保在传感器采集失败时,系统会进行一定次数的重试,并在最终失败时提供适当的处理策略。

6. 综合代码实现:多传感器数据采集系统

我们通过 TaskPool 来并发执行多个传感器的数据采集任务,并使用 Sendable 传输数据,同时保证数据的一致性和系统的稳定性。以下是综合示例代码:

@Entry
@Component
struct DataCollector {
  @State sensorDataList: Array<string> = []

  build() {
    Column() {
      Button('开始采集数据')
        .onClick(() => {
          this.startDataCollection();
        })
      // 显示采集到的传感器数据
      ForEach(this.sensorDataList, (data) => {
        Text(data)
      })
    }
  }

  startDataCollection() {
    const sensors = ['sensor1', 'sensor2', 'sensor3'];
    sensors.forEach(sensorId => {
      // 启动并发任务采集数据
      let task: taskpool.Task = new taskpool.Task(collectSensorDataWithRetry, sensorId);
      taskpool.execute(task).then((result: SensorData | null) => {
        if (result) {
          this.sensorDataList.push(`采集到数据:${result.sensorId} - ${result.data}`);
        } else {
          this.sensorDataList.push(`采集 ${sensorId} 失败`);
        }
      }).catch(error => {
        console.error("任务执行失败: " + error);
      });
    });
  }
}
Enter fullscreen mode Exit fullscreen mode

7. 总结

在这篇文章中,我们构建了一个高并发数据采集系统,展示了如何通过 ArkTS 中的 TaskPool 进行并发任务管理,使用 Sendable 数据传输保证数据的安全性,并结合异步锁机制来防止数据竞争问题。实现了异常处理与重试机制,确保在数据采集过程中能够应对各种故障。

通过这个案例,我们可以看到 ArkTS 的强大并发处理能力,以及如何在高并发场景下保证数据一致性和传输的安全性。这种设计为多传感器数据采集、物联网和大数据采集系统的实现提供了参考。

AWS Security LIVE!

Join us for AWS Security LIVE!

Discover the future of cloud security. Tune in live for trends, tips, and solutions from AWS and AWS Partners.

Learn More

Top comments (0)

AWS GenAI LIVE image

Real challenges. Real solutions. Real talk.

From technical discussions to philosophical debates, AWS and AWS Partners examine the impact and evolution of gen AI.

Learn more

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay