1. 简介
在引入线程池之前,我们先来了解几个事情:
- 线程的创建和销毁是有代价的,如线程创建需要时间和相关计算资源。如果在Web服务器上为每个来到的请求都创建一个线程,而大多数请求都是轻量级的处理过程。那么创建线程的代价与请求处理的代价相比就非常大了,导致影响整体性能。
- 当线程数量达到能让CPU忙绿起来的时候,此时再创建线程,线程也基本处于闲置状态,这时候多出来的线程除了占用内存外,还可能因为与其他线程争用CPU资源导致出现其他性能开销.
- 在可创建线程的数量上存在一个限制,如果超过这个限制,可能会抛出
OutOfMemoryError
异常。
这时候如果能出现一个东西能够对线程的生命周期进行管理,对现有的线程重复利用,并且能够以一种简单的方式将任务的提交与执行相解耦。没错,这就是线程池(Thread Pool),在要了解Java中的线程池,首先必须了解ThreadPoolExecutor
这个类。
2. ThreadPoolExecutor详解
类继承图
构造函数
1 | //线程池配置信息,volatile修饰保证变量在多线程下的可见性 |
从上面的JDK中ThreadPoolExecutor
类的构造函数源码看出该构造函数一共有7个参数,下面介绍七个参数的含义:
参数 | 含义 |
---|---|
corePoolSize |
基本大小,即线程池中的核心线程数 |
maximumPoolSize |
最大大小,即线程池中允许的最大线程数 |
keepAliveTime |
存活时间,当线程的没执行任务时,空闲的时间超过了这个时间就会被标记为可回收,直到线程池的大小超过基本大小,被标记的线程就会被终止 |
unit |
keepAliveTime 的单位,有DAYS 、HOURS 、MINUTES 、SECONDS 、MILLISECONDS 、MICROSECONDS 、NANOSECONDS 7个单位可选 |
workQueue |
工作队列,一个用来保存等待被执行的任务的阻塞队列 |
threadFactory |
线程工厂。线程池在创建线程时通过调用线程工厂的Thread newThread(Runnable r) 来创建线程 |
handler |
饱和策略。当阻塞队列已满、线程池当前的线程数已达到最大值且没有线程处于空闲状态时,此时对于提交过来的任务将执行饱和策略。(如果某个任务提交到一个已关闭的Executor时,也会执行饱和策略) |
ThreadPoolExecutor
类中有四个重载的构造函数,每个构造函数都必须指定上表中的前5个参数,最后两个参数可以随意指定,不指定的话构造函数会使用默认的线程工厂和饱和策略:
线程工厂(ThreadFactory)
线程池创建线程都是通过的ThreadFactory
的Thread newThread(Runnable r)
方法来创建的。下面是Executors
类里的默认线程工厂方法的源码。
1 | static class DefaultThreadFactory implements ThreadFactory { |
从上面可以看出默认线程工厂创建出的是一个非守护、优先级为Thread.NORM_PRIORITY
的线程。如果想要自己定制线程工厂满足需求,只需实现ThreadFactory
接口的Thread newThread(Runnable r)
方法。
饱和策略(RejectedExecutionHandler)
JDK中的ThreadPoolExecutor
类提供了4种不同的RejectedExecutionHandler
实现:
AbortPolicy
默认的饱和策略,该策略抛出未检查(运行时异常)的RejectedExecutionException
。DiscardPolicy
不执行任何操作,直接抛弃任务CallerRunsPolicy
在调用者线程中执行该任务DiscardOldestPolicy
丢弃阻塞队列中的第一个任务, 然后重新将该任务交给线程池执行
同样的,可以通过实现RejectedExecutionHandler
接口自定义饱和策略。
线程池状态和线程数量
1 | //代表线程池当前状态和线程数量的原子变量 |
AtomicInteger
类型的变量ctl
用高3位来表示当前线程池状态,低29位来表示当前的线程数。
Java线程池有5种不同的状态,分别为运行(RUNNING
)、关闭(SHUTDOWN
)、停止(STOP
)、整理(TIDYING
)、结束(TERMINATED
)。
在ThreadPoolExecutor
里由5个整型常量表示,每个整型常量的都由高3位表示状态:
RUNNING
高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务SHUTDOWN
高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务。调用void shutdown()
方法实现STOP
高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务。调用List<Runnable> shutdownNow()
实现。TIDYING
高3位为010,当线程池关闭后阻塞队列的任务已完成或线程池停止,然后workerCount
(当前线程数量)为0,线程池进入该状态后会调用terminated()
方法进入TERMINATED
状态。TERMINATED
高3位为011启动线程池
当创建完一个ThreadPoolExecutor
对象后,线程池里并没有线程。一般都是调用void execute(Runnable command)
执行任务时才创建线程并启动,不过可以通过调用如下方法预先创建核心线程并启动(在addWorker方法里启动):1
2
3
4
5
6public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
执行过程
如上图所示,当调用void execute(Runnable command)
这个方法执行任务时:
- 判断当前线程池线程数量是否小于核心线程池大小,是则创建线程并启动,否则到第2步
- 判断任务队列是否已满,未满则将任务加入阻塞队列,已满则到第3步
- 判断当前线程池线程数量是否小于最大线程池大小,是则创建线程并启动,否则执行饱和策略
1 | public void execute(Runnable command) { |
addWorker方法
boolean addWorker(Runnable firstTask, boolean core)
方法的作用就是创建Worker
对象并启动这个对象里的线程(Worker
里一个Thread
类型的字段)。
1 | private final ReentrantLock mainLock = new ReentrantLock(); |
Worker类
线程池维护的线程其实是一组Worker对象,Worker封装了线程也继承了AbstractQueuedSynchronizer
类并实现了Runnable
接口,重写了void run()
方法。至于为什么要继承AbstractQueuedSynchronizer
类,请看下面的runWorker
方法讲解。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
//绑定这个对象线程已执行完成的任务数
volatile long completedTasks;
Worker(Runnable firstTask) {
//阻止中断,在任务获取前不允许中断
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//线程启动时执行的方法
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//获取锁,不可重入
public void lock() { acquire(1); }
//尝试获取锁
public boolean tryLock() { return tryAcquire(1); }
//释放锁
public void unlock() { release(1); }
//判断锁是否被独占
public boolean isLocked() { return isHeldExclusively(); }
//中断已开始执行的线程,这个就是为什么要设置setState(-1)的一个原因了,这个方法会被`shutdownNow()`方法调用。
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
runWorker方法
上面说到为什么Worker
类要继承AbstractQueuedSynchronizer
,其实是要用锁的状态来区分空闲线程和非空闲线程,在执行runWorker
方法中:
- 获取任务时没有加锁(空闲状态,可中断线程)
- 要执行任务时才加锁(不允许中断线程)
在调用void tryTerminate()
和void shutdown()
这两个方法时,会中断空闲线程,所以没有在执行任务的线程就可能被中断。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); //允许中断,与Worker构造函数的setState(-1)是一对的
boolean completedAbruptly = true;
try {
//获取到任务才进入循环
while (task != null || (task = getTask()) != null) {
//加锁,表示非空闲状态
w.lock();
//1. 如果线程池状态大于等于STOP并且本线程未中断,则应该执行中断方法
2. 或者执行Thread.interrupted()方法判断本线程是否中断并且清除中断状态,
如果发现线程池状态大于等于STOP则执行中断方法。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//ThreadPoolExecutor中的beforeExecute(wt, task)方法一个空方法,用来留给继承ThreadPoolExecutor的类
来重写该方法并在任务执行前执行
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行获取到的任务
task.run();
![](https://user-gold-cdn.xitu.io/2017/12/30/160a72c7c9f35844?w=977&h=318&f=png&s=14007)
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//ThreadPoolExecutor中的afterExecute(task,thrown)方法也是一个空方法,用来留给继承
ThreadPoolExecutor的类来重写该方法并在任务执行后执行
afterExecute(task, thrown);
}
} finally {
task = null;
//该线程执行的任务加1,即使抛出异常
w.completedTasks++;
//释放锁,表示回到空闲状态
w.unlock();
}
}
//执行到这一步表示是由于获取不到任务而正常退出的,所以completedAbruptly为false
completedAbruptly = false;
} finally {
//无论怎样退出都要执行
processWorkerExit(w, completedAbruptly);
}
}
getTask方法
1 | private Runnable getTask() { |
processWorkerExit方法
1 | private void processWorkerExit(Worker w, boolean completedAbruptly) { |
下图是向线程池提交任务后,线程池的正常执行过程:
tryTerminate方法
terminate
(结束)是线程池的最后一个状态,只能由关闭或停止状态转变为结束状态。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39final void tryTerminate() {
for (;;) {
int c = ctl.get();
//如果满足下面任意一个条件就没办法到达结束状态
1. 线程池处于运行状态
2. 线程池状态是TIDYING或已经是结束状态
3. 线程池处于关闭状态且任务队列不为空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//当前线程数量不为0也无法到达结束状态
if (workerCountOf(c) != 0) {
//中断一个空闲线程
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//尝试将线程池状态设置为TIDYING,失败重循环开始处开始
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//terminated()是一个空方法,留给继承ThreadPoolExecutor的类覆盖
terminated();
} finally {
//尝试将线程池状态设置为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
关闭操作
我们可以通过调用void shutdown()
方法关闭线程池,关闭后线程池后不允许接受新任务1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 安全策略判断
checkShutdownAccess();
//设置线程池状态为SHUTDOWN状态
advanceRunState(SHUTDOWN);
//中断所有空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试结束线程池
tryTerminate();
}
停止操作
我们可以在运行和关闭状态下通过调用void shutdownNow()
方法停止线程池,停止后线程池后不允许接受新任务,也不会执行阻塞队列里的任务,还会中断当前所有的线程。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 安全策略判断
checkShutdownAccess();
//设置线程池状态为STOP状态
advanceRunState(STOP);
//中断所有线程,不管是空闲还是非空闲
interruptWorkers();
//取出阻塞队列的所有任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试结束线程池
tryTerminate();
return tasks;
}
3. 线程池的配置
Executors
提供了四种静态工厂方法来创建四种不同配置的线程池:
newFixedThreadPool(int nThreads)
接受一个int类型的nThreads变量,创建一个核心线程数和最大线程数都为
nThreads
的线程池(即最大线程数为nThreads),且使用一个无界的阻塞队列LinkedBlockingQueue
。如果不设置核心线程超时的话,创建的线程是不会超时的。1
2
3public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}newSingleThreadExecutor()
创建一个核心线程数和最大线程数都为1的线程池(即最大线程数为1),且使用一个无界的阻塞队列
LinkedBlockingQueue
,不设置核心线程超时的话,创建的线程也是不会超时的。唯一线程可以保证任务的顺序执行,如果这个唯一的线程执行过程中因为异常而结束的话,在processWorkerExit
方法最后会判断是否因异常而结束而创建一个新线程继续运行。1
2
3
4
5
6public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}newCachedThreadPool()
创建一个核心线程数为0,最大线程数为
Integer.MAX_VALUE
的线程池,超时时间为60秒,所以线程空闲时间超过60秒就会被回收。使用了一个同步队列作为阻塞队列,同步队列不存储元素,且在一端进行插入,另一端要有移除操作插入才会成功,否则插入操作会阻塞等待。1
2
3
4
5public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}newScheduledThreadPool()
创建一个核心线程数为
corePoolSize
的线程池,用于指定的时间内周期性的执行所的任务。ScheduledThreadPoolExecutor
继承自ThreadPoolExecutor
。1
2
3public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}