盘一盘晕头转向的Runnable、Callable、Future、FutureTask

今天我们来一起盘一盘应用场景很多的实现类,虽然我们不经常直接使用这些类,但是在各种地方都有它们的身影(比如 线程池 ),且它们很容易搞混,我称之为“ 可知结果的未来任务——FutureTask叫法有点拗口哈,因为我实在想不出更贴切的名字了~

由于涉及到的接口定义和实现比较多(看标题就知道了。。), 我先让它们各自做个自我介绍,然后我们最后来看看所谓的“ 可知结果的未来任务 ”的源码实现 。

逐个介绍

Runnable

在之前介绍线程的时候,我们就知道了,创建线程的很多方法,其中实现Runnable接口就是其中的方式,其实就是定义了这个线程run方法中做了哪些事情:

盘一盘晕头转向的Runnable、Callable、Future、FutureTask

源码中的注释还是比较好理解的,但是我们会发现,这个run方法没有任何返回值,也就是说我们根本就不知道,run方法的执行结果到底怎么样,这时Doug Lea大神就定义了另外一套接口 Callable 系列接口。

Callable

要想run方法有结果,最直接的方法就是将返回值类型void改成特定的类,好在Java有泛型编程,如下源码实现:

/**
 * A task that returns a result and may throw an exception.
 * Implementors define a single method with no arguments called
 * {@code call}.
 *
 * 

The {@code Callable} interface is similar to {@link * java.lang.Runnable}, in that both are designed for classes whose * instances are potentially executed by another thread. A * {@code Runnable}, however, does not return a result and cannot * throw a checked exception. * *

The {@link Executors} class contains utility methods to * convert from other common forms to {@code Callable} classes. * * @see Executor * @since 1.5 * @author Doug Lea * @param the result type of method {@code call} */ @FunctionalInterface public interface Callable { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; } 复制代码

我们和Runnable简单比较下:

  • Callable有返回值
  • Callable可以抛出异常。 除了我们想要的有返回值之外,大神还帮我们想到了 异常处理的问题,这样子线程任务里发生了异常,我们就可以做对应的处理了,而Runnable是不行的,丧失了一定的灵活和自定义的特性。如下demo演示下:
public class CallableDemo {
	public static void main(String[] args) throws Exception {
		Callable callable = new Callable() {

			@Override
			public String call() throws Exception {
				for (int i = 0; i < 5; i++) {
					Thread.sleep(1000);
					System.out.println(Thread.currentThread().getName() + "处理业务中。。。" + i);
				}
				return "122333333";
			}
		};
		System.out.println(Thread.currentThread().getName() + "结果:" + callable.call());
	}
}
//运行效果
/**
main处理业务中。。。0
main处理业务中。。。1
main处理业务中。。。2
main处理业务中。。。3
main处理业务中。。。4
main结果:122333333
*/
复制代码

用法是不是很简单呢。 特点如下:

  • 想要有结果,就必须等到业务处理完成,就像是线程卡住了自己,无法异步调用
  • 从打印效果来看,不像Runnable重新开启了另外一个线程,是主线程自己调用,然后就卡住了

这时,大神又定义了另一个接口,它可以把call方法交给另一个线程异步执行,叫做“ 未来接口 —— Future ,如果有更牛逼的名字,请告知我哈~

Future

盘一盘晕头转向的Runnable、Callable、Future、FutureTask

官方的解释如下: Future表示异步计算的结果。方法用于检查计算是否完成、等待计算完成以及检索计算结果。 只有在计算完成后,才能使用get方法检索结果,在必要时阻塞直到它准备好。取消是由cancel方法执行的。还提供了其他方法来确定任务是正常完成还是被取消。一旦计算完成,就不能取消计算。如果为了可取消性而使用Future,但不提供可用的结果,可以声明Future并返回null作为底层任务的结果。

