Netty 中的异步编程 Future 和 Promise
2021-03-20 04:26
标签:date 逻辑 不能 覆盖 之间 arrays ack second google Netty 中大量 I/O 操作都是异步执行,本篇博文来聊聊 Netty 中的异步编程。 JDK 5 引入了 Future 模式。Future 接口是 Java 多线程 Future 模式的实现,在 对于异步编程,我们想要的实现是:提交一个任务,在任务执行期间提交者可以做别的事情,这个任务是在异步执行的,当任务执行完毕通知提交者任务完成获取结果。 那么在 Future 中是怎么实现的呢?我们先看接口定义: 我们看一个示例: Futrue 的使用方式是:投递一个任务到 Future 中执行,操作完之后调用 上面两种方式的代码就变为这样: 一般在使用线程池创建线程执行任务的时候会有两种方式,要么实现 Runnable 接口,要么实现 Callable 接口,它们的区别在于: 而我们的异步返回自然是使用 Callable 方式。那么 Callable 是如何实现的呢? 从 Callable 被提交的地方入手: 可以看到将 Callable 又包装成了 RunnableFuture。而这个 所以再看 那么 Callable 优势如何变为 RunnableFuture 的呢?我们看 将 Callable 包装为 FutureTask 对象,看到这里又关联到 FutureTask , : 可以看到 FutureTask 是 RunnableFuture 的子类,这也就解释了上面的示例为什么在线程池中可以提交 FutureTask 实例。 更详细的执行过程这里就不再分析,重点剖析 Future 的实现过程,它并不是真正的异步,没有实现回调。所以在Java8 中又新增了一个真正的异步函数:CompletableFuture。 Java 8 中新增加了一个类:CompletableFuture,它提供了非常强大的 Future 的扩展功能,最重要的是实现了回调的功能。 使用示例: CompletableFuture源码中有四个静态方法用来执行异步任务: 前面两个可以看到是带返回值的方法,后面两个是不带返回值的方法。同时支持传入自定义的线程池,如果不传入线程池的话默认是使用 合并两个异步任务 如果有两个任务需要异步执行,且后面需要对这两个任务的结果进行合并处理,CompletableFuture 也支持这种处理: 通过 下一个依赖上一个的结果 如果第二个任务依赖第一个任务的结果: 常用 API 介绍 上面四个方法表示在当前阶段任务完成之后下一步要做什么。whenComplete 表示在当前线程内继续做下一步,带 Async 后缀的表示使用新线程去执行。 拿到上一个任务的结果做后续操作,使用 handler 来处理逻辑,可以返回与第一阶段处理的返回类型不一样的返回类型。 Handler 与 whenComplete 的区别是 handler 是可以返回一个新的 CompletableFuture 类型的。 拿到上一个任务的结果做后续操作, thenApply方法 注意到 thenApply 方法的参数中是没有 Throwable,这就意味着如有有异常就会立即失败,不能在处理逻辑内处理。且 thenApply 返回的也是新的 CompletableFuture。 这就是它与前面两个的区别。 拿到上一个任务的结果做后续操作,可以不返回任何值,thenAccept方法 看这里的示例: 执行完毕是不会返回任何值的。 CompletableFuture 的特性提现在执行完 runAsync 或者 supplyAsync 之后的操作上。CompletableFuture 能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。 另外当你依赖 CompletableFuture 的计算结果才能进行下一步的时候,无需手动判断当前计算是否完成,可以通过 CompletableFuture 的事件监听自动去完成。 说 Netty 中的异步编程之前先说一个异步编程模型:Future/Promise异步模型。 future和promise起源于函数式编程和相关范例(如逻辑编程 ),目的是将值(future)与其计算方式(promise)分离,从而允许更灵活地进行计算,特别是通过并行化。 Future 表示目标计算的返回值,Promise 表示计算的方式,这个模型将返回结果和计算逻辑分离,目的是为了让计算逻辑不影响返回结果,从而抽象出一套异步编程模型。那计算逻辑如何与结果关联呢?它们之间的纽带就是 callback。 引用自:https://zh.wikipedia.org/wiki/Future%E4%B8%8Epromise 在 Netty 中的异步编程就是基于该模型来实现。Netty 中非常多的异步调用,最简单的例子就是我们 Server 和 Client 端启动的例子: Server: Client: Netty 中使用了一个 ChannelFuture 来实现异步操作,看似与 Java 中的 Future 相似,我们看一下代码: 这里 ChannelFuture 继承了一个 Future,这是 Java 中的 Future 吗?跟下去发现并不是 JDK 的,而是 Netty 自己实现的。该类位于: Netty 自己实现的 Future 继承了 JDK 的 Future,新增了 Netty的 Future 与 Java 的 Future 虽然类名相同,但功能上略有不同,Netty 中引入了 Promise 机制。在 Java 的 Future 中,业务逻辑为一个 Callable 或 Runnable 实现类,该类的 Promise 接口继承自 Future 接口,重点添加了上述几个方法,可以人工设置 future 的执行成功与失败,并通知所有监听的 listener。 从 Future 和 Promise 提供的方法来看,Future 都是 get 类型的方法,主要用来判断当前任务的状态。而 Promise 中是 set 类型的方法,主要来对任务的状态来进行操作。这里就体现出来将 结果和操作过程分离的设计。 Promise 实现类是DefaultPromise类,该类十分重要,Future 的 listener 机制也是由它实现的,所以我们先来分析一下该类。先来看一下它的重要属性: 第一个套 listener,是指在 listener 的 为了更好的说明,先写了一个示例,Netty 中的 Future/Promise模型是可以单独拿出来使用的。 通过这个例子可以看到,Promise 能够在业务逻辑线程中通知 Future 成功或失败,由于 Promise 继承了 Netty 的 Future,因此可以加入监听事件。而 Future 和 Promise 的好处在于,获取到 Promise 对象后可以为其设置异步调用完成后的操作,然后立即继续去做其他任务。 来看一下 addListener() 方法: 这里看到有一个全局变量 为啥会是一个 Object 类型的对象呢,不是应该是 List 或者是数组才对嘛。Netty之所以这样设计,是因为大多数情况下 listener 只有一个,用集合和数组都会造成浪费。当只有一个 listener 时,该字段为一个 GenericFutureListener 对象;当多于一个 listener 时,该字段为 DefaultFutureListeners ,可以储存多个 listener。 我们再来看 这里有个疑问就是为什么要设置当前的调用栈深度+1。 接着看真正执行通知的方法: Netty 中 DefalutPromise 是一个非常常用的类,这是 Promise 实现的基础。DefaultChannelPromise DefalutPromise 的子类,加入了 channel 这个属性。 Promise 目前支持两种类型的监听器: 为了让 Promise 支持多个监听器,Netty 添加了一个默认修饰符修饰的 以上就是关于 Promise 和监听器相关的实现分析,再回到之前的启动类,是不是还有一个 sync() 方法: 这里其实就是一个同步检测当前事件是否完成的过程。 以上就是 Netty 中实现的 Future/Promise 异步回调机制。实现并不是很难懂,代码很值得学习。除了 Netty 中实现了 Future/Promise模型,在Guava中也有相关的实现,大家日常使用可以看习惯引用相关的包。 Guava实现: Netty 中的异步编程 Future 和 Promise 标签:date 逻辑 不能 覆盖 之间 arrays ack second google 原文地址:https://www.cnblogs.com/rickiyang/p/12742091.htmlJava Future 提供的异步模型
java.util.concurrent
包中,可以来进行异步计算。public interface Future
public class FutureTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
System.out.println("start");
Future
Future#get()
或者 Future#isDone()
方法判断是否执行完毕。从这个逻辑上看, Future 提供的功能是:用户线程需要主动轮询 Future 线程是否完成当前任务,如果不通过轮询是否完成而是同步等待获取则会阻塞直到执行完毕为止。所以从这里看,Future并不是真正的异步,因为它少了一个回调,充其量只能算是一个同步非阻塞模式。executorService.submit()
方法获取带返回值的 Future 结果有两种方式:
Callable
接口;Callable
进行创建)。public class FutureTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
System.out.println("start");
//方式1 通过 executorService 提交一个异步线程
//Future
executorService.submit(task)
, ExecutorService 是一个接口,他的默认实现类是:AbstractExecutorService,我们看这里的 submit()
实现方式:public
RunnableFuture
就比较神奇,它同时继承了 Runnable 和 Future ,既有线程的能力又有可携带返回值的功能。public interface RunnableFuture
submit()
方法,其实是将 RunnableFuture 线程送入线程池执行,执行是一个新线程,只是这个执行的对象提供了 get()
方法来获取执行结果。newTaskFor(task)
方法:protected
public class FutureTask
CompletableFuture 非阻塞异步编程模型
public class CallableFutureTest {
public static void main(String[] args) {
System.out.println("start");
/**
* 异步非阻塞
*/
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
System.out.println("sleep done");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("done");
}
}
CompletableFuture.runAsync()
方法提供了异步执行无返回值任务的功能。ExecutorService executorService = Executors.newFixedThreadPool(100);
CompletableFuture
CompletableFuture.supplyAsync()
方法提供了异步执行有返回值任务的功能。public static CompletableFuture supplyAsync(Supplier supplier){..}
public static CompletableFuture supplyAsync(Supplier supplier,Executor executor){..}
public static CompletableFuture
ForkJoinPool.commonPool()
作为它的线程池执行异步代码。ExecutorService executorService = Executors.newFixedThreadPool(100);
CompletableFuture
CompletableFuture.thenCombineAsync()
方法获取两个任务的结果然后进行相应的操作。ExecutorService executorService = Executors.newFixedThreadPool(100);
CompletableFuture
CompletableFuture.thenComposeAsync()
支持将第一个任务的结果传入第二个任务中。
public CompletableFuture
public CompletableFuture handle(BiFunction super T,Throwable,? extends U> fn)
public CompletableFuture handleAsync(BiFunction super T,Throwable,? extends U> fn)
public CompletableFuture handleAsync(BiFunction super T,Throwable,? extends U> fn, Executor executor)
CompletableFuture
public CompletableFuture thenApply(Function super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor)
public CompletableFuture
CompletableFuture.supplyAsync(() -> {
return "result";
}).thenAccept(r -> {
System.out.println(r);
}).thenAccept(r -> {
System.out.println(r);
});
Netty 中的异步编程
public interface ChannelFuture extends Future
io.netty.util.concurrent
包中:public interface Future
sync()
和 await()
用于阻塞等待,还加了 Listeners,只要任务结束去回调 Listener 就可以了,那么我们就不一定要主动调用 isDone()
来获取状态,或通过 get()
阻塞方法来获取值。call()
或 run()
执行完毕意味着业务逻辑的完结,在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败,这样更加方便的监控自己的业务逻辑。public interface Promise
// 可以嵌套的Listener的最大层数,可见最大值为8
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
// result字段由使用RESULT_UPDATER更新
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater
operationComplete()
方法中,可以再次使用 future.addListener()
继续添加 listener,Netty 限制的最大层数是8,用户可使用系统变量io.netty.defaultPromise.maxListenerStackDepth
设置。
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.TimeUnit;
/**
* @author rickiyang
* @date 2020-04-19
* @Desc TODO
*/
public class PromiseTest {
public static void main(String[] args) {
PromiseTest testPromise = new PromiseTest();
Promise
@Override
public Promise
listeners
,我们看到他的定义:private Object listeners;
notifyListeners()
方法:private void notifyListeners() {
EventExecutor executor = executor();
//当前EventLoop线程需要检查listener嵌套
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
//这里是当前listener的嵌套层数
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
// 正在通知或已没有监听者(外部线程删除)直接返回
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
//只有一个listener
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
//有多个listener
notifyListener0(this, (GenericFutureListener extends Future
GenericFutureListener
的子类,支持进度表示和支持泛型的Future 监听器(有些场景需要多个步骤实现,类似于进度条那样)。DefaultFutureListeners
类用于保存监听器实例数组:final class DefaultFutureListeners {
private GenericFutureListener extends Future>>[] listeners;
private int size;
private int progressiveSize; // the number of progressive listeners
// 这个构造相对特别,是为了让Promise中的listeners(Object类型)实例由单个GenericFutureListener实例转换为DefaultFutureListeners类型
@SuppressWarnings("unchecked")
DefaultFutureListeners(GenericFutureListener extends Future>> first, GenericFutureListener extends Future>> second) {
listeners = new GenericFutureListener[2];
listeners[0] = first;
listeners[1] = second;
size = 2;
if (first instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
if (second instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void add(GenericFutureListener extends Future>> l) {
GenericFutureListener extends Future>>[] listeners = this.listeners;
final int size = this.size;
// 注意这里,每次扩容数组长度是原来的2倍
if (size == listeners.length) {
this.listeners = listeners = Arrays.copyOf(listeners, size > l) {
final GenericFutureListener extends Future>>[] listeners = this.listeners;
int size = this.size;
for (int i = 0; i 0) {
// listenersToMove后面的元素全部移动到数组的前端
System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
}
// 当前监听器总量的最后一个位置设置为null,数量减1
listeners[-- size] = null;
this.size = size;
// 如果监听器是GenericProgressiveFutureListener,则带进度指示的监听器总数量减1
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize --;
}
return;
}
}
}
// 返回监听器实例数组
public GenericFutureListener extends Future>>[] listeners() {
return listeners;
}
// 返回监听器总数量
public int size() {
return size;
}
// 返回带进度指示的监听器总数量
public int progressiveSize() {
return progressiveSize;
}
}
@Override
public Promise
文章标题:Netty 中的异步编程 Future 和 Promise
文章链接:http://soscw.com/index.php/essay/66549.html