netty 4.x用户使用指南

2021-04-29 04:26

阅读:451

  本节中要实现的协议是TIME协议。与前面的示例不同的是,它发送一个包含32位整数的消息,而不接收任何请求,并且在消息发送后关闭连接。在本例中,您将学习如何构造和发送消息,以及如何在完成时关闭连接。因为我们将忽略任何接收到的数据,但是在连接建立后立即发送消息,所以这次我们不能使用channelRead()方法。相反,我们应该重写channelActive()方法。下面是实现:
package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        
        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
注: 1、如前所述,将在建立连接并准备生成通信量时调用channelActive()方法。让我们写一个32位整数,表示这个方法中的当前时间。
        2、要发送新消息,我们需要分配一个包含消息的新缓冲区。我们要写一个32位整数,因此我们需要一个ByteBuf,它的容量至少是4字节。通过ChannelHandlerContext.alloc()获取当前的ByteBufAllocator,并分配一个新的缓冲区。

        3、像往常一样,我们编写构造好的消息。

             但是等等,抛硬币在哪里?在使用NIO发送消息之前,我们不是曾经调用java.nio.ByteBuffer.flip()吗?ByteBuf没有这样的方法,因为它有两个指针;一个用于读操作,另一个用于写操作。当您向ByteBuf写入内容时,写入器索引会增加,而读取器索引不会改变。阅读器索引和写入器索引分别表示消息开始和结束的位置。

              相反,NIO缓冲区没有提供一种干净的方法来确定消息内容在哪里开始和结束,而不调用flip方法。当您忘记翻转缓冲区时,您将遇到麻烦,因为不会发送任何或不正确的数据。在Netty中不会发生这样的错误,因为对于不同的操作类型,我们有不同的指针。当你习惯了它,你会发现它让你的生活变得更容易——一个没有翻转的生活!

              要注意的另一点是ChannelHandlerContext.write()(和writeAndFlush())方法返回ChannelFuture。ChannelFuture表示尚未发生的I/O操作。这意味着,由于Netty中的所有操作都是异步的,因此可能还没有执行任何请求的操作。例如,以下代码可能会在发送消息之前关闭连接:

Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
  因此,您需要在ChannelFuture完成之后调用close()方法,这个方法由write()方法返回,当写操作完成时,它会通知它的侦听器。请注意,close()也可能不会立即关闭连接,它将返回ChannelFuture。
        4、那么,当写请求完成时,我们如何得到通知?这就像在返回的通道未来中添加一个ChannelFutureListener一样简单。在这里,我们创建了一个新的匿名通道futurelistener,它在操作完成时关闭通道。
              或者,您可以使用预定义的侦听器简化代码:
f.addListener(ChannelFutureListener.CLOSE);

  要测试我们的时间服务器是否按预期工作,您可以使用UNIX rdate命令:

$ rdate -o  -p 
  其中是main()方法中指定的端口号,通常是本地主机。
 
编写一个时间客户端
  与Discard服务器和ECHO服务器不同,我们需要一个时间协议客户机,因为人不能将32位二进制数据转换为日历上的日期。在本节中,我们将讨论如何确保服务器正确工作,以及如何使用Netty编写客户机。
  在Netty中,服务器和客户机之间最大也是唯一的区别是使用了不同的引导和通道实现。请查看以下代码:
package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
注:1、Bootstrap类似于ServerBootstrap,只是它用于非服务器通道,比如客户端通道或无连接通道。
    2、如果只指定一个EventLoopGroup,它将作为boss组和工作者组使用。但是,老板员工并不用于客户端
    3、与NioServerSocketChannel不同,NioSocketChannel用于创建客户端通道。
    4、注意,这里我们不像使用ServerBootstrap那样使用childOption(),因为客户端SocketChannel没有父节点。
    5、我们应该调用connect()方法,而不是bind()方法。
如您所见,它与服务器端代码并没有什么不同。那么ChannelHandler实现呢?它应该从服务器接收一个32位的整数,将其转换为人类可读的格式,打印翻译后的时间,并关闭连接:
package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
注:1、在TCP/IP中,Netty将从对等点发送的数据读入ByteBuf。
  它看起来非常简单,与服务器端示例没有任何不同。然而,这个处理程序有时会拒绝启动IndexOutOfBoundsException。我们将在下一节讨论为什么会发生这种情况。
 
处理基于流的传输
  套接字缓冲区的一个小警告
  在基于流的传输(如TCP/IP)中,接收到的数据存储在套接字接收缓冲区中。不幸的是,基于流的传输的缓冲区不是包的队列,而是字节的队列。这意味着,即使您将两个消息作为两个独立的信息包发送,操作系统也不会将它们视为两个消息,而只是一堆字节。因此,不能保证您所阅读的内容就是您的远程对等者所写的内容。例如,假设一个操作系统的
TCP/IP栈接收了三个包:
技术图片
   由于基于流的协议的一般特性,在您的应用程序中很有可能以以下片段形式阅读它们:
技术图片
技术图片
  因此,无论接收部分是服务器端还是客户端,都应该将接收到的数据碎片整理成应用程序逻辑可以容易理解的一个或多个有意义的帧,应用程序逻辑可以很容易地理解这些帧。对于上面的例子,接收到的数据应该像下面这样构造:
技术图片
技术图片
第一个解决方案:
  现在让我们回到TIME客户端示例。我们在这里遇到同样的问题。32位整数是非常少量的数据,并且不太可能经常被分段。然而,问题在于它可能是碎片化的,并且随着流量的增加,碎片化的可能性将增加。
  简单的解决方案是创建一个内部累积缓冲区,并等待所有4个字节都被接收到内部缓冲区。以下是TimeClientHandler修复此问题的修改实现:
