物联网架构成长之路(32)-SpringBoot集成MQTT客户端
2020-12-13 05:52
标签:代码 start == char on() qos pop options 如何 一、前言 二、配置pom.xml,引入第三方库 三、MQTT客户端代码(Java) MqttDemoApplication.java MqttPushCallback.java MqttPushClient.java 四、MQTT客户端代码(C#) 部分C#代码(连接服务器与发送数据) 本文地址: https://www.cnblogs.com/wunaozai/p/11147841.html 物联网架构成长之路(32)-SpringBoot集成MQTT客户端 标签:代码 start == char on() qos pop options 如何 原文地址:https://www.cnblogs.com/wunaozai/p/11147841.html
这里虽然是说MQTT客户端。其实对于服务器来说,这里的一个具有超级权限的MQTT客户端,就可以做很多事情。比如手机APP或者网页或者第三方服务需要发送数据到设备,但是这些又不是设备,又不能让他们连到MQTT。那么就可以通过HTTP请求业务服务器。然后由业务服务器利用这个MQTT客户端进行发送数据。
还有,之前好多人问我,怎么保存这些物联网数据,真的要像前面的博客那样,要自己写插件吗?特别麻烦的啊。这里给出的结论是不需要。保存数据,除了写EMQ插件,还可以在EMQ的规则引擎上进行配置Web消息转发【EMQ 3.x 版本】,还有就是这种通过业务服务器订阅根Topic来保存物联网原始数据。
这篇博客只讨论如何把MQTT客户端集成到业务服务器上(基于SpringBoot 2.0)。下一篇博客会讲到数据保存到InfluxDB,然后如何通过Grafana进行可视化Dashboard看板模式展示。 1
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
1 package com.wunaozai.mqtt;
2
3 import org.springframework.boot.SpringApplication;
4 import org.springframework.boot.autoconfigure.SpringBootApplication;
5
6 import com.wunaozai.mqtt.tools.MqttPushClient;
7
8 @SpringBootApplication
9 public class MqttDemoApplication {
10
11 public static void main(String[] args) {
12 SpringApplication.run(MqttDemoApplication.class, args);
13
14 test();
15 }
16
17
18 private static void test(){
19 MqttPushClient.MQTT_HOST = "tcp://mqtt.com:1883";
20 MqttPushClient.MQTT_CLIENTID = "client";
21 MqttPushClient.MQTT_USERNAME = "username";
22 MqttPushClient.MQTT_PASSWORD = "password";
23 MqttPushClient client = MqttPushClient.getInstance();
24 client.subscribe("/#");
25 }
26 }
1 package com.wunaozai.mqtt.tools;
2
3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
4 import org.eclipse.paho.client.mqttv3.MqttCallback;
5 import org.eclipse.paho.client.mqttv3.MqttMessage;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8
9 /**
10 * MQTT 推送回调
11 * @author wunaozai
12 * @date 2018-08-22
13 */
14 public class MqttPushCallback implements MqttCallback {
15
16 private static final Logger log = LoggerFactory.getLogger(MqttPushCallback.class);
17
18 @Override
19 public void connectionLost(Throwable cause) {
20 log.info("断开连接,建议重连" + this);
21 //断开连接,建议重连
22 }
23
24 @Override
25 public void deliveryComplete(IMqttDeliveryToken token) {
26 //log.info(token.isComplete() + "");
27 }
28
29 @Override
30 public void messageArrived(String topic, MqttMessage message) throws Exception {
31 log.info("Topic: " + topic);
32 log.info("Message: " + new String(message.getPayload()));
33 }
34
35 }
1 package com.wunaozai.mqtt.tools;
2
3 import org.eclipse.paho.client.mqttv3.MqttClient;
4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
5 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
6 import org.eclipse.paho.client.mqttv3.MqttMessage;
7 import org.eclipse.paho.client.mqttv3.MqttTopic;
8 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11
12 /**
13 * 创建一个MQTT客户端
14 * @author wunaozai
15 * @date 2018-08-22
16 */
17 public class MqttPushClient {
18
19 private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);
20 public static String MQTT_HOST = "";
21 public static String MQTT_CLIENTID = "";
22 public static String MQTT_USERNAME = "";
23 public static String MQTT_PASSWORD = "";
24 public static int MQTT_TIMEOUT = 10;
25 public static int MQTT_KEEPALIVE = 10;
26
27 private MqttClient client;
28 private static volatile MqttPushClient mqttClient = null;
29 public static MqttPushClient getInstance() {
30 if(mqttClient == null) {
31 synchronized (MqttPushClient.class) {
32 if(mqttClient == null) {
33 mqttClient = new MqttPushClient();
34 }
35 }
36 }
37 return mqttClient;
38 }
39
40 private MqttPushClient() {
41 log.info("Connect MQTT: " + this);
42 connect();
43 }
44
45 private void connect() {
46 try {
47 client = new MqttClient(MQTT_HOST, MQTT_CLIENTID, new MemoryPersistence());
48 MqttConnectOptions option = new MqttConnectOptions();
49 option.setCleanSession(true);
50 option.setUserName(MQTT_USERNAME);
51 option.setPassword(MQTT_PASSWORD.toCharArray());
52 option.setConnectionTimeout(MQTT_TIMEOUT);
53 option.setKeepAliveInterval(MQTT_KEEPALIVE);
54 option.setAutomaticReconnect(true);
55 try {
56 client.setCallback(new MqttPushCallback());
57 client.connect(option);
58 } catch (Exception e) {
59 e.printStackTrace();
60 }
61 } catch (Exception e) {
62 e.printStackTrace();
63 }
64 }
65 /**
66 * 发布主题,用于通知
67 * 默认qos为1 非持久化
68 * @param topic
69 * @param data
70 */
71 public void publish(String topic, String data) {
72 publish(topic, data, 1, false);
73 }
74 /**
75 * 发布
76 * @param topic
77 * @param data
78 * @param qos
79 * @param retained
80 */
81 public void publish(String topic, String data, int qos, boolean retained) {
82 MqttMessage message = new MqttMessage();
83 message.setQos(qos);
84 message.setRetained(retained);
85 message.setPayload(data.getBytes());
86 MqttTopic mqttTopic = client.getTopic(topic);
87 if(null == mqttTopic) {
88 log.error("Topic Not Exist");
89 }
90 MqttDeliveryToken token;
91 try {
92 token = mqttTopic.publish(message);
93 token.waitForCompletion();
94 } catch (Exception e) {
95 e.printStackTrace();
96 }
97 }
98 /**
99 * 订阅某个主题 qos默认为1
100 * @param topic
101 */
102 public void subscribe(String topic) {
103 subscribe(topic, 1);
104 }
105 /**
106 * 订阅某个主题
107 * @param topic
108 * @param qos
109 */
110 public void subscribe(String topic, int qos) {
111 try {
112 client.subscribe(topic, qos);
113 } catch (Exception e) {
114 e.printStackTrace();
115 }
116 }
117 }
为了下下篇博客Grafana有数据可以展示,我需要开发一个PC小工具【设备仿真】,用来模拟设备一直发送数据。这里就不对C#开发进行过多的说明了。通过nuget,引入第三方mqtt库。这个工具是我现在开发平台工具链的一个小工具。至于里面的Payload协议,可以不用管。读者可以根据自己的业务制定自己的通信协议。 1 using MQTTClient.Model;
2 using MQTTnet;
3 using MQTTnet.Core;
4 using MQTTnet.Core.Client;
5 using Newtonsoft.Json;
6 using System;
7 using System.Collections.Generic;
8 using System.Text;
9 using System.Threading.Tasks;
10 using System.Windows.Forms;
11
namespace MQTTClient
{
    /// <summary>
    /// Device-simulator form: connects to an MQTT server and publishes a JSON
    /// payload either once or repeatedly on a timer.
    /// </summary>
    public partial class MainPage : Form
    {
        public MainPage()
        {
            InitializeComponent();
            init();
        }

