[编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现
标签:unit 技术 eric fail 修改 abstract read his throws
netty Future是基于jdk Future扩展,以监听完成任务触发执行
Promise是对Future修改任务数据
DefaultPromise是重要的模板类,其它不同类型实现基本是一层简单的包装,如DefaultChannelPromise
主要是分析await是如何等侍结果的
public interface Futureextends java.util.concurrent.Future {
Future addListener(GenericFutureListener extends Future super V>> listener);
}
public interface Promiseextends Future {
Promise setSuccess(V result);
boolean trySuccess(V result);
Promise setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
boolean setUncancellable();
}
public class DefaultPromiseextends AbstractFutureimplements Promise {
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return await0(unit.toNanos(timeout), true);
}
private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
//已完成任务直接忽略
if (isDone()) {
return true;
}
//没有等侍时间返回处理记录
if (timeoutNanos ) {
return isDone();
}
//已中断抛异常
if (interruptable && Thread.interrupted()) {
throw new InterruptedException(toString());
}
//checkDeadLock();
//netty 认为是当前线程是死锁状态
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(toString());
}
long startTime = System.nanoTime();
long waitTime = timeoutNanos;
boolean interrupted = false;
try {
for (;;) {
synchronized (this) {
if (isDone()) {
return true;
}
//最大检查次数为 Short.MAX_VALUE
//很奇怪的逻辑,处理完后又自减
if (waiters == Short.MAX_VALUE) {
throw new IllegalStateException("too many waiters: " + this);
}
++waiters;
try {
//阻塞的代码只是一行参数1是milliseconds,参数2是辅助用的大于0时milliseconds+1,如果是0的话会无限制阻塞
wait(waitTime / 1000000, (int) (waitTime % 1000000));
} catch (InterruptedException e) {
if (interruptable) {
throw e;
} else {
interrupted = true;
}
} finally {
waiters--;
}
}
//这里是double check跟并发无影响的逻辑放在synchronized外面
if (isDone()) {
return true;
} else {
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime ) {
return isDone();
}
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}
public class DefaultChannelPromise extends DefaultPromiseimplements ChannelPromise, FlushCheckpoint {
private final Channel channel;
public DefaultChannelPromise(Channel channel) {
this.channel = channel;
}
public DefaultChannelPromise(Channel channel, EventExecutor executor) {
super(executor);
this.channel = channel;
}
}
[编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现
标签:unit 技术 eric fail 修改 abstract read his throws
原文地址:http://www.cnblogs.com/solq111/p/7071177.html
评论