跳到主要内容

Sender

搜索

结构体 Sender 

源代码
pub struct Sender<T> { /* 私有字段 */ }
展开描述

向关联的 Receiver 发送值。

实例由 channel 函数创建。

若要将 Sender 转换为 Sink 或在 poll 函数中使用它,可以使用 PollSender 工具。

实现§

源代码§

impl<T> Sender<T>

源代码

pub async fn send(&self, value: T) -> Result<(), SendError<T>>

发送一个值,一直等待直到有可用容量。

当确定通道的另一端尚未挂起时,发送成功。不成功的发送指的是相应的接收端已被关闭的情况。请注意,返回 Err 意味着数据将永远不会被接收,但返回 Ok 并不意味着数据一定会被接收。完全有可能在此函数返回 Ok 后,相应的接收端立即挂起。

§错误

如果通道的接收半部分已关闭(无论是由于调用了 close,还是由于 Receiver 句柄被 drop),则函数返回错误。错误中包含传递给 send 的值。

§取消安全性

如果在 tokio::select! 语句中将 send 用作事件,并且其他某个分支先完成,那么可以保证该消息未被发送。但是,在这种情况下,消息会被丢弃并丢失。

为避免消息丢失,请使用 reserve 预留容量,然后使用返回的 Permit 发送消息。

此通道使用一个队列来确保对 sendreserve 的调用按其请求顺序完成。取消对 send 的调用会使你失去在队列中的位置。

§示例

在以下示例中,每次调用 send 都会阻塞,直到先前发送的值被接收。

use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(1);

tokio::spawn(async move {
    for i in 0..10 {
        if let Err(_) = tx.send(i).await {
            println!("receiver dropped");
            return;
        }
    }
});

while let Some(i) = rx.recv().await {
    println!("got = {}", i);
}
源代码

pub async fn closed(&self)

当 receiver 被 drop 时完成。

这允许生产者在对所产生值的兴趣被取消时收到通知,并立即停止工作。

§取消安全性

此方法可安全取消。一旦通道关闭,它将永远保持关闭状态,并且所有对 closed 的未来调用都将立即返回。

§示例
use tokio::sync::mpsc;

let (tx1, rx) = mpsc::channel::<()>(1);
let tx2 = tx1.clone();
let tx3 = tx1.clone();
let tx4 = tx1.clone();
let tx5 = tx1.clone();
tokio::spawn(async move {
    drop(rx);
});

futures::join!(
    tx1.closed(),
    tx2.closed(),
    tx3.closed(),
    tx4.closed(),
    tx5.closed()
);
println!("Receiver dropped");
源代码

pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>>

尝试在此 Sender 上立即发送一条消息。

此方法与 send 的区别在于:如果通道的缓冲区已满或没有接收者等待获取数据,则会立即返回。与 send 相比,此函数有两种失败情况而不是一种(一种表示断开连接,一种表示缓冲区已满)。

§错误

如果已达到通道容量,即通道有 n 个已缓冲的值,其中 n 是传递给 channel 的参数,则会返回一个错误。

如果通道的接收半部分已关闭(无论是由于调用了 close,还是由于 Receiver 句柄被 drop),则函数返回错误。错误中包含传递给 send 的值。

§示例
use tokio::sync::mpsc;

// Create a channel with buffer size 1
let (tx1, mut rx) = mpsc::channel(1);
let tx2 = tx1.clone();

tokio::spawn(async move {
    tx1.send(1).await.unwrap();
    tx1.send(2).await.unwrap();
    // task waits until the receiver receives a value.
});

tokio::spawn(async move {
    // This will return an error and send
    // no message if the buffer is full
    let _ = tx2.try_send(3);
});

let mut msg;
msg = rx.recv().await.unwrap();
println!("message {} received", msg);

msg = rx.recv().await.unwrap();
println!("message {} received", msg);

// Third message may have never been sent
match rx.recv().await {
    Some(msg) => println!("message {} received", msg),
    None => println!("the third message was never sent"),
}
源代码

pub async fn send_timeout( &self, value: T, timeout: Duration, ) -> Result<(), SendTimeoutError<T>>

发送一个值,一直等待直到有可用容量,但仅限于有限的时间。

send 共享相同的成功和错误条件,并增加了一个不成功发送的条件:当所提供的超时已过且没有可用容量时。

§错误

如果通道的接收端已关闭——无论是由于调用了 close 还是 Receiver 被 drop——函数都会返回一个错误。该错误包含传递给 send 的值。

§恐慌

如果在没有启用时间功能的 Tokio 运行时上下文之外调用此函数,则会发生 panic。

§示例

在以下示例中,每次调用 send_timeout 都会阻塞,直到先前发送的值被接收,除非超时已过。

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

let (tx, mut rx) = mpsc::channel(1);

