Hand-Writing a Pluggable, Spring-Based RPC Framework (Part 3): The Communication Protocol Module
2020-12-13 05:06
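The client and server built on the Netty model.
First, the Netty-based server. Its encoder and decoder are ones we implement ourselves; for now you can use the part I commented out and swap in our own once we get to writing the encoder and decoder.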
public class NettyServer {
     private static NettyServer INSTANCE = new NettyServer();
     private static Executor executor = Executors.newCachedThreadPool();
     private final static int MESSAGE_LENGTH = 4;
     private NettyServer(){};
     public static NettyServer getInstance(){
         return INSTANCE;
     }
     private SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize());
     public static void submit(Runnable t){
         executor.execute(t);
     }
     public void start(String host, Integer port){
         EventLoopGroup bossGroup = new NioEventLoopGroup(1);
         EventLoopGroup workerGroup = new NioEventLoopGroup();
         try{
             final ServerBootstrap bootstrap = new ServerBootstrap();
             bootstrap.group(bossGroup,workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 128)
             .childOption(ChannelOption.SO_KEEPALIVE, true)
             .childHandler(new ChannelInitializer<SocketChannel>(){
                 @Override
                 protected void initChannel(SocketChannel arg0) throws Exception {
                     ChannelPipeline pipeline = arg0.pipeline();
                     //Keep the frame format compatible with LengthFieldBasedFrameDecoder, the half-packet decoder that ObjectDecoder extends,
                     //because ObjectDecoder initializes that parent class with super(maxObjectSize, 0, 4, 0, 4);
 //                  pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, NettyServer.MESSAGE_LENGTH, 0, NettyServer.MESSAGE_LENGTH));
                     //Use LengthFieldPrepender to fill in the message length header that ObjectDecoder expects
 //                  pipeline.addLast(new LengthFieldPrepender(NettyServer.MESSAGE_LENGTH));
 //                  pipeline.addLast(new ObjectEncoder());
                     //For concurrent performance, use the weakCachingConcurrentResolver caching strategy; in ordinary cases cacheDisabled is enough
 //                  pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                     //Register the decoder NettyDecoderHandler
                     pipeline.addLast(new NettyDecoderHandler(RpcRequest.class, serializeType));
                     //Register the encoder NettyEncoderHandler
                     pipeline.addLast(new NettyEncoderHandler(serializeType));
                     pipeline.addLast("handler", new NettyServerHandler());
                 }
             });
             Channel channel = bootstrap.bind(host, port).sync().channel();
             System.out.println("Server start listen at " + port);
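         }catch(Exception e){
             //Report the failure instead of swallowing it, then release both event loop groups
             e.printStackTrace();
             bossGroup.shutdownGracefully();
             workerGroup.shutdownGracefully();
         }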
     }
 } 
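The commented-out pipeline above relies on a 4-byte length header to cut the TCP byte stream into whole messages. As a rough illustration of that framing idea only, here is a JDK-only sketch. `LengthFraming`, `encode`, and `decode` are hypothetical names chosen for this example; they are not part of Netty and not the framework's own codec, which is written in a later part.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating length-prefixed framing; not the framework's codec.
final class LengthFraming {
    static final int HEADER_LENGTH = 4; // plays the same role as MESSAGE_LENGTH above

    // Prefix the payload with its length as a 4-byte big-endian int.
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(HEADER_LENGTH + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Read every complete frame from a buffer in read mode; a trailing partial frame stays unread.
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= HEADER_LENGTH) {
            in.mark();
            int length = in.getInt();
            if (in.remaining() < length) {
                in.reset(); // half packet: rewind and wait for more bytes
                break;
            }
            byte[] body = new byte[length];
            in.get(body);
            frames.add(body);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encode("hello".getBytes(StandardCharsets.UTF_8));
        byte[] second = encode("rpc".getBytes(StandardCharsets.UTF_8));
        // Simulate a TCP read that delivers one full frame plus part of the next one.
        ByteBuffer in = ByteBuffer.allocate(64);
        in.put(first).put(second, 0, 5);
        in.flip();
        List<byte[]> frames = decode(in);
        System.out.println(frames.size() + " frame(s), " + in.remaining() + " byte(s) left over");
    }
}
```

The decoder never hands out a partial message: when fewer bytes than the header announces have arrived, it rewinds to the start of the header and leaves the rest for the next read.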
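Next, the server-side handler. Netty is built around this handler pattern; inside the handler, each received request is handed to the thread pool for processing.
    public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> {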
     private ChannelHandlerContext context;
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, RpcRequest rpcRequest) throws Exception {
         System.out.println("server channelRead...");
         System.out.println(ctx.channel().remoteAddress() + "->server:" + rpcRequest.toString());
         InvokeTask it = new InvokeTask(rpcRequest,ctx);
         NettyServer.submit(it);
     }
     @Override
     public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
         this.context = ctx;     
     }
 } 
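The point of submitting the request to a pool is that the I/O thread only queues the work and returns, so a slow service method cannot stall other connections. A minimal JDK-only sketch of that hand-off follows; `OffloadSketch`, `onRequest`, and `roundTrip` are hypothetical names, and a plain `Consumer` stands in for writing the response back to the channel.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for the handler/thread-pool hand-off; no Netty involved.
final class OffloadSketch {
    private static final ExecutorService WORKERS = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "rpc-worker");
        t.setDaemon(true); // demo only: do not keep the JVM alive
        return t;
    });

    // Plays the role of channelRead0: it only queues the work and returns at once.
    static void onRequest(String request, Consumer<String> reply) {
        WORKERS.execute(() ->
                reply.accept(request + " handled on " + Thread.currentThread().getName()));
    }

    // Helper for the demo: send one request and wait briefly for the reply.
    static String roundTrip(String request) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();
        onRequest(request, response -> {
            result.set(response);
            done.countDown();
        });
        try {
            done.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```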
Here is the corresponding InvokeTask implementation.
public class InvokeTask implements Runnable{
     private RpcRequest invocation;
     private ChannelHandlerContext ctx;
     public InvokeTask(RpcRequest invocation,ChannelHandlerContext ctx) {
         super();
         this.invocation = invocation;
         this.ctx = ctx;
     }
     @Override
     public void run() {
         // Resolve the implementation class for the requested interface (its class name is carried in the request)
         String interFaceName = invocation.getInterfaceName();
         Class impClass = null;
         try {
             impClass = Class.forName(invocation.getImpl());
         } catch (ClassNotFoundException e) {
             e.printStackTrace();
         }
         Method method;
         Object result = null;
         try {
             method = impClass.getMethod(invocation.getMethodName(),invocation.getParamTypes());
             //Open question: should the implementation instance be fetched from the Spring container instead?
             result = method.invoke(impClass.newInstance(),invocation.getParams());
         } catch (Exception e) {
             e.printStackTrace();
         }
         RpcResponse rpcResponse = new RpcResponse();
         rpcResponse.setResponseId(invocation.getRequestId());
         rpcResponse.setData(result);
         ctx.writeAndFlush(rpcResponse).addListener(new ChannelFutureListener() {
             public void operationComplete(ChannelFuture channelFuture) throws Exception {
                 System.out.println("RPC Server Send message-id response:" + invocation.getRequestId());
             }
         });
     }
 }
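The core of InvokeTask is a reflective call: resolve the class by name, find the method by name and parameter types, then invoke it. The following self-contained sketch shows just that step. `ReflectiveInvokeSketch` and `GreeterImpl` are hypothetical names for illustration, and it uses `getDeclaredConstructor().newInstance()`, since `Class.newInstance()` has been deprecated since Java 9.

```java
import java.lang.reflect.Method;

// Hypothetical sketch of the reflective call inside InvokeTask.
final class ReflectiveInvokeSketch {
    // Stand-in service implementation, for illustration only.
    public static class GreeterImpl {
        public String greet(String name) {
            return "hello " + name;
        }
    }

    // Resolve the class by name, find the method by name and parameter types, then call it.
    static Object invoke(String implClassName, String methodName,
                         Class<?>[] paramTypes, Object[] params) {
        try {
            Class<?> implClass = Class.forName(implClassName);
            Method method = implClass.getMethod(methodName, paramTypes);
            Object target = implClass.getDeclaredConstructor().newInstance();
            return method.invoke(target, params);
        } catch (ReflectiveOperationException e) {
            // Surface the failure to the caller instead of returning a silent null.
            throw new IllegalStateException("invocation failed: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        Object result = invoke(GreeterImpl.class.getName(), "greet",
                new Class<?>[]{String.class}, new Object[]{"rpc"});
        System.out.println(result);
    }
}
```

Unlike the task above, this sketch throws when the lookup or the call fails. In a real server that failure would more usefully travel back to the client in the response than end up as a null result.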
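Now for the client. It has two implementations: one that cannot reuse a handler (think of a handler as a connection), which does not cope well with high concurrency, and a handlerPool mode in which handlers can be reused.
The non-reusable mode: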
public class NettyClient {
     private static NettyClient INSTANCE = new NettyClient();
     private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;
     private NettyClient(){};
     public static NettyClient getInstance(){
         return INSTANCE;
     }
     private SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize());
     public void start(String host,Integer port){
         Bootstrap bootstrap = new Bootstrap();
         EventLoopGroup group = new NioEventLoopGroup(parallel);
         try{
             bootstrap.group(group)
                     .channel(NioSocketChannel.class)
                     .handler(new ChannelInitializer<SocketChannel>(){
                         @Override
                         protected void initChannel(SocketChannel arg0) throws Exception {
                             ChannelPipeline pipeline = arg0.pipeline();
                             //Keep the frame format compatible with LengthFieldBasedFrameDecoder, the half-packet decoder that ObjectDecoder extends,
                             //because ObjectDecoder initializes that parent class with super(maxObjectSize, 0, 4, 0, 4);
 //                          pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                             //Use LengthFieldPrepender to fill in the message length header that ObjectDecoder expects
 //                          pipeline.addLast(new LengthFieldPrepender(4));
 //                          pipeline.addLast(new ObjectEncoder());
                             //For concurrent performance, use the weakCachingConcurrentResolver caching strategy; in ordinary cases cacheDisabled is enough
 //                          pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                             //Register the Netty encoder
                             pipeline.addLast(new NettyEncoderHandler(serializeType));
                             //Register the Netty decoder
                             pipeline.addLast(new NettyDecoderHandler(RpcResponse.class, serializeType));
                             pipeline.addLast("handler", new NettyClientHandler());
                         }
                     });
             ChannelFuture future = bootstrap.connect(host,port).sync();
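         }catch(Exception e){
             //Report the failure instead of swallowing it, then release the event loop group
             e.printStackTrace();
             group.shutdownGracefully();
         }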
     }
 } 
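Now the reusable mode, which keeps a fixed number of handlers. The framework currently uses this reusable mode; the non-reusable one above is not used and was left in only to aid understanding.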
public class NettyChannelPoolFactory {
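     //Capacity of each Netty Channel blocking queue; this value is meant to be configurable
     private static final int channelConnectSize = 10;
     //Key: service provider address; value: blocking queue of Netty Channels
     private static final Map<URL, ArrayBlockingQueue<Channel>> channelPoolMap = new ConcurrentHashMap<>();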
     private static NettyChannelPoolFactory INSTANCE = new NettyChannelPoolFactory();
     private NettyChannelPoolFactory(){};
     public static NettyChannelPoolFactory getInstance(){
         return INSTANCE;
     }
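     private List<ServiceProvider> serviceMetaDataList = new ArrayList<>();
     //Initialize the channels from the interfaces to be called, as listed in the configuration file
     //(the String key type is an assumption, e.g. the interface name; the element type follows from the loop below)
     public void initNettyChannelPoolFactory(Map<String, List<ServiceProvider>> providerMap){
         //Collect all service providers into serviceMetaDataList
         Collection<List<ServiceProvider>> collectionServiceMetaDataList = providerMap.values();
         for (List<ServiceProvider> serviceMetaDataModels : collectionServiceMetaDataList) {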
             if (CollectionUtils.isEmpty(serviceMetaDataModels)) {
                 continue;
             }
             serviceMetaDataList.addAll(serviceMetaDataModels);
         }
         //Build the set of service provider addresses
         Set<URL> set = new HashSet<>();
         for (ServiceProvider serviceMetaData : serviceMetaDataList) {
             String serviceIp = serviceMetaData.getIp();
             int servicePort = serviceMetaData.getPort();
             URL url = new URL(serviceIp,servicePort);
             set.add(url);
         }
         for(URL url:set){
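             //Open several channels for each ip:port and put them into a blocking queue
             int channelSize = 0;
             while(channelSize < channelConnectSize){
                 //Retry until a channel is registered (registerChannel returns null on failure)
                 Channel channel = null;
                 while(channel == null){
                     channel = registerChannel(url);
                 }
                 channelSize++;
                 ArrayBlockingQueue<Channel> queue = channelPoolMap.get(url);
                 if(queue == null){
                     queue = new ArrayBlockingQueue<>(channelConnectSize);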
                     channelPoolMap.put(url, queue);
                 }
                 queue.offer(channel);
             }
         }
     }
     public Channel registerChannel(URL url) {
         final SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize());
         Bootstrap bootstrap = new Bootstrap();
         EventLoopGroup group = new NioEventLoopGroup(10);
         try{
             bootstrap.group(group)
                 .channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer<SocketChannel>(){
                 @Override
                 protected void initChannel(SocketChannel arg0) throws Exception {
                     ChannelPipeline pipeline = arg0.pipeline();
                     //Keep the frame format compatible with LengthFieldBasedFrameDecoder, the half-packet decoder that ObjectDecoder extends,
                     //because ObjectDecoder initializes that parent class with super(maxObjectSize, 0, 4, 0, 4);
 //                  pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                     //Use LengthFieldPrepender to fill in the message length header that ObjectDecoder expects
 //                  pipeline.addLast(new LengthFieldPrepender(4));
 //                  pipeline.addLast(new ObjectEncoder());
                     //For concurrent performance, use the weakCachingConcurrentResolver caching strategy; in ordinary cases cacheDisabled is enough
 //                  pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                     //Register the Netty encoder
                     pipeline.addLast(new NettyEncoderHandler(serializeType));
                     //Register the Netty decoder
                     pipeline.addLast(new NettyDecoderHandler(RpcResponse.class, serializeType));
                     pipeline.addLast("handler", new NettyClientHandler());
                 }
             });
             ChannelFuture future = bootstrap.connect(url.getHost(),url.getPort()).sync();
             Channel channel = future.channel();
             //Wait for the signal that the link to the Netty server has been established
             final CountDownLatch connectedLatch = new CountDownLatch(1);
             final List<Boolean> isSuccess = new ArrayList<>(1);
             future.addListener(new ChannelFutureListener(){
                 @Override
                 public void operationComplete(ChannelFuture future)
                         throws Exception {
                     if(future.isSuccess()){
                         isSuccess.add(true);
                     }else{
                         isSuccess.add(false);
                     }
                     connectedLatch.countDown();
                 }
             });
             connectedLatch.await();
             if(isSuccess.get(0)){
                 return channel;
             }
         }catch(Exception e){
             group.shutdownGracefully();
             e.printStackTrace();
         }
         return null;
     }
     //Get the blocking queue for the given url
     public ArrayBlockingQueue<Channel> acqiure(URL url){
         System.out.println(channelPoolMap.toString());
         return channelPoolMap.get(url);
     }
     //Return a channel to the pool once the caller is done with it
     public void release(ArrayBlockingQueue<Channel> queue, Channel channel, URL url){
         if(queue == null){
             return;
         }
         //Check that the channel is still usable; if not, register a new one and put it into the blocking queue
         if(channel == null || !channel.isActive() || !channel.isOpen()|| !channel.isWritable()){
             if (channel != null) {
                 channel.deregister().syncUninterruptibly();
                 //close() actively closes the channel; waiting on closeFuture() alone would block forever if the channel were still open
                 channel.close().syncUninterruptibly();
             }
             Channel c = null;
             while(c == null){
                 c = registerChannel(url);
             }
             queue.offer(c);
             return;
         }
         queue.offer(channel);
     }
 }       
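Stripped of the Netty details, the factory above is a fixed-size pool built on a blocking queue: borrow with a timed `poll`, return with `offer`, and replace anything that has gone bad. The sketch below shows that shape with only the JDK. `SimplePool` and its methods are hypothetical names, and the type parameter `T` stands in for a Netty `Channel`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical generic pool; T stands in for a Netty Channel.
final class SimplePool<T> {
    private final BlockingQueue<T> idle;
    private final Supplier<T> factory;   // creates a replacement resource
    private final Predicate<T> healthy;  // decides whether a returned resource is reusable

    SimplePool(int size, Supplier<T> factory, Predicate<T> healthy) {
        this.idle = new ArrayBlockingQueue<>(size);
        this.factory = factory;
        this.healthy = healthy;
        for (int i = 0; i < size; i++) {
            idle.offer(factory.get());
        }
    }

    // Wait up to timeoutMillis for an idle resource; null means none became available.
    T acquire(long timeoutMillis) {
        try {
            return idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Return a resource; an unhealthy or missing one is replaced with a fresh one.
    void release(T resource) {
        boolean reusable = resource != null && healthy.test(resource);
        idle.offer(reusable ? resource : factory.get());
    }

    int idleCount() {
        return idle.size();
    }
}
```

A caller would pair every `acquire` with a `release` in a `finally` block, which is what the protocol implementation later in this article does with `acqiure` and `release`.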
   
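Here is the corresponding handler. channelRead0 reads the response returned by the server; because Netty is asynchronous, we need MessageCallBack to turn the exchange into a synchronous call.
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {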
     private ChannelHandlerContext context;
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
             throws Exception {
         cause.printStackTrace();
         ctx.close();
     }
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         System.out.println("Channel became inactive at: "+new Date());
         System.out.println("NettyClientHandler channelInactive");
     }
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         this.context = ctx;
         System.out.println("Channel became active, id: "+ctx.channel().id());
     }
     @Override
     protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse) throws Exception {
         // String res = (String)msg;
         //RpcResponse rpcResponse = (RpcResponse)msg;
         String responseId = rpcResponse.getResponseId();
         MessageCallBack callBack = ResponseHolder.getInstance().mapCallBack.get(responseId);
         if(callBack != null){
             ResponseHolder.getInstance().mapCallBack.remove(responseId);
             callBack.over(rpcResponse);
         }
     }
 } 
The MessageCallBack implementation:
public class MessageCallBack {
     private RpcRequest rpcRequest;
     private RpcResponse rpcResponse;
     private Lock lock = new ReentrantLock();
     private Condition finish = lock.newCondition();
     public MessageCallBack(RpcRequest request) {
         this.rpcRequest = request;
     }
     public Object start() throws InterruptedException {
         lock.lock();
         try {
             //Wait only while no response has arrived; after 10 seconds without one, give up and return null
             long nanosLeft = TimeUnit.SECONDS.toNanos(10);
             while (this.rpcResponse == null && nanosLeft > 0) {
                 nanosLeft = finish.awaitNanos(nanosLeft);
             }
             if (this.rpcResponse != null) {
                 return this.rpcResponse.getData();
             } else {
                 return null;
             }
         } finally {
             lock.unlock();
         }
     }
     public void over(RpcResponse response) {
         lock.lock();
         try {
             this.rpcResponse = response;
             finish.signal();
         } finally {
             lock.unlock();
         }
     }
 }
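The same "block the caller until the asynchronous response arrives" idea can also be expressed with `CompletableFuture` from the JDK instead of a hand-written lock and condition. This is only an alternative sketch, not what the framework uses: `PendingCalls` and its methods are hypothetical names, and the map plays the role that `ResponseHolder.mapCallBack` plays in the handler above.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical alternative to MessageCallBack built on CompletableFuture.
final class PendingCalls {
    private final Map<String, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Caller side: register before the request is written to the channel.
    CompletableFuture<Object> register(String requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // I/O side: complete the matching call; false means nobody is waiting for this id.
    boolean complete(String requestId, Object data) {
        CompletableFuture<Object> future = pending.remove(requestId);
        return future != null && future.complete(data);
    }

    // Block for the result; returns null on timeout, mirroring MessageCallBack.start().
    Object await(String requestId, CompletableFuture<Object> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pending.remove(requestId); // never leave a timed-out entry behind
        }
    }

    int size() {
        return pending.size();
    }
}
```

Because the future remembers its result, a response that arrives before the caller starts waiting is not lost, and the `finally` block keeps timed-out requests from accumulating in the map.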
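Since this is a pluggable framework, the underlying protocol has to be selectable, so we define a top-level interface through which the protocol is chosen.
The start method starts the server; the send method is how the client sends data.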
public interface Procotol {
     void start(URL url);
     Object send(URL url, RpcRequest invocation);
 }
Here are the three protocol implementations of this interface.
The Netty implementation:
public class DubboProcotol implements Procotol {
     @Override
     public void start(URL url) {
         NettyServer nettyServer = NettyServer.getInstance();
         nettyServer.start(url.getHost(),url.getPort());
     }
     @Override
     public Object send(URL url, RpcRequest invocation) {
         ArrayBlockingQueue<Channel> queue = NettyChannelPoolFactory.getInstance().acqiure(url);
         Channel channel = null;
         try {
             channel = queue.poll(invocation.getTimeout(), TimeUnit.MILLISECONDS);
             if(channel == null || !channel.isActive() || !channel.isOpen()|| !channel.isWritable()){
                 channel = queue.poll(invocation.getTimeout(), TimeUnit.MILLISECONDS);
                 if(channel == null){
                     channel = NettyChannelPoolFactory.getInstance().registerChannel(url);
                 }
             }
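             //Register the callback before writing, so that a fast response cannot arrive while nobody is registered for it
             MessageCallBack callback = new MessageCallBack(invocation);
             ResponseHolder.getInstance().mapCallBack.put(invocation.getRequestId(), callback);
             //Write this invocation to the Netty channel to start the asynchronous call
             ChannelFuture channelFuture = channel.writeAndFlush(invocation);
             channelFuture.syncUninterruptibly();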
             try {
                 return callback.start();
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             return null;
         } catch (InterruptedException e1) {
             e1.printStackTrace();
         }finally{
             if(channel != null){
                 System.out.println("release:"+channel.id());
             }
             NettyChannelPoolFactory.getInstance().release(queue, channel, url);
         }
         return null;
     }
 } 
The HTTP implementation:
public class HttpProcotol implements Procotol {
     @Override
     public void start(URL url) {
         HttpServer httpServer = HttpServer.getInstance();
         httpServer.start(url.getHost(),url.getPort());
     }
     @Override
     public Object send(URL url, RpcRequest invocation) {
         HttpClient httpClient = HttpClient.getInstance();
         return httpClient.post(url.getHost(),url.getPort(),invocation);
     }
 }
The Socket implementation:
public class SocketProcotol implements Procotol {
     @Override
     public void start(URL url) {
         SocketServer socketServer = SocketServer.getInstance();
         socketServer.publiser(url.getPort());
     }
     @Override
     public Object send(URL url, RpcRequest invocation) {
         SocketClient socketClient = SocketClient.getInstance();
         return socketClient.sendRequest(url.getHost(),url.getPort(),invocation);
     }
 }
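With that, a model with selectable protocols is in place: through this module we can choose a protocol and use it to communicate with the server.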
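How the protocol actually gets chosen is not shown in this part. One plausible way, sketched here purely as an assumption, is a small registry that maps a configured name to a factory. `ProtocolRegistrySketch`, the simplified `Protocol` interface, and the keys "netty", "http", and "socket" are all hypothetical; in the real framework the factories would create DubboProcotol, HttpProcotol, and SocketProcotol.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical registry for choosing a protocol by its configured name.
final class ProtocolRegistrySketch {
    // Simplified stand-in for the Procotol interface above.
    interface Protocol {
        String name();
    }

    private static final Map<String, Supplier<Protocol>> REGISTRY = new ConcurrentHashMap<>();

    static {
        // Assumed configuration keys; each factory returns a tiny Protocol for the demo.
        register("netty", () -> () -> "netty");
        register("http", () -> () -> "http");
        register("socket", () -> () -> "socket");
    }

    static void register(String key, Supplier<Protocol> factory) {
        REGISTRY.put(key.toLowerCase(Locale.ROOT), factory);
    }

    // Look up the factory for the configured name and create the protocol.
    static Protocol select(String key) {
        Supplier<Protocol> factory = REGISTRY.get(key.toLowerCase(Locale.ROOT));
        if (factory == null) {
            throw new IllegalArgumentException("Unknown protocol: " + key);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        System.out.println(select("Netty").name());
    }
}
```

Adding a fourth protocol then means one more `register` call, with no change to the code that calls `select`.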
Article link: http://soscw.com/essay/30425.html