简单来讲如下:

  • 此接口用来代表一个异步计算的结果。可以用来获取一个异步结果,可以取消正在执行的任务,可以判断任务是否取消,可以判断任务的是否完成。
  • 只有等任务完成,才会有结果返回。 总的来看, Future接口只是提供了获取异步结果的方法以及一系列操控异步执行任务的方法,但我感觉还没真正的是把任务异步交给了另一个线程。于是,大神在这基础之上又组合了一个类,叫未来任务接口—— RunnableFuture

RunnableFuture

/**
 * A {@link Future} that is {@link Runnable}. Successful execution of
 * the {@code run} method causes completion of the {@code Future}
 * and allows access to its results.
 一个可运行的未来。run方法的成功执行将导致Future的完成,并允许访问其结果。
 
 * @see FutureTask
 * @see Executor
 * @since 1.6
 * @author Doug Lea
 * @param  The result type returned by this Future's {@code get} method
 */
public interface RunnableFuture extends Runnable, Future {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     除非已取消,否则将此Future设置为其计算的结果。
     */
     
    void run();
}
复制代码

从源码的继承关系来看, 它即继承了Runnable,又继承了Future,这样子的一个既可以创建异步线程来执行异步任务,又可以对异步任务进行操控的接口就这样诞生了。 至于里面的 run方法,一开始我也觉得很懵,为啥Runnable接口里面有了,这边又定义了一次? 结合 下图官方API,你没看错,它其实就是Runnable的run方法,我猜它 这边又定义了一次,只是用来强调下,这是这个类的特有异步执行可操控未来的一个异步过程(就是强调这是属于该“未来任务”的执行方法,只不过是借用了Runnable,感觉有点像是应用了Adapter适配器设计模式~)

盘一盘晕头转向的Runnable、Callable、Future、FutureTask

好了,上面的介绍其实都是铺垫,下面的才是本篇的主角,它实现了以上接口的所有特性,它才是真正的 未来任务实现类——FutureTask

在此,我们画一个UML类图,来对上面的各个接口定义做如下清晰展示:

盘一盘晕头转向的Runnable、Callable、Future、FutureTask

正主:FutureTask

下面我们就来看看我们的正主: FutureTask

重要属性

/*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     *
     * Style note: As usual, we bypass overhead of using
     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
     大概意思如下:
     修订注意:这与依赖于AbstractQueuedSynchronizer的该类的以前版本不同,主要是为了避免在取消
     竞赛期间保留中断状态让用户感到意外。当前设计中的同步控制依赖于通过CAS更新的“state”字段来
     跟踪完成情况,以及一个简单的Treiber堆栈来保存等待的线程。
        样式注意:与往常一样,我们绕过了使用AtomicXFieldUpdaters的开销,而是直接使用Unsafe 
        intrinsic。
     
     请注意:本篇基于JDK8源码讲解,这段话的意思其实就是在这版本之前,应该是依赖AQS实现的
     现在改成了通过 state+CAS+Treiber堆栈来实现的。有兴趣的小伙伴可以看下JDK8之前的实现。
     */
    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
复制代码
  • state:任务状态,有代码中表示的七种状态,从上面的注释可以看出,可能的状态转换有四种:NEW -> COMPLETING -> NORMALNEW -> COMPLETING -> EXCEPTIONALNEW -> INTERRUPTING -> INTERRUPTEDNEW -> CANCELLED其中, NEW(新建)是初始状态;COMPLETING(完成中)和INTERRUPTING(中断中)属于中间状态;NORMAL(正常结束)、EXCEPTIONAL(发生异常)、INTERRUPTED(中断了)和CANCELLED(被取消了)属于结束状态 。
  • Callable callable: 所谓的要执行的真正的任务
  • Object outcome: 任务执行后的结果
  • Thread runner: 真正执行任务的线程
  • WaitNode waiters: 看注释有:Treiber stack,一种无锁并发栈,其特性是基于CAS算法进行栈的操作。所以是一个线程安全的栈(可以保证入栈出栈的线程安全,即同一时刻只有一个线程操作成功) 。观其定义:等待节点栈,难道这里又是一个类似等待队列的实现?如下是内部类WaitNode的定义:
