异步运行时
project1
实现了一个最小的 runtime
框架
- executor(MiniTokio): 保存任务队列,不断尝试 poll 每个 task,如果任务完成就移除队列,如果没有完成就加到队尾
struct MiniTokio { tasks: VecDeque<Task>, } impl MiniTokio { pub fn new() -> Self { Self { tasks: VecDeque::new(), } } fn run(&mut self) { let waker = futures::task::noop_waker(); let mut cx = Context::from_waker(&waker); while let Some(mut task) = self.tasks.pop_front() { if task.as_mut().poll(&mut cx).is_pending() { println!("a"); self.tasks.push_back(task); } } } }
- task: 封装了 future
type Task = Pin<Box<dyn Future<Output = ()>>>;
- spawner: 作为 runtime 的函数,将 task 添加到队尾
fn spawn<F>(&mut self, f: F) where F: Future<Output = ()> + 'static, { self.tasks.push_back(Box::pin(f)); }
project2
execotor 本身的 push_back 操作就是 wake 的实现
只要没有 ready 就重新加入队列,这种做法执行失败就立即重会占用大量 cpu 资源,应该等到 ready 是在重新唤醒 (加入队列)
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if (Instant::now() >= self.when) {
Poll::Ready("aaa")
} else {
Poll::Pending
}
}
project2
框架
- execotor: 只需要一个 receiver,不断尝试接受任务去 poll,结果是什么无所谓
struct MiniTokio { sender: Sender<Arc<Task>>, // 等会在说这个 receiver: Receiver<Arc<Task>>, } impl MiniTokio { pub fn new() -> Self { let (cx, rx) = crossbeam::channel::unbounded(); Self { sender: cx, receiver: rx, } } fn run(&self) { while let Ok(task) = self.receiver.recv() { let waker = futures::task::waker(task.clone()); let mut cx = Context::from_waker(&waker); let mut future = task.future.lock().expect("加锁失败"); let _ = future.as_mut().poll(&mut cx); } } }
- task: 除了 future 还有一个 sender, task 实现了 Waker,当 task pending 时会按照策略调用 wake 方法,
把自己 send 到 execotor
struct Task { future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>, sender: Sender<Arc<Task>>, } impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { arc_self .sender .send(arc_self.clone()) .expect("send 会 queue 失败了"); } }
- spawner: 因为 execotor 现在同时保留着 sender 和 receiver,两者都不会被 drop,程序不能正常退出,下一步需要将这两个分离
fn spawn<F>(&self, future: F) where F: Future<Output = ()> + Send + 'static, { let task = Task { future: Mutex::new(Box::pin(future)), sender: self.sender.clone(), }; self.sender .send(Arc::new(task)) .expect("spawner send new task failed"); }
wake 实现
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if Instant::now() >= self.when {
Poll::Ready("aaa")
} else {
// 这里调用 wake 依赖的是 task 实现的 wake 方法
// 1. 立即 send
cx.waker().wake_by_ref();
// 2. 这个是稍微优化过的 wake 策略
let waker = cx.waker().clone();
let when = self.when;
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
waker.wake();
});
Poll::Pending
}
}
不优雅的关闭
fn main() {
let mut runtime = MiniTokio::new();
runtime.spawn(async {
let when = Instant::now() + Duration::from_secs(2);
let future = Delay::new(when);
let out = future.await;
assert_eq!(out, "aaa");
println!("{out}");
std::process::exit(0); // 需要手动退出
});
runtime.run();
}
project3
- 分离 executor(receiver) 和 spawner(sender), 当 receiver 运行结束后 receiver 就销毁
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
struct Task {
future: Mutex<Option<BoxFuture<'static, ()>>>,
task_sender: SyncSender<Arc<Task>>,
}
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}