mqtt client api: 阻塞API
2021-05-30 17:27
标签:Once 消息分发 live 认证用户 publish local write 介绍 final fusesource版本:mqtt-client-1.11.jar mqtt client api: 阻塞API 标签:Once 消息分发 live 认证用户 publish local write 介绍 final 原文地址:https://www.cnblogs.com/endv/p/11037424.html
下载地址:https://github.com/fusesource/mqtt-client
fusesource提供三种mqtt client api: 阻塞API,基于Futur的API和回调API。其中,回调API是最复杂的也是性能最好的,另外两种均是对回调API的封装。 我们下面就简单介绍一下回调API的使用方法。 1 import org.fusesource.hawtbuf.Buffer;
2 import org.fusesource.hawtbuf.UTF8Buffer;
3 import org.fusesource.hawtdispatch.Dispatch;
4 import org.fusesource.hawtdispatch.DispatchQueue;
5 import org.fusesource.mqtt.client.BlockingConnection;
6 import org.fusesource.mqtt.client.Callback;
7 import org.fusesource.mqtt.client.CallbackConnection;
8 import org.fusesource.mqtt.client.FutureConnection;
9 import org.fusesource.mqtt.client.Listener;
10 import org.fusesource.mqtt.client.MQTT;
11 import org.fusesource.mqtt.client.Message;
12 import org.fusesource.mqtt.client.QoS;
13 import org.fusesource.mqtt.client.Topic;
14 import org.fusesource.mqtt.client.Tracer;
15 import org.fusesource.mqtt.codec.MQTTFrame;
16 public class MqttClient {
17 public static void main(String[] args)
18 {
19 try {
20 MQTT mqtt=new MQTT();
21
22 //MQTT设置说明
23 mqtt.setHost("tcp://10.1.58.191:1883");
24 mqtt.setClientId("876543210"); //用于设置客户端会话的ID。在setCleanSession(false);被调用时,MQTT服务器利用该ID获得相应的会话。此ID应少于23个字符,默认根据本机地址、端口和时间自动生成
25 mqtt.setCleanSession(false); //若设为false,MQTT服务器将持久化客户端会话的主体订阅和ACK位置,默认为true
26 mqtt.setKeepAlive((short) 60);//定义客户端传来消息的最大时间间隔秒数,服务器可以据此判断与客户端的连接是否已经断开,从而避免TCP/IP超时的长时间等待
27 mqtt.setUserName("admin");//服务器认证用户名
28 mqtt.setPassword("admin");//服务器认证密码
29
30 mqtt.setWillTopic("willTopic");//设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
31 mqtt.setWillMessage("willMessage");//设置“遗嘱”消息的内容,默认是长度为零的消息
32 mqtt.setWillQos(QoS.AT_LEAST_ONCE);//设置“遗嘱”消息的QoS,默认为QoS.ATMOSTONCE
33 mqtt.setWillRetain(true);//若想要在发布“遗嘱”消息时拥有retain选项,则为true
34 mqtt.setVersion("3.1.1");
35
36 //失败重连接设置说明
37 mqtt.setConnectAttemptsMax(10L);//客户端首次连接到服务器时,连接的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1
38 mqtt.setReconnectAttemptsMax(3L);//客户端已经连接到服务器,但因某种原因连接断开时的最大重试次数,超出该次数客户端将返回错误。-1意为无重试上限,默认为-1
39 mqtt.setReconnectDelay(10L);//首次重连接间隔毫秒数,默认为10ms
40 mqtt.setReconnectDelayMax(30000L);//重连接间隔毫秒数,默认为30000ms
41 mqtt.setReconnectBackOffMultiplier(2);//设置重连接指数回归。设置为1则停用指数回归,默认为2
42
43 //Socket设置说明
44 mqtt.setReceiveBufferSize(65536);//设置socket接收缓冲区大小,默认为65536(64k)
45 mqtt.setSendBufferSize(65536);//设置socket发送缓冲区大小,默认为65536(64k)
46 mqtt.setTrafficClass(8);//设置发送数据包头的流量类型或服务类型字段,默认为8,意为吞吐量最大化传输
47
48 //带宽限制设置说明
49 mqtt.setMaxReadRate(0);//设置连接的最大接收速率,单位为bytes/s。默认为0,即无限制
50 mqtt.setMaxWriteRate(0);//设置连接的最大发送速率,单位为bytes/s。默认为0,即无限制
51
52 //选择消息分发队列
53 mqtt.setDispatchQueue(Dispatch.createQueue("foo"));//若没有调用方法setDispatchQueue,客户端将为连接新建一个队列。如果想实现多个连接使用公用的队列,显式地指定队列是一个非常方便的实现方法
54
55 //设置跟踪器
56 mqtt.setTracer(new Tracer(){
57 @Override
58 public void onReceive(MQTTFrame frame) {
59 System.out.println("recv: "+frame);
60 }
61 @Override
62 public void onSend(MQTTFrame frame) {
63 System.out.println("send: "+frame);
64 }
65 @Override
66 public void debug(String message, Object... args) {
67 System.out.println(String.format("debug: "+message, args));
68 }
69 });
70
71
72
73 //使用回调式API
74 final CallbackConnection callbackConnection=mqtt.callbackConnection();
75
76 //连接监听
77 callbackConnection.listener(new Listener() {
78
79 //接收订阅话题发布的消息
80 @Override
81 public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) {
82 System.out.println("=============receive msg================"+new String(payload.toByteArray()));
83 onComplete.run();
84 }
85
86 //连接失败
87 @Override
88 public void onFailure(Throwable value) {
89 System.out.println("===========connect failure===========");
90 callbackConnection.disconnect(null);
91 }
92
93 //连接断开
94 @Override
95 public void onDisconnected() {
96 System.out.println("====mqtt disconnected=====");
97
98 }
99
100 //连接成功
101 @Override
102 public void onConnected() {
103 System.out.println("====mqtt connected=====");
104
105 }
106 });
107
108
109
110 //连接
111 callbackConnection.connect(new Callback() {
112
113 //连接失败
114 public void onFailure(Throwable value) {
115 System.out.println("============连接失败:"+value.getLocalizedMessage()+"============");
116 }
117 // 连接成功
118 public void onSuccess(Void v) {
119 //订阅主题
120 Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
121 callbackConnection.subscribe(topics, new Callbackbyte[]>() {
122 //订阅主题成功
123 public void onSuccess(byte[] qoses) {
124 System.out.println("========订阅成功=======");
125 }
126 //订阅主题失败
127 public void onFailure(Throwable value) {
128 System.out.println("========订阅失败=======");
129 callbackConnection.disconnect(null);
130 }
131 });
132
133
134 //发布消息
135 callbackConnection.publish("foo", ("Hello ").getBytes(), QoS.AT_LEAST_ONCE, true, new Callback() {
136 public void onSuccess(Void v) {
137 System.out.println("===========消息发布成功============");
138 }
139 public void onFailure(Throwable value) {
140 System.out.println("========消息发布失败=======");
141 callbackConnection.disconnect(null);
142 }
143 });
144
145 }
146 });
147
148
149
150 while(true)
151 {
152
153 }
154
155
156 } catch (Exception e) {
157 e.printStackTrace();
158 }
159
160 }
161 }
上一篇:实验十二:SWING界面设计