博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
物联网架构成长之路(32)-SpringBoot集成MQTT客户端
阅读量:4982 次
发布时间:2019-06-12

本文共 12438 字,大约阅读时间需要 41 分钟。

一、前言

  这里虽然是说MQTT客户端。其实对于服务器来说,这里的一个具有超级权限的MQTT客户端,就可以做很多事情。比如手机APP或者网页或者第三方服务需要发送数据到设备,但是这些又不是设备,又不能让他们连到MQTT。那么就可以通过HTTP请求业务服务器。然后由业务服务器利用这个MQTT客户端进行发送数据。
  还有,之前好多人问我,怎么保存这些物联网数据,真的要像前面的博客那样,要自己写插件吗?特别麻烦的啊。这里给出的结论是不需要。保存数据,除了写EMQ插件,还可以在EMQ的规则引擎上进行配置Web消息转发【EMQ 3.x 版本】,还有就是这种通过业务服务器订阅根Topic来保存物联网原始数据。
  这篇博客这讨论如何把MQTT客户端集成到业务服务器上(基于SpringBoot 2.0)。下一篇博客会讲到数据保存到InfluxDB,然后如何通过Grafana进行可视化Dashboard看板模式展示。

 

二、配置pom.xml,引入第三方库

1         
2
3
org.springframework.boot
4
spring-boot-starter-integration
5
6
7
org.springframework.integration
8
spring-integration-stream
9
10
11
org.springframework.integration
12
spring-integration-mqtt
13

 

三、MQTT客户端代码(Java)

  MqttDemoApplication.java

1 package com.wunaozai.mqtt; 2  3 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.autoconfigure.SpringBootApplication; 5  6 import com.wunaozai.mqtt.tools.MqttPushClient; 7  8 @SpringBootApplication 9 public class MqttDemoApplication {10 11     public static void main(String[] args) {12         SpringApplication.run(MqttDemoApplication.class, args);13         14         test();15     }16 17     18     private static void test(){19         MqttPushClient.MQTT_HOST = "tcp://mqtt.com:1883";20         MqttPushClient.MQTT_CLIENTID = "client";21         MqttPushClient.MQTT_USERNAME = "username";22         MqttPushClient.MQTT_PASSWORD = "password";23         MqttPushClient client = MqttPushClient.getInstance();24         client.subscribe("/#");25     }26 }

  MqttPushCallback.java