tokio::spawn(async move {
    for i in 0..10 {
        if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
            println!("send error: #{:?}", e);
            return;
        }
    }
});

while let Some(i) = rx.recv().await {
    println!("got = {}", i);
    sleep(Duration::from_millis(200)).await;
}
源代码

pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>>

用于在异步上下文之外调用的阻塞发送。

此方法适用于从同步代码向异步代码发送数据的用例,即使接收端未使用 blocking_recv 来接收消息也能正常工作。

§恐慌

如果在异步执行上下文中调用,此函数会发生 panic。

§示例
use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

fn main() {
    let (tx, mut rx) = mpsc::channel::<u8>(1);

    let sync_code = thread::spawn(move || {
        tx.blocking_send(10).unwrap();
    });

    Runtime::new().unwrap().block_on(async move {
        assert_eq!(Some(10), rx.recv().await);
    });
    sync_code.join().unwrap()
}
源代码

pub fn is_closed(&self) -> bool

检查通道是否已关闭。当 Receiver 被 drop 时,或调用 Receiver::close 方法时,会发生这种情况。

let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
assert!(!tx.is_closed());

let tx2 = tx.clone();
assert!(!tx2.is_closed());

drop(rx);
assert!(tx.is_closed());
assert!(tx2.is_closed());
源代码

pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>>

等待通道容量。一旦有一个消息的发送容量可用,就会为调用者预留。

如果通道已满,此函数会等待未接收消息的数量小于通道容量。发送一个消息的容量将为调用者预留。返回一个 Permit 来跟踪已预留的容量。Permit 上的 send 函数会消费预留的容量。

在不发送消息的情况下 drop Permit 会将容量释放回通道。

§取消安全性

此通道使用一个队列来确保对 sendreserve 的调用按其请求顺序完成。取消对 reserve 的调用会使你失去在队列中的位置。

§示例
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(1);

// 预留容量
let permit = tx.reserve().await.unwrap();

// Trying to send directly on the `tx` will fail due to no
// available capacity.
assert!(tx.try_send(123).is_err());

// 在 permit 上发送成功
permit.send(456);

// 接收在 permit 上发送的值
assert_eq!(rx.recv().await.unwrap(), 456);
源代码

pub async fn reserve_many( &self, n: usize, ) -> Result<PermitIterator<'_, T>, SendError<()>>

等待通道容量。一旦有发送 n 条消息的容量可用,就会为调用者预留。

如果通道已满或可用许可少于 n 个,函数会等待未接收消息的数量小于通道容量 n 个。发送 n 条消息的容量随后将为调用者预留。

返回一个 PermitIterator 来跟踪已预留的容量。你可以调用此 Iterator 直到它耗尽,以获取一个 Permit,然后调用 Permit::send。此函数类似于 try_reserve_many,不同之处在于它会等待槽位变为可用。

如果通道已关闭,此函数将返回 SendError

在不完整消费的情况下 drop PermitIterator 会将剩余的许可释放回通道。

§取消安全性

此通道使用一个队列来确保对 sendreserve_many 的调用按其请求顺序完成。取消对 reserve_many 的调用会使你失去在队列中的位置。

§示例
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(2);

// 预留容量
let mut permit = tx.reserve_many(2).await.unwrap();

// Trying to send directly on the `tx` will fail due to no
// available capacity.
assert!(tx.try_send(123).is_err());

// Sending with the permit iterator succeeds
permit.next().unwrap().send(456);
permit.next().unwrap().send(457);

// The iterator should now be exhausted
assert!(permit.next().is_none());

// 接收在 permit 上发送的值
assert_eq!(rx.recv().await.unwrap(), 456);
assert_eq!(rx.recv().await.unwrap(), 457);
源代码

pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>>

等待通道容量,移动 Sender 并返回一个拥有的 permit。一旦有一个消息的发送容量可用,就会为调用者预留。

此方法按值移动 sender,并返回一个可用于向通道发送消息的拥有的 permit。与 Sender::reserve 不同,此方法可用于 permit 必须对 'static 生命周期有效的场景。Sender 可以廉价地克隆(Sender::clone 本质上是一个引用计数增加,可与 Arc::clone 媲美),因此在需要多个 OwnedPermit 或无法移动 Sender 时,可以在调用 reserve_owned 之前对其进行克隆。

如果通道已满,此函数会等待未接收消息的数量小于通道容量。发送一个消息的容量将为调用者预留。返回一个 OwnedPermit 来跟踪已预留的容量。OwnedPermit 上的 send 函数会消费预留的容量。

在不发送消息的情况下 drop OwnedPermit 会将容量释放回通道。

§取消安全性

此通道使用一个队列来确保对 sendreserve 的调用按其请求顺序完成。取消对 reserve_owned 的调用会使你失去在队列中的位置。

§示例

使用 OwnedPermit 发送消息:

