内容提要

最近,在公司做了几次技术分享,将slides上的内容整理成博客。本文的主要内容包括如下几个部分:

  1. 背景
  2. Java线程池类型
  3. 线程池参数与使用实例
  4. ThreadPoolExecutor实现
  5. 总结

本文将主要讲解ThreadPoolExecutor的使用、参数解释以及内部实现,对于ScheduedThreadPoolExecutor与ForkJoinPool只会简单提及。

一、背景

我们为什么需要使用线程池,直接new Thread岂不是很方便?如果我们在编写Demo或者开发个小型的Java应用程序,那么这种方法的确没什么问题。但是如果我们在开发大型的企业级应用,那么直接创建线程是一种糟糕的实践,会造成如下问题:

  • 直接new Thread使创建线程的代码分散在项目各个地方,不利于维护(指代码上的)
  • 当你的Runnable执行完毕线程无法复用造成资源浪费
  • 直接new Thread不遍于控制线程的数量(启动多少个线程不受限制,想new就new)

那么这些问题有什么缺点呢?对于第一条是显然的。对于第二条,你可能以为创建线程的代价和创建一个对象的代价差不多,但除了创建对象外,还可能会在OS层面创建线程(这取决于JVM实现)。线程是一种宝贵的资源(每个线程有自己的调用栈、PC、IR),频繁创建销毁开销很大。对于第三条,创建大量的线程显而易见会带来昂贵的调度开销,要知道线程调度需要做保留现场、恢复现场等一些列操作。

二、Java线程池类型

为了实现不同类型的线程池,定义了三个接口:ExecutorExecutorServiceScheduledExecutorService接口,他们都位于java.util.concurrent包下。

  • Executor:能够执行Runnable的简单接口,只含有execute一个方法
  • ExecutorService :继承自Executor,添加了shutdown、submit、invokeAll等方法,以支持异步执行
  • ScheduledExecutorService:继承自ExecutorService,支持周期性地执行任务

Java线程池目前有三个实现,分别是ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool。ThreadPoolExecutor可以根据构造方法实现不同类型的线程池(或者使用Executors类),但是不支持定时任务;ScheduledExecutorService支持定时执行;ForkJoinPool利用Fork-join框架,实现了work-stealing,对于执行时间差异很大的任务使用该类型线程池能够实现负载均衡。图1展示了他们直接的继承/实现关系。

图1 – 线程池UML图

三、线程池参数与使用实例

下面我们分别介绍这三个线程池的构造方法,这样我们才能够知道如何创建线程池。因为每种线程池都有多个构造方法,我们只介绍参数最复杂的那一个。

1. ThreadPoolExecutor

构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

corePoolSize: 当线程池中线程数量小于corePoolSize, 即使这些线程是空闲的,也不会被销毁。但如果调用方法allowCoreThreadTimeOut(true),表示核心线程池的线程在没有任务到达的时候,keepAliveTime时间后销毁。

maximumPoolSize:限定线程池最大数量。当线程池中线程数达到corePoolSize时,且任务队列workQueue以及满了,就会创建新线程直到数量达到maxiumPoolSize。注意,如果workQueue是一个无界队列(unbounded)时,该参数是无效的,因为你无论添加多少任务workQueue都不会满。

keepAliveTime:当线程数大于corePoolSize时,多余线程的存活时间。

unit:keepAliveTime参数的时间单位

workQueue:用于存放尚未执行任务的队列,元素必须是Runnable。workQueue必须是一个阻塞队列,例如LinkedBlockingQueue、ArrayBlockingQueue。

threadFactory:创建线程时所使用的线程工厂,默认为Executors.defaultThreadFactor。如果你想让创建的线程带有自定义name或者优先级时,可以传入自己实现的线程工厂。

handler:在某种情况下(下面会提到),无法提交任务所执行的处理策略。目前有如下内置的handler实现。

  • AbortPolicy 直接拒绝新任务,并抛出RejectedExecutionException异常
  • CallerRunsPolicy 用当前线程的execute方法执行被拒绝的任务,如果执行器已经关闭则丢弃任务
  • DiscardPolicy 默默地丢弃新到的任务
  • DiscardOldestPolicy 丢弃最老的一个未执行的任务并执行当前任务

