Rust 中的 Tokio 线程同步机制

天翼云开发者社区
• 阅读 4

本文分享自天翼云开发者社区《Rust 中的 Tokio 线程同步机制》,作者:l****n

Rust 中的 Tokio 线程同步机制

在并发编程中,线程同步是一个重要的概念,用于确保多个线程在访问共享资源时能够正确地协调。Tokio 是一个强大的异步运行时库,为 Rust 提供了多种线程同步机制。以下是一些常见的同步机制:

  1. Mutex
  2. RwLock
  3. Barrier
  4. Semaphore
  5. Notify
  6. oneshot 和 mpsc 通道
  7. watch 通道

    1. Mutex

    Mutex(互斥锁)是最常见的同步原语之一,用于保护共享数据。它确保同一时间只有一个线程能够访问数据,从而避免竞争条件。
use tokio::sync::Mutex;
use std::sync::Arc;
​
#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0));
​
    let mut handles = vec![];
    for _ in 0..10 {
        let data = data.clone();
        let handle = tokio::spawn(async move {
            let mut lock = data.lock().await;
            *lock += 1;
        });
        handles.push(handle);
    }
​
    for handle in handles {
        handle.await.unwrap();
    }
​
    println!("Result: {}", *data.lock().await);
}

2. RwLock

RwLock(读写锁)允许多线程同时读取数据,但只允许一个线程写入数据。它比 Mutex 更加灵活,因为在读取多于写入的场景下,它能提高性能。功能上,他是读写互斥、写写互斥、读读兼容。

use tokio::sync::RwLock;
use std::sync::Arc;
​
#[tokio::main]
async fn main() {
    let data = Arc::new(RwLock::new(0));
​
    let read_data = data.clone();
    let read_handle = tokio::spawn(async move {
        let lock = read_data.read().await;
        println!("Read: {}", *lock);
    });
​
    let write_data = data.clone();
    let write_handle = tokio::spawn(async move {
        let mut lock = write_data.write().await;
        *lock += 1;
        println!("Write: {}", *lock);
    });
​
    read_handle.await.unwrap();
    write_handle.await.unwrap();
}

3. Barrier

Barrier 是一种同步机制,允许多个线程在某个点上进行同步。当线程到达屏障时,它们会等待直到所有线程都到达,然后一起继续执行。

use tokio::sync::Barrier;
use std::sync::Arc;
​
#[tokio::main]
async fn main() {
    let barrier = Arc::new(Barrier::new(3));
​
    let mut handles = vec![];
    for i in 0..3 {
        let barrier = barrier.clone();
        let handle = tokio::spawn(async move {
            println!("Before wait: {}", i);
            barrier.wait().await;
            println!("After wait: {}", i);
        });
        handles.push(handle);
    }
​
    for handle in handles {
        handle.await.unwrap();
    }
}

4. Semaphore

Semaphore(信号量)是一种用于控制对资源访问的同步原语。它允许多个线程访问资源,但有一个最大并发数限制。

#[tokio::test]
async fn test_sem() {
    let semaphore = Arc::new(Semaphore::new(3));
​
    let mut handles = vec![];
    for i in 0..5 {
        let semaphore = semaphore.clone();
        let handle = tokio::spawn(async move {
            let permit = semaphore.acquire().await.unwrap();
            let now = Local::now();
            println!("Got permit: {} at {:?}", i, now);
            println!(
                "Semaphore available permits before sleep: {}",
                semaphore.available_permits()
            );
            sleep(Duration::from_secs(5)).await;
            drop(permit);
            println!(
                "Semaphore available permits after sleep: {}",
                semaphore.available_permits()
            );
        });
        handles.push(handle);
    }
​
    for handle in handles {
        handle.await.unwrap();
    }
}

最终的结果如下

Got permit: 0 at 2024-08-08T21:03:04.374666+08:00
Semaphore available permits before sleep: 2
Got permit: 1 at 2024-08-08T21:03:04.375527800+08:00
Semaphore available permits before sleep: 1
Got permit: 2 at 2024-08-08T21:03:04.375563+08:00
Semaphore available permits before sleep: 0
Semaphore available permits after sleep: 0
Semaphore available permits after sleep: 0
Semaphore available permits after sleep: 1
Got permit: 3 at 2024-08-08T21:03:09.376722800+08:00
Semaphore available permits before sleep: 1
Got permit: 4 at 2024-08-08T21:03:09.376779200+08:00
Semaphore available permits before sleep: 1
Semaphore available permits after sleep: 2
Semaphore available permits after sleep: 3

5. Notify

Notify 是一种用于线程间通知的简单机制。它允许一个线程通知其他线程某些事件的发生。

use tokio::sync::Notify;
use std::sync::Arc;
​
#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify_clone = notify.clone();
​
    let handle = tokio::spawn(async move {
        notify_clone.notified().await;
        println!("Received notification");
    });
​
    notify.notify_one();
    handle.await.unwrap();
}

6. oneshot 和 mpsc 通道

