版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
第Springboot集成mqtt客戶端詳解目錄1.前言2.引入依賴3.配置文件4.MQTT消息類5.MQTT消息接收器6.MQTT消息發(fā)送器7.測試MQTT發(fā)送消息
1.前言
?這里我們使用springboot搭建一個輕量級的mqtt客戶端,連接mqtt的Broker服務(wù)。
?連接信息寫在配置文件里perties
spring.mqtt.username=admin
spring.mqtt.mqpassword=admin
spring.mqtt.host-url=tcp://:1883
spring.mqtt.client-id=server_client_${random.value}
spring.mqtt.default-topic=$SYS/brokers/+/clients/#
pletionTimeout=3000
spring.mqtt.keepAlive=60
2.引入依賴
!--mqtt--
dependency
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-integration/artifactId
/dependency
dependency
groupIdegration/groupId
artifactIdspring-integration-stream/artifactId
/dependency
dependency
groupIdegration/groupId
artifactIdspring-integration-mqtt/artifactId
/dependency
3.配置文件
?新建MqttProperties.java文件,初始化application里的mqtt配置項
@ConfigurationProperties("spring.mqtt")
@Component
@Getter
@Setter
publicclassMqttProperties{
privateStringusername;
privateStringmqpassword;
privateStringhostUrl;
privateStringclientId;
privateStringdefaultTopic;
privateStringcompletionTimeout;
privateIntegerkeepAlive;
}
?新建MqttConfiguration.java文件,為mqtt做初始化配置
@Configuration
@Slf4j
publicclassMqttConfiguration{
@Autowired
privateMqttPropertiesmqttProperties;
*事件觸發(fā)
@Autowired
privateApplicationEventPublishereventPublisher;
@Bean
publicMqttConnectOptionsgetMqttConnectOptions(){
MqttConnectOptionsmqttConnectOptions=newMqttConnectOptions();
mqttConnectOptions.setUserName(mqttProperties.getUsername());
mqttConnectOptions.setPassword(mqttProperties.getMqpassword().toCharArray());
mqttConnectOptions.setServerURIs(newString[]{mqttProperties.getHostUrl()});
mqttConnectOptions.setKeepAliveInterval(2);
mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
returnmqttConnectOptions;
@Bean
publicMqttPahoClientFactorymqttClientFactory(){
DefaultMqttPahoClientFactoryfactory=newDefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
returnfactory;
@Bean
publicMessageChannelmqttInputChannel(){
returnnewDirectChannel();
*配置client,監(jiān)聽的topic
@Bean
publicMessageProducerinbound(){
MqttPahoMessageDrivenChannelAdapteradapter=
newMqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_inbound",mqttClientFactory(),
mqttProperties.getDefaultTopic().split(","));
adapter.setCompletionTimeout(Long.valueOf(mqttProperties.getCompletionTimeout()));
adapter.setConverter(newDefaultPahoMessageConverter());
//默認添加TopicName中所有tipic
adapter.addTopic("+/+/test");
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel());
returnadapter;
@Bean
@ServiceActivator(inputChannel="mqttInputChannel")
publicMessageHandlerhandler(){
returnnewMessageHandler(){
@Override
publicvoidhandleMessage(Messagemessage)throwsMessagingException{
Stringtopic=message.getHeaders().get("mqtt_receivedTopic").toString();
Stringqos=message.getHeaders().get("mqtt_receivedQos").toString();
//觸發(fā)事件這里不再做業(yè)務(wù)處理,包listener中做處理
eventPublisher.publishEvent(newMqttEvent(this,topic,message.getPayload().toString()));
*發(fā)送消息和消費消息Channel可以使用相同MqttPahoClientFactory
*@return
@Bean
@ServiceActivator(inputChannel="mqttOutboundChannel")
publicMessageHandlermqttOutbound(){
//在這里進行mqttOutboundChannel的相關(guān)設(shè)置
MqttPahoMessageHandlermessageHandler=newMqttPahoMessageHandler(mqttProperties.getClientId(),mqttClientFactory());
//如果設(shè)置成true,發(fā)送消息時將不會阻塞。
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
returnmessageHandler;
@Bean
publicMessageChannelmqttOutboundChannel(){
returnnewDirectChannel();
}
4.MQTT消息類
新建MqttEvent.java消息類。用于發(fā)送mqtt的消息
@Getter
publicclassMqttEventextendsApplicationEvent{
privateStringtopic;
*發(fā)送的消息
privateStringmessage;
publicMqttEvent(Objectsource,Stringtopic,Stringmessage){
super(source);
this.topic=topic;
this.message=message;
}
5.MQTT消息接收器
新建JobListener.java文件作為mqtt的消息接收類
@Slf4j
@Component
publicclassJobListener{
@Autowired
DeviceDaodeviceDao;
*監(jiān)聽topic
*@parammqttEvent
@EventListener(condition="#mqttEvent.topic.startsWith('pay')")
publicvoidonEmqttCall1(MqttEventmqttEvent)throwsException{
Stringtopic=mqttEvent.getTopic();
//寫邏輯處理
*監(jiān)聽topic
*@parammqttEvent
@EventListener(condition="#mqttEvent.topic.equals('device')")
publicvoidonEmqttCallT(MqttEventmqttEvent){
("接收到消11111111111:"+mqttEvent.getMessage());
}
6.MQTT消息發(fā)送器
新建MqttGateway.java提供發(fā)送mqttt消息的接口服務(wù)
@Component
@MessagingGateway(defaultRequestChannel="mqttOutboundChannel")
publicinterfaceMqttGateway{
voidsendToMqtt(Stringdata);
voidsendToMqtt(@Header(MqttHeaders.TOPIC)Stringtopic,Stringpayload);
voidsendToMqtt(@Header(MqttHe
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
- 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年中職第二學(xué)年(護理)老年照護專項試題及答案
- 2025年大學(xué)本科(食品質(zhì)量與安全)食品分析試題及答案
- 2025年大學(xué)食品科學(xué)與工程(食品工程)試題及答案
- 2025年中職焊接技術(shù)與自動化(手工焊接)試題及答案
- 養(yǎng)老院老人心理咨詢師培訓(xùn)制度
- 養(yǎng)老院心理慰藉制度
- 公共交通從業(yè)人員培訓(xùn)考核制度
- 2026年人工智能計算機視覺基礎(chǔ)知識題庫含答案
- 2026年刮痧師中醫(yī)理論考核試題含答案
- 2026年中級公共文化服務(wù)面試題及答案
- 土壤微生物群落結(jié)構(gòu)優(yōu)化研究
- 2024外研版四年級英語上冊Unit 4知識清單
- 四川省南充市2024-2025學(xué)年部編版七年級上學(xué)期期末歷史試題
- 國有企業(yè)三位一體推進內(nèi)控風(fēng)控合規(guī)建設(shè)的問題和分析
- 急診預(yù)檢分診課件教學(xué)
- 2025年高二數(shù)學(xué)建模試題及答案
- 儲能集裝箱知識培訓(xùn)總結(jié)課件
- 幼兒園中班語言《雪房子》課件
- 房地產(chǎn)項目開發(fā)管理方案
- 堆垛車安全培訓(xùn)課件
- 貝林妥單抗護理要點
評論
0/150
提交評論