跳到主要内容

Module broadcast

搜索

Module broadcast 

源代码
展开描述

多生产者、多消费者的广播队列。每个发送的值会被所有消费者看到。

Sender 用于将值广播给所有连接的 ReceiverSender 句柄是可克隆的,允许并发发送和接收操作。只要 TSendSenderReceiver 都实现 SendSync

当发送一个值时,所有 Receiver 句柄都将收到通知并接收该值。该值在 channel 内存储一次,并根据需要为每个接收器克隆。一旦所有接收器都收到了该值的克隆,该值就会从 channel 中释放。

通过调用 channel 创建 channel,指定 channel 在任何给定时间可以保留的最大消息数。

通过调用 Sender::subscribe 创建新的 Receiver 句柄。返回的 Receiver 将接收在调用 subscribe 之后发送的值。

此 channel 也适用于单生产者多消费者用例,其中单个 sender 向许多 receiver 广播值。

§Lagging

由于发送的消息必须保留到所有 Receiver 句柄都接收到一个克隆,因此 broadcast channel 容易出现"慢接收者"问题。在这种情况下,除一个接收者外,所有接收者都能够以发送速率接收值。因为一个接收者停滞不前,channel 开始填满。

此 broadcast channel 实现通过设置 channel 在任何给定时间可以保留的值的硬上限来处理这种情况。此上限作为参数传递给 channel 函数。

如果 channel 达到容量时发送一个值,则会释放 channel 当前持有的最旧值。这为新值腾出了空间。任何尚未看到已释放值的接收器在下一次调用 recv 时将返回 RecvError::Lagged

一旦返回 RecvError::Lagged,滞后接收器的位置将更新为 channel 中包含的最旧值。下一次调用 recv 将返回此值。

此行为使接收器能够检测到何时已经滞后到数据已被丢弃的程度。调用者可以决定如何响应:要么中止其任务,要么容忍丢失的消息并恢复 channel 的消费。

§Closing

所有 Sender 句柄都已被 drop 时,不能再发送新值。此时,channel 已"关闭"。一旦接收器已接收 channel 保留的所有值,则下次调用 recv 将返回 RecvError::Closed

Receiver 句柄被 drop 时,接收器尚未读取的任何消息都将标记为已读。如果该接收器是唯一尚未读取该消息的接收器,则此时该消息将被丢弃。

§示例

基本用法

use tokio::sync::broadcast;

let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();

tokio::spawn(async move {
    assert_eq!(rx1.recv().await.unwrap(), 10);
    assert_eq!(rx1.recv().await.unwrap(), 20);
});

tokio::spawn(async move {
    assert_eq!(rx2.recv().await.unwrap(), 10);
    assert_eq!(rx2.recv().await.unwrap(), 20);
});

tx.send(10).unwrap();
tx.send(20).unwrap();

处理延迟

use tokio::sync::broadcast;

let (tx, mut rx) = broadcast::channel(2);

tx.send(10).unwrap();
tx.send(20).unwrap();
tx.send(30).unwrap();

// The receiver lagged behind
assert!(rx.recv().await.is_err());

// At this point, we can abort or continue with lost messages

assert_eq!(20, rx.recv().await.unwrap());
assert_eq!(30, rx.recv().await.unwrap());

模块§

error
Broadcast error types

结构体§

Receiver
Receiving-half of the broadcast channel.
Sender
Sending-half of the broadcast channel.
WeakSender
A sender that does not prevent the channel from being closed.

函数§

channel
Create a bounded, multi-producer, multi-consumer channel where each sent value is broadcasted to all active receivers.