Exploring the Embassy Framework

1. What is Embassy

Embassy is built on Rust's async/.await primitives. It is essentially a Rust asynchronous runtime that, by implementing the interfaces Rust defines and pairing them with a hardware abstraction layer (HAL), provides cooperative multitasking for embedded devices.

According to Dion's comparison of FreeRTOS and Embassy, Embassy performs quite well in terms of interrupt processing time, interrupt latency, binary size, and more.

Async Rust vs RTOS showdown! - Blog - Tweede golf

2. Rust's Exploration of Zero-cost async/.await

Since Embassy builds on the async abstractions Rust provides, we first explain Rust's async concepts. The content draws on the Async/Await write-up in the OS-Blog.

2.1 The Future Object

A Future represents a value that will eventually be produced by a computation. A Future only describes the computation; it does not start it. A runtime has to drive it forward by calling Future::poll repeatedly.

Its interface is defined as

pub trait Future {
    type Output;

    // Required method
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

poll is the core method here; read it as "try to make some progress". Every call to Future::poll returns a Poll as its result, defined as

pub enum Poll<T> {
    Ready(T),
    Pending,
}

When the result is Poll::Ready, the wrapped value is the computed result and the Future is finished. When the result is Poll::Pending, the computation is not done yet and Future::poll has to be called again. When exactly to call it is up to a runtime such as Embassy.
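
As a minimal illustration (AlreadyDone is a made-up type, not from any library), a hand-written Future whose very first poll already returns Poll::Ready could look like this:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Illustrative sketch only: a future that is finished from the start,
// so the first call to poll returns Poll::Ready with its value.
struct AlreadyDone(u32);

impl Future for AlreadyDone {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        Poll::Ready(self.0)
    }
}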

Pay attention to the signature of Future::poll: the first parameter is Pin<&mut Self> and the second is the execution context &mut Context. We analyze both below.

self: Pin<&mut Self> is the self parameter. It uses the arbitrary self types feature, which generalizes self to "anything from which self can be reached through a chain of dereferences", for example

fn do_something(self: Rc<Self>);
fn do_something(self: Arc<Self>);
fn do_something(self: Box<Self>);
fn do_something(self: MutexGuard<Self>);

2.2 Move/Address-Insensitive Types

For a more detailed explanation, see the official Rust documentation.

Pay attention to the Pin<Ptr> type: it guarantees that whatever it points to will not be moved. In async Rust a lot of state refers to one of its own fields or to itself (self-reference), and the compiler cannot tell at compile time whether a type contains such self-references. Pin<Ptr> therefore guarantees that its pointee either never moves, or implements Unpin (i.e. is insensitive to being moved).

Let's illustrate a move-sensitive type:

Consider a type that holds an array together with a pointer to the array's first element, defined as follows

struct AddressSensitiveType {
    data: [u8; 1024],
    first_element: *const u8,
}

If we initialize it on the stack (as a local variable) and then move it to the heap via Box (recall: a move, like Copy, is only a bitwise copy), its first_element still points to the now-invalid data on the stack instead of the data that has moved to the heap, as the figures below show:

Before moving

After moving
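
To see the problem in code, here is a small sketch of the struct above (my own illustration; the assertion reflects what we observe in practice rather than a language guarantee):

// Illustration only: a move invalidates the self-referential pointer,
// because a Rust move is just a bitwise copy of the value.
struct AddressSensitiveType {
    data: [u8; 1024],
    first_element: *const u8,
}

fn main() {
    let mut on_stack = AddressSensitiveType {
        data: [0; 1024],
        first_element: std::ptr::null(),
    };
    // Self-reference: point at our own first array element (on the stack).
    on_stack.first_element = on_stack.data.as_ptr();

    // Moving into a Box bitwise-copies the bytes to the heap; the stored
    // pointer still holds the old stack address.
    let on_heap = Box::new(on_stack);
    assert_ne!(on_heap.first_element, on_heap.data.as_ptr());
}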

Note that Unpin is an auto trait: to the compiler, if any field of a type is !Unpin, the whole type is !Unpin (it cannot be Unpin, i.e. it is move-sensitive). We can therefore mark a type with a zero-sized marker field that implements !Unpin:

struct AddressSensitiveType {
    data: [u8; 1024],
    first_element: *const u8,
    _pin: PhantomPinned
}

The implementation of Pin<Ptr> is very simple: it implements Deref for any Ptr: Deref, but implements DerefMut only when Ptr::Target implements Unpin.

impl<Ptr: DerefMut<Ptr::Target: Unpin>> DerefMut for Pin<Ptr> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.ptr
    }
}

We can still obtain a mutable reference through unsafe code, but then we must guarantee that we neither modify the address-sensitive contents nor move the value.
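
As a sketch of that pattern (reusing the AddressSensitiveType with the PhantomPinned marker from above; nothing here is Embassy-specific), we can pin the value once with Box::pin and then repair the self-referential pointer through the unsafe escape hatch:

use std::marker::PhantomPinned;
use std::pin::Pin;

// Illustrative sketch only.
struct AddressSensitiveType {
    data: [u8; 1024],
    first_element: *const u8,
    _pin: PhantomPinned, // !Unpin marker, as above
}

fn main() {
    // Box::pin moves the value onto the heap exactly once; afterwards the
    // Pin guarantees it will never move again.
    let mut boxed = Box::pin(AddressSensitiveType {
        data: [0; 1024],
        first_element: std::ptr::null(),
        _pin: PhantomPinned,
    });

    // SAFETY: we only fix up the self-referential pointer; we never move
    // the value out of the Pin.
    unsafe {
        let inner = boxed.as_mut().get_unchecked_mut();
        inner.first_element = inner.data.as_ptr();
    }

    // The pointer now refers to the pinned (heap) data and stays valid.
    assert_eq!(boxed.first_element, boxed.data.as_ptr());
}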

2.3 The Future Execution Context

At present the execution context only carries a Waker. The Waker wakes the runtime so that it does not have to busy-wait and burn resources. A Waker is created by an executor, so that we know which executor to wake (there can be multiple executors running in parallel, which requires a thread abstraction). Waker implements Clone, Send and Sync so that it can be passed between threads.

Whenever we return Poll::Pending we must guarantee that some Waker will wake the executor once this Future is ready to be polled again; otherwise it will never run again.
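
A tiny hand-written future can satisfy that contract by waking itself before returning Pending (a sketch; YieldNow is an illustrative name, not a library type):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Illustrative sketch: yields control back to the executor exactly once.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Before returning Pending, make sure *someone* will wake the
            // task again; here we simply wake ourselves immediately.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

Awaiting YieldNow { yielded: false } inside an async fn therefore just gives the executor one chance to run other tasks; Embassy's Timer (shown later) has a similar shape, except that the wakeup comes from the time driver instead of an immediate self-wake.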

Also note that the execution context/Waker has to be refreshed after every call to Future::poll, because a Future may move between executors; we must make sure that the Waker later wakes the latest executor rather than the one the Future moved away from (we won't dig further into the details).

pub struct Context<'a> {
    waker: &'a Waker,
    // Ensure we future-proof against variance changes by forcing
    // the lifetime to be invariant (argument-position lifetimes
    // are contravariant while return-position lifetimes are
    // covariant).
    _marker: PhantomData<fn(&'a ()) -> &'a ()>,
    // Ensure `Context` is `!Send` and `!Sync` in order to allow
    // for future `!Send` and / or `!Sync` fields.
    _marker2: PhantomData<*mut ()>,
}

Waker is a wrapper around RawWaker. Internally RawWaker uses a virtual table (vtable) that stores the function to call for each operation; waking a Waker is delegated to calling the function stored in the RawWaker.

pub struct Waker {
    waker: RawWaker,
}

impl Waker {
    pub fn wake(self) {
        // The actual wakeup call is delegated through a virtual function call
        // to the implementation which is defined by the executor.
        let wake = self.waker.vtable.wake;
        let data = self.waker.data;

        // Don't call `drop` -- the waker will be consumed by `wake`.
        crate::mem::forget(self);

        // SAFETY: This is safe because `Waker::from_raw` is the only way
        // to initialize `wake` and `data` requiring the user to acknowledge
        // that the contract of `RawWaker` is upheld.
        unsafe { (wake)(data) };
    }

    pub fn wake_by_ref(&self) {
        // The actual wakeup call is delegated through a virtual function call
        // to the implementation which is defined by the executor.

        // SAFETY: see `wake`
        unsafe { (self.waker.vtable.wake_by_ref)(self.waker.data) }
    }

    // ...
}

For RawWaker we simply show the source code

pub struct RawWaker {
    /// A data pointer, which can be used to store arbitrary data as required
    /// by the executor. This could be e.g. a type-erased pointer to an `Arc`
    /// that is associated with the task.
    /// The value of this field gets passed to all functions that are part of
    /// the vtable as the first parameter.
    data: *const (),
    /// Virtual function pointer table that customizes the behavior of this waker.
    vtable: &'static RawWakerVTable,
}

pub struct RawWakerVTable {
    /// This function will be called when the [`RawWaker`] gets cloned, e.g. when
    /// the [`Waker`] in which the [`RawWaker`] is stored gets cloned.
    ///
    /// The implementation of this function must retain all resources that are
    /// required for this additional instance of a [`RawWaker`] and associated
    /// task. Calling `wake` on the resulting [`RawWaker`] should result in a wakeup
    /// of the same task that would have been awoken by the original [`RawWaker`].
    clone: unsafe fn(*const ()) -> RawWaker,

    /// This function will be called when `wake` is called on the [`Waker`].
    /// It must wake up the task associated with this [`RawWaker`].
    ///
    /// The implementation of this function must make sure to release any
    /// resources that are associated with this instance of a [`RawWaker`] and
    /// associated task.
    wake: unsafe fn(*const ()),

    /// This function will be called when `wake_by_ref` is called on the [`Waker`].
    /// It must wake up the task associated with this [`RawWaker`].
    ///
    /// This function is similar to `wake`, but must not consume the provided data
    /// pointer.
    wake_by_ref: unsafe fn(*const ()),

    /// This function gets called when a [`Waker`] gets dropped.
    ///
    /// The implementation of this function must make sure to release any
    /// resources that are associated with this instance of a [`RawWaker`] and
    /// associated task.
    drop: unsafe fn(*const ()),
}

2.4 A Future Is a State Machine

We do not save register contents in the execution context, and tasks do not get their own stacks. That greatly reduces the cost of creating tasks and switching between them, but then how do we preserve a task's execution state so that it looks the same when it resumes as when it yielded? This needs support from the Rust compiler.

First, let's look at how Futures can be nested to build different execution logic.

From the executor's point of view, every Future can be scheduled and run. So how do we express "run A first, then run B"? With another Future, which we will call AndThen. Below is an attempt in pseudocode; many details are left out.

// Pseudocode: pinning, pin projection and ownership details are ignored.
enum AndThenState<T> {
    WaitingOnFirstFut,
    WaitingOnSecondFut {
        first_fut_result: T,
    },
    Done,
}

struct AndThen<F: Future, S: Future> {
    first_fut: F,
    second_fut: S,
    state: AndThenState<F::Output>,
}

impl<F: Future, S: Future> Future for AndThen<F, S> {
    type Output = S::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.state {
            AndThenState::WaitingOnFirstFut => {
                match self.first_fut.poll(cx) {
                    Poll::Pending => {
                        do_something_with_the_waker();
                        Poll::Pending
                    }
                    Poll::Ready(result) => {
                        self.state = AndThenState::WaitingOnSecondFut {
                            first_fut_result: result,
                        };
                        do_something_with_the_waker();
                        Poll::Pending
                    }
                }
            }
            AndThenState::WaitingOnSecondFut { .. } => {
                match self.second_fut.poll(cx) {
                    Poll::Pending => {
                        do_something_with_the_waker();
                        Poll::Pending
                    }
                    Poll::Ready(result) => {
                        self.state = AndThenState::Done;
                        Poll::Ready(result)
                    }
                }
            }
            AndThenState::Done => {
                panic!("already done!")
            }
        }
    }
}

What we have really built is a state machine, where each state stores exactly what it needs in order to continue. To the Rust compiler, every .await is a state node: this is where the current state is saved and control is handed back to the executor. The following example comes from the OS Blog.

async fn example(min_len: usize) -> String {
    let content = async_read_file("foo.txt").await;
    if content.len() < min_len {
        content + &async_read_file("bar.txt").await
    } else {
        content
    }
}

The compiler generates a finite state machine that captures every state. How the variables each state depends on are collected, and how variables a state does not need are trimmed away, is beyond the scope of this article; the compiler takes care of it.

The state transition diagram looks like this

State Transformation

We end up with the four states below. Note that WaitingOnBarTxtState does not need min_len, so min_len is not stored in that state; but since content + &async_read_file("bar.txt") uses content, content is kept.

These are the states the compiler might generate

// The compiler-generated state structs:

struct StartState {
    min_len: usize,
}

struct WaitingOnFooTxtState {
    min_len: usize,
    foo_txt_future: impl Future<Output = String>,
}

struct WaitingOnBarTxtState {
    content: String,
    bar_txt_future: impl Future<Output = String>,
}

struct EndState {}

From these we can build the state machine

enum ExampleStateMachine {
    Start(StartState),
    WaitingOnFooTxt(WaitingOnFooTxtState),
    WaitingOnBarTxt(WaitingOnBarTxtState),
    End(EndState),
}

We split the original code into segments, one per state; when a segment finishes, we switch the state and save whatever the next state depends on. Since the compiler lowers conditionals to gotos when generating code, this maps very naturally onto a state machine.

impl Future for ExampleStateMachine {
    type Output = String; // return type of `example`

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        loop {
            match self { // TODO: handle pinning
                ExampleStateMachine::Start(state) => {
                    // from body of `example`
                    let foo_txt_future = async_read_file("foo.txt");
                    // `.await` operation
                    let state = WaitingOnFooTxtState {
                        min_len: state.min_len,
                        foo_txt_future,
                    };
                    *self = ExampleStateMachine::WaitingOnFooTxt(state);
                }
                ExampleStateMachine::WaitingOnFooTxt(state) => {
                    match state.foo_txt_future.poll(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(content) => {
                            // from body of `example`
                            if content.len() < state.min_len {
                                let bar_txt_future = async_read_file("bar.txt");
                                // `.await` operation
                                let state = WaitingOnBarTxtState {
                                    content,
                                    bar_txt_future,
                                };
                                *self = ExampleStateMachine::WaitingOnBarTxt(state);
                            } else {
                                *self = ExampleStateMachine::End(EndState);
                                return Poll::Ready(content);
                            }
                        }
                    }
                }
                ExampleStateMachine::WaitingOnBarTxt(state) => {
                    match state.bar_txt_future.poll(cx) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(bar_txt) => {
                            *self = ExampleStateMachine::End(EndState);
                            // from body of `example`
                            return Poll::Ready(state.content + &bar_txt);
                        }
                    }
                }
                ExampleStateMachine::End(_) => {
                    panic!("poll called after Poll::Ready was returned");
                }
            }
        }
    }
}

2.5 The Executor

The executor runs multiple Future computations. We treat each Future computation as a Task, and the executor schedules and runs every task.

Keep in mind that the executor is non-preemptive: each task must voluntarily hand control back to the executor (by returning Poll::Pending) before the executor can do its work. If a Future blocks or spins forever, none of the other tasks can make progress (assuming a single executor). So when making system calls (if there is an operating system), do not pick blocking ones. A minimal busy-polling executor sketch follows the list below.

Cooperative multitasking

  • Each future that is added to the executor is basically a cooperative task.
  • Instead of using an explicit yield operation, futures give up control of the CPU core by returning Poll::Pending (or Poll::Ready at the end).
    • There is nothing that forces futures to give up the CPU. If they want, they can never return from poll, e.g., by spinning endlessly in a loop.
    • Since each future can block the execution of the other futures in the executor, we need to trust them to not be malicious.
  • Futures internally store all the state they need to continue execution on the next poll call. With async/await, the compiler automatically detects all variables that are needed and stores them inside the generated state machine.
    • Only the minimum state required for continuation is saved.
    • Since the poll method gives up the call stack when it returns, the same stack can be used for polling other futures.
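
As mentioned above, here is a minimal sketch of such a cooperative executor driving a single task (block_on and noop_waker are illustrative names, not Embassy APIs; this uses std for brevity, whereas Embassy targets no_std):

use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Illustrative sketch: a waker that does nothing, good enough for an
// executor that just busy-polls a single future until it finishes.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Minimal single-task executor: keeps polling until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // A real executor would sleep until woken; busy-looping keeps
            // the example short.
            Poll::Pending => std::thread::yield_now(),
        }
    }
}

fn main() {
    // Usage: drive a trivial async block to completion.
    let answer = block_on(async { 40 + 2 });
    assert_eq!(answer, 42);
}

An executor such as Embassy's follows the same shape but, instead of spinning, parks the core (e.g. with wfi) until a Waker or interrupt marks work as pending, as we will see in the next section.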

3. Embassy's Runtime Implementation

The explanation below draws on the Embassy Documentation and the Embassy source code.

Embassy Executor

We now look at Embassy's task abstraction (beyond the Rust async API).

3.1 TaskStorage Wraps the Task's State Machine and Scheduling Data

The embassy_executor::task macro statically allocates a TaskStorage array; this array holds uninitialized tasks (TaskHeader::state = !STATE_SPAWNED)

pub struct TaskPool<F: Future + 'static, const N: usize> {
    pool: [TaskStorage<F>; N],
}

Each TaskStorage holds a TaskHeader and a Future

pub struct TaskStorage<F: Future + 'static> {
    raw: TaskHeader,
    future: UninitCell<F>, // Valid if STATE_SPAWNED
}

/// An uninitialized [`TaskStorage`].
pub struct AvailableTask<F: Future + 'static> {
    task: &'static TaskStorage<F>,
}

3.2 TaskHeader Wraps Task Scheduling

TaskHeader stores the scheduling state and the task's function logic:

  • State is the task's scheduling state; the flags spawned, run_queued and timer_queued can be combined.
  • run_queue_item stores the next task in the run queue; it is a node of the run queue

    RunQueue → TaskHeader (pointer) → TaskRef (pointer, TaskHeader::run_queue_item) → TaskHeader (pointer) → …

  • executor points to the executor

  • poll_fn is the execution function: it holds the task's execution logic. It is called and runs to completion (this logic can be one step of the state machine; we come back to it later)

/// Raw task header for use in task pointers.
pub(crate) struct TaskHeader {
    pub(crate) state: State,
    pub(crate) run_queue_item: RunQueueItem,
    pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
    poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,

    #[cfg(feature = "integrated-timers")]
    pub(crate) expires_at: SyncUnsafeCell<u64>,
    #[cfg(feature = "integrated-timers")]
    pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
}

TaskRef is a reference wrapper around TaskHeader; it holds a non-null pointer to a TaskHeader

/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
#[derive(Clone, Copy)]
pub struct TaskRef {
    ptr: NonNull<TaskHeader>,
}

3.3 Executors

  • arch Executor

    An Executor is provided for each supported CPU architecture; it is typically woken by interrupts

    // risc-v 32 executor
    pub struct Executor {
        inner: raw::Executor,
        not_send: PhantomData<*mut ()>,
    }
    
  • raw Executor

    #[repr(transparent)]
    pub struct Executor {
        pub(crate) inner: SyncExecutor,
    
        _not_sync: PhantomData<*mut ()>,
    }
    
  • SyncExecutor

    pub(crate) struct SyncExecutor {
        run_queue: RunQueue,
        pender: Pender,
    
        #[cfg(feature = "integrated-timers")]
        pub(crate) timer_queue: timer_queue::TimerQueue,
        #[cfg(feature = "integrated-timers")]
        alarm: AlarmHandle,
    }
    

All of the poll logic lives in SyncExecutor, and it is very simple: dequeue every task in the run queue (clearing the task's run_queued bit in its TaskHeader at that point) and run it

Notice that tasks are not put back into the RunQueue automatically. With the timer enabled, expired tasks can be re-enqueued: SyncExecutor::timer_queue::dequeue_expired calls wake_task_no_pend, which puts those tasks back into run_queue before SyncExecutor::poll calls SyncExecutor::run_queue::dequeue_all

pub fn wake_task_no_pend(task: TaskRef) {
    let header = task.header();
    if header.state.run_enqueue() {
        // We have just marked the task as scheduled, so enqueue it.
        unsafe {
            let executor = header.executor.get().unwrap_unchecked();
            executor.run_queue.enqueue(task);
        }
    }
}

After a task has run, it is added to the timer queue SyncExecutor::timer_queue (by then it has already been removed from the run queue)

pub(crate) unsafe fn update(&self, p: TaskRef) {
    let task = p.header();
    if task.expires_at.get() != u64::MAX {
        if task.state.timer_enqueue() {
            task.timer_queue_item.next.set(self.head.get());
            self.head.set(Some(p));
        }
    }
}

The polling logic is shown below

impl SyncExecutor {
    /// # Safety
    ///
    /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
    pub(crate) unsafe fn poll(&'static self) {
        #[cfg(feature = "integrated-timers")]
        embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ());

        #[allow(clippy::never_loop)]
        loop {
            #[cfg(feature = "integrated-timers")]
            self.timer_queue
                .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend);

            self.run_queue.dequeue_all(|p| {
                let task = p.header();

                #[cfg(feature = "integrated-timers")]
                task.expires_at.set(u64::MAX);

                if !task.state.run_dequeue() {
                    // If task is not running, ignore it. This can happen in the following scenario:
                    //   - Task gets dequeued, poll starts
                    //   - While task is being polled, it gets woken. It gets placed in the queue.
                    //   - Task poll finishes, returning done=true
                    //   - RUNNING bit is cleared, but the task is already in the queue.
                    return;
                }

                #[cfg(feature = "rtos-trace")]
                trace::task_exec_begin(p.as_ptr() as u32);

                // Run the task
                task.poll_fn.get().unwrap_unchecked()(p);

                #[cfg(feature = "rtos-trace")]
                trace::task_exec_end();

                // Enqueue or update into timer_queue
                #[cfg(feature = "integrated-timers")]
                self.timer_queue.update(p);
            });

            #[cfg(feature = "integrated-timers")]
            {
                // If this is already in the past, set_alarm might return false
                // In that case do another poll loop iteration.
                let next_expiration = self.timer_queue.next_expiration();
                if embassy_time_driver::set_alarm(self.alarm, next_expiration) {
                    break;
                }
            }

            #[cfg(not(feature = "integrated-timers"))]
            {
                break;
            }
        }

        #[cfg(feature = "rtos-trace")]
        trace::system_idle();
    }
}

RunQueue::dequeue_all is a safe abstraction implementing a non-blocking linked list; we won't discuss the algorithm here

impl RunQueue {
    /// Empty the queue, then call `on_task` for each task that was in the queue.
    /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
    /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
        // Atomically empty the queue.
        let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);

        // safety: the pointer is either null or valid
        let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };

        // Iterate the linked list of tasks that were previously in the queue.
        while let Some(task) = next {
            // If the task re-enqueues itself, the `next` pointer will get overwritten.
            // Therefore, first read the next pointer, and only then process the task.
            // safety: there are no concurrent accesses to `next`
            next = unsafe { task.header().run_queue_item.next.get() };

            on_task(task);
        }
    }
}

3.4 Pender

Pender signals that, while the executor was polling the runnable tasks, other tasks became runnable, e.g. an I/O device finished filling its buffer in the meantime. In that case we do not wait (e.g. for an interrupt) after finishing one round of polling, but immediately poll again, since some task is already ready to make progress.

Pender is arch-specific: every architecture provides its own implementation of __pender, declared as follows:

impl Pender {
    pub(crate) fn pend(self) {
        extern "Rust" {
            fn __pender(context: *mut ());
        }
        unsafe { __pender(self.0) };
    }
}

Let's first see when Pender is invoked. With the timer enabled, it is called whenever an alarm fires: every SyncExecutor::poll calls set_alarm_callback, passing SyncExecutor::alarm_callback as the callback and a pointer to the current SyncExecutor as its context.

impl SyncExecutor {
    pub(crate) unsafe fn poll(&'static self) {
        #[cfg(feature = "integrated-timers")]
        embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ());
        // polling logic ...
    }
}

impl SyncExecutor {
    #[cfg(feature = "integrated-timers")]
    fn alarm_callback(ctx: *mut ()) {
        let this: &Self = unsafe { &*(ctx as *const Self) };
        this.pender.pend();
    }
}

Let's look at the RISC-V 32 implementation

/// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV
static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false);

#[export_name = "__pender"]
fn __pender(_context: *mut ()) {
    SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst);
}

I don't know yet how embassy implements the timer interrupt on RISC-V; presumably this code runs inside the interrupt handler, with the interrupt being triggered in U-Mode.

Below is the Executor run loop for a RISC-V 32 CPU. It waits for an interrupt with the wfi (wait for interrupt) instruction and resumes once one occurs. Now SIGNAL_WORK_THREAD_MODE and Pender make more sense: if an interrupt arrives while poll is running, that fact is recorded and poll is run again once handling finishes; if no interrupt occurred, the CPU drops into a low-power wait.

Embassy Interrupt Driven Execution

pub struct Executor {
    inner: raw::Executor,
    not_send: PhantomData<*mut ()>,
}
impl Executor {
    pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
        init(self.inner.spawner());

        loop {
            unsafe {
                self.inner.poll();
                // we do not care about race conditions between the load and store operations, interrupts
                // will only set this value to true.
                critical_section::with(|_| {
                    // if there is work to do, loop back to polling
                    // TODO can we relax this?
                    if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) {
                        SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst);
                    }
                    // if not, wait for interrupt
                    else {
                        core::arch::asm!("wfi");
                    }
                });
                // if an interrupt occurred while waiting, it will be serviced here
            }
        }
    }
}

Q: How does the Future state machine show up in Embassy?

Notice that Embassy stores the Future state machine we create inside a TaskStorage (which lives in a static array).

This decouples the state machine from scheduling: in Embassy the executor is only responsible for scheduling, i.e. deciding which task to run and which task to wake. Each task is stored as a state machine, and that state machine is driven by calling the task's TaskHeader::poll_fn. This poll_fn is the same for every task, i.e. it is the state machine's step function, much like the Rust state-machine execution we saw earlier.

pub struct TaskStorage<F: Future + 'static> {
    raw: TaskHeader,
    future: UninitCell<F>, // Valid if STATE_SPAWNED
}

When Embassy's executor runs a task it calls task.poll_fn.get().unwrap_unchecked()(p); let's analyze that function.

Here task is a TaskRef pointing to a TaskHeader

pub(crate) struct TaskHeader {
    pub(crate) state: State,
    pub(crate) run_queue_item: RunQueueItem,
    pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
    poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,

    #[cfg(feature = "integrated-timers")]
    pub(crate) expires_at: SyncUnsafeCell<u64>,
    #[cfg(feature = "integrated-timers")]
    pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
}

The task initialization code can be found in TaskPool; we will see why later. For now, note this function: it installs the execution logic behind TaskHeader::poll_fn

fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
    unsafe {
        self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
        self.task.future.write_in_place(future);

        let task = TaskRef::new(self.task);

        SpawnToken::new(task)
    }
}

Now look at the logic of TaskStorage::poll: this is exactly one step of the classic Future state-machine execution model (the function gets called many times)

unsafe fn poll(p: TaskRef) {
    let this = &*(p.as_ptr() as *const TaskStorage<F>);
    let future = Pin::new_unchecked(this.future.as_mut());
    let waker = waker::from_task(p);
    let mut cx = Context::from_waker(&waker);
    match future.poll(&mut cx) {
        Poll::Ready(_) => {
            this.future.drop_in_place();
            this.raw.state.despawn();
            #[cfg(feature = "integrated-timers")]
            this.raw.expires_at.set(u64::MAX);
        }
        Poll::Pending => {}
    }
    // the compiler is emitting a virtual call for waker drop, but we know
    // it's a noop for our waker.
    mem::forget(waker);
}

This is a good opportunity to look at a custom Future implementation through the Timer example

Here, when returning Poll::Pending we notify the time queue time_queue, asking it to call the waker after a while to wake the executor

/// A future that completes at a specified [Instant](struct.Instant.html).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Timer {
    expires_at: Instant,
    yielded_once: bool,
}

impl Unpin for Timer {}

impl Future for Timer {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.yielded_once && self.expires_at <= Instant::now() {
            Poll::Ready(())
        } else {
            embassy_time_queue_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
            self.yielded_once = true;
            Poll::Pending
        }
    }
}

Here is the implementation of schedule_wake.

It resets TaskHeader::expires_at so that the TimerQueue can wake us at the requested time.

Based on the SyncExecutor design above, we get the following execution flow:

timer interrupt → the loop in Executor::run first dequeues expired tasks into the executor's run queue → a task that needs to re-arm its Timer calls schedule_wake to update its TaskHeader::expires_at → when the task finishes, TimerQueue::update(task) is called, and if expires_at was updated the task is put back into the timer queue → …

#[cfg(feature = "integrated-timers")]
impl embassy_time_queue_driver::TimerQueue for TimerQueue {
    fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) {
        let task = waker::task_from_waker(waker);
        let task = task.header();
        unsafe {
            let expires_at = task.expires_at.get();
            task.expires_at.set(expires_at.min(at));
        }
    }
}

4. How Embassy Runs

Let's take a high-level look at how Embassy works, using the blinky example

#[embassy_executor::main]
async fn main(_spawner: Spawner) {
    let p = embassy_stm32::init(Default::default());
    info!("Hello World!");

    let mut led = Output::new(p.PC13, Level::High, Speed::Low);

    loop {
        info!("high");
        led.set_high();
        Timer::after_millis(300).await;

        info!("low");
        led.set_low();
        Timer::after_millis(300).await;
    }
}

To see what Embassy actually does, let's expand this macro

// Recursive expansion of main_cortex_m macro
// ===========================================

#[doc(hidden)]
async fn ____embassy_main_task(_spawner: Spawner) {
    {
        let p = embassy_stm32::init(Default::default());
        info!("Hello World!");
        let mut led = Output::new(p.PC13, Level::High, Speed::Low);
        loop {
            info!("high");
            led.set_high();
            Timer::after_millis(300).await;
            info!("low");
            led.set_low();
            Timer::after_millis(300).await;
        }
    }
}
fn __embassy_main(_spawner: Spawner) -> ::embassy_executor::SpawnToken<impl Sized> {
    const POOL_SIZE: usize = 1;
    static POOL: ::embassy_executor::_export::TaskPoolRef = ::embassy_executor::_export::TaskPoolRef::new();
    unsafe {
        POOL.get::<_, POOL_SIZE>()
            ._spawn_async_fn(move || ____embassy_main_task(_spawner))
    }
}
unsafe fn __make_static<T>(t: &mut T) -> &'static mut T {
    ::core::mem::transmute(t)
}
#[doc(hidden)]
#[export_name = "main"]
pub unsafe extern "C" fn __cortex_m_rt_main_trampoline() {
    __cortex_m_rt_main()
}
fn __cortex_m_rt_main() -> ! {
    let mut executor = ::embassy_executor::Executor::new();
    let executor = unsafe { __make_static(&mut executor) };
    executor.run(|spawner| {
        spawner.must_spawn(__embassy_main(spawner));
    })
}

Put simply, once control reaches __cortex_m_rt_main we

  • create a new executor and initialize the task pool, and
  • hand the Future produced by ____embassy_main_task to the task pool (note that the function is marked async, so calling it only yields a Future; the body is not executed, this is syntactic sugar)

    This is done by calling TaskPool::spawn_impl, which in turn calls AvailableTask::initialize_impl

    impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
        fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> SpawnToken<T> {
            match self.pool.iter().find_map(AvailableTask::claim) {
                Some(task) => task.initialize_impl::<T>(future),
                None => SpawnToken::new_failed(),
            }
        }
    }
    
    impl<F: Future + 'static> AvailableTask<F> {
        fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
            unsafe {
                self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
                self.task.future.write_in_place(future);
    
                let task = TaskRef::new(self.task);
    
                SpawnToken::new(task)
            }
        }
    }
    

    Note that AvailableTask represents a task slot in the pool that has not been initialized yet

    pub struct AvailableTask<F: Future + 'static> {
        task: &'static TaskStorage<F>,
    }
    
    pub struct TaskStorage<F: Future + 'static> {
        raw: TaskHeader,
        future: UninitCell<F>, // Valid if STATE_SPAWNED
    }
    

    We first find a slot in the task pool, claim it with AvailableTask::claim, and finally initialize it

    Note that during initialization we set the task logic in self.task.raw (the TaskHeader) to TaskStorage::poll; let's examine that function

    unsafe fn poll(p: TaskRef) {
        let this = &*(p.as_ptr() as *const TaskStorage<F>);
        let future = Pin::new_unchecked(this.future.as_mut());
        let waker = waker::from_task(p);
        let mut cx = Context::from_waker(&waker);
        match future.poll(&mut cx) {
            Poll::Ready(_) => {
                this.future.drop_in_place();
                this.raw.state.despawn();
                #[cfg(feature = "integrated-timers")]
                this.raw.expires_at.set(u64::MAX);
            }
            Poll::Pending => {}
        }
        // the compiler is emitting a virtual call for waker drop, but we know
        // it's a noop for our waker.
        mem::forget(waker);
    }
    

    Note that TaskStorage stores our async logic, i.e. (in this example) the LED blinking logic. Because our TaskStorage has a 'static lifetime and, once created, is never moved (unless it is destroyed), we can safely use the unsafe function Pin::new_unchecked to build the self: Pin<&mut Self> argument that Future::poll requires

    We then poll this Future; this is the standard Rust async interface

    Note that when poll returns Poll::Pending we do nothing, because we rely on interrupts to wake the Executor

    The waker is handled with mem::forget as an optimization: dropping a Waker goes through a virtual call, which adds overhead, and since we know this waker's drop does nothing, we skip the destructor with mem::forget

  • Finally, call Executor::run to start the executor. Note that Spawner is a safe abstraction over the executor: the unsafe code is kept outside the closure, and the abstraction is handed into the closure (init: impl FnOnce(Spawner))

    /// Handle to spawn tasks into an executor.
    ///
    /// This Spawner can spawn any task (Send and non-Send ones), but it can
    /// only be used in the executor thread (it is not Send itself).
    ///
    /// If you want to spawn tasks from another thread, use [SendSpawner].
    #[derive(Copy, Clone)]
    pub struct Spawner {
        executor: &'static raw::Executor,
        not_send: PhantomData<*mut ()>,
    }
    
    impl Executor {
        /// Get a spawner that spawns tasks in this executor.
        ///
        /// It is OK to call this method multiple times to obtain multiple
        /// `Spawner`s. You may also copy `Spawner`s.
        pub fn spawner(&'static self) -> super::Spawner {
            super::Spawner::new(self)
        }
    }
    
    impl Executor {
        pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
            init(self.inner.spawner());
            // do polling in a loop...
        }
    }
    

    The init logic here simply adds tasks to the executor

    executor.run(|spawner| {
        spawner.must_spawn(__embassy_main(spawner));
    })
    

    Spawner and SpawnToken here are Embassy's safe abstractions over the executor and over tasks

    pub fn must_spawn<S>(&self, token: SpawnToken<S>) {
        unwrap!(self.spawn(token));
    }
    pub fn spawn<S>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
        let task = token.raw_task;
        mem::forget(token);
        match task {
            Some(task) => {
                unsafe { self.executor.spawn(task) };
                Ok(())
            }
            None => Err(SpawnError::Busy),
        }
    }
    
    pub struct SpawnToken<S> {
        raw_task: Option<raw::TaskRef>,
        phantom: PhantomData<*mut S>,
    }
    

At this point Embassy is ready to start running tasks.

5. Summary

Embassy is a Rust async runtime framework that leans heavily on the strengths of Rust's async syntax. It relies on the compiler-generated state machine to store task execution state. It is not an RTOS and does not support preemptive or priority scheduling. Because preemption is not required, execution is synchronous (a task yields the CPU explicitly with .await, at which point the Rust compiler generates the state-saving code), and the usual OS mechanisms for preserving task progress (context switches, per-task stacks) are unnecessary. This greatly reduces the cost of saving and restoring context and improves performance. In Dion's benchmarks on an STM32F446 microcontroller, Embassy reduced average interrupt handling time by 51.0%, average interrupt latency by 24.8%, and average thread time by 28.1% compared to FreeRTOS.

However, from my understanding and exploration so far, I found no evidence that Embassy supports preemptive priority scheduling, so for now I assume it does not, which may make it a poor fit for some scenarios.
