Netty框架

2021-02-01 17:14

阅读:510

标签:dex   tty   区别   switch   字符串   组成   性能   ssg   土豆   

简介

netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高。
提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。特点:并发高、传输快、封装好。

 

核心组件

一、Channel:

1.概念

channel代表的一个连接,每个client请求都会对应到具体的一个channel。在Netty里,Channel是通讯的载体,而ChannelHandler负责Channel中的逻辑处理。它是Netty网络通信的主体,由它负责同对端进行网络通信、注册和数据操作等功能。

状态主要包括:打开、关闭、连接
主要的IO操作,读(read)、写(write)、连接(connect)、绑定(bind)。
所有的IO操作都是异步的,调用诸如read,write方法后,并不保证IO操作完成,但会返回一个凭证,在IO操作成功,取消或失败后会记录在该凭证中。

2.组件

Channel对应有几个重要的组件,分别是ChannelHandler、ChannelHandlerContext、ChannelPipeline

以下是它们的关系图:

技术图片

 

1>ChannelHandler

是业务逻辑的核心处理类,用于处理Channel对应的事件。

ChannelHandler接口里面只定义了三个生命周期方法,我们主要实现它的子接口ChannelInboundHandler和ChannelOutboundHandler,为了便利,框架提供了ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler这三个适配类,在使用的时候只需要实现你关注的方法即可。

 

*以下是ChannelHandler类图:

技术图片 

 

*ChannelHandler下主要是两个子接口:

      ChannelInboundHandler(入站): 处理输入数据和Channel状态类型改变。

                                      适配器: ChannelInboundHandlerAdapter(适配器设计模式)

                                      常用的: SimpleChannelInboundHandler

    ChannelOutboundHandler(出站): 处理输出数据

                                      适配器: ChannelOutboundHandlerAdapter

每一个Handler都一定会处理出站或者入站(可能两者都处理),区别在于SimpleChannelInboundHandler会对没有外界引用的资源进行一定的清理,并且入站的消息可以通过泛型来规定。

这里为什么有设配器模式呢?

      我们在写自定义Handel时候,很少会直接实现上面两个接口,因为接口中有很多默认方法需要实现,所以这里就采用了设配器模式,ChannelInboundHandlerAdapter和

ChannelInboundHandlerAdapter就是设配器模式的产物,让它去实现上面接口,实现它所有方法。那么你自己写自定义Handel时,只要继承它,就无须重写上面接口的所有方法了。

 

*ChannelHandler 生命周期:

handlerAdded: 当 ChannelHandler 添加到 ChannelPipeline 调用

handlerRemoved: 当 ChannelHandler 从 ChannelPipeline 移除时调用

exceptionCaught: 当 ChannelPipeline 执行抛出异常时调用

 

2>channelHandlerContex

是维护ChannelHandler的上下文内容(1. channelHandler的一些状态信息的存储;2. channelHandler之间互相通讯的桥梁,如本文第一张图便可知道ChannelHandlerContext的重要性了。(通过channelHandlerContext,我们可用把上一个channelHandler的处理结果传递给下一个channelHandler))。

每个ChannelHandler通过add方法加入到ChannelPipeline中去的时候,会创建一个对应的ChannelHandlerContext,并且绑定,ChannelPipeline实际维护的是ChannelHandlerContext 的关系。
每个ChannelHandlerContext之间形成双向链表。 

 

3>channelPipeline

在Channel创建的时候,会同时创建ChannelPipeline。可以理解为ChannelPipeline是ChannelHandler的容器,所有ChannelHandler都会注册到ChannelPipeline中,并按顺序组织起来。channel事件消息在ChannelPipeline中流动和传播,相应的事件能够被ChannelHandler拦截处理、传递、忽略或者终止。Pipeline把channelHandle和Context给连接起来。

以下是channelPipeline内的关系图:

技术图片

