pub struct Receiver<T> { /* 私有字段 */ }展开描述
从关联的 Sender 接收一个值。
一对 Sender 和 Receiver 由 channel 函数创建。
此通道没有 recv 方法,因为接收者本身实现了 Future trait。要接收 Result<T, error::RecvError>,请直接 .await Receiver 对象。
即使消息已发送,Future trait 上的 poll 方法也可能伪返回 Poll::Pending。如果发生此类伪失败,则当伪失败已解决时,调用者将被唤醒,以便调用者可以再次尝试接收消息。请注意,收到此类唤醒并不能保证下一次调用会成功——它可能以另一个伪失败而失败。(伪失败并不意味着消息已丢失。它只是被延迟了。)
§Cancellation safety
Receiver 可安全取消。如果在 tokio::select! 语句中将其用作事件,并且其他某个分支先完成,则可以保证此通道上未接收到任何消息。
§示例
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
if let Err(_) = tx.send(3) {
println!("the receiver dropped");
}
});
match rx.await {
Ok(v) => println!("got = {:?}", v),
Err(_) => println!("the sender dropped"),
}如果 sender 在未发送的情况下被 drop,则 receiver 将失败并返回 error::RecvError:
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel::<u32>();
tokio::spawn(async move {
drop(tx);
});
match rx.await {
Ok(_) => panic!("This doesn't happen"),
Err(_) => println!("the sender dropped"),
}要在 tokio::select! 循环中使用 Receiver,请在通道前面添加 &mut。
use tokio::sync::oneshot;
use tokio::time::{interval, sleep, Duration};
let (send, mut recv) = oneshot::channel();
let mut interval = interval(Duration::from_millis(100));
tokio::spawn(async move {
sleep(Duration::from_secs(1)).await;
send.send("shut down").unwrap();
});
loop {
tokio::select! {
_ = interval.tick() => println!("Another 100ms"),
msg = &mut recv => {
println!("Got message: {}", msg.unwrap());
break;
}
}
}实现§
源代码§impl<T> Receiver<T>
impl<T> Receiver<T>
源代码pub fn close(&mut self)
pub fn close(&mut self)
阻止关联的 Sender 句柄发送值。
调用 close 之后发生的任何 send 操作都保证会失败。调用 close 后,如果有一个值在调用 close 之前已发送,则应调用 try_recv 来接收该值。
此函数可用于执行优雅关闭,并确保不会将值发送到通道中而永远不会被接收。
如果已接收到消息或通道已关闭,则 close 是空操作。
§示例
防止值被发送
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
let (tx, mut rx) = oneshot::channel();
assert!(!tx.is_closed());
rx.close();
assert!(tx.is_closed());
assert!(tx.send("never received").is_err());
match rx.try_recv() {
Err(TryRecvError::Closed) => {}
_ => unreachable!(),
}接收在调用 close 之前发送的值
use tokio::sync::oneshot;
let (tx, mut rx) = oneshot::channel();
assert!(tx.send("will receive").is_ok());
rx.close();
let msg = rx.try_recv().unwrap();
assert_eq!(msg, "will receive");源代码pub fn is_terminated(&self) -> bool
pub fn is_terminated(&self) -> bool
检查此接收者是否已终止。
如果此接收者已产生一个 Poll::Ready 结果,则此函数返回 true。如果是这样,则不应再对此接收者进行 poll。
§示例
发送一个值并对其轮询。
use tokio::sync::oneshot;
use std::task::Poll;
let (tx, mut rx) = oneshot::channel();
// A receiver is not terminated when it is initialized.
assert!(!rx.is_terminated());
// A receiver is not terminated it is polled and is still pending.
let poll = futures::poll!(&mut rx);
assert_eq!(poll, Poll::Pending);
assert!(!rx.is_terminated());
// A receiver is not terminated if a value has been sent, but not yet read.
tx.send(0).unwrap();
assert!(!rx.is_terminated());
// A receiver *is* terminated after it has been polled and yielded a value.
assert_eq!((&mut rx).await, Ok(0));
assert!(rx.is_terminated());关闭 sender。
use tokio::sync::oneshot;
let (tx, mut rx) = oneshot::channel::<()>();
// A receiver is not immediately terminated when the sender is dropped.
drop(tx);
assert!(!rx.is_terminated());
// A receiver *is* terminated after it has been polled and yielded an error.
let _ = (&mut rx).await.unwrap_err();
assert!(rx.is_terminated());源代码pub fn is_empty(&self) -> bool
pub fn is_empty(&self) -> bool
检查通道是否为空。
如果通道中没有消息,此方法返回 true。
对一个空的接收者(可能已产生一个值)进行 poll 不一定是安全的。请改用 is_terminated() 来检查接收者是否可以安全地 poll。
§示例
发送一个值。
use tokio::sync::oneshot;
let (tx, mut rx) = oneshot::channel();
assert!(rx.is_empty());
tx.send(0).unwrap();
assert!(!rx.is_empty());
let _ = (&mut rx).await;
assert!(rx.is_empty());关闭 sender。
use tokio::sync::oneshot;
let (tx, mut rx) = oneshot::channel::<()>();
// A channel is empty if the sender is dropped.
drop(tx);
assert!(rx.is_empty());
// A closed channel still yields an error, however.
(&mut rx).await.expect_err("should yield an error");
assert!(rx.is_empty());已终止的通道为空。
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, mut rx) = oneshot::channel();
tx.send(0).unwrap();
let _ = (&mut rx).await;
// NB: an empty channel is not necessarily safe to poll!
assert!(rx.is_empty());
let _ = (&mut rx).await;
}源代码pub fn try_recv(&mut self) -> Result<T, TryRecvError>
pub fn try_recv(&mut self) -> Result<T, TryRecvError>
尝试接收一个值。
如果通道中存在挂起的值,则返回该值。如果尚未发送任何值,则当前任务不会被注册以接收未来的通知。
此函数在异步任务上下文之外调用时非常有用。
请注意,与 poll 方法不同,try_recv 方法不会伪失败。在此次调用 try_recv 之前发生的任何发送或关闭事件都将正确返回给调用者。
§返回值
Ok(T)if a value is pending in the channel.Err(TryRecvError::Empty)if no value has been sent yet.Err(TryRecvError::Closed)if the sender has dropped without sending a value, or if the message has already been received.
§示例
在值发送之前调用 try_recv,然后在值发送之后调用。
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
let (tx, mut rx) = oneshot::channel();
match rx.try_recv() {
// The channel is currently empty
Err(TryRecvError::Empty) => {}
_ => unreachable!(),
}
// Send a value
tx.send("hello").unwrap();
match rx.try_recv() {
Ok(value) => assert_eq!(value, "hello"),
_ => unreachable!(),
}当 sender 在发送值之前被 drop 时调用 try_recv
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
let (tx, mut rx) = oneshot::channel::<()>();
drop(tx);
match rx.try_recv() {
// The channel will never receive a value.
Err(TryRecvError::Closed) => {}
_ => unreachable!(),
}源代码pub fn blocking_recv(self) -> Result<T, RecvError>
pub fn blocking_recv(self) -> Result<T, RecvError>
阻塞接收,可在异步上下文之外调用。
§恐慌
如果在异步执行上下文中调用,此函数会发生 panic。
§示例
use std::thread;
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel::<u8>();
let sync_code = thread::spawn(move || {
assert_eq!(Ok(10), rx.blocking_recv());
});
let _ = tx.send(10);
sync_code.join().unwrap();
}