跳到主要内容

Receiver

搜索

结构体 Receiver 

源代码
pub struct Receiver<T> { /* 私有字段 */ }
展开描述

broadcast channel 的接收端。

不能并发使用。可以使用 recv 检索消息。

要将此接收器转换为 Stream,可以使用 BroadcastStream 包装器。

§示例

use tokio::sync::broadcast;

let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();

tokio::spawn(async move {
    assert_eq!(rx1.recv().await.unwrap(), 10);
    assert_eq!(rx1.recv().await.unwrap(), 20);
});

tokio::spawn(async move {
    assert_eq!(rx2.recv().await.unwrap(), 10);
    assert_eq!(rx2.recv().await.unwrap(), 20);
});

tx.send(10).unwrap();
tx.send(20).unwrap();

实现§

源代码§

impl<T> Receiver<T>

源代码

pub fn len(&self) -> usize

返回已发送到 channel 中但此 Receiver 尚未接收的消息数。

如果 len 返回的值大于 channel 容量的下一个最大 2 的幂,则对 recv 的任何调用都将返回 Err(RecvError::Lagged),并且对 try_recv 的任何调用都将返回 Err(TryRecvError::Lagged)。例如,如果 channel 的容量为 10,则一旦 len 返回大于 16 的值,recv 将开始返回 Err(RecvError::Lagged)

§示例
use tokio::sync::broadcast;

let (tx, mut rx1) = broadcast::channel(16);

tx.send(10).unwrap();
tx.send(20).unwrap();

assert_eq!(rx1.len(), 2);
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.len(), 1);
assert_eq!(rx1.recv().await.unwrap(), 20);
assert_eq!(rx1.len(), 0);
源代码

pub fn is_empty(&self) -> bool

如果 channel 中没有任何 Receiver 尚未接收的消息,则返回 true。

§示例
use tokio::sync::broadcast;

let (tx, mut rx1) = broadcast::channel(16);

assert!(rx1.is_empty());

tx.send(10).unwrap();
tx.send(20).unwrap();

assert!(!rx1.is_empty());
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
assert!(rx1.is_empty());
源代码

pub fn same_channel(&self, other: &Self) -> bool

如果这些 receiver 属于同一个通道,则返回 true

§示例
use tokio::sync::broadcast;

let (tx, rx) = broadcast::channel::<()>(16);
let rx2 = tx.subscribe();

assert!(rx.same_channel(&rx2));

let (_tx3, rx3) = broadcast::channel::<()>(16);

assert!(!rx3.same_channel(&rx2));
源代码

pub fn sender_strong_count(&self) -> usize

返回 Sender 句柄的数量。

源代码

pub fn sender_weak_count(&self) -> usize

返回 WeakSender 句柄的数量。

源代码

pub fn is_closed(&self) -> bool

检查通道是否已关闭。

如果 channel 已关闭,则此方法返回 true。当所有 Sender 都已被 drop 时,channel 关闭。

§示例
use tokio::sync::broadcast;

let (tx, rx) = broadcast::channel::<()>(10);
assert!(!rx.is_closed());

drop(tx);

assert!(rx.is_closed());
源代码§

impl<T: Clone> Receiver<T>

源代码

pub fn resubscribe(&self) -> Self

从当前尾部元素开始重新订阅 channel。

Receiver 句柄将接收在重新订阅之后发送的所有值的克隆。这不包括当前接收器队列中的元素。请考虑以下示例。

§示例
use tokio::sync::broadcast;

let (tx, mut rx) = broadcast::channel(2);

tx.send(1).unwrap();
let mut rx2 = rx.resubscribe();
tx.send(2).unwrap();

assert_eq!(rx2.recv().await.unwrap(), 2);
assert_eq!(rx.recv().await.unwrap(), 1);
源代码

pub async fn recv(&mut self) -> Result<T, RecvError>

接收此接收器的下一个值。

每个 Receiver 句柄将接收在订阅之后发送的所有值的克隆。

当所有 Sender 端都已被 drop 时,返回 Err(RecvError::Closed),表示无法再在 channel 上发送任何值。

如果 Receiver 句柄滞后,一旦 channel 已满,新发送的值将覆盖旧值。此时,对 recv 的调用将返回 Err(RecvError::Lagged),并且 Receiver 的内部游标将更新为指向 channel 中仍保存的最旧值。随后对 recv 的调用将返回此值,除非它此后已被覆盖。

§取消安全性

