pub struct Notify { /* 私有字段 */ }展开描述
通知单个任务唤醒。
Notify 提供了一种用于向单个任务通知事件的基本机制。Notify 本身不携带任何数据。相反,它用于向另一个任务发出执行操作的信号。
可以将 Notify 视为一个以 0 个许可开始的 Semaphore。notified().await 方法等待许可变为可用,而 notify_one() 在当前没有可用许可时设置一个许可。
Notify 的同步细节类似于 std 中的 thread::park 和 Thread::unpark。Notify 值包含单个许可。notified().await 等待许可可用,消费该许可,然后恢复运行。notify_one() 设置许可,如果有挂起的任务则唤醒它。
如果在 notified().await 之前调用 notify_one(),则下一次调用 notified().await 将立即完成,消费该许可。对 notified().await 的任何后续调用都将等待新的许可。
如果在 notified().await 之前多次调用 notify_one(),则仅存储单个许可。下一次调用 notified().await 将立即完成,但再下一次调用将等待新的许可。
§示例
基本用法。
use tokio::sync::Notify;
use std::sync::Arc;
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
let handle = tokio::spawn(async move {
notify2.notified().await;
println!("received notification");
});
println!("sending notification");
notify.notify_one();
// Wait for task to receive notification.
handle.await.unwrap();无界多生产者单消费者 (mpsc) 通道。
使用此通道时不会丢失任何唤醒,因为对 notify_one() 的调用会将许可存储在 Notify 中,随后的 notified() 调用将消费该许可。
use tokio::sync::Notify;
use std::collections::VecDeque;
use std::sync::Mutex;
struct Channel<T> {
values: Mutex<VecDeque<T>>,
notify: Notify,
}
impl<T> Channel<T> {
pub fn send(&self, value: T) {
self.values.lock().unwrap()
.push_back(value);
// Notify the consumer a value is available
self.notify.notify_one();
}
// This is a single-consumer channel, so several concurrent calls to
// `recv` are not allowed.
pub async fn recv(&self) -> T {
loop {
// Drain values
if let Some(value) = self.values.lock().unwrap().pop_front() {
return value;
}
// Wait for values to be available
self.notify.notified().await;
}
}
}无界多生产者多消费者 (mpmc) 通道。
调用 enable 很重要,因为否则如果你并行调用两个 recv 和两个 send,则可能会发生以下情况:
- Both calls to
try_recvreturnNone. - Both new elements are added to the vector.
- The
notify_onemethod is called twice, adding only a single permit to theNotify. - Both calls to
recvreach theNotifiedfuture. One of them consumes the permit, and the other sleeps forever.
如果在 try_recv 之前通过调用 enable 将 Notified future 添加到列表中,则第三步中的 notify_one 调用会从列表中移除这些 future 并将它们标记为已通知,而不是向 Notify 添加许可。这确保了两个 future 都会被唤醒。
请注意,只有在存在两个对 recv 的并发调用时才会发生此失败。这就是为什么上面的 mpsc 示例不需要调用 enable。
use tokio::sync::Notify;
use std::collections::VecDeque;
use std::sync::Mutex;
struct Channel<T> {
messages: Mutex<VecDeque<T>>,
notify_on_sent: Notify,
}
impl<T> Channel<T> {
pub fn send(&self, msg: T) {
let mut locked_queue = self.messages.lock().unwrap();
locked_queue.push_back(msg);
drop(locked_queue);
// Send a notification to one of the calls currently
// waiting in a call to `recv`.
self.notify_on_sent.notify_one();
}
pub fn try_recv(&self) -> Option<T> {
let mut locked_queue = self.messages.lock().unwrap();
locked_queue.pop_front()
}
pub async fn recv(&self) -> T {
let future = self.notify_on_sent.notified();
tokio::pin!(future);
loop {
// Make sure that no wakeup is lost if we get
// `None` from `try_recv`.
future.as_mut().enable();
if let Some(msg) = self.try_recv() {
return msg;
}
// Wait for a call to `notify_one`.
//
// This uses `.as_mut()` to avoid consuming the future,
// which lets us call `Pin::set` below.
future.as_mut().await;
// Reset the future in case another call to
// `try_recv` got the message before us.
future.set(self.notify_on_sent.notified());
}
}
}实现§
源代码§impl Notify
impl Notify
源代码pub const fn const_new() -> Notify
pub const fn const_new() -> Notify
创建一个新的 Notify,初始化时没有许可。
当使用 tracing 不稳定特性时,使用 const_new 创建的 Notify 将不会被检测。因此,它将不会出现在 tokio-console 中。如果需要,应改用 Notify::new 创建一个已检测的对象。
§示例
use tokio::sync::Notify;
static NOTIFY: Notify = Notify::const_new();源代码pub fn notified(&self) -> Notified<'_> ⓘ
pub fn notified(&self) -> Notified<'_> ⓘ
等待通知。
等效于:
async fn notified(&self);每个 Notify 值都持有一个许可。如果通过先前调用 notify_one() 已有可用许可,则 notified().await 将立即完成,消费该许可。否则,notified().await 将等待通过下一次调用 notify_one() 提供许可。
如果 Notified future 尚未被 poll,则不能保证它会收到来自对 notify_one() 调用的唤醒。有关更多详细信息,请参阅 Notified::enable() 的文档。
Notified future 保证在创建后立即就能收到来自 notify_waiters() 的唤醒,即使它尚未被 poll。
§取消安全性
此方法使用一个队列来按请求顺序公平地分发通知。取消对 notified 的调用会使你失去在队列中的位置。
§示例
use tokio::sync::Notify;
use std::sync::Arc;
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
tokio::spawn(async move {
notify2.notified().await;
println!("received notification");
});
println!("sending notification");
notify.notify_one();源代码pub fn notified_owned(self: Arc<Self>) -> OwnedNotified ⓘ
pub fn notified_owned(self: Arc<Self>) -> OwnedNotified ⓘ
使用拥有的 Future 等待通知。
与返回绑定到 Notify 生命周期的 future 的 Self::notified 不同,notified_owned 创建了一个拥有其通知状态的独立 future,使其可以在线程之间安全移动。
有关更多详细信息,请参阅 Self::notified。
§取消安全性
此方法使用一个队列来按请求顺序公平地分发通知。取消对 notified_owned 的调用会使你失去在队列中的位置。
§示例
use std::sync::Arc;
use tokio::sync::Notify;
let notify = Arc::new(Notify::new());
for _ in 0..10 {
let notified = notify.clone().notified_owned();
tokio::spawn(async move {
notified.await;
println!("received notification");
});
}
println!("sending notification");
notify.notify_waiters();源代码pub fn notify_one(&self)
pub fn notify_one(&self)
通知第一个等待任务。
如果有任务当前正在等待,则会通知该任务。否则,许可将存储在此 Notify 值中,并且下一次调用 notified().await 将立即完成,消费此次调用 notify_one() 提供的许可。
Notify 最多可以存储一个许可。对 notify_one 的多次顺序调用将导致存储单个许可。下一次调用 notified().await 将立即完成,但再下一次调用将等待。
§示例
use tokio::sync::Notify;
use std::sync::Arc;
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
tokio::spawn(async move {
notify2.notified().await;
println!("received notification");
});
println!("sending notification");
notify.notify_one();源代码pub fn notify_last(&self)
pub fn notify_last(&self)
源代码pub fn notify_waiters(&self)
pub fn notify_waiters(&self)
通知所有等待任务。
如果有任务当前正在等待,则会通知该任务。与 notify_one() 不同,此方法不会存储许可供下一次调用 notified().await 使用。此方法的目的是通知所有已注册的等待者。通过调用 notified() 获取 Notified future 实例即可完成通知注册。
§示例
use tokio::sync::Notify;
use std::sync::Arc;
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
let notified1 = notify.notified();
let notified2 = notify.notified();
let handle = tokio::spawn(async move {
println!("sending notifications");
notify2.notify_waiters();
});
notified1.await;
notified2.await;
println!("received notifications");