package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
注:1、ChannelHandler有两个生命周期监听器方法:handlerAdded()和handlerRemoved()。您可以执行任意(de)初始化任务,只要它不会长时间阻塞。
    2、首先,应将所有收到的数据累积到buf。
    3、然后,处理程序必须检查buf是否有足够的数据,在此示例中为4个字节,然后继续执行实际的业务逻辑。否则,channelRead()当更多数据到达时,Netty将再次调用该方法,最终将累计所有4个字节。
 
第二种解决方案
  虽然第一个解决方案已经解决了TIME客户端的问题,但修改后的处理程序看起来并不干净。想象一个更复杂的协议,它由多个字段组成,例如可变长度字段。您的ChannelInboundHandler实施将很快变得无法维护。
  您可能已经注意到,您可以ChannelHandler为a 添加多个ChannelPipeline,因此,您可以将一个单片拆分ChannelHandler为多个模块化,以降低应用程序的复杂性。例如,您可以拆分TimeClientHandler为两个处理程序:
  • TimeDecoder 它涉及碎片问题,以及
  • 最初的简单版本TimeClientHandler
  幸运的是,Netty提供了一个可扩展的类,可以帮助您编写第一个开箱即用的类:
package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { // (2)
        if (in.readableBytes() ) {
            return; // (3)
        }
        out.add(in.readBytes(4)); // (4)
    }
}
注:1、ByteToMessageDecoder是一种实现ChannelInboundHandler,可以很容易地处理碎片问题。
       2、ByteToMessageDecoderdecode()每当收到新数据时,都会使用内部维护的累积缓冲区调用该方法。
       3、decode()可以决定不向累积缓冲区中没有足够数据的地方添加任何内容。当接收到更多数据时,ByteToMessageDecoder将再次调用decode()。
      4、如果decode()向out添加一个对象,则表示解码器成功解码一条消息。ByteToMessageDecoder将丢弃累积缓冲区的读取部分。请记住,您不需要解码多个消息。ByteToMessageDecoder将继续调用decode()方法,直到它没有向out添加任何内容为止。
  现在我们有另一个处理程序插入到ChannelPipeline中,我们应该修改TimeClient中的ChannelInitializer实现:
b.handler(new ChannelInitializer() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

  如果你是一个喜欢冒险的人,你可能想试试ReplayingDecoder,这将解码器变得更加简单。不过,您需要参考API参考以获得更多信息。

public class TimeDecoder extends ReplayingDecoder {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List out) {
        out.add(in.readBytes(4));
    }
}
  另外,Netty提供了开箱即用的解码器,它使您能够非常容易地实现大多数协议,并帮助您避免最终出现难以维护的单块处理程序实现。更详细的例子请参考以下包:   
  • io.netty.example.factorial 基于二进制协议
  • io.netty.example.telnet 基于文本行的协议.
用POJO代替ByteBuf
  到目前为止,我们所审查的所有示例都使用了ByteBuf作为协议消息的主要数据结构。在本节中,我们将改进TIME协议客户端和服务器示例以使用POJO而不是ByteBuf
在你的ChannelHandler中使用POJO的优势是显而易见的; 通过分离ByteBuf从处理程序中提取信息的代码,您的处理程序变得更易于维护和重用。在TIME客户端和服务器示例中,我们只读取一个32位整数,这不是ByteBuf直接使用的主要问题。但是,您会发现在实现真实世界协议时必须进行分离。
  首先,让我们定义一个名为的新类型UnixTime
package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

  我们现在可以修改它TimeDecoder来产生一个UnixTime而不是一个ByteBuf

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {
    if (in.readableBytes() ) {
        return;
    }
    out.add(new UnixTime(in.readUnsignedInt()));
}

  使用更新的解码器,TimeClientHandler不再使用ByteBuf

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

  更简单,更优雅,对吧?可以在服务器端应用相同的技术。我们TimeServerHandler这次更新第一次:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

  现在,唯一缺少的部分是一个编码器,它的实现ChannelOutboundHandler将一个UnixTime转换为一个ByteBuf。它比编写解码器简单得多,因为编码消息时无需处理数据包碎片和汇编。

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}
注:1、这一行中有很多重要的事情。
            首先,我们按原样传递原始文件ChannelPromise,以便当编码数据实际写入线路时,Netty将其标记为成功或失败。
            第二,我们没有调用ctx.flush()。有一个单独的处理程序方法void flush(ChannelHandlerContext ctx),用于覆盖flush()操作。
  为了进一步简化,您可以使用MessageToByteEncoder
public class TimeEncoder extends MessageToByteEncoder {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}
  最后一个任务是在TimeServerHandler之前将一个TimeEncoder插入到服务器端ChannelPipeline中,这只是一个简单的练习。
 
关闭你的应用程序
  关闭一个Netty应用程序通常与关闭通过shutdowndowns()创建的所有EventLoopGroups一样简单。当EventLoopGroup完全终止并且属于该组的所有通道都已关闭时通知您,它返回一个Future
 
概要
  在本章中,我们快速浏览了Netty,并演示了如何在Netty上编写完整的网络应用程序。
  在接下来的章节中有关于Netty的更多详细信息。我们还鼓励您查看io.netty.example包中的Netty示例。
  另请注意,社区始终在等待您的问题和想法,以帮助您并根据您的反馈不断改进Netty及其文档。


评论


亲,登录后才可以留言!