netty 4.x用户使用指南
2021-04-29 04:26
package io.netty.example.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
3、像往常一样,我们编写构造好的消息。
但是等等,抛硬币在哪里?在使用NIO发送消息之前,我们不是曾经调用java.nio.ByteBuffer.flip()吗?ByteBuf没有这样的方法,因为它有两个指针;一个用于读操作,另一个用于写操作。当您向ByteBuf写入内容时,写入器索引会增加,而读取器索引不会改变。阅读器索引和写入器索引分别表示消息开始和结束的位置。
相反,NIO缓冲区没有提供一种干净的方法来确定消息内容在哪里开始和结束,而不调用flip方法。当您忘记翻转缓冲区时,您将遇到麻烦,因为不会发送任何或不正确的数据。在Netty中不会发生这样的错误,因为对于不同的操作类型,我们有不同的指针。当你习惯了它,你会发现它让你的生活变得更容易——一个没有翻转的生活!
要注意的另一点是ChannelHandlerContext.write()(和writeAndFlush())方法返回ChannelFuture。ChannelFuture表示尚未发生的I/O操作。这意味着,由于Netty中的所有操作都是异步的,因此可能还没有执行任何请求的操作。例如,以下代码可能会在发送消息之前关闭连接:
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
f.addListener(ChannelFutureListener.CLOSE);
要测试我们的时间服务器是否按预期工作,您可以使用UNIX rdate命令:
$ rdate -o-p
在Netty中,服务器和客户机之间最大也是唯一的区别是使用了不同的引导和通道实现。请查看以下代码:
package io.netty.example.time;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
2、如果只指定一个EventLoopGroup,它将作为boss组和工作者组使用。但是,老板员工并不用于客户端
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
TIME
客户端示例。我们在这里遇到同样的问题。32位整数是非常少量的数据,并且不太可能经常被分段。然而,问题在于它可能是碎片化的,并且随着流量的增加,碎片化的可能性将增加。TimeClientHandler
修复此问题的修改实现:
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
TIME
客户端的问题,但修改后的处理程序看起来并不干净。想象一个更复杂的协议,它由多个字段组成,例如可变长度字段。您的ChannelInboundHandler
实施将很快变得无法维护。ChannelHandler
为a 添加多个ChannelPipeline
,因此,您可以将一个单片拆分ChannelHandler
为多个模块化,以降低应用程序的复杂性。例如,您可以拆分TimeClientHandler
为两个处理程序:-
TimeDecoder
它涉及碎片问题,以及 - 最初的简单版本
TimeClientHandler
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
b.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); } });
如果你是一个喜欢冒险的人,你可能想试试ReplayingDecoder,这将解码器变得更加简单。不过,您需要参考API参考以获得更多信息。
public class TimeDecoder extends ReplayingDecoder{ @Override protected void decode( ChannelHandlerContext ctx, ByteBuf in, List
-
io.netty.example.factorial
基于二进制协议 -
io.netty.example.telnet
基于文本行的协议.
ByteBuf
作为协议消息的主要数据结构。在本节中,我们将改进TIME
协议客户端和服务器示例以使用POJO而不是ByteBuf
。ChannelHandler
中使用POJO的优势是显而易见的; 通过分离ByteBuf
从处理程序中提取信息的代码,您的处理程序变得更易于维护和重用。在TIME
客户端和服务器示例中,我们只读取一个32位整数,这不是ByteBuf
直接使用的主要问题。但是,您会发现在实现真实世界协议时必须进行分离。UnixTime
。
package io.netty.example.time;
import java.util.Date;
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
我们现在可以修改它TimeDecoder
来产生一个UnixTime
而不是一个ByteBuf
。
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
使用更新的解码器,TimeClientHandler
不再使用ByteBuf
:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
更简单,更优雅,对吧?可以在服务器端应用相同的技术。我们TimeServerHandler
这次更新第一次:
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
现在,唯一缺少的部分是一个编码器,它的实现ChannelOutboundHandler
将一个UnixTime
转换为一个ByteBuf
。它比编写解码器简单得多,因为编码消息时无需处理数据包碎片和汇编。
package io.netty.example.time;
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int)m.value());
ctx.write(encoded, promise); // (1)
}
}
MessageToByteEncoder
:
public class TimeEncoder extends MessageToByteEncoder{ @Override protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) { out.writeInt((int)msg.value()); } }
Future
。io.netty.example
包中的Netty示例。