Thread pool 02 -- source code
2022-07-22 20:54:00 【fengxianaa】
Previous article: https://blog.csdn.net/fengxianaa/article/details/124448911
To really understand the source code, reading it is not enough; you have to step through it in a debugger.
1. Main variables
Fields of the ThreadPoolExecutor class:
// Packs the pool's run state (high 3 bits) and the thread count (low 29 bits) into a single int
// Initial value: RUNNING state with 0 threads
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Number of bits that hold the thread count; an int has 32 bits, so this is 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// Maximum number of threads (low 29 bits): 00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// Thread pool states
// (high 3 bits) 111 00000000000000000000000000000: accepts new tasks and processes tasks in the blocking queue
private static final int RUNNING = -1 << COUNT_BITS;
// (high 3 bits) 000 00000000000000000000000000000: rejects new tasks but still processes tasks in the blocking queue
private static final int SHUTDOWN = 0 << COUNT_BITS;
// (high 3 bits) 001 00000000000000000000000000000: rejects new tasks, drops the tasks in the blocking queue, and interrupts the threads that are running tasks
private static final int STOP = 1 << COUNT_BITS;
// (high 3 bits) 010 00000000000000000000000000000: all tasks have finished (including those in the blocking queue) and the thread count is 0; the pool will call the terminated() method
private static final int TIDYING = 2 << COUNT_BITS;
// (high 3 bits) 011 00000000000000000000000000000: final state, reached after the terminated() method has completed
private static final int TERMINATED = 3 << COUNT_BITS;
// Extract the high 3 bits: the pool's run state
private static int runStateOf(int c) { return c & ~CAPACITY; }
// Extract the low 29 bits: the thread count
private static int workerCountOf(int c) { return c & CAPACITY; }
// Build a new ctl value from a run state and a thread count
private static int ctlOf(int rs, int wc) { return rs | wc; }
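The bit packing can be tried outside the JDK. The following is a minimal sketch that copies the constants and the three helper methods into a hypothetical class, CtlPackingDemo (the class name and its main method are illustrative and not part of ThreadPoolExecutor), and packs the RUNNING state together with a count of 5 threads.

```java
class CtlPackingDemo {
    // Same definitions as the ThreadPoolExecutor fields, copied for illustration
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;     // low 29 bits set
    static final int RUNNING = -1 << COUNT_BITS;           // high bits 111
    static final int SHUTDOWN = 0 << COUNT_BITS;           // high bits 000
    static final int STOP = 1 << COUNT_BITS;               // high bits 001

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        // The 32-bit pattern: 3 state bits followed by 29 count bits
        System.out.println(Integer.toBinaryString(c));
        System.out.println(runStateOf(c) == RUNNING);   // prints true
        System.out.println(workerCountOf(c));           // prints 5
        // RUNNING is negative, so the states are ordered numerically
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP); // prints true
    }
}
```

Because RUNNING is the only negative state, comparisons such as rs >= SHUTDOWN or rs < SHUTDOWN are enough to tell a running pool from one that is shutting down.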
State transitions:
1.RUNNING -> SHUTDOWN: when shutdown() is called explicitly, or implicitly through finalize(), which calls shutdown().
2.RUNNING or SHUTDOWN -> STOP: when shutdownNow() is called explicitly.
3.SHUTDOWN -> TIDYING: when both the thread pool and the task queue are empty.
4.STOP -> TIDYING: when the thread pool is empty.
5.TIDYING -> TERMINATED: when the terminated() hook method has completed.
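The state constants are private, but the transitions can be observed through the public API. The sketch below uses a hypothetical class, LifecycleDemo, that overrides the terminated() hook and then checks isShutdown(), awaitTermination() and isTerminated(); the pool sizes and the 5-second wait are arbitrary choices for the example.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LifecycleDemo {
    // Returns {isShutdown after shutdown(), awaitTermination result, isTerminated, hook called}
    static boolean[] run() {
        AtomicBoolean hookCalled = new AtomicBoolean(false);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {
                hookCalled.set(true); // invoked while the pool is in the TIDYING state
            }
        };
        pool.execute(() -> { });
        pool.shutdown();                          // RUNNING -> SHUTDOWN
        boolean shutdown = pool.isShutdown();
        boolean terminated;
        try {
            // Returns true once the pool has reached TERMINATED
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            terminated = false;
        }
        return new boolean[] { shutdown, terminated, pool.isTerminated(), hookCalled.get() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

Since TERMINATED is only set after terminated() returns, the hook flag is already true by the time awaitTermination reports success.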
2. The execute method
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // Read ctl: the combined run state and thread count
    int c = ctl.get();
    /**
     * 1. If the thread count is below corePoolSize, create a new thread to run the task.
     *    If creating the thread fails, continue to the next step.
     */
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    /**
     * 2. If the pool is RUNNING, put the task in the work queue.
     */
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Recheck: if the pool is no longer RUNNING, remove the task from the queue and run the rejection policy
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // If the pool has no threads left, add one
        else if (workerCountOf(recheck) == 0)
            // A null first task means the new thread takes its tasks from the queue
            addWorker(null, false);
    }
    // 3. If the pool is not RUNNING or the queue is full, try to add a new (non-core) thread; run the rejection policy if that fails
    else if (!addWorker(command, false))
        reject(command);
}
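The three branches of execute can be observed from outside with a small pool. The sketch below is only an illustration: the class name ExecuteFlowDemo, the sizes (corePoolSize 1, maximumPoolSize 2, queue capacity 1) and the latch that keeps every task blocked are assumptions chosen so that each submission takes a different branch.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {
    // Returns {pool size after task 1, queue size after task 2,
    //          pool size after task 3, rejections after task 4}
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the thread busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        int[] result = new int[4];
        try {
            pool.execute(blocker);                 // step 1: below corePoolSize, new core thread
            result[0] = pool.getPoolSize();
            pool.execute(blocker);                 // step 2: core thread busy, task is queued
            result[1] = pool.getQueue().size();
            pool.execute(blocker);                 // step 3: queue full, new non-core thread
            result[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);             // step 3 fails: default AbortPolicy rejects
            } catch (RejectedExecutionException e) {
                result[3]++;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [1, 1, 2, 1]
    }
}
```

The default rejection handler is AbortPolicy, which is why the fourth submission surfaces as a RejectedExecutionException.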
3. The addWorker method
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get(); // Read ctl: the combined run state and thread count
        int rs = runStateOf(c); // Extract the pool's run state
        // No new thread is created if:
        //  - the state is > SHUTDOWN, or
        //  - the state is SHUTDOWN and firstTask is not null, or
        //  - the state is SHUTDOWN, firstTask is null, and workQueue is empty
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c); // Current thread count
            // No new thread if the count has reached the limit
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Increment the thread count with CAS; when threads race on the same value only one succeeds.
            // On success, leave the outer loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            // The CAS failed: has the pool state changed?
            //  yes: restart the outer loop and re-read the pool state
            //  no: retry the CAS in the inner loop
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    // Reaching this point means the CAS succeeded
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null; // Worker implements the Runnable interface
    try {
        // Create the Worker, which also creates its thread
        w = new Worker(firstTask);
        /**
         Worker(Runnable firstTask) {
             setState(-1); // Forbid interrupting this thread before runWorker runs; shutdownNow checks the state and only interrupts when it is >= 0
             this.firstTask = firstTask;
             this.thread = getThreadFactory().newThread(this);
         }
         */
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Re-read the run state while holding the lock
                int rs = runStateOf(ctl.get());
                // Proceed if the pool is RUNNING, or (SHUTDOWN and firstTask is null)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w); // Add the new worker to the workers set
                    int s = workers.size();
                    // largestPoolSize records the largest number of threads the pool has held at the same time
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true; // The worker was added successfully
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
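The inner loop of addWorker is a bounded compare-and-set retry loop. A minimal sketch of the same pattern on a plain AtomicInteger follows; the class CasReserveDemo and the helper tryReserve are hypothetical names, and the sketch leaves out the run-state check that the real method performs.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasReserveDemo {
    // Hypothetical helper mirroring addWorker's inner loop:
    // reserve one slot below `limit`, retrying when the CAS loses a race.
    static boolean tryReserve(AtomicInteger count, int limit) {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;                     // limit reached, like "return false" in addWorker
            if (count.compareAndSet(c, c + 1))
                return true;                      // CAS succeeded, like "break retry"
            // CAS failed: another thread changed the count, so re-read and retry
        }
    }

    // Starts `threads` threads that each try to reserve one slot;
    // returns how many of them succeeded.
    static int run(int threads, int limit) {
        AtomicInteger count = new AtomicInteger();
        AtomicInteger successes = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                if (tryReserve(count, limit))
                    successes.incrementAndGet();
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println(run(16, 4)); // prints 4
    }
}
```

Reserving the slot with CAS before taking mainLock is what lets addWorker reject excess threads cheaply; the lock is only needed later, when the Worker is added to the workers set.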
4. The runWorker method
After t.start() in addWorker executes, the new thread enters the run method of the Worker class, which in turn calls runWorker.
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Set state to 0, which allows the thread to be interrupted
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // Loop while the first task is not null, or a task fetched from the queue is not null
        while (task != null || (task = getTask()) != null) {
            // Take the Worker's exclusive lock so that other threads cannot interrupt this Worker as idle while it runs a task
            w.lock();
            // If the pool state is >= STOP, make sure this thread is interrupted.
            // Otherwise make sure it is not: Thread.interrupted() clears the interrupt flag, and the state is
            // then rechecked in case shutdownNow ran in the meantime
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // Interrupt the thread. Note: once this runs, any blocking call inside the run method
                // of the task we submitted throws InterruptedException
                wt.interrupt();
            try {
                // Hook that runs before the task
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Call the task object's run method to execute the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook that runs after the task
                    afterExecute(task, thrown);
                }
            } finally {
                // The task is done: clear the reference
                task = null;
                // Increment the number of completed tasks
                w.completedTasks++;
                // Release the Worker's lock
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Run the Worker thread's exit logic
        processWorkerExit(w, completedAbruptly);
    }
}
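The beforeExecute and afterExecute calls in runWorker are protected hooks that a subclass can override. The sketch below records the order in which they fire around a normal task and around a task that throws; the class name HookDemo, the event strings and the quiet thread factory (which only suppresses the default stack trace of the failed thread) are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookDemo {
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<String>());
        // Suppress the stack trace printed when a task's exception ends its worker thread
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, error) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), quiet) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                events.add("before");
            }
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                // t is the "thrown" variable from runWorker: null when the task completed normally
                events.add(t == null ? "after" : "after:" + t.getMessage());
            }
        };
        pool.execute(() -> events.add("run"));
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [before, run, after, before, after:boom]
    }
}
```

Note that this applies to tasks passed to execute. The exception is rethrown after afterExecute, so the loop ends with completedAbruptly still true and the thread exits through processWorkerExit.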
5. The getTask method
// Fetches a task to run, either blocking or waiting with a timeout. If it returns null, the current thread exits
private Runnable getTask() {
    boolean timedOut = false; // Set to true when the last poll() returned no task; one of the conditions that let the current thread exit
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // If the state is SHUTDOWN and the queue is empty, or the state is > SHUTDOWN, return null so that this thread exits
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c); // Current thread count
        // Whether this thread is subject to the keepAliveTime timeout; one of the conditions that let it exit
        // allowCoreThreadTimeOut: lets core threads time out; false by default, changed with the allowCoreThreadTimeOut method
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // Return null (the current thread may exit) when the thread count is > maximumPoolSize,
        // or when this thread is timed and its last poll timed out;
        // in both cases only if more than one thread remains or the queue is empty
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                // Wait up to keepAliveTime for a task from the queue
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take(); // Block until a task is available
            if (r != null)
                return r;
            timedOut = true; // No task arrived within the timeout
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
Summary: when getTask returns null, the current thread exits. This happens in the following cases:
- The thread pool state is SHUTDOWN and the queue is empty
- The thread pool state is > SHUTDOWN
- The thread is subject to the timeout (allowCoreThreadTimeOut is true, or the thread count > corePoolSize), its last poll returned no task, and either more than one thread remains or the queue is empty
- The thread count > maximumPoolSize, and either more than one thread remains or the queue is empty
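The timeout case can be observed with allowCoreThreadTimeOut. The sketch below is a hedged illustration, not a benchmark: the class name KeepAliveDemo, the 100 ms keepAliveTime and the 5-second polling deadline are arbitrary assumptions. It submits one task, then polls getPoolSize() until the idle thread has left the pool.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns the pool size observed once the idle thread has had time to time out.
    static int run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        // Makes "timed" true in getTask even though the thread count is not above corePoolSize
        pool.allowCoreThreadTimeOut(true);
        pool.execute(() -> { });
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        try {
            // Poll instead of sleeping a fixed time, so the check does not depend on exact timing
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline)
                Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("pool size after idle period: " + run());
    }
}
```

Without the allowCoreThreadTimeOut(true) call, the single core thread would block in workQueue.take() and the pool size would stay at 1 until shutdown.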