当前位置: 首页 > >

【J.U.C详解】--线程池ThreadPoolExecutor源码解析

发布时间:

一、线程池介绍
1.为什么需要线程池?

线程池能够对线程进行统一分配,调优和监控:


降低资源消耗(线程无限制地创建,然后使用完毕后销毁)提高响应速度(无须创建线程)提高线程的可管理性

从JDK 5开始,把工作单元与执行机制分离开来,工作单元包括Runnable和Callable,而执行机制有Executor框架提供。


工作单元也就是任务:


Runnable 没有返回值Callable代返回值 结合Future使用,默认实现类FutureTask

执行器:Executor 接受一个Runnable对象。它有一个子接口ExecutorService


public interface ExecutorService extends Executor {
void shutdown();
List shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;

/*新增submit方法,提供带返回值的任务*/
Future submit(Callable var1);
Future submit(Runnable var1, T var2);
Future submit(Runnable var1);

List> invokeAll(Collection> var1) throws InterruptedException;
T invokeAny(Collection> var1) throws InterruptedException, ExecutionException;

}

二、ThreadPoolExector源码分析
1.线程池参数详解

/**
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 等待销毁的时间
* @param unit 时间单位
* @param workQueue 存储任务的阻塞队列
* @param threadFactory 创建线程的工厂
* @param handler 拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {

}

1)corePoolSize


核心线程数,当线程池被创建的时候,核心线程数为0,有任务来的时候,判断当前线程数小于核心线程数,就会创建一个Worker,核心线程数+1。又继续提交一个任务,如果当前线程数大于等于核心线程数,就会添加到同步队列。
在ThreadPoolExecutor源码里面,AtomicInteger存放核心线程数,采用CAS进行+1。同时注意,该变量的高三位值保存的是线程池的状态,低29位保存核心线程数。


/** packing two conceptual fields*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

2)maximumPoolSize


最大线程数,这个的意思就是当阻塞队列满了的时候,新提交的任务就会创建新的线程,线程数+1。所以阻塞队列如果不设置容量的话,这个值可能就没什么用。同时keepAliveTime也就不会触发。同时,如果最大线程数也满了的时候,再提交任务,就会走拒绝策略RejectedExecutionHandler 。


3)keepAliveTime


线程空闲时的存活时间,即当线程没有任务执行时,该线程继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用, 超过这个时间的空闲线程将被终止。源码怎么实现的呢?


/*timed什么时候为true?当workCount>corePoolSize或者设置了允许核心线程数timeOut*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();

runWork一旦getTask为null,该worker就会被踢掉

while (task != null || (task = getTask()) != null) {

4)unit

枚举类,天、小时、分钟、秒等等


5)workQueue


用来保存等待被执行的任务的阻塞队列. 在JDK中提供了如下阻塞队列:
(1) ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
(2) LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene(LinkedBlockingQueue维护了两把锁);
(3) SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
(4) priorityBlockingQuene:具有优先级的无界阻塞队列;


6)threadFactory


有一个默认的实现类


7)RejectedExecutionHandler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:


AbortPolicy:直接抛出异常,默认策略;CallerRunsPolicy:用调用者所在的线程来执行任务;DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;DiscardPolicy:直接丢弃任务;

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。


2、线程池状态

在前面讲了,ctl变量有两个作用,一个是存放了workerCount,另一个就是存放了线程池的状态。
AtomicInteger变量ctl的功能非常强大:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:


1、RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
111 - 0000000000000000000000000000(十进制: -536, 870, 912);2、SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
000 - 0000000000000000000000000(十进制: 0);3、STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
001 - 00000000000000000000000000(十进制: 536,870, 912);4、TIDYING : 2 << COUNT_BITS,即高3位为010, 所有的任务都已经终止;
010 - 00000000000000000000000000.(十进制值: 1, 073, 741, 824);5、TERMINATED: 3 << COUNT_BITS,即高3位为011, terminated()方法已经执行完成
101 - 000000000000000000000000000(十进制值: 1, 610,612, 736)

/** packing two conceptual fields*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

3、ThreadPoolExector执行流程源码分析

话不多说,直接上源码:


public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. 如果当前线程数小于核心线程数,addWorker(command, true),即创建新的线程执行当前任务。否则
*
* 2. 添加到队列
*
* 3. 如果队列满了,如果当前线程数,没有达到最大线程数,创建新的线程去执行当前任务。如果达到最大线程数,
* 则走拒绝策略:reject(command);
* 注意:为什么要等队列满了再创建线程呢?主要是因为创建worker的时候,会使用全局锁,这样执行任务方法就会变慢。
*/
int c = ctl.get();//获取ctl变量 workerCountOf(c):取出当前worker数量
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))//添加worker true表示为核心线程数
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//添加到队列成功
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))//创建非核心线程 worker
reject(command);//添加失败则走拒绝策略
}



/**
*添加worker,即添加线程
* @param firstTask the task the new thread should run first
* @param core if true 核心线程
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 检查状态
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//CAS 将workCount ctl +1
break retry; //跳出循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//创建worker 构造函数会使用线程工厂创建一个Thread
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//加锁,将worker 添加到works = new HashSet变量
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//线程启动,执行run方法 runWorker();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}


/** t.start();//线程启动,执行run方法 runWorker();this代表worker 里面有一个fristTask,不为空就先执行这个task */
public void run() {
runWorker(this);
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
/*
主要的方法,worker的firstTask不能空,先执行,然后getTask()从队列里面取,里面会判断走take方法还是poll方法,
take会阻塞,poll用于超时,比如当前线程数大于核心线程数,队列也为空,就会调用poll等待超时获取,失败返回false
整个循环就结束了,那么这个线程也会被踢掉,woker数量就会-1
*/
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}



友情链接: