跳到主要内容

LocalSet

搜索

结构体 LocalSet 

源代码
pub struct LocalSet { /* 私有字段 */ }
展开描述

在同一线程上执行的任务的集合。

在某些情况下,有必要运行一个或多个未实现 Send 的 future,因此在线程之间发送它们是不安全的。在这些情况下,可以使用本地任务集来调度一个或多个 !Send future 在同一线程上一起运行。

例如,以下代码将无法编译:

use std::rc::Rc;

#[tokio::main]
async fn main() {
    // `Rc` does not implement `Send`, and thus may not be sent between
    // threads safely.
    let nonsend_data = Rc::new("my nonsend data...");

    let nonsend_data = nonsend_data.clone();
    // Because the `async` block here moves `nonsend_data`, the future is `!Send`.
    // Since `tokio::spawn` requires the spawned future to implement `Send`, this
    // will not compile.
    tokio::spawn(async move {
        println!("{}", nonsend_data);
        // ...
    }).await.unwrap();
}

§Use with run_until

要生成 !Send future,我们可以使用本地任务集来在调用 Runtime::block_on 的线程上调度它们。在本地任务集中运行时,我们可以使用 task::spawn_local,它可以生成 !Send future。例如:

use std::rc::Rc;
use tokio::task;

let nonsend_data = Rc::new("my nonsend data...");

// Construct a local task set that can run `!Send` futures.
let local = task::LocalSet::new();

// 运行本地任务集。
local.run_until(async move {
    let nonsend_data = nonsend_data.clone();
    // `spawn_local` ensures that the future is spawned on the local
    // task set.
    task::spawn_local(async move {
        println!("{}", nonsend_data);
        // ...
    }).await.unwrap();
}).await;

注意:run_until 方法只能在 #[tokio::main]#[tokio::test] 中使用,或者直接在 Runtime::block_on 调用中使用。它不能在通过 tokio::spawn 生成的任务内部使用。

§Awaiting a LocalSet

此外,LocalSet 本身实现了 Future,当在 LocalSet 上生成的所有任务都完成时完成。这可用于在 LocalSet 上运行多个 future 并驱动整个集合直至它们完成。例如:

use tokio::{task, time};
use std::rc::Rc;

let nonsend_data = Rc::new("world");
let local = task::LocalSet::new();

let nonsend_data2 = nonsend_data.clone();
local.spawn_local(async move {
    // ...
    println!("hello {}", nonsend_data2)
});

local.spawn_local(async move {
    time::sleep(time::Duration::from_millis(100)).await;
    println!("goodbye {}", nonsend_data)
});

// ...

local.await;

注意:等待 LocalSet 只能在 #[tokio::main]#[tokio::test] 中进行,或者直接在 Runtime::block_on 调用中进行。它不能在通过 tokio::spawn 生成的任务内部使用。

§Use inside tokio::spawn

上面提到的两个方法不能在 tokio::spawn 内部使用,因此要在 tokio::spawn 内部生成 !Send future,我们需要采用其他方法。解决方案是在其他地方创建 LocalSet,并使用 mpsc 通道与之通信。

下面的示例将 LocalSet 放在一个新线程中。

use tokio::runtime::Builder;
use tokio::sync::{mpsc, oneshot};
use tokio::task::LocalSet;

// This struct describes the task you want to spawn. Here we include
// some simple examples. The oneshot channel allows sending a response
// to the spawner.
#[derive(Debug)]
enum Task {
    PrintNumber(u32),
    AddOne(u32, oneshot::Sender<u32>),
}

#[derive(Clone)]
struct LocalSpawner {
   send: mpsc::UnboundedSender<Task>,
}

impl LocalSpawner {
    pub fn new() -> Self {
        let (send, mut recv) = mpsc::unbounded_channel();

        let rt = Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        std::thread::spawn(move || {
            let local = LocalSet::new();

            local.spawn_local(async move {
                while let Some(new_task) = recv.recv().await {
                    tokio::task::spawn_local(run_task(new_task));
                }
                // If the while loop returns, then all the LocalSpawner
                // objects have been dropped.
            });

            // This will return once all senders are dropped and all
            // spawned tasks have returned.
            rt.block_on(local);
        });

        Self {
            send,
        }
    }

    pub fn spawn(&self, task: Task) {
        self.send.send(task).expect("Thread with LocalSet has shut down.");
    }
}

// This task may do !Send stuff. We use printing a number as an example,
// but it could be anything.
//
// The Task struct is an enum to support spawning many different kinds
// of operations.
async fn run_task(task: Task) {
    match task {
        Task::PrintNumber(n) => {
            println!("{}", n);
        },
        Task::AddOne(n, response) => {
            // We ignore failures to send the response.
            let _ = response.send(n + 1);
        },
    }
}

#[tokio::main]
async fn main() {
    let spawner = LocalSpawner::new();

    let (send, response) = oneshot::channel();
    spawner.spawn(Task::AddOne(10, send));
    let eleven = response.await.unwrap();
    assert_eq!(eleven, 11);
}

实现§

源代码§

impl LocalSet

源代码

pub fn new() -> LocalSet

返回一个新的本地任务集。

源代码

pub fn enter(&self) -> LocalEnterGuard

进入此 LocalSet 的上下文。

spawn_local 方法将在你所处的上下文中的 LocalSet 上生成任务。

源代码

pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
where F: Future + 'static, F::Output: 'static,

!Send 任务生成本地任务集上。