1 package com.wunaozai.mqtt.tools; 2  3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 4 import org.eclipse.paho.client.mqttv3.MqttCallback; 5 import org.eclipse.paho.client.mqttv3.MqttMessage; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8  9 /**10  * MQTT 推送回调11  * @author wunaozai12  * @date 2018-08-2213  */14 public class MqttPushCallback implements MqttCallback {15     16     private static final Logger log = LoggerFactory.getLogger(MqttPushCallback.class);17 18     @Override19     public void connectionLost(Throwable cause) {20         log.info("断开连接,建议重连" + this);21         //断开连接,建议重连22     }23 24     @Override25     public void deliveryComplete(IMqttDeliveryToken token) {26         //log.info(token.isComplete() + "");27     }28 29     @Override30     public void messageArrived(String topic, MqttMessage message) throws Exception {31         log.info("Topic: " + topic);32         log.info("Message: " + new String(message.getPayload()));33     }34 35 }

  MqttPushClient.java

1 package com.wunaozai.mqtt.tools;  2   3 import org.eclipse.paho.client.mqttv3.MqttClient;  4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;  5 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;  6 import org.eclipse.paho.client.mqttv3.MqttMessage;  7 import org.eclipse.paho.client.mqttv3.MqttTopic;  8 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;  9 import org.slf4j.Logger; 10 import org.slf4j.LoggerFactory; 11  12 /** 13  * 创建一个MQTT客户端 14  * @author wunaozai 15  * @date 2018-08-22 16  */ 17 public class MqttPushClient { 18      19     private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class); 20     public static String MQTT_HOST = ""; 21     public static String MQTT_CLIENTID = ""; 22     public static String MQTT_USERNAME = ""; 23     public static String MQTT_PASSWORD = ""; 24     public static int MQTT_TIMEOUT = 10; 25     public static int MQTT_KEEPALIVE = 10; 26      27     private MqttClient client; 28     private static volatile MqttPushClient mqttClient = null; 29     public static MqttPushClient getInstance() { 30         if(mqttClient == null) { 31             synchronized (MqttPushClient.class) { 32                 if(mqttClient == null) { 33                     mqttClient = new MqttPushClient(); 34                 } 35             } 36         } 37         return mqttClient; 38     } 39      40     private MqttPushClient() { 41         log.info("Connect MQTT: " + this); 42         connect(); 43     } 44      45     private void connect() { 46         try { 47             client = new MqttClient(MQTT_HOST, MQTT_CLIENTID, new MemoryPersistence()); 48             MqttConnectOptions option = new MqttConnectOptions(); 49             option.setCleanSession(true); 50             option.setUserName(MQTT_USERNAME); 51             option.setPassword(MQTT_PASSWORD.toCharArray()); 52             option.setConnectionTimeout(MQTT_TIMEOUT); 53             option.setKeepAliveInterval(MQTT_KEEPALIVE); 54             option.setAutomaticReconnect(true); 55             try { 56                 client.setCallback(new MqttPushCallback()); 57                 client.connect(option); 58             } catch (Exception e) { 59                 e.printStackTrace(); 60             } 61         } catch (Exception e) { 62             e.printStackTrace(); 63         } 64     } 65     /** 66      * 发布主题,用于通知
67 * 默认qos为1 非持久化 68 * @param topic 69 * @param data 70 */ 71 public void publish(String topic, String data) { 72 publish(topic, data, 1, false); 73 } 74 /** 75 * 发布 76 * @param topic 77 * @param data 78 * @param qos 79 * @param retained 80 */ 81 public void publish(String topic, String data, int qos, boolean retained) { 82 MqttMessage message = new MqttMessage(); 83 message.setQos(qos); 84 message.setRetained(retained); 85 message.setPayload(data.getBytes()); 86 MqttTopic mqttTopic = client.getTopic(topic); 87 if(null == mqttTopic) { 88 log.error("Topic Not Exist"); 89 } 90 MqttDeliveryToken token; 91 try { 92 token = mqttTopic.publish(message); 93 token.waitForCompletion(); 94 } catch (Exception e) { 95 e.printStackTrace(); 96 } 97 } 98 /** 99 * 订阅某个主题 qos默认为1100 * @param topic101 */102 public void subscribe(String topic) {103 subscribe(topic, 1);104 }105 /**106 * 订阅某个主题107 * @param topic108 * @param qos109 */110 public void subscribe(String topic, int qos) {111 try {112 client.subscribe(topic, qos);113 } catch (Exception e) {114 e.printStackTrace();115 }116 }117 }

 

四、MQTT客户端代码(C#)

  为了下下篇博客Grafana有数据可以展示,我需要开发一个PC小工具【设备仿真】,用来模拟设备一直发送数据。这里就不对C#开发进行过多的说明了。通过nuget,引入第三方mqtt库。这个工具是我现在开发平台工具链的一个小工具。至于里面的Payload协议,可以不用管。读者可以根据自己的业务制定自己的通信协议。

  部分C#代码(连接服务器与发送数据)

1 using MQTTClient.Model;  2 using MQTTnet;  3 using MQTTnet.Core;  4 using MQTTnet.Core.Client;  5 using Newtonsoft.Json;  6 using System;  7 using System.Collections.Generic;  8 using System.Text;  9 using System.Threading.Tasks; 10 using System.Windows.Forms; 11  12 namespace MQTTClient 13 { 14     public partial class MainPage : Form 15     { 16         public MainPage() 17         { 18             InitializeComponent(); 19             init(); 20         } 21         private void init() 22         { 23             txtusername.Text = ""; 24             txtpassword.Text = ""; 25             txtclientid.Text = ""; 26             txttopic.Text = "iot/UUID/device/devicepub/update"; 27         } 28  29         IMqttClient client = null; 30         private async Task ConnectMqttServerAsync() 31         { 32             if(client == null) 33             { 34                 client = new MqttClientFactory().CreateMqttClient() as MqttClient; 35                 client.ApplicationMessageReceived += mqttClientApplicationMessageReceived; 36                 client.Connected += mqttClientConnected; 37                 client.Disconnected += mqttClientDisconnected; 38             } 39             try 40             { 41                 await client.DisconnectAsync(); 42                 var option = getMQTTOption(); 43                 await client.ConnectAsync(option); 44             }catch(Exception e) 45             { 46                 Invoke((new Action(() => 47                 { 48                     lblStatus.Text = "连接服务器失败: " + e.Message; 49                 }))); 50             } 51         } 52         private void mqttClientDisconnected(object sender, EventArgs e) 53         { 54             Invoke((new Action(() => 55             { 56                 lblStatus.Text = "连接服务器失败: ERROR"; 57             }))); 58         } 59         private void mqttClientConnected(object sender, EventArgs e) 60         { 61             Invoke((new Action(() => 62             { 63                 lblStatus.Text = "连接服务器成功"; 64             }))); 65         } 66         private void mqttClientApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) 67         { 68             //本工具部收数据 69             throw new NotImplementedException(); 70         } 71  72         private void btnconnect_Click(object sender, EventArgs e) 73         { 74             Task.Run(async () => { await ConnectMqttServerAsync(); }); 75         } 76         private void btndisconnect_Click(object sender, EventArgs e) 77         { 78             client.DisconnectAsync(); 79         } 80         private void btnsendone_Click(object sender, EventArgs e) 81         { 82             sendPayload(); 83         } 84         private void btnsendts_Click(object sender, EventArgs e) 85         { 86             timer1.Interval = Convert.ToInt32(txttime.Text); 87             timer1.Enabled = true; 88         } 89         private void btnstopts_Click(object sender, EventArgs e) 90         { 91             timer1.Enabled = false; 92         } 93         private void timer1_Tick(object sender, EventArgs e) 94         { 95             sendPayload(); 96         } 97         private int sendPayload() 98         { 99             if (client.IsConnected == false)100             {101                 return -1;102             }103             PayloadModel payload = getPayload();104             string json = JsonConvert.SerializeObject(payload, Formatting.Indented);105             txtview.Text = json;106             string topic = txttopic.Text;107             var msg = new MqttApplicationMessage(topic, Encoding.Default.GetBytes(json),108                 MQTTnet.Core.Protocol.MqttQualityOfServiceLevel.AtMostOnce, false);109             client.PublishAsync(msg);110             lblSendStatus.Text = "发送: " + DateTime.Now.ToLongTimeString();111             return 0;112         }113 114         private MqttClientTcpOptions getMQTTOption()115         {116             MqttClientTcpOptions option = new MqttClientTcpOptions();117             string hostname = txthostname.Text;118             string[] host_port = hostname.Split(':');119             int port = 1883;120             if(host_port.Length >= 2)121             {122                 hostname = host_port[0];123                 port = Convert.ToInt32(host_port[1]);124             }125             option.Server = hostname;126             option.ClientId = txtclientid.Text;127             option.UserName = txtusername.Text;128             option.Password = txtpassword.Text;129             option.Port = port;130             option.CleanSession = true;131             return option;132         }133 134         private PayloadModel getPayload()135         {136             PayloadModel payload = new PayloadModel();137             //略138             return payload;139         }140 141         Random rand1 = new Random(System.DateTime.Now.Millisecond);142         private int getRandomNum()143         {144             int data = rand1.Next(0, 100);145             return data;146         }147 148         int linenum = 0;149         Random rand2 = new Random(System.DateTime.Now.Millisecond);150         private int getLineNum()151         {152             int f = rand2.Next(0, 100);153             int data = rand2.Next(0, 5);154             if(f % 2 == 1)155             {156                 linenum += data;157             }158             else159             {160                 linenum -= data;161             }162             return linenum;163         }164 165     }166 }

 

本文地址: 

转载于:https://www.cnblogs.com/wunaozai/p/11147841.html

你可能感兴趣的文章