线程池逻辑

图2描述了线程创建与入队的逻辑。如果从线程创建和入队的角度分别观察,能够整理出如下规则。

线程创建规则

  • 当线程池中线程数量小于corePoolSize时,将会创建新线程(即使之前创建当线程处于空闲)
  • 如果线程数量大于等于corePoolSize且小于maximumPoolSize时,只有队列已满才会创建新线程

入队规则

  • 如果线程数量小于corePoolSize,优先创建线程而不入队
  • 如果线程数量大于等于corePoolSize,优先入队而不创建线程
  • 如果线程数量达到maximumPoolSize且队列已满,则拒绝任务,交由handler处理
图2 – 线程池流程图

队列的选择

直接交接 (Direct handoffs)

如果想要实现低延迟,可以选择SynchronousQueue。它能够把任务直接交接给线程而不是把任务暂存到队列,因为SynchronousQueue 的行为是阻塞队列,且只能存放1个元素,offer时必须等另一个线程从队列poll。

如果线程池没有可用线程来运行任务,那么尝试将任务入队将会失败,所以会创建新的线程。当用户提交的任务间有依赖,这种策略能够避免死锁(例如Task2依赖Task1,但是Task2先被调度的)。

直接交接通常需要把maximumPoolSizes设置成Integer.MAX_VALUE,来避免提交的任务被拒绝。但是当任务的产生大于处理能力时,会导致不断地创建线程。

无界队列 (Unbounded queues)

使用无界队列(例如LinkedBlockingQueue)将会导致当线程数量达到corePoolSize时,新提交的任务等待。因为队列的容量是无限的,所以当线程数量达到corePoolSize时并不会创建线程,也就是说参数maximumPoolSize是无效的。这种模式对于任务间没有依赖可能是合适的,例如Web Server的请求。这种策略可以用于平滑瞬态突发请求(多余的请求入队,不会创建大量线程压垮系统)。

有界队列 (Bounded queues)

当设置maximumPoolSizes为有限值时,使用有界队列(例如ArrayBlockingQueue)可以防止资源耗尽,但是很难对性能进行调优。用户需要衡量(trade off)队列大小与maximumPoolSizes。

使用大容量队列小线程数的配置能够减少CPU使用、OS资源和上下文切换的开销,但是会降低吞吐量。如果任务频繁阻塞(例如I/O受限型(I/O bounded)任务),系统能够调度更多的线程。使用小容量队列大线程数的配置,能够使CPU保持繁忙,但是可能会带来不可接受的调度开销,这反而会降低吞吐量。

2. ScheduledThreadPoolExecutor

使用ScheduledThreadPoolExecutor可以实现延迟任务或者周期性地执行任务。延迟任务在启用后立即执行,但不保证实时,启用他们后何时启动它们(取决于OS何时调度)。对于相同执行时间的两个任务,按照FIFO的顺序来调度(例如两个任务都在8:00执行,优先执行先入队的任务)。当一个任务在运行前被取消,那么任务不会执行。默认情况下,取消的任务不会从队列剔除,直到定时时间到达。

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,使用了corePoolSize无界队列组合。因此, maximumPoolSize参数对它是没用的。此外,不要把corePoolSize设置为0或者使用allowCoreThreadTimeOut,因为当有任务需要调度时,线程池可能会没有线程来调度任务,此时再需要创建线程会影响调度的时效性。

scheduleAtFixedRate方法能够实现按照固定频率启动任务,如果上一个任务的执行时间超过频率,那么可能会延迟一会启动,但不会让两个任务并行运行。

scheduleWithFixedDelay方法实现了按照固定的延迟启动任务,当上一个任务结束后等待给定的延迟,任务才会被执行。

3. ForkJoinPool

ForkJoinPool和上面两种线程池都不同,它采用工作窃取(Work-stealing)调度方式:池中所有的线程都尝试寻找并且执行提交到池中的任务,即使这个任务是由别的线程提交也会处理。因此,这种线程池能够更好的实现负载均衡。它是上面两种线程池的补充,而不是用来代替他们。