此任务保证在当前线程上运行。

与自由函数 spawn_local 不同,此方法可在 LocalSet 运行时用于生成本地任务。一旦下次启动 LocalSet,所提供的 future 将开始运行,即使你没有等待返回的 JoinHandle

§示例
use tokio::task;

let local = task::LocalSet::new();

// Spawn a future on the local set. This future will be run when
// we call `run_until` to drive the task set.
local.spawn_local(async {
    // ...
});

// 运行本地任务集。
local.run_until(async move {
    // ...
}).await;

// When `run` finishes, we can spawn _more_ futures, which will
// run in subsequent calls to `run_until`.
local.spawn_local(async {
    // ...
});

local.run_until(async move {
    // ...
}).await;
源代码

pub fn block_on<F>(&self, rt: &Runtime, future: F) -> F::Output
where F: Future,

在所提供的运行时上运行 future 直至完成,并在当前线程上驱动此任务集上生成的任何本地 future。

此函数在运行时上运行给定的 future,阻塞直至完成,并产生其已解析的结果。future 在内部生成的任何任务或计时器都将在运行时上执行。该 future 也可以调用 spawn_local 在当前线程上 spawn_local 额外的本地 future。

此方法不应在异步上下文中调用。

§恐慌

如果执行器已满、所提供的 future 发生 panic,或者在异步执行上下文中调用,则此函数会发生 panic。

§注意

由于此函数在内部调用 Runtime::block_on,并在 block_on 调用内部驱动本地任务集中的 future,因此本地 future 不能使用就地阻塞。如果需要从本地任务发出阻塞调用,则可以使用 spawn_blocking API。

例如,这将会发生 panic:

use tokio::runtime::Runtime;
use tokio::task;

let rt  = Runtime::new().unwrap();
let local = task::LocalSet::new();
local.block_on(&rt, async {
    let join = task::spawn_local(async {
        let blocking_result = task::block_in_place(|| {
            // ...
        });
        // ...
    });
    join.await.unwrap();
})

但是,这不会发生 panic:

use tokio::runtime::Runtime;
use tokio::task;

let rt  = Runtime::new().unwrap();
let local = task::LocalSet::new();
local.block_on(&rt, async {
    let join = task::spawn_local(async {
        let blocking_result = task::spawn_blocking(|| {
            // ...
        }).await;
        // ...
    });
    join.await.unwrap();
})
源代码

pub async fn run_until<F>(&self, future: F) -> F::Output
where F: Future,

在本地集上运行 future 直至完成,并返回其输出。

此方法返回一个 future,它使用本地集运行给定的 future,允许它调用 spawn_local 生成额外的 !Send future。在本地集上生成的任何本地 future 都将在后台驱动,直至传递给 run_until 的 future 完成。当传递给 run_until 的 future 完成时,任何尚未完成的本地 future 将保留在本地集上,并将在后续对 run_until 的调用中或等待本地集本身时被驱动。

§取消安全性

future 可安全取消时,此方法是可安全取消的。

§示例
use tokio::task;

task::LocalSet::new().run_until(async {
    task::spawn_local(async move {
        // ...
    }).await.unwrap();
    // ...
}).await;
源代码

pub fn id(&self) -> Id

返回当前 LocalSet 运行时的 Id

§示例
use tokio::task;

let local_set = task::LocalSet::new();
println!("Local set id: {}", local_set.id());

trait 实现§

源代码§

impl Debug for LocalSet

源代码§

fn fmt(&self, fmt: &mut Formatter<'_>) -> Result

使用给定的格式化器格式化此值。 更多信息
源代码§

impl 默认值 for LocalSet

源代码§

fn default() -> LocalSet

Returns the “default value” for a type. 更多信息
源代码§

impl Drop for LocalSet

源代码§

fn drop(&mut self)

执行此类型的析构函数。 更多信息
源代码§

impl Future for LocalSet

源代码§

type Output = ()

Future 完成时产生的值的类型。
源代码§

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>

Attempts to resolve the future to a final value, registering the current task for wakeup if the value is not yet available. 更多信息

自动 trait 实现§

blanket 实现§

源代码§

impl<T> Any for T
where T: 'static + ?Sized,

源代码§

fn type_id(&self) -> TypeId

Gets the TypeId of self. 更多信息
源代码§

impl<T> Borrow<T> for T
where T: ?Sized,

源代码§

fn borrow(&self) -> &T

Immutably borrows from an owned value. 更多信息
源代码§

impl<T> BorrowMut<T> for T
where T: ?Sized,

源代码§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. 更多信息
源代码§

impl<T> From<T> for T

源代码§

fn from(t: T) -> T

原样返回参数。

源代码§

impl<T, U> Into<U> for T
where U: From<T>,

源代码§

fn into(self) -> U

调用 U::from(self)

也就是说,此转换是 From<T> for U 实现选择执行的操作。

源代码§

impl<F> IntoFuture for F
where F: Future,

源代码§

type Output = <F as Future>::Output

Future 完成时产生的输出。
源代码§

type IntoFuture = F

我们将要把此值转变成哪种 future?
源代码§

fn into_future(self) -> <F as IntoFuture>::IntoFuture

Creates a future from a value. 更多信息
源代码§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

源代码§

type Error = Infallible

转换出错时返回的类型。
源代码§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

执行转换。
源代码§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

源代码§

type Error = <U as TryFrom<T>>::Error

转换出错时返回的类型。
源代码§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

执行转换。