版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
第Golang實(shí)現(xiàn)Redis系列(六)如何實(shí)現(xiàn)pipeline模式的redis客戶端本文的完整代碼在/hdt3213/godis/redis/client
通常TCP客戶端的通信模式都是阻塞式的:客戶端發(fā)送請求-等待服務(wù)端響應(yīng)-發(fā)送下一個請求。因?yàn)樾枰却W(wǎng)絡(luò)傳輸數(shù)據(jù),完成一次請求循環(huán)需要等待較多時間。
我們能否不等待服務(wù)端響應(yīng)直接發(fā)送下一條請求呢?答案是肯定的。
TCP作為全雙工協(xié)議可以同時進(jìn)行上行和下行通信,不必?fù)?dān)心客戶端和服務(wù)端同時發(fā)包會導(dǎo)致沖突。
p.s.打電話的時候兩個人同時講話就會沖突聽不清,只能輪流講。這種通信方式稱為半雙工。廣播只能由電臺發(fā)送到收音機(jī)不能反向傳輸,這種方式稱為單工。
我們?yōu)槊恳粋€tcp連接分配了一個goroutine可以保證先收到的請求先先回復(fù)。另一個方面,tcp協(xié)議會保證數(shù)據(jù)流的有序性,同一個tcp連接上先發(fā)送的請求服務(wù)端先接收,先回復(fù)的響應(yīng)客戶端先收到。因此我們不必?fù)?dān)心混淆響應(yīng)所對應(yīng)的請求。
這種在服務(wù)端未響應(yīng)時客戶端繼續(xù)向服務(wù)端發(fā)送請求的模式稱為Pipeline模式。因?yàn)闇p少等待網(wǎng)絡(luò)傳輸?shù)臅r間,Pipeline模式可以極大的提高吞吐量,減少所需使用的tcp鏈接數(shù)。
pipeline模式的redis客戶端需要有兩個后臺協(xié)程程負(fù)責(zé)tcp通信,調(diào)用方通過channel向后臺協(xié)程發(fā)送指令,并阻塞等待直到收到響應(yīng),這是一個典型的異步編程模式。
我們先來定義client的結(jié)構(gòu):
typeClientstruct{
connnet.Conn//與服務(wù)端的tcp連接
pendingReqschan*Request//等待發(fā)送的請求
waitingReqschan*Request//等待服務(wù)器響應(yīng)的請求
ticker*time.Ticker//用于觸發(fā)心跳包的計(jì)時器
addrstring
ctxcontext.Context
cancelFunccontext.CancelFunc
writing*sync.WaitGroup//有請求正在處理不能立即停止,用于實(shí)現(xiàn)gracefulshutdown
typeRequeststruct{
iduint64//請求id
args[][]byte//上行參數(shù)
replyredis.Reply//收到的返回值
heartbeatbool//標(biāo)記是否是心跳請求
waiting*wait.Wait//調(diào)用協(xié)程發(fā)送請求后通過waitgroup等待請求異步處理完成
errerror
}
調(diào)用者將請求發(fā)送給后臺協(xié)程,并通過waitgroup等待異步處理完成:
func(client*Client)Send(args[][]byte)redis.Reply{
request:=request{
args:args,
heartbeat:false,
waiting:wait.Wait{},
request.waiting.Add(1)
client.working.Add(1)
deferclient.working.Done()
client.pendingReqs-request//請求入隊(duì)
timeout:=request.waiting.WaitWithTimeout(maxWait)//等待響應(yīng)或者超時
iftimeout{
returnreply.MakeErrReply("servertimeout")
ifrequest.err!=nil{
returnreply.MakeErrReply("requestfailed")
returnrequest.reply
}
client的核心部分是后臺的讀寫協(xié)程。先從寫協(xié)程開始:
//寫協(xié)程入口
func(client*Client)handleWrite(){
forreq:=rangeclient.pendingReqs{
client.doRequest(req)
//發(fā)送請求
func(client*Client)doRequest(req*request){
ifreq==nil||len(req.args)==0{
return
//序列化請求
re:=reply.MakeMultiBulkReply(req.args)
bytes:=re.ToBytes()
_,err:=client.conn.Write(bytes)
i:=0
//失敗重試
forerr!=nili3{
err=client.handleConnectionError(err)
iferr==nil{
_,err=client.conn.Write(bytes)
iferr==nil{
//發(fā)送成功等待服務(wù)器響應(yīng)
client.waitingReqs-req
}else{
req.err=err
req.waiting.Done()
}
讀協(xié)程是我們熟悉的協(xié)議解析器模板,不熟悉的朋友可以到解析RedisCluster原理了解更多。
//收到服務(wù)端的響應(yīng)
func(client*Client)finishRequest(replyredis.Reply){
deferfunc(){
iferr:=recover();err!=nil{
debug.PrintStack()
logger.Error(err)
request:=-client.waitingReqs
ifrequest==nil{
return
request.reply=reply
ifrequest.waiting!=nil{
request.waiting.Done()
//讀協(xié)程是個RESP協(xié)議解析器
func(client*Client)handleRead()error{
ch:=parser.ParseStream(client.conn)
forpayload:=rangech{
ifpayload.Err!=nil{
client.finishRequest(reply.MakeErrReply(payload.Err.Error()))
continue
client.finishRequest(payload.Data)
returnnil
}
最后編寫client的構(gòu)造器和啟動異步協(xié)程的代碼:
funcMakeClient(addrstring)(*Client,error){
conn,err:=net.Dial("tcp",addr)
iferr!=nil{
returnnil,err
ctx,cancel:=context.WithCancel(context.Background())
returnClient{
addr:addr,
conn:conn,
sendingReqs:make(chan*Request,chanSize),
waitingReqs:make(chan*Request,chanSize),
ctx:ctx,
cancelFunc:cancel,
writing:sync.WaitGroup{},
},nil
func(client*Client)Start(){
client.ticker=time.NewTicker(10*time.Second)
goclient.handleWrite()
gofunc(){
err:=client.handleRead()
logger.Warn(err)
goclient.heartbeat()
}
關(guān)閉client的時候記得等待請求完成:
func(client*Client)Close(){
//先阻止新請求進(jìn)入隊(duì)列
close(client.sendingReqs)
//等待處理中的請求完成
client.writing.Wait()
//釋放資源
_=client.conn.Close()//關(guān)閉與服務(wù)端的連接,連接關(guān)閉后讀協(xié)程會退出
client.cancelFunc()//使用context關(guān)閉讀協(xié)程
close(client.waitingReqs)//關(guān)閉隊(duì)列
}
測試一下:
funcTestClient(t*testing.T){
client,err:=MakeClient("localhost:6379")
iferr!=nil{
t.Error(err)
client.Start()
result=client.Send([][]byte{
[]byte("SET"),
[]byte("a"),
[]byte("a"),
ifstatusRet,ok:=result.(*reply.StatusReply);ok{
ifstatusRet.Status!="OK"{
t.Error("`set`failed,result:"+statusRet.Status)
result=client.Send([][]byte{
[]byte("GET"),
[]byte(
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2026年智能停車輔助系統(tǒng)項(xiàng)目公司成立分析報(bào)告
- 2025年中職水利水電工程施工(水工建筑物基礎(chǔ))試題及答案
- 2026年家政服務(wù)教學(xué)(家政服務(wù)應(yīng)用)試題及答案
- 2025年高職防災(zāi)減災(zāi)技術(shù)(災(zāi)害預(yù)防措施)試題及答案
- 2025年高職物理學(xué)(相對論)試題及答案
- 2025年中職作曲與作曲技術(shù)理論(作曲理論)試題及答案
- 2025年中職(茶葉生產(chǎn)與加工)茶葉采摘標(biāo)準(zhǔn)試題及答案
- 2025年大學(xué)大四(印刷企業(yè)管理)企業(yè)運(yùn)營專項(xiàng)測試題及答案
- 2025年大學(xué)生態(tài)環(huán)境保護(hù)(生態(tài)修復(fù)工程)試題及答案
- 2025年高職數(shù)字媒體藝術(shù)設(shè)計(jì)(數(shù)字插畫創(chuàng)作)試題及答案
- 手術(shù)室查對制度
- 支氣管哮喘患者的自我管理宣教
- 第三次全國國土調(diào)查工作分類與三大類對照表
- 質(zhì)量效應(yīng)2楷模路線文字版
- 消防設(shè)施檢查記錄表
- 酒店協(xié)議價合同
- 哈爾濱工業(yè)大學(xué)簡介宣傳介紹
- 青光眼的藥物治療演示
- 羅永浩海淀劇場演講
- 蘇州市公務(wù)員考核實(shí)施細(xì)則
- GB/T 2703-2017鞋類術(shù)語
評論
0/150
提交評論