netty实战(二)
2021-01-22 20:14
标签:管理 rac value toc 自己 程序 detail 消息 解决方法 和上一篇相比,此项目的场景有所不同:需要采集机房的电表、温湿度、水浸和烟感的数据,首先通过通讯管理机先将数据直采,然后通过服务器采集程序采集通讯管理机上存储的数据并解析入库。 先说说这2个有什么不同,项目1中户外设备主动连接服务器,并定时发送报文,而此项目需要服务器去主动连接设备发送报文采集数据,仅仅需要开发netty的client端。 项目框架:springboot+netty+mybatis+lombok+logback 开发环境:idea2018+jdk1.8+mysql5.6.35+maven3.5.3 项目搭建: 1.快速搭建springboot项目,配置pom.xml文件依赖包 2.自定义netty的client端BootNettyClient类 3.自定义初始化类BootNioChannelInitializer类 4.自定义业务处理类BootNettyClientHandler类 5.application类实现CommandLineRunner,来启动nettyClient服务 6.定时任务发送报文到下位机,请求下位机采集存储的数据 注意点:1.断线重连 2.多个通讯管理机定时任务发送报文时,存在异常。解决方法:buf.retain()的作用 netty实战(二) 标签:管理 rac value toc 自己 程序 detail 消息 解决方法 原文地址:https://www.cnblogs.com/lyzj/p/13283107.html"1.0" encoding="UTF-8"?>
package com.rtst.dhjclistener.nettyclient;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class BootNettyClient {
@Autowired
BootNioChannelInitializer bootNioChannelInitializer;
@Value("${netty.port}")
private Integer port;
@Value("#{‘${netty.host}‘.split(‘,‘)}")
private List
package com.rtst.dhjclistener.nettyclient;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class BootNioChannelInitializer
package com.rtst.dhjclistener.nettyclient;
import com.rtst.dhjclistener.entity.Signal;
import com.rtst.dhjclistener.repository.DsignalMapper;
import com.rtst.dhjclistener.repository.SignalMapper;
import com.rtst.dhjclistener.service.SendEmailContentService;
import com.rtst.dhjclistener.util.StringUitls;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.*;
@Component
@Slf4j
@ChannelHandler.Sharable
public class BootNettyClientHandler extends ChannelInboundHandlerAdapter {
@Autowired
DsignalMapper dsignalMapper;
@Autowired
SignalMapper signalMapper;
@Autowired
BootNettyClient bootNettyClient;
@Autowired
SendEmailContentService sendEmailContentService;//发送告警邮件service类
@Value("${netty.schoolId}")
private int schoolId;
@Value("#{‘${netty.host}‘.split(‘,‘)}")
private List
//业务处理逻辑,解析报文并入库,如果有告警,发送邮件提示用户机房存在告警
......
}
/**
* 连接断开时进入该方法
* @param ctx
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端断开连接---channelInactive");
log.info("客户端断开连接---channelInactive");
super.channelInactive(ctx);
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();//获取现连接的IP地址
if(ctxMap.get(clientIp)!=null){//如果不为空就删除
ctxMap.remove(clientIp, ctx.channel());
}
ctx.close();
bootNettyClient.doConnect(clientIp,port);//断线重连
}
/**
* 出现异常时进入该方法
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();//获取现连接的IP地址
if(ctxMap.get(clientIp)!=null){//如果不为空就删除
ctxMap.remove(clientIp, ctx.channel());
}
ctx.close();
}
//处理超时读写空闲事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("触发读写空闲操作-----");
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();//获取现连接的IP地址
if (evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
log.info(clientIp+"触发"+idleStateEvent.state()+"事件");
//获取IdleStateEvent事件,根据状态是否为读状态空闲
if (idleStateEvent.state() == IdleState.READER_IDLE){
log.info("已经 好长时间没有收到信息!");
System.out.println("尝试再次发送命令");
//向下位机发送消息
ByteBuf buf =Unpooled.buffer();
String order = "00 00 00 00 00 06 c8 03 00 00 00 64 05 c2";//可以设置成读取更大地址的数据,比如读0-500的地址位:00 00 00 00 00 06 c8 03 00 00 01 F4 05 c2
byte[] msg = StringUitls.hexStrToBinaryStr(order);
buf.writeBytes(msg);
ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
triggeredNum++;
if(triggeredNum>=3){
ctx.close();
triggeredNum=0;
}
}
}
super.userEventTriggered(ctx, evt);
}
}package com.rtst.dhjclistener;
import com.rtst.dhjclistener.nettyclient.BootNettyClient;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@MapperScan("com.rtst.dhjclistener.repository")
@EnableScheduling
public class DhjclistenerApplication implements CommandLineRunner {
@Autowired
BootNettyClient bootNettyClient;
public static void main(String[] args) {
SpringApplication.run(DhjclistenerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
/**
* 启动netty服务端服务
*/
bootNettyClient.start();
}
}
package com.rtst.dhjclistener.ordertask;
import com.rtst.dhjclistener.nettyclient.BootNettyClientHandler;
import com.rtst.dhjclistener.util.StringUitls;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.List;
@Component
@Configuration
public class MyTask {
@Value("#{‘${netty.host}‘.split(‘,‘)}")
private List