Java多线程之任务执行

Wesley13
• 阅读 816

Java多线程之任务执行

一、在线程中执行任务

1.串行的执行任务

在应用程序中可以通过多种策略来调度任务,而其中的策略能够更好的利用潜在的并发性。_最简单的策略就是在单个线程中串行的执行各项任务。_

public class SingleThreadWebServer {

    public static void main(String args[]) throws IOException {
        ServerSocket serverSocket = new ServerSocket(80);
        while (true) {
            Socket connetcion = serverSocket.accept();
            handlerRequest();
        }
    }

    public static void handlerRequest() {
        return;
    }
}

它每次只能处理一个请求。_当服务器正在处理请求时,新到来的连接必须等待直到请求处理完成。然后服务器再次调用accept方法。_

2.显示地为任务创建线程

public class ThreadPerTaskWebServer {

    public static void main() throws IOException {
        ServerSocket serverSocket = new ServerSocket(80);

        while (true) {
            Socket connection = serverSocket.accept();
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    handlerRequest();
                }
            };
            new Thread(task).start();
        }
    }

    public static void handlerRequest() {
        return;
    }
}

对于每个连接,主循环都将创建一个新线程来处理请求,而不是在主循环中进行处理。

3.无限制创建线程的不足

为每个任务分配一个线程的缺陷:

  • _线程的生命周期的开销非常高_:线程的创建与销毁并不是没有代价的。

  • _资源的消耗_:活跃的线程会消耗系统资源,尤其是内存。如果你已经拥有足够多的线程使所有CPU保持忙碌状态,那么再创建更多的线程反而会降低性能。

  • **稳定性**:在可创建线程的数量上存在一个限制。这个限制随着平台的不同而不同,并且受多个因素制约。

二、Executor框架

线程池简化了线程的管理工作,并且java.util.concurrent提供了一种灵活的线程池实现作为Executor框架的一部分。_在Java类库中,任务执行的主要抽象不是Thread,而是Executor。虽然Executor是个简单的接口,但他却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。_它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监测机制。

_Executor基于生产者和消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。_如果要在程序中实现一个消费者-生产者的设计,那么最简单的方式通常就是使用Executor。

1.基于Executor的web服务器

public class TaskExecutionWebServer {

    private static final int NTHREADS = 100;
    private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String args[]) throws IOException {
        ServerSocket serverSocket = new ServerSocket(80);
        while (true) {
            final Socket connection = serverSocket.accept();
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    handlerRequest(connection);
                }
            };
            exec.execute(task);
        }
    }

    private static void handlerRequest(Socket connection) {
        return;
    }

}

通过使用Executor,将请求处理任务的提交与任务的实际执行解耦开来,并且只需采用另一种不同的Executor实现,就可以改变服务器的行为。

改变Executor实现或配置所带来的影响要远远小于改变任务提交方式带来的影响。通常Executor的配置是一次性的,因此在部署阶段可以完成,而提交任务的代码却会不断扩散到整个程序中,增加了修改的难度。

我们可以很容易地将TaskExecutionWebServer修改为类似ThreadPerTaskWebServer的行为,只需使用一个为每个请求都创建新线程的Executor。

public class ThreadPerTaskExecutor extends Executor {

    @Override
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}

以同步方式执行所有任务的Executor

public class WithinThreadExecutor implements Executor {

    @Override
    public void execute(Runnable command) {
        command.run();
    }
}

2.线程池

在线程池中执行任务比为每一个任务分配一个线程优势更多。通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。也不会由于等待创建线程而延迟任务的执行,从而提高了响应性。

‍**可以通过调用Executors中的静态工厂方法之一来创建一个线程池**‍

  • newFixedThreadPool

  • newCachedThreadPool

  • newSingleThreadExecutor

  • newSingleThreadScheduledExecutor

  • newScheduledThreadPool

/**
 * 创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化
 */
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}


public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            threadFactory);
}

/**
 * 将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制
 * @return the newly created thread pool
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}


public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            threadFactory);
}

/**
 * 是一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来代替。
 * 能确保依照任务在队列中的顺序来串行执行。
 */
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>()));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

/**
 * 创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务
 */
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

3.Executor的生命周期

由于Executor以异步方式来执行任务,因此在任何时刻,之前提交任务的状态不是立即可见的。

有些任务可能应经完成,有些任务可能正在运行,而其他任务可能在队列中等待执行。

为解决执行任务的声明周期问题,Executor扩展了ExecutorService接口,添加了一些用于声明周期管理的方法。

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;


    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);
    ............
    ............
}

ExecutorService的声明周期有三种状态:运行、关闭和已终止。ExecutorService在初始创建时处于运行状态。

‍_shutdown_ ‍方法将执行平缓的关闭过程:不在接受新的任务,同时等待已经提交的任务执行完成---包括那些还未开始执行的任务。

‍_shutdownNow_ ‍方法将执行粗暴的关闭过程:他将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

在ExecutorService关闭后提交的任务将由“拒绝执行处理器”来处理。

可以调用awaitTermination()来等待ExecutorService到达终止状态。

或者通过调用isTerminated()来轮询ExecutorService是否已经终止。‍‍

支持关闭操作的web服务器

示例代码:

public class LifecycleWebServer {

    private final ExecutorService exec = ...;

    public void start() throws IOException {
        ServerSocket serverSocket = new ServerSocket(80) ;

        while (!exec.isShutdown()){
            try {
                final Socket conn = serverSocket.accept();
                exec.execute(new Runnable() {
                    @Override
                    public void run() {
                        handlerRequest(conn);
                    }
                });
            } catch (ReflectiveOperationException e){
                if (!exec.isShutdown()){
                    log("task submission rejected",e);
                }
            }
        }
    }

    public void stop(){
        exec.shutdown();
    }

    public void handlerRequest(Socket conn) {
        Request request = readRequest(conn);
        if (isShutdownRequest(request))
            stop();
        else
            dispatchRequest(request);
    }
}

4.延迟任务和周期任务

延迟任务:在100秒后执行该任务

周期任务:每10秒执行一次任务

Timer存在一些缺陷,应该考虑使用ScheduleThreadPoolExecutor来代替他。

可以通过‍ScheduleThreadPoolExecutor ‍的构造函数或Executors 的 newScheduleThreadPool  工厂方法来创建该类的对象。

Timer在执行所有定时任务时只会创建一个线程。如果某个任务执行时间过长,那么将破坏其他TimerTask的定时精确性。

Timer另一个问题是如果TimeTask抛出了一个未检查的异常,那么Timer将表现出槽糕的行为。Timer线程并不捕获异常,因此

当TimerTask抛出未检查的异常时将终止定时线程。这种情况下,Timer也不会恢复线程的执行,而是会错误地认为整个Timer都消失了。

因此,已经被调度但尚未执行的TimerTask将不会再执行,新的任务也不会被调度。这就是线程泄漏。

三、找出可利用的并行性

1.携带结果的任务Callable与Future

Executor框架使用Runnable作为其基本的任务表示形式。

Runnable是一种有很大局限的抽象,虽然run能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。

许多任务实际上都是存在延迟的计算。对于具有延迟性的任务,Callable是一种更好的抽象 :它认为主入口点(即call)将返回一个值,并可能抛出一个异常。

‍_Runnable和Callable描述的都是抽象的计算任务。_‍‍

Executor执行的任务有4个生命周期阶段:创建,提交,开始,完成。

Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。

在Future规范中包含的隐含意义是,任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它就永远停留在完成的状态上。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) 
            throws InterruptedException, ExecutionException, TimeoutException; 
}

public interface Callable<V> { 
        V call() throws Exception; 
}

可以通过多种方式创建一个Future来描述任务。ExecutorService中的所有submit方法都将返回一个Future,从而将一个Runnable或Callable提交给Executor,并得到一个Future用来获取任务的执行结果或者取消任务。

==========END==========

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Stella981 Stella981
3年前
Executor框架详解
任务是一组逻辑工作单元,线程是使任务异步执行的机制。下面分析两种通过创建线程来执行任务的策略。1将所有任务放在单个任务中串行执行;2为每个任务创建单独的线程来执行实际上两种方式都存在一些严重的缺陷。串行执行的问题在于其糟糕的响应和吞吐量;而为每个任务单独创建线程的问题在于资源管理的复杂性,容易造成资源的浪费和过度消耗,影响系统的稳定性。为了提
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这