Springboot集成mqtt客戶端詳解_第1頁
Springboot集成mqtt客戶端詳解_第2頁
Springboot集成mqtt客戶端詳解_第3頁
Springboot集成mqtt客戶端詳解_第4頁
Springboot集成mqtt客戶端詳解_第5頁
已閱讀5頁,還剩2頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

第Springboot集成mqtt客戶端詳解目錄1.前言2.引入依賴3.配置文件4.MQTT消息類5.MQTT消息接收器6.MQTT消息發(fā)送器7.測試MQTT發(fā)送消息

1.前言

?這里我們使用springboot搭建一個輕量級的mqtt客戶端,連接mqtt的Broker服務(wù)。

?連接信息寫在配置文件里perties

spring.mqtt.username=admin

spring.mqtt.mqpassword=admin

spring.mqtt.host-url=tcp://:1883

spring.mqtt.client-id=server_client_${random.value}

spring.mqtt.default-topic=$SYS/brokers/+/clients/#

pletionTimeout=3000

spring.mqtt.keepAlive=60

2.引入依賴

!--mqtt--

dependency

groupIdorg.springframework.boot/groupId

artifactIdspring-boot-starter-integration/artifactId

/dependency

dependency

groupIdegration/groupId

artifactIdspring-integration-stream/artifactId

/dependency

dependency

groupIdegration/groupId

artifactIdspring-integration-mqtt/artifactId

/dependency

3.配置文件

?新建MqttProperties.java文件,初始化application里的mqtt配置項

@ConfigurationProperties("spring.mqtt")

@Component

@Getter

@Setter

publicclassMqttProperties{

privateStringusername;

privateStringmqpassword;

privateStringhostUrl;

privateStringclientId;

privateStringdefaultTopic;

privateStringcompletionTimeout;

privateIntegerkeepAlive;

}

?新建MqttConfiguration.java文件,為mqtt做初始化配置

@Configuration

@Slf4j

publicclassMqttConfiguration{

@Autowired

privateMqttPropertiesmqttProperties;

*事件觸發(fā)

@Autowired

privateApplicationEventPublishereventPublisher;

@Bean

publicMqttConnectOptionsgetMqttConnectOptions(){

MqttConnectOptionsmqttConnectOptions=newMqttConnectOptions();

mqttConnectOptions.setUserName(mqttProperties.getUsername());

mqttConnectOptions.setPassword(mqttProperties.getMqpassword().toCharArray());

mqttConnectOptions.setServerURIs(newString[]{mqttProperties.getHostUrl()});

mqttConnectOptions.setKeepAliveInterval(2);

mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());

returnmqttConnectOptions;

@Bean

publicMqttPahoClientFactorymqttClientFactory(){

DefaultMqttPahoClientFactoryfactory=newDefaultMqttPahoClientFactory();

factory.setConnectionOptions(getMqttConnectOptions());

returnfactory;

@Bean

publicMessageChannelmqttInputChannel(){

returnnewDirectChannel();

*配置client,監(jiān)聽的topic

@Bean

publicMessageProducerinbound(){

MqttPahoMessageDrivenChannelAdapteradapter=

newMqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_inbound",mqttClientFactory(),

mqttProperties.getDefaultTopic().split(","));

adapter.setCompletionTimeout(Long.valueOf(mqttProperties.getCompletionTimeout()));

adapter.setConverter(newDefaultPahoMessageConverter());

//默認添加TopicName中所有tipic

adapter.addTopic("+/+/test");

adapter.setQos(2);

adapter.setOutputChannel(mqttInputChannel());

returnadapter;

@Bean

@ServiceActivator(inputChannel="mqttInputChannel")

publicMessageHandlerhandler(){

returnnewMessageHandler(){

@Override

publicvoidhandleMessage(Messagemessage)throwsMessagingException{

Stringtopic=message.getHeaders().get("mqtt_receivedTopic").toString();

Stringqos=message.getHeaders().get("mqtt_receivedQos").toString();

//觸發(fā)事件這里不再做業(yè)務(wù)處理,包listener中做處理

eventPublisher.publishEvent(newMqttEvent(this,topic,message.getPayload().toString()));

*發(fā)送消息和消費消息Channel可以使用相同MqttPahoClientFactory

*@return

@Bean

@ServiceActivator(inputChannel="mqttOutboundChannel")

publicMessageHandlermqttOutbound(){

//在這里進行mqttOutboundChannel的相關(guān)設(shè)置

MqttPahoMessageHandlermessageHandler=newMqttPahoMessageHandler(mqttProperties.getClientId(),mqttClientFactory());

//如果設(shè)置成true,發(fā)送消息時將不會阻塞。

messageHandler.setAsync(true);

messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());

returnmessageHandler;

@Bean

publicMessageChannelmqttOutboundChannel(){

returnnewDirectChannel();

}

4.MQTT消息類

新建MqttEvent.java消息類。用于發(fā)送mqtt的消息

@Getter

publicclassMqttEventextendsApplicationEvent{

privateStringtopic;

*發(fā)送的消息

privateStringmessage;

publicMqttEvent(Objectsource,Stringtopic,Stringmessage){

super(source);

this.topic=topic;

this.message=message;

}

5.MQTT消息接收器

新建JobListener.java文件作為mqtt的消息接收類

@Slf4j

@Component

publicclassJobListener{

@Autowired

DeviceDaodeviceDao;

*監(jiān)聽topic

*@parammqttEvent

@EventListener(condition="#mqttEvent.topic.startsWith('pay')")

publicvoidonEmqttCall1(MqttEventmqttEvent)throwsException{

Stringtopic=mqttEvent.getTopic();

//寫邏輯處理

*監(jiān)聽topic

*@parammqttEvent

@EventListener(condition="#mqttEvent.topic.equals('device')")

publicvoidonEmqttCallT(MqttEventmqttEvent){

("接收到消11111111111:"+mqttEvent.getMessage());

}

6.MQTT消息發(fā)送器

新建MqttGateway.java提供發(fā)送mqttt消息的接口服務(wù)

@Component

@MessagingGateway(defaultRequestChannel="mqttOutboundChannel")

publicinterfaceMqttGateway{

voidsendToMqtt(Stringdata);

voidsendToMqtt(@Header(MqttHeaders.TOPIC)Stringtopic,Stringpayload);

voidsendToMqtt(@Header(MqttHe

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論