目前正在出一个Java多线程专题长期系列教程,从入门到进阶含源码解读, 篇幅会较多, 喜欢的话,给个关注❤️ ~
本节主要带大家从ThreadPoolExecutor源码角度来了解一下线程池的工作原理,一起来看下吧~
首先Executor这个接口是线程池实现的顶层接口类,我们上节遇到的ExecutorService也是继承了Executor
public interface ExecutorService extends Executor {...}
ExecutorService的上层AbstractExecutorService这个抽象类实现了接口ExecutorService
public abstract class AbstractExecutorService implements ExecutorService {...}
ThreadPoolExecutor继承了AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService {...}
ThreadPoolExecutor这个类我们需要重点看一下,它是接口的实现类,我们以newCachedThreadPool为例
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
可以看到内部其实还是调用了ThreadPoolExecutor,我们再看newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
内部也是调了它, 下面我们就看下这个类
首先我们从构造函数看起,它主要有四个构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
一共有五个参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
其它参数同上
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
这个把之前都综合了一下,其实可以看到前几个内部都调用了this,调用自身,也就是调用这个构造函数,进行一些初始化
有几个参数比较好理解,我们来看下这个参数workQueue, 它是一个阻塞队列,这里简要给大家提一下,这块内容也比较重要,后边会专门去讲
BlockingQueue本身是一个还接口,它有几个比较常用的阻塞队列
这个是线程工厂类,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等, 同样它也是一个接口,我们在ThreadPoolExecutor内部看到了 Executors.defaultThreadFactory(),这个是一个默认工厂
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
没有指定参数,就会默认创建DefaultThreadFactory,还有其它的factory,大家可以自行看下,区别就在创建线程时指定的参数
RejectedExecutionHandler同样是一个接口,这个处理器是用来专门处理拒绝的任务,也就是ThreadPoolExecutor无法处理的程序。同理,我们可以看到ThreadPoolExecutor内部有调了defaultHandler
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
这个是默认的拒绝策略, 可以看到它的默认处理是抛出拒绝的异常
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
再带大家看下另外的策略, DiscardPolicy,这个策略不会抛出异常,它会丢弃这个任务
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy 该策略丢弃最旧的未处理请求,然后重试execute ,除非执行程序被关闭,在这种情况下任务被丢弃。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 判断是否关闭
if (!e.isShutdown()) {
e.getQueue().poll();
// 任务重试
e.execute(r);
}
}
}
CallerRunsPolicy 直接在execute方法的调用线程中运行被拒绝的任务,除非执行程序已关闭,在这种情况下,任务将被丢弃。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 直接判断是否关闭 未关闭就执行
if (!e.isShutdown()) {
r.run();
}
}
}
看完构造函数,下面看下它的一些常量
private static final int COUNT_BITS = Integer.SIZE - 3;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
通过变量名,我们大致知道是用来表示线程池的状态。线程池本身有一个调度线程,这个线程就是用于管理整个线程池的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等,所以它本身也有上面的状态值。
当线程池被创建后就会处于RUNNING状态, 主池控制状态ctl是一个原子整数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
调用shutdown()方法后处于「SHUTDOWN」状态,线程池不能接受新的任务,清除一些空闲worker,不会等待阻塞队列的任务完成。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
另外,还有一个shutdownNow,调用后处于「STOP」状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。
public List shutdownNow() {
List tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 中断所有线程
interruptWorkers();
// 将任务队列排空到一个新列表中 这里要注意下
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为「TIDYING」状态。接着会执行terminated()函数。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 不等于0时 中断任务线程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将状态设置为 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 终止
terminated();
} finally {
// 执行完 terminated 转为 TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
这个是执行任务的核心方法,我们一起看一下
public void execute(Runnable command) {
// 如果任务不存在 抛空异常
if (command == null)
throw new NullPointerException();
// 获取当前状态值
int c = ctl.get();
// 当前线程数小于corePoolSize,则调用addWorker创建核心线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果不小于corePoolSize,则将任务添加到workQueue队列。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果isRunning返回false(状态检查),则remove这个任务,然后执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
// 线程池处于running状态,但是没有线程,则创建线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果放入workQueue失败,则创建非核心线程执行任务,
else if (!addWorker(command, false))
// 如果这时创建失败,就会执行拒绝策略。
reject(command);
}
在源码中,我们可以看到,多次进行了isRunning判断。在多线程的环境下,线程池的状态是多变的。很有可能刚获取线程池状态后线程池状态就改变了
下面给大家简要的总结一下线程池的处理流程
它的源码还是比较长的,一篇文章说不清楚,有兴趣的同学可以通过本篇文章的理解继续阅读它的源码。
下一节, 继续带大家详探讨ThreadPoolExecutor中是如何进行线程复用 ~
留言与评论(共有 0 条评论) “” |