java nio消息半包、粘包解决方案
2021-02-12 04:18
标签:读取数据 如何 建立 首部 limit 通过 数据交互 文件 判断 NIO是面向缓冲区进行通信的,不是面向流的。我们都知道,既然是缓冲区,那它一定存在一个固定大小。这样一来通常会遇到两个问题: 介绍这个问题之前,务必要提一下我代码整体架构。 https://github.com/CuriousLei/smyl-im 在这个项目中,我的NIO核心库设计思路流程图如下所示 介绍: 光这样实现,必然会有粘包、半包问题。要重现这两个问题也很简单。 这个问题实质上是消息体与缓冲区数据不一一对应导致的。那么,如何解决呢? 可以采用固定头部方案来解决,头部设置四个字节,存储一个int值,记录后面数据的长度。以此来标记一个消息体。 我的工程项目中,客户端和服务端共用一个nio核心包,即niohdl,可保证收发数据格式一致。 要实现以上设想,必须在connector和ioArgs之间加一层Dispatcher类,用于处理消息体与缓冲区之间的转化关系(消息体取个名字:Packet)。根据输入和输出的不同,分别叫ReceiveDispatcher和SendDispatcher。即通过它们来操作Packet与ioArgs之间的转化。 定义这个消息体,继承关系如下图所示: Packet是基类,代码如下: SendPacket和ReceivePacket分别代表发送消息体和接收消息体。StringReceivePacket和StringSendPacket代表字符串类的消息,因为本次实践只限于字符串消息的收发,今后可能有文件之类的,有待扩展。 代码中必然会涉及到字节数组的操作,所以,以StringSendPacket为例,需要提供将String转化为byte[]的方法。代码如下所示: 在connector对象的实例域中会引用一个SendDispatcher对象。发送数据时,会通过SendDispatcher中的方法对数据进行封装和处理。其大致的关系图如下所示: SendDispatcher中设置任务队列Queue 在代码中,SendDispatcher实际上是一个接口,我用AsyncSendDispatcher实现此接口,代码如下: 同样,ReceiveDispatcher也是一个接口,代码中用AsyncReceiveDispatcher实现。在connector对象的实例域中会引用一个AsyncReceiveDispatcher对象。接收数据时,会通过ReceiveDispatcher中的方法对接收到的数据进行拆包处理。其大致的关系图如下所示: 每一个消息体的首部会有一个四字节的int字段,代表消息的长度值,按照这个长度来进行读取。如若一个ioArgs未满足这个长度,就读取下一个ioArgs,保证数据包的完整性。这个流程就不画程序框图了,偷个懒hhhh。其实看下面代码注释已经很清晰了,容易理解。 AsyncReceiveDispatcher的代码如下所示: 其实粘包、半包的解决方案并没有什么奥秘,单纯地复杂而已。方法核心就是自定义一个消息体Packet,完成Packet中的byte数组与缓冲区数组之间的复制转化即可。当然,position、limit等等指针的辅助很重要。 总结这个博客,也是将目前为止的工作进行梳理和记录。我将通过smyl-im这个项目来持续学习+实践。因为之前学习过程中有很多零碎的知识点,都躺在我的有道云笔记里,感觉没必要总结成博客。本次博客讲的内容刚好是一个成体系的东西,正好可以将这个项目背景带出来,后续的博客就可以在这基础上衍生拓展了。 java nio消息半包、粘包解决方案 标签:读取数据 如何 建立 首部 limit 通过 数据交互 文件 判断 原文地址:https://www.cnblogs.com/buptleida/p/12732288.html问题背景
代码参见GitHub仓库
思路
固定头部方案
设计方案
Packet
package cn.buptleida.niohdl.core;
import java.io.Closeable;
import java.io.IOException;
/**
* 公共的数据封装
* 提供了类型以及基本的长度的定义
*/
public class Packet implements Closeable {
protected byte type;
protected int length;
public byte type(){
return type;
}
public int length(){
return length;
}
@Override
public void close() throws IOException {
}
}
package cn.buptleida.niohdl.box;
import cn.buptleida.niohdl.core.SendPacket;
public class StringSendPacket extends SendPacket {
private final byte[] bytes;
public StringSendPacket(String msg) {
this.bytes = msg.getBytes();
this.length = bytes.length;//父类中的实例域
}
@Override
public byte[] bytes() {
return bytes;
}
}
SendDispatcher
这个过程的程序框图如下所示:package cn.buptleida.niohdl.impl.async;
import cn.buptleida.niohdl.core.*;
import cn.buptleida.utils.CloseUtil;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
public class AsyncSendDispatcher implements SendDispatcher {
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private Sender sender;
private Queue
ReceiveDispatcher
package cn.buptleida.niohdl.impl.async;
import cn.buptleida.niohdl.box.StringReceivePacket;
import cn.buptleida.niohdl.core.ReceiveDispatcher;
import cn.buptleida.niohdl.core.ReceivePacket;
import cn.buptleida.niohdl.core.Receiver;
import cn.buptleida.niohdl.core.ioArgs;
import cn.buptleida.utils.CloseUtil;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
public class AsyncReceiveDispatcher implements ReceiveDispatcher {
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final Receiver receiver;
private final ReceivePacketCallback callback;
private ioArgs args = new ioArgs();
private ReceivePacket packetTemp;
private byte[] buffer;
private int total;
private int position;
public AsyncReceiveDispatcher(Receiver receiver, ReceivePacketCallback callback) {
this.receiver = receiver;
this.receiver.setReceiveListener(ioArgsEventListener);
this.callback = callback;
}
/**
* connector中调用该方法进行
*/
@Override
public void start() {
registerReceive();
}
private void registerReceive() {
try {
receiver.receiveAsync(args);
} catch (IOException e) {
closeAndNotify();
}
}
private void closeAndNotify() {
CloseUtil.close(this);
}
@Override
public void stop() {
}
@Override
public void close() throws IOException {
if(isClosed.compareAndSet(false,true)){
ReceivePacket packet = packetTemp;
if(packet!=null){
packetTemp = null;
CloseUtil.close(packet);
}
}
}
/**
* 回调方法,从readHandler输入线程中回调
*/
private ioArgs.IoArgsEventListener ioArgsEventListener = new ioArgs.IoArgsEventListener() {
@Override
public void onStarted(ioArgs args) {
int receiveSize;
if (packetTemp == null) {
receiveSize = 4;
} else {
receiveSize = Math.min(total - position, args.capacity());
}
//设置接受数据大小
args.setLimit(receiveSize);
}
@Override
public void onCompleted(ioArgs args) {
assemblePacket(args);
//继续接受下一条数据,因为可能同一个消息可能分隔在两份IoArgs中
registerReceive();
}
};
/**
* 解析数据到packet
* @param args
*/
private void assemblePacket(ioArgs args) {
if (packetTemp == null) {
int length = args.readLength();
packetTemp = new StringReceivePacket(length);
buffer = new byte[length];
total = length;
position = 0;
}
//将args中的数据写进外面buffer中
int count = args.writeTo(buffer,0);
if(count>0){
//将数据存进StringReceivePacket的buffer当中
packetTemp.save(buffer,count);
position+=count;
if(position == total){
completePacket();
packetTemp = null;
}
}
}
private void completePacket() {
ReceivePacket packet = this.packetTemp;
CloseUtil.close(packet);
callback.onReceivePacketCompleted(packet);
}
}
总结
上一篇:树状数组BIT
下一篇:python 操作 excel