由上图可以看出,ChannelPipeline包含两条线路:Upstream和Downstream。Upstream对应上行,接收到的消息、被动的状态改变,都属于Upstream。Downstream则对应下行,发送的消息、主动的状态改变,都属于Downstream。

 1 /**
 2   * 可以看到,DefaultChinnelPipeline 内部使用了两个特殊的Hander 来表示Handel链的头和尾。
 3   */
 4  public DefaultChannelPipeline(AbstractChannel channel) {
 5         if (channel == null) {
 6             throw new NullPointerException("channel");
 7         }
 8         this.channel = channel;
 9  
10         TailHandler tailHandler = new TailHandler();
11         tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);
12  
13         HeadHandler headHandler = new HeadHandler(channel.unsafe());
14         head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);
15  
16         head.next = tail;
17         tail.prev = head;
18     }

 

技术图片

 由上图和代码可以看出,对于DefaultChinnelPipeline它的Handel头部和尾部的Handel是固定的,我们所添加的Handel是添加在这个头和尾之前的Handel。(下面这个图更加清晰)

 

3.Channel 生命周期

技术图片

1>channelRegistered: channel注册到一个EventLoop。
2>channelActive: 变为活跃状态(连接到了远程主机),可以接受和发送数据
3>channelInactive: channel处于非活跃状态,没有连接到远程主机
4>channelUnregistered: channel已经创建,但是未注册到一个EventLoop里面,也就是没有和Selector绑定

 

 二、Netty线程模型

Netty 主要基于主从 Reactors 多线程模型(如下图)做了一定的修改,其中主从 Reactor 多线程模型有多个 Reactor:

MainReactor 负责客户端的连接请求,并将请求转交给 SubReactor。

SubReactor 负责相应通道的 IO 读写请求。

非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理。

这里引用 Doug Lee 大神的 Reactor 介绍:Scalable IO in Java 里面关于主从 Reactor 多线程模型的图:

 技术图片

特别说明的是:虽然 Netty 的线程模型基于主从 Reactor 多线程,借用了 MainReactor 和 SubReactor 的结构。但是实际实现上 SubReactor 和 Worker 线程在同一个线程池中:

1 EventLoopGroup bossGroup = new NioEventLoopGroup();
2 EventLoopGroup workerGroup = new NioEventLoopGroup();
3 ServerBootstrap server = new ServerBootstrap();
4 server.group(bossGroup, workerGroup)
5 .channel(NioServerSocketChannel.class)

上面代码中的 bossGroup 和 workerGroup 是 Bootstrap 构造方法中传入的两个对象,这两个 group 均是线程池:

bossGroup 线程池则只是在 Bind 某个端口后,获得其中一个线程作为 MainReactor,专门处理端口的 Accept 事件,每个端口对应一个 Boss 线程。

workerGroup 线程池会被各个 SubReactor 和 Worker 线程充分利用。

 

三、基于netty构建服务的基本步骤

1.定义两个线程组 也叫做事件循环组

2.定义一个的启动服务类

 1 package cn.haoxiaoyong.netty.netty;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.nio.NioServerSocketChannel;
 8 import org.springframework.stereotype.Component;
 9 
10 import javax.annotation.PostConstruct;
11 
12 /**
13  *  @author chensisi
14  *  @Date 2020/6/22
15  *  @Description 1.实现客户端发送请求
16  */
17 @Component
18 public class WebSocketServer {
19 
20 
21     /**
22      *主线程组,用于接收客户端的链接,但不做任何处理
23      */
24     private EventLoopGroup bossGroup;
25     /**
26      *定义从线程组,主线程组会把任务转给从线程组进行处理
27      */
28     private EventLoopGroup workerGroup;
29     /**
30      *服务器
31      */
32     private ServerBootstrap server;
33     /**
34      *回调
35      */
36      private ChannelFuture future;
37 
38     /**
39      * 服务启动类,任务分配自动处理
40      *
41      */
42     @PostConstruct
43     public void start() {
44         //启动
45         //绑定监听端口,调用sync同步阻塞方法等待绑定操作完
46         // ChannelFuture future = server.bind(port).sync();
47         try {
48             future = server.bind(9001);
49             System.out.println("netty server - 启动成功");
50
51             //获取某个客户端所对应的chanel,关闭并设置同步方式
52             //future.channel().closeFuture().sync();
53         } catch (Exception e) {
54             e.printStackTrace();
55         } finally {
56             //使用一种优雅的方式进行关闭
57             bossGroup.shutdownGracefully();
58             workerGroup.shutdownGracefully();
59         }
60     }
61 
62     /**
63      * 构造方法,方便调用
64      */
65     public WebSocketServer() {
66         bossGroup = new NioEventLoopGroup();
67         workerGroup = new NioEventLoopGroup();
68         server = new ServerBootstrap();
69         //需要去针对一个之前的线程模型(上面定义的是主从线程)
70         server.group(bossGroup,workerGroup)
71                 //设置NIO的双向通道
72                 .channel(NioServerSocketChannel.class)
73                 //子处理器,用于处理workerGroup
74                 /*设置chanel初始化器
75                  每一个chanel由多个handler共同组成管道(pipeline)*/
76 
77                 .childHandler(new WebSocketInitializer());
78     }
79 
80 }