举一个场景,当我们实现一个多线程的归并排序(merge sort),我们采用自底向上(Bottom-up)的策略,先将长度为2的子数组排序,将他们合并;再将长度为4的子数组排序,将他们合并……。这是典型的分支(Divide and conquer)思想,我们可以利用ForkJoinPool来实现这样的算法,将排序子数组作为一个任务提交给线程池。当然这个例子可能不太合适,因为这种排序算法要求在不同问题规模间有一个同步点,例如先对所有长度为2的子数组排序后才能对长度为4的排序,而不能异步地将长度为2和长度为4的数组排序混合起来并行执行。当然现实中肯定有这样可以异步执行的例子,使用ForkJoinPool会更合适,只不过我暂时想不到。

ForkJoinPool有一个commonPool静态方法能够创建线程池,创建出的线程池能够胜任大部分的应用场景。使用commonPool能够减少资源的使用,因为当不使用线程池的时候,池中的线程会被缓慢地回收,当使用时线程又会被创建。

某些应用可能要求使用独立的线程池,或者对线程池自定义一些参数。那么可以new一个ForJoinPool,在构造函数中可以指定并行度。若不指定并行度,默认使用处理器可用的核数作为并行度。线程池会通过动态地添加、暂停、恢复内部的工作线程,以维护足够的活跃线程数。即使遇到线程池中某些线程调用join等待其他线程,也会维护线程数使并行度达到给定的数值。但是如果线程遇到了I/O阻塞或者其他未受管理的同步,那么不会保证线程进行并行度调整。

4. Executors

类Executors提供了一些静态方法,能够创建几种线程池的范式。下面我们简要介绍几个静态方法,以及参数和使用场景。

newCachedThreadPool

参数: corePoolSize=0;keepAliveTime=60s,使用SynchronousQueue
场景:适用于执行时间短,需要尽快被调度的任务 

newFixedThreadPool

参数: corePoolSize=maximumPoolSize; keepAliveTime=0,使用LinkedBlockingQueue
场景:适用于执行时间较长的任务

newSingleThreadScheduledExecutor

参数: corePoolSize=maximumPoolSize=1; keepAliveTime=0;使用DelayedWorkQueue(按照Delay从小到大排列的Queue)
场景:适用于周期执行的任务

newWorkStealingPool

参数:parallelism表示并行度,默认值为CPU可用核数。
场景:适合大量的执行时间短的计算型任务

5. 线程池使用实例

下面这个例子演示了使用fixedThreadPool实现了多线程下载,通过复用线程池中的线程避免了频繁地创建与销毁线程。

public class DownloadTask implements Runnable {
    private String url;
    private final String toPath;

    public DownloadTask(String url, String toPath) {
        this.url = url;
        this.toPath = toPath;
    }

    @Override

    public void run() {
        // downloadFile方法实现了下载文件到指定目录
        downloadFile(url, toPath);
    }
}

// 向线程池提交任务
ExecutorService pool = Executors.newFixedThreadPool(10);
for (String url : urls) {
    pool.submit(new DownloadTask(url, toPath));
}
pool.shutdown();
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

四、ThreadPoolExecutor实现

下面我们将会简要的介绍ThreadPoolExecutor的实现思路,首先我们先要了解这个类的成员变量,他们保存了线程池的状态信息。

1. 成员变量

  • ctl变量

ctl变量类型为AtomicInteger,它同时保存了线程池状态与worker的数量。其中高3位用于表示线程的状态,低29位用于表示Worker数量。状态类型包括:RUNNING(-1)、SHUTDOWN(0)、STOP(1)、TIDYING(2)和TERMINATED(3)。该变量的示意图如图3所示。

图3 – ctl变量示意图

为什么要把状态信息和worker数量用一个变量表示,是为了节省空间吗?显然不是,把这两个信息放在同一个变量是为了方便维护状态的一致性。例如把线程池terminate的时候, 需要把线程池的状态置为terminated,并且同时将woker count置0。如果我们使用两个变量,修改时状态可能就会不一致。

  • workQueue

workQueue是用户给定的阻塞队列的实例,在特定条件下(上文已讲过)用户提交的任务将会被加入到队列中。

  • workers

workers是一个HashSet<Worker>(),它存放线程池所有的Worker实例。这里使用了普通的HashSet而不是ConcurrentHashSet,原因在ThreadPoolExecutor中已经给出。

