Rust多线程:Worker 结构体与线程池中任务的传递机制

天翼云开发者社区
• 阅读 4

本文分享自天翼云开发者社区《Rust多线程:Worker 结构体与线程池中任务的传递机制》,作者:l****n

Rust多线程:Worker 结构体与线程池中任务的传递机制

在实现一个多线程的 Web 服务器时,我们会遇到一个问题:如何在创建线程之后让它们在没有任务时保持等待状态,并且在任务到来时可以立即执行。这是一个典型的线程池设计问题。在 Rust 中,我们需要通过自定义设计来实现这个功能,因为标准库中的 **thread::spawn 并不直接支持这种用法。

问题描述

Rust 的 **thread::spawn 方法会立即执行传入的闭包。如果我们想要在线程池中创建线程并让它们等待任务(即在创建时不执行任何任务),我们就需要自己设计一种机制,能够在稍后将任务传递给这些已经创建好的线程。

解决方案:引入 Worker 结构体

为了解决这个问题,我们引入了一个 **Worker 结构体来管理线程池中的每个线程。Worker 的作用类似于一个工人,它等待任务的到来并在接收到任务时执行。

1. Worker 结构体的定义

Worker 结构体包含两个字段:

id:用于标识每个 Worker。 thread:存放线程的 JoinHandle<()>,它是由 thread::spawn 返回的。 代码如下:

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

2. 创建 Worker 实例

为了让 **Worker 在没有任务时处于等待状态,我们可以在 Worker::new 函数中使用 thread::spawn 创建线程,并传入一个空的闭包:

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});
​
        Worker { id, thread }
    }
}

在这里,我们创建了一个 **Worker 实例,每个 Worker 都会启动一个线程。但这个线程目前还什么都不做,因为我们传递给 spawn 的闭包是空的。

3. 将 Worker 集成到线程池中

接下来,我们修改 **ThreadPool 的实现,使其存储 Worker 的实例而不是直接存储线程的 JoinHandle<()>。在 ThreadPool::new 中,我们使用一个 for 循环创建多个 Worker 实例,并将它们存储在一个 Vec 中:

pub struct ThreadPool {
    workers: Vec<Worker>,
}
​
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
​
        let mut workers = Vec::with_capacity(size);
​
        for id in 0..size {
            workers.push(Worker::new(id));
        }
​
        ThreadPool { workers }
    }
}

这样,我们就为线程池创建了一个由多个 **Worker 组成的集合。每个 Worker 都有一个唯一的 ID,并且都启动了一个线程,虽然这些线程目前还没有执行任何有用的任务。

向 Worker 发送任务

现在,我们解决了创建线程并让它们等待任务的问题。接下来,我们需要设计一个机制,使得线程池能够在任务到来时将任务发送给等待中的线程。

1. 使用信道传递任务

在 Rust 中,信道(channel)是一种非常适合在线程之间传递数据的工具。我们可以使用一个信道来传递任务。线程池会创建一个信道的发送端,每个 **Worker 会拥有信道的接收端。任务通过信道从线程池传递到 Worker,再由 Worker 中的线程执行。

use std::{sync::mpsc, thread};
​
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}
​
struct Job;
​
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
​
        let (sender, receiver) = mpsc::channel();
​
        let mut workers = Vec::with_capacity(size);
​
        for id in 0..size {
            workers.push(Worker::new(id));
        }
​
        ThreadPool { workers, sender }
    }
}

2. Worker 处理任务

为了让 **Worker 能够处理任务,我们将信道的接收端传递给每个 Worker 的线程。线程会不断地从信道中接收任务,并执行这些任务。

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(move || {
            receiver;
        });
​
        Worker { id, thread }
    }
}

不过在这段代码中,存在一个问题:信道的接收端 **receiver 被移交给了第一个 Worker,导致无法将其传递给其他 Worker。

3. 使用 Arc 和 Mutex 共享接收端

为了解决这个问题,我们需要使用 **Arc<Mutex> 来共享信道的接收端,这样所有的 Worker 都可以安全地从同一个信道接收任务:

use std::{sync::{mpsc, Arc, Mutex}, thread};
​
type Job = Box<dyn FnOnce() + Send + 'static>;
​
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
​
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
​
        let mut workers = Vec::with_capacity(size);
​
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
​
        ThreadPool { workers, sender }
    }

     pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
​
        self.sender.send(job).unwrap();
    }
}
​
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();
​
            println!("Worker {id} got a job; executing.");
​
            job();
        });
​
        Worker { id, thread }
    }
}

在 **Worker::new 中,线程会不断地尝试获取锁来接收任务,并在收到任务后执行。这里我们使用了 Arc 来共享接收端,使用 Mutex 来确保一次只有一个 Worker 能够接收任务。

type Job = Box<dyn FnOnce() + Send + 'static>; *这行代码定义了一个类型别名 *Job。它代表了一个特定的任务类型:

Box<dyn FnOnce() + Send + 'static> 是一个动态分发的闭包(或函数),其具体实现类型在编译时不确定。Box 是一个堆分配的智能指针,用于将闭包存储在堆上。 dyn FnOnce() 表示这个闭包实现了 FnOnce trait,可以被调用一次。 Send 表示这个闭包可以在线程之间安全地传递。 'static 表示闭包的生命周期是整个程序的生命周期,确保闭包在多个线程中可以安全使用。 execute 方法 *这个方法的功能是将一个新的任务(闭包)添加到线程池的任务队列中,以供线程池中的工作线程执行。下面是对 *F: FnOnce() + Send + 'static 的解释:

