跳到主要内容

Notify

搜索

结构体 Notify 

源代码
pub struct Notify { /* 私有字段 */ }
展开描述

通知单个任务唤醒。

Notify 提供了一种用于向单个任务通知事件的基本机制。Notify 本身不携带任何数据。相反,它用于向另一个任务发出执行操作的信号。

可以将 Notify 视为一个以 0 个许可开始的 Semaphorenotified().await 方法等待许可变为可用,而 notify_one() 在当前没有可用许可时设置一个许可。

Notify 的同步细节类似于 std 中的 thread::parkThread::unparkNotify 值包含单个许可。notified().await 等待许可可用,消费该许可,然后恢复运行。notify_one() 设置许可,如果有挂起的任务则唤醒它。

如果在 notified().await 之前调用 notify_one(),则下一次调用 notified().await 将立即完成,消费该许可。对 notified().await 的任何后续调用都将等待新的许可。

如果在 notified().await 之前多次调用 notify_one(),则仅存储单个许可。下一次调用 notified().await 将立即完成,但再下一次调用将等待新的许可。

§示例

基本用法。

use tokio::sync::Notify;
use std::sync::Arc;

let notify = Arc::new(Notify::new());
let notify2 = notify.clone();

let handle = tokio::spawn(async move {
    notify2.notified().await;
    println!("received notification");
});

println!("sending notification");
notify.notify_one();

// Wait for task to receive notification.
handle.await.unwrap();

无界多生产者单消费者 (mpsc) 通道。

使用此通道时不会丢失任何唤醒,因为对 notify_one() 的调用会将许可存储在 Notify 中,随后的 notified() 调用将消费该许可。

use tokio::sync::Notify;

use std::collections::VecDeque;
use std::sync::Mutex;

struct Channel<T> {
    values: Mutex<VecDeque<T>>,
    notify: Notify,
}

impl<T> Channel<T> {
    pub fn send(&self, value: T) {
        self.values.lock().unwrap()
            .push_back(value);

        // Notify the consumer a value is available
        self.notify.notify_one();
    }

    // This is a single-consumer channel, so several concurrent calls to
    // `recv` are not allowed.
    pub async fn recv(&self) -> T {
        loop {
            // Drain values
            if let Some(value) = self.values.lock().unwrap().pop_front() {
                return value;
            }

            // Wait for values to be available
            self.notify.notified().await;
        }
    }
}

无界多生产者多消费者 (mpmc) 通道。

调用 enable 很重要,因为否则如果你并行调用两个 recv 和两个 send,则可能会发生以下情况:

  1. Both calls to try_recv return None.
  2. Both new elements are added to the vector.
  3. The notify_one method is called twice, adding only a single permit to the Notify.
  4. Both calls to recv reach the Notified future. One of them consumes the permit, and the other sleeps forever.

如果在 try_recv 之前通过调用 enableNotified future 添加到列表中,则第三步中的 notify_one 调用会从列表中移除这些 future 并将它们标记为已通知,而不是向 Notify 添加许可。这确保了两个 future 都会被唤醒。

请注意,只有在存在两个对 recv 的并发调用时才会发生此失败。这就是为什么上面的 mpsc 示例不需要调用 enable

use tokio::sync::Notify;

use std::collections::VecDeque;
use std::sync::Mutex;

struct Channel<T> {
    messages: Mutex<VecDeque<T>>,
    notify_on_sent: Notify,
}

impl<T> Channel<T> {
    pub fn send(&self, msg: T) {
        let mut locked_queue = self.messages.lock().unwrap();
        locked_queue.push_back(msg);
        drop(locked_queue);

        // Send a notification to one of the calls currently
        // waiting in a call to `recv`.
        self.notify_on_sent.notify_one();
    }

    pub fn try_recv(&self) -> Option<T> {
        let mut locked_queue = self.messages.lock().unwrap();
        locked_queue.pop_front()
    }

    pub async fn recv(&self) -> T {
        let future = self.notify_on_sent.notified();
        tokio::pin!(future);

        loop {
            // Make sure that no wakeup is lost if we get
            // `None` from `try_recv`.
            future.as_mut().enable();

            if let Some(msg) = self.try_recv() {
                return msg;
            }

            // Wait for a call to `notify_one`.
            //
            // This uses `.as_mut()` to avoid consuming the future,
            // which lets us call `Pin::set` below.
            future.as_mut().await;

            // Reset the future in case another call to
            // `try_recv` got the message before us.
            future.set(self.notify_on_sent.notified());
        }
    }
}

实现§

源代码§

impl Notify

源代码

pub fn new() -> Notify

创建一个新的 Notify,初始化时没有许可。

§示例
use tokio::sync::Notify;

let notify = Notify::new();
源代码

pub const fn const_new() -> Notify

创建一个新的 Notify,初始化时没有许可。

当使用 tracing 不稳定特性时,使用 const_new 创建的 Notify 将不会被检测。因此,它将不会出现在 tokio-console 中。如果需要,应改用 Notify::new 创建一个已检测的对象。

§示例
use tokio::sync::Notify;

static NOTIFY: Notify = Notify::const_new();
源代码