        /// <summary>Resets the credential fields and fills in the default topic.</summary>
        private void init()
        {
            txtusername.Text = "";
            txtpassword.Text = "";
            txtclientid.Text = "";
            txttopic.Text = "iot/UUID/device/devicepub/update";
        }

        // Created lazily on the first connect; null until then.
        IMqttClient client = null;

        /// <summary>
        /// Creates the client on first use, then disconnects and reconnects it
        /// using the options currently entered in the form.
        /// </summary>
        private async Task ConnectMqttServerAsync()
        {
            if (client == null)
            {
                client = new MqttClientFactory().CreateMqttClient() as MqttClient;
                client.ApplicationMessageReceived += mqttClientApplicationMessageReceived;
                client.Connected += mqttClientConnected;
                client.Disconnected += mqttClientDisconnected;
            }
            try
            {
                await client.DisconnectAsync();
                var option = getMQTTOption();
                await client.ConnectAsync(option);
            }
            catch (Exception e)
            {
                // This runs off the UI thread (started via Task.Run), so the
                // label update is marshalled with Invoke.
                Invoke((new Action(() =>
                {
                    lblStatus.Text = "连接服务器失败: " + e.Message;
                })));
            }
        }

        private void mqttClientDisconnected(object sender, EventArgs e)
        {
            Invoke((new Action(() =>
            {
                lblStatus.Text = "连接服务器失败: ERROR";
            })));
        }