F: FnOnce() + Send + 'static

是一个泛型约束,表示必须是一个实现了 FnOnce、Send和 'static的闭包类型。**

FnOnce() 确保闭包可以被调用一次。 Send 确保闭包可以安全地在线程之间传递。 'static 确保闭包的生命周期足够长,可以在整个程序运行期间有效。 在 **execute 方法中,你将传入的闭包 f 转换成 Job 类型(即 Box<dyn FnOnce() + Send + 'static>),然后通过 self.sender 将其发送到任务队列中。这使得线程池的工作线程可以从队列中接收并执行这些任务。

总结

通过引入 **Worker 结构体并使用信道进行任务传递,我们成功地实现了一个可以延迟分配任务的线程池。每个 Worker 都是在创建时启动的,但它们会等待任务的到来,只有在接收到任务后才会开始执行。这种设计不仅提高了服务器的吞吐量,还确保了线程资源的高效利用。

点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
java目前可以通过以下几种方式进行定时任务
1、单机部署模式Timer:jdk中自带的一个定时调度类,可以简单的实现按某一频度进行任务执行。提供的功能比较单一,无法实现复杂的调度任务。ScheduledExecutorService:也是jdk自带的一个基于线程池设计的定时任务类。其每个调度任务都会分配到线程池中的一个线程执行,所以其任务是并发执行的,互不影响。
Wesley13 Wesley13
3年前
java多线程的3种实现方式
多线程相关的问题1.什么是进程?​正在执行的程序2.什么是线程?​进程的子单位,一个能够完成独立功能的执行路径3.为什么需要开启多线程?当执行某些耗时操作的任务的时候需要开启多线程,防止线程阻塞能够让两个任务看起来像是在同时执行
Wesley13 Wesley13
3年前
Java 并发编程:任务执行器 Executor 接口
任务执行器(Executor)是一个接口,位于java.util.concurrent包下,它的作用主要是为我们提供任务与执行机制(包括线程使用和调度细节)之间的解耦。比如我们定义了一个任务,我们是通过线程池来执行该任务,还是直接创线程来执行该任务呢?通过Executor就能为任务提供不同的执行机制。执行器的实现方式各种各样,常见的包括同步执行器、一对一执行
Easter79 Easter79
3年前
Spring注解@Scheduled 多线程异步执行
一、前言:Spring定时任务@Schedule的使用方式,默认是单线程同步执行的,启动过程是一个单线程同步启动过程,一旦中途被阻塞,会导致整个启动过程阻塞,其余的定时任务都不会启动。二、@Schedule注解多线程的实现:多个定时任务的执行,通过使用@Async注解来实现多线程异步调用。@Scheduled(
Wesley13 Wesley13
3年前
Java基础教程——线程池
启动新线程,需要和操作系统进行交互,成本比较高。使用线程池可以提高性能——线程池会提前创建大量的空闲线程,随时待命执行线程任务。在执行完了一个任务之后,线程会回到空闲状态,等待执行下一个任务。(这个任务,就是Runnable的run()方法,或Callable的call()方法)。Java5之前需要手动实现线程池,Java5之
Wesley13 Wesley13
3年前
Java 多线程,线程池,
1\.创建线程池的方法之三://对于每个任务,如果有空闲的线程可用,立即让他执行任务,//没有空闲的线程则创建一个线程。ExecutorServicepoolExecutors.newCachedThreadPool();//固定大小的线程池,任务数空闲线程数,得不到服务的任务
Stella981 Stella981
3年前
Python并发(二)
并发是指一次处理多件事,而并行是指一次做多件事。二者不同,但互相有联系。打个比方:像Python的多线程,就是并发,因为Python的解释器GIL是线程不安全的,一次只允许执行一个线程的Python字节码,我们在使用多线程时,看上去像很多个任务同时进行,但实际上但一个线程在执行的时候,其他线程是处于休眠状态的。而在多CPU的服务器上,Java或Go的多线程,
ThreadPoolExecutor线程池内部处理浅析 | 京东物流技术团队
我们知道如果程序中并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束时,会因为频繁创建线程而大大降低系统的效率,因此出现了线程池的使用方式,它可以提前创建好线程来执行任务。本文主要通过java的ThreadPoolExecutor来查看线程池
并发编程-ExecutorCompletionService解析
1、简单介绍我们在并发编程中,目前大部分做法都是将任务添加到线程池中,并拿到Future对象,将其添加到集合中,等所有任务都添加到线程池后,在通过遍历Future集合,调用future.get()来获取每个任务的结果,这样可以使得先添加到线程池的任务先等待
Rust 中的 Tokio 线程同步机制
本文分享自天翼云开发者社区《》,作者:lnRust中的Tokio线程同步机制在并发编程中,线程同步是一个重要的概念,用于确保多个线程在访问共享资源时能够正确地协调。Tokio是一个强大的异步运行时库,为Rust提供了多种线程同步机制。以下是一些常见的同步机
天翼云开发者社区
天翼云开发者社区
Lv1
天翼云是中国电信倾力打造的云服务品牌,致力于成为领先的云计算服务提供商。提供云主机、CDN、云电脑、大数据及AI等全线产品和场景化解决方案。
文章
929
粉丝
16
获赞
40