此方法可安全取消。如果 recvtokio::select! 语句中作为事件使用,并且其他分支先完成,则可以保证此通道没有接收到任何消息。

§示例
use tokio::sync::broadcast;

let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();

tokio::spawn(async move {
    assert_eq!(rx1.recv().await.unwrap(), 10);
    assert_eq!(rx1.recv().await.unwrap(), 20);
});

tokio::spawn(async move {
    assert_eq!(rx2.recv().await.unwrap(), 10);
    assert_eq!(rx2.recv().await.unwrap(), 20);
});

tx.send(10).unwrap();
tx.send(20).unwrap();

处理延迟

use tokio::sync::broadcast;

let (tx, mut rx) = broadcast::channel(2);

tx.send(10).unwrap();
tx.send(20).unwrap();
tx.send(30).unwrap();

// The receiver lagged behind
assert!(rx.recv().await.is_err());

// At this point, we can abort or continue with lost messages

assert_eq!(20, rx.recv().await.unwrap());
assert_eq!(30, rx.recv().await.unwrap());
源代码

pub fn try_recv(&mut self) -> Result<T, TryRecvError>

尝试在不等待的情况下返回此接收器上的待处理值。

这对于在决定等待接收器之前的"乐观检查"很有用。

recv 相比,此函数有三种失败情况而不是两种(关闭、空缓冲区和滞后接收器各一种)。

当所有 Sender 端都已被 drop 时,返回 Err(TryRecvError::Closed),表示无法再在 channel 上发送任何值。

如果 Receiver 句柄滞后,一旦 channel 已满,新发送的值将覆盖旧值。此时,对 recv 的调用将返回 Err(TryRecvError::Lagged),并且 Receiver 的内部游标将更新为指向 channel 中仍保存的最旧值。随后对 try_recv 的调用将返回此值,除非它此后已被覆盖。如果没有可接收的值,则返回 Err(TryRecvError::Empty)

§示例
use tokio::sync::broadcast;

let (tx, mut rx) = broadcast::channel(16);

assert!(rx.try_recv().is_err());

tx.send(10).unwrap();

let value = rx.try_recv().unwrap();
assert_eq!(10, value);
源代码

pub fn blocking_recv(&mut self) -> Result<T, RecvError>

阻塞接收,可在异步上下文之外调用。

§恐慌

如果在异步执行上下文中调用,此函数会发生 panic。

§示例
use std::thread;
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(16);

    let sync_code = thread::spawn(move || {
        assert_eq!(rx.blocking_recv(), Ok(10));
    });

    let _ = tx.send(10);
    sync_code.join().unwrap();
}

trait 实现§

源代码§

impl<T> Debug for Receiver<T>

源代码§

fn fmt(&self, fmt: &mut Formatter<'_>) -> Result

使用给定的格式化器格式化此值。 更多信息
源代码§

impl<T> Drop for Receiver<T>

源代码§

fn drop(&mut self)

执行此类型的析构函数。 更多信息

自动 trait 实现§

§

impl<T> Freeze for Receiver<T>

§

impl<T> RefUnwindSafe for Receiver<T>

§

impl<T> Send for Receiver<T>
where T: Send,

§

impl<T> Sync for Receiver<T>
where T: Send,

§

impl<T> Unpin for Receiver<T>

§

impl<T> UnsafeUnpin for Receiver<T>

§

impl<T> UnwindSafe for Receiver<T>

blanket 实现§

源代码§

impl<T> Any for T
where T: 'static + ?Sized,

源代码§

fn type_id(&self) -> TypeId

Gets the TypeId of self. 更多信息
源代码§

impl<T> Borrow<T> for T
where T: ?Sized,

源代码§

fn borrow(&self) -> &T

Immutably borrows from an owned value. 更多信息
源代码§

impl<T> BorrowMut<T> for T
where T: ?Sized,

源代码§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. 更多信息
源代码§

impl<T> From<T> for T

源代码§

fn from(t: T) -> T

原样返回参数。

源代码§

impl<T, U> Into<U> for T
where U: From<T>,

源代码§

fn into(self) -> U

调用 U::from(self)

也就是说,此转换是 From<T> for U 实现选择执行的操作。

源代码§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

源代码§

type Error = Infallible

转换出错时返回的类型。
源代码§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

执行转换。
源代码§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

源代码§

type Error = <U as TryFrom<T>>::Error

转换出错时返回的类型。
源代码§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

执行转换。