跳到主要内容

Module sync

搜索

Module sync 

源代码
展开描述

用于异步上下文的同步原语。

Tokio 程序往往被组织为一组任务,其中每个任务独立操作,可能在不同的物理线程上执行。本模块提供的同步原语允许这些独立任务彼此通信。

§Message passing

Tokio 程序中最常见的同步形式是消息传递。两个任务独立运行,并通过相互发送消息来进行同步。这样做的好处是避免了共享状态。

消息传递使用通道来实现。通道支持将消息从一个生产者任务发送到一个或多个消费者任务。Tokio 提供了几种不同类型的通道。每种通道类型支持不同的消息传递模式。当通道支持多个生产者时,许多单独的任务可以发送消息。当通道支持多个消费者时,许多不同的单独任务可以接收消息。

由于不同的消息传递模式最适合用不同的实现来处理,因此 Tokio 提供了许多不同类型的通道。

§oneshot channel

oneshot 通道支持从单个生产者向单个消费者发送单个值。该通道通常用于将计算结果发送给等待者。

示例:使用 oneshot 通道接收计算结果。

use tokio::sync::oneshot;

async fn some_computation() -> String {
    "represents the result of the computation".to_string()
}

let (tx, rx) = oneshot::channel();

tokio::spawn(async move {
    let res = some_computation().await;
    tx.send(res).unwrap();
});

// Do other work while the computation is happening in the background

// Wait for the computation result
let res = rx.await.unwrap();

请注意,如果任务在终止之前的最后动作是产生计算结果,则可以使用 JoinHandle 来接收该值,而无需为 oneshot 通道分配资源。等待 JoinHandle 返回 Result。如果任务 panic,则 Joinhandle 会产生带有 panic 原因的 Err

示例:

async fn some_computation() -> String {
    "the result of the computation".to_string()
}

let join_handle = tokio::spawn(async move {
    some_computation().await
});

// Do other work while the computation is happening in the background

// Wait for the computation result
let res = join_handle.await.unwrap();

§mpsc channel

mpsc 通道支持从多个生产者向单个消费者发送许多值。该通道通常用于将工作发送给任务,或接收许多计算的结果。

如果要将许多消息从单个生产者发送到单个消费者,也应该使用此通道。没有专用的 spsc 通道。

示例:使用 mpsc 增量流式传输一系列计算的结果。

use tokio::sync::mpsc;

async fn some_computation(input: u32) -> String {
    format!("the result of computation {}", input)
}

let (tx, mut rx) = mpsc::channel(100);

tokio::spawn(async move {
    for i in 0..10 {
        let res = some_computation(i).await;
        tx.send(res).await.unwrap();
    }
});

while let Some(res) = rx.recv().await {
    println!("got = {}", res);
}

mpsc::channel 的参数是通道容量。这是任何给定时间在通道中可以存储等待接收的最大值数。由于通道容量在处理背压方面起着关键作用,因此正确设置此值是实现健壮程序的关键。

一种用于资源管理的常见并发模式是生成一个专门管理该资源的任务,并使用其他任务之间的消息传递来与该资源交互。该资源可以是任何不能被并发使用的事物。一些示例包括 socket 和程序状态。例如,如果多个任务需要通过单个 socket 发送数据,则生成一个任务来管理该 socket 并使用通道进行同步。

示例:使用消息传递从许多任务通过单个 socket 发送数据。

use tokio::io::{self, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut socket = TcpStream::connect("www.example.com:1234").await?;
    let (tx, mut rx) = mpsc::channel(100);

    for _ in 0..10 {
        // Each task needs its own `tx` handle. This is done by cloning the
        // original handle.
        let tx = tx.clone();

        tokio::spawn(async move {
            tx.send(&b"data to write"[..]).await.unwrap();
        });
    }

    // The `rx` half of the channel returns `None` once **all** `tx` clones
    // drop. To ensure `None` is returned, drop the handle owned by the
    // current task. If this `tx` handle is not dropped, there will always
    // be a single outstanding `tx` handle.
    drop(tx);

    while let Some(res) = rx.recv().await {
        socket.write_all(res).await?;
    }

    Ok(())
}

mpsconeshot 通道可以组合在一起,以提供与共享资源的请求/响应类型同步模式。生成一个任务来同步资源,并等待通过 mpsc 通道接收到的命令。每个命令都包含一个 oneshot Sender,命令的结果将通过它发送。

