您现在的位置是:网站首页> 内容页

深入并发之(四) 线程池详细分析

  • 老子有钱娱乐网页版
  • 2019-03-20
  • 244人已阅读
简介线程池的分类首先我们需要了解,使用线程池的目的。如果我们有大量的较短的异步任务需要执行,那么我们需要频繁的去创建线程并关闭线程。那么这样做的代价是十分巨大的,因此,我们就采用了一种线程

线程池的分类

首先我们需要了解,使用线程池的目的。如果我们有大量的较短的异步任务需要执行,那么我们需要频繁的去创建线程并关闭线程。那么这样做的代价是十分巨大的,因此,我们就采用了一种线程池的做法,实际上,我们常用了池类方式还有数据库连接池,这种一般是将一些比较珍贵的资源放在池中,然后,每次使用完毕,再将其放回池中,不释放。节约了新建的成本。

下图是线程池的简单类图

一般我们通过Executors这个工厂类来创建线程池,那么,我们来看一下,几种线程池的真面目吧!

对于线程池中使用的任务队列的数据结构,之后会单独开博客分析,这里先有一个简单的认识就好

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}

固定大小的线程池,线程池的基本大小(corePoolSize)为nThreads,最大线程数(maximumPoolSize)也为nThreads,采用了LinkedBlockingQueue队列来存放任务。

LinkedBlockingQueue是基于链表结构的阻塞队列,FIFO,队列属于无界队列。

当线程池中的线程数量小于最大线程数量时,会直接新建线程,执行任务。当线程数量已经达到最大线程数量,并且,没有空闲线程,任务会进入队列中排队,当有空闲线程则会执行。

keepAliveTime线程活动时间,这个值表示,当线程池中的线程大于corePoolSize时,超出corePoolSize的空闲线程最大存活时间,当时间超过keepAliveTime,线程将会结束,当这个值为0表示,线程不会被回收。

newCachedThreadPool

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}

无界的线程池,并且线程空闲达到一定时间,会被回收,这里设定的时间是60s。

采用的队列是SynchronousQueue,不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。

因此,对于这种线程池,当有任务时,如果没有空闲线程,会直接增加线程,执行任务,由于我们将最大线程数设定为Integer.MAX_VALUE,所以,线程池中的线程理论上没有上线,但是机器的性能是有限的,所以如果任务非常多,可能会发生资源耗尽的情况。

newSingleThreadPool

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));}

单线程的线程池,一般情况下,当我们有很多任务需要执行,但是他们并不需要同时执行,或者是有依赖关系的时候,例如B任务必须在A任务之后执行时,我们可以使用newSingleThreadPool

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());}

这个线程池一般用来执行定时任务,一般用下面的方法来设置定时任务执行的频率。

pool.scheduleWithFixedDelay(task, 2, 3, TimeUnit.SECONDS);

上面的方法表示,任务task在提交后2秒开始执行,执行完毕后每3秒执行一次。

newWorkStealingPool

public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);}

这个线程池是在Java8中新增的线程池,可以看出,这个线程池实际上是ForkJoinPool,这里就是采用了fork join算法。当一个任务可以分解为多个小任务的时候,我们可以使用这种方式,充分利用CPU的性能。

但是,我们需要注意的是,如果任务十分简单,那么这种拆分方式不仅不会提高效率,有时因为线程切换结果合并等问题可能还会更慢。

分析线程池的submit方法

MyCallable callable = new MyCallable();ExecutorService executorService = Executors.newFixedThreadPool(5);Future<String> future executorService.submit(callable);String s = future.get();System.out.println(s);executorService.shutdown();

这里我们分析的实际上就是上面这段代码中的submit方法。这里我们注意ExecutorServiceAbstractExecutorService的子类,ExecutorService中没有重写方法submit,那么我们调用的实际是父类的方法。

/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}

这里我们看到了在上一篇深入并发之(三) FutureTask与Future之写线程池的时候到底需不需要自己包装FutureTask - 菱灵心 - 博客园中介绍过的方法newTaskFor,作用是将Callable包装为FutureTask,这里不再介绍。

我们可以看到这个方法是直接返回的,这里也对应了第一部分提到的,这种future设计方法的核心,就是无需等待异步任务执行,我们可以让主线程先去做其他任务,稍后我们可以从返回值中获取我们想要的结果。

大致讲解线程池中一些基本内容

线程数与运行状态的保存

需要注意,这一部分的主要内容实际是为了方便下面来查看源码。

首先是ThreadPoolExecutor类。