/**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
复制代码

果不其然,和我们之前一起学习过的AQS差不多,这里 是一个单向的等待列表(因为有个next属性),内部也是当前线程。

根据以上源码注释和属性,总结: 在当前版本( JDK8,之前的版本实现方式不太一样 )FutureTask的同步控制实现,是通过对状态的CAS的更新,以及用一个线程安全的Treiber堆栈来保存等待的线程的方式来进行的。

构造器

FutureTask提供了两个构造器,如下:

/**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
复制代码
  • 一个是直接传入Callable对象
  • 一个传入一个任务对象runnable,以及一个指定的任务结果 ,观其内部,是通过【Executors.callable(runnable, result)】转成的callable,我们来看看怎么转的,代码如下:
public static  Callable callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter(task, result);
 }
 
 /*
    适配器模式来啦,和前面的猜测呼应上了。。。。主要目的就是将Runnable适配成Callable并指定
传过来的结果作为异步任务的结果
 */
 static final class RunnableAdapter implements Callable {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
}
复制代码

通过构造器我们知道, 最终就算传入的是Runnable对象,其实最后还是callable对象。

具体接口实现

具体接口的实现,就是FutureTask的实现原理了,我们来看看它是如何实现异步任务的控制,并且可以获取到异步任务的结果的吧。主要是Runnable接口和Future接口对应的方法,首先来看 Runnable接口 的实现:

run方法实现

public void run() {
        //如果state不为NEW 或者将runner修改为当前线程不成功,注意此时的线程是真正执行任务的线程
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                //以下过程较简单,就不注释了~~
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            /**
             private static final int INTERRUPTING = 5;
             private static final int INTERRUPTED  = 6;
            */
            int s = state;
            //如果状态大于等于INTERRUPTING,说明在中断线程
            if (s >= INTERRUPTING)
                //见下面方法的分析
                handlePossibleCancellationInterrupt(s);
        }
}
 //set(result)
 protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
 }
 
//setException(ex)
 protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            //注意此处将异常结果给了结果
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
 }
 
     //finishCompletion()
     /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
    其实就是个循环清空等待队列,并且唤醒各个节点线程的方法,也相当于一个结束后的优化清理工作
    我叫他动作完成的既定动作。
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }
    
//handlePossibleCancellationInterrupt
 private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        //其实是个自旋,只要没有被中断,就不会退出,直到被中断退出
        //(中断的过程在cancel方法里可以看到我们下面会分析到)
        //
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt
}
复制代码

过程如下:

  • 首先 , 先判断state状态是否是NEW状态,如果不是NEW,则直接返回,合情合理;如果是NEW状态,则通过CAS的方式将当前的runner修改为当前线程,修改成功的话,将继续执行下面的代码。由此我们可以知道的一点是,runner属性,其实是在具体运行的时候,才赋值的。即谁调用了run方法谁就是执行者。
  • 继续 , 取出任务callable,如果任务为空或者state状态不为NEW;则就直接退出了;如果都为true的话,则执行callable方法体对应的业务,并将结果拿到,如果中间出现的异常,则设置异常【setException(ex)】;如果执行成功,则设置结果【set(result)】set(result):此方法首先,将state通过CAS的方式设置为COMPLETING,然后将结果给属性outcome,然后再将state设置为NORMAL,最后调用【finishCompletion()】setException(ex):和上面一样,首先先设置状态COMPLETING,将异常结果赋给outcome,最后再将state置为EXCEPTIONAL,最后调用【finishCompletion()】从set(result)和setException(ex)两个方法可以看出:中间状态COMPLETING其实是一个很短暂的时间,其实在设置为这个状态之前,任务实际已经执行结束或者抛出异常了。finishCompletion():这段代码的作用其实就是将所有等待栈中的节点置为空,并且唤醒所有等待的线程。同时里面有个可以扩展的方法done(可以用来做些额外的事情)。我把这段代码称之为“ 完成既定动作
  • 最后 , 执行finally体里代码:首先将runner置为空,并且判断state的状态,有没有被置为s >= INTERRUPTING,如果成立,说明有其他线程中断了操作,如果是这样的话,则调用方法【handlePossibleCancellationInterrupt(s)】handlePossibleCancellationInterrupt(s):该方法其实也很简单,就是不停看状态state有没有变为INTERRUPTED,直至变为INTERRUPTED才会退出。

简单总结: run方法其实就是将当前线程变成真正的runner,然后不停地判断状态是否正常,正常的话,就去执行call方法具体的业务,执行完成的话,就把结果返回,并且唤醒所有等待的线程;如果有异常,则设置异常;最后再次判断是否有中断,有的话,等待state变为已中断才退出。

我们再看 Future 的实现:其实主要就是看 get和cancel 两个方法就可以了。

get方法

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
}

    //awaitDone(false, 0L)
    /**
     * Awaits completion or aborts on interrupt or timeout.
         在中断 或 超时时等待完成 或 中止
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        /*
        待封装的节点(主要用来将当前执行get方法的线程在未来可能封装为等待节点,注意是可能,仔细看
        下面的分析)
        */
        WaitNode q = null;
        //是否排队
        boolean queued = false;
        /**
        此处是一个自旋操作,什么时候退出循环呢,其实在方法的注释上已经告诉我们了
        1、被中断
        2、等待超出设定的时间
        3、任务完成被唤醒
        */
        for (;;) {
            /**
            如果在反复循环过程中线程被中断了,此时有可能线程被放到队列(Treiber栈),则先移除,然后
            抛异常;如果没有放入等待队列,则直接抛出异常
            */
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            /**
            我把状态值移过来,方便大家分析
            private static final int NEW          = 0;

            private static final int COMPLETING   = 1;

            private static final int NORMAL       = 2;
            private static final int EXCEPTIONAL  = 3;
            private static final int CANCELLED    = 4;
            private static final int INTERRUPTING = 5;
            private static final int INTERRUPTED  = 6;
            */
            int s = state;
            /*继续走到这里,判断状态是否大于COMPLETING(NORMAL/EXCEPTIONAL/CANCELLED/
                INTERRUPTING/INTERRUPTED),就是目前处于上面那些状态,要么完成,要么异常了,
            要么被取消了等结束状态,则直接返回当前任务的状态即可。
            */   
            if (s > COMPLETING) {
                if (q != null)
                    //此处的置空,其实是方便GC垃圾回收
                    q.thread = null;
                return s;
            }
            //如果状态为正在执行任务中,则yield等等,进入下一次的循环
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            /**
                如果再进来一次循环,代码走到这里(s=COMPLETING,请回头到上面看看run方法的过程就知道了)
            */
            else if (q == null)
                q = new WaitNode();
            /**
            再进来,queued=false,如果还没入队,则将包装好的q通过CAS+头插入(查到头节点之前)
            的方式入队,然后再次进入下一次的循环
            */
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            //如果是超时进来的,则里面的就是等待超时的操作,和之前讲过的AQS超时操作都是一样的
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            //再进来循环,走到这里,说明任务还未结束,且当前线程已经包装成节点进入等待队列
            //(Treiber栈)了,则阻塞线程
            else
                LockSupport.park(this);
        }
}

    //report(s)
    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
        /**
            我把状态值移过来,方便大家分析
            private static final int NEW          = 0;

            private static final int COMPLETING   = 1;

            private static final int NORMAL       = 2;
            private static final int EXCEPTIONAL  = 3;
            private static final int CANCELLED    = 4;
            private static final int INTERRUPTING = 5;
            private static final int INTERRUPTED  = 6;
            */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        //除了上面两种情况外,就是执行抛异常了
        throw new ExecutionException((Throwable)x);
    }
复制代码

get流程总结如下:

  • 取出state,判断是否小于等于COMPLETING,即看任务是不是还在运行或者还没开始;如果是的话,则调用【awaitDone(false, 0L)】方法awaitDone(false, 0L): 此方法是个 for自旋,大概意思就是逐步地(逐步的意思就是通过自旋的形式,反复往下执行,注意里面的每个if分支其实互斥发生的,且每次循环,都会从头到尾再来一次, 请仔细看代码里的for循环过程分析 )将当前线程包装成等待节点从头部放入等待栈并且最后park阻塞等待的过程。这里面 一直在自旋检测是否被中断和判断当前state的状态值 ,大于COMPLETING说明要么任务完成,要么任务失败(被取消,发生异常等) 。其实 这个方法的过程 ,如果看过我之前的AQS的分析,其实看这些代码很容易理解的, 无非就是自旋+CAS操作而已
  • 上面执行awaitDone的最终线程被阻塞,直到线程被唤醒(什么时候唤醒?就在上面的run方法当中执行的finishCompletion方法中唤醒的),继续往下执行【report(s)】report(s):代码就很简单了,状态正常结束的话,就返回结果;状态被取消的话,就抛出取消异常;如果在任务执行过程中发生异常,也会包装成ExecutionException抛出异常。

cancel方法