oneshot 通道用于一次性发送消息,而 mpsc 通道则允许多个生产者发送消息到一个消费者。一般地onshot用于异常通知、启动分析等功能。mpsc用于实现异步消息同步``` oneshot

use tokio::sync::oneshot;
​
#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
​
    tokio::spawn(async move {
        tx.send("Hello, world!").unwrap();
    });
​
    let message = rx.await.unwrap();
    println!("Received: {}", message);
}

mpsc

use tokio::sync::mpsc;
​
#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
​
    tokio::spawn(async move {
        tx.send("Hello, world!").await.unwrap();
    });
​
    while let Some(message) = rx.recv().await {
        println!("Received: {}", message);
    }
}

7. watch 通道

watch 通道用于发送和接收共享状态的更新。它允许多个消费者监听状态的变化。

use tokio::sync::watch;
​
#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel("initial");
​
    tokio::spawn(async move {
        tx.send("updated").unwrap();
    });
​
    while rx.changed().await.is_ok() {
        println!("Received: {}", *rx.borrow());
    }
}

watch通道​:

用于广播状态更新,一个生产者更新状态,多个消费者获取最新状态。 适合配置变更、状态同步等场景。 ​mpsc通道​:

用于传递消息队列,多个生产者发送消息,一个消费者逐条处理。 适合任务队列、事件驱动等场景。

总结

Rust 中的 Tokio 提供了丰富的线程同步机制,可以根据具体需求选择合适的同步原语。常用的同步机制包括:

  1. Mutex:互斥锁,保护共享数据。
  2. RwLock:读写锁,允许并发读,写时独占。
  3. Barrier:屏障,同步多个线程在某一点。
  4. Semaphore:信号量,控制并发访问资源。
  5. Notify:通知机制,用于线程间通知。
  6. oneshot 和 mpsc 通道:消息传递机制。
  7. watch 通道:状态更新机制。 通过这些同步机制,可以在 Rust 中编写高效、安全的并发程序。
点赞
收藏
评论区
推荐文章
Easter79 Easter79
3年前
synchronized 和 ReentrantLock的区别
synchronized是Java内建的同步机制,所以也有人称其为IntrinsicLocking,它提供了互斥的语义和可见性,当一个线程已经获取当前锁时,其他试图获取的线程只能等待或者阻塞在那里。在Java5以前,synchronized是仅有的同步手段,在代码中,synchronized可以用来修饰方法,也可以使用在特定的代码块
Wesley13 Wesley13
3年前
java中volatile关键字的理解
一、基本概念Java内存模型中的可见性、原子性和有序性。可见性:  可见性是一种复杂的属性,因为可见性中的错误总是会违背我们的直觉。通常,我们无法确保执行读操作的线程能适时地看到其他线程写入的值,有时甚至是根本不可能的事情。为了确保多个线程之间对内存写入操作的可见性,必须使用同步机制。  可见性,是指线程之间的可见性,一个线
小万哥 小万哥
2年前
C++多线程编程和同步机制:详解和实例演示
C中的多线程编程和同步机制使得程序员可以利用计算机的多核心来提高程序的运行效率和性能。本文将介绍多线程编程和同步机制的基本概念和使用方法。多线程编程基础在C中,使用库来创建和管理线程。线程可以通过函数、成员函数或者Lambda表达式来实现。以下是一
Wesley13 Wesley13
3年前
Java线程知识深入解析(2)
多线程程序对于多线程的好处这就不多说了。但是,它同样也带来了某些新的麻烦。只要在设计程序时特别小心留意,克服这些麻烦并不算太困难。(1)同步线程许多线程在执行中必须考虑与其他线程之间共享数据或协调执行状态。这就需要同步机制。在Java中每个对象都有一把锁与之对应。但Java不提供单独的lock和unlock操作。它由高层的结构隐
Wesley13 Wesley13
3年前
Java 之 synchronized 详解
一、概念synchronized是Java中的关键字,是利用锁的机制来实现同步的。锁机制有如下两种特性:互斥性:即在同一时间只允许一个线程持有某个对象锁,通过这种特性来实现多线程中的协调机制,这样在同一时间只有一个线程对需同步的代码块(复合操作)进行访问。互斥性我们也往往称为操作的原子性。可见性:必须确
Stella981 Stella981
3年前
Linux 多线程
I.同步机制线程间的同步机制主要包括三个:互斥锁:以排他的方式,防止共享资源被并发访问;互斥锁为二元变量,状态为0开锁、1上锁;开锁必须由上锁的线程执行,不受其它线程干扰.条件变量:
Stella981 Stella981
3年前
Linux内核同步机制
1\.同步与互斥(1)互斥与同步机制是计算机系统中,用于控制进程对某些特定资源(共享资源)的访问的机制(2)同步是指用于实现控制多个进程按照一定的规则或顺序访问某些系统资源的机制。(3)互斥是指用于实现控制某些系统资源在任意时刻只能允许一个进程访问的机制。互斥是同步机制中的一种特殊情况。(4)同步机制是linux操作系统可以高效稳定运行的重
深入理解分布式锁:原理、应用与挑战| 京东物流技术团队
前言在单机环境中,我们主要通过线程间的加锁机制来确保同一时间只有一个线程能够访问某个共享资源或执行某个关键代码块,从而防止各种并发修改异常。例如,在Java中提供了synchronized/Lock。但是在分布式环境中,这种线程间的锁机制已经不起作用了,因
京东云开发者 京东云开发者
6个月前
深入理解分布式锁:原理、应用与挑战
作者:京东物流刘浩前言在单机环境中,我们主要通过线程间的加锁机制来确保同一时间只有一个线程能够访问某个共享资源或执行某个关键代码块,从而防止各种并发修改异常。例如,在Java中提供了synchronized/Lock。但是在分布式环境中,这种线程间的锁机制
Rust多线程:Worker 结构体与线程池中任务的传递机制
本文分享自天翼云开发者社区《》,作者:lnRust多线程:Worker结构体与线程池中任务的传递机制在实现一个多线程的Web服务器时,我们会遇到一个问题:如何在创建线程之后让它们在没有任务时保持等待状态,并且在任务到来时可以立即执行。这是一个典型的线程池设
天翼云开发者社区
天翼云开发者社区
Lv1
天翼云是中国电信倾力打造的云服务品牌,致力于成为领先的云计算服务提供商。提供云主机、CDN、云电脑、大数据及AI等全线产品和场景化解决方案。
文章
929
粉丝
16
获赞
40