UML classDiagram
class Executor {
<<interface>>
+execute()
}
class ExecutorService {
<<interface>>
+awaitTermination()
+invokeAll()
+invokeAny()
+isShutdown()
+isTerminated()
+shutdown()
+shutdownNow()
+submit()
}
class AbstractExecutorService {
<<abstract>>
+...()
+newTaskFor()
}
class ThreadPoolExecutor
Executor <|-- ExecutorService
ExecutorService <|.. AbstractExecutorService
AbstractExecutorService <|-- ThreadPoolExecutor
构造参数介绍 这里先介绍参数最全的构造参数
1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {}
核心参数
说明
int corePoolSize
核心线程数 常驻线程数,空闲也不会回收 除非设置 allowCoreThreadTimeOut = true
int maximumPoolSize
最大线程数 当任务队列满时,会创建新线程直至此上线
long keepAliveTime
非核心线程允许空闲的时间,超过则回收
TimeUnit unit
keepAliveTime的时间单位
BlockingQueue<Runnable> workQueue
任务队列,保存等待执行任务的阻塞队列 核心线程满之后的,会先加入任务队列
ThreadFactory threadFactory
线程工厂 用于设置线程名称、优先级、守护状态、创建方式等
RejectedExecutionHandler handler
拒绝策略 当 任务队列已满 且 达到最大线程数 触发
参数最少的构造器如下:
1 2 3 4 5 6 7 8 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
这里有2个默认值
默认线程工厂 Executors.defaultThreadFactory()
默认拒绝策略 defaultHandler = new AbortPolicy()
任务提交流程 ThreadPoolExecutor#execute 方法整体流程
判断 当前线程数 workers.size < corePoolSize ?
为真,则创建核心线程并执行任务
为假,判断任务队列 workQueue 是否已满?
未满,则加入 workQueue
已满,再判断 当前线程数 workers.size < maximumPoolSize?
为真,则创建临时线程线程执行任务
为假,则触发拒绝策略 handler
其他重要参数 线程池运行状态与有效线程数 线程池的运行状态 runState(rs) 与 有效线程数 workerCount(wc)
都是打包在一个32位原子整数 ctl 中控制实现的,就是用一个数字能同时表示2个含义
1 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));
其中高3位 用来表示运行状态rs,低29位 表示有效线程数wc
线程池的运行状态有5个,都是32位的整数,其中COUNT_BITS=29
1 2 3 4 5 6 7 private static final int COUNT_BITS = Integer.SIZE - 3 ;private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
下面详细解释下各状态的含义以及二进制表示,其中省略位都是0
状态
二进制
意义
RUNNING
1100…0000
接受新execute 的任务;执行已入队 的任务
SHUTDOWN
0000…0000
不接受新execute 的任务;但执行已入队 的任务;中断所有空闲的线程
STOP
0010…0000
不接受新execute 的任务;不执行已入队 的任务;中断所有的线程
TIDYING
0100…0000
所有线程停止;workerCount数量为0;将执行钩子方法 terminated()
TERMINATED
0110…0000
terminated()方法执行完毕
观察可知,这5个状态都是只有高3位不同,低29位都是0
这样在使用初始化方法ctlOf(),传入目标状态与有效线程数,在按位或运算下,其效果就是
不管 wc 高3位如何,结果的高3位就取决于 rs 的高3位
因为 rs 低29位都是0,所以结果的低29位取决于 wc 的低29位
1 private static int ctlOf (int rs, int wc) { return rs | wc; }
线程池也提供了2个打包方法,用来分别提取 ctl 中运行状态与有效线程数的方法
1 2 3 private static final int CAPACITY = (1 << COUNT_BITS) - 1 ;private static int runStateOf (int c) { return c & ~CAPACITY; }private static int workerCountOf (int c) { return c & CAPACITY; }
其中 CAPACITY就是$2^29$ = 0001…1111 (二进制下低位29个1)正是当做ctl中的区分位,具体作用如下
方法
方法体
带入CAPACITY的值
作用
runStateOf
c & ~CAPACITY
c & 11100000000000000000000000000000
提取ctl高3位
workerCountOf
c & CAPACITY
c & 00011111111111111111111111111111
提取ctl低29位
就这样线程池将运行状态与有效线程数量2个属性,打包在一个32位的整数中
并提供了初始化与分别解包提取的方法
线程池中的锁 1 private final ReentrantLock mainLock = new ReentrantLock ();
这把可重入锁在很多地方都会使用到。
比如对线程集合 workers 的访问与操作、读取曾最大线程数、读取已完成任务数等。
线程集合 1 private final HashSet<Worker> workers = new HashSet <Worker>();
用来保存当前线程池中的所有线程;
可通过该集合对线程池中的线程进行中断 , 遍历 等;
创建新的线程时, 要添加到该集合, 移除线程, 也要从该集合中移除对应的线程;
对该集合操作都需要mainLock锁.
2个可监控的数量 1 2 3 4 private int largestPoolSize;private long completedTaskCount;
都是通过 mainLock 来获取保证数据的原子性和可见性
源码解析 execute(): 将任务提交到线程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public void execute1 (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
addWorker(): 创建启动线程并执行任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false ; for (; ; ) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker对象: 包装过的线程任务 什么是 Worker 上文提到线程集合的对象是Worker而不是Thread
而Worker更像是一个”打包好”的员工:
里面装着线程 Thread
本地要干的第一个任务
已完成的任务计数
再顺带自带一把小锁(AQS)
在 JDK1.8 里 Worker 是 ThreadPoolExecutor 的内部类,大概声明如下:
1 2 3 4 5 6 7 8 9 10 11 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks; }
这里涉及几个关键的信息:
Worker 不是单纯的 Thread,而是 线程 +状态 + 锁 的组合体
是 Runnable ,线程启动后会执行自身的 run() 方法
继承了 AQS 实现了一个简易的 0/1互斥锁,用来判断当前线程是否在正在执行任务
构造方法 1 2 3 4 5 Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); }
说明点:
setState(-1): 将AQS的state设置成-1,这是一个特殊值避免真正runWorker前被中断或者回收
firstTask: 就是在调用 execute() 时,新建 Worker 时新建的 task
Thread = new thread(this):线程启动后会执行 Worker.run() 而不是直接提交的任务
使用AQS实现的小锁 Worker 继承了 AQS,但只用到了独占模式 ,而且也很简单:就是一个 0/1 状态的非重入锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 protected boolean isHeldExclusively () { return getState() != 0 ; }protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; }protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; }public void lock () { acquire(1 );}public boolean tryLock () { return tryAcquire(1 );}public void unlock () { release(1 );}public boolean isLocked () { return isHeldExclusively();}
可以直接把它理解为:Worker 自己身上有一把“我现在是否在执行任务”的互斥锁。
后面线程池很多逻辑(比如 interruptIdleWorkers)都要依赖这把锁的状态——成功 tryLock() 说明当前 worker 是空闲的。
run(): 真正的线程任务runWorker() Worker 作为 Runnable,它的 run() 非常简单:
1 2 3 public void run () { runWorker(this ); }
也就是说:
真正的任务循环逻辑在 ThreadPoolExecutor.runWorker(Worker w) 里
Worker 只是把自己(包含 thread / firstTask / 锁)交给 runWorker()
这在设计上有两个好处:
Worker 只负责“绑定线程和状态 ”
核心调度逻辑统一放在 ThreadPoolExecutor 里,便于管理、扩展和覆写钩子方法(beforeExecute / afterExecute)
runWorker(): Worker 主要工作流程 执行提交的task或死循环从BlockingQueue获取task.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
getTask(): 从任务队列中获取task getTask() 负责从队列里取任务并控制 worker 的回收。
runWorker 里如果 getTask() 返回了 null,就跳出 while 循环,最终由 processWorkerExit 把这个 Worker 清理掉。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 private Runnable getTask () { boolean timedOut = false ; for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }