pub struct Sender<T> { /* 私有字段 */ }展开描述
实现§
源代码§impl<T> Sender<T>
impl<T> Sender<T>
源代码pub async fn send(&self, value: T) -> Result<(), SendError<T>>
pub async fn send(&self, value: T) -> Result<(), SendError<T>>
发送一个值,一直等待直到有可用容量。
当确定通道的另一端尚未挂起时,发送成功。不成功的发送指的是相应的接收端已被关闭的情况。请注意,返回 Err 意味着数据将永远不会被接收,但返回 Ok 并不意味着数据一定会被接收。完全有可能在此函数返回 Ok 后,相应的接收端立即挂起。
§错误
如果通道的接收半部分已关闭(无论是由于调用了 close,还是由于 Receiver 句柄被 drop),则函数返回错误。错误中包含传递给 send 的值。
§取消安全性
如果在 tokio::select! 语句中将 send 用作事件,并且其他某个分支先完成,那么可以保证该消息未被发送。但是,在这种情况下,消息会被丢弃并丢失。
为避免消息丢失,请使用 reserve 预留容量,然后使用返回的 Permit 发送消息。
此通道使用一个队列来确保对 send 和 reserve 的调用按其请求顺序完成。取消对 send 的调用会使你失去在队列中的位置。
§示例
在以下示例中,每次调用 send 都会阻塞,直到先前发送的值被接收。
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel(1);
tokio::spawn(async move {
for i in 0..10 {
if let Err(_) = tx.send(i).await {
println!("receiver dropped");
return;
}
}
});
while let Some(i) = rx.recv().await {
println!("got = {}", i);
}源代码pub async fn closed(&self)
pub async fn closed(&self)
当 receiver 被 drop 时完成。
这允许生产者在对所产生值的兴趣被取消时收到通知,并立即停止工作。
§取消安全性
此方法可安全取消。一旦通道关闭,它将永远保持关闭状态,并且所有对 closed 的未来调用都将立即返回。
§示例
use tokio::sync::mpsc;
let (tx1, rx) = mpsc::channel::<()>(1);
let tx2 = tx1.clone();
let tx3 = tx1.clone();
let tx4 = tx1.clone();
let tx5 = tx1.clone();
tokio::spawn(async move {
drop(rx);
});
futures::join!(
tx1.closed(),
tx2.closed(),
tx3.closed(),
tx4.closed(),
tx5.closed()
);
println!("Receiver dropped");源代码pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>>
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>>
尝试在此 Sender 上立即发送一条消息。
此方法与 send 的区别在于:如果通道的缓冲区已满或没有接收者等待获取数据,则会立即返回。与 send 相比,此函数有两种失败情况而不是一种(一种表示断开连接,一种表示缓冲区已满)。
§错误
如果已达到通道容量,即通道有 n 个已缓冲的值,其中 n 是传递给 channel 的参数,则会返回一个错误。
如果通道的接收半部分已关闭(无论是由于调用了 close,还是由于 Receiver 句柄被 drop),则函数返回错误。错误中包含传递给 send 的值。
§示例
use tokio::sync::mpsc;
// Create a channel with buffer size 1
let (tx1, mut rx) = mpsc::channel(1);
let tx2 = tx1.clone();
tokio::spawn(async move {
tx1.send(1).await.unwrap();
tx1.send(2).await.unwrap();
// task waits until the receiver receives a value.
});
tokio::spawn(async move {
// This will return an error and send
// no message if the buffer is full
let _ = tx2.try_send(3);
});
let mut msg;
msg = rx.recv().await.unwrap();
println!("message {} received", msg);
msg = rx.recv().await.unwrap();
println!("message {} received", msg);
// Third message may have never been sent
match rx.recv().await {
Some(msg) => println!("message {} received", msg),
None => println!("the third message was never sent"),
}源代码pub async fn send_timeout(
&self,
value: T,
timeout: Duration,
) -> Result<(), SendTimeoutError<T>>
pub async fn send_timeout( &self, value: T, timeout: Duration, ) -> Result<(), SendTimeoutError<T>>
发送一个值,一直等待直到有可用容量,但仅限于有限的时间。
与 send 共享相同的成功和错误条件,并增加了一个不成功发送的条件:当所提供的超时已过且没有可用容量时。
§错误
如果通道的接收端已关闭——无论是由于调用了 close 还是 Receiver 被 drop——函数都会返回一个错误。该错误包含传递给 send 的值。
§恐慌
如果在没有启用时间功能的 Tokio 运行时上下文之外调用此函数,则会发生 panic。
§示例
在以下示例中,每次调用 send_timeout 都会阻塞,直到先前发送的值被接收,除非超时已过。
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
let (tx, mut rx) = mpsc::channel(1);
tokio::spawn(async move {
for i in 0..10 {
if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
println!("send error: #{:?}", e);
return;
}
}
});
while let Some(i) = rx.recv().await {
println!("got = {}", i);
sleep(Duration::from_millis(200)).await;
}源代码pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>>
pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>>
用于在异步上下文之外调用的阻塞发送。
此方法适用于从同步代码向异步代码发送数据的用例,即使接收端未使用 blocking_recv 来接收消息也能正常工作。
§恐慌
如果在异步执行上下文中调用,此函数会发生 panic。
§示例
use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
fn main() {
let (tx, mut rx) = mpsc::channel::<u8>(1);
let sync_code = thread::spawn(move || {
tx.blocking_send(10).unwrap();
});
Runtime::new().unwrap().block_on(async move {
assert_eq!(Some(10), rx.recv().await);
});
sync_code.join().unwrap()
}源代码pub fn is_closed(&self) -> bool
pub fn is_closed(&self) -> bool
检查通道是否已关闭。当 Receiver 被 drop 时,或调用 Receiver::close 方法时,会发生这种情况。
let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
assert!(!tx.is_closed());
let tx2 = tx.clone();
assert!(!tx2.is_closed());
drop(rx);
assert!(tx.is_closed());
assert!(tx2.is_closed());源代码pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>>
pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>>
等待通道容量。一旦有一个消息的发送容量可用,就会为调用者预留。
如果通道已满,此函数会等待未接收消息的数量小于通道容量。发送一个消息的容量将为调用者预留。返回一个 Permit 来跟踪已预留的容量。Permit 上的 send 函数会消费预留的容量。
在不发送消息的情况下 drop Permit 会将容量释放回通道。
§取消安全性
此通道使用一个队列来确保对 send 和 reserve 的调用按其请求顺序完成。取消对 reserve 的调用会使你失去在队列中的位置。
§示例
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel(1);
// 预留容量
let permit = tx.reserve().await.unwrap();
// Trying to send directly on the `tx` will fail due to no
// available capacity.
assert!(tx.try_send(123).is_err());
// 在 permit 上发送成功
permit.send(456);
// 接收在 permit 上发送的值
assert_eq!(rx.recv().await.unwrap(), 456);源代码pub async fn reserve_many(
&self,
n: usize,
) -> Result<PermitIterator<'_, T>, SendError<()>>
pub async fn reserve_many( &self, n: usize, ) -> Result<PermitIterator<'_, T>, SendError<()>>
等待通道容量。一旦有发送 n 条消息的容量可用,就会为调用者预留。
如果通道已满或可用许可少于 n 个,函数会等待未接收消息的数量小于通道容量 n 个。发送 n 条消息的容量随后将为调用者预留。
返回一个 PermitIterator 来跟踪已预留的容量。你可以调用此 Iterator 直到它耗尽,以获取一个 Permit,然后调用 Permit::send。此函数类似于 try_reserve_many,不同之处在于它会等待槽位变为可用。
如果通道已关闭,此函数将返回 SendError。
在不完整消费的情况下 drop PermitIterator 会将剩余的许可释放回通道。
§取消安全性
此通道使用一个队列来确保对 send 和 reserve_many 的调用按其请求顺序完成。取消对 reserve_many 的调用会使你失去在队列中的位置。
§示例
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel(2);
// 预留容量
let mut permit = tx.reserve_many(2).await.unwrap();
// Trying to send directly on the `tx` will fail due to no
// available capacity.
assert!(tx.try_send(123).is_err());
// Sending with the permit iterator succeeds
permit.next().unwrap().send(456);
permit.next().unwrap().send(457);
// The iterator should now be exhausted
assert!(permit.next().is_none());
// 接收在 permit 上发送的值
assert_eq!(rx.recv().await.unwrap(), 456);
assert_eq!(rx.recv().await.unwrap(), 457);源代码pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>>
pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>>
等待通道容量,移动 Sender 并返回一个拥有的 permit。一旦有一个消息的发送容量可用,就会为调用者预留。
此方法按值移动 sender,并返回一个可用于向通道发送消息的拥有的 permit。与 Sender::reserve 不同,此方法可用于 permit 必须对 'static 生命周期有效的场景。Sender 可以廉价地克隆(Sender::clone 本质上是一个引用计数增加,可与 Arc::clone 媲美),因此在需要多个 OwnedPermit 或无法移动 Sender 时,可以在调用 reserve_owned 之前对其进行克隆。
如果通道已满,此函数会等待未接收消息的数量小于通道容量。发送一个消息的容量将为调用者预留。返回一个 OwnedPermit 来跟踪已预留的容量。OwnedPermit 上的 send 函数会消费预留的容量。
在不发送消息的情况下 drop OwnedPermit 会将容量释放回通道。
§取消安全性
此通道使用一个队列来确保对 send 和 reserve 的调用按其请求顺序完成。取消对 reserve_owned 的调用会使你失去在队列中的位置。
§示例
使用 OwnedPermit 发送消息:
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel(1);
// Reserve capacity, moving the sender.
let permit = tx.reserve_owned().await.unwrap();
// Send a message, consuming the permit and returning
// the moved sender.
let tx = permit.send(123);
// The value sent on the permit is received.
assert_eq!(rx.recv().await.unwrap(), 123);
// The sender can now be used again.
tx.send(456).await.unwrap();当需要多个 OwnedPermit,或者 sender 无法按值移动时,可以在调用 reserve_owned 之前廉价地克隆它:
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel(1);
// Clone the sender and reserve capacity.
let permit = tx.clone().reserve_owned().await.unwrap();
// Trying to send directly on the `tx` will fail due to no
// available capacity.
assert!(tx.try_send(123).is_err());
// Sending on the permit succeeds.
permit.send(456);
// 接收在 permit 上发送的值
assert_eq!(rx.recv().await.unwrap(), 456);源代码pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>>
pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>>
尝试在通道中获取一个槽位,而不会等待槽位变为可用。
如果通道已满,此函数将返回 TrySendError;否则,如果有可用槽位,它将返回一个 Permit,从而允许你在保证有槽位的情况下在通道上 send。此函数类似于 reserve,不同之处在于它不会等待槽位变为可用。
在不发送消息的情况下 drop Permit 会将容量释放回通道。
§示例
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel(1);
// 预留容量
let permit = tx.try_reserve().unwrap();
// Trying to send directly on the `tx` will fail due to no
// available capacity.
assert!(tx.try_send(123).is_err());
// Trying to reserve an additional slot on the `tx` will
// fail because there is no capacity.
assert!(tx.try_reserve().is_err());
// 在 permit 上发送成功
permit.send(456);
// 接收在 permit 上发送的值
assert_eq!(rx.recv().await.unwrap(), 456);
源代码pub fn try_reserve_many(
&self,
n: usize,
) -> Result<PermitIterator<'_, T>, TrySendError<()>>
pub fn try_reserve_many( &self, n: usize, ) -> Result<PermitIterator<'_, T>, TrySendError<()>>
尝试在通道中获取 n 个槽位,而不会等待槽位变为可用。
返回一个 PermitIterator 来跟踪已预留的容量。你可以调用此 Iterator 直到它耗尽,以获取一个 Permit,然后调用 Permit::send。此函数类似于 reserve_many,不同之处在于它不会等待槽位变为可用。
如果通道上可用许可少于 n 个,则此函数将返回 TrySendError::Full。如果通道已关闭,此函数将返回 TrySendError::Closed。
在不完整消费的情况下 drop PermitIterator 会将剩余的许可释放回通道。
§示例
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel(2);
// 预留容量
let mut permit = tx.try_reserve_many(2).unwrap();
// Trying to send directly on the `tx` will fail due to no
// available capacity.
assert!(tx.try_send(123).is_err());
// Trying to reserve an additional slot on the `tx` will
// fail because there is no capacity.
assert!(tx.try_reserve().is_err());
// Sending with the permit iterator succeeds
permit.next().unwrap().send(456);
permit.next().unwrap().send(457);
// The iterator should now be exhausted
assert!(permit.next().is_none());
// 接收在 permit 上发送的值
assert_eq!(rx.recv().await.unwrap(), 456);
assert_eq!(rx.recv().await.unwrap(), 457);
// Trying to call try_reserve_many with 0 will return an empty iterator
let mut permit = tx.try_reserve_many(0).unwrap();
assert!(permit.next().is_none());
// Trying to call try_reserve_many with a number greater than the channel
// capacity will return an error
let permit = tx.try_reserve_many(3);
assert!(permit.is_err());
// Trying to call try_reserve_many on a closed channel will return an error
drop(rx);
let permit = tx.try_reserve_many(1);
assert!(permit.is_err());
let permit = tx.try_reserve_many(0);
assert!(permit.is_err());源代码pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>>
pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>>
尝试在通道中获取一个槽位,而不会等待槽位变为可用,并返回一个拥有的 permit。
此方法按值移动 sender,并返回一个可用于向通道发送消息的拥有的 permit。与 Sender::try_reserve 不同,此方法可用于 permit 必须对 'static 生命周期有效的场景。Sender 可以廉价地克隆(Sender::clone 本质上是一个引用计数增加,可与 Arc::clone 媲美),因此在需要多个 OwnedPermit 或无法移动 Sender 时,可以在调用 try_reserve_owned 之前对其进行克隆。
如果通道已满,此函数将返回 TrySendError。由于 sender 是按值获取的,因此在这种情况下返回的 TrySendError 包含 sender,以便可以再次使用。否则,如果有可用槽位,此方法将返回一个 OwnedPermit,然后可用于在保证有槽位的情况下在通道上 send。此函数类似于 reserve_owned,不同之处在于它不会等待槽位变为可用。
在不发送消息的情况下 drop OwnedPermit 会将容量释放回通道。
§示例
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel(1);
// 预留容量
let permit = tx.clone().try_reserve_owned().unwrap();
// Trying to send directly on the `tx` will fail due to no
// available capacity.
assert!(tx.try_send(123).is_err());
// Trying to reserve an additional slot on the `tx` will
// fail because there is no capacity.
assert!(tx.try_reserve().is_err());
// 在 permit 上发送成功
permit.send(456);
// 接收在 permit 上发送的值
assert_eq!(rx.recv().await.unwrap(), 456);
源代码pub fn same_channel(&self, other: &Self) -> bool
pub fn same_channel(&self, other: &Self) -> bool
如果这些 sender 属于同一个通道,则返回 true。
§示例
let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
let tx2 = tx.clone();
assert!(tx.same_channel(&tx2));
let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
assert!(!tx3.same_channel(&tx2));源代码pub fn capacity(&self) -> usize
pub fn capacity(&self) -> usize
返回通道的当前容量。
容量在通过调用 send 发送值或通过 reserve 预留容量时减少。容量在 Receiver 接收值时增加。这与 max_capacity 不同,后者始终返回最初调用 channel 时指定的缓冲区容量。
§示例
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel::<()>(5);
assert_eq!(tx.capacity(), 5);
// Making a reservation drops the capacity by one.
let permit = tx.reserve().await.unwrap();
assert_eq!(tx.capacity(), 4);
// Sending and receiving a value increases the capacity by one.
permit.send(());
rx.recv().await.unwrap();
assert_eq!(tx.capacity(), 5);源代码pub fn downgrade(&self) -> WeakSender<T>
pub fn downgrade(&self) -> WeakSender<T>
将 Sender 转换为 WeakSender,该 WeakSender 不计入 RAII 语义,即如果通道的所有 Sender 实例都已 drop,仅剩 WeakSender 实例,则通道将关闭。
源代码pub fn max_capacity(&self) -> usize
pub fn max_capacity(&self) -> usize
返回通道的最大缓冲区容量。
最大容量是最初调用 channel 时指定的缓冲区容量。这与 capacity 不同,后者返回当前可用的缓冲区容量:随着消息的发送和接收,capacity 返回的值会上下波动,而 max_capacity 返回的值则保持不变。
§示例
use tokio::sync::mpsc;
let (tx, _rx) = mpsc::channel::<()>(5);
// both max capacity and capacity are the same at first
assert_eq!(tx.max_capacity(), 5);
assert_eq!(tx.capacity(), 5);
// Making a reservation doesn't change the max capacity.
let permit = tx.reserve().await.unwrap();
assert_eq!(tx.max_capacity(), 5);
// but drops the capacity by one
assert_eq!(tx.capacity(), 4);源代码pub fn strong_count(&self) -> usize
pub fn strong_count(&self) -> usize
返回 Sender 句柄的数量。
源代码pub fn weak_count(&self) -> usize
pub fn weak_count(&self) -> usize
返回 WeakSender 句柄的数量。