java异步接口Future和Callable
Future是Concurrent包提供的一种异步得到结果的接口。
Future接口:
public interface Future<V> {// 取消当前的计算boolean cancel(boolean mayInterruptIfRunning);// 计算是否被取消boolean isCancelled();// 计算是否已经结束boolean isDone();// 得到计算的结果V get() throws InterruptedException, ExecutionException;// 带有超时时间的get方法V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;}
Future里面的任务,必须实现Callable接口。
Callable接口:
public interface Callable<V> {// 要实现具体的计算逻辑V call() throws Exception;}
state状态
当前计算的情况有下面这些状态
private volatile int state;private static final int NEW = 0;//初始private static final int COMPLETING = 1;//任务已经执行完或者出错,准备赋值private static final int NORMAL = 2;//任务正常执行完,并且已经赋值完private static final int EXCEPTIONAL = 3;//任务失败,把异常赋值回去private static final int CANCELLED = 4;//取消private static final int INTERRUPTING = 5;//准备中断计算过程private static final int INTERRUPTED = 6;//对计算进行中断
下面以FutureTask类为例,叙述一遍Future的使用方法和原理。
一、使用FutureTask
FutureTask的实现接口:
测试代码:
public class TestMain {public static void main(String[] args) {long start = System.currentTimeMillis();Callable<Clothes> callable = new Callable<Clothes>() {@Overridepublic Clothes call() throws Exception {Clothes clothes = new Clothes();clothes.washClothes();return clothes;}};FutureTask<Clothes> futureTask = new FutureTask<Clothes>(callable);new Thread(futureTask).start();Dishes dishes = new Dishes();dishes.washDishes();Clothes clothes = null;try {clothes = futureTask.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}if (clothes != null) {System.out.println("===========over===========");}long end = System.currentTimeMillis();System.out.println("all tasks cost "+(end - start)+" millis");/*运行结果:start to wash dishesmachine starts to wash clothsall dishes are cleaned!!!all clothes are cleaned!!!===========over===========all tasks cost 3003 millis*/}static class Clothes {public void washClothes() {System.out.println("machine starts to wash cloths");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("all clothes are cleaned!!!");}}static class Dishes {public void washDishes() {System.out.println("start to wash dishes");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("all dishes are cleaned!!!");}}}
由结果可看出Future的方法是异步的,所以总耗时是3s,如果同步的话应该是5s。
在代码的最开头,我们new了一个Callable对象,Callable对象会被当做任务丢到FutureTask里面执行。
启动线程就可以开始执行任务了。
下面具体说一下Future帮我们干了什么,为什么他是异步的。
二、 FutureTask的run()方法
public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;// 全局变量,构造函数赋值if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();// callable的返回值ran = true;// Callable计算已经结束了} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}
run方法很直接,直接调用Callable的call方法获取值,获取到值了,就令ran为true,然后调用set方法。
三、 set()方法
set方法会调用finishCompletion,
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;// outcome是计算出来的结果,或者抛出的异常UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}}
3.1、 finishCompletion()方法
WaitNode节点是Treiber stack的节点,就是一个排队等待的线程队列。
private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {//唤醒队列中等待的线程Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);//唤醒线程,加锁在get里面,如果没有加锁,正常向下执行}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();// protected方法,钩子函数,留给程序员自己实现callable = null; // to reduce footprint}
四、 get()方法
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)//计算没完成,进入awaitDones = awaitDone(false, 0L);return report(s);}
4.1 awaitDone()
awaitDone用死循环等待结果,也就是说会阻塞在这里。
private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {//死循环等待if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {// 计算完成了就返回if (q != null)q.thread = null;return s;}// COMPLETING是一个很短暂的状态,调用Thread.yield期望让出时间片,之后重试循环else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)/* 当前节点未入栈* 这是Treiber Stack算法入栈的逻辑。* Treiber Stack是一个基于CAS的无锁并发栈实现,* 更多可以参考https://en.wikipedia.org/wiki/Treiber_Stack*/queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {// 超时,移除节点removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}else //这里阻塞住LockSupport.park(this);}}
4.2 report()
report把结果赋值回去
private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)//正常得到了结果return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}
总结
FutureTask在底层开了一个死循环用于等待结果,当线程得到结果时,跳出循环,借此实现的异步操作。注意,我们一般在使用Future的时候,都不会用本文中new线程的方式,而是采用连接池中的线程。
————————————————
版权声明:本文为CSDN博主「CPeony」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_15764477/article/details/109366729
