Netty实现服务端客户端长连接通讯及心跳检测

2021-06-07 13:02

阅读:519

      通过netty实现服务端与客户端的长连接通讯,及心跳检测。

       基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleEvent事件,定时向服务端放送Ping消息,检测SocketChannel是否终断。

        环境JDK1.8 和netty5

        以下是具体的代码实现和介绍:

1公共的Share部分(主要包含消息协议类型的定义)

     设计消息类型:

public enum MsgType {
  PING,ASK,REPLY,LOGIN
}

 Message基类:

//必须实现序列,serialVersionUID 一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!!
public abstract class BaseMsg implements Serializable {
  private static final long serialVersionUID = 1L;
  rivate MsgType type;
  // 必须唯一,否者会出现channel调用混乱
  private String clientId;

  // 初始化客户端id
  public BaseMsg() {
    this.clientId = Constants.getClientId();
  }

  public String getClientId() {
    return clientId;
  }

  public void setClientId(String clientId) {
    this.clientId = clientId;
  }

  public MsgType getType() {
    return type;
  }

  public void setType(MsgType type) {
    this.type = type;
  }
}

常量设置:

public class Constants {
  private static String clientId;

  public static String getClientId() {
    return clientId;
  }

  public static void setClientId(String clientId) {
    Constants.clientId = clientId;
  }
}

登录类型消息:

public class LoginMsg extends BaseMsg {
  private String userName;
  private String password;
  public LoginMsg() {
    super();
    setType(MsgType.LOGIN);
  }

  public String getUserName() {
    return userName;
  }

  public void setUserName(String userName) {
    this.userName = userName;
  }

  public String getPassword() {
    return password;
  }

  public void setPassword(String password) {
    this.password = password;
  }
}

心跳检测Ping类型消息:

public class PingMsg extends BaseMsg {
  public PingMsg() {
    super();
    setType(MsgType.PING);
  }
}

请求类型消息:

public class AskMsg extends BaseMsg {
  public AskMsg() {
    super();
    setType(MsgType.ASK);
}

private AskParams params;

public AskParams getParams() {
  return params;
}

public void setParams(AskParams params) {
  this.params = params;
 }
}

// 请求类型参数
// 必须实现序列化接口
public class AskParams implements Serializable {
  private static final long serialVersionUID = 1L;
  private String auth;

  public String getAuth() {
    return auth;
  }

  public void setAuth(String auth) {
    this.auth = auth;
  }
}

响应类型消息:

public class ReplyMsg extends BaseMsg {
  public ReplyMsg() {
    super();
    setType(MsgType.REPLY);
  }
  private ReplyBody body;

  public ReplyBody getBody() {
    return body;
  }

  public void setBody(ReplyBody body) {
    this.body = body;
  }
}
//相应类型body对像
public class ReplyBody implements Serializable {
  private static final long serialVersionUID = 1L;
}
public class ReplyClientBody extends ReplyBody {
  private String clientInfo;

  public ReplyClientBody(String clientInfo) {
    this.clientInfo = clientInfo;
  }

  public String getClientInfo() {
    return clientInfo;
  }

  public void setClientInfo(String clientInfo) {
    this.clientInfo = clientInfo;
  }
}
public class ReplyServerBody extends ReplyBody {
  private String serverInfo;
  public ReplyServerBody(String serverInfo) {
    this.serverInfo = serverInfo;
  }
  public String getServerInfo() {
    return serverInfo;
  }
  public void setServerInfo(String serverInfo) {
    this.serverInfo = serverInfo;
  }
}

2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.

Map:

public class NettyChannelMap {
  private static Map map=new ConcurrentHashMap();
  public static void add(String clientId,SocketChannel socketChannel){
    map.put(clientId,socketChannel);
  }
  public static Channel get(String clientId){
    return map.get(clientId);
  }
  public static void remove(SocketChannel socketChannel){
    for (Map.Entry entry:map.entrySet()){
      if (entry.getValue()==socketChannel){
        map.remove(entry.getKey());
       }
    }
  }

}

Handler:

public class NettyServerHandler extends SimpleChannelInboundHandler {
  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    //channel失效,从Map中移除
    NettyChannelMap.remove((SocketChannel)ctx.channel());
  }
  @Override
  protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {

    if(MsgType.LOGIN.equals(baseMsg.getType())){
      LoginMsg loginMsg=(LoginMsg)baseMsg;
      if("robin".equals(loginMsg.getUserName())&&"yao".equals(loginMsg.getPassword())){
          //登录成功,把channel存到服务端的map中
          NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());
          System.out.println("client"+loginMsg.getClientId()+" 登录成功");
        }
      }else{
          if(NettyChannelMap.get(baseMsg.getClientId())==null){
          //说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录
          LoginMsg loginMsg=new LoginMsg();
          channelHandlerContext.channel().writeAndFlush(loginMsg);
        }
      }
    switch (baseMsg.getType()){
      case PING:{
          PingMsg pingMsg=(PingMsg)baseMsg;
          PingMsg replyPing=new PingMsg();
          NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);
      }break;
      case ASK:{
          //收到客户端的请求
          AskMsg askMsg=(AskMsg)baseMsg;
          if("authToken".equals(askMsg.getParams().getAuth())){
            ReplyServerBody replyBody=new ReplyServerBody("server info $$$$ !!!");
            ReplyMsg replyMsg=new ReplyMsg();
            replyMsg.setBody(replyBody);
            NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);
           }
      }break;
      case REPLY:{
          //收到客户端回复
          ReplyMsg replyMsg=(ReplyMsg)baseMsg;
          ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody();
          System.out.println("receive client msg: "+clientBody.getClientInfo());
      }break;
        default:break;
      }
      ReferenceCountUtil.release(baseMsg);
    }
}

ServerBootstrap:

public class NettyServerBootstrap {
  private int port;
  private SocketChannel socketChannel;
  public NettyServerBootstrap(int port) throws InterruptedException {
    this.port = port;
    bind();
  }

  private void bind() throws InterruptedException {
    EventLoopGroup boss=new NioEventLoopGroup();
    EventLoopGroup worker=new NioEventLoopGroup();
    ServerBootstrap bootstrap=new ServerBootstrap();
    bootstrap.group(boss,worker);
    bootstrap.channel(NioServerSocketChannel.class);
    bootstrap.option(ChannelOption.SO_BACKLOG, 128);
    //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
    bootstrap.option(ChannelOption.TCP_NODELAY, true);
    //保持长连接状态
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.childHandler(new ChannelInitializer() {
      @Override
      protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline p = socketChannel.pipeline();
        p.addLast(new ObjectEncoder());
        p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
        p.addLast(new NettyServerHandler());
      }
     });
    ChannelFuture f= bootstrap.bind(port).sync();
    if(f.isSuccess()){
      System.out.println("server start---------------");
      }
    }
  public static void main(String []args) throws InterruptedException {
    NettyServerBootstrap bootstrap=new NettyServerBootstrap(9999);
    while (true){
      SocketChannel channel=(SocketChannel)NettyChannelMap.get("001");
      if(channel!=null){
        AskMsg askMsg=new AskMsg();
        channel.writeAndFlush(askMsg);
    }
    TimeUnit.SECONDS.sleep(5);
    }
  }
}

3 Client端:包含发起登录,发送心跳,及对应消息处理

handler:

public class NettyClientHandler extends SimpleChannelInboundHandler {
  //利用写空闲发送心跳检测消息
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
      IdleStateEvent e = (IdleStateEvent) evt;
      switch (e.state()) {
      case WRITER_IDLE:
        PingMsg pingMsg=new PingMsg();
        ctx.writeAndFlush(pingMsg);
        System.out.println("send ping to server----------");
        break;
        default:
        break;
      }
   }
}
@Override
protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
   MsgType msgType=baseMsg.getType();
   switch (msgType){
     case LOGIN:{
       //向服务器发起登录
       LoginMsg loginMsg=new LoginMsg();
       loginMsg.setPassword("yao");
       loginMsg.setUserName("robin");
       channelHandlerContext.writeAndFlush(loginMsg);
     }break;
     case PING:{
        System.out.println("receive ping from server----------");
     }break;
     case ASK:{
        ReplyClientBody replyClientBody=new ReplyClientBody("client info **** !!!");
        ReplyMsg replyMsg=new ReplyMsg();
        replyMsg.setBody(replyClientBody);
        channelHandlerContext.writeAndFlush(replyMsg);
      }break;
     case REPLY:{
        ReplyMsg replyMsg=(ReplyMsg)baseMsg;
        ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody();
        System.out.println("receive client msg: "+replyServerBody.getServerInfo());
      }
      default:break;
    }
    ReferenceCountUtil.release(msgType);
  }
}

bootstrap

public class NettyClientBootstrap {
  private int port;
  private String host;
  private SocketChannel socketChannel;
  private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);
  public NettyClientBootstrap(int port, String host) throws InterruptedException {
    this.port = port;
    this.host = host;
    start();
  }
  private void start() throws InterruptedException {
    EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
    Bootstrap bootstrap=new Bootstrap();
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
    bootstrap.group(eventLoopGroup);
    bootstrap.remoteAddress(host,port);
    bootstrap.handler(new ChannelInitializer() {
    @Override
  protected void initChannel(SocketChannel socketChannel) throws Exception {
    socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0));
    socketChannel.pipeline().addLast(new ObjectEncoder());
    socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
    socketChannel.pipeline().addLast(new NettyClientHandler());
  }
});
  ChannelFuture future =bootstrap.connect(host,port).sync();
    if (future.isSuccess()) {
    socketChannel = (SocketChannel)future.channel();
    System.out.println("connect server 成功---------");
   }
  }
  public static void main(String[]args) throws InterruptedException {
    Constants.setClientId("001");
    NettyClientBootstrap bootstrap=new NettyClientBootstrap(9999,"localhost");

    LoginMsg loginMsg=new LoginMsg();
    loginMsg.setPassword("yao");
    loginMsg.setUserName("robin");
    bootstrap.socketChannel.writeAndFlush(loginMsg);
    while (true){
      TimeUnit.SECONDS.sleep(3);
      AskMsg askMsg=new AskMsg();
      AskParams askParams=new AskParams();
      askParams.setAuth("authToken");
      askMsg.setParams(askParams);
      bootstrap.socketChannel.writeAndFlush(askMsg);
    }
  }
}


评论


亲,登录后才可以留言!