Lock held on access to workers set and related bookkeeping. While we could use a concurrent set of some sort, it turns out to be generally preferable to use a lock. Among the reasons is that this serializes interruptIdleWorkers, which avoids unnecessary interrupt storms, especially during shutdown. Otherwise exiting threads would concurrently interrupt those that have not yet interrupted. It also simplifies some of the associated statistics bookkeeping of largestPoolSize etc. We also hold mainLock on shutdown and shutdownNow, for the sake of ensuring workers set is stable while separately checking permission to interrupt and actually interrupting.

mainLock注释

在注释中提到,使用锁+HashSet可以使interruptIdelWorkers串行执行,这样可以避免中断风暴。这样做还可以简化bookkeeping,例如更新largestPoolSize变量时只需要设置为workers.size即可。

  • mainLock

mainLock是一个ReentrantLock,当修改workers变量时需要持有这个锁。

2. 线程池状态

线程池包括以下几个状态,他们的状态转化条件如图4所示。

  • RUNNING:接受新任务、处理已入队的任务
  • SHUTDOWN:拒接接受新任务,但是处理已入队的任务
  • STOP:不接受新任务、不处理已入队的任务并且打断正在执行的任务
  • TIDYING:所有任务终止,workerCount=0,所有线程都转到TIDYING状态,执行terminated方法
  • TERMINATED:terminated方法调用完成
图4 – 状态转化列表

3. Worker(消费者,真正执行用户任务的实体)

上面几节多次提到了Worker,那它是什么呢?它是ThreadPoolExecutor的一个内部类,它继承了AbstractQueuedSynchronizer,实现了Runnable接口,用户提交的任务都将由Worker来执行。也就是说Worker可以当锁来用,同时是可以运行的。下面是Worker类的实现,一些锁相关的方法已经被去掉。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
}

我们可以观察到,Worker内部有一个Thread的实例thread,它是由ThreadFactory创建得到的,将Worker对象自身传递给Thread,那么当Thread启动时Worker的run方法也就会被调用。run方法又调用了runWorker方法,将this(也就是Worker对象)传给runWorker。那我们接下来看一下runWorker方法是如何实现的。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    // getTask实现了从队列取任务
    while (task != null || (task = getTask()) != null) {
        w.lock();
        if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
            wt.interrupt();
        try {
            beforeExecute(wt, task);
            Throwable thrown = null;
            try {
                task.run(); // 在这里真正执行用户的任务
            } catch ... {
            } finally {
                afterExecute(task, thrown);
            }
        } finally {
            task = null;
            w.completedTasks++;
            w.unlock();
        }
    }
}

我们可以看到runWorker方法首先调用Thread.currentThread获取当前所在的线程。然后取worker的firstTask成员变量得到任务,这个任务是创建Worker时赋予的。当firstTask被消耗完毕后,会调用getTask从队列中取任务,那我我们继续跟踪getTask的实现。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

也许getTask方法代码有些难懂,但是我们可以先从返回值理解。getTask方法是被runWorker方法调用的,一旦getTask方法返回null,那么runWorker的循环也就退出了,意味着Worker的生命周期终止了。那么什么时候Worker需要退出呢?

  • 线程数超过了maximumPoolSize
  • 线程池已停止
  • 线程池shutdown且workQueue为空
  • worker在获取任务的时候等待超时,且allowCoreThreadTimeOut = true或者workerCount > corePoolSize

表达式wc > maximumPoolSize在验证第一项;表达式if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))在验证第二、三项;表达式allowCoreThreadTimeOut || wc > corePoolSize在验证第四项。

如果worker验证通过,不需要退出,则会从workQueue取出用户提交的任务,也就是一个Runnable的实例r,然后返回。我们把目光再回到方法runWorker来,当取得task时,会调用task当run方法,此时用户提交的任务才被真正执行。

4. execute方法(生产者,提交任务的方法)

execute方法是用户来提交一个任务的方法,用户需要传递Runnable的一个实例,用户的业务逻辑在Runnable的run方法中被执行。那我们来看看execute方法的实现。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