2. 通过ChannelPipeline初始化处理器,类似于拦截器Chain,当客户端首次连接后即调用initChannel方法完成初始化动作。

   初始化器,服务端启动后会自动调用这个方法,它是一个回调方法

 1 public class WebSocketInitializer extends ChannelInitializer {
 2     /**
 3      *
 4      * @param ch
 5      * @throws Exception
 6      */
 7     @Override
 8     protected void initChannel(SocketChannel ch) throws Exception {
 9         ChannelPipeline pipeline = ch.pipeline();
10         /*
11                 用于支持 Http协议
12         */
13         //websocket基于 http协议,需要有 http 的编解码器
14         pipeline.addLast(new HttpServerCodec())
15                 //对于大数据流的支持
16                 .addLast(new ChunkedWriteHandler())
17                 //添加对HTTP请求和响应的聚合器:只要使用Netty进行 http编码都需要使用到
18                 //对HttpMessage进行聚合,聚合成FullHttpRequest或者FullHttpResponse
19                 //在 netty 编程总都会使用到Handler
20                 .addLast(new HttpObjectAggregator(1024*64))
21                 .addLast(new WebSocketServerProtocolHandler("/ws"))
22                 //添加Netty空闲超时检查的支持
23                 //4:读空闲超时,8:写空闲超时,12: 读写空闲超时
24                 .addLast(new IdleStateHandler(4,8,12))
25                 .addLast(new HearBeatHandler())
26                 //添加自定有的 handler
27                 .addLast(new ChatHandler());
28     }
29 

3.  Client的消息处理类Handler,通常继承SimpleChannelInboundHandler, 该处理器重写channelRead0方法,该方法负责请求接入,读取客户端请求,发送响应给客户端,并且重写了ChannelInboundHandlerAdapter父类的几个方法,分析不同事件方法的调用

 1 public class ChatHandler extends SimpleChannelInboundHandler {
 2 
 3     private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd hh:MM");
 4 
 5     private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 6 
 7     private static ApplicationContext applicationContext;
 8 
 9     /**
10      * 客户端读取到数据后干什么
11      *
12      * 每当从服务端读到客户端写入信息时,将信息转发给其他客户端的 Channel。其中如果你使用的是 Netty 5.x 版本时,需要把 channelRead0() 重命名为messageReceived()
13      */
14     @Override
15     protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
16         String content = textWebSocketFrame.text();
17         System.out.println("接收到的数据: " + content);
18 
19         Message message = JSON.parseObject(content, Message.class);
20         ChatRecordService chatRecordService = SpringUtil.getBean(ChatRecordService.class);
21         switch (message.getType()) {
22             //处理客户端链接的消息
23             //表示连接
24             case 0:
25                 //建立用户和通道之间的关系
26                 UserChannelMap.put(message.getChatRecord().getUserid(), channelHandlerContext.channel());
27                 System.out.println(message.getChatRecord().getUserid() + "与" + channelHandlerContext.channel().id() + "建立了关联");
28                 UserChannelMap.print();
29                 break;
30             //表示发送消息
31             case 1:
32                 //将消息保存到数据库
33                 ChatRecord chatRecord = message.getChatRecord();
34                 chatRecordService.insert(chatRecord);
35                 //查看此好友是否在线,如果在线就将消息发送给此好友
36                 //1.根据好友id,查询此通道是否存在
37                 Channel channel = UserChannelMap.get(chatRecord.getFriendid());
38                 if (channel != null) {
39                     channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(message)));
40                 } else {
41                     System.out.println("用户" + chatRecord.getFriendid() + "不在线");
42                 }
43                 break;
44             //接收消息
45             case 2:
46                 //将消息设置为已读
47                 chatRecordService.updateChatRecordHasRead(message.getChatRecord().getId());
48                 break;
49             //检测心跳
50             case 3:
51                 //接收心跳信息
52                 System.out.println("接收到心跳消息"+JSON.toJSONString(message));
53         }
54         /*//将接收的消息发送所有的客户端
55         for (Channel channel : channels) {
56             channel.writeAndFlush(new TextWebSocketFrame(sdf.format(new Date()) + ":" + content));
57         }*/
58     }
59 
60     /**
61      * 助手类添加
62      * 每当从服务端收到新的客户端连接时,客户端的 Channel 存入  ChannelGroup列表中,并通知列表中的其他客户端 Channel
63      * 当有新的客户端连接服务器之后,就会自动调用这个方法
64      */
65     @Override
66     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
67         channels.add(ctx.channel());
68     }
69     /**
70      * 捕获channel异常
71      * 出现异常是关闭通道
72      */
73     @Override
74     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
75         //根据通道id取消用户和通道的关系
76         UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
77         ctx.channel().close();
78         System.out.println("出现异常.....关闭通道!");
79     }
80     /**
81      * 助手类移除
82      * 每当从服务端收到客户端断开时,客户端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客户端 Channel
83      * 当客户端关闭链接时关闭通道
84      */
85     @Override
86     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
87         System.out.println("关闭通道");
88         UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
89         ctx.channel().close();
90         UserChannelMap.print();
91     }

 4.心跳机制(基本步骤只要前3步即可)(这个心跳机制还没有细细研究,等研究完了再来补上相关知识点)

 1 public class HearBeatHandler extends ChannelInboundHandlerAdapter {
 2 
 3     /**
 4      * 触发用户事件
 5      */
 6     @Override
 7     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
 8         //evt是否为一个IdleStateEvent类的实例
 9         if (evt instanceof IdleStateEvent) {
10             IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
11             if (idleStateEvent.state() == IdleState.READER_IDLE) {
12                 //读空闲
13                 //检测到读空闲不做任何的操作
14                 System.out.println("读空闲事件触发...");
15             } else if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
16                 //写空闲
17                 //检测到写空闲不做任何的操作
18                 System.out.println("写空闲事件触发...");
19             } else if (idleStateEvent.state() == IdleState.ALL_IDLE) {
20                 //读写空闲
21                 System.out.println("--------------");
22                 System.out.println("读写空闲事件触发");
23                 System.out.println("关闭通道资源");
24                 ctx.channel().close();
25             }
26         }
27     }
28 }

