[编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现

2021-07-13 05:06

阅读:526

标签:unit   技术   eric   fail   修改   abstract   read   his   throws   

netty Future是基于jdk Future扩展,以监听完成任务触发执行
Promise是对Future修改任务数据
DefaultPromise是重要的模板类,其它不同类型实现基本是一层简单的包装,如DefaultChannelPromise
主要是分析await是如何等侍结果的

技术分享

 

public interface Futureextends java.util.concurrent.Future {
   Future addListener(GenericFutureListener extends Future super V>> listener);
}
public interface Promiseextends Future {
    Promise setSuccess(V result);
    boolean trySuccess(V result);
    Promise setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    boolean setUncancellable();
}
public class DefaultPromiseextends AbstractFutureimplements Promise {

    @Override
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return await0(unit.toNanos(timeout), true);
    }
    private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
        //已完成任务直接忽略
        if (isDone()) {
            return true;
        }
        //没有等侍时间返回处理记录
        if (timeoutNanos ) {
            return isDone();
        }
        //已中断抛异常
        if (interruptable && Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        //checkDeadLock();
        //netty 认为是当前线程是死锁状态
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(toString());
        }
        
        long startTime = System.nanoTime();
        long waitTime = timeoutNanos;
        boolean interrupted = false; 
        try {
            for (;;) {
                synchronized (this) {
                    if (isDone()) {
                        return true;
                    }
                    //最大检查次数为 Short.MAX_VALUE
                    //很奇怪的逻辑,处理完后又自减
                    if (waiters == Short.MAX_VALUE) {
                        throw new IllegalStateException("too many waiters: " + this);
                    }
                    ++waiters;
                    try {
                        //阻塞的代码只是一行参数1是milliseconds,参数2是辅助用的大于0时milliseconds+1,如果是0的话会无限制阻塞
                        wait(waitTime / 1000000, (int) (waitTime % 1000000));
                    } catch (InterruptedException e) {
                        if (interruptable) {
                            throw e;
                        } else {
                            interrupted = true;
                        }
                    } finally {
                        waiters--;
                    }
                }
                //这里是double check跟并发无影响的逻辑放在synchronized外面
                if (isDone()) {
                    return true;
                } else {
                    waitTime = timeoutNanos - (System.nanoTime() - startTime);
                    if (waitTime ) {
                        return isDone();
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
public class DefaultChannelPromise extends DefaultPromiseimplements ChannelPromise, FlushCheckpoint {
    private final Channel channel;
    public DefaultChannelPromise(Channel channel) {
        this.channel = channel;
    }
    public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        super(executor);
        this.channel = channel;
    }
}

 

[编织消息框架][netty源码分析]9 Promise 实现类DefaultPromise职责与实现

标签:unit   技术   eric   fail   修改   abstract   read   his   throws   

原文地址:http://www.cnblogs.com/solq111/p/7071177.html


评论


亲,登录后才可以留言!