版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
第golang高并發(fā)系統(tǒng)限流策略漏桶和令牌桶算法源碼剖析目錄前言漏桶算法樣例源碼實現(xiàn)令牌桶算法樣例源碼剖析Limit類型Limiter結構體Reservation結構體Limiter消費tokenlimiter歸還Token總結
前言
今天與大家聊一聊高并發(fā)系統(tǒng)中的限流技術,限流又稱為流量控制,是指限制到達系統(tǒng)的并發(fā)請求數(shù),當達到限制條件則可以拒絕請求,可以起到保護下游服務,防止服務過載等作用。常用的限流策略有漏桶算法、令牌桶算法、滑動窗口;下文主要與大家一起分析一下漏桶算法和令牌桶算法,滑動窗口就不在這里這介紹了。好啦,廢話不多話,開整。
文中測試代碼已上傳:/asong2025/G
漏桶算法
漏桶算法比較好理解,假設我們現(xiàn)在有一個水桶,我們向這個水桶里添水,雖然我們我們無法預計一次會添多少水,也無法預計水流入的速度,但是可以固定出水的速度,不論添水的速率有多大,都按照固定的速率流出,如果桶滿了,溢出的上方水直接拋棄。我們把水當作HTTP請求,每次都把請求放到一個桶中,然后以固定的速率處理請求,說了這么多,不如看一個圖加深理解(圖片來自于網絡,手殘黨不會畫,多多包涵):
原理其實很簡單,就看我們怎么實現(xiàn)它了,uber團隊有一個開源的uber-go/ratelimit庫,這個庫就是漏桶的一種實現(xiàn),下面我們一起來看一看他的實現(xiàn)思路。
樣例
學習一個新東西的時候,往往是從會用開始的,慢慢才能明白其實現(xiàn)原理,所以我們先來看看這個庫是怎樣使用的,這里我們直接提供一個實際使用例子,配合Gin框架,我們添加一個限流中間件,來達到請求限流的作用,測試代碼如下:
//定義全局限流器對象
varrateLimitratelimit.Limiter
//在gin.HandlerFunc加入限流邏輯
funcleakyBucket()gin.HandlerFunc{
prev:=time.Now()
returnfunc(c*gin.Context){
now:=rateLimit.Take()
fmt.Println(now.Sub(prev))//為了打印時間間隔
prev=now//記錄上一次的時間,沒有這個打印的會有問題
funcmain(){
rateLimit=ratelimit.New(10)
r:=gin.Default()
r.GET("/ping",leakyBucket(),func(c*gin.Context){
c.JSON(200,true)
r.Run()//listenandserveon:8080(forwindows"localhost:8080")
我們簡單使用壓測工具ab測試一下:ab-n10-c2:8080/ping,執(zhí)行結果部分如下:
觀察結果可知,每次處理請求的時間間隔是10ms,并且后面的請求耗時越來越久,為什么會這樣呢?這里先賣個小關子,看完uber的實現(xiàn)你就知道了~
源碼實現(xiàn)
我們首先來看一下其核心結構:
typelimiterstruct{
sync.Mutex
lasttime.Time
sleepFortime.Duration
perRequesttime.Duration
maxSlacktime.Duration
clockClock
typeLimiterinterface{
//TakeshouldblocktomakesurethattheRPSismet.
Take()time.Time
限制器接口只提供了一個方法take(),take()方法會阻塞確保兩次請求之間的時間走完,具體實現(xiàn)我們在下面進行分析。實現(xiàn)限制器接口的結構體中各個字段的意義如下:
sync.Mutext:互斥鎖,控制并發(fā)的作用
last:記錄上一次的時刻
sleepFor:距離處理下一次請求需要等待的時間
perRequest:每次請求的時間間隔
maxSlack:最大松弛量,用來解決突發(fā)流量
clock:一個時鐘或模擬時鐘,提供了now和sleep方法,是實例化速率限制器
要是用該限制器,首先需要通過New方法進行初始化,一個必傳的參數(shù)是rate,代表的是每秒請求量(RPS),還有一個可選參數(shù),參數(shù)類型option,也就是我們可以自定義limit,不過一般使用場景不多,這里就不過多介紹了。我主要看一下他是怎么保證固定速率的,截取New方法部分代碼如下:
l:=limiter{
perRequest:time.Second/time.Duration(rate),
maxSlack:-10*time.Second/time.Duration(rate),
根據(jù)我們傳入的請求數(shù)量,能計算出1s內要通過n個請求,每個請求之間的間隔時間是多少,這樣在take方法中就可以根據(jù)這個字段來處理請求的固定速率問題,這里還初始化了最大松弛化字段,他的值是負數(shù),默認最大松弛量是10個請求的時間間隔。
接下來我們主要看一下take方法:
func(t*limiter)Take()time.Time{
t.Lock()
defert.Unlock()
now:=t.clock.Now()
ift.last.IsZero(){
t.last=now
returnt.last
t.sleepFor+=t.perRequest-now.Sub(t.last)
ift.sleepFort.maxSlack{
t.sleepFor=t.maxSlack
ift.sleepFor0{
t.clock.Sleep(t.sleepFor)
t.last=now.Add(t.sleepFor)
t.sleepFor=0
}else{
t.last=now
returnt.last
take()方法的執(zhí)行步驟如下:
為了控制并發(fā),所以進入該方法就需要進行上鎖,該鎖的粒度比較大,整個方法都加上了鎖通過IsZero方法來判斷當前是否是第一次請求,如果是第一次請求,直接取now時間即可返回。如果不是第一次請求,就需要計算距離處理下一次請求需要等待的時間,這里有一個要注意點的是累加需要等待的時間,目的是可以給后面的抵消使用如果當前累加需要等待的時間大于最大松弛量了,將等待的時間設置為最大松弛量的時間。如果當前請求多余的時間無法完全抵消此次的所需量,調用sleep方法進行阻塞,同時清空等待的時間。如果sleepFor小于0,說明此次請求時間間隔大于預期間隔,也就說無需等待可以直接處理請求。
步驟其實不是很多,主要需要注意一個知識點最大松弛量。
漏桶算法有個天然缺陷就是無法應對突發(fā)流量(勻速,兩次請求req1和req2之間的延遲至少應該=perRequest),舉個例子說明:假設我們現(xiàn)在有三個請求req1、req2、req3按順序處理,每個請求處理間隔為100ms,req1請求處理完成之后150ms,req2請求到來,依據(jù)限速策略可以對req2立即處理,當req2完成后,50ms后,req3到來,這個時候距離上次請求還不足100ms,因此還需要等待50ms才能繼續(xù)執(zhí)行,但是,對于這種情況,實際上這三個請求一共消耗了250ms才完成,并不是預期的200ms。
對于上面這種情況,我們可以把之前間隔比較長的請求的時間勻給后面的請求判斷限流時使用,減少請求等待的時間了,但是當兩個請求之間到達的間隔比較大時,就會產生很大的可抵消時間,以至于后面大量請求瞬間到達時,也無法抵消這個時間,那樣就已經失去了限流的意義,所以引入了最大松弛量(maxSlack)的概念,該值為負值,表示允許抵消的最長時間,防止以上情況的出現(xiàn)。
以上就是漏桶實現(xiàn)的基本思路了,整體還是很簡單的,你學會了嗎?
令牌桶算法
令牌桶其實和漏桶的原理類似,令牌桶就是想象有一個固定大小的桶,系統(tǒng)會以恒定速率向桶中放Token,桶滿則暫時不放。從網上找了圖,表述非常恰當:
關于令牌桶限流算法的實現(xiàn),Github有一個高效的基于令牌桶限流算法實現(xiàn)的限流庫:/juju/ratelimit,Golang的timer/rate也是令牌桶的一種實現(xiàn),本文就不介紹juju/ratelimit庫了,有興趣的自己學習一下的他的實現(xiàn)思想吧,我們主要來看一看time/rate是如何實現(xiàn)的。
還是老樣子,我們還是結合gin寫一個限流中間件看看他是怎么使用的,例子如下:
import(
"net/http"
"time"
"/gin-gonic/gin"
"/x/time/rate"
varrateLimit*rate.Limiter
functokenBucket()gin.HandlerFunc{
returnfunc(c*gin.Context){
ifrateLimit.Allow(){
c.String(http.StatusOK,"ratelimit,Drop")
c.Abort()
return
c.Next()
funcmain(){
limit:=rate.Every(100*time.Millisecond)
rateLimit=rate.NewLimiter(limit,10)
r:=gin.Default()
r.GET("/ping",tokenBucket(),func(c*gin.Context){
c.JSON(200,true)
r.Run()//listenandserveon:8080(forwindows"localhost:8080")
上面的例子我們首先調用NewLimiter方法構造一個限流器,第一個參數(shù)是rlimit,代表每秒可以向Token桶中產生多少token,第二個參數(shù)是bint,代表Token桶的容量大小,對于上面的例子,表示每100ms往桶中放一個token,也就是1s鐘產生10個,桶的容量就是10。消費token的方法這里我們使用Allow方法,Allow實際上就是AllowN(time.Now(),1),AllowN方法表示,截止到某一時刻,目前桶中數(shù)目是否至少為n個,滿足則返回true,同時從桶中消費n個token。反之返回不消費Token。對應上面的例子,當桶中的數(shù)目不足于1個時,就會丟掉該請求。
源碼剖析
Limit類型
time/rate自定義了一個limit類型,其實他本質就是float64的別名,Limit定了事件的最大頻率,表示每秒事件的數(shù)據(jù)量,0就表示無限制。Inf是無限的速率限制;它允許所有事件(即使突發(fā)為0)。還提供Every方法來指定向Token桶中放置Token的間隔,計算出每秒時間的數(shù)據(jù)量。
typeLimitfloat64
//Infistheinfiniteratelimit;itallowsallevents(evenifburstiszero).
constInf=Limit(math.MaxFloat64)
//EveryconvertsaminimumtimeintervalbetweeneventstoaLimit.
funcEvery(intervaltime.Duration)Limit{
ifinterval=0{
returnInf
return1/Limit(interval.Seconds())
Limiter結構體
typeLimiterstruct{
musync.Mutex
limitLimit
burstint
tokensfloat64
//lastisthelasttimethelimiter'stokensfieldwasupdated
lasttime.Time
//lastEventisthelatesttimeofarate-limitedevent(pastorfuture)
lastEventtime.Time
各個字段含義如下:
mu:互斥鎖、為了控制并發(fā)limit:每秒允許處理的事件數(shù)量,即每秒處理事件的頻率burst:令牌桶的最大數(shù)量,如果burst為0,并且limit==Inf,則允許處理任何事件,否則不允許tokens:令牌桶中可用的令牌數(shù)量last:記錄上次limiter的tokens被更新的時間lastEvent:lastEvent記錄速率受限制(桶中沒有令牌)的時間點,該時間點可能是過去的,也可能是將來的(Reservation預定的結束時間點)
Reservation結構體
typeReservationstruct{
okbool
lim*Limiter
tokensint
timeToActtime.Time
//ThisistheLimitatreservationtime,itcanchangelater.
limitLimit
各個字段含義如下:
ok:到截至時間是否可以獲取足夠的令牌
lim:limiter對象
tokens:需要獲取的令牌數(shù)量
timeToAct:需要等待的時間點
limit:代表預定的時間,是可以更改的。
reservation就是一個預定令牌的操作,timeToAct是本次預約需要等待到的指定時間點才有足夠預約的令牌。
Limiter消費token
Limiter有三個token的消費方法,分別是Allow、Reserve和Wait,最終三種消費方式都調用了reserveN、advance這兩個方法來生成和消費Token。所以我們主要看看reserveN、advance函數(shù)的具體實現(xiàn)。
advance方法的實現(xiàn):
func(lim*Limiter)advance(nowtime.Time)(newNowtime.Time,newLasttime.Time,newTokensfloat64){
//last不能在當前時間now之后,否則計算出來的elapsed為負數(shù),會導致令牌桶數(shù)量減少
last:=lim.last
ifnow.Before(last){
last=now
//根據(jù)令牌桶的缺數(shù)計算出令牌桶未進行更新的最大時間
maxElapsed:=lim.limit.durationFromTokens(float64(lim.burst)-lim.tokens)
elapsed:=now.Sub(last)//令牌桶未進行更新的時間段
ifelapsedmaxElapsed{
elapsed=maxElapsed
//根據(jù)未更新的時間(未向桶中加入令牌的時間段)計算出產生的令牌數(shù)
delta:=lim.limit.tokensFromDuration(elapsed)
tokens:=lim.tokens+delta//計算出可用的令牌數(shù)
ifburst:=float64(lim.burst);tokensburst{
tokens=burst
returnnow,last,tokens
advance方法的作用是更新令牌桶的狀態(tài),計算出令牌桶未更新的時間(elapsed),根據(jù)elapsed算出需要向桶中加入的令牌數(shù)delta,然后算出桶中可用的令牌數(shù)newTokens.
reserveN方法的實現(xiàn):reserveN是AllowN,ReserveN及WaitN的輔助方法,用于判斷在maxFutureReserve時間內是否有足夠的令牌。
//@paramn要消費的token數(shù)量
//@parammaxFutureReserve愿意等待的最長時間
func(lim*Limiter)reserveN(nowtime.Time,nint,maxFutureReservetime.Duration)Reservation{
lim.mu.Lock()
//如果沒有限制
iflim.limit==Inf{
lim.mu.Unlock()
returnReservation{
ok:true,//桶中有足夠的令牌
lim:lim,
tokens:n,
timeToAct:now,
//更新令牌桶的狀態(tài),tokens為目前可用的令牌數(shù)量
now,last,tokens:=lim.advance(now)
//計算取完之后桶還能剩能下多少token
tokens-=float64(n)
varwaitDurationtime.Duration
//如果token0,說明目前的token不夠,需要等待一段時間
iftokens0{
waitDuration=lim.limit.durationFromTokens(-tokens)
ok:=n=lim.burstwaitDuration=maxFutureReserve
r:=Reservation{
ok:ok,
lim:lim,
limit:lim.limit,
//timeToAct表示當桶中滿足token數(shù)目等于n的時間
ifok{
r.tokens=n
r.timeToAct=now.Add(waitDuration)
//更新桶里面的token數(shù)目
//更新last時間
//lastEvent
ifok{
lim.last=now
lim.tokens=tokens
lim.lastEvent=r.timeToAct
}else{
lim.last=last
lim.mu.Unlock()
returnr
上面的代碼我已經進行了注釋,這里在總結一下流程:
首選判斷是否擁有速率限制,沒有速率限制也就是桶中一致?lián)碛凶銐虻牧钆啤S嬎銖纳洗稳oken的時間到當前時刻,期間一共新產生了多少Token:我們只在取Token之前生成新的Token,也就意味著每次取Token的間隔,實際上也是生成Token的間隔。我們可以利用tokensFromDuration,輕易的算出這段時間一共產生Token的數(shù)目。所以當前Token數(shù)目=新產生的Token數(shù)目+之前剩余的Token數(shù)目-要消費的Token數(shù)目。如果消費后剩余Token數(shù)目大于零,說明此時Token桶內仍不為空,此時Token充足,無需調用側等待。如果Token數(shù)目小于零,則需等待一段時間。那么這個時候,我們可以利用durationFromTokens將當前負值的Token數(shù)轉化為需要等待的時間。將需要等待的時間等相關結果返回給調用方
其實整個過程就是利用了Token數(shù)可以和時間相互轉化的原理。而如果Token數(shù)為負,則需要等待相應時間即可。
上面提到了durationFromTokens、tokensFromDuration這兩個方法,是關鍵,他們的實現(xiàn)如下:
func(limitLimit)durationFromTokens(tokensfloat64)time.Duration{
seconds:=tokens/float64(limit)
returntime.Nanosecond*time.Duration(1e9*seconds)
func(limitLimit)tokensFromDuration(dtime.Duration)float64{
//Splittheintegerandfractionalpartsourselftominimizeroundingerrors.
//See/issues/34861.
sec:=float64(d/time.Second)*float64(limit)
nsec:=float64(d%time.Second)*float64(limit)
returnsec+nsec/1e9
durationFromTokens:功能是計算出生成N個新的Token一共需要多久。tokensFromDuration:給定一段時長,這段時間一共可以生成多少個Token。
細心的網友會發(fā)現(xiàn)tokensFromDuration方法既然是計算一段時間一共可以生成多少個Token,為什么不直接進行相乘呢?其實Golang最初的版本就是采用d.Seconds()*float64(limit)直接相乘實現(xiàn)的,雖然看上去一點問題沒有,但是這里是兩個小數(shù)相乘,會帶來精度損失,所以采用現(xiàn)在這種方法實現(xiàn),分別求出秒的整數(shù)部分和小數(shù)部分,進行相乘后再相加,這樣可以得到最精確的精度。
limiter歸還Token
既然我們可以消費Token,那么對應也可以取消此次消費,將token歸還,當調用Cancel()函數(shù)時,消費的Token數(shù)將會盡可能歸還給Token桶。歸還也并不是那么簡單,接下我們我們看看歸還token是如何實現(xiàn)的。
func(r*Reservation)CancelAt(nowtime.Time){
if!r.ok{
return
r.lim.mu.Lock()
deferr.lim.mu.Unlock()
1.如果無需限流
2.tokens為0(需要獲取的令牌數(shù)量為0)
3.已經過了截至時間
以上三種情況無需處理取消操作
ifr.lim.limit==Inf||r.tokens==0||r.timeToAct.Before(now){
return
//計算出需要還原的令牌數(shù)量
//這里的r.lim.lastEvent可能是本次Reservation的結束時間,也可能是后來的Reservation的結束時間,所以要把本次結束時間點(r.timeToAct)之后產生的令牌數(shù)減去
restoreTokens:=float64(r.tokens)-r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
//當小于0,表示已經都預支完了,不能歸還了
ifrestoreTokens=0{
return
//從新計算令牌桶的狀態(tài)
now,_,tokens:=r.lim.advance(now)
//還原當前令牌桶的令牌數(shù)量,當前的令牌數(shù)tokens加上需要還原的令牌數(shù)restoreTokens
tokens+=restoreTokens
//如果tokens大于桶的最大容量,則將tokens置為桶的最大容量
ifburst:=float64(r.lim.burst)
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 吉林省吉林市蛟河市2025-2026學年七年級上學期1月期末考試地理試卷(無答案)
- 貴州省安順市2025-2026學年上學期期末高二數(shù)學試卷(含答案)
- 廣東省中山市2025-2026學年八年級上學期期末測試地理試卷(無答案)
- 2025-2026學年山東省煙臺市高三(上)期末數(shù)學試卷(含答案)
- 飛機配送員培訓課件模板
- 2026年玉灃科技(西安)有限公司招聘(39人)備考考試題庫及答案解析
- 2026山東事業(yè)單位統(tǒng)考煙臺招遠市招聘47人備考考試題庫及答案解析
- 2026年度延邊州教育局所屬事業(yè)單位教師專項招聘(53人)參考考試題庫及答案解析
- 取電施工方案(3篇)
- 防疫場地設施管理制度(3篇)
- 數(shù)字孿生方案
- 金融領域人工智能算法應用倫理與安全評規(guī)范
- 機動車駕校安全培訓課件
- 2025年役前訓練考試題庫及答案
- 2024VADOD臨床實踐指南:耳鳴的管理課件
- 2025年湖南省公務員錄用考試錄用考試《申論》標準試卷及答案
- 行政崗位面試問題庫及應對策略
- 2025廣東潮州府城文化旅游投資集團有限公司下屬企業(yè)副總經理崗位招聘1人筆試歷年備考題庫附帶答案詳解2套試卷
- 城市軌道交通服務與管理崗位面試技巧
- 2025年公務員多省聯(lián)考《申論》題(陜西A卷)及參考答案
- GB/T 46607.1-2025塑料熱固性粉末模塑料(PMCs)試樣的制備第1部分:一般原理及多用途試樣的制備
評論
0/150
提交評論