OpenHarmony 并发 taskpool 和 worker

2024-11-24 16:07:41
2次阅读
0个评论

OpenHarmony 并发 taskpool 和 worker

总览

image-20241110140848578

介绍

并发,指的是同一时间内,多段代码同时执行。在ArkTs编程中,并发分为异步并发和多线程并发。

异步并发

异步并发并不是真正的并发,比如在单核设备中,同时执行多端代码其实是通过CPU快速调度来实现的。比如一个司机,它在同一时间只

能开一辆车。做不到同时开两辆车。如果举一个极端的例子。这位司机可以看起来像在开两辆车。

我们把开车分成两个步骤:

  1. 上车
  2. 开动

如果我们把司机上车的时间极限缩小,比如为0.00001秒中,那么这个司机就可以做到同时开两辆车

开小米su7

  1. 上车(0.00001秒)
  2. 开车

马上下车,跑到另外一辆车旁边,然后

开问界M7

  1. 上车(0.00001秒)
  2. 开车

可以看到,只要切换任务的速度够快,就能理解成同时在开两辆车。这个就是我们的忙碌的CPU要做的事情了。

我们可以将 setTimeout 和 Promise/Async 都看成是 异步并发的技术就行。

多线程并发

多线程并发的模型常见的分成基于内存共享的并发模型和基于消息通信的并发模型。

Actor并发模型作为基于消息通信并发模型的典型代表,不需要开发者去面对锁带来的一系列复杂偶发的问题,同时并发度也相对较高,

因此得到了广泛的支持和使用。

当前ArkTS提供了TaskPoolWorker两种并发能力,TaskPoolWorker都基于Actor并发模型实现。 TaskPoolWorker

TaskPool 其实是基于 Worker 做到的一层封装。

TaskPool和Worker的实现特点对比

主要是使用方法上的区别

  1. TaskPool可以直接传递参数、Worker需要自行封装参数
  2. TaskPool可以直接接收返回数据,Worker通过 onmessage 接收
  3. TaskPool任务池个数上限自动管理,Worker最多64个线程,具体看内存
  4. TaskPool任务执行时长 同步代码上限3分钟,异步代码无限。Worker无限
  5. TaskPool可以设置设置任务的优先级,Workder不支持
  6. TaskPool支持取消任务,Worker不支持

image-20241110105619585

TaskPool和Worker的适用场景对比

TaskPool和Worker均支持多线程并发能力。由于TaskPool的工作线程会绑定系统的调度优先级,并且支持负载均衡(自动扩缩容),而

Worker需要开发者自行创建,存在创建耗时以及不支持设置调度优先级,故在性能方面使用TaskPool会优于Worker,因此大多数场景推

荐使用TaskPool

TaskPool偏向独立任务维度,该任务在线程中执行,无需关注线程的生命周期,超长任务(大于3分钟且非长时任务)会被系统自动回

收;而Worker偏向线程的维度,支持长时间占据线程执行,需要主动管理线程生命周期。

常见的一些开发场景及适用具体说明如下:

  • 运行时间超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时)的任务。例如后台

    进行1小时的预测算法训练等CPU密集型任务,需要使用Worker。

  • 有关联的一系列同步任务。例如在一些需要创建、使用句柄的场景中,句柄创建每次都是不同的,该句柄需永久保存,保证使用该句

    柄进行操作,需要使用Worker。

  • 需要设置优先级的任务。例如图库直方图绘制场景,后台计算的直方图数据会用于前台界面的显示,影响用户体验,需要高优先级处理,需要使用TaskPool

  • 需要频繁取消的任务。例如图库大图浏览场景,为提升体验,会同时缓存当前图片左右侧各2张图片,往一侧滑动跳到下一张图片

    时,要取消另一侧的一个缓存任务,需要使用TaskPool。

  • 大量或者调度点较分散的任务。例如大型应用的多个模块包含多个耗时任务,不方便使用Worker去做负载管理,推荐采用

    TaskPool。

线程间通信对象

线程间通信指的是并发多线程间存在的数据交换行为。由于ArkTS语言兼容TS/JS,其运行时的实现与其它所有的JS引擎一样,都是基于

Actor内存隔离的并发模型提供并发能力。

普通对象

普通对象跨线程时通过拷贝形式传递

ArrayBuffer对象

ArrayBuffer内部包含一块Native内存。其JS对象壳与普通对象一样,需要经过序列化与反序列化拷贝传递,但是Native内存有两种传输方

式:拷贝和转移。

SharedArrayBuffer对象

SharedArrayBuffer内部包含一块Native内存,支持跨并发实例间共享,但是访问及修改需要采用Atomics类,防止数据竞争

Transferable

Transferable对象(也称为NativeBinding对象)指的是一个JS对象,绑定了一个C++对象,且主体功能由C++提供

可以实现共享和转移模式

Sendable对象