pub fn notified(&self) -> Notified<'_>

等待通知。

等效于:

async fn notified(&self);

每个 Notify 值都持有一个许可。如果通过先前调用 notify_one() 已有可用许可,则 notified().await 将立即完成,消费该许可。否则,notified().await 将等待通过下一次调用 notify_one() 提供许可。

如果 Notified future 尚未被 poll,则不能保证它会收到来自对 notify_one() 调用的唤醒。有关更多详细信息,请参阅 Notified::enable() 的文档。

Notified future 保证在创建后立即就能收到来自 notify_waiters() 的唤醒,即使它尚未被 poll。

§取消安全性

此方法使用一个队列来按请求顺序公平地分发通知。取消对 notified 的调用会使你失去在队列中的位置。

§示例
use tokio::sync::Notify;
use std::sync::Arc;

let notify = Arc::new(Notify::new());
let notify2 = notify.clone();

tokio::spawn(async move {
    notify2.notified().await;
    println!("received notification");
});

println!("sending notification");
notify.notify_one();
源代码

pub fn notified_owned(self: Arc<Self>) -> OwnedNotified

使用拥有的 Future 等待通知。

与返回绑定到 Notify 生命周期的 future 的 Self::notified 不同,notified_owned 创建了一个拥有其通知状态的独立 future,使其可以在线程之间安全移动。

有关更多详细信息,请参阅 Self::notified

§取消安全性

此方法使用一个队列来按请求顺序公平地分发通知。取消对 notified_owned 的调用会使你失去在队列中的位置。

§示例
use std::sync::Arc;
use tokio::sync::Notify;

let notify = Arc::new(Notify::new());

for _ in 0..10 {
    let notified = notify.clone().notified_owned();
    tokio::spawn(async move {
        notified.await;
        println!("received notification");
    });
}

println!("sending notification");
notify.notify_waiters();
源代码

pub fn notify_one(&self)

通知第一个等待任务。

如果有任务当前正在等待,则会通知该任务。否则,许可将存储在此 Notify 值中,并且下一次调用 notified().await 将立即完成,消费此次调用 notify_one() 提供的许可。

Notify 最多可以存储一个许可。对 notify_one 的多次顺序调用将导致存储单个许可。下一次调用 notified().await 将立即完成,但再下一次调用将等待。

§示例
use tokio::sync::Notify;
use std::sync::Arc;

let notify = Arc::new(Notify::new());
let notify2 = notify.clone();

tokio::spawn(async move {
    notify2.notified().await;
    println!("received notification");
});

println!("sending notification");
notify.notify_one();
源代码

pub fn notify_last(&self)

通知最后一个等待任务。

此函数的行为类似于 notify_one。唯一的区别是它唤醒最近添加的等待者,而不是最早的等待者。

有关更多信息和示例,请参阅 notify_one() 文档。

源代码

pub fn notify_waiters(&self)

通知所有等待任务。

如果有任务当前正在等待,则会通知该任务。与 notify_one() 不同,此方法不会存储许可供下一次调用 notified().await 使用。此方法的目的是通知所有已注册的等待者。通过调用 notified() 获取 Notified future 实例即可完成通知注册。

§示例
use tokio::sync::Notify;
use std::sync::Arc;

let notify = Arc::new(Notify::new());
let notify2 = notify.clone();

let notified1 = notify.notified();
let notified2 = notify.notified();

let handle = tokio::spawn(async move {
    println!("sending notifications");
    notify2.notify_waiters();
});

notified1.await;
notified2.await;
println!("received notifications");

trait 实现§

源代码§

impl Debug for Notify

源代码§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

使用给定的格式化器格式化此值。 更多信息
源代码§

impl 默认值 for Notify

源代码§

fn default() -> Notify

Returns the “default value” for a type. 更多信息
源代码§

impl RefUnwindSafe for Notify

源代码§

impl UnwindSafe for Notify

自动 trait 实现§

§

impl !Freeze for Notify

§

impl Send for Notify

§

impl Sync for Notify

§

impl Unpin for Notify

§

impl UnsafeUnpin for Notify

blanket 实现§

源代码§

impl<T> Any for T
where T: 'static + ?Sized,

源代码§

fn type_id(&self) -> TypeId

Gets the TypeId of self. 更多信息
源代码§

impl<T> Borrow<T> for T
where T: ?Sized,

源代码§

fn borrow(&self) -> &T

Immutably borrows from an owned value. 更多信息
源代码§

impl<T> BorrowMut<T> for T
where T: ?Sized,

源代码§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. 更多信息
源代码§

impl<T> From<T> for T

源代码§

fn from(t: T) -> T

原样返回参数。

源代码§

impl<T, U> Into<U> for T
where U: From<T>,

源代码§

fn into(self) -> U

调用 U::from(self)

也就是说,此转换是 From<T> for U 实现选择执行的操作。

源代码§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

源代码§

type Error = Infallible

转换出错时返回的类型。
源代码§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

执行转换。
源代码§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

源代码§

type Error = <U as TryFrom<T>>::Error

转换出错时返回的类型。
源代码§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

执行转换。