Golang 實(shí)現(xiàn) Redis系列(六)如何實(shí)現(xiàn) pipeline 模式的 redis 客戶端_第1頁
Golang 實(shí)現(xiàn) Redis系列(六)如何實(shí)現(xiàn) pipeline 模式的 redis 客戶端_第2頁
Golang 實(shí)現(xiàn) Redis系列(六)如何實(shí)現(xiàn) pipeline 模式的 redis 客戶端_第3頁
Golang 實(shí)現(xiàn) Redis系列(六)如何實(shí)現(xiàn) pipeline 模式的 redis 客戶端_第4頁
Golang 實(shí)現(xiàn) Redis系列(六)如何實(shí)現(xiàn) pipeline 模式的 redis 客戶端_第5頁
已閱讀5頁,還剩2頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

第Golang實(shí)現(xiàn)Redis系列(六)如何實(shí)現(xiàn)pipeline模式的redis客戶端本文的完整代碼在/hdt3213/godis/redis/client

通常TCP客戶端的通信模式都是阻塞式的:客戶端發(fā)送請求-等待服務(wù)端響應(yīng)-發(fā)送下一個請求。因?yàn)樾枰却W(wǎng)絡(luò)傳輸數(shù)據(jù),完成一次請求循環(huán)需要等待較多時間。

我們能否不等待服務(wù)端響應(yīng)直接發(fā)送下一條請求呢?答案是肯定的。

TCP作為全雙工協(xié)議可以同時進(jìn)行上行和下行通信,不必?fù)?dān)心客戶端和服務(wù)端同時發(fā)包會導(dǎo)致沖突。

p.s.打電話的時候兩個人同時講話就會沖突聽不清,只能輪流講。這種通信方式稱為半雙工。廣播只能由電臺發(fā)送到收音機(jī)不能反向傳輸,這種方式稱為單工。

我們?yōu)槊恳粋€tcp連接分配了一個goroutine可以保證先收到的請求先先回復(fù)。另一個方面,tcp協(xié)議會保證數(shù)據(jù)流的有序性,同一個tcp連接上先發(fā)送的請求服務(wù)端先接收,先回復(fù)的響應(yīng)客戶端先收到。因此我們不必?fù)?dān)心混淆響應(yīng)所對應(yīng)的請求。

這種在服務(wù)端未響應(yīng)時客戶端繼續(xù)向服務(wù)端發(fā)送請求的模式稱為Pipeline模式。因?yàn)闇p少等待網(wǎng)絡(luò)傳輸?shù)臅r間,Pipeline模式可以極大的提高吞吐量,減少所需使用的tcp鏈接數(shù)。

pipeline模式的redis客戶端需要有兩個后臺協(xié)程程負(fù)責(zé)tcp通信,調(diào)用方通過channel向后臺協(xié)程發(fā)送指令,并阻塞等待直到收到響應(yīng),這是一個典型的異步編程模式。

我們先來定義client的結(jié)構(gòu):