use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(1);

// Reserve capacity, moving the sender.
let permit = tx.reserve_owned().await.unwrap();

// Send a message, consuming the permit and returning
// the moved sender.
let tx = permit.send(123);

// The value sent on the permit is received.
assert_eq!(rx.recv().await.unwrap(), 123);

// The sender can now be used again.
tx.send(456).await.unwrap();

当需要多个 OwnedPermit,或者 sender 无法按值移动时,可以在调用 reserve_owned 之前廉价地克隆它:

use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(1);

// Clone the sender and reserve capacity.
let permit = tx.clone().reserve_owned().await.unwrap();

// Trying to send directly on the `tx` will fail due to no
// available capacity.
assert!(tx.try_send(123).is_err());

// Sending on the permit succeeds.
permit.send(456);

// 接收在 permit 上发送的值
assert_eq!(rx.recv().await.unwrap(), 456);
源代码

pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>>

尝试在通道中获取一个槽位,而不会等待槽位变为可用。

如果通道已满,此函数将返回 TrySendError;否则,如果有可用槽位,它将返回一个 Permit,从而允许你在保证有槽位的情况下在通道上 send。此函数类似于 reserve,不同之处在于它不会等待槽位变为可用。

在不发送消息的情况下 drop Permit 会将容量释放回通道。

§示例
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(1);

// 预留容量
let permit = tx.try_reserve().unwrap();

// Trying to send directly on the `tx` will fail due to no
// available capacity.
assert!(tx.try_send(123).is_err());

// Trying to reserve an additional slot on the `tx` will
// fail because there is no capacity.
assert!(tx.try_reserve().is_err());

// 在 permit 上发送成功
permit.send(456);

// 接收在 permit 上发送的值
assert_eq!(rx.recv().await.unwrap(), 456);
源代码

pub fn try_reserve_many( &self, n: usize, ) -> Result<PermitIterator<'_, T>, TrySendError<()>>

尝试在通道中获取 n 个槽位,而不会等待槽位变为可用。

返回一个 PermitIterator 来跟踪已预留的容量。你可以调用此 Iterator 直到它耗尽,以获取一个 Permit,然后调用 Permit::send。此函数类似于 reserve_many,不同之处在于它不会等待槽位变为可用。

如果通道上可用许可少于 n 个,则此函数将返回 TrySendError::Full。如果通道已关闭,此函数将返回 TrySendError::Closed

在不完整消费的情况下 drop PermitIterator 会将剩余的许可释放回通道。

§示例
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(2);

// 预留容量
let mut permit = tx.try_reserve_many(2).unwrap();

// Trying to send directly on the `tx` will fail due to no
// available capacity.
assert!(tx.try_send(123).is_err());

// Trying to reserve an additional slot on the `tx` will
// fail because there is no capacity.
assert!(tx.try_reserve().is_err());

// Sending with the permit iterator succeeds
permit.next().unwrap().send(456);
permit.next().unwrap().send(457);

// The iterator should now be exhausted
assert!(permit.next().is_none());

// 接收在 permit 上发送的值
assert_eq!(rx.recv().await.unwrap(), 456);
assert_eq!(rx.recv().await.unwrap(), 457);

// Trying to call try_reserve_many with 0 will return an empty iterator
let mut permit = tx.try_reserve_many(0).unwrap();
assert!(permit.next().is_none());

// Trying to call try_reserve_many with a number greater than the channel
// capacity will return an error
let permit = tx.try_reserve_many(3);
assert!(permit.is_err());

// Trying to call try_reserve_many on a closed channel will return an error
drop(rx);
let permit = tx.try_reserve_many(1);
assert!(permit.is_err());

let permit = tx.try_reserve_many(0);
assert!(permit.is_err());
源代码

pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>>

尝试在通道中获取一个槽位,而不会等待槽位变为可用,并返回一个拥有的 permit。

此方法按值移动 sender,并返回一个可用于向通道发送消息的拥有的 permit。与 Sender::try_reserve 不同,此方法可用于 permit 必须对 'static 生命周期有效的场景。Sender 可以廉价地克隆(Sender::clone 本质上是一个引用计数增加,可与 Arc::clone 媲美),因此在需要多个 OwnedPermit 或无法移动 Sender 时,可以在调用 try_reserve_owned 之前对其进行克隆。

如果通道已满,此函数将返回 TrySendError。由于 sender 是按值获取的,因此在这种情况下返回的 TrySendError 包含 sender,以便可以再次使用。否则,如果有可用槽位,此方法将返回一个 OwnedPermit,然后可用于在保证有槽位的情况下在通道上 send。此函数类似于 reserve_owned,不同之处在于它不会等待槽位变为可用。

在不发送消息的情况下 drop OwnedPermit 会将容量释放回通道。

§示例
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(1);