在传统JS引擎上,对象的并发通信开销的优化方式只有一种,就是把实现下沉到Native侧,通过Transferable对象的转移或共享方式降低并发通信开销。而开发者仍然还有大量对象并发通信的诉求,这个问题在业界的JS引擎实现上并没有得到解决。

ArkTS提供了Sendable对象类型,在并发通信时支持通过引用传递来解决上述问题。

Sendable对象为可共享的,其跨线程前后指向同一个JS对象,如果其包含了JS或者Native内容,均可以直接共享,如果底层是Native实现

的,则需要考虑线程安全性。

多线程并发场景

针对常见的业务场景,主要可以对应分为三种并发任务:耗时任务长时任务常驻任务

耗时任务并发场景简介

耗时任务指的是需要长时间执行的任务,如果在主线程执行可能导致应用卡顿掉帧、响应慢等问题。典型的耗时任务有CPU密集型任务、I/O密集型任务以及同步任务。

对应耗时任务,常见的业务场景分类如下所示:

常见业务场景 具体业务描述 CPU密集型 I/O密集型 同步任务
图片/视频编解码 将图片或视频进行编解码再展示。 ×
压缩/解压缩 对本地压缩包进行解压操作或者对本地文件进行压缩操作。 ×
JSON解析 对JSON字符串的序列化和反序列化操作。 × ×
模型运算 对数据进行模型运算分析等。 × ×
网络下载 密集网络请求下载资源、图片、文件等。 × ×
数据库操作 将聊天记录、页面布局信息、音乐列表信息等保存到数据库,或者应用二次启动时,读取数据库展示相关信息。 × ×

长时任务并发场景简介

在应用业务实现过程中,对于需要较长时间不定时运行的任务,称为长时任务。长时任务如果放在主线程中执行会阻塞主线程的UI业务,出现卡顿丢帧等影响用户体验的问题。因此通常需要将这个独立的长时任务放到单独的子线程中执行。

典型的长时任务场景如下所示:

常见业务场景 具体业务描述
定期采集传感器数据 周期性采集一些传感器信息(例如位置信息、速度传感器等),应用运行阶段长时间不间断运行。
监听Socket端口信息 长时间监听Socket数据,不定时需要响应处理。

上述业务场景均为独立的长时任务,任务执行周期长,跟外部交互简单,分发到后台线程后,需要不定期响应,以获取结果。这些类型的

任务使用TaskPool可以简化开发工作量,避免管理复杂的生命周期,避免线程泛滥,开发者只需要将上述独立的长时任务放入TaskPool

队列,再等待结果即可。

常驻任务并发场景

在应用业务实现过程中,对于一些长耗时(大于3min)且并发量不大的常驻任务场景,使用Worker在后台线程中运行这些耗时逻辑,避免阻塞主线程而导致出现丢帧卡顿等影响用户体验性的问题 。

常驻任务是指相比于短时任务,时间更长的任务,可能跟主线程生命周期一致。相比于长时任务,常驻任务更倾向于跟线程绑定的任务,单次运行时间更长(比如超过3分钟)。

对应常驻任务,较为常见的业务场景如下:

常见业务场景 具体业务描述
游戏中台场景 启动子线程作为游戏业务的主逻辑线程,UI线程只负责渲染。
长耗时任务场景 后台长时间的模型预测任务、或者硬件测试等

TaskPool使用

TaskPool基本使用 1

TaskPool最简单的用法如下

其中需要注意的是执行任务的函数必须使用 @Concurrent 来修饰

 taskpool.execute(函数, 参数)

image-20241110123246585

import { taskpool } from '@kit.ArkTS'

// 任务具体逻辑
@Concurrent
function func1(n: number) {
  return 1 + n
}

@Entry
@Component
struct Index {
  @State
  num: number = 0
  fn1 = async () => {
    // 执行任务
    const res = await taskpool.execute(func1, this.num)
    this.num = Number(res)
  }

  build() {
    Column() {
      Button("TaskPool基本使用 " + this.num)
        .onClick(this.fn1)
    }
    .width("100%")
    .height("100%")
    .justifyContent(FlexAlign.Center)
  }
}

TaskPool基本使用 2

TaskPool的基本使用还可以指定任务的优先级 如:

  taskpool.execute(task, 优先级)

image-20241110124508588

import { taskpool } from '@kit.ArkTS'

// 任务具体逻辑
@Concurrent
function func1(n: number) {
  console.log(`任务 ${n}`)
}

@Entry
@Component
struct Index {
  @State
  num: number = 0
  fn1 = async () => {
    // 同时创建和执行10个任务  指定第10个任务是最高优先级
    for (let index = 1; index <= 10; index++) {
      // 创建任务,传入要执行的函数和该函数的形参
      const task = new taskpool.Task(func1, index)
      if (index !== 10) {
        // 执行任务,并且制定优先级 taskpool.Priority.LOW
        taskpool.execute(task, taskpool.Priority.LOW)
      } else {
        taskpool.execute(task, taskpool.Priority.HIGH)
      }
    }
  }

