SpringcloudStream消息驅(qū)動(dòng)工具使用介紹_第1頁(yè)
SpringcloudStream消息驅(qū)動(dòng)工具使用介紹_第2頁(yè)
SpringcloudStream消息驅(qū)動(dòng)工具使用介紹_第3頁(yè)
SpringcloudStream消息驅(qū)動(dòng)工具使用介紹_第4頁(yè)
SpringcloudStream消息驅(qū)動(dòng)工具使用介紹_第5頁(yè)
已閱讀5頁(yè),還剩5頁(yè)未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

第SpringcloudStream消息驅(qū)動(dòng)工具使用介紹目錄springcloudStream什么是springcloudStream什么是Binder為什么使用StreamStream使用案例前置知識(shí)Stream處理消息的架構(gòu)Stream常用注解消息生產(chǎn)者8801模塊搭建消息消費(fèi)者8802模塊搭建Stream帶來(lái)的問(wèn)題重復(fù)消費(fèi)問(wèn)題自定義分組持久化問(wèn)題

springcloudStream

什么是springcloudStream

現(xiàn)在市面上有很多的消息中間件,每一個(gè)公司使用的都有所不同,為了減少學(xué)習(xí)的成本,springcloudStream可以讓我們不再關(guān)注消息中間件MQ的具體細(xì)節(jié),我們只需要通過(guò)適配綁定的方式即可實(shí)現(xiàn)不同MQ之間的切換,但是遺憾的是springcloudStream目前只支持RabbitMQ和Kafka。

SpringCloudStream是一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架,應(yīng)用程序通過(guò)inputs或者outputs來(lái)與SpringCloudStream中的binder進(jìn)行交互,我們可以通過(guò)配置來(lái)binding,而SpringCloudStream的binder負(fù)責(zé)與中間件交互,所以我們只需要搞清楚如何與Stream交互就可以很方便的使用消息驅(qū)動(dòng)了!

什么是Binder

Binder是SpringCloudStream的一個(gè)抽象概念,是應(yīng)用與消息中間件之間的粘合劑,通過(guò)定義綁定器Binder作為中間層,實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離,可以動(dòng)態(tài)的改變消息的destinations(對(duì)應(yīng)于Kafka的topic,RabbitMQ的exchanges),這些都可以通過(guò)外部配置項(xiàng)來(lái)做到,甚至可以任意的改變中間件的類型但是不需要修改一行代碼

為什么使用Stream

比方說(shuō)我們用到了RabbitMQ和Kafka,由于這兩個(gè)消息中間件的架構(gòu)上的不同像RabbitMQ有exchange,kafka有Topic和Partitions分區(qū),這些中間件的差異性導(dǎo)致我們實(shí)際項(xiàng)目開(kāi)發(fā)給我們?cè)斐闪艘欢ǖ睦_,我們?nèi)绻昧藘蓚€(gè)消息隊(duì)列的其中一種,后面的業(yè)務(wù)需求,我想往另外一種消息隊(duì)列進(jìn)行遷移;這時(shí)候無(wú)疑就是一個(gè)災(zāi)難性的,一大堆東西都要重新推倒重新做,因?yàn)樗覀兊南到y(tǒng)耦合了,這襯候springcloudStream給我們提供了一種解耦合的方式。

Stream使用案例

前置知識(shí)

Stream處理消息的架構(gòu)

Source、Sink:簡(jiǎn)單的可理解為參照對(duì)象是SpringCloudStream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入。Channel:通道,是隊(duì)列Queue的一種抽象,在消息通訊系統(tǒng)中就是實(shí)現(xiàn)存儲(chǔ)和轉(zhuǎn)發(fā)的媒介。Binder:消息的生產(chǎn)者和消費(fèi)者中間層,實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離

通過(guò)以上兩張圖片可知,消息的處理流向是:消息生產(chǎn)者處理完業(yè)務(wù)邏輯之后消息到達(dá)source中,接著前往Channel通道進(jìn)行排隊(duì),然后通過(guò)binder綁定器將消息數(shù)據(jù)發(fā)送到底層mq,然后又通過(guò)binder綁定器接收到底層mq發(fā)送來(lái)的消息數(shù)據(jù),接著前往Channel通道進(jìn)行排隊(duì),由Sink接收到消息數(shù)據(jù),消息消費(fèi)者拿到消息數(shù)據(jù)執(zhí)行相應(yīng)的業(yè)務(wù)邏輯

Stream常用注解

消息生產(chǎn)者8801模塊搭建

第一步:創(chuàng)建一個(gè)maven模塊,引入相關(guān)依賴,最主要的就是stream整合rabbitmq的依賴

!--stream的rabbitmq依賴--

dependency

groupIdorg.springframework.cloud/groupId

artifactIdspring-cloud-starter-stream-rabbit/artifactId

/dependency

第二步:配置文件的編寫

server:

port:8801

spring:

application:

name:cloud-stream-provider

cloud:

stream:

binders:#在此處配置要綁定的rabbitmq的服務(wù)信息;

defaultRabbit:#表示定義的名稱,用于于binding整合

type:rabbit#消息組件類型

environment:#設(shè)置rabbitmq的相關(guān)的環(huán)境配置

spring:

rabbitmq:

host:localhost

port:5672

username:guest

password:guest

bindings:#服務(wù)的整合處理

output:#這個(gè)名字是一個(gè)通道的名稱

destination:studyExchange#表示要使用的Exchange名稱定義

content-type:application/json#設(shè)置消息類型,本次為json,文本則設(shè)置text/plain

binder:defaultRabbit#設(shè)置要綁定的消息服務(wù)的具體設(shè)置