示例:使用任务同步一个 u64 计数器。每个任务发送一个“获取并递增”命令。递增之前的计数器值通过所提供的 oneshot 通道发送。

use tokio::sync::{oneshot, mpsc};
use Command::Increment;

enum Command {
    Increment,
    // Other commands can be added here
}

let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100);

// Spawn a task to manage the counter
tokio::spawn(async move {
    let mut counter: u64 = 0;

    while let Some((cmd, response)) = cmd_rx.recv().await {
        match cmd {
            Increment => {
                let prev = counter;
                counter += 1;
                response.send(prev).unwrap();
            }
        }
    }
});

let mut join_handles = vec![];

// Spawn tasks that will send the increment command.
for _ in 0..10 {
    let cmd_tx = cmd_tx.clone();

    join_handles.push(tokio::spawn(async move {
        let (resp_tx, resp_rx) = oneshot::channel();

        cmd_tx.send((Increment, resp_tx)).await.ok().unwrap();
        let res = resp_rx.await.unwrap();

        println!("previous value = {}", res);
    }));
}

// Wait for all tasks to complete
for join_handle in join_handles.drain(..) {
    join_handle.await.unwrap();
}

§broadcast channel

broadcast 通道支持从多个生产者向多个消费者发送许多值。每个消费者都会收到每一条值。该通道可用于实现 pub/sub 或“聊天”系统中常见的“扇出”模式。

该通道的使用频率通常低于 oneshotmpsc,但仍有其使用场景。

如果要将值从单个生产者广播到多个消费者,也应该使用此通道。没有专用的 spmc 广播通道。

基本用法

use tokio::sync::broadcast;

let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();

tokio::spawn(async move {
    assert_eq!(rx1.recv().await.unwrap(), 10);
    assert_eq!(rx1.recv().await.unwrap(), 20);
});

tokio::spawn(async move {
    assert_eq!(rx2.recv().await.unwrap(), 10);
    assert_eq!(rx2.recv().await.unwrap(), 20);
});

tx.send(10).unwrap();
tx.send(20).unwrap();

§watch channel

watch 通道支持从多个生产者向多个消费者发送许多值。但是,通道中仅存储最近的值。消费者会在发送新值时收到通知,但不能保证消费者会看到所有值。

watch 通道类似于容量为 1 的 broadcast 通道

watch 通道的用例包括广播配置更改或发出程序状态更改的信号,例如转换到关闭状态。

示例:使用 watch 通道向任务通知配置更改。在此示例中,定期检查配置文件。当文件发生更改时,会向消费者发出配置更改的信号。

use tokio::sync::watch;
use tokio::time::{self, Duration, Instant};

use std::io;

#[derive(Debug, Clone, Eq, PartialEq)]
struct Config {
    timeout: Duration,
}

impl Config {
    async fn load_from_file() -> io::Result<Config> {
        // file loading and deserialization logic here
    }
}

async fn my_async_operation() {
    // Do something here
}

// Load initial configuration value
let mut config = Config::load_from_file().await.unwrap();

// Create the watch channel, initialized with the loaded configuration
let (tx, rx) = watch::channel(config.clone());

// Spawn a task to monitor the file.
tokio::spawn(async move {
    loop {
        // Wait 10 seconds between checks
        time::sleep(Duration::from_secs(10)).await;

        // Load the configuration file
        let new_config = Config::load_from_file().await.unwrap();

        // If the configuration changed, send the new config value
        // on the watch channel.
        if new_config != config {
            tx.send(new_config.clone()).unwrap();
            config = new_config;
        }
    }
});

let mut handles = vec![];

// Spawn tasks that runs the async operation for at most `timeout`. If
// the timeout elapses, restart the operation.
//
// The task simultaneously watches the `Config` for changes. When the
// timeout duration changes, the timeout is updated without restarting
// the in-flight operation.
for _ in 0..5 {
    // Clone a config watch handle for use in this task
    let mut rx = rx.clone();

    let handle = tokio::spawn(async move {
        // Start the initial operation and pin the future to the stack.
        // Pinning to the stack is required to resume the operation
        // across multiple calls to `select!`
        let op = my_async_operation();
        tokio::pin!(op);

        // Get the initial config value
        let mut conf = rx.borrow().clone();

        let mut op_start = Instant::now();
        let sleep = time::sleep_until(op_start + conf.timeout);
        tokio::pin!(sleep);

        loop {
            tokio::select! {
                _ = &mut sleep => {
                    // The operation elapsed. Restart it
                    op.set(my_async_operation());

                    // Track the new start time
                    op_start = Instant::now();

                    // Restart the timeout
                    sleep.set(time::sleep_until(op_start + conf.timeout));
                }
                _ = rx.changed() => {
                    conf = rx.borrow_and_update().clone();

                    // The configuration has been updated. Update the
                    // `sleep` using the new `timeout` value.
                    sleep.as_mut().reset(op_start + conf.timeout);
                }
                _ = &mut op => {
                    // The operation completed!
                    return
                }
            }
        }
    });

    handles.push(handle);
}

for handle in handles.drain(..) {
    handle.await.unwrap();
}

§State synchronization

其余的同步原语专注于同步状态。这些是 std 提供的版本的异步等价物。它们的工作方式与 std 对应物类似,但会异步等待而不是阻塞线程。

  • Barrier 确保多个任务相互等待到达程序中的某个点,然后再一起继续执行。

  • Mutex 互斥机制,确保一次最多一个线程能够访问某些数据。

  • Notify 基本任务通知。Notify 支持在不发送数据的情况下通知接收任务。在这种情况下,任务会唤醒并恢复处理。

  • RwLock 提供一种互斥机制,允许多个读者同时访问,但一次仅允许一个写入者。在某些情况下,这比互斥锁更高效。

  • Semaphore 限制并发量。信号量持有若干许可,任务可以请求这些许可以进入临界区。信号量对于实现任何类型的限制或约束非常有用。

§Runtime compatibility

本模块提供的所有同步原语都与运行时无关。你可以在 Tokio 运行时的不同实例之间自由移动它们,甚至可以从非 Tokio 运行时使用它们。

在 Tokio 运行时中使用时,这些同步原语参与协作调度以避免饥饿。从非 Tokio 运行时使用时,此功能不适用。

作为例外,以 _timeout 结尾的方法不是与运行时无关的,因为它们需要访问 Tokio 定时器。有关其使用的更多信息,请参阅各个 *_timeout 方法的文档。

模块§

broadcast
A multi-producer, multi-consumer broadcast queue. Each sent value is seen by all consumers.
futures
Named future types.
mpsc
A multi-producer, single-consumer queue for sending values between asynchronous tasks.
oneshot
A one-shot channel is used for sending a single message between asynchronous tasks. The channel function is used to create a Sender and Receiver handle pair that form the channel.
watch
A multi-producer, multi-consumer channel that only retains the last sent value.

结构体§

AcquireError
Error returned from the Semaphore::acquire function.
Barrier
A barrier enables multiple tasks to synchronize the beginning of some computation.
BarrierWaitResult
A BarrierWaitResult is returned by wait when all tasks in the Barrier have rendezvoused.
MappedMutexGuard
A handle to a held Mutex that has had a function applied to it via MutexGuard::map.
Mutex
An asynchronous Mutex-like type.
MutexGuard
A handle to a held Mutex. The guard can be held across any .await point as it is Send.
Notify
Notifies a single task to wake up.
OnceCell
A thread-safe cell that can be written to only once.
OwnedMappedMutexGuard
A owned handle to a held Mutex that has had a function applied to it via OwnedMutexGuard::map.
OwnedMutexGuard
An owned handle to a held Mutex.
OwnedRwLockMappedWriteGuard
Owned RAII structure used to release the exclusive write access of a lock when dropped.
OwnedRwLockReadGuard
Owned RAII structure used to release the shared read access of a lock when dropped.
OwnedRwLockWriteGuard
Owned RAII structure used to release the exclusive write access of a lock when dropped.
OwnedSemaphorePermit
An owned permit from the semaphore.
RwLock
An asynchronous reader-writer lock.
RwLockMappedWriteGuard
RAII structure used to release the exclusive write access of a lock when dropped.
RwLockReadGuard
RAII structure used to release the shared read access of a lock when dropped.
RwLockWriteGuard
RAII structure used to release the exclusive write access of a lock when dropped.
Semaphore
Counting semaphore performing asynchronous permit acquisition.
SemaphorePermit
A permit from the semaphore.
SetOnce
A thread-safe cell that can be written to only once.
SetOnceError
Error that can be returned from SetOnce::set.
TryLockError
Error returned from the Mutex::try_lock, RwLock::try_read and RwLock::try_write functions.

枚举§

SetError
Errors that can be returned from OnceCell::set.
TryAcquireError
Error returned from the Semaphore::try_acquire function.