  build() {
    Column() {
      Button("TaskPool基本使用 " + this.num)
        .onClick(this.fn1)
    }
    .width("100%")
    .height("100%")
    .justifyContent(FlexAlign.Center)
  }
}

TaskPool基本使用 3

TaskPool 还可以支持传入一些任务组,类似Promise.All, 可以等待全部的耗时任务结束。

import { taskpool } from '@kit.ArkTS'

// 任务具体逻辑
@Concurrent
function func1(n: number) {
  const promise = new Promise<number>(resolve => {
    resolve(n)
    setTimeout(() => {
    }, 1000 + n * 100)
  })
  return promise
}

@Entry
@Component
struct Index {
  fn1 = async () => {
    // 创建任务组
    const taskGroup = new taskpool.TaskGroup()
    for (let index = 1; index <= 10; index++) {
      // 创建任务,传入要执行的函数和该函数的形参
      const task = new taskpool.Task(func1, index)
      // 添加到任务组中
      taskGroup.addTask(task)
    }
    // 执行任务组
    const res = await taskpool.execute(taskGroup)
    console.log("res", JSON.stringify(res))


  }

  build() {
    Column() {
      Button("TaskPool基本使用 执行一组任务 ")
        .onClick(this.fn1)
    }
    .width("100%")
    .height("100%")
    .justifyContent(FlexAlign.Center)
  }
}

TaskPool和主线程通信

TaskPool可以直接通过返回结果,将数据传递回来。但是如果,是持续监听的任务,那么此时可以使用 sendDataonReceiveData 和主线程进行通信

image-20241110131624375

import { taskpool } from '@kit.ArkTS'

// 任务具体逻辑
@Concurrent
function func1(n: number) {
  // 假设这里是持续发送的数据
  taskpool.Task.sendData((new Date).toLocaleString())
  return 100
}

@Entry
@Component
struct Index {
  fn1 = async () => {
    const task = new taskpool.Task(func1, 100)

    //   主线程监听到的数据
    task.onReceiveData((result: string) => {
      console.log("主线程监听到的数据", result)
    })

    //   直接得到func1的结果
    const res = await taskpool.execute(task)
    console.log("直接返回的结果", res)
  }

  build() {
    Column() {
      Button("TaskPool基本使用 和主线程通信 ")
        .onClick(this.fn1)
    }
    .width("100%")
    .height("100%")
    .justifyContent(FlexAlign.Center)
  }
}

TaskPool使用限制

TaskPool使用过程中,不能直接使用 TaskPool 所在同一文件中的变量、函数或者类,否则会报错误

image-20241110132305457

在使用 @Concurrent 修饰的函数中,只能使用导入的变量和局部变量。<ArkTSCheck>

Only imported variables and local variables can be used in @Concurrent decorated functions. <ArkTSCheck>

解决方案是 导出使用

image-20241110132421889

Worker使用

Worker主要作用是为应用程序提供一个多线程的运行环境,可满足应用程序在执行过程中与主线程分离,在后台线程中运行一个脚本进

行耗时操作,极大避免类似于计算密集型或高延迟的任务阻塞主线程的运行

workder主要通过

  1. onmessage 监听数据
  2. postMessage 发送数据

image-20241110135833698

1. 新建workder文件

在你的模块上鼠标右键新建worker模块即可

image-20241110135437606

entry/src/main/ets/workers/Worker.ets

import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';

const workerPort: ThreadWorkerGlobalScope = worker.workerPort;


workerPort.onmessage = (e: MessageEvents) => {
  // 接收数据
  const data = e.data as Record<string, number>
  // 发送给主线程
  console.log("子线程监听到了信息,并且发送信息给主线程")
  setTimeout(() => {
    workerPort.postMessage(data.a + data.b)
  }, 1000)
}


workerPort.onmessageerror = (e: MessageEvents) => {
}


workerPort.onerror = (e: ErrorEvent) => {
}

2. 主线程使用woker进行通信

import { MessageEvents, worker } from '@kit.ArkTS';

// 1 声明workder
let workerStage1: worker.ThreadWorker | null

@Entry
@Component
struct Index {
  build() {
    Column() {
      Button("创建")
        .onClick(() => {
          // 2. 创建worker
          workerStage1 = new worker.ThreadWorker('entry/ets/workers/Worker.ets');
        })
      Button("监听数据")
        .onClick(() => {
          // 3 监听workder接收到的信息
          workerStage1!.onmessage = (e: MessageEvents) => {
            console.log("总和", e.data)
          }
        }
        )
      Button("发送数据")
        .onClick(() => {
          // 4 向 workder发送信息
          workerStage1!.postMessage({ a: 10, b: 20 })
        })

    }
    .width("100%")
    .height("100%")
    .justifyContent(FlexAlign.Center)
  }
}

总结

image-20241110140840938

作者

作者:万少

链接:https://www.nutpi.net/

來源:坚果派 著作权归作者所有。

商业转载请联系作者获得授权,非商业转载请注明出处。

收藏00

登录 后评论。没有帐号? 注册 一个。