//Future接口,cancel方法的注释
    /**
     * Attempts to cancel execution of this task.  This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when {@code cancel} is called,
     * this task should never run.  If the task has already started,
     * then the {@code mayInterruptIfRunning} parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.
        试图取消此任务的执行。如果任务已经完成、已经取消或由于其他原因无法取消,则此尝试将失败。
如果成功,并且在调用cancel时这个任务还没有启动,那么这个任务就不应该运行。如果任务已经开始,
那么参数mayInterruptIfRunning决定执行该任务的线程是否应该在试图停止该任务时被中断。
     *
     * 

After this method returns, subsequent calls to {@link #isDone} will * always return {@code true}. Subsequent calls to {@link #isCancelled} * will always return {@code true} if this method returned {@code true}. * 该方法返回后,对isDone的后续调用将始终返回true。如果该方法返回true,则对iscancelled后续调用将 始终返回true。 * @param mayInterruptIfRunning {@code true} if the thread executing this * task should be interrupted; otherwise, in-progress tasks are allowed * to complete mayInterruptIfRunning true:当前的任务会被中断 false:允许当前的任务继续执行 * @return {@code false} if the task could not be cancelled, * typically because it has already completed normally; * {@code true} otherwise */ public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { //如果mayInterruptIfRunning为true,则中断线程,并且将状态改为INTERRUPTED try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { //最后做完成既定动作,上面分析过 finishCompletion(); } //最后返回true,取消成功 return true; } 复制代码

根据代码和注释分析如下:

  • 如果state状态不为NEW,即当前任务已经进行了,即【state == NEW】为false,则直接返回false,取消失败
  • 如果state为NEW,则通过CAS将状态设置为INTERRUPTING(如果mayInterruptIfRunning为true)或者CANCELLED,如果修改成功,将继续,否则也失败返回哪些情况会取消失败: (1)任务已经开始进行或者完成了(2)任务已经被取消了(3)其他原因无法取消的
  • 此方法说是取消了,但其实并不代表任务真正地被取消了,通过源码分析可以知道,它只是改变了状态state,这时就看你当前任务执行到什么程度了,如果还没开始,那么任务就不会执行(run方法中在判断【c != null && state == NEW】后就不会继续往下执行任务了);如果任务已经在执行了,就看你有没有指定中断(即mayInterruptIfRunning为true),如果指定中断,那会中断操作;如果没有,则继续运行。回头再看一次run方法的过程就知道了: run方法最终会到finally代码块,这时【s >= INTERRUPTING】成立,如果s = INTERRUPTING,说明cancel方法还没有执行【t.interrupt()】,所以在【handlePossibleCancellationInterrupt】方法里会一直等待直到变成INTERRUPTED。

以上呢,便是FutureTask主要操作的过程分析了。

FutureTask总结

  • 1、FutureTask分别实现了Runnable和Future的接口,其中的各种操作都离不开对状态state的操作和判断。state状态的中间状态(COMPLETING,INTERRUPTING)存在时间很短(通过set(V v)/setException和cancel方法可知),state的值要么就是初始状态NEW,要么就很快变成了结束状态(NORMAL,EXCEPTIONAL,CANCELLED,INTERRUPTED)此外,在我们分析完run get cancel方法之后,我们发现这三个方法之间都需要前后联系起来看,他们之间其实是通过state这个状态值,来作为中间的一个协调者这样的一个角色来实现最终的效果的。
  • 2、FutureTask三大方法的调用是不同种类的线程在调用,大体归为三类:第一类是任务执行者,就是用来当作runner的那个线程;第二类是调用get获取结果的;第三类则是调用cancel方法的。当然可能还有其他调用isDone和isCancel这些工具方法的线程。由于是多线程并发编程,以上三类线程有时几乎是同时进行的,所以针对状态以及一些特殊值的设定,都采取了自旋+CAS的方式进行的( 包括等待队列【Treiber栈】的实现 ),这样就保证了各个操作的线程安全性。
  • 3、FutureTask等待队列的实现 和我们之前一起学习过的AQS差不多,但比AQS简单很多,同样是线程如果拿不到自己想要的任务结果就会逐步的通过 自旋+CAS 的方式把线程包装成节点,然后 头压栈 的形式放到等待队列里;当任务执行完成,再进行唤醒通知它们取结果。 在这过程中:如果任务执行发生了异常(其中也包括中断异常):则直接将state改为EXCEPTIONAL,然后退出并唤醒等待结果的线程(并将异常信息抛出,回头看report方法的分析就会知道)如果被其他线程取消,两种情况:中断式取消和非中断式中断式:先将状态改为中间状态INTERRUPTING,然后中断线程,如果这时任务正在执行,就会抛出中断异常,然后如果有线程get的话,在被唤醒后,执行report的时候直接报错结束( 这时虽然报CancellationException,其根本就是中断异常 );非中断式:先将状态改为结束状态CANCELLED,然后就看任务自己的执行情况了,如果任务还未开始,则run方法中在判断【c != null && state == NEW】后就不会继续往下执行任务了;如果任务已经进行,则任务会继续执行完成,只不过会在【set(result)】方法里修改状态为COMPLETING会失败,结果也会是空的。如果有调用get的线程,则被唤醒后,执行report的时候也会直接抛出异常。

DEMO分享

最后呢,我再提供一个demo的用法示例,注意看代码中的注释,按照最后的方法试试运行效果:

public class DEmo {
	public static void main(String[] args) throws InterruptedException, ExecutionException {

		System.out.println("main start");
		FutureTask task = new FutureTask(() -> {
			System.out.println("thread:" + Thread.currentThread().getName() + "异步任务开始...");
			Thread.sleep(5000);
			System.out.println("thread:" + Thread.currentThread().getName() + "异步任务结束..");
			return 1024;
		});
		Thread t1 = new Thread(task);
		// 开启FutureTask任务的执行,由t1线程执行
		t1.start();

		//t2为取消线程
		Thread t2 = new Thread(new Runnable() {

			@Override
			public void run() {
				// t2线程 2s后取消,两种形式,中断式取消和非中断式
				try {
					Thread.sleep(2000);
				} catch (InterruptedException e) {
				}
				// 1
				task.cancel(false);
			}
		});
		t2.start();

		// 此处主线程等待获取结果
		// 2
		System.out.println(task.get());

		// 上面的demo,我标出了1和2两处位置
		// 小伙伴们可以试着切换1处的true/false看看效果
		// 也可以将2处的代码进行注释和不注释切换看看效果
	}
}
复制代码

PS:其实提供这段代码的主要目的是供小伙伴们打断点调试,来印证我文末最后的总结,如下方我在源码中打断点的代码位置( 看代码行数即可

盘一盘晕头转向的Runnable、Callable、Future、FutureTask

FutureTask扩展

以为到这里本篇就结束了吗?还没有,我们再来仔细看下FutureTask的方法,如下:

盘一盘晕头转向的Runnable、Callable、Future、FutureTask

我们仔细看这三个方法的修饰符,都是protected,这说明什么,说明这三个方法可以在子类中被重写

在此,我们也引出一个问题 FutureTask虽然为我们提供了获取任务结果的方法get,但是呢,最终还是没有达到我想要的异步效果,即我调用了get方法之后,会阻塞在那,而且发生异常的话,异常只能被动抛出。 我现在想要实现一种我不需要get方法阻塞,也能获取到结果,且以通知的形式告诉主线程,如果任务异常的话,可以主动地获取到异常信息。

来看看怎么实现的吧~

定义结果监听器

public interface TaskListenser {
    /**
    exception:true,结果异常,value即异常详情信息 false,无异常,value即正常结果值
    */
    public void onResultOk(boolean exception, Object value);
}
复制代码

自定义Future,重写set和setException

public class MyFutureTask extends FutureTask {
	/**
	 * 设置监听
	 */
	private TaskListenser taskListenser;

	public MyFutureTask(Callable callable) {
		super(callable);
	}

	@Override
	protected void set(V v) {
		super.set(v);
		try {
			// 任务执行完成,通知
			taskListenser.onResultOk(false, get());
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		}
	}

	@Override
	protected void setException(Throwable t) {
		super.setException(t);
		taskListenser.onResultOk(true, getExceptionResult());
	}

	/**
	 * 把异常信息以返回值的形式给到前台
	 * 
	 * @return
	 */
	private Throwable getExceptionResult() {
		try {
			get();
		} catch (InterruptedException | ExecutionException e) {
			return e;
		}
		return null;
	}

	public void setTaskListenser(TaskListenser taskListenser) {
		this.taskListenser = taskListenser;
	}

}
复制代码

运行类DEMO

public class MyDemo implements TaskListenser, Callable {

	@Override
	public void onResultOk(boolean exception, Object value) {
		if (!exception) {
			System.out.println("我收到結果了:" + (String) value);
		} else {
			System.out.println("执行任务发生异常了:" + ((Throwable) value).getMessage());
		}

	}

	@Override
	public String call() throws Exception {
		System.out.println(Thread.currentThread().getName() + "异步任务开始...");
		// 模拟执行任务
		for (int i = 0; i < 5; i++) {
			System.out.println(Thread.currentThread().getName() + "执行任务中..." + i);
			// 此段代码可以用来测试任务异常的情况,注释打开即可
                        //if (i == 3) {
                        //   throw new Exception("发生异常啦!!!");
                        //}
			Thread.sleep(1000);
		}
		System.out.println(Thread.currentThread().getName() + "异步任务结束..");
		return "1024";
	}

	public static void main(String[] args) {
		MyDemo myDemo = new MyDemo();
		MyFutureTask task = new MyFutureTask<>(myDemo);
		task.setTaskListenser(myDemo);
		Thread t1 = new Thread(task);
		// 开启FutureTask任务的执行,由t1线程执行
		t1.start();

	}
}
复制代码
//运行效果
//正常
Thread-0异步任务开始...
Thread-0执行任务中...0
Thread-0执行任务中...1
Thread-0执行任务中...2
Thread-0执行任务中...3
Thread-0执行任务中...4
Thread-0异步任务结束..
我收到結果了:1024

//发生异常的效果
Thread-0异步任务开始...
Thread-0执行任务中...0
Thread-0执行任务中...1
Thread-0执行任务中...2
Thread-0执行任务中...3
执行任务异常了:java.lang.Exception: 发生异常啦!!!
复制代码

例子比较简单哈,大家可以试试~ 当然,大家如果能够举一反三的话就更好了!! 比如我这边的通知只是利用了观察者模式,能不能用我们之前学的阻塞队列呢? 切忌眼高手低啊!

本篇到此就结束了,你学会了吗?~ 文字偏多,感谢阅读,认可的话,请点关注吧~ 哈哈

原文链接:https://juejin.cn/post/7120469977634701319

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章