版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
第8章SpringBoot消息服務(wù)第8章SpringBoot消息服務(wù)8.1
消息服務(wù)概述8.2
整合JMS8.3
整合AMQP28.1消息服務(wù)概述消息(Message)是指軟件對(duì)象之間進(jìn)行交互作用和通訊利用的一種方式。中間件(Middleware)是處于操作系統(tǒng)和應(yīng)用程序之間的軟件,也有人認(rèn)為它應(yīng)該屬于操作系統(tǒng)中的一部分。MQ全稱MessageQueue(消息隊(duì)列),是在消息的傳輸過(guò)程中保存消息的容器,多用于分布式系統(tǒng)之間進(jìn)行通信。
消息服務(wù)是將軟件與軟件之間的交互消息、交互方式進(jìn)行存儲(chǔ)和管理的一種技術(shù)。在多數(shù)應(yīng)用尤其是分布式系統(tǒng)中,消息服務(wù)是不可或缺的重要部分,它使用起來(lái)比較簡(jiǎn)單,同時(shí)也解決了不少難題,例如異步處理、應(yīng)用解耦、流量削峰、分布式事務(wù)管理等,使用消息服務(wù)可以實(shí)現(xiàn)一個(gè)高性能、高可用、高擴(kuò)展的系統(tǒng)。38.1消息服務(wù)概述
下面使用實(shí)際開(kāi)發(fā)中的若干場(chǎng)景來(lái)分析和說(shuō)明為什么要使用消息服務(wù),以及使用消息服務(wù)的好處。1.異步處理應(yīng)用場(chǎng)景說(shuō)明:用戶注冊(cè)后,系統(tǒng)需要將信息寫入數(shù)據(jù)庫(kù),并發(fā)送注冊(cè)郵件和注冊(cè)短息通知,下面來(lái)講解不同的處理方式,如何來(lái)處理注冊(cè)業(yè)務(wù)需求。(1)串行處理方式(2)并行處理方式(3)消息服務(wù)處理方式48.1消息服務(wù)概述
2.應(yīng)用解耦應(yīng)用場(chǎng)景說(shuō)明:訂單系統(tǒng)庫(kù)存系統(tǒng),用戶下單后,訂單服務(wù)需要通知庫(kù)存服務(wù)。如果采用傳統(tǒng)方式處理訂單業(yè)務(wù),下單后,訂單服務(wù)會(huì)直接調(diào)用庫(kù)存服務(wù)接口進(jìn)行庫(kù)存更新,如圖8-4所示。傳統(tǒng)方式應(yīng)用解耦一旦庫(kù)存系統(tǒng)出現(xiàn)異常,訂單服務(wù)會(huì)失敗導(dǎo)致訂單丟失。如果使用消息服務(wù)模式應(yīng)用解耦,訂單服務(wù)的下單會(huì)快速寫入消息隊(duì)列,庫(kù)存服務(wù)會(huì)監(jiān)聽(tīng)并讀取到訂單,從而修改庫(kù)存,如圖8-5所示。
相較于傳統(tǒng)方式,消息服務(wù)模式顯得更加高效、可靠。58.1消息服務(wù)概述
3.流量削峰應(yīng)用場(chǎng)景說(shuō)明:在大型購(gòu)物節(jié)時(shí),購(gòu)物網(wǎng)站開(kāi)展秒殺活動(dòng),是流量削峰的一種應(yīng)用場(chǎng)景。秒殺活動(dòng)一般由于瞬時(shí)訪問(wèn)量過(guò)大,服務(wù)器接收過(guò)大,會(huì)導(dǎo)致流量暴增,相關(guān)系統(tǒng)無(wú)法處理請(qǐng)求甚至崩潰。為了解決這個(gè)問(wèn)題,通常會(huì)采用消息隊(duì)列緩沖瞬時(shí)高峰流量,系統(tǒng)可以從消息隊(duì)列中讀取數(shù)據(jù),對(duì)請(qǐng)求進(jìn)行分層過(guò)濾,從而過(guò)濾掉一些請(qǐng)求,如圖8-6所示。68.1消息服務(wù)概述
4.分布式事務(wù)管理應(yīng)用場(chǎng)景說(shuō)明:在分布式系統(tǒng)中,分布式事務(wù)是開(kāi)發(fā)中必須要面對(duì)的技術(shù)難題,分布式系統(tǒng)的請(qǐng)求業(yè)務(wù)處理的數(shù)據(jù)一致性通常是重點(diǎn)考慮的問(wèn)題,較為可靠的處理方式是基于消息隊(duì)列的二次提交。在失敗的情況下可以進(jìn)行多次嘗試,或者基于隊(duì)列數(shù)據(jù)進(jìn)行回滾操作。因此,在分布式系統(tǒng)中加入消息服務(wù)是一個(gè)既能保證性能不變,又能保證業(yè)務(wù)一致性的方案。針對(duì)這種分布式事務(wù)處理的需求,使用消息服務(wù)的處理機(jī)制,如圖8-7所示。78.1消息服務(wù)概述消息隊(duì)列中間件,是一種異步通訊的中間件,利用高效可靠的異步消息傳遞機(jī)制進(jìn)行與平臺(tái)無(wú)關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來(lái)進(jìn)行分布式系統(tǒng)的集成。
消息中間件也可以看做是一種容器,可以理解為郵局,發(fā)送者將消息投遞到郵局,然后郵局幫我們發(fā)送給具體的接收者,數(shù)據(jù)的發(fā)送和接收是由郵局來(lái)完成。
當(dāng)前使用較多的消息隊(duì)列中間件有ActiveMQ、RabbitMQ、Kafka、RocketMQ等。目前市面上消息中間件各有重點(diǎn),選擇適合自己的無(wú)疑是最好的選擇,下面通過(guò)對(duì)比來(lái)了解各消息隊(duì)列中間件,如表8-1所示。88.2整合JMS
8.2.1JMS簡(jiǎn)介JMS(JavaMessageService)即Java消息服務(wù),它是JavaEE技術(shù)規(guī)范中的一個(gè)重要組成部分,是一種企業(yè)消息機(jī)制的規(guī)范。通過(guò)統(tǒng)一JavaAPI層面的標(biāo)準(zhǔn),使得多個(gè)客戶端可以通過(guò)JMS進(jìn)行交互,在應(yīng)用程序之間或分布式系統(tǒng)中發(fā)送、接受消息,從而進(jìn)行異步通信。大部分消息中間件提供商都對(duì)JMS提供支持。JMS和ActiveMQ的關(guān)系就像JDBC和JDBC驅(qū)動(dòng)的關(guān)系。JMS具有以下的優(yōu)勢(shì)。(1)異步:JMS天生就是異步的,客戶端獲取消息的時(shí)候,不需要主動(dòng)發(fā)送請(qǐng)求,消息會(huì)自動(dòng)發(fā)送給可用的客戶端。(2)可靠:JMS保證消息只會(huì)傳遞一次。98.2.1JMS簡(jiǎn)介
JMS消息機(jī)制模型主要分為2類。(1)P2P模型:即點(diǎn)對(duì)點(diǎn)消息傳遞模型(Point-to-Point),生產(chǎn)者生產(chǎn)了一個(gè)消息,只能由一個(gè)消費(fèi)者進(jìn)行消費(fèi)。每個(gè)消息只能有一個(gè)消費(fèi)者,類似于1對(duì)1的關(guān)系,好比個(gè)人快遞自己領(lǐng)自己的。消息的生產(chǎn)者和消費(fèi)者之間沒(méi)有時(shí)間上的相關(guān)性。無(wú)論消費(fèi)者在生產(chǎn)者發(fā)送消息的時(shí)候是否處于運(yùn)行狀態(tài),消費(fèi)者都可以提取消息。好比我們的發(fā)送短信,發(fā)送者發(fā)送后不見(jiàn)得接收者會(huì)即收即看。消息被消費(fèi)后隊(duì)列中不會(huì)再存儲(chǔ),所以消費(fèi)者不會(huì)消費(fèi)到已經(jīng)被消費(fèi)掉的消息。108.2.1JMS簡(jiǎn)介
(2)Pub-Sub模型:即基于發(fā)布/訂閱消息傳遞模型(Publish/Subscribe),生產(chǎn)者生產(chǎn)了一個(gè)消息,可以由多個(gè)消費(fèi)者進(jìn)行消費(fèi)。生產(chǎn)者將消息發(fā)布到主題中,每個(gè)消息可以有多個(gè)消費(fèi)者,屬于1:N的關(guān)系。生產(chǎn)者和消費(fèi)者之間有時(shí)間上的相關(guān)性,訂閱某一個(gè)主題的消費(fèi)者只能消費(fèi)自它訂閱之后發(fā)布的消息。生產(chǎn)者生產(chǎn)時(shí),主題不保存消息它是無(wú)狀態(tài)的不落地,假如無(wú)人訂閱就去生產(chǎn),那就是一條廢消息。
在實(shí)際工作中實(shí)現(xiàn)JMS服務(wù)的規(guī)范有很多,比較常用的有傳統(tǒng)的ActiveMQ和分布式的Kafka。為了更為可靠和安全,還存在AMQP協(xié)議,實(shí)現(xiàn)AMQP比較常用的有RabbitMQ等。118.2.2SpringBoot整合JMS
由于JMS是一套標(biāo)準(zhǔn)規(guī)范,因此SpringBoot整合JMS必然就是整合JMS的某一實(shí)現(xiàn),而底層JMS的實(shí)現(xiàn)則可選擇ActiveMQ、ActiveMQArtemis或JORAM等。這里以ActiveMQ作為JMS實(shí)現(xiàn)來(lái)介紹SpringBoot對(duì)JMS的支持。1.ActiveMQ安裝訪問(wèn)/,下載apache-activemq-5.16.3-bin.zip的Windows版本文件,啟動(dòng)ActiveMQ,如圖8-8所示。通過(guò)瀏覽器訪問(wèn)http://localhost:8161/admin/,提示輸入用戶名和密碼,默認(rèn)為admin和admin,進(jìn)入后能看到如圖8-9所示的界面,表示ActiveMQ已經(jīng)啟動(dòng)成功。128.2.2SpringBoot整合JMS2.整合SpringBoot(1)創(chuàng)建基于ActiveMQ的SpringBoot項(xiàng)目新建一個(gè)SpringBoot工程chapter08activemq,Group和Packagename為com.yzpc,在Dependencies依賴中選擇Web下的SpringWeb依賴,Messaging下的SpringforApacheActiveMQ5依賴,單擊Finish按鈕,如圖6-17所示。pom.xml中自動(dòng)添加的依賴代碼,如下所示。
(2)配置ActiveMQ的消息代理地址在項(xiàng)目的配置文件perties中,配置ActiveMQ的消息代理地址等信息,配置代碼如下。13<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><!--省略Web中SpringWeb依賴代碼
-->#ActiveMQ服務(wù)器地址spring.activemq.broker-url=tcp://localhost:61616spring.activemq.user=adminspring.activemq.password=admin#設(shè)置是Queue隊(duì)列還是Topic,false為Queue,true為Topic,默認(rèn)falsespring.jms.pub-sub-domain=false#spring.jms.pub-sub-domain=true#變量,定義隊(duì)列和topic的名稱myqueue:activemq-queuemytopic:activemq-topic8.2.2SpringBoot整合JMS(3)新建ActiveMQ配置類ConfigBean在項(xiàng)目的src/main/java/路徑下的com.yzpc包中,新建一個(gè)config包,并在該包中新建ActiveMQ配置類ConfigBean,配置了Queue隊(duì)列和topic兩種模式,代碼如下所示。(4)隊(duì)列模式下創(chuàng)建隊(duì)列生產(chǎn)者和隊(duì)列消費(fèi)者在項(xiàng)目的src/main/java/路徑下的com.yzpc包中,新建一個(gè)controller包,并在該包中新建QueueProducerController類為隊(duì)列生產(chǎn)者控制器,主要向消息隊(duì)列中發(fā)送消息,代碼如下所示。在com.yzpc.controller包中,新建QueueConsumerController類為隊(duì)列消費(fèi)者控制器,代碼如下所示。14@Component@EnableJmspublicclassConfigBean{
@Value("${myqueue}")
privateStringqueueName;
@Value("${mytopic}")
privateStringtopicName;
@Bean
//隊(duì)列
publicQueuequeue(){
returnnewActiveMQQueue(queueName);}
@Bean
//主題
publicTopictopic(){
returnnewActiveMQTopic(topicName);}}//隊(duì)列Queue消息生產(chǎn)者@RestControllerpublicclassQueueProducerController{
@Autowired
privateJmsMessagingTemplatejmsMessagingTemplate;
@Autowired
privateQueuequeue;
//消息生產(chǎn)者
@RequestMapping("/sendmsg")
publicvoidsendmsg(Stringmsg){
System.out.println("發(fā)送消息到隊(duì)列:"+msg);
//指定消息發(fā)送的目的地及內(nèi)容
this.jmsMessagingTemplate.convertAndSend(this.queue,msg);}}packagecom.yzpc.controller;importorg.springframework.jms.annotation.JmsListener;importorg.springframework.web.bind.annotation.RestController;//隊(duì)列Queue消費(fèi)者控制器@RestControllerpublicclassQueueConsumerController{
//消費(fèi)者接收消息
@JmsListener(destination="${myqueue}")
publicvoidreadActiveQueue(Stringmessage){
System.out.println("接受到:"+message);}}8.2.2SpringBoot整合JMS(5)運(yùn)行測(cè)試項(xiàng)目啟動(dòng)項(xiàng)目,訪問(wèn)http://localhost:8080/sendmsg?msg=HelloActiveMQQueue!,向消息隊(duì)列中發(fā)送消息”HelloActiveMQQueue!”,控制臺(tái)的輸出效果,如圖8-11所示。在ActiveMQ管理界面上,點(diǎn)擊“Queues”的鏈接,顯示出在perties配置文件定義的“activemq-queue”的隊(duì)列名稱,其效果如圖8-12所示。ActiveMQ管理界面的Queues模式中的屬性含義如下。NumberOfPendingMessages:消息隊(duì)列中待處理消息。NumberOfConsumers:消費(fèi)者的數(shù)量。MessagesEnqueued:累計(jì)進(jìn)入過(guò)消息隊(duì)列的總量。MessagesDequeued:累計(jì)消費(fèi)過(guò)的消息總量。158.2.2SpringBoot整合JMS(6)Topic模式下創(chuàng)建隊(duì)列生產(chǎn)者和隊(duì)列消費(fèi)者在pertie配置文件中將spring.jms.pub-sub-domain屬性的值設(shè)置為true,即為主題模式。在com.yzpc.controller的包下,新建TopicProducerController類為Topic生產(chǎn)者控制器,主要向消息隊(duì)列中發(fā)送消息,代碼如下所示。在com.yzpc.controller的包中,新建TopicConsumerController類為Topic消費(fèi)者控制器,其中寫了兩個(gè)消費(fèi)者方法,可以理解為有兩個(gè)用戶訂閱,代碼如下所示。16//Topic消息生產(chǎn)者@RestControllerpublicclassTopicProducerController{
@Autowired
privateJmsMessagingTemplatejmsMessagingTemplate;
@Autowired
privateTopictopic;
//消息生產(chǎn)者
@RequestMapping("/topicsendmsg")
publicvoidsendmsg(Stringmsg){
System.out.println("發(fā)送消息到MQ:"+msg);
//指定消息發(fā)送的目的地及內(nèi)容
this.jmsMessagingTemplate.convertAndSend(this.topic,msg);}}//Topic消費(fèi)者控制器@RestControllerpublicclassTopicConsumerController{
//消費(fèi)者接收消息
@JmsListener(destination="${mytopic}")
publicvoidreadActiveTopic(Stringmessage){
System.out.println("接受到:"+message);}
@JmsListener(destination="${mytopic}")
publicvoidreadActiveTopic1(Stringmessage){
System.out.println("接受到:"+message);}}8.2.2SpringBoot整合JMS重新啟動(dòng)項(xiàng)目,訪問(wèn)http://localhost:8080/topicsendmsg?msg=HelloActiveMQTopic!,向消息隊(duì)列中發(fā)送消息“HelloActiveMQTopic!”,控制臺(tái)輸出效果(有兩個(gè)消費(fèi)者方法),如圖8-13所示。在ActiveMQ管理界面上,點(diǎn)擊“Topics”的鏈接,顯示出在perties配置文件定義的“activemq-topic”的Topic名稱,如圖8-14所示。ActiveMQ管理界面的Topics模式中的屬性含義如下。
NumberOfConsumers:消費(fèi)者的數(shù)量。
MessagesEnqueued:累計(jì)進(jìn)入過(guò)消息隊(duì)列的總量。MessagesDequeued:累計(jì)消費(fèi)過(guò)的消息總量。178.3整合AMQP
AMQP(AdvancedMessageQueuingProtocol)為高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端、中間件的不同產(chǎn)品、不同的開(kāi)發(fā)語(yǔ)言等條件的限制。8.3.1RabbitMQ1.RabbitMQ簡(jiǎn)介RabbitMQ是基于AMQP協(xié)議的輕量級(jí)、可靠、可伸縮和可移植的消息代理,是Rabbit技術(shù)公司于2007年基于AMQP標(biāo)準(zhǔn)開(kāi)發(fā)的,并提供對(duì)其的支持。RabbitMQ是目前應(yīng)用相當(dāng)廣泛的消息中間件。在企業(yè)級(jí)應(yīng)用、微服務(wù)應(yīng)用中,RabbitMQ擔(dān)當(dāng)著十分重要的角色,RabbitMQ對(duì)消息的處理流程,如圖8-13所示。188.3.1RabbitMQ
2.RabbitMQ基礎(chǔ)架構(gòu)在所有的消息服務(wù)中,消息中間件都會(huì)作為一個(gè)第三方消息代理,接收發(fā)布者發(fā)布的消息,并推送給消息消費(fèi)者。不同消息中間件內(nèi)部轉(zhuǎn)換消息的細(xì)節(jié)不同,RabbitMQ的基礎(chǔ)架構(gòu),如圖8-14所示。198.3.1RabbitMQRabbitMQ做了一層抽象,在發(fā)消息者和隊(duì)列之間,加入了交換器(Exchange)。這樣發(fā)消息者和隊(duì)列就沒(méi)有直接聯(lián)系,轉(zhuǎn)而變成發(fā)消息者把消息交給交換器,交換器根據(jù)調(diào)度策略再把消息再給隊(duì)列。RabbitMQ的基礎(chǔ)架構(gòu)中有很多細(xì)節(jié)內(nèi)容和內(nèi)部組件,部分說(shuō)明如下。Broker:消息隊(duì)列服務(wù)進(jìn)程,此進(jìn)程包括Exchange和Queue兩個(gè)部分。Exchange:消息隊(duì)列交換器,按一定的規(guī)則將消息路由轉(zhuǎn)發(fā)到某個(gè)隊(duì)列,對(duì)消息進(jìn)行過(guò)慮。Queue:消息隊(duì)列,存儲(chǔ)消息的隊(duì)列,消息到達(dá)隊(duì)列并轉(zhuǎn)發(fā)給指定的消費(fèi)方。Producer:消息生產(chǎn)者,即生產(chǎn)方客戶端,生產(chǎn)方客戶端將消費(fèi)發(fā)送到MQ。Consumer:消息消費(fèi)者,即消費(fèi)方客戶端,接收MQ轉(zhuǎn)發(fā)的消息。208.3.1RabbitMQ
使用RabbitMQ進(jìn)行消息發(fā)布和接收的流程。(1)發(fā)送消息生產(chǎn)者和Broker建立TCP連接。生產(chǎn)者和Broker建立通道。生產(chǎn)者通過(guò)通道消息發(fā)送Broker,由Exchange將消息進(jìn)行轉(zhuǎn)發(fā)。Exchange將消息轉(zhuǎn)發(fā)到指定的Queue(隊(duì)列)。(2)接收消息消費(fèi)者和Broke建立TCP連接。消費(fèi)者和Broker建立通道。消費(fèi)者監(jiān)聽(tīng)指定的Queue(隊(duì)列)。當(dāng)有消息到達(dá)Queue時(shí)Broker默認(rèn)將消息推送給消費(fèi)者。消費(fèi)者接收到消息。218.3.1RabbitMQ3.RabbitMQ工作模式RabbitMQ消息中間件針對(duì)不同的服務(wù)需求,提供了多種工作模式,下面對(duì)RabbitMQ支持的工作模式進(jìn)行說(shuō)明。(1)簡(jiǎn)單模式
生產(chǎn)者把消息放入隊(duì)列,消費(fèi)者獲得消息,這個(gè)模式只有一個(gè)消費(fèi)者、一個(gè)生產(chǎn)者、一個(gè)隊(duì)列,只需要配置主機(jī)參數(shù),其他參數(shù)使用默認(rèn)值即可通信,如8-15所示。(2)工作隊(duì)列模式
這種模式出現(xiàn)了多個(gè)消費(fèi)者,為了保證消費(fèi)者之間負(fù)載均衡和同步,需要在消息隊(duì)列之間加上同步功能,如圖8-16所示。228.3.1RabbitMQ(3)發(fā)布訂閱模式發(fā)布訂閱模式也稱為交換器模式,在該模式中,必須先配置一個(gè)fanout類型的交換器,不需要指定的路由鍵,同時(shí)會(huì)將消息路由到每一個(gè)消息隊(duì)列中上,然后每個(gè)消息隊(duì)列都可對(duì)相同的消息進(jìn)行接收存儲(chǔ),進(jìn)而由各自消息隊(duì)列關(guān)聯(lián)的消費(fèi)者進(jìn)行消費(fèi),如圖8-17所示。(4)路由模式在路由工作模式中,必須先配置一個(gè)direct類型的交換器,并指定不同的路由鍵值(RoutingKey)將對(duì)應(yīng)的消息從交換器路由到不同的消息隊(duì)列中進(jìn)行存儲(chǔ),由消費(fèi)者進(jìn)行各自消費(fèi),如圖8-18所示。238.3.1RabbitMQ(5)主題模式在主題轉(zhuǎn)發(fā)工作模式中,必須先配置一個(gè)topic類型的交換器,并指定不同的路由鍵值(RoutingKey)將對(duì)應(yīng)的消息從交換器路由到不同的消息隊(duì)列進(jìn)行存儲(chǔ),然后由消費(fèi)者進(jìn)行各自消費(fèi),如圖8-19所示。(6)RPC模式RPC模式與工作隊(duì)列模式主體流程相似,不需要設(shè)置交換器,需指定唯一消息隊(duì)列進(jìn)行消費(fèi)。RPC模式是一個(gè)回環(huán)結(jié)構(gòu),主要針對(duì)分布式架構(gòu)的消息傳遞業(yè)務(wù),客戶端先發(fā)送消息到消息隊(duì)列,遠(yuǎn)程服務(wù)端獲取消息,然后再寫入另一個(gè)消息隊(duì)列,向原始客戶端相應(yīng)消息處理結(jié)果,如圖8-20所示。248.3.2安裝RabbitMQ以及整合環(huán)境搭建
1.下載安裝RabbitMQ
RabbitMQ是基于erlang語(yǔ)言開(kāi)發(fā)的,先下載安裝Erlang,Erlang語(yǔ)言的下載地址為/downloads,RabbitMQ的下載地址為/rabbitmq/rabbitmq-server/releases/,有安裝版和解壓版。此處下載的erlang語(yǔ)言版本為”otp_win64_24.0.exe”,下載的RabbitMQ版本是”rabbitmq-server-3.9.4.exe”,此版本需要Erlang23.2或更新版本,并支持Erlang24.0。運(yùn)行Erlang語(yǔ)言安裝包“otp_win64_24.0.exe”,按照提示一步一步操作即可完成Erlang語(yǔ)言包的安裝。安裝Erlang后需要配置環(huán)境變量ERLANG_HOME,變量值為Erlang選擇安裝的具體路徑,并在path中新增%ERLANG_HOME%\bin,如圖8-15和圖8-16所示。
258.3.2安裝RabbitMQ以及整合環(huán)境搭建運(yùn)行RabbitMQ安裝包“rabbitmq-server-3.9.4.exe”,按照提示一步一步操作即可完成RabbitMQ安裝。然后將安裝路徑下的sbin子目錄添加到環(huán)境變量的Path中。運(yùn)行rabbitmq-pluginsenablerabbitmq_management命令,開(kāi)啟rabbitmq_management插件,為RabbitMQ的管理界面提供支持,如圖8-17所示。
注:如果最后一行出現(xiàn)PluginConfigurationunchanged的提示,表明插件啟用失敗,可刪除用戶目錄(C:\Users\<用戶名>)下AppData\Roaming\RabbitMQ子目錄里的內(nèi)容,而后重新運(yùn)行上述命令。運(yùn)行rabbitmq-server.bat命令啟動(dòng)RabbitMQ服務(wù)器,RabbitMQ服務(wù)器啟動(dòng)完成后,信息提示如圖8-18所示。268.3.2安裝RabbitMQ以及整合環(huán)境搭建
RabbitMQ默認(rèn)提供兩個(gè)端口號(hào)5672和15762,其中5672用作服務(wù)端口號(hào),15762用作可視化管理端口號(hào)。打開(kāi)瀏覽器訪問(wèn)http://localhost:15672/,進(jìn)入登錄頁(yè)面,如圖8-19所示。在該界面中輸入默認(rèn)的賬號(hào)和密碼(guest和guest),進(jìn)入RabbitMQ的Web管理界面,通過(guò)可視化方式查看RabbitMQ,如圖8-20所示。在可視化管理頁(yè)面中,顯示出了RabbitMQ的版本,用戶信息等,界面上方可以看到Connections、Channels、Exchanges、Queues、Admin等標(biāo)簽。278.3.2安裝RabbitMQ以及整合環(huán)境搭建2.SpringBoot整合RabbitMQ環(huán)境搭建下面對(duì)SpringBoot整合RabbitMQ實(shí)現(xiàn)消息服務(wù)需要的整合環(huán)境進(jìn)行搭建,步驟如下所示。(1)使用SpringInitializr方式創(chuàng)建一名為chapter08rabbitmq的項(xiàng)目,Group和Packagename為com.yzpc,在Dependencies依賴中選擇Web節(jié)點(diǎn)下的SpringWeb依賴,Messaging節(jié)點(diǎn)下的SpringforRabbitMQ依賴,單擊Finish按鈕,如圖8-21所示。pom.xml中自動(dòng)添加的依賴代碼,如下所示。(2)打開(kāi)項(xiàng)目創(chuàng)建時(shí)自動(dòng)生成的perties配置文件,添加RabbitMQ服務(wù)對(duì)應(yīng)的連接配置,內(nèi)容如下。28<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--省略Web中SpringWeb依賴代碼-->#配置RabbitMQ消息中間件連接配置spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest#配置RabbitMQ的虛擬主機(jī)路徑/,默認(rèn)可以省略spring.rabbitmq.virtual-host=/8.3.3SpringBoot整合RabbitMQ實(shí)現(xiàn)在RabbitMQ的后幾種工作模式中,消息由Exchange進(jìn)行再分配,Exchange會(huì)根據(jù)不同的策略將消息分發(fā)到不同Queue中。下面選取常用的發(fā)布訂閱模式、路由模式和主題模式完成在SpringBoot項(xiàng)目中的消息服務(wù)整合實(shí)現(xiàn)。1.發(fā)布訂閱模式(Publisher/Subscribe)該模式策略是把所有到達(dá)交換器的消息轉(zhuǎn)發(fā)給所有與它綁定的隊(duì)列。SpringBoot整合RabbitMQ中間件實(shí)現(xiàn)消息服務(wù),主要圍繞定制中間件、生產(chǎn)者發(fā)送消息、消費(fèi)者接收消息這個(gè)3個(gè)部分的進(jìn)行,具體步驟如下。(1)基于配置類方式實(shí)現(xiàn)消息發(fā)送組件在項(xiàng)目的src/main/java/目錄下的com.yzpc包中,新建一個(gè)config包,并在該包中新建RabbitMQConfig的消息配置類,用來(lái)定制消息發(fā)送相關(guān)組件,代碼如下所示。
29packagecom.yzpc.config;importorg.springframework.amqp.core.*;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
@Configuration
publicclassRabbitMQConfig{
//定義fanout類型的轉(zhuǎn)換器
@Bean
publicFanoutExchangefanoutExchange(){
returnnewFanoutExchange("fanout_exchange");
}
//定義兩個(gè)不同的消息隊(duì)列
@Bean
publicQueuefanoutQueue1(){
returnnewQueue("fanout_queue1");
}
@Bean
publicQueuefanoutQueue2(){
returnnewQueue("fanout_queue2");
}
@Bean//將兩個(gè)不同名稱的消息隊(duì)列與交換器進(jìn)行綁定
publicBindingfanoutBinding1(){
returnBindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
publicBindingfanoutBinding2(){
returnBindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
}8.3.3SpringBoot整合RabbitMQ實(shí)現(xiàn)
(2)消息生產(chǎn)者發(fā)送消息在項(xiàng)目的測(cè)試類Chapter08rabbitmqApplicationTests中,創(chuàng)建消息發(fā)送者發(fā)送消息隊(duì)列,這里使用Spring框架提供的AmqpTemplate模板類實(shí)現(xiàn)消息發(fā)送,代碼如下所示。運(yùn)行單元測(cè)試方法fanoutTest(),在控制臺(tái)可以看到相應(yīng)的“向fanout_exchange交換器發(fā)送消息:HelloFanout!”的輸出語(yǔ)句。驗(yàn)證RabbitMQ消息組件的效果,通過(guò)RabbitMQ可視化頁(yè)面Exchanges面板,看到一個(gè)類型為fanout的交換器,名稱為fanout_exchange,其余7個(gè)交換器是RabbitMQ自帶的,如圖8-22所示。單擊fanout_exchange交換器進(jìn)入詳情頁(yè),在Bindings節(jié)點(diǎn)下,可以看到綁定的兩個(gè)消息隊(duì)列,如圖8-23所示。30@AutowiredprivateAmqpTemplateamqpTemplate;
privateStringmsg="HelloFanout!";
@Test
publicvoidfanoutTest(){
amqpTemplate.convertAndSend("fanout_exchange",null,msg);
System.out.println("向fanout_exchange交換器發(fā)送消息:"+msg);
}8.3.3SpringBoot整合RabbitMQ實(shí)現(xiàn)在可視化管理頁(yè)面,點(diǎn)擊Queues面板,查看定制生成的兩個(gè)消息隊(duì)列信息,由于目前尚未提供消費(fèi)者,在發(fā)布訂閱模式下綁定的兩個(gè)消息隊(duì)列中都擁有一條待接收的消息,所以測(cè)試類發(fā)送的消息會(huì)暫存在隊(duì)列。如圖8-24所示。單擊fanout_queue1消息隊(duì)列進(jìn)入隊(duì)列詳情頁(yè),在GetMessages節(jié)點(diǎn)下,單擊GetMessage(s)按鈕,可以看到消息信息,如圖8-25所示。318.3.3SpringBoot整合RabbitMQ實(shí)現(xiàn)
(3)消息消費(fèi)者接收消息在項(xiàng)目src/main/java/目錄下的com.yzpc包中,新建一個(gè)receiver包,并在該包中新建RabbitMQReceiver業(yè)務(wù)類,用來(lái)實(shí)現(xiàn)消息的接收和處理,代碼如下所示。啟動(dòng)Chapter08rabbitmqApplication,控制臺(tái)顯示的消息消費(fèi)信息,如圖8-26所示。32@Component
publicclassRabbitMQReceiver
{
//發(fā)布訂閱工作模式下接收、處理業(yè)務(wù)
@RabbitListener(queues="fanout_queue1")
publicvoidreceiveFanout1(Stringmsg){
System.out.println("fanout_queue1隊(duì)列監(jiān)聽(tīng)接收到消息:"+msg);
}
@RabbitListener(queues="fanout_queue2")
publicvoidreceiveFanout2(Stringmsg){
System.out.println("fanout_queue2隊(duì)列監(jiān)聽(tīng)接收到消息:"+msg);
}
}
8.3.3SpringBoot整合RabbitMQ實(shí)現(xiàn)
2.路由模式(Routing)
路由模式策略是將消息隊(duì)列綁定到一個(gè)direct類型的交換器上,當(dāng)消息到達(dá)該交換器時(shí)會(huì)被轉(zhuǎn)發(fā)到與該條消息routingkey相同的隊(duì)列上。這里使用基于注解的方式實(shí)現(xiàn)路由模式的整合進(jìn)行講解。
(1)定制消息組件和消息消費(fèi)者打開(kāi)com.yzpc.receiver包中的RabbitMQReceiver業(yè)務(wù)類,使用@RabbitListener注解及其相關(guān)屬性定制路由模式的消息組件,并編寫消息消費(fèi)者接收消息的方法,代碼如下所示。(2)消息生產(chǎn)者發(fā)送消息在項(xiàng)目測(cè)試類Chapter08rabbitmqApplicationTests中,使用RabbitTemplate模板類實(shí)現(xiàn)路由模式下的消息發(fā)送,代碼如下所示。33//2.1路由模式消息接收,處理業(yè)務(wù)1
@RabbitListener(bindings=@QueueBinding(value=@Queue("routing_queue1"),exchange=@Exchange(value="routing_exchange",type="direct"),key="routing_key1"))
publicvoidreceiveRouting1(Stringmessage){
System.out.println("routing_queue1隊(duì)列監(jiān)聽(tīng)接收消息:"+message);
}
//2.2路由模式消息接收,處理業(yè)務(wù)2
@RabbitListener(bindings=@QueueBinding(value=@Queue("routing_queue2"),exchange=@Exchange(value="routing_exchange",type="direct"),key={"routing_key1","routing_key2","routing_key3"}))
publicvoidreceiveRouting2(Stringmessage){
System.out.println("routing_queue2隊(duì)列監(jiān)聽(tīng)接收消息:"+message);
}@AutowiredprivateRabbitTemplate
rabbitTemplate;privateStringrouting_msg="routingsendrouting_key1message!";@Test
publicvoidroutingTest(){
rabbitTemplate.convertAndSend("routing_exchange","routing_key1",routing_msg);
System.out.println("向routing_exchange交換器發(fā)送消息:"+routing_msg);
}
8.3.3SpringBoot整合RabbitMQ實(shí)現(xiàn)執(zhí)行上述測(cè)試方法routingTest(),控制臺(tái)輸出顯示,如圖8-27所示。修改routingTest()方法中的消息傳遞參數(shù),調(diào)整發(fā)送routing_key2級(jí)別消息(同時(shí)修改路由鍵和消息內(nèi)容),再次運(yùn)行routingTest()方法,控制臺(tái)輸出顯示,如圖8-28所示。打開(kāi)RabbitMQ可視化管理頁(yè)面查看定制的Routing模式的消息組件,使用注解方式同樣自動(dòng)生成路由模式下的消息組件,并進(jìn)行了自動(dòng)綁定,如圖8-29所示。348.3.3SpringBoot整合RabbitMQ實(shí)現(xiàn)
3.主題模式(Topic)
主題模式策略與路由模式大致相同,不同的是主題模式通過(guò)通配符訂閱多個(gè)主題的消息。下面以不同用戶對(duì)不同消息的訂閱需求為例,使用基于注解的方式來(lái)實(shí)現(xiàn)Topic通配符模式的整合進(jìn)行講解。(1)定制消息組件和消息消費(fèi)者
打開(kāi)com.yzpc.receiver包中的RabbitMQReceiver業(yè)務(wù)類,在該類中使用@RabbitListener注解及其相關(guān)屬性定制Topics通配符模式的消息組件,并編寫消息消費(fèi)者接收消息的方法,代碼如下所示。
(2)消息生產(chǎn)者發(fā)送消息在項(xiàng)目測(cè)試類Chapter08rabbitmqApplicationTests中,使用RabbitTemplate模板類實(shí)現(xiàn)消息發(fā)送,代碼如下所示。35//3.1通配符模式消息接收,進(jìn)行業(yè)務(wù)1訂閱處理@RabbitL
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2026年廈門東海職業(yè)技術(shù)學(xué)院?jiǎn)握新殬I(yè)傾向性測(cè)試題庫(kù)含答案詳解
- 2026年應(yīng)天職業(yè)技術(shù)學(xué)院?jiǎn)握新殬I(yè)技能考試題庫(kù)及參考答案詳解1套
- 2026年長(zhǎng)江師范學(xué)院?jiǎn)握新殬I(yè)傾向性測(cè)試題庫(kù)及答案詳解一套
- 2026年廈門工學(xué)院?jiǎn)握新殬I(yè)適應(yīng)性考試題庫(kù)參考答案詳解
- 2026年單招適應(yīng)性考試題庫(kù)附答案詳解
- 森林消防員面試題及答案
- 護(hù)士仿真面試題及答案
- 2025年宜賓市敘州區(qū)婦幼保健計(jì)劃生育服務(wù)中心第二次公開(kāi)招聘聘用人員備考題庫(kù)及參考答案詳解
- 2025年市屬國(guó)企派遣員工招聘?jìng)淇碱}庫(kù)及一套答案詳解
- 2025年晉中健康學(xué)院青年教師招聘6人備考題庫(kù)及答案詳解1套
- 三通、大小頭面積計(jì)算公式
- 軟件無(wú)線電原理與應(yīng)用(第3版)-習(xí)題及答案匯總 第1-9章 虛擬人-軟件無(wú)線電的新發(fā)展 認(rèn)知無(wú)線電
- 中級(jí)會(huì)計(jì)實(shí)務(wù)-存貨
- 機(jī)械電氣設(shè)備管理制度
- 簡(jiǎn)單酒水購(gòu)銷合同
- GB/T 41933-2022塑料拉-拉疲勞裂紋擴(kuò)展的測(cè)定線彈性斷裂力學(xué)(LEFM)法
- 高中語(yǔ)文 選修中冊(cè) 第四課時(shí) 展示強(qiáng)大思想力量 邏輯思維在著作中提升-《改造我們的學(xué)習(xí)》《人的正確思想是從哪里來(lái)的》
- 大學(xué)化學(xué)試題庫(kù)
- GCB發(fā)電機(jī)出口斷路器教育課件
- 柑桔周年管理工作歷第二版課件
- 半導(dǎo)體異質(zhì)結(jié)課件
評(píng)論
0/150
提交評(píng)論