当前位置:网站首页>线程池02--源码
线程池02--源码
2022-07-22 10:44:00 【fengxianaa】
上一篇:https://blog.csdn.net/fengxianaa/article/details/124448911
想要理解源码,看是不行的,必须 debug跟进去
1. 主要变量
ThreadPoolExecutor 类中的属性
//用来标记线程池状态(高3位),线程个数(低29位)
//默认是RUNNING状态,线程个数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程掩码位数,int类型是32位,所以值为29
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程最大个数(低29位)00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//线程池状态
//(高3位):111 00000000000000000000000000000 接受新任务并且处理阻塞队列里的任务
private static final int RUNNING = -1 << COUNT_BITS;
//(高3位):000 00000000000000000000000000000 拒绝新任务但是处理阻塞队列里的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//(高3位):001 00000000000000000000000000000 拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务的线程
private static final int STOP = 1 << COUNT_BITS;
//(高3位):010 00000000000000000000000000000 所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为 0,将要调用 terminated 方法
private static final int TIDYING = 2 << COUNT_BITS;
//(高3位):011 00000000000000000000000000000 终止状态,terminated方法调用完成以后的状态
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取高三位 线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取低29位 线程个数
private static int workerCountOf(int c) { return c & CAPACITY; }
//计算ctl新值,线程状态 与 线程个数
private static int ctlOf(int rs, int wc) { return rs | wc; }
1.RUNNING -> SHUTDOWN:显式调用 shutdown() 方法,或者隐式调用了 finalize(),它里面调用了 shutdown() 方法。
2.RUNNING or SHUTDOWN -> STOP:显式调用 shutdownNow() 方法时候。
3.SHUTDOWN -> TIDYING:当线程池和任务队列都为空的时候。
4.STOP -> TIDYING:当线程池为空的时候。
5.TIDYING -> TERMINATED:当 terminated() hook 方法执行完成时候。
2. execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取当前线程池的状态+线程个数变量的组合值
int c = ctl.get();
/**
* 1. 如果 线程数<corePoolSize,新建线程运行
* 如果,新建线程失败,则继续下一步
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))//
return;
c = ctl.get();
}
/**
* 2. 如果 线程池是RUNNING 把任务放到等待队列中
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次校验,如果 线程池状态不是RUNNING,则从队列删除任务,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程池线程空,则添加一个线程
else if (workerCountOf(recheck) == 0)
// 第一个参数为空,表示该线程需要从队列中获取任务
addWorker(null, false);
}
//3. 如果队列满了,则新增线程,新增失败则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
3. addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();//获取当前线程池的状态+线程个数变量的组合值
int rs = runStateOf(c);//获取线程池状态
//如果 线程池状态>SHUTDOWN,不新增线程
//如果 线程池状态=SHUTDOWN 且 firstTask不是null,不新增线程
//如果 线程池状态=SHUTDOWN 且 firstTask是null 但 workQueue是空的,不新增线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);//获取线程数量
// 如果线程个数超限,不新增线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// cas增加线程个数,某一刻只有一个线程成功,成功后跳出外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// 如果cas失败,则看线程池状态是否变化了,
// 是:则跳到外层循环重试重新获取线程池状态,
// 否:内层循环重新cas。
if (runStateOf(c) != rs)
continue retry;
}
}
//到这里说明cas成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;//worker实现了Runnable接口
try {
//创建线程
w = new Worker(firstTask);
/**
Worker(Runnable firstTask) {
setState(-1); // 在执行runWorker前,禁止中断该线程,shutdownNow中会判断该状态,如果>=0才中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
*/
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取状态
int rs = runStateOf(ctl.get());
// 如果线程池是运行状态 或 (SHUTDOWN状态 && firstTask是null)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);//把新建的线程放到集合中
int s = workers.size();
// largestPoolSize 记录线程池运行时候一共创建了几个线程
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;//表示线程新增成功
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
4. runWorker方法
addWorker 方法中的 t.start(),这句代码执行后,会进入 Worker 类的 run 方法,最终进入 runWorker 方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//将state设置为0,允许中断该线程
w.unlock();
boolean completedAbruptly = true;
try {
//如果任务不为null,或者从任务队列中获取的任务不为null,则执行while循环
while (task != null || (task = getTask()) != null) {
//如果任务不为空,则获取Worker工作线程的独占锁,避免执行任务时其他线程操作这个Worker
w.lock();
//如果 状态>=STOP && 线程没有被中断,那么中断该线程
//如果 线程已中断(interrupted方法会清除中断标记) && 状态>=STOP && 线程没中断,那么中断该线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//中断线程,这里注意,如果这句代码执行
//我们提交的任务对象的run方法中,如果有阻塞,会报InterruptedException
wt.interrupt();
try {
//执行任务前执行的逻辑
beforeExecute(wt, task);
Throwable thrown = null;
try {
//调用任务对象的run方法执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务后执行的逻辑
afterExecute(task, thrown);
}
} finally {
//任务执行完成后,将其设置为空
task = null;
//完成的任务数量加1
w.completedTasks++;
//释放工作线程获得的锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
//执行退出Worker线程的逻辑
processWorkerExit(w, completedAbruptly);
}
5. getTask方法
//获取一个Task执行,或阻塞,或有超时限制的等待。返回空,则当前线程结束
private Runnable getTask() {
boolean timedOut = false; // 从队列中拿到空的任务时,设置为true,是允许当前线程结束的条件之一
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//当线程池状态是SHUTDOWN且队列为空,或 状态>SHUTDOWN,返回null,即线程池结束
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);//当前线程池数量
//允许当前线程结束的条件之一
//allowCoreThreadTimeOut:是否允许核心线程超时,默认false,可以通过allowCoreThreadTimeOut方法修改
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//线程数>maximumPoolSize时,返回null,当前线程可以结束
//当线程数>corePoolSize 且 当前线程没有从队列中拿到任务 且 队列为空时,当前线程可以结束
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
// 当前等待keepAliveTime时间,从队列中获取任务
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();//当前线程阻塞
if (r != null)
return r;
timedOut = true;//获取不到任务,设置为true
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
总结:当 getTask方法 返回 null 的时候,当前线程结束,有以下几种
- 线程池状态是SHUTDOWN且队列为空
- 线程状态>SHUTDOWN
- 线程数>corePoolSize 且 当前线程没有从队列中拿到任务 且 队列为空
- 线程数>maximumPoolSize时
边栏推荐
猜你喜欢
【FPGA】:MicroBlaze的使用
ASP.NET Core部署手册:4.注意事项和问题排查
Charles 抓包原理与实践
Pytorch dynamically adjusts the learning rate, and the learning rate automatically decreases according to loss
【FPGA】:ip核--ibert
Lire attentivement le document DETR et analyser la structure du modèle
具有任意电容比的共质心电容阵列的自动生成
Latex compiles and reports errors in vscode `recipe terminated with error Retry building the project.
ADB自动化测试框架
【fpga】gtx/gth概述
随机推荐
Pastel: parasitic matching drive layout and wiring of capacitor array with generalized ratio in charge redistribution sar-adc
LeetCode - 整数反转
网络基础知识
自动电流镜布局 (ACML) 工具
spark常见问题
1049 counting ones (30 points)
希尔排序(最小增量排序)
Parasitic sensing common centroid binary weighted capacitor layout generation integrates layout, wiring, and cell capacitor size
mysql使用常见问题
Vimplus modifies the terminal font to droid Sans Mono nerd font
C language bitfield
Binary search (recursive function)
postman接口测试
Intensive reading of Detr paper and analysis of model structure
测试相关基础概念
Performance perception of transistor arrays in analog circuits common centroid layout and wiring align
Process fork
DETR 論文精讀,並解析模型結構
1057 stack (30 points)
Vscode关闭自动更新