mqtt client api: 阻塞API

2021-05-30 17:27

阅读:578

标签:Once   消息分发   live   认证用户   publish   local   write   介绍   final   

fusesource版本:mqtt-client-1.11.jar
下载地址:https://github.com/fusesource/mqtt-client

fusesource提供三种mqtt client api: 阻塞API,基于Futur的API和回调API。其中,回调API是最复杂的也是性能最好的,另外两种均是对回调API的封装。 我们下面就简单介绍一下回调API的使用方法。

  1 import org.fusesource.hawtbuf.Buffer;
  2 import org.fusesource.hawtbuf.UTF8Buffer;
  3 import org.fusesource.hawtdispatch.Dispatch;
  4 import org.fusesource.hawtdispatch.DispatchQueue;
  5 import org.fusesource.mqtt.client.BlockingConnection;
  6 import org.fusesource.mqtt.client.Callback;
  7 import org.fusesource.mqtt.client.CallbackConnection;
  8 import org.fusesource.mqtt.client.FutureConnection;
  9 import org.fusesource.mqtt.client.Listener;
 10 import org.fusesource.mqtt.client.MQTT;
 11 import org.fusesource.mqtt.client.Message;
 12 import org.fusesource.mqtt.client.QoS;
 13 import org.fusesource.mqtt.client.Topic;
 14 import org.fusesource.mqtt.client.Tracer;
 15 import org.fusesource.mqtt.codec.MQTTFrame; 
 16 public class MqttClient {
 17 public static void main(String[] args)
 18 {
 19  try {
 20    MQTT mqtt=new MQTT();
 21    
 22    //MQTT设置说明
 23    mqtt.setHost("tcp://10.1.58.191:1883");
 24    mqtt.setClientId("876543210"); //用于设置客户端会话的ID。在setCleanSession(false);被调用时,MQTT服务器利用该ID获得相应的会话。此ID应少于23个字符,默认根据本机地址、端口和时间自动生成
 25    mqtt.setCleanSession(false); //若设为false,MQTT服务器将持久化客户端会话的主体订阅和ACK位置,默认为true
 26    mqtt.setKeepAlive((short) 60);//定义客户端传来消息的最大时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从而避免TCP/IP超时的长时间等待
 27    mqtt.setUserName("admin");//服务器认证用户名
 28    mqtt.setPassword("admin");//服务器认证密码
 29    
 30    mqtt.setWillTopic("willTopic");//设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
 31    mqtt.setWillMessage("willMessage");//设置“遗嘱”消息的内容,默认是长度为零的消息
 32    mqtt.setWillQos(QoS.AT_LEAST_ONCE);//设置“遗嘱”消息的QoS,默认为QoS.ATMOSTONCE
 33    mqtt.setWillRetain(true);//若想要在发布“遗嘱”消息时拥有retain选项,则为true
 34    mqtt.setVersion("3.1.1");
 35    
 36    //失败重连接设置说明
 37    mqtt.setConnectAttemptsMax(10L);//客户端首次连接到服务器时,连接的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1
 38    mqtt.setReconnectAttemptsMax(3L);//客户端已经连接到服务器,但因某种原因连接断开时的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1
 39    mqtt.setReconnectDelay(10L);//首次重连接间隔毫秒数,默认为10ms
 40    mqtt.setReconnectDelayMax(30000L);//重连接间隔毫秒数,默认为30000ms
 41    mqtt.setReconnectBackOffMultiplier(2);//设置重连接指数回归。设置为1则停用指数回归,默认为2
 42    
 43    //Socket设置说明
 44          mqtt.setReceiveBufferSize(65536);//设置socket接收缓冲区大小,默认为65536(64k)
 45          mqtt.setSendBufferSize(65536);//设置socket发送缓冲区大小,默认为65536(64k)
 46          mqtt.setTrafficClass(8);//设置发送数据包头的流量类型或服务类型字段,默认为8,意为吞吐量最大化传输
 47          
 48          //带宽限制设置说明
 49          mqtt.setMaxReadRate(0);//设置连接的最大接收速率,单位为bytes/s。默认为0,即无限制
 50          mqtt.setMaxWriteRate(0);//设置连接的最大发送速率,单位为bytes/s。默认为0,即无限制
 51          
 52          //选择消息分发队列
 53          mqtt.setDispatchQueue(Dispatch.createQueue("foo"));//若没有调用方法setDispatchQueue,客户端将为连接新建一个队列。如果想实现多个连接使用公用的队列,显式地指定队列是一个非常方便的实现方法
 54    
 55          //设置跟踪器
 56    mqtt.setTracer(new Tracer(){
 57              @Override
 58              public void onReceive(MQTTFrame frame) {
 59                  System.out.println("recv: "+frame);
 60              } 
 61              @Override
 62              public void onSend(MQTTFrame frame) {
 63                  System.out.println("send: "+frame);
 64              } 
 65              @Override
 66              public void debug(String message, Object... args) {
 67                  System.out.println(String.format("debug: "+message, args));
 68              }
 69          });
 70    
 71    
 72    
 73          //使用回调式API
 74    final CallbackConnection callbackConnection=mqtt.callbackConnection();
 75    
 76    //连接监听
 77    callbackConnection.listener(new Listener() {
 78    
 79    //接收订阅话题发布的消息
 80    @Override
 81    public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) {
 82     System.out.println("=============receive msg================"+new String(payload.toByteArray()));
 83      onComplete.run();
 84    }
 85    
 86    //连接失败
 87    @Override
 88    public void onFailure(Throwable value) {
 89     System.out.println("===========connect failure===========");
 90     callbackConnection.disconnect(null);
 91    }
 92    
 93       //连接断开
 94    @Override
 95    public void onDisconnected() {
 96     System.out.println("====mqtt disconnected=====");
 97     
 98    }
 99    
100    //连接成功
101    @Override
102    public void onConnected() {
103     System.out.println("====mqtt connected=====");
104     
105    }
106   });
107    
108    
109    
110    //连接
111    callbackConnection.connect(new Callback() {
112     
113      //连接失败
114        public void onFailure(Throwable value) {
115            System.out.println("============连接失败:"+value.getLocalizedMessage()+"============");
116        } 
117        // 连接成功
118        public void onSuccess(Void v) { 
119            //订阅主题
120            Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
121            callbackConnection.subscribe(topics, new Callbackbyte[]>() {
122                //订阅主题成功
123             public void onSuccess(byte[] qoses) {
124                    System.out.println("========订阅成功=======");
125                }
126             //订阅主题失败
127                public void onFailure(Throwable value) {
128                 System.out.println("========订阅失败=======");
129                 callbackConnection.disconnect(null);
130                }
131            });
132            
133            
134             //发布消息
135            callbackConnection.publish("foo", ("Hello ").getBytes(), QoS.AT_LEAST_ONCE, true, new Callback() {
136                public void onSuccess(Void v) {
137                  System.out.println("===========消息发布成功============");
138                }
139                public void onFailure(Throwable value) {
140                 System.out.println("========消息发布失败=======");
141                 callbackConnection.disconnect(null);
142                }
143            }); 
144   
145        }
146    });
147    
148    
149    
150    while(true)
151    {
152     
153    }
154  
155   
156  } catch (Exception e) {
157   e.printStackTrace();
158  } 
159   
160 }
161 }

 

mqtt client api: 阻塞API

标签:Once   消息分发   live   认证用户   publish   local   write   介绍   final   

原文地址:https://www.cnblogs.com/endv/p/11037424.html


评论


亲,登录后才可以留言!