今天我们来一起盘一盘应用场景很多的实现类,虽然我们不经常直接使用这些类,但是在各种地方都有它们的身影(比如 线程池 ),且它们很容易搞混,我称之为“ 可知结果的未来任务——FutureTask ” (叫法有点拗口哈,因为我实在想不出更贴切的名字了~ )
由于涉及到的接口定义和实现比较多(看标题就知道了。。), 我先让它们各自做个自我介绍,然后我们最后来看看所谓的“ 可知结果的未来任务 ”的源码实现 。
在之前介绍线程的时候,我们就知道了,创建线程的很多方法,其中实现Runnable接口就是其中的方式,其实就是定义了这个线程run方法中做了哪些事情:
源码中的注释还是比较好理解的,但是我们会发现,这个run方法没有任何返回值,也就是说我们根本就不知道,run方法的执行结果到底怎么样,这时Doug Lea大神就定义了另外一套接口 Callable 系列接口。
要想run方法有结果,最直接的方法就是将返回值类型void改成特定的类,好在Java有泛型编程,如下源码实现:
/**
* A task that returns a result and may throw an exception.
* Implementors define a single method with no arguments called
* {@code call}.
*
* The {@code Callable} interface is similar to {@link
* java.lang.Runnable}, in that both are designed for classes whose
* instances are potentially executed by another thread. A
* {@code Runnable}, however, does not return a result and cannot
* throw a checked exception.
*
*
The {@link Executors} class contains utility methods to
* convert from other common forms to {@code Callable} classes.
*
* @see Executor
* @since 1.5
* @author Doug Lea
* @param the result type of method {@code call}
*/
@FunctionalInterface
public interface Callable {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
复制代码
我们和Runnable简单比较下:
public class CallableDemo {
public static void main(String[] args) throws Exception {
Callable callable = new Callable() {
@Override
public String call() throws Exception {
for (int i = 0; i < 5; i++) {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "处理业务中。。。" + i);
}
return "122333333";
}
};
System.out.println(Thread.currentThread().getName() + "结果:" + callable.call());
}
}
//运行效果
/**
main处理业务中。。。0
main处理业务中。。。1
main处理业务中。。。2
main处理业务中。。。3
main处理业务中。。。4
main结果:122333333
*/
复制代码
用法是不是很简单呢。 特点如下:
这时,大神又定义了另一个接口,它可以把call方法交给另一个线程异步执行,叫做“ 未来 ” 接口 —— Future ,如果有更牛逼的名字,请告知我哈~
官方的解释如下: Future表示异步计算的结果。方法用于检查计算是否完成、等待计算完成以及检索计算结果。 只有在计算完成后,才能使用get方法检索结果,在必要时阻塞直到它准备好。取消是由cancel方法执行的。还提供了其他方法来确定任务是正常完成还是被取消。一旦计算完成,就不能取消计算。如果为了可取消性而使用Future,但不提供可用的结果,可以声明Future并返回null作为底层任务的结果。
简单来讲如下:
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
一个可运行的未来。run方法的成功执行将导致Future的完成,并允许访问其结果。
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture extends Runnable, Future {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
除非已取消,否则将此Future设置为其计算的结果。
*/
void run();
}
复制代码
从源码的继承关系来看, 它即继承了Runnable,又继承了Future,这样子的一个既可以创建异步线程来执行异步任务,又可以对异步任务进行操控的接口就这样诞生了。 至于里面的 run方法,一开始我也觉得很懵,为啥Runnable接口里面有了,这边又定义了一次? 结合 下图官方API,你没看错,它其实就是Runnable的run方法,我猜它 这边又定义了一次,只是用来强调下,这是这个类的特有异步执行可操控未来的一个异步过程(就是强调这是属于该“未来任务”的执行方法,只不过是借用了Runnable,感觉有点像是应用了Adapter适配器设计模式~)
好了,上面的介绍其实都是铺垫,下面的才是本篇的主角,它实现了以上接口的所有特性,它才是真正的 未来任务实现类——FutureTask 。
在此,我们画一个UML类图,来对上面的各个接口定义做如下清晰展示:
下面我们就来看看我们的正主: FutureTask 。
/*
* Revision notes: This differs from previous versions of this
* class that relied on AbstractQueuedSynchronizer, mainly to
* avoid surprising users about retaining interrupt status during
* cancellation races. Sync control in the current design relies
* on a "state" field updated via CAS to track completion, along
* with a simple Treiber stack to hold waiting threads.
*
* Style note: As usual, we bypass overhead of using
* AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
大概意思如下:
修订注意:这与依赖于AbstractQueuedSynchronizer的该类的以前版本不同,主要是为了避免在取消
竞赛期间保留中断状态让用户感到意外。当前设计中的同步控制依赖于通过CAS更新的“state”字段来
跟踪完成情况,以及一个简单的Treiber堆栈来保存等待的线程。
样式注意:与往常一样,我们绕过了使用AtomicXFieldUpdaters的开销,而是直接使用Unsafe
intrinsic。
请注意:本篇基于JDK8源码讲解,这段话的意思其实就是在这版本之前,应该是依赖AQS实现的
现在改成了通过 state+CAS+Treiber堆栈来实现的。有兴趣的小伙伴可以看下JDK8之前的实现。
*/
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
复制代码
/**
* Simple linked list nodes to record waiting threads in a Treiber
* stack. See other classes such as Phaser and SynchronousQueue
* for more detailed explanation.
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
复制代码
果不其然,和我们之前一起学习过的AQS差不多,这里 是一个单向的等待列表(因为有个next属性),内部也是当前线程。
根据以上源码注释和属性,总结: 在当前版本( JDK8,之前的版本实现方式不太一样 )FutureTask的同步控制实现,是通过对状态的CAS的更新,以及用一个线程安全的Treiber堆栈来保存等待的线程的方式来进行的。
FutureTask提供了两个构造器,如下:
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
复制代码
public static Callable callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter(task, result);
}
/*
适配器模式来啦,和前面的猜测呼应上了。。。。主要目的就是将Runnable适配成Callable并指定
传过来的结果作为异步任务的结果
*/
static final class RunnableAdapter implements Callable {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
复制代码
通过构造器我们知道, 最终就算传入的是Runnable对象,其实最后还是callable对象。
具体接口的实现,就是FutureTask的实现原理了,我们来看看它是如何实现异步任务的控制,并且可以获取到异步任务的结果的吧。主要是Runnable接口和Future接口对应的方法,首先来看 Runnable接口 的实现:
public void run() {
//如果state不为NEW 或者将runner修改为当前线程不成功,注意此时的线程是真正执行任务的线程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
//以下过程较简单,就不注释了~~
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
/**
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
*/
int s = state;
//如果状态大于等于INTERRUPTING,说明在中断线程
if (s >= INTERRUPTING)
//见下面方法的分析
handlePossibleCancellationInterrupt(s);
}
}
//set(result)
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
//setException(ex)
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//注意此处将异常结果给了结果
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
//finishCompletion()
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
其实就是个循环清空等待队列,并且唤醒各个节点线程的方法,也相当于一个结束后的优化清理工作
我叫他动作完成的既定动作。
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
//handlePossibleCancellationInterrupt
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
//其实是个自旋,只要没有被中断,就不会退出,直到被中断退出
//(中断的过程在cancel方法里可以看到我们下面会分析到)
//
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
复制代码
过程如下:
简单总结: run方法其实就是将当前线程变成真正的runner,然后不停地判断状态是否正常,正常的话,就去执行call方法具体的业务,执行完成的话,就把结果返回,并且唤醒所有等待的线程;如果有异常,则设置异常;最后再次判断是否有中断,有的话,等待state变为已中断才退出。
我们再看 Future 的实现:其实主要就是看 get和cancel 两个方法就可以了。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
//awaitDone(false, 0L)
/**
* Awaits completion or aborts on interrupt or timeout.
在中断 或 超时时等待完成 或 中止
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
/*
待封装的节点(主要用来将当前执行get方法的线程在未来可能封装为等待节点,注意是可能,仔细看
下面的分析)
*/
WaitNode q = null;
//是否排队
boolean queued = false;
/**
此处是一个自旋操作,什么时候退出循环呢,其实在方法的注释上已经告诉我们了
1、被中断
2、等待超出设定的时间
3、任务完成被唤醒
*/
for (;;) {
/**
如果在反复循环过程中线程被中断了,此时有可能线程被放到队列(Treiber栈),则先移除,然后
抛异常;如果没有放入等待队列,则直接抛出异常
*/
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
/**
我把状态值移过来,方便大家分析
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
*/
int s = state;
/*继续走到这里,判断状态是否大于COMPLETING(NORMAL/EXCEPTIONAL/CANCELLED/
INTERRUPTING/INTERRUPTED),就是目前处于上面那些状态,要么完成,要么异常了,
要么被取消了等结束状态,则直接返回当前任务的状态即可。
*/
if (s > COMPLETING) {
if (q != null)
//此处的置空,其实是方便GC垃圾回收
q.thread = null;
return s;
}
//如果状态为正在执行任务中,则yield等等,进入下一次的循环
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
/**
如果再进来一次循环,代码走到这里(s=COMPLETING,请回头到上面看看run方法的过程就知道了)
*/
else if (q == null)
q = new WaitNode();
/**
再进来,queued=false,如果还没入队,则将包装好的q通过CAS+头插入(查到头节点之前)
的方式入队,然后再次进入下一次的循环
*/
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//如果是超时进来的,则里面的就是等待超时的操作,和之前讲过的AQS超时操作都是一样的
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
//再进来循环,走到这里,说明任务还未结束,且当前线程已经包装成节点进入等待队列
//(Treiber栈)了,则阻塞线程
else
LockSupport.park(this);
}
}
//report(s)
/**
* Returns result or throws exception for completed task.
*
* @param s completed state value
*/
/**
我把状态值移过来,方便大家分析
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
//除了上面两种情况外,就是执行抛异常了
throw new ExecutionException((Throwable)x);
}
复制代码
get流程总结如下:
//Future接口,cancel方法的注释
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
试图取消此任务的执行。如果任务已经完成、已经取消或由于其他原因无法取消,则此尝试将失败。
如果成功,并且在调用cancel时这个任务还没有启动,那么这个任务就不应该运行。如果任务已经开始,
那么参数mayInterruptIfRunning决定执行该任务的线程是否应该在试图停止该任务时被中断。
*
* After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
该方法返回后,对isDone的后续调用将始终返回true。如果该方法返回true,则对iscancelled后续调用将
始终返回true。
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
mayInterruptIfRunning
true:当前的任务会被中断
false:允许当前的任务继续执行
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
//如果mayInterruptIfRunning为true,则中断线程,并且将状态改为INTERRUPTED
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//最后做完成既定动作,上面分析过
finishCompletion();
}
//最后返回true,取消成功
return true;
}
复制代码
根据代码和注释分析如下:
以上呢,便是FutureTask主要操作的过程分析了。
最后呢,我再提供一个demo的用法示例,注意看代码中的注释,按照最后的方法试试运行效果:
public class DEmo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("main start");
FutureTask task = new FutureTask(() -> {
System.out.println("thread:" + Thread.currentThread().getName() + "异步任务开始...");
Thread.sleep(5000);
System.out.println("thread:" + Thread.currentThread().getName() + "异步任务结束..");
return 1024;
});
Thread t1 = new Thread(task);
// 开启FutureTask任务的执行,由t1线程执行
t1.start();
//t2为取消线程
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
// t2线程 2s后取消,两种形式,中断式取消和非中断式
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
// 1
task.cancel(false);
}
});
t2.start();
// 此处主线程等待获取结果
// 2
System.out.println(task.get());
// 上面的demo,我标出了1和2两处位置
// 小伙伴们可以试着切换1处的true/false看看效果
// 也可以将2处的代码进行注释和不注释切换看看效果
}
}
复制代码
PS:其实提供这段代码的主要目的是供小伙伴们打断点调试,来印证我文末最后的总结,如下方我在源码中打断点的代码位置( 看代码行数即可 )
以为到这里本篇就结束了吗?还没有,我们再来仔细看下FutureTask的方法,如下:
我们仔细看这三个方法的修饰符,都是protected,这说明什么,说明这三个方法可以在子类中被重写
在此,我们也引出一个问题: FutureTask虽然为我们提供了获取任务结果的方法get,但是呢,最终还是没有达到我想要的异步效果,即我调用了get方法之后,会阻塞在那,而且发生异常的话,异常只能被动抛出。 我现在想要实现一种我不需要get方法阻塞,也能获取到结果,且以通知的形式告诉主线程,如果任务异常的话,可以主动地获取到异常信息。
来看看怎么实现的吧~
public interface TaskListenser {
/**
exception:true,结果异常,value即异常详情信息 false,无异常,value即正常结果值
*/
public void onResultOk(boolean exception, Object value);
}
复制代码
public class MyFutureTask extends FutureTask {
/**
* 设置监听
*/
private TaskListenser taskListenser;
public MyFutureTask(Callable callable) {
super(callable);
}
@Override
protected void set(V v) {
super.set(v);
try {
// 任务执行完成,通知
taskListenser.onResultOk(false, get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
@Override
protected void setException(Throwable t) {
super.setException(t);
taskListenser.onResultOk(true, getExceptionResult());
}
/**
* 把异常信息以返回值的形式给到前台
*
* @return
*/
private Throwable getExceptionResult() {
try {
get();
} catch (InterruptedException | ExecutionException e) {
return e;
}
return null;
}
public void setTaskListenser(TaskListenser taskListenser) {
this.taskListenser = taskListenser;
}
}
复制代码
public class MyDemo implements TaskListenser, Callable {
@Override
public void onResultOk(boolean exception, Object value) {
if (!exception) {
System.out.println("我收到結果了:" + (String) value);
} else {
System.out.println("执行任务发生异常了:" + ((Throwable) value).getMessage());
}
}
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + "异步任务开始...");
// 模拟执行任务
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "执行任务中..." + i);
// 此段代码可以用来测试任务异常的情况,注释打开即可
//if (i == 3) {
// throw new Exception("发生异常啦!!!");
//}
Thread.sleep(1000);
}
System.out.println(Thread.currentThread().getName() + "异步任务结束..");
return "1024";
}
public static void main(String[] args) {
MyDemo myDemo = new MyDemo();
MyFutureTask task = new MyFutureTask<>(myDemo);
task.setTaskListenser(myDemo);
Thread t1 = new Thread(task);
// 开启FutureTask任务的执行,由t1线程执行
t1.start();
}
}
复制代码
//运行效果
//正常
Thread-0异步任务开始...
Thread-0执行任务中...0
Thread-0执行任务中...1
Thread-0执行任务中...2
Thread-0执行任务中...3
Thread-0执行任务中...4
Thread-0异步任务结束..
我收到結果了:1024
//发生异常的效果
Thread-0异步任务开始...
Thread-0执行任务中...0
Thread-0执行任务中...1
Thread-0执行任务中...2
Thread-0执行任务中...3
执行任务异常了:java.lang.Exception: 发生异常啦!!!
复制代码
例子比较简单哈,大家可以试试~ 当然,大家如果能够举一反三的话就更好了!! 比如我这边的通知只是利用了观察者模式,能不能用我们之前学的阻塞队列呢? 切忌眼高手低啊!
本篇到此就结束了,你学会了吗?~ 文字偏多,感谢阅读,认可的话,请点关注吧~ 哈哈
原文链接:https://juejin.cn/post/7120469977634701319
留言与评论(共有 0 条评论) “” |