pub struct LocalSet { /* 私有字段 */ }展开描述
在同一线程上执行的任务的集合。
在某些情况下,有必要运行一个或多个未实现 Send 的 future,因此在线程之间发送它们是不安全的。在这些情况下,可以使用本地任务集来调度一个或多个 !Send future 在同一线程上一起运行。
例如,以下代码将无法编译:
use std::rc::Rc;
#[tokio::main]
async fn main() {
// `Rc` does not implement `Send`, and thus may not be sent between
// threads safely.
let nonsend_data = Rc::new("my nonsend data...");
let nonsend_data = nonsend_data.clone();
// Because the `async` block here moves `nonsend_data`, the future is `!Send`.
// Since `tokio::spawn` requires the spawned future to implement `Send`, this
// will not compile.
tokio::spawn(async move {
println!("{}", nonsend_data);
// ...
}).await.unwrap();
}§Use with run_until
要生成 !Send future,我们可以使用本地任务集来在调用 Runtime::block_on 的线程上调度它们。在本地任务集中运行时,我们可以使用 task::spawn_local,它可以生成 !Send future。例如:
use std::rc::Rc;
use tokio::task;
let nonsend_data = Rc::new("my nonsend data...");
// Construct a local task set that can run `!Send` futures.
let local = task::LocalSet::new();
// 运行本地任务集。
local.run_until(async move {
let nonsend_data = nonsend_data.clone();
// `spawn_local` ensures that the future is spawned on the local
// task set.
task::spawn_local(async move {
println!("{}", nonsend_data);
// ...
}).await.unwrap();
}).await;注意:run_until 方法只能在 #[tokio::main]、#[tokio::test] 中使用,或者直接在 Runtime::block_on 调用中使用。它不能在通过 tokio::spawn 生成的任务内部使用。
§Awaiting a LocalSet
此外,LocalSet 本身实现了 Future,当在 LocalSet 上生成的所有任务都完成时完成。这可用于在 LocalSet 上运行多个 future 并驱动整个集合直至它们完成。例如:
use tokio::{task, time};
use std::rc::Rc;
let nonsend_data = Rc::new("world");
let local = task::LocalSet::new();
let nonsend_data2 = nonsend_data.clone();
local.spawn_local(async move {
// ...
println!("hello {}", nonsend_data2)
});
local.spawn_local(async move {
time::sleep(time::Duration::from_millis(100)).await;
println!("goodbye {}", nonsend_data)
});
// ...
local.await;注意:等待 LocalSet 只能在 #[tokio::main]、#[tokio::test] 中进行,或者直接在 Runtime::block_on 调用中进行。它不能在通过 tokio::spawn 生成的任务内部使用。
§Use inside tokio::spawn
上面提到的两个方法不能在 tokio::spawn 内部使用,因此要在 tokio::spawn 内部生成 !Send future,我们需要采用其他方法。解决方案是在其他地方创建 LocalSet,并使用 mpsc 通道与之通信。
下面的示例将 LocalSet 放在一个新线程中。
use tokio::runtime::Builder;
use tokio::sync::{mpsc, oneshot};
use tokio::task::LocalSet;
// This struct describes the task you want to spawn. Here we include
// some simple examples. The oneshot channel allows sending a response
// to the spawner.
#[derive(Debug)]
enum Task {
PrintNumber(u32),
AddOne(u32, oneshot::Sender<u32>),
}
#[derive(Clone)]
struct LocalSpawner {
send: mpsc::UnboundedSender<Task>,
}
impl LocalSpawner {
pub fn new() -> Self {
let (send, mut recv) = mpsc::unbounded_channel();
let rt = Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
std::thread::spawn(move || {
let local = LocalSet::new();
local.spawn_local(async move {
while let Some(new_task) = recv.recv().await {
tokio::task::spawn_local(run_task(new_task));
}
// If the while loop returns, then all the LocalSpawner
// objects have been dropped.
});
// This will return once all senders are dropped and all
// spawned tasks have returned.
rt.block_on(local);
});
Self {
send,
}
}
pub fn spawn(&self, task: Task) {
self.send.send(task).expect("Thread with LocalSet has shut down.");
}
}
// This task may do !Send stuff. We use printing a number as an example,
// but it could be anything.
//
// The Task struct is an enum to support spawning many different kinds
// of operations.
async fn run_task(task: Task) {
match task {
Task::PrintNumber(n) => {
println!("{}", n);
},
Task::AddOne(n, response) => {
// We ignore failures to send the response.
let _ = response.send(n + 1);
},
}
}
#[tokio::main]
async fn main() {
let spawner = LocalSpawner::new();
let (send, response) = oneshot::channel();
spawner.spawn(Task::AddOne(10, send));
let eleven = response.await.unwrap();
assert_eq!(eleven, 11);
}实现§
源代码§impl LocalSet
impl LocalSet
源代码pub fn enter(&self) -> LocalEnterGuard
pub fn enter(&self) -> LocalEnterGuard
进入此 LocalSet 的上下文。
spawn_local 方法将在你所处的上下文中的 LocalSet 上生成任务。
源代码pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> ⓘ
pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> ⓘ
将 !Send 任务生成本地任务集上。
此任务保证在当前线程上运行。
与自由函数 spawn_local 不同,此方法可在 LocalSet 未运行时用于生成本地任务。一旦下次启动 LocalSet,所提供的 future 将开始运行,即使你没有等待返回的 JoinHandle。
§示例
use tokio::task;
let local = task::LocalSet::new();
// Spawn a future on the local set. This future will be run when
// we call `run_until` to drive the task set.
local.spawn_local(async {
// ...
});
// 运行本地任务集。
local.run_until(async move {
// ...
}).await;
// When `run` finishes, we can spawn _more_ futures, which will
// run in subsequent calls to `run_until`.
local.spawn_local(async {
// ...
});
local.run_until(async move {
// ...
}).await;源代码pub fn block_on<F>(&self, rt: &Runtime, future: F) -> F::Outputwhere
F: Future,
pub fn block_on<F>(&self, rt: &Runtime, future: F) -> F::Outputwhere
F: Future,
在所提供的运行时上运行 future 直至完成,并在当前线程上驱动此任务集上生成的任何本地 future。
此函数在运行时上运行给定的 future,阻塞直至完成,并产生其已解析的结果。future 在内部生成的任何任务或计时器都将在运行时上执行。该 future 也可以调用 spawn_local 在当前线程上 spawn_local 额外的本地 future。
此方法不应在异步上下文中调用。
§恐慌
如果执行器已满、所提供的 future 发生 panic,或者在异步执行上下文中调用,则此函数会发生 panic。
§注意
由于此函数在内部调用 Runtime::block_on,并在 block_on 调用内部驱动本地任务集中的 future,因此本地 future 不能使用就地阻塞。如果需要从本地任务发出阻塞调用,则可以使用 spawn_blocking API。
例如,这将会发生 panic:
use tokio::runtime::Runtime;
use tokio::task;
let rt = Runtime::new().unwrap();
let local = task::LocalSet::new();
local.block_on(&rt, async {
let join = task::spawn_local(async {
let blocking_result = task::block_in_place(|| {
// ...
});
// ...
});
join.await.unwrap();
})但是,这不会发生 panic:
use tokio::runtime::Runtime;
use tokio::task;
let rt = Runtime::new().unwrap();
let local = task::LocalSet::new();
local.block_on(&rt, async {
let join = task::spawn_local(async {
let blocking_result = task::spawn_blocking(|| {
// ...
}).await;
// ...
});
join.await.unwrap();
})源代码pub async fn run_until<F>(&self, future: F) -> F::Outputwhere
F: Future,
pub async fn run_until<F>(&self, future: F) -> F::Outputwhere
F: Future,
在本地集上运行 future 直至完成,并返回其输出。
此方法返回一个 future,它使用本地集运行给定的 future,允许它调用 spawn_local 生成额外的 !Send future。在本地集上生成的任何本地 future 都将在后台驱动,直至传递给 run_until 的 future 完成。当传递给 run_until 的 future 完成时,任何尚未完成的本地 future 将保留在本地集上,并将在后续对 run_until 的调用中或等待本地集本身时被驱动。
§取消安全性
当 future 可安全取消时,此方法是可安全取消的。
§示例
use tokio::task;
task::LocalSet::new().run_until(async {
task::spawn_local(async move {
// ...
}).await.unwrap();
// ...
}).await;