        private void mqttClientConnected(object sender, EventArgs e)
        {
            Invoke((new Action(() =>
            {
                lblStatus.Text = "连接服务器成功";
            })));
        }

        private void mqttClientApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            // This tool does not consume incoming data. Ignore the message
            // rather than throwing from inside the client's event handler.
        }

        private void btnconnect_Click(object sender, EventArgs e)
        {
            Task.Run(async () => { await ConnectMqttServerAsync(); });
        }

        private void btndisconnect_Click(object sender, EventArgs e)
        {
            // The client does not exist until the first connect attempt.
            if (client != null)
            {
                client.DisconnectAsync();
            }
        }

        private void btnsendone_Click(object sender, EventArgs e)
        {
            sendPayload();
        }

        /// <summary>Starts periodic sending at the interval typed into txttime.</summary>
        private void btnsendts_Click(object sender, EventArgs e)
        {
            int interval;
            // Reject non-numeric or non-positive input instead of letting the
            // parse or the Interval setter throw inside the click handler.
            if (!int.TryParse(txttime.Text, out interval) || interval <= 0)
            {
                lblSendStatus.Text = "发送间隔无效";
                return;
            }
            timer1.Interval = interval;
            timer1.Enabled = true;
        }

        private void btnstopts_Click(object sender, EventArgs e)
        {
            timer1.Enabled = false;
        }

        private void timer1_Tick(object sender, EventArgs e)
        {
            sendPayload();
        }

        /// <summary>
        /// Serializes the current payload to JSON, shows it in the view box and
        /// publishes it to the topic in txttopic.
        /// </summary>
        /// <returns>0 when a publish was issued, -1 when there is no connected client.</returns>
        private int sendPayload()
        {
            // client is null until the first connect; treat that as "not connected".
            if (client == null || client.IsConnected == false)
            {
                return -1;
            }
            PayloadModel payload = getPayload();
            string json = JsonConvert.SerializeObject(payload, Formatting.Indented);
            txtview.Text = json;
            string topic = txttopic.Text;
            // JSON is sent as UTF-8 so the bytes do not depend on this PC's
            // default ANSI code page.
            var msg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(json),
                MQTTnet.Core.Protocol.MqttQualityOfServiceLevel.AtMostOnce, false);
            client.PublishAsync(msg);
            lblSendStatus.Text = "发送: " + DateTime.Now.ToLongTimeString();
            return 0;
        }

        /// <summary>
        /// Builds the connection options from the form fields. The host field
        /// may be "host" or "host:port"; the port defaults to 1883.
        /// </summary>
        private MqttClientTcpOptions getMQTTOption()
        {
            MqttClientTcpOptions option = new MqttClientTcpOptions();
            string hostname = txthostname.Text;
            string[] host_port = hostname.Split(':');
            int port = 1883;
            if (host_port.Length >= 2)
            {
                hostname = host_port[0];
                port = Convert.ToInt32(host_port[1]);
            }
            option.Server = hostname;
            option.ClientId = txtclientid.Text;
            option.UserName = txtusername.Text;
            option.Password = txtpassword.Text;
            option.Port = port;
            option.CleanSession = true;
            return option;
        }

        /// <summary>Creates the payload to send (field population omitted).</summary>
        private PayloadModel getPayload()
        {
            PayloadModel payload = new PayloadModel();
            // omitted
            return payload;
        }

        Random rand1 = new Random(System.DateTime.Now.Millisecond);

        /// <summary>Returns a random integer in [0, 100).</summary>
        private int getRandomNum()
        {
            int data = rand1.Next(0, 100);
            return data;
        }

        int linenum = 0;
        // NOTE(review): seeded the same way as rand1, so both generators may
        // start from the same seed - confirm whether that matters.
        Random rand2 = new Random(System.DateTime.Now.Millisecond);

        /// <summary>
        /// Moves linenum up or down by a random step in [0, 5) and returns the
        /// new value.
        /// </summary>
        private int getLineNum()
        {
            int f = rand2.Next(0, 100);
            int data = rand2.Next(0, 5);
            if (f % 2 == 1)
            {
                linenum += data;
            }
            else
            {
                linenum -= data;
            }
            return linenum;
        }

    }
}
上一篇:排序检索数据
文章标题:物联网架构成长之路(32)-SpringBoot集成MQTT客户端
文章链接:http://soscw.com/essay/31912.html