版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)
文檔簡介
第SpringCloud微服務(wù)開發(fā)基于RocketMQ實現(xiàn)分布式事務(wù)管理詳解目錄消息隊列實現(xiàn)分布式事務(wù)原理RocketMQ的事務(wù)消息代碼實現(xiàn)基礎(chǔ)配置發(fā)送半消息執(zhí)行本地事務(wù)與回查Account-Service消費消息測試小結(jié)
消息隊列實現(xiàn)分布式事務(wù)原理
首先讓我們來看一下基于消息隊列實現(xiàn)分布式事務(wù)的原理方案。
柔性事務(wù)
發(fā)送消息的服務(wù)有個OUTBOX數(shù)據(jù)表,在進(jìn)行INSERT、UPDATE、DELETE業(yè)務(wù)操作時也會給OUTBOX數(shù)據(jù)表INSERT一條消息記錄,這樣可以保證原子性,因為這是基于本地的ACID事務(wù)。
OUTBOX表充當(dāng)臨時消息隊列,然后我們在引入一個消息中繼(MessageRelay)的服務(wù),由他從OUTBOX表中讀取數(shù)據(jù)并發(fā)布消息到消息組件。
消息中繼的實現(xiàn)可以很簡單,只需要通過定時任務(wù)定期從OUTBOX表中拉取最新未發(fā)布的數(shù)據(jù),獲取到數(shù)據(jù)后將數(shù)據(jù)發(fā)送給消息組件,最后將完成發(fā)送的消息從OUTBOX表中刪除即可,對于失敗的消息可以根據(jù)業(yè)務(wù)規(guī)則進(jìn)行重試。
RocketMQ的事務(wù)消息
RocketMQ本身已經(jīng)支持事務(wù)消息,如果你們項目使用了RocketMQ,可以直接借助RocketMQ的事務(wù)消息實現(xiàn)分布式事務(wù),我們先看一下RocketMQ事務(wù)消息的原理然后再借助RocketMQ來實現(xiàn)分布式事務(wù)。
RocketMQ采用了2PC的思想來實現(xiàn)了提交事務(wù)消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息,如下圖所示。
分布式事務(wù)
RocketMQ實現(xiàn)事務(wù)消息主要分為兩個階段:正常事務(wù)的發(fā)送及提交、事務(wù)信息的補償流程
整體流程為:
正常事務(wù)發(fā)送與提交階段
1、生產(chǎn)者發(fā)送一個半消息給MQServer(半消息是指消費者暫時不能消費的消息)
2、服務(wù)端響應(yīng)消息寫入結(jié)果,半消息發(fā)送成功
3、開始執(zhí)行本地事務(wù)
4、根據(jù)本地事務(wù)的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作
事務(wù)信息的補償流程
1、如果MQServer長時間沒收到本地事務(wù)的執(zhí)行狀態(tài)會向生產(chǎn)者發(fā)起一個確認(rèn)回查的操作請求
2、生產(chǎn)者收到確認(rèn)回查請求后,檢查本地事務(wù)的執(zhí)行狀態(tài)
3、根據(jù)檢查后的結(jié)果執(zhí)行Commit或者Rollback操作
補償階段主要是用于解決生產(chǎn)者在發(fā)送Commit或者Rollback操作時發(fā)生超時或失敗的情況。
RocketMQ事務(wù)流程關(guān)鍵
事務(wù)消息在一階段對用戶不可見
事務(wù)消息相對普通消息最大的特點就是一階段發(fā)送的消息對用戶是不可見的,也就是說消費者不能直接消費。這里RocketMQ的實現(xiàn)方法是原消息的主題與消息消費隊列,然后把主題改成RMQ_SYS_TRANS_HALF_TOPIC,這樣由于消費者沒有訂閱這個主題,所以不會被消費。
如何處理第二階段的失敗消息?
在本地事務(wù)執(zhí)行完成后會向MQServer發(fā)送Commit或Rollback操作,此時如果在發(fā)送消息的時候生產(chǎn)者出故障了,那么要保證這條消息最終被消費,MQServer會像服務(wù)端發(fā)送回查請求,確認(rèn)本地事務(wù)的執(zhí)行狀態(tài)。
當(dāng)然了rocketmq并不會無休止的的信息事務(wù)狀態(tài)回查,默認(rèn)回查15次,如果15次回查還是無法得知事務(wù)狀態(tài),RocketMQ默認(rèn)回滾該消息。
消息狀態(tài)事務(wù)消息有三種狀態(tài):TransactionStatus.CommitTransaction:提交事務(wù)消息,消費者可以消費此消息
TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費。
TransactionStatus.Unknown:中間狀態(tài),它代表需要檢查消息隊列來確定狀態(tài)。
代碼實現(xiàn)
業(yè)務(wù)需求:用戶請求訂單微服務(wù)order-service接口刪除訂單(退貨),刪除訂單時需要調(diào)用account-service的方法給賬戶增加余額,一個典型的分布式事務(wù)問題。
基礎(chǔ)配置
在Order-Service和Account-Service中引入Rocket消息組件
dependency
groupIdorg.apache.rocketmq/groupId
artifactIdrocketmq-spring-boot-starter/artifactId
/dependency
在配置中心添加RocketMQ的相關(guān)配置
rocketmq:
name-server:xxx.xx.x.xx:9876
producer:
group:cloud-group
在OrderService服務(wù)中建立一張事務(wù)日志表rocketmq_transaction_log(作用稍后說)
發(fā)送半消息
Order-Service作為分布式事務(wù)開始的入口,在Service層我們給RocketMQ發(fā)送一條半消息
OrderController入口
/**
*根據(jù)訂單號刪除訂單
*@paramorderNo訂單編號
@PostMapping("/order/delete")
publicResultDataStringdelete(@RequestParamStringorderNo){
("deleteorderidis{}",orderNo);
orderService.delete(orderNo);
returnResultData.success("訂單刪除成功");
}
直接調(diào)用orderService的delete方法
OrderServiceImpl業(yè)務(wù)邏輯
@Override
publicvoiddelete(StringorderNo){
Orderorder=orderMapper.selectByNo(orderNo);
//如果訂單存在且狀態(tài)為有效,進(jìn)行業(yè)務(wù)處理
if(order!=nullCloudConstant.VALID_STATUS.equals(order.getStatus())){
StringtransactionId=UUID.randomUUID().toString();
//如果可以刪除訂單則發(fā)送消息給rocketmq,讓用戶中心消費消息
rocketMQTemplate.sendMessageInTransaction("add-amount",
MessageBuilder.withPayload(
UserAddMoneyDTO.builder()
.userCode(order.getAccountCode())
.amount(order.getAmount())
.build()
.setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId)
.setHeader("order_id",order.getId())
.build()
,order
}
首先校驗一下訂單狀態(tài),然后使用rocketMQTemplate.sendMessageInTransaction()發(fā)送事務(wù)消息。
sendMessageInTransaction方法有三個參數(shù):
destination:目的地(主題),這里發(fā)送給add-amount這個topicmessage:發(fā)送給消費者的消息體,需要使用MessageBuilder.withPayload()來構(gòu)建消息arg:參數(shù)
注意,這里我們生成了一個transactionId,并放在header中跟消息一起發(fā)送(這里實際也可以構(gòu)造成一個對象,放在arg里進(jìn)行發(fā)送),作用后面再講!
消息封裝實體UserAddMoneyDTO
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
publicclassUserAddMoneyDTO{
*用戶編碼
privateStringuserCode;
*金額
privateBigDecimalamount;
}
這個類生產(chǎn)者和消費者都需要用到,所以我直接丟到common包中,大家根據(jù)項目實際情況決定放哪。
執(zhí)行本地事務(wù)與回查
MQServer收到半消息后會告訴生產(chǎn)者order-service確認(rèn)收到半消息,這時候order-service需要執(zhí)行本地事務(wù),執(zhí)行完本地事務(wù)后再告訴MQServer本地事務(wù)的執(zhí)行狀態(tài),確認(rèn)此消息究竟是Commit還是Rollback。
RocketMQ提供了RocketMQLocalTransactionListener接口,本地事務(wù)監(jiān)聽器,這個接口類的實現(xiàn)如下:
第一個方法executeLocalTransaction為執(zhí)行本地事務(wù);第二個方法checkLocalTransaction為檢查本地事務(wù)的執(zhí)行狀態(tài),也就是回查動作。
我們需要實現(xiàn)RocketMQLocalTransactionListener接口,在executeLocalTransaction方法中執(zhí)行本地事務(wù),在執(zhí)行checkLocalTransaction回查方法時告訴RocketMQ到底該提交還是回滾。
這里大家思考一個問題,本地事務(wù)已經(jīng)執(zhí)行完成了,怎么去回查本地事務(wù)的執(zhí)行結(jié)果呢?
答案如下:我們可以在執(zhí)行本地事務(wù)的時候同時生成一條事務(wù)日志,讓本地事務(wù)與日志事務(wù)在同一個方法中,同時添加@Transactional注解,保證兩個操作事務(wù)是一個原子操作。
這樣如果事務(wù)日志表中有這個本地事務(wù)的信息,那就代表本地事務(wù)執(zhí)行成功,需要Commit,相反如果沒有對應(yīng)的事務(wù)日志,則表示執(zhí)行失敗,需要Rollback。這就是為什么我們上面在OrderService中需要建立一張事務(wù)日志表的原因。
實現(xiàn)RocketMQLocalTransactionListener接口,完成事務(wù)執(zhí)行邏輯
/**
*監(jiān)聽事務(wù)消息
*@authorjavadaily
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor=@__(@Autowired))
publicclassAddUserAmountListenerimplementsRocketMQLocalTransactionListener{
privatefinalOrderServiceorderService;
privatefinalRocketMqTransactionLogMapperrocketMqTransactionLogMapper;
*執(zhí)行本地事務(wù)
@Override
publicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemessage,Objectarg){
("執(zhí)行本地事務(wù)");
MessageHeadersheaders=message.getHeaders();
//獲取事務(wù)ID
StringtransactionId=(String)headers.get(RocketMQHeaders.TRANSACTION_ID);
IntegerorderId=Integer.valueOf((String)headers.get("order_id"));
("transactionIdis{},orderIdis{}",transactionId,orderId);
try{
//執(zhí)行本地事務(wù),并記錄日志
orderService.changeStatuswithRocketMqLog(orderId,CloudConstant.INVALID_STATUS,transactionId);
//執(zhí)行成功,可以提交事務(wù)
returnRocketMQLocalTransactionState.COMMIT;
}catch(Exceptione){
returnRocketMQLocalTransactionState.ROLLBACK;
*本地事務(wù)的檢查,檢查本地事務(wù)是否成功
@Override
publicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemessage){
MessageHeadersheaders=message.getHeaders();
//獲取事務(wù)ID
StringtransactionId=(String)headers.get(RocketMQHeaders.TRANSACTION_ID);
("檢查本地事務(wù),事務(wù)ID:{}",transactionId);
//根據(jù)事務(wù)id從日志表檢索
QueryWrapperRocketmqTransactionLogqueryWrapper=newQueryWrapper();
queryWrapper.eq("transaction_id",transactionId);
RocketmqTransactionLogrocketmqTransactionLog=rocketMqTransactionLogMapper.selectOne(queryWrapper);
if(null!=rocketmqTransactionLog){
returnRocketMQLocalTransactionState.COMMIT;
returnRocketMQLocalTransactionState.ROLLBACK;
}
本地事務(wù)執(zhí)行邏輯
@Transactional(rollbackFor=RuntimeException.class)
@Override
publicvoidchangeStatuswithRocketMqLog(Integerid,Stringstatus,StringtransactionId){
orderMapper.changeStatus(id,status);
rocketMqTransactionLogMapper.insert(
RocketmqTransactionLog.builder()
.transactionId(transactionId)
.log("執(zhí)行刪除訂單操作")
.build()
}
修改訂單狀態(tài)為刪除狀態(tài),同時往事務(wù)日志表中插入一條事務(wù)日志,用@Transactional注解保證事務(wù)。
Account-Service消費消息
監(jiān)聽消息并處理給用戶增加余額邏輯
@Slf4j
@Service
@RocketMQMessageListener(topic="add-amount",consumerGroup="cloud-group")
@RequiredArgsConstructor(onConstructor=@__(@Autowired))
publicclassAddUserAmountListenerimplementsRocketMQListenerUserAddMoneyDTO{
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年中職第二學(xué)年(護(hù)理)老年照護(hù)專項試題及答案
- 2025年大學(xué)本科(食品質(zhì)量與安全)食品分析試題及答案
- 2025年大學(xué)食品科學(xué)與工程(食品工程)試題及答案
- 2025年中職焊接技術(shù)與自動化(手工焊接)試題及答案
- 養(yǎng)老院老人心理咨詢師培訓(xùn)制度
- 養(yǎng)老院心理慰藉制度
- 公共交通從業(yè)人員培訓(xùn)考核制度
- 2026年人工智能計算機視覺基礎(chǔ)知識題庫含答案
- 2026年刮痧師中醫(yī)理論考核試題含答案
- 2026年中級公共文化服務(wù)面試題及答案
- 土壤微生物群落結(jié)構(gòu)優(yōu)化研究
- 2024外研版四年級英語上冊Unit 4知識清單
- 四川省南充市2024-2025學(xué)年部編版七年級上學(xué)期期末歷史試題
- 國有企業(yè)三位一體推進(jìn)內(nèi)控風(fēng)控合規(guī)建設(shè)的問題和分析
- 急診預(yù)檢分診課件教學(xué)
- 2025年高二數(shù)學(xué)建模試題及答案
- 儲能集裝箱知識培訓(xùn)總結(jié)課件
- 幼兒園中班語言《雪房子》課件
- 房地產(chǎn)項目開發(fā)管理方案
- 堆垛車安全培訓(xùn)課件
- 貝林妥單抗護(hù)理要點
評論
0/150
提交評論