以上只是部分代码

代码学习的git地址:

后端项目地址: https://github.com/haoxiaoyong1014/chat-software

前端项目地址: https://github.com/haoxiaoyong1014/chat-view

 

netty的优点

零拷贝

Netty的传输快其实也是依赖了NIO的这个。我们知道,Java的内存有堆内存、栈内存和字符串常量池等等,其中堆内存是占用内存空间最大的一块,也是Java对象存放的地方,一般我们的数据如果需要从IO读取到堆内存,中间需要经过Socket缓冲区,也就是说一个数据会被拷贝两次才能到达他的的终点,如果数据量大,就会造成不必要的资源浪费。

Netty针对这种情况,使用了NIO中的另一大特性——零拷贝,当他需要接收数据的时候,他会在堆内存之外开辟一块内存,数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度。

 技术图片

               传统数据拷贝

技术图片

                 零拷贝

 

 

 

 

抄录地址:

1.Netty的使用:Server和Client通信

2.SpringBoot+Netty开发IM即时通讯系列(一)

3.(七)分布式通信----Netty实现NIO通信

4.【Netty】(8)---理解ChannelPipeline

5. Netty学习笔记之ChannelHandler

6.这可能是目前最透彻的Netty原理架构解析

 

Netty学习笔记之ChannelHandler

技术图片土豆肉丝盖浇饭

Netty框架

标签:dex   tty   区别   switch   字符串   组成   性能   ssg   土豆   

原文地址:https://www.cnblogs.com/chensisi/p/13164402.html


评论


亲,登录后才可以留言!