对于线程池,我们肯定需要知道线程池中现在有多少线程,同时我们也需要知道线程池的状态。那么,在ThreadPoolExecutor中是如何保存这个数据的呢。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

线程池中采取了原子的Integer,并且将32位的整数分为两部分,分别保存runstateworkercount,这种保存数据的方式不是我们第一次见到了,在AQS中的共享锁中也是类似的方式,源码中经常使用的一种写法。workercount被保存在高3位中,余下的低位用来保存runstate

运行状态的变化

这里我们通过一张图来了解线程运行状态的改变过程。

图中上面是线程池的状态,下面是代表状态的常量值。当线程池刚刚创建的时候状态是RUNNING。

分析execute方法

下面进入正题,执行方法。

首先描述一下方法,这个方法主要分为3步:

如果线程数量少于corePoolSize,那么我们直接创建一个新的线程来执行任务,通过调用方法addWorker来检查runstateworkercount,如果无法新增线程,那么这个方法将会返回false。如果任务成功进入队列,我们依然需要double-check是否需要增加一个线程(因为可能有一些线程在上次检查之后死亡),或者线程池的状态已经改变。所以通过二次检查让我们来确定是否需要回滚队列或者增加线程。最后如果插入队列失败,我们再次尝试新增线程执行任务,如果无法新增那么应该是在线程池关闭或者饱和了,执行拒绝策略

给出流程图

//这段代码需要着重注意几处调用addWorker的区别,参数的不同,后面分析完addWorker再回头看才会真正发现调用的目的 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }

对应上面的解释与流程图,这段代码的基本内容十分容易理解。

分析addWorker

在分析addWorker方法之前,我们先来看ThreadPoolExecutor类的内部类Worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

当我们执行的时候实际会把Runnable再包装为Worker,通过firstTask。

/** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks;

同时,还实现了一个独占锁,这部分代码就是应用了之前讲到的利用AQS实现独占锁的方法。如果不了解可以参照博主的另外两篇文章。

深入并发之(一) 从来ReentrantLock看AbstractQueuedSynchronizer源码 - 菱灵心 - 博客园

两种方式实现自己的可重入锁 - 菱灵心 - 博客园

这里不详细讲解,只给出代码。

protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false;}protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }

下面,我们来看一下其中的重点方法addWorker

这段代码主要可以分为两部分,第一部分是检查线程池的状态,如果不满足条件会直接返回false,然后进入死循环等待成功增加线程,如果增加线程成功,那么就可以进入第二部分,真正新增线程,执行任务。具体的逻辑可以参考代码注释。这里给出一个简要的流程图。

private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); //线程池状态 int rs = runStateOf(c); /** * 大于等于SHUTDOWN表示线程池已经不应该接受新的任务了,自然应该返回false。 * 但是这里有一个前提就是需要清空已经进入队列的任务。 */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //线程数 int wc = workerCountOf(c); /** * 由于位数限制,线程池有一个CAPACITY,所以超出的不能创建线程了。 * 同时,还有核心线程和最大线程数,一般来说线程池中可以创建的线程数在不会超过最大线程数, * 这里通过那个控制主要取决于传入的参数,同时也受创建线程池时设置的最大线程数的控制。 * 可以参考上面的线程池分类的部分查看几种线程池的最大线程数。 */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; /** * 上面的关卡都过去了,终于可以尝试增加线程数了,这里实际是CAS操作, * 如果不了解,可以到网上搜索,或者参考博主AQS那篇 * 创建成功会直接跳出死循环,进入第二部分,执行任务 */ if (compareAndIncrementWorkerCount(c)) break retry; /** * 重新查看线程池的状态,如果状态改变,那么就需要重新进行上面的状态部分的判断, * 需要跳出这个等待增加线程数的死循环 */ c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); //满足条件就增加成功 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //增加成功就可以执行了,实际调用了Worker的run方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) //如果没有执行,那么有些内容需要回滚 addWorkerFailed(w); } return workerStarted; }

到这里为止,我们对线程池的基本工作原理有了认识,已经深入分析了源码。但是这里我们还需要提出一个问题,我们将任务加入到队列中后,到底线程池是在什么时候由那个方法将其从队列中取出,进行执行的呢?其实这里流程图中已经提到了,当我们将任务加入队列后,会调用addWorker方法,但是并没有传入任务,这里实际上会到队列中取任务,那么取任务的代码在哪里呢?实际上是在Worker类中的run方法,Worker类是一个Runnable接口的实现类,在addWorker方法中调用的t.start();实际调用了Worker的run方法,下一篇博客中,我们将会介绍这个方法中实现的功能。

文章评论

Top