typeClientstruct{

connnet.Conn//與服務(wù)端的tcp連接

pendingReqschan*Request//等待發(fā)送的請求

waitingReqschan*Request//等待服務(wù)器響應(yīng)的請求

ticker*time.Ticker//用于觸發(fā)心跳包的計(jì)時器

addrstring

ctxcontext.Context

cancelFunccontext.CancelFunc

writing*sync.WaitGroup//有請求正在處理不能立即停止,用于實(shí)現(xiàn)gracefulshutdown

typeRequeststruct{

iduint64//請求id

args[][]byte//上行參數(shù)

replyredis.Reply//收到的返回值

heartbeatbool//標(biāo)記是否是心跳請求

waiting*wait.Wait//調(diào)用協(xié)程發(fā)送請求后通過waitgroup等待請求異步處理完成

errerror

}

調(diào)用者將請求發(fā)送給后臺協(xié)程,并通過waitgroup等待異步處理完成:

func(client*Client)Send(args[][]byte)redis.Reply{

request:=request{

args:args,

heartbeat:false,

waiting:wait.Wait{},

request.waiting.Add(1)

client.working.Add(1)

deferclient.working.Done()

client.pendingReqs-request//請求入隊(duì)

timeout:=request.waiting.WaitWithTimeout(maxWait)//等待響應(yīng)或者超時

iftimeout{

returnreply.MakeErrReply("servertimeout")

ifrequest.err!=nil{

returnreply.MakeErrReply("requestfailed")

returnrequest.reply

}

client的核心部分是后臺的讀寫協(xié)程。先從寫協(xié)程開始:

//寫協(xié)程入口

func(client*Client)handleWrite(){

forreq:=rangeclient.pendingReqs{

client.doRequest(req)

//發(fā)送請求

func(client*Client)doRequest(req*request){

ifreq==nil||len(req.args)==0{

return

//序列化請求

re:=reply.MakeMultiBulkReply(req.args)

bytes:=re.ToBytes()

_,err:=client.conn.Write(bytes)

i:=0

//失敗重試

forerr!=nili3{

err=client.handleConnectionError(err)

iferr==nil{

_,err=client.conn.Write(bytes)

iferr==nil{

//發(fā)送成功等待服務(wù)器響應(yīng)

client.waitingReqs-req

}else{

req.err=err

req.waiting.Done()

}

讀協(xié)程是我們熟悉的協(xié)議解析器模板,不熟悉的朋友可以到解析RedisCluster原理了解更多。

//收到服務(wù)端的響應(yīng)

func(client*Client)finishRequest(replyredis.Reply){

deferfunc(){

iferr:=recover();err!=nil{

debug.PrintStack()

logger.Error(err)

request:=-client.waitingReqs

ifrequest==nil{

return

request.reply=reply

ifrequest.waiting!=nil{

request.waiting.Done()

//讀協(xié)程是個RESP協(xié)議解析器

func(client*Client)handleRead()error{

ch:=parser.ParseStream(client.conn)

forpayload:=rangech{

ifpayload.Err!=nil{

client.finishRequest(reply.MakeErrReply(payload.Err.Error()))

continue

client.finishRequest(payload.Data)

returnnil

}

最后編寫client的構(gòu)造器和啟動異步協(xié)程的代碼:

funcMakeClient(addrstring)(*Client,error){

conn,err:=net.Dial("tcp",addr)

iferr!=nil{

returnnil,err

ctx,cancel:=context.WithCancel(context.Background())

returnClient{

addr:addr,

conn:conn,

sendingReqs:make(chan*Request,chanSize),

waitingReqs:make(chan*Request,chanSize),

ctx:ctx,

cancelFunc:cancel,

writing:sync.WaitGroup{},

},nil

func(client*Client)Start(){

client.ticker=time.NewTicker(10*time.Second)

goclient.handleWrite()

gofunc(){

err:=client.handleRead()

logger.Warn(err)

goclient.heartbeat()

}

關(guān)閉client的時候記得等待請求完成:

func(client*Client)Close(){

//先阻止新請求進(jìn)入隊(duì)列

close(client.sendingReqs)

//等待處理中的請求完成

client.writing.Wait()

//釋放資源

_=client.conn.Close()//關(guān)閉與服務(wù)端的連接,連接關(guān)閉后讀協(xié)程會退出

client.cancelFunc()//使用context關(guān)閉讀協(xié)程

close(client.waitingReqs)//關(guān)閉隊(duì)列

}

測試一下:

funcTestClient(t*testing.T){

client,err:=MakeClient("localhost:6379")

iferr!=nil{

t.Error(err)

client.Start()

result=client.Send([][]byte{

[]byte("SET"),

[]byte("a"),

[]byte("a"),

ifstatusRet,ok:=result.(*reply.StatusReply);ok{

ifstatusRet.Status!="OK"{

t.Error("`set`failed,result:"+statusRet.Status)

result=client.Send([][]byte{

[]byte("GET"),

[]byte(

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論