一、前言
这里虽然是说MQTT客户端。其实对于服务器来说,这里的一个具有超级权限的MQTT客户端,就可以做很多事情。比如手机APP或者网页或者第三方服务需要发送数据到设备,但是这些又不是设备,又不能让他们连到MQTT。那么就可以通过HTTP请求业务服务器。然后由业务服务器利用这个MQTT客户端进行发送数据。 还有,之前好多人问我,怎么保存这些物联网数据,真的要像前面的博客那样,要自己写插件吗?特别麻烦的啊。这里给出的结论是不需要。保存数据,除了写EMQ插件,还可以在EMQ的规则引擎上进行配置Web消息转发【EMQ 3.x 版本】,还有就是这种通过业务服务器订阅根Topic来保存物联网原始数据。 这篇博客这讨论如何把MQTT客户端集成到业务服务器上(基于SpringBoot 2.0)。下一篇博客会讲到数据保存到InfluxDB,然后如何通过Grafana进行可视化Dashboard看板模式展示。
二、配置pom.xml,引入第三方库
1 23 6org.springframework.boot 4spring-boot-starter-integration 57 10org.springframework.integration 8spring-integration-stream 911 org.springframework.integration 12spring-integration-mqtt 13
三、MQTT客户端代码(Java)
MqttDemoApplication.java
1 package com.wunaozai.mqtt; 2 3 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.autoconfigure.SpringBootApplication; 5 6 import com.wunaozai.mqtt.tools.MqttPushClient; 7 8 @SpringBootApplication 9 public class MqttDemoApplication {10 11 public static void main(String[] args) {12 SpringApplication.run(MqttDemoApplication.class, args);13 14 test();15 }16 17 18 private static void test(){19 MqttPushClient.MQTT_HOST = "tcp://mqtt.com:1883";20 MqttPushClient.MQTT_CLIENTID = "client";21 MqttPushClient.MQTT_USERNAME = "username";22 MqttPushClient.MQTT_PASSWORD = "password";23 MqttPushClient client = MqttPushClient.getInstance();24 client.subscribe("/#");25 }26 }
MqttPushCallback.java
1 package com.wunaozai.mqtt.tools; 2 3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 4 import org.eclipse.paho.client.mqttv3.MqttCallback; 5 import org.eclipse.paho.client.mqttv3.MqttMessage; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 /**10 * MQTT 推送回调11 * @author wunaozai12 * @date 2018-08-2213 */14 public class MqttPushCallback implements MqttCallback {15 16 private static final Logger log = LoggerFactory.getLogger(MqttPushCallback.class);17 18 @Override19 public void connectionLost(Throwable cause) {20 log.info("断开连接,建议重连" + this);21 //断开连接,建议重连22 }23 24 @Override25 public void deliveryComplete(IMqttDeliveryToken token) {26 //log.info(token.isComplete() + "");27 }28 29 @Override30 public void messageArrived(String topic, MqttMessage message) throws Exception {31 log.info("Topic: " + topic);32 log.info("Message: " + new String(message.getPayload()));33 }34 35 }
MqttPushClient.java
1 package com.wunaozai.mqtt.tools; 2 3 import org.eclipse.paho.client.mqttv3.MqttClient; 4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 5 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; 6 import org.eclipse.paho.client.mqttv3.MqttMessage; 7 import org.eclipse.paho.client.mqttv3.MqttTopic; 8 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 9 import org.slf4j.Logger; 10 import org.slf4j.LoggerFactory; 11 12 /** 13 * 创建一个MQTT客户端 14 * @author wunaozai 15 * @date 2018-08-22 16 */ 17 public class MqttPushClient { 18 19 private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class); 20 public static String MQTT_HOST = ""; 21 public static String MQTT_CLIENTID = ""; 22 public static String MQTT_USERNAME = ""; 23 public static String MQTT_PASSWORD = ""; 24 public static int MQTT_TIMEOUT = 10; 25 public static int MQTT_KEEPALIVE = 10; 26 27 private MqttClient client; 28 private static volatile MqttPushClient mqttClient = null; 29 public static MqttPushClient getInstance() { 30 if(mqttClient == null) { 31 synchronized (MqttPushClient.class) { 32 if(mqttClient == null) { 33 mqttClient = new MqttPushClient(); 34 } 35 } 36 } 37 return mqttClient; 38 } 39 40 private MqttPushClient() { 41 log.info("Connect MQTT: " + this); 42 connect(); 43 } 44 45 private void connect() { 46 try { 47 client = new MqttClient(MQTT_HOST, MQTT_CLIENTID, new MemoryPersistence()); 48 MqttConnectOptions option = new MqttConnectOptions(); 49 option.setCleanSession(true); 50 option.setUserName(MQTT_USERNAME); 51 option.setPassword(MQTT_PASSWORD.toCharArray()); 52 option.setConnectionTimeout(MQTT_TIMEOUT); 53 option.setKeepAliveInterval(MQTT_KEEPALIVE); 54 option.setAutomaticReconnect(true); 55 try { 56 client.setCallback(new MqttPushCallback()); 57 client.connect(option); 58 } catch (Exception e) { 59 e.printStackTrace(); 60 } 61 } catch (Exception e) { 62 e.printStackTrace(); 63 } 64 } 65 /** 66 * 发布主题,用于通知 67 * 默认qos为1 非持久化 68 * @param topic 69 * @param data 70 */ 71 public void publish(String topic, String data) { 72 publish(topic, data, 1, false); 73 } 74 /** 75 * 发布 76 * @param topic 77 * @param data 78 * @param qos 79 * @param retained 80 */ 81 public void publish(String topic, String data, int qos, boolean retained) { 82 MqttMessage message = new MqttMessage(); 83 message.setQos(qos); 84 message.setRetained(retained); 85 message.setPayload(data.getBytes()); 86 MqttTopic mqttTopic = client.getTopic(topic); 87 if(null == mqttTopic) { 88 log.error("Topic Not Exist"); 89 } 90 MqttDeliveryToken token; 91 try { 92 token = mqttTopic.publish(message); 93 token.waitForCompletion(); 94 } catch (Exception e) { 95 e.printStackTrace(); 96 } 97 } 98 /** 99 * 订阅某个主题 qos默认为1100 * @param topic101 */102 public void subscribe(String topic) {103 subscribe(topic, 1);104 }105 /**106 * 订阅某个主题107 * @param topic108 * @param qos109 */110 public void subscribe(String topic, int qos) {111 try {112 client.subscribe(topic, qos);113 } catch (Exception e) {114 e.printStackTrace();115 }116 }117 }
四、MQTT客户端代码(C#)
为了下下篇博客Grafana有数据可以展示,我需要开发一个PC小工具【设备仿真】,用来模拟设备一直发送数据。这里就不对C#开发进行过多的说明了。通过nuget,引入第三方mqtt库。这个工具是我现在开发平台工具链的一个小工具。至于里面的Payload协议,可以不用管。读者可以根据自己的业务制定自己的通信协议。部分C#代码(连接服务器与发送数据)
1 using MQTTClient.Model; 2 using MQTTnet; 3 using MQTTnet.Core; 4 using MQTTnet.Core.Client; 5 using Newtonsoft.Json; 6 using System; 7 using System.Collections.Generic; 8 using System.Text; 9 using System.Threading.Tasks; 10 using System.Windows.Forms; 11 12 namespace MQTTClient 13 { 14 public partial class MainPage : Form 15 { 16 public MainPage() 17 { 18 InitializeComponent(); 19 init(); 20 } 21 private void init() 22 { 23 txtusername.Text = ""; 24 txtpassword.Text = ""; 25 txtclientid.Text = ""; 26 txttopic.Text = "iot/UUID/device/devicepub/update"; 27 } 28 29 IMqttClient client = null; 30 private async Task ConnectMqttServerAsync() 31 { 32 if(client == null) 33 { 34 client = new MqttClientFactory().CreateMqttClient() as MqttClient; 35 client.ApplicationMessageReceived += mqttClientApplicationMessageReceived; 36 client.Connected += mqttClientConnected; 37 client.Disconnected += mqttClientDisconnected; 38 } 39 try 40 { 41 await client.DisconnectAsync(); 42 var option = getMQTTOption(); 43 await client.ConnectAsync(option); 44 }catch(Exception e) 45 { 46 Invoke((new Action(() => 47 { 48 lblStatus.Text = "连接服务器失败: " + e.Message; 49 }))); 50 } 51 } 52 private void mqttClientDisconnected(object sender, EventArgs e) 53 { 54 Invoke((new Action(() => 55 { 56 lblStatus.Text = "连接服务器失败: ERROR"; 57 }))); 58 } 59 private void mqttClientConnected(object sender, EventArgs e) 60 { 61 Invoke((new Action(() => 62 { 63 lblStatus.Text = "连接服务器成功"; 64 }))); 65 } 66 private void mqttClientApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) 67 { 68 //本工具部收数据 69 throw new NotImplementedException(); 70 } 71 72 private void btnconnect_Click(object sender, EventArgs e) 73 { 74 Task.Run(async () => { await ConnectMqttServerAsync(); }); 75 } 76 private void btndisconnect_Click(object sender, EventArgs e) 77 { 78 client.DisconnectAsync(); 79 } 80 private void btnsendone_Click(object sender, EventArgs e) 81 { 82 sendPayload(); 83 } 84 private void btnsendts_Click(object sender, EventArgs e) 85 { 86 timer1.Interval = Convert.ToInt32(txttime.Text); 87 timer1.Enabled = true; 88 } 89 private void btnstopts_Click(object sender, EventArgs e) 90 { 91 timer1.Enabled = false; 92 } 93 private void timer1_Tick(object sender, EventArgs e) 94 { 95 sendPayload(); 96 } 97 private int sendPayload() 98 { 99 if (client.IsConnected == false)100 {101 return -1;102 }103 PayloadModel payload = getPayload();104 string json = JsonConvert.SerializeObject(payload, Formatting.Indented);105 txtview.Text = json;106 string topic = txttopic.Text;107 var msg = new MqttApplicationMessage(topic, Encoding.Default.GetBytes(json),108 MQTTnet.Core.Protocol.MqttQualityOfServiceLevel.AtMostOnce, false);109 client.PublishAsync(msg);110 lblSendStatus.Text = "发送: " + DateTime.Now.ToLongTimeString();111 return 0;112 }113 114 private MqttClientTcpOptions getMQTTOption()115 {116 MqttClientTcpOptions option = new MqttClientTcpOptions();117 string hostname = txthostname.Text;118 string[] host_port = hostname.Split(':');119 int port = 1883;120 if(host_port.Length >= 2)121 {122 hostname = host_port[0];123 port = Convert.ToInt32(host_port[1]);124 }125 option.Server = hostname;126 option.ClientId = txtclientid.Text;127 option.UserName = txtusername.Text;128 option.Password = txtpassword.Text;129 option.Port = port;130 option.CleanSession = true;131 return option;132 }133 134 private PayloadModel getPayload()135 {136 PayloadModel payload = new PayloadModel();137 //略138 return payload;139 }140 141 Random rand1 = new Random(System.DateTime.Now.Millisecond);142 private int getRandomNum()143 {144 int data = rand1.Next(0, 100);145 return data;146 }147 148 int linenum = 0;149 Random rand2 = new Random(System.DateTime.Now.Millisecond);150 private int getLineNum()151 {152 int f = rand2.Next(0, 100);153 int data = rand2.Next(0, 5);154 if(f % 2 == 1)155 {156 linenum += data;157 }158 else159 {160 linenum -= data;161 }162 return linenum;163 }164 165 }166 }
本文地址: