多线程(二十二、异步执行-Futrue模式)
2020-12-13 06:22
标签:委托 慢慢 put cancel ace runnable odi orm als 如果一个任务需要返回执行结果,一般我们会实现一个Callable任务,并创建一个线程来执行任务。对于执行时间比较长的任务,显然我们同步的等待结果再去执行后续的业务是不现实的,那么,Future模式是怎样解决这个问题的呢? Future模式,可以让调用方立即返回,然后它自己会在后面慢慢处理,此时调用者拿到的仅仅是一个凭证,调用者可以先去处理其它任务,在真正需要用到调用结果的场合,再使用凭证去获取调用结果。这个凭证就是这里的Future。 Future接口的定义: 1、NEW:表示任务的初始化状态; 各个状态之间的流转: FutureTask在构造时可以接受Runnable或Callable任务,如果是Runnable,则最终包装成Callable: 当调用FutureTask的get方法时,如果任务没有完成,则调用线程会被阻塞,其实就是将线程包装成WaitNode结点保存到waiters指向的栈中。 set方法: 任务取消后,最终调用finishCompletion方法,释放所有在栈上等待的线程 FutureTask可以通过get方法获取任务结果,如果需要限时等待,可以调用get(long timeout, TimeUnit unit) 1、ScheduledFutureTask在普通FutureTask的基础上增加了周期执行/延迟执行的功能 FutureTask的runAndReset方法与run方法的区别就是当任务正常执行完成后,不会设置任务的最终状态(即保持NEW状态),以便任务重复执行: 多线程(二十二、异步执行-Futrue模式) 标签:委托 慢慢 put cancel ace runnable odi orm als 原文地址:https://blog.51cto.com/janephp/2419664public interface Future
FutureTask
Future模式中,最重要的就是FutureTask类
FutureTask一共给任务定义了7种状态
2、COMPLETING:表示任务已执行完成(正常完成或异常完成),但任务结果或异常原因还未设置完成,属于中间状态;
3、NORMAL:表示任务已经执行完成(正常完成),且任务结果已设置完成,属于最终状态;
4、EXCEPTIONAL:表示任务已经执行完成(异常完成),且任务异常已设置完成,属于最终状态;
5、CANCELLED:表示任务还没开始执行就被取消(非中断方式),属于最终状态;
6、INTERRUPTING:表示任务还没开始执行就被取消(中断方式),正式被中断前的过渡状态,属于中间状态;
7、INTERRUPTED:表示任务还没开始执行就被取消(中断方式),且已被中断,属于最终状态。FutureTask构造
public FutureTask(Callable
FutureTask成员
private volatile int state;//任务状态
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
任务执行run
public void run() {
// 仅当任务为NEW状态时, 才能执行任务
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;//存储结果值
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
任务取消
public boolean cancel(boolean mayInterruptIfRunning) {
// 仅NEW状态下可以取消任务
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) { // 中断任务
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//释放所有在栈上等待的线程
finishCompletion();
}
return true;
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) { //自旋释放所有等待线程
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);//唤醒线程
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
获取结果
public V get() throws InterruptedException, ExecutionException {
int s = state;
//当前任务的状态是NEW或COMPLETING,会调用awaitDone阻塞线程
if (s = CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable) x);
}
ScheduledFutureTask
2、ScheduledFutureTask是ScheduledThreadPoolExecutor这个线程池的默认调度任务类,通过继承FutureTask和Delayed接口来实现周期/延迟功能的。ScheduledFutureTask的源码非常简单,基本都是委托FutureTask来实现的
任务运行
public void run() {
// 是否是周期任务
boolean periodic = isPeriodic();
//// 能否运行任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic) // 非周期任务:调用FutureTask的run方法运行
ScheduledFutureTask.super.run();
// 周期任务:调用FutureTask的runAndReset方法运行
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
protected boolean runAndReset() {
// 仅NEW状态的任务可以执行
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable