版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
Redis這節(jié)課,我們就來(lái)聊一聊消息隊(duì)列的特征和Reis提供的消息隊(duì)列方案。只有把這兩方面的知識(shí)和實(shí)踐經(jīng)驗(yàn)串連起來(lái),才能徹底理解基于Redis實(shí)現(xiàn)消息隊(duì)列的技術(shù)實(shí)踐。以后當(dāng)你需要為分布式系統(tǒng)組件做消息隊(duì)列選型時(shí),就可以根據(jù)組件通信量和消息通信速度的要求,選擇出適合的Reis消息隊(duì)列方案了。我先介紹一下消息隊(duì)列存取消息的過(guò)程。在分布式系統(tǒng)中,當(dāng)兩個(gè)組件要基于消息隊(duì)列進(jìn)行通信時(shí),一個(gè)組件會(huì)把要處理的數(shù)據(jù)以消息的形式傳遞給消息隊(duì)列,然后,這個(gè)組件就可以繼續(xù)執(zhí)行其他操作了;遠(yuǎn)端的另一個(gè)組件從消息隊(duì)列中把消息出來(lái),再在本地進(jìn)行處理。假設(shè)組件1需要對(duì)到的數(shù)據(jù)進(jìn)行求和計(jì)算,并寫(xiě)入數(shù)據(jù)庫(kù),但是,消息到達(dá)的速度很快,組件1沒(méi)有辦法及時(shí)地既做,又做計(jì)算,并且寫(xiě)入數(shù)據(jù)庫(kù)。所以,我們可以使用基于消息隊(duì)列的通信,讓組件1把數(shù)據(jù)x和y保存為JSON格式的消息,再發(fā)到消息隊(duì)列,這樣它就可以繼續(xù)接收新的數(shù)據(jù)了。組件2則異步地從消息隊(duì)列中把數(shù)據(jù)出來(lái),在服務(wù)器2上進(jìn)行求和計(jì)算后,再寫(xiě)入數(shù)據(jù)庫(kù)。這個(gè)過(guò)程如下圖所示:我們一般把消息隊(duì)列中發(fā)送消息的組件稱為生產(chǎn)者(1),把接收消息的組件稱為消費(fèi)者(例子中的組件2),下圖展示了一個(gè)通用的消息隊(duì)列的架構(gòu)模型:在使用消息隊(duì)列時(shí),消費(fèi)者可以異步生產(chǎn)者消息,然后再進(jìn)行處理。這樣一來(lái),即使生產(chǎn)者發(fā)送消息的速度遠(yuǎn)遠(yuǎn)超過(guò)了消費(fèi)者處理消息的速度,生產(chǎn)者已經(jīng)發(fā)送的消息也可以緩存在消息隊(duì)列中,避免阻塞生產(chǎn)者,這是消息隊(duì)列作為分布式組件通信的一大優(yōu)勢(shì)。雖然消費(fèi)者是異步處理消息,但是,消費(fèi)者仍然需要按照生產(chǎn)者發(fā)送消息的順序來(lái)處理消息,避免后發(fā)送的消息被先處理了。對(duì)于要求消息保序的場(chǎng)景來(lái)說(shuō),一旦出現(xiàn)這種消息被亂序處理的情況,就可能會(huì)導(dǎo)致業(yè)務(wù)邏輯被錯(cuò)誤執(zhí)行,從而給業(yè)務(wù)方造成損失。假設(shè)生產(chǎn)者負(fù)責(zé)接收庫(kù)存更新請(qǐng)求,消費(fèi)者負(fù)責(zé)實(shí)際更新庫(kù)存,現(xiàn)有庫(kù)存量是10。生產(chǎn)者先后發(fā)送了消息1和消息2,消息1要把商品X的庫(kù)存記錄更新為5,消息2是把商品X庫(kù)存更新為3。如果消息1和2在消息隊(duì)列中無(wú)法保序,出現(xiàn)消息2早于消息1被處理的面對(duì)這種情況,你可能會(huì)想到一種解決方案:更新后的庫(kù)存量作為生產(chǎn)者發(fā)送的消息,而是把庫(kù)存扣除值作為消息的內(nèi)容。這樣一來(lái),消息1是扣減庫(kù)存量5,消息2是扣212但是,我們還需要考慮這樣一種情況:假如消費(fèi)者收到了這樣三條消息:消息1是扣減庫(kù)存量5,消息2是庫(kù)存量,消息3是扣減庫(kù)存量2,此時(shí),如果消費(fèi)者先處理了消息3(把庫(kù)存量扣減2),那么庫(kù)存量就變成了8。然后,消費(fèi)者處理了消息2,當(dāng)前的庫(kù)存量是8,這就會(huì)出現(xiàn)庫(kù)存量查詢不正確的情況。從業(yè)務(wù)應(yīng)用層面看,消息1、2、3應(yīng)該是順序執(zhí)行的,所以,消息2查詢到的應(yīng)該是扣減了5以后的庫(kù)存量,而不是扣減了2以后的庫(kù)存量。所以,用庫(kù)存扣除值作為消息的方案,在消息中同時(shí)包含讀寫(xiě)操作的場(chǎng)景下,會(huì)帶來(lái)數(shù)據(jù)錯(cuò)誤的問(wèn)題。而且,這個(gè)方案還會(huì)一個(gè)問(wèn)題,那就是重復(fù)消息處理。消費(fèi)者從消息隊(duì)列消息時(shí),有時(shí)會(huì)因?yàn)榫W(wǎng)絡(luò)堵塞而出現(xiàn)消息重傳的情況。此時(shí),消費(fèi)者可能會(huì)收到多條重復(fù)的消息。對(duì)于重復(fù)的消息,消費(fèi)者如果多次處理的話,就可能造成一個(gè)業(yè)務(wù)邏輯被多次執(zhí)行,如果業(yè)務(wù)邏輯正好是要修改數(shù)據(jù),那就會(huì)出現(xiàn)數(shù)據(jù)被多次修改的問(wèn)題了。還是以庫(kù)存更新為例,假設(shè)消費(fèi)者收到了一次消息1,要扣減庫(kù)存量5,然后又收到了一次消息1,那么,如果消費(fèi)者無(wú)法識(shí)別這兩條消息實(shí)際是一條相同消息的話,就會(huì)執(zhí)行兩次扣減庫(kù)存量5的操作,此時(shí),庫(kù)存量就不對(duì)了。這當(dāng)然也是無(wú)法接受的。另外,消費(fèi)者在處理消息的時(shí)候,還可能出現(xiàn)因?yàn)楣收匣蝈礄C(jī)導(dǎo)致消息沒(méi)有處理完成的情況。此時(shí),消息隊(duì)列需要能提供消息可靠性的保證,也就是說(shuō),當(dāng)消費(fèi)者重啟后,可以重新消息再次進(jìn)行處理,否則,就會(huì)出現(xiàn)消息漏處理的問(wèn)題了。RedisListStreams解下基于List的消息隊(duì)列實(shí)現(xiàn)方法。ListListListLPUSHList,而消費(fèi)者則可以使用RPOP命令,從List的另一端按照消息的寫(xiě)入順序,依次消息并進(jìn)行處理。LPUSH53,表示要把庫(kù)存更新為5和3;消費(fèi)者則用RPOP把兩條消息依次讀出,然后進(jìn)行相應(yīng)的處理。List,List想要及時(shí)處理消息,就需要在程序中不停地調(diào)用RPOP命令(比如使用一個(gè)while(1)循環(huán))。如果有新消息寫(xiě)入,RPOP命令就會(huì)返回結(jié)果,否則,RPOPListRPOP程序的CPU一直消耗在執(zhí)行RPOP命令上,帶來(lái)不必要的性能損失。為了解決這個(gè)問(wèn)題,Reis提供了BRPOP命令。BROP命令也稱為阻塞式,客戶端在沒(méi)有讀到隊(duì)列數(shù)據(jù)時(shí),自動(dòng)阻塞,直到有新的數(shù)據(jù)寫(xiě)入隊(duì)列,再開(kāi)始新數(shù)據(jù)。和消費(fèi)者程序自己不停地調(diào)用RPOP命令相比,這種方式能節(jié)省CPU開(kāi)銷。ID經(jīng)處理過(guò)的消息的ID號(hào)記錄下來(lái)。當(dāng)收到一條消息后,消費(fèi)者程序就可以對(duì)比收到的消息D和記錄的已處理過(guò)的消息D,來(lái)判斷當(dāng)前收到的消息有沒(méi)有經(jīng)過(guò)處理。如果已經(jīng)處理過(guò),那么,消費(fèi)者程序就不再進(jìn)行處理了。這種處理特性也稱為冪等性,冪等性就是指,對(duì)于同一條消息,消費(fèi)者收到一次的處理結(jié)果和收到多次的處理結(jié)果是一致的。不過(guò),List本身是不會(huì)為每個(gè)消息生成ID號(hào)的,所以,消息的全局唯一ID號(hào)就需要生產(chǎn)者程序在發(fā)送消息前自行生成。生成之后,我們?cè)谟肔PUSH命令把消息插入List時(shí),需要在消息中包含這個(gè)全局唯一ID。例如,我們執(zhí)行以下命令,就把一條全局ID為 、庫(kù)存量為5的消息插入了消12LPUSHmq(integer)當(dāng)消費(fèi)者程序從List中一條消息后,List就不會(huì)再留存這條消息了。所以,如果消費(fèi)者程序在處理消息的過(guò)程出現(xiàn)了故障或宕機(jī),就會(huì)導(dǎo)致消息沒(méi)有處理完成,那么,消費(fèi)者程序再次啟動(dòng)后,就沒(méi)法再次從List中消息了。為了留存消息,List類型提供了BRPOPLPUSH命令,這個(gè)命令的作用是讓消費(fèi)者程序從一個(gè)List中消息,同時(shí),Redis會(huì)把這個(gè)消息再插入到另一個(gè)List(可以叫作備份備份List中重新消息并進(jìn)行處理了。我畫(huà)了一張示意圖,展示了使用BRPOPLPUSH命令留存消息,以及消費(fèi)者再次消息生產(chǎn)者先用LPUSH把消息“5”“3”插入到消息隊(duì)列mq中。消費(fèi)者程序使用BRPOPLPUSH命令消息“5”,同時(shí),消息“5”還會(huì)被Redis插入到mqback隊(duì)列中。如果消費(fèi)者程序處理消息“5”時(shí)宕機(jī)了,等它重啟后,可以從mqback中再次消List需求。但是,在用List做消息隊(duì)列時(shí),我們還可能遇到過(guò)一個(gè)問(wèn)題:生產(chǎn)者消息發(fā)送很ListRedis這個(gè)時(shí)候,我們希望啟動(dòng)多個(gè)消費(fèi)者程序組成一個(gè)消費(fèi)組,一起分擔(dān)處理List中的消息。但是,ListRedis從5.0版本開(kāi)始提供的Streams數(shù)據(jù)類型了。ListStreams消息。接下來(lái),我們就來(lái)了解下Streams的使用方法。Streams是XADD:插入消息,保證有序,可以自動(dòng)生成全局唯一ID;XREAD:用于消息,可以按ID數(shù)據(jù);XREADGROUP:XPENDINGXACK:XPENDING但尚未確認(rèn)的消息,而XACK命令用于向消息隊(duì)列確認(rèn)消息處理已完成。首先,我們來(lái)學(xué)習(xí)下Streams類型存取消息的操作XADDXADD命令可以往消息隊(duì)列中插入新消息,消息的格式是鍵-值對(duì)形式。對(duì)于插入的每一條消息,Streams可以自動(dòng)為其生成一個(gè)全局唯一的ID。比如說(shuō),我們執(zhí)行下面令,就可以往名稱為mqstream的消息隊(duì)列中插入一條消息,消息的鍵是repo,值是5。其中,消息隊(duì)列名稱后面的*,表示讓Redis為插入的數(shù)據(jù)自動(dòng)生成一個(gè)全局唯一的ID,例如“1599203861727-0”。當(dāng)然,我們也可以不用*,直接在消息隊(duì)列名稱后自行設(shè)定一個(gè)ID號(hào),只要保證這個(gè)ID號(hào)是全局唯一的就行。不過(guò),相比自行設(shè)定ID號(hào),使用*會(huì)更加方便高效。XADDmqstream*repo可以看到,消息的全局唯一D由兩部分組成,第一部分“1599203861727”是數(shù)據(jù)插入時(shí),以毫秒為單位計(jì)算的當(dāng)前服務(wù)器時(shí)間,第二部分表示插入消息在當(dāng)前毫秒內(nèi)的消息序號(hào),這是從0開(kāi)始編號(hào)的。例如,“1599203861727-0”就表示在“1599203861727”毫秒內(nèi)的第1當(dāng)消費(fèi)者需要消息時(shí),可以直接使用XREAD命令從消息隊(duì)列中XREAD在消息時(shí),可以指定一個(gè)消息ID,并從這個(gè)消息ID的下一條消息開(kāi)始進(jìn)行讀例如,我們可以執(zhí)行下面令,從ID號(hào)為1599203861727-0的消息開(kāi)始,后續(xù)的所有消息(示例中一共3條)。123456789XREADBLOCK1001)1)mqstream1599203861727-2)1)1)"1599274912765-2)1)2)2)1)"1599274925823-2)1)2)3)1)"1599274927910-2)1)2)另外,消費(fèi)者也可以在調(diào)用XRAED時(shí)設(shè)定block配置項(xiàng),實(shí)現(xiàn)類似于BRPOP的阻塞讀blockXREAD時(shí)長(zhǎng)可以在block配置項(xiàng)進(jìn)行設(shè)置。舉個(gè)例子,我們來(lái)看一下下面令,其中,命令最后的“$”符號(hào)表示的消息,同時(shí),我們?cè)O(shè)置了block10000的配置項(xiàng),10000的單位是毫秒,表明XREAD在最新消息時(shí),如果沒(méi)有消息到來(lái),XREAD將阻塞10000毫秒(即10秒),然后再返回。下面命令中的XREAD執(zhí)行后,消息隊(duì)列mqstream中一直沒(méi)有消息,所以,XREAD在10秒后返回空值(nil)。XREADblock10000streamsmqstream剛剛講到的這些操作是ListStreams例如,我們執(zhí)行下面令,創(chuàng)建一個(gè)名為group1的消費(fèi)組,這個(gè)消費(fèi)組消費(fèi)的消息隊(duì)列是mqstream。XGROUPcreatemqstreamgroup1然后,我們?cè)賵?zhí)行一段命令,讓group1消費(fèi)組里的消費(fèi)者consumer1從mqstream中因?yàn)樵赾onsumer1消息前,group1中沒(méi)有其他消費(fèi)者過(guò)消息,所以,consumer1就得到mqstream消息隊(duì)列中的所有消息了(一共4條)。XREADGROUPgroupgroup1consumer1streamsmqstream1)1) 2)1)1)"1599203861727- 2)1) 2) 2)1)"1599274912765- 2)1) 2) 3)1)"1599274925823- 2)1) 2) 4)1)"1599274927910- 2)1) 2)需要注意的是,消息隊(duì)列中的消息一旦被消費(fèi)組里的一個(gè)消費(fèi)者了,就不能再被該消費(fèi)組內(nèi)的其他消費(fèi)者了。比如說(shuō),我們執(zhí)行完剛才的XREADGROUP命令后,再執(zhí)行下面令,讓rou1內(nèi)的onsumer2消息時(shí),consmer2讀到的就是空值,因?yàn)橄⒁呀?jīng)被onsumer1完了,如下所示:12XREADGROUPgroupgroup11)1)streamsmqstream 2)(emptylistor使用消費(fèi)組的目的是讓組內(nèi)的多個(gè)消費(fèi)者共同分擔(dān)消息,所以,我們通常會(huì)讓每個(gè)消費(fèi)者部分消息,從而實(shí)現(xiàn)消息負(fù)載在多個(gè)消費(fèi)者間是均衡分布的。例如,我們執(zhí)行下列命令,讓rou2中的onsumer1、2、3各自一條消息。代代123456789XREADGROUPgroupgroup2consumer1count1streamsmqstream1)2)1)1)"1599203861727-1)2)XREADGROUPgroupgroup2consumer2count1streamsmqstream1)2)1)1)"1599274912765-1)2)XREADGROUPgroupgroup2consumer3count1streamsmqstream1)2)1)1)"1599274925823-1)2)自動(dòng)使用內(nèi)列(也稱為PENDINGList)留存消費(fèi)組里每個(gè)消費(fèi)者的消息,直到消費(fèi)者使用XACK命令通知Streams“消息已經(jīng)處理完成”。如果消費(fèi)者沒(méi)有成功處理消息,它就不會(huì)給Streams發(fā)送XACK命令,消息仍然會(huì)留存。此時(shí),消費(fèi)者可以在重啟后,用XPENDING命令查看已、但尚未確認(rèn)處理完成的消息例如,我們來(lái)查看一下group2中各個(gè)消費(fèi)者已、但尚未確認(rèn)的消息個(gè)數(shù)。其中,XPENDING返回結(jié)果的第二、三行分別表示group2中所有消費(fèi)者的消息最小ID和最大ID。XPENDINGmqstream1)(integer)2)"1599203861727-3)"1599274925823-554)1)1) 2) 2)1) 2) 3)1) 2)XPENDINGmqstreamgroup2-+101)1)"1599274912765-2)3)(integer)4)(integer)可以看到,consumer2已的消息的ID是1599274912765-0一旦消息1599274912765-0被consumer2處理了,consumer2就可以使用XACK命令通知Streams,然后這條消息就會(huì)被刪除。當(dāng)我們?cè)偈褂肵PENDING命令查看時(shí),就可以看到,consumer2已經(jīng)沒(méi)有已、但尚未確認(rèn)處理的消息了。XACKmqstreamgroup21599274912765-(integer)XPENDINGmqstreamgroup2-+10(emptylistor現(xiàn)在,我們就知道了用Streams實(shí)現(xiàn)消息隊(duì)列的方法,我還想再?gòu)?qiáng)調(diào)下,Streams是Redis5.0Redis5.05.0本,就可以考慮把Streams用作消息隊(duì)列了。這節(jié)課,我們學(xué)習(xí)了分布式系統(tǒng)組件使用消息隊(duì)列時(shí)的三大需求:消息保序、重復(fù)消息處理和消息可靠性保證,這三大需求可以進(jìn)一步轉(zhuǎn)換為對(duì)消息隊(duì)列的三大要求:消息數(shù)據(jù)有序存取,消息數(shù)據(jù)具有全局唯一編號(hào),以及消息數(shù)據(jù)在消費(fèi)完成后被刪除。ListStreams其實(shí),關(guān)于Redis是否適合做消息隊(duì)列,業(yè)界一直是有爭(zhēng)論的。很多人認(rèn)為,要使用消息隊(duì)列,就應(yīng)該采用Kafka、RabbitMQ這些專門(mén)面向消息隊(duì)列場(chǎng)景的軟件,而Redis更加根據(jù)這些年做Redis研發(fā)工作的經(jīng)驗(yàn),我的看法
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 河北省邯鄲市肥鄉(xiāng)區(qū)固中學(xué)、北高鎮(zhèn)中心校聯(lián)考2026屆九年級(jí)上學(xué)期10月期中考試數(shù)學(xué)試卷(含答案)
- 廣東省廣州市荔灣區(qū)2025-2026學(xué)年第一學(xué)期四年級(jí)數(shù)學(xué)期末試卷(無(wú)答案)
- 五年級(jí)數(shù)學(xué)上冊(cè)期中測(cè)試卷及答案
- 解讀教育部《中小學(xué)生健康體檢管理辦法(2021年版)》全文解讀
- 22春北京語(yǔ)言大學(xué)《漢語(yǔ)寫(xiě)作》在線作業(yè)一答案參考8
- 七年級(jí)下語(yǔ)文課堂作業(yè)本答案第一單元
- 新部編人教版一年級(jí)數(shù)學(xué)上冊(cè)期末知識(shí)點(diǎn)及答案(三套)
- 電氣工程造價(jià)管理技術(shù)方法
- 深圳職工考試題庫(kù)及答案
- 人文地理常識(shí)試題及答案
- 2026年年長(zhǎng)租公寓市場(chǎng)分析
- 生態(tài)環(huán)境監(jiān)測(cè)數(shù)據(jù)分析報(bào)告
- 2025年下半年四川成都溫江興蓉西城市運(yùn)營(yíng)集團(tuán)有限公司第二次招聘人力資源部副部長(zhǎng)等崗位5人考試參考試題及答案解析
- 煤炭裝卸施工方案(3篇)
- 八年級(jí)歷史上冊(cè)小論文觀點(diǎn)及范文
- 重慶康德卷2025-2026學(xué)年高一數(shù)學(xué)第一學(xué)期期末達(dá)標(biāo)檢測(cè)試題含解析
- 浙江省杭州市蕭山區(qū)2024-2025學(xué)年六年級(jí)上學(xué)期語(yǔ)文期末試卷(含答案)
- 設(shè)備隱患排查培訓(xùn)
- 2025至2030磷酸二氫鈉行業(yè)產(chǎn)業(yè)運(yùn)行態(tài)勢(shì)及投資規(guī)劃深度研究報(bào)告
- 國(guó)家事業(yè)單位招聘2025中國(guó)農(nóng)業(yè)科學(xué)院植物保護(hù)研究所招聘12人筆試歷年參考題庫(kù)附帶答案詳解
- 裝載機(jī)安全培訓(xùn)課件
評(píng)論
0/150
提交評(píng)論