Netty(二、深入理解)
2021-03-08 12:29
标签:nec tail and 指定端口 interrupt etl 网络通 nal encode 在深入了解Netty之前,我们需要先知道reactor(反应器模式),是高性能网络编程必须知道的模式。 我们先了解下原始socket编程: 以服务端为例,Socket建立好后不断循环监听是否有套接字连接,获取到连接后,从socket获取输入流。在发送/接收数据时,并不是直接从网络中读取或发送,而是要通过缓冲区,例如:发送数据时,现将数据写入缓冲区,然后再由TCP/IP协议将数据由缓冲区发送目标的缓冲区,目标从缓冲区中读取。 这种多线程的socket虽然通过一个线程一个socket的方式,提高了服务器的吞吐,但每个线程内部还是阻塞的,当并发量大时,线程的反复创建和销毁会对系统造成巨大的负担。针对这种情况,我们就需要用到reactor模式。 reactor模式,基于java NIO之上,抽象出了两个组件:Reactor和Handler。 Reactor:负责响应IO事件,如新事件的连接、读写,将事件交给Handler处理。 Handler:负责事件的处理,完成channel的读取,事件逻辑处理,channel的写出。 客户端每个请求都封装成一个channel通道连接到selector上面,并有一个selectionKey选择键,选择器的附加对象是Handler处理器,将请求分派到handler中。 单线程的缺点是当Handler阻塞时,会导致其他client的请求也阻塞,这种实际使用不多,一般使用多线程的reactor模式。 多线程是将handler放入一个线程池,多线程的进行业务处理 具体代码就不展示了,也就是在Handler中建立一个线程池来进行读写操作。 ps:以上就是基于java NIO的reactor模式,虽然逻辑有些复杂且不易理解。但是在理解Netty之前一定要先理解它。 我们先将demo代码贴上来,通过代码来对Netty进行理解。 以服务端为例,设置两个线程组。 一个线程组负责监听连接的parentChannel,定义为BossLoopGroup 另一个线程组负责客户端连接读写的childChannel,定义为WorkerLoopGroup 一个线程封装到一个EventLoop,多个EventLoop就组成了线程组。而每一个channel绑定一个EventLoop,一个EventLoop可以有多个channel。 Bootstrap是Netty提供的一个工厂类,我们可以通过它来完成对Netty服务端或客户端的初始化配置,这样我们就省去了用JDK NIO繁琐的创建channel、设置、启动等步骤,将重心放在事件业务处理上面。 Bootstrap分为服务端的ServerBootstrap和客户端的Bootstrap Bootstrap执行分为8个步骤: 这8个步骤和我上面写的DEMO顺序方面可能稍有不同,不过不影响。 Channel是服务端与客户端的通信通道,每一个request都可以封装成一channel。 ChannelPipeline是用于存放Handler的容器,里面存放这事件处理器流水线。 ChannelHandler是处理器,分为入站处理器和出站处理器,以客户端的角度来看,客户端到服务端是出站,服务端到客户端就是入站。 ChannelContext是通信管道的上下文,当一个入站或出站处理器处理完后,将上下文传给下一个入站或出站处理器。 这里就是配发事件处理器流水线,编码器实质上也是个处理器,出站编码,入站解码。 自定义处理器也要继承出站或入站的事件处理配置类 数据在网络中传输并不是直接传输的,而是要通过缓冲区。写出时,先将数据写到缓冲区,再由TCP协议将数据从缓冲区。读取时也是一样,从缓冲区读取。 JAVA NIO中的缓冲区是ByteBuff,长度固定且只有一个索引,在读写操作的时候还需要切换读写状态。而Netty的ByteBuf则改良了这些问题。 在ByteBuf中,提供了三个索引,读索引(readIndex)、写索引(writeIndex)、最大容量(maxCapacity) 我们再看看这张图,入站时,当走到tailHandler(最后一个Handler)的时候,会释放掉缓冲区。出站则是在headHandler释放。 Netty(二、深入理解) 标签:nec tail and 指定端口 interrupt etl 网络通 nal encode 原文地址:https://www.cnblogs.com/Unlimited-Blade-Works/p/12658874.htmlreactor模式
BIO
//这里可以是个多线程,每个线程对应一个socket,循环处理业务,此处代码就略了,主要讲逻辑
while (true){
//new Thread()...
//Server监听指定端口
ServerSocket server = new ServerSocket(8080);
//socket阻塞,一直等待着连接到来
Socket socket = server.accept();
//从socket获取输入流
InputStream inputStream = socket.getInputStream();
//建立缓冲区进行读取
byte[] bytes = new byte[1024];
int len;
StringBuilder sb = new StringBuilder();
while ((len = inputStream.read(bytes)) != -1) {
//指定编码格式
sb.append(new String(bytes, 0, len,"UTF-8"));
}
System.out.println(sb);
inputStream.close();
socket.close();
server.close();
}单线程reactor模式
Reactor
package com.wk.test.nettyTest;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
class Reactor implements Runnable {
//选择器
final Selector selector;
//服务端通道
final ServerSocketChannel serverChannel;
//构造函数初始化
Reactor(int port) throws IOException {
selector = Selector.open();
serverChannel = ServerSocketChannel.open();
//绑定连接
serverChannel.socket().bind(new InetSocketAddress(port));
//非阻塞
serverChannel.configureBlocking(false);
//将服务端的通道绑定到选择器上面,并定义事件为接收连接时间
//OP_ACCEPT:接收连接就绪事件,服务端监听到客户端,可接收连接 1//OP_CONNECT:连接就绪事件,表示客户端与服务端建立连接成功 1//OP_READ:读就绪事件,表示通道中有可读数据,可执行读操作 1//OP_WRITE:写就绪事件,表示可以向通道写数据 1
SelectionKey selectionKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
//选择键通过attach方法附加一个对象
selectionKey.attach(new Acceptor());
}
@Override
public void run() {
//不中断的线程则循环,interrupted方法,判断线程是否中断,并能释放已经中断的线程
while (!Thread.interrupted()){
try {
//这里每一个request封装一个channel,所有的channel注册在一个选择器上,selector选择器不断轮询查看可读状态
selector.select();
//获取选择器的选择键集合
Set
Handler
package com.wk.test.nettyTest;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
public class Handler implements Runnable {
//通道
final SocketChannel channel;
//绑定到选择器的选择键
final SelectionKey selectionKey;
//定义输入输出缓冲区
ByteBuffer inputBuffer = ByteBuffer.allocate(102400);
ByteBuffer outputBuffer = ByteBuffer.allocate(102400);
static final boolean READING = true, WRITING = false;
//初始化定义可读就绪
boolean status = READING;
Handler(Selector selector, SocketChannel c) throws IOException {
channel = c;
//非阻塞
c.configureBlocking(false);
//这里将通道注册到选择器上,本应后面的int是 1(读),4(写),8(连接),16(可连接)的
//这种操作貌似是判断JDK的selector有没有立即返回或报错,并不引起任何实质操作。
//https://github.com/netty/netty/issues/1836 这个讨论问题的地址,外国友人貌似也搞不懂,似乎是个JDK NIO的BUG
selectionKey = channel.register(selector, 0);
//选择键将本身也就是Handler附加
selectionKey.attach(this);
//定义当前选择键是读就绪状态
selectionKey.interestOps(SelectionKey.OP_READ);
//唤醒选择器
selector.wakeup();
}
@Override
public void run() {
try {
if (status) {
read();
} else {
write();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void read() throws IOException {
channel.read(inputBuffer);
//一系列逻辑判定和处理
status = WRITING;
selectionKey.interestOps(SelectionKey.OP_WRITE);
}
public void write() throws IOException {
channel.write(outputBuffer);
//判定写操作执行完毕后,关闭selectKey
selectionKey.cancel();
}
}
多线程reactor模式
Netty DEMO
服务端
NettyServerTest
package com.wk.test.nettyTest;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NettyServerTest {
private static final Logger logger = LoggerFactory.getLogger(NettyServerTest.class);
public static void main(String[] args) {
//实例化两个线程组
//处理服务器与客户端的连接
EventLoopGroup pGroup = new NioEventLoopGroup(1);
//进行网络通信(读写)
EventLoopGroup cGroup = new NioEventLoopGroup(10);
//配置容器,配置相关信息
ServerBootstrap bootstrap = new ServerBootstrap()
.group(pGroup,cGroup) //绑定两个线程组
.channel(NioServerSocketChannel.class) //指定NIO的模式
.childHandler(new ChannelInitializer
NettyServerHandler
package com.wk.test.nettyTest;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 服务端事件处理器,基础入站处理器类
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
/**
* 客户端连接时触发
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("Channel active");
}
/**
* 客户端发送消息时触发
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("服务端接收的消息:{}", msg.toString());
ctx.write("服务器返回");
ctx.flush();
}
/**
* 发生异常时触发
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端
NettyClientTest
package com.wk.test.nettyTest;
import cn.jiguang.common.connection.NettyClientInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NettyClientTest {
private static final Logger logger = LoggerFactory.getLogger(NettyClientTest.class);
public static void main(String[] args) {
//客户端只需要定义一个读写的线程组
EventLoopGroup group = new NioEventLoopGroup();
//客户端是bootstrap,其他和服务端配置大同小异
Bootstrap bootstrap = new Bootstrap()
.group(group)
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer
NettyClientHandler
package com.wk.test.nettyTest;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 客户端处理类,继承入站处理适配器
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("客户端Active .....");
}
/**
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("客户端接收的消息:{}", msg.toString());
}
/**
* 发生异常时触发
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
EventLoop
//实例化两个线程组
//处理服务器与客户端的连接
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//进行网络通信(读写)
EventLoopGroup workerGroup = new NioEventLoopGroup(10);
Bootstrap
ServerBootstrap bootstrap = new ServerBootstrap()
//1.设置reactor线程
.group(bossGroup,workerGroup)
//2.设置channel通道的类型,这里是NIO
.channel(NioServerSocketChannel.class)
//3.设置监听端口
.localAddress(new InetSocketAddress(8090))
//4.设置通道的选项
.option(ChannelOption.SO_BACKLOG, 1024) //设置TCP缓冲区
.childOption(ChannelOption.SO_KEEPALIVE, true) //心跳检测保持连接
//5.配发事件处理器流水线
.childHandler(new ChannelInitializer
Channel
核心概念以及流程
ChannelHandler
//5.配发事件处理器流水线
.childHandler(new ChannelInitializer
public class NettyServerHandler extends ChannelInboundHandlerAdapter
ByteBuf
缓冲区的释放