// 预留容量
let permit = tx.clone().try_reserve_owned().unwrap();

// Trying to send directly on the `tx` will fail due to no
// available capacity.
assert!(tx.try_send(123).is_err());

// Trying to reserve an additional slot on the `tx` will
// fail because there is no capacity.
assert!(tx.try_reserve().is_err());

// 在 permit 上发送成功
permit.send(456);

// 接收在 permit 上发送的值
assert_eq!(rx.recv().await.unwrap(), 456);
源代码

pub fn same_channel(&self, other: &Self) -> bool

如果这些 sender 属于同一个通道,则返回 true

§示例
let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
let  tx2 = tx.clone();
assert!(tx.same_channel(&tx2));

let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
assert!(!tx3.same_channel(&tx2));
源代码

pub fn capacity(&self) -> usize

返回通道的当前容量。

容量在通过调用 send 发送值或通过 reserve 预留容量时减少。容量在 Receiver 接收值时增加。这与 max_capacity 不同,后者始终返回最初调用 channel 时指定的缓冲区容量。

§示例
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel::<()>(5);

assert_eq!(tx.capacity(), 5);

// Making a reservation drops the capacity by one.
let permit = tx.reserve().await.unwrap();
assert_eq!(tx.capacity(), 4);

// Sending and receiving a value increases the capacity by one.
permit.send(());
rx.recv().await.unwrap();
assert_eq!(tx.capacity(), 5);
源代码

pub fn downgrade(&self) -> WeakSender<T>

Sender 转换为 WeakSender,该 WeakSender 不计入 RAII 语义,即如果通道的所有 Sender 实例都已 drop,仅剩 WeakSender 实例,则通道将关闭。

源代码

pub fn max_capacity(&self) -> usize

返回通道的最大缓冲区容量。

最大容量是最初调用 channel 时指定的缓冲区容量。这与 capacity 不同,后者返回当前可用的缓冲区容量:随着消息的发送和接收,capacity 返回的值会上下波动,而 max_capacity 返回的值则保持不变。

§示例
use tokio::sync::mpsc;

let (tx, _rx) = mpsc::channel::<()>(5);

// both max capacity and capacity are the same at first
assert_eq!(tx.max_capacity(), 5);
assert_eq!(tx.capacity(), 5);

// Making a reservation doesn't change the max capacity.
let permit = tx.reserve().await.unwrap();
assert_eq!(tx.max_capacity(), 5);
// but drops the capacity by one
assert_eq!(tx.capacity(), 4);
源代码

pub fn strong_count(&self) -> usize

返回 Sender 句柄的数量。

源代码

pub fn weak_count(&self) -> usize

返回 WeakSender 句柄的数量。

trait 实现§

源代码§

impl<T> Clone for Sender<T>

源代码§

fn clone(&self) -> Self

返回值的副本。 更多信息
1.0.0 · 源代码§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. 更多信息
源代码§

impl<T> Debug for Sender<T>

源代码§

fn fmt(&self, fmt: &mut Formatter<'_>) -> Result

使用给定的格式化器格式化此值。 更多信息

自动 trait 实现§

§

impl<T> Freeze for Sender<T>

§

impl<T> RefUnwindSafe for Sender<T>

§

impl<T> Send for Sender<T>
where T: Send,

§

impl<T> Sync for Sender<T>
where T: Send,

§

impl<T> Unpin for Sender<T>

§

impl<T> UnsafeUnpin for Sender<T>

§

impl<T> UnwindSafe for Sender<T>

blanket 实现§

源代码§

impl<T> Any for T
where T: 'static + ?Sized,

源代码§

fn type_id(&self) -> TypeId

Gets the TypeId of self. 更多信息
源代码§

impl<T> Borrow<T> for T
where T: ?Sized,

源代码§

fn borrow(&self) -> &T

Immutably borrows from an owned value. 更多信息
源代码§

impl<T> BorrowMut<T> for T
where T: ?Sized,

源代码§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. 更多信息
源代码§

impl<T> CloneToUninit for T
where T: Clone,

源代码§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. 更多信息
源代码§

impl<T> From<T> for T

源代码§

fn from(t: T) -> T

原样返回参数。

源代码§

impl<T, U> Into<U> for T
where U: From<T>,

源代码§

fn into(self) -> U

调用 U::from(self)

也就是说,此转换是 From<T> for U 实现选择执行的操作。

源代码§

impl<T> ToOwned for T
where T: Clone,

源代码§

type Owned = T

获得所有权后的类型。
源代码§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. 更多信息
源代码§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. 更多信息
源代码§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

源代码§

type Error = Infallible

转换出错时返回的类型。
源代码§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

执行转换。
源代码§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

源代码§

type Error = <U as TryFrom<T>>::Error

转换出错时返回的类型。
源代码§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

执行转换。