eureka:

client:#客戶端進(jìn)行Eureka注冊(cè)的配置

service-url:

defaultZone:http://localhost:7001/eureka

第三步:主程序類

@SpringBootApplication

publicclassCloudStreamRabbitmqProvider8801Application{

publicstaticvoidmain(String[]args){

SpringApplication.run(CloudStreamRabbitmqProvider8801Application.class,args);

System.out.println("啟動(dòng)成功");

}

第四步:業(yè)務(wù)層service代碼編寫,注意:這里實(shí)現(xiàn)類注入的對(duì)象由之前的dao層對(duì)象換成了channel通道對(duì)象,詳細(xì)的發(fā)送由實(shí)現(xiàn)類的第12完成

publicinterfaceIMessageProviderService{

*定義消息的推送管道

*@return

Stringsend();

@EnableBinding(Source.class)

publicclassMessageProviderServiceImplimplementsIMessageProviderService{

*消息發(fā)送管道/信道

@Resource

privateMessageChanneloutput;

@Override

publicStringsend(){

Stringserial=UUID.randomUUID().toString();

output.send(MessageBuilder.withPayload(serial).build());

System.out.println("*****serial:"+serial);

returnserial;

第五步:controller接口

@RestController

publicclassSendMessageController{

@Resource

privateIMessageProviderServicemessageProviderService;

@GetMapping(value="/sendMessage")

publicStringsendMessage(){

returnmessageProviderService.send();

}

消息消費(fèi)者8802模塊搭建

第一步:創(chuàng)建一個(gè)maven模塊,引入相關(guān)依賴,最主要的就是stream整合rabbitmq的依賴

!--stream的rabbitmq依賴--

dependency

groupIdorg.springframework.cloud/groupId

artifactIdspring-cloud-starter-stream-rabbit/artifactId

/dependency

第二步:配置文件的編寫,與生產(chǎn)者的區(qū)別就在于bindings下的是input而不是output

server:

port:8802

spring:

application:

name:cloud-stream-consumer

cloud:

stream:

binders:#在此處配置要綁定的rabbitmq的服務(wù)信息;

defaultRabbit:#表示定義的名稱,用于于binding整合

type:rabbit#消息組件類型

environment:#設(shè)置rabbitmq的相關(guān)的環(huán)境配置

spring:

rabbitmq:

host:localhost

port:5672

username:guest

password:guest

bindings:#服務(wù)的整合處理

input:#這個(gè)名字是一個(gè)通道的名稱

destination:studyExchange#表示要使用的Exchange名稱定義

content-type:application/json#設(shè)置消息類型,本次為json,文本則設(shè)置text/plain

binder:defaultRabbit#設(shè)置要綁定的消息服務(wù)的具體設(shè)置

eureka:

client:#客戶端進(jìn)行Eureka注冊(cè)的配置

service-url:

defaultZone:http://localhost:7001/eureka

第三步:主程序類

@SpringBootApplication

publicclassCloudStreamRabbitmqConsumer8802Application{

publicstaticvoidmain(String[]args){

SpringApplication.run(CloudStreamRabbitmqConsumer8802Application.class,args);

System.out.println("啟動(dòng)成功");

}

第四步:controller接口,使用url請(qǐng)求生產(chǎn)者8801,即可在消費(fèi)者8802端接收到8801發(fā)送的消息

@Component

@EnableBinding(Sink.class)

publicclassReceiveMessageListener{

@Value("${server.port}")

privateStringserverPort;

@StreamListener(Sink.INPUT)

publicvoidinput(MessageStringmessage){

System.out.println("消費(fèi)者1號(hào)----port:"+serverPort+"\t從8801接受到的消息是:"+message.getPayload());

兩個(gè)模塊搭建完成進(jìn)行測(cè)試,首先啟動(dòng)注冊(cè)中心7001,然后分別啟動(dòng)消息生產(chǎn)者8801和消息消費(fèi)者8802,通過(guò)url請(qǐng)求訪問(wèn)8001的發(fā)送消息請(qǐng)求,會(huì)向指定管道中發(fā)送一條消息,如果此時(shí)這個(gè)管道中有消費(fèi)者即可接收到這條消息。而如何指定消息的管道歸屬呢,就是通過(guò)配置文件中的indings.input.destination來(lái)指定,命名相同的服務(wù)就會(huì)處在同一條管道中

Stream帶來(lái)的問(wèn)題

重復(fù)消費(fèi)問(wèn)題

按照之前的使用,會(huì)帶來(lái)重復(fù)消費(fèi)問(wèn)題:也就是說(shuō)一個(gè)通道上有不止一個(gè)消息消費(fèi)者,stream上默認(rèn)每一個(gè)消費(fèi)者都屬于不同的組,這樣的話就會(huì)導(dǎo)致這個(gè)消息被多個(gè)組的消費(fèi)者重復(fù)消費(fèi)

知道了問(wèn)題出現(xiàn)的原因就很容易解決了,只要我們自定義配置分組,將這些消費(fèi)者都分配到同一個(gè)組中就能避免重復(fù)消費(fèi)的問(wèn)題出現(xiàn)了(同一個(gè)組間的消費(fèi)者是競(jìng)爭(zhēng)關(guān)系,不管組間有多少的消費(fèi)者都只會(huì)消費(fèi)一次)

自定義分組

只需要在配置文件修改一處配置即可實(shí)現(xiàn)自定義組名并且自定義分組,組名相同的服務(wù)會(huì)被分配到同一組,通道內(nèi)的消息數(shù)據(jù)會(huì)被

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論