方法中的注释清晰地分为三步。第一步:如果线程数量小于corePoolSize,那么就启动一个新线程(也就是Worker),把用户给定的command作为Worker的第一个任务。调用addWorker时会自动检查线程池状态以及worker的数量,如果添加成功会返回true。如果添加失败,那么逻辑继续执行,再一次获取线程池的状态。

第二步:如果任务入队成功,我们还需要再次检查线程池状态,因为上一次检查后,已存在的worker可能死掉了或者执行至此线程池关闭。 所以我们需要重新检查线程池状态,如果线程池关闭我们需要将入队的任务移出。否则我们就得启动新线程来处理任务。

第三步:如果我们入队失败,那么我们得创建新线程处理任务。如果创建新线程失败,就拒绝任务。

接下来我们得看看addWorker是怎么实现的,这个方法的代码超长,但是它唯一做的事情就是创建Worker。我们先来读一下方法的注释:根据当前线程池状态和给定的边界(就是corePoolSize和maximumPoolSize)判定能否能够创建一个新的Worker。如果能的话worker数量将会增加,新的Worker被创建然后启动,执行firstTask。如果线程池停止或者能够shutdown就返回false。如果ThreadFactory没法创建线程也会返回false。失败后,将会回滚线程池到一个干净的状态。

Checks if a new worker can be added with respect to current pool state and the given bound (either core or maximum). If so, the worker count is adjusted accordingly, and, if possible, a new worker is created and started, running firstTask as its first task. This method returns false if the pool is stopped or eligible to shut down. It also returns false if the thread factory fails to create a thread when asked. If the thread creation fails, either due to the thread factory returning null, or due to an exception (typically OutOfMemoryError in Thread.start()), we roll back cleanly.

addWorker方法注释
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                        firstTask == null &&
                        ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    ThreadPoolExecutor.Worker w = null;
    try {
        w = new ThreadPoolExecutor.Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

我们先来读一下第一个超长的If:

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                        firstTask == null &&
                        ! workQueue.isEmpty()))
            return false;
可以改写成:
        if (rs >= SHUTDOWN &&
                (rs != SHUTDOWN ||
                        firstTask != null ||
                        workQueue.isEmpty()))
            return false;

我们将这个If的求非改写,可以解读为:rs >= SHUTDOWN成立(也就是线程池不处于RUNNING状态,包括SHUTDOWN、STOP、TIDYING和TERMINATED)且

  • 不处于SHUTDOWN状态(也就是STOP、TIDYING、TERMINATED之一)
  • 或者处于SHUTDOWN状态但第一个任务不为空
  • 或者处于SHUTDOWN状态且第一个任务为空且队列为空

满足上述条件,返回false表示创建Worker失败。

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

这是一个for循环,获取当前Worker的数量,校验是否达到上限。如果没有达到,则会增加worker数量。因为可能有多个线程并行地调用addWorker方法,所以采用了CAS操作来实现,并发修改失败会再一次循环来重试。

上面两个验证都过了,我们才被允许真正地创建Worker。首先new一个Worker,然后获取mainLock。然后再做条件校验,如果Worker刚创建完线程就启动了,那么就抛出IllegalThreadStateException异常。暂时我没有想到什么时候会落入这个if,可能是为了保险起见防止ThreadFactory创建完线程就启动。然后将创建的Worker加入workers这个HashSet,更新largestPoolSize变量,启动worker的Thread。启动后Worker的run方法就会被调用,这个时候我们可以参照本节第三条Worker的实现。

如果这期间出现了异常,就会调用addWorkerFailed,将Worker从workers中移出,worker数量减1,然后结束线程池。

五、总结

接下来我们做一个线程池的总结,大致概括为以下三点:

  • 使用线程池可以减少重新创建线程的开销
  • 使用线程池可以容易地控制线程数量减少调度开销
  • 根据应用场景选择合适的Work Queue与调整线程池参数

参考文献

  1. https://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
  2. https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
  3. https://www.cnblogs.com/sachen/p/7401959.html
  4. https://stackoverflow.com/questions/41337451/detailed-difference-between-java8-forkjoinpool-and-executors-newworkstealingpool
  5. https://en.wikipedia.org/wiki/Green_threads#Green_threads_in_the_Java_virtual_machine
pwrliang Java, Thread-pool , ,

Leave a Reply

Your email address will not be published. Required fields are marked *