Rust-理解Future执行器与任务调度

这篇内容主要是对底层探秘: Future 执行器与任务调度教程的一些个人理解记录。

Async 简介

现代的编程语言一般都对并发提供了支持,每种语言可能提供不同的并发模型,包括多线程、事件驱动、协程、async/await 等。在 Rust 中同时提供了多线程编程和 async 编程。

Rust 中提供的 async 模型,相比较其他语言还是有些区别:

  • Future 在 Rust 中是惰性的,只有在被轮询(poll)时才会运行,因此丢弃一个 future 会阻止它未来再被运行,可以将Future理解为一个在未来某个时间点被调度执行的任务;
  • Async 在 Rust 中使用开销是零, 意味着只有你能看到的代码(自己的代码)才有性能损耗,你看不到的(async 内部实现)都没有性能损耗;
  • Rust 没有内置异步调用所必需的运行时,但是无需担心,Rust 社区生态中已经提供了非常优异的运行时实现,如 tokio;
  • 运行时同时支持单线程和多线程。

事实上, async 底层也是基于线程实现,但是它基于线程封装了一个运行时,可以将多个任务映射到少量线程上,然后将线程切换变成了任务切换,后者仅仅是内存中的访问,因此要高效的多。

Future 惰性简单理解

对“Future 在 Rust 中是惰性的”我们通过下面的程序来理解下。

#[test]
fn future_test() {
    f1();
}

pub async fn f1() {
    println!("f1");
    f2();
}

pub async fn f2() {
    println!("f2");
}
running 1 test
test tmp::only_test::future_test ... ok

successes:

successes:
    tmp::only_test::future_test

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

我们运行上面的程序,会发现不会打印任何信息。同时编译器会有这样的提示

unused implementer of futures::Future that must be used futures do nothing unless you .await or poll them #[warn(unused_must_use)] on by default。

这就是上面所说“Future 在 Rust 中是惰性的”。如果没有使用 .await 或者其他方式去推它一把,它不会自己主动执行。

想要 Future 执行,需要使用 .wait 或者使用 futures 中的 block_on 函数。让我们改写下代码

#[test]
fn future_test() {
  block_on(f1());
}

pub async fn f1() {
    println!("f1");
    f2().await;
}

pub async fn f2() {
    println!("f2");
}
running 1 test
f1
f2
test tmp::only_test::future_test ... ok

successes:

successes:
    tmp::only_test::future_test

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

可以看到程序如我们期望,进行了信息打印。

那么 Future 到底是如何被执行的呢?这个就是我们这篇内容需要去了解的。

Future 执行过程

Future执行流程

首先放一张整体的执行流程大图,这个是对文章中代码执行流程的梳理。不同于书中的分析过程,我试着从自己的视角来串一下执行流程,希望能说的更清晰一点。这里是先沿着黑线再到蓝线来进行串联。

Spawner

Spawner
/// `Spawner`负责创建新的`Future`然后将它发送到任务通道中
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

// 方法用于接收 Future , 然后将它放入任务通道中
impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("任务队列已满");
    }
}

首先是 Spawner,顾名思义,Spawner 就是生产任务的。Spawner 其实很简单,只有一个 spawn 方法,接收 Future 类型的传入参数 ,将传入的 Future 包装成一个 Task 对象,然后放入到 Channel 中。

Executor

Executor

/// 任务执行器,负责从通道中接收任务然后执行
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

// 执行器将从通道中获取任务,然后进行 poll 执行:
impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // 获取一个future,若它还没有完成(仍然是Some,不是None),则对它进行一次poll并尝试完成它
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // 基于任务自身创建一个 `LocalWaker`
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别名
                // 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>`
                if future.as_mut().poll(context).is_pending() {
                    // Future还没执行完,因此将它放回任务中,等待下次被poll
                    *future_slot = Some(future);
                }
            }
        }
    }
}

Executor 的 run 方法一直循环从 Channel 中拉取 Task。在获取到 Task 之后,将使用其引用创建一个 Waker 对象,这个对象很重要,是后面流程串联的重要内容

接着使用 Waker 对象创建一个 Context 对象,然后将这个对象作为参数调用 future.poll() 方法。如果 poll() 的结果是 Pending,那么会将 Future 对象继续放入 Task 中,等待下次执行。否则,这个 Task 的执行流程到此结束。

Future

Future

SharedState

/// 在Future和等待的线程间共享状态
struct SharedState {
    /// 定时(睡眠)是否结束
    completed: bool,

    /// 当睡眠结束后,线程可以用`waker`通知`TimerFuture`来唤醒任务
    waker: Option<Waker>,
}

SharedState 并没有什么特别的逻辑,只有两个简单的属性,作为辅助 Future 的状态容器而已。

Future#poll

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 通过检查共享状态,来确定定时器是否已经完成
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // 设置`waker`,这样新线程在睡眠(计时)结束后可以唤醒当前的任务,接着再次对`Future`进行`poll`操作,
            //
            // 下面的`clone`每次被`poll`时都会发生一次,实际上,应该是只`clone`一次更加合理。
            // 选择每次都`clone`的原因是: `TimerFuture`可以在执行器的不同任务间移动,如果只克隆一次,
            // 那么获取到的`waker`可能已经被篡改并指向了其它任务,最终导致执行器运行了错误的任务
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

终于到了今天的主角:Future,首先看 Future 类中的 poll() 方法。是不跟预想中的不一样?这个方法可以说,实在是太简单了。只是根据持有的 SharedState 对象判断了下是否完成而已,再没有任何其他逻辑。在状态为未完成的时候,会将 poll() 方法的入参 Context 对象中持有的 Waker 对象赋值给 SharedState,这个可以算作是唯一的逻辑。

在我看来这个才是 Future 调度器设计的精巧之处,因为只有这样极其简单的逻辑,才能支撑起 Executor 的高效执行。作为一个高效的 Executor,每时每刻都要接收海量的 Task 输入,如果这个 poll() 执行比较耗时,势必降低 Task 的调度效率,造成积压。

Future#new

Future#new
impl TimerFuture {
     /// 创建一个新的`TimerFuture`,在指定的时间结束后,该`Future`可以完成
     pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // 创建新线程
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            // 睡眠指定时间实现计时功能
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // 通知执行器定时器已经完成,可以继续`poll`对应的`Future`了
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

再看 TimerFuture 的 new() 方法,这个方法是 TimerFuture 的实例化方法。这里首先新建了一个 SharedState 对象,然后新建了一个线程。新建的线程 sleep 了指定时间,主要是为了模拟了 Future 中的耗时操作。在耗时操作结束后,将 SharedState 对象改成已完成状态,标记这个 Future 已经完成。因为 Future 已经执行完成,所需要调用 Waker 对象的 wake() 方法,来唤醒任务继续执行。

Task

Task
/// 一个Future,它可以调度自己(将自己放入任务通道中),然后等待执行器去`poll`
struct Task {
    /// 进行中的Future,在未来的某个时间点会被完成
    ///
    /// 按理来说`Mutex`在这里是多余的,因为我们只有一个线程来执行任务。但是由于
    /// Rust并不聪明,它无法知道`Future`只会在一个线程内被修改,并不会被跨线程修改。因此
    /// 我们需要使用`Mutex`来满足这个笨笨的编译器对线程安全的执着。
    ///
    /// 如果是生产级的执行器实现,不会使用`Mutex`,因为会带来性能上的开销,取而代之的是使用`UnsafeCell`
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// 可以将该任务自身放回到任务通道中,等待执行器的poll
    task_sender: SyncSender<Arc<Task>>,
}

// 在执行器 poll 一个 Future 之前,首先需要调用 wake 方法进行唤醒,然后再由 Waker 负责调度该任务并将其放入任务通道中
// 当任务实现了 ArcWake 特征后,它就变成了 Waker ,在调用 wake() 对其唤醒后会将任务复制一份所有权( Arc ),然后将其发送到任务通道中
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // 通过发送任务到任务管道的方式来实现`wake`,这样`wake`后,任务就能被执行器`poll`
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("任务队列已满");
    }
}

Task 类可以看做是对 Future 的简单包装。需要注意的是 Task 实现了 ArcWake 特征,意味着 Task 也是一个 Waker。看下 wake_by_ref 方法,只是简单的把自己又放入 Channel 中而已。不过,这就够了,有了这个操作就可以实现流程的完整闭环。因为我们前面已经知道,有 Executor 负责不断的从 Channel 中拉取任务来执行。

测试运行

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // 任务通道允许的最大缓冲数(任务队列的最大长度)
    // 当前的实现仅仅是为了简单,在实际的执行中,并不会这么使用
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = mpsc::sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}

#[test]
fn future_test() {
    let (executor, spawner) = new_executor_and_spawner();

    // 生成一个任务
    spawner.spawn(async {
        println!("{} howdy!", Local::now().format(LONG_DATE_PATTERN));
        // 创建定时器Future,并等待它完成
        TimerFuture::new(Duration::new(10, 0)).await;
        println!("{} done!", Local::now().format(LONG_DATE_PATTERN));
    });

    // drop掉任务,这样执行器就知道任务已经完成,不会再有新的任务进来
    drop(spawner);

    // 运行执行器直到任务队列为空
    // 任务运行后,会先打印`howdy!`, 暂停10秒,接着打印 `done!`
    executor.run();

    println!("{} executor over!", Local::now().format(LONG_DATE_PATTERN));

    // let b = async {};
}
running 1 test
2024-05-29 10:11:48 howdy!
2024-05-29 10:11:58 done!
2024-05-29 10:11:58 executor over!
test tmp::future_test::future_test ... ok

successes:

successes:
    tmp::future_test::future_test

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 1 filtered out; finished in 10.00s

最后,我们来测试下这个调度器的运行情况,可以看到执行的效果也完全符合预期。

到这里大家应该就可以理清整个流程了吧,实际的 Future 调度过程应该比这个复杂很多,但是整体逻辑估计不会差很多。整个过程的设计还是非常精巧,非常值得我们去学习。

Async 编程简介

底层探秘: Future 执行器与任务调度

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,670评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,928评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,926评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,238评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,112评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,138评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,545评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,232评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,496评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,596评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,369评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,226评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,600评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,906评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,185评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,516评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,721评论 2 335

推荐阅读更多精彩内容