Golang信號量設計實現(xiàn)示例詳解_第1頁
Golang信號量設計實現(xiàn)示例詳解_第2頁
Golang信號量設計實現(xiàn)示例詳解_第3頁
Golang信號量設計實現(xiàn)示例詳解_第4頁
Golang信號量設計實現(xiàn)示例詳解_第5頁
已閱讀5頁,還剩9頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領

文檔簡介

第Golang信號量設計實現(xiàn)示例詳解目錄開篇信號量semaphore擴展庫實現(xiàn)AcquireReleaseTryAcquire總結(jié)

開篇

在我們此前的文章GolangMutex原理解析中曾提到過,Mutex的底層結(jié)構(gòu)包含了兩個字段,state和sema:

typeMutexstruct{

stateint32

semauint32

state代表互斥鎖的狀態(tài),比如是否被鎖定;sema表示信號量,協(xié)程阻塞會等待該信號量,解鎖的協(xié)程釋放信號量從而喚醒等待信號量的協(xié)程。

這個sema就是semaphore信號量的意思。Golang協(xié)程之間的搶鎖,實際上爭搶給Locked賦值的權(quán)利,能給Locked置為1,就說明搶鎖成功。搶不到就阻塞等待sema信號量,一旦持有鎖的協(xié)程解鎖,那么等待的協(xié)程會依次被喚醒。

有意思的是,雖然semaphore在鎖的實現(xiàn)中起到了至關重要的作用,Golang對信號量的實現(xiàn)卻是隱藏在runtime中,并沒有包含到標準庫里來,在src源碼中我們可以看到底層依賴的信號量相關函數(shù)。

//definedinpackageruntime

//Semacquirewaitsuntil*s0andthenatomicallydecrementsit.

//Itisintendedasasimplesleepprimitiveforusebythesynchronization

//libraryandshouldnotbeuseddirectly.

funcruntime_Semacquire(s*uint32)

//Semreleaseatomicallyincrements*sandnotifiesawaitinggoroutine

//ifoneisblockedinSemacquire.

//Itisintendedasasimplewakeupprimitiveforusebythesynchronization

//libraryandshouldnotbeuseddirectly.

//Ifhandoffistrue,passcountdirectlytothefirstwaiter.

//skipframesisthenumberofframestoomitduringtracing,countingfrom

//runtime_Semrelease'scaller.

funcruntime_Semrelease(s*uint32,handoffbool,skipframesint)

runtime_Semacquire:阻塞等待直到s大于0,然后立刻將s減去1【原子操作】;runtime_Semrelease:將s增加1,然后通知一個阻塞在runtime_Semacquire的goroutine【原子操作】。

兩個原子操作,一個acquire,一個release,其實就代表了對資源的獲取和釋放。Mutex作為sync包的核心,支撐了RWMutex,channel,singleflight等多個并發(fā)控制的能力,而對信號量的管理又是Mutex的基礎。

雖然源碼看不到,但Golang其實在擴展庫/x/sync/semaphore也提供了一套信號量的實現(xiàn),我們可以由此來參考一下,理解semaphore的實現(xiàn)思路。

信號量

在看源碼之前,我們先理清楚【信號量】設計背后的場景和原理。

信號量的概念是荷蘭計算機科學家EdsgerDijkstra在1963年左右提出來的,廣泛應用在不同的操作系統(tǒng)中。在系統(tǒng)中,會給每一個進程一個信號量,代表每個進程目前的狀態(tài)。未得到控制權(quán)的進程,會在特定的地方被迫停下來,等待可以繼續(xù)進行的信號到來。

在Mutex依賴的信號量機制中我們可以看到,這里本質(zhì)就是依賴sema一個uint32的變量+原子操作來實現(xiàn)并發(fā)控制能力。當goroutine完成對信號量等待時,該變量-1,當goroutine完成對信號量的釋放時,該變量+1。

如果一個新的goroutine發(fā)現(xiàn)信號量不大于0,說明資源暫時沒有,就得阻塞等待。直到信號量0,此時的語義是有新的資源,該goroutine就會結(jié)束等待,完成對信號量的-1并返回。注意我們上面有提到,runtime支持的兩個方法都是原子性的,不用擔心兩個同時在等待的goroutine同時搶占同一份資源。

典型的信號量場景是【圖書館借書】。假設學校圖書館某熱門書籍現(xiàn)在只有100本存貨,但是上萬學生都想借閱,怎么辦?

直接買一萬本書是非常簡單粗暴的解法,但資源有限,這不是長久之計。

常見的解決方案很簡單:學生們先登記,一個一個來。我們先給100個同學發(fā)出,剩下的你們繼續(xù)等,等到什么時候借書的同學看完了,把書還回來了,就給排隊等待的同學們發(fā)放。同時,為了避免超發(fā),每發(fā)一個,都需要在維護的記錄里將【余量】減去1,每還回來一個,就把【余量】加上1。

runtime_Semacquire就是排隊等待借書,runtime_Semrelease就是看完了把書歸還給圖書館。

另外需要注意,雖然我們上面舉例的增加/減小的粒度都是1,但這本質(zhì)上只是一種場景,事實上就算是圖書館借書,也完全有可能出現(xiàn)一個人同時借了兩本一模一樣的書。所以,信號量的設計需要支持N個資源的獲取和釋放。

所以,我們對于acquire和release兩種操作的語義如下:

release:將信號量增加n【保證原子性】;acquire:若信號量n,阻塞等待,直到信號量=n,此時將信號量的值減去n【保證原子性】。

semaphore擴展庫實現(xiàn)

這里我們結(jié)合/x/sync/semaphore源碼來看看怎樣設計出來我們上面提到的信號量結(jié)構(gòu)。

//NewWeightedcreatesanewweightedsemaphorewiththegiven

//maximumcombinedweightforconcurrentaccess.

funcNewWeighted(nint64)*Weighted{

w:=Weighted{size:n}

returnw

//Weightedprovidesawaytoboundconcurrentaccesstoaresource.

//Thecallerscanrequestaccesswithagivenweight.

typeWeightedstruct{

sizeint64//最大資源數(shù)

curint64//當前已被使用的資源

musync.Mutex

waiterslist.List//等待隊列

有意思的是,雖然包名是semaphore,但是擴展庫里真正給【信號量結(jié)構(gòu)體】定義的名稱是Weighted。從上面的定義我們可以看到,傳入初始資源個數(shù)n(對應size),就可以生成一個Weighted信號量結(jié)構(gòu)。

Weighted提供了三個方法來實現(xiàn)對信號量機制的支持:

Acquire

對應上面我們提到的acquire語義,注意我們提到過,抽象的來講,acquire成功與否其實不太看返回值,而是只要獲取不了就一直阻塞,如果返回了,就意味著獲取到了。

但在Golang實現(xiàn)當中,我們肯定不希望,如果發(fā)生了異常case,導致一直阻塞在這里。所以你可以看到Acquire的入?yún)⒗镉袀€context.Context,借用context的上下文控制能力,你可以對此進行cancel,可以設置timeout等待超時,就能對acquire行為進行更多約束。

所以,acquire之后我們?nèi)匀恍枰獧z查返回值error,如果為nil,代表正常獲取了資源。否則可能是context已經(jīng)不合法了。

Release

跟上面提到的release語義完全一致,傳入你要釋放的資源數(shù)n,保證原子性地增加信號量。

TryAcquire

這里其實跟sync包中的各類TryXXX函數(shù)定位很像。并發(fā)的機制中大都包含fastpath和slowpath,比如首個goroutine先來acquire,那么一定是能拿到的,后續(xù)再來請求的goroutine由于慢了一步,只能走slowpath進行等待,自旋等操作。sync包中絕大部分精華,都在于slowpath的處理。fastpath大多是一個基于atomic包的原子操作,比如CAS就可以解決。

TryAcquire跟Acquire的區(qū)別在于,雖然也是要資源,但是不等待。有了我就獲取,就減信號量,返回trye。但是如果目前還沒有,我不會阻塞在這里,而是直接返回false。

下面我們逐個方法看看,Weighted是怎樣實現(xiàn)的。

Acquire

//Acquireacquiresthesemaphorewithaweightofn,blockinguntilresources

//areavailableorctxisdone.Onsuccess,returnsnil.Onfailure,returns

//ctx.Err()andleavesthesemaphoreunchanged.

//Ifctxisalreadydone,Acquiremaystillsucceedwithoutblocking.

func(s*Weighted)Acquire(ctxcontext.Context,nint64)error{

s.mu.Lock()

ifs.size-s.cur=ns.waiters.Len()==0{

s.cur+=n

s.mu.Unlock()

returnnil

ifns.size{

//Don'tmakeotherAcquirecallsblockononethat'sdoomedtofail.

s.mu.Unlock()

-ctx.Done()

returnctx.Err()

ready:=make(chanstruct{})

w:=waiter{n:n,ready:ready}

elem:=s.waiters.PushBack(w)

s.mu.Unlock()

select{

case-ctx.Done():

err:=ctx.Err()

s.mu.Lock()

select{

case-ready:

//Acquiredthesemaphoreafterwewerecanceled.Ratherthantryingto

//fixupthequeue,justpretendwedidn'tnoticethecancelation.

err=nil

default:

isFront:=s.waiters.Front()==elem

s.waiters.Remove(elem)

//Ifwe'reatthefrontandthere'reextratokensleft,notifyotherwaiters.

ifisFronts.sizes.cur{

s.notifyWaiters()

s.mu.Unlock()

returnerr

case-ready:

returnnil

在閱讀之前回憶一下上面Weighted結(jié)構(gòu)的定義,注意Weighted并沒有維護一個變量用來表示【當前剩余的資源】,這一點是通過size(初始化的時候設置,表示總資源數(shù))減去cur(當前已被使用的資源),二者作差得到的。

我們來拆解一下上面這段代碼:

第一步:這是常規(guī)意義上的fastpath

s.mu.Lock()

ifs.size-s.cur=ns.waiters.Len()==0{

s.cur+=n

s.mu.Unlock()

returnnil

先上鎖,保證并發(fā)安全;校驗如果size-cur=n,代表剩余的資源是足夠,同時waiters這個等待隊列為空,代表沒有別的協(xié)程在等待;此時就沒什么多想的,直接cur加上n即可,代表又消耗了n個資源,然后解鎖返回,很直接。

第二步:針對特定場景做提前剪枝

ifns.size{

//Don'tmakeotherAcquirecallsblockononethat'sdoomedtofail.

s.mu.Unlock()

-ctx.Done()

returnctx.Err()

如果請求的資源數(shù)量,甚至都大于資源總數(shù)量了,說明這個協(xié)程心里沒數(shù)。。。。就算我現(xiàn)在把所有初始化的資源都拿回來,也喂不飽你呀?。?!那能怎么辦,我就不煩勞后面流程處理了,直接等你的context什么時候Done,給你返回context的錯誤就行了,同時先解個鎖,別耽誤別的goroutine拿資源。

第三步:資源是夠的,只是現(xiàn)在沒有,那就把當前goroutine加到排隊的隊伍里

ready:=make(chanstruct{})

w:=waiter{n:n,ready:ready}

elem:=s.waiters.PushBack(w)

s.mu.Unlock()

這里ready結(jié)構(gòu)是個空結(jié)構(gòu)體的channel,僅僅是為了實現(xiàn)協(xié)程間通信,通知什么時候資源ready,建立一個屬于這個goroutine的waiter,然后塞到Weighted結(jié)構(gòu)的等待隊列waiters里。

搞定了以后直接解鎖,因為你已經(jīng)來排隊了,手續(xù)處理完成,以后的路有別的通知機制保證,就沒必要在這兒拿著鎖阻塞新來的goroutine了,人家也得排隊。

第四步:排隊等待

select{

case-ctx.Done():

err:=ctx.Err()

s.mu.Lock()

select{

case-ready:

//Acquiredthesemaphoreafterwewerecanceled.Ratherthantryingto

//fixupthequeue,justpretendwedidn'tnoticethecancelation.

err=nil

default:

isFront:=s.waiters.Front()==elem

s.waiters.Remove(elem)

//Ifwe'reatthefrontandthere'reextratokensleft,notifyotherwaiters.

ifisFronts.sizes.cur{

s.notifyWaiters()

s.mu.Unlock()

returnerr

case-ready:

returnnil

一個select語句,只看兩種情況:1.這個goroutine的context超時了;2.拿到了資源,皆大歡喜。

重點在于ctx.Done分支里default的處理。我們可以看到,如果是超時了,此時還沒拿到資源,首先會把當前goroutine從waiters等待隊列里移除(合情合理,你既然因為自己的原因做不了主,沒法繼續(xù)等待了,就別耽誤別人事了)。

然后接著判斷,若這個goroutine同時也是排在最前的goroutine,而且恰好現(xiàn)在有資源了,就趕緊通知隊里的goroutine們,伙計們,現(xiàn)在有資源了,趕緊來拿。我們來看看這個notifyWaiters干了什么:

func(s*Weighted)notifyWaiters(){

for{

next:=s.waiters.Front()

ifnext==nil{

break//Nomorewaitersblocked.

w:=next.Value.(waiter)

ifs.size-s.curw.n{

//Notenoughtokensforthenextwaiter.Wecouldkeepgoing(totryto

//findawaiterwithasmallerrequest),butunderloadthatcouldcause

//starvationforlargerequests;instead,weleaveallremainingwaiters

//blocked.

//Considerasemaphoreusedasaread-writelock,withNtokens,N

//readers,andonewriter.EachreadercanAcquire(1)toobtainaread

//lock.ThewritercanAcquire(N)toobtainawritelock,excludingall

//ofthereaders.Ifweallowthereaderstojumpaheadinthequeue,

//thewriterwillstarve—thereisalwaysonetokenavailableforevery

//reader.

break

s.cur+=w.n

s.waiters.Remove(next)

close(w.ready)

其實很簡單,遍歷waiters這個等待隊列,拿到排隊最前的waiter,判斷資源夠不夠,如果夠了,增加cur變量,資源給你,然后把你從等待隊列里移出去,再closeready那個goroutine就行,算是通知一下。

重點部分在于,如果資源不夠怎么辦?

想象一下現(xiàn)在的處境,Weighted這個semaphore的確有資源,而目前要處理的這個goroutine的的確確就是排隊最靠前的,而且人家也沒獅子大開口,要比你總size還大的資源。但是,但是,好巧不巧,現(xiàn)在你要的數(shù)量,比我手上有的少。。。。

很無奈,那怎么辦呢?

無非兩種解法:

我先不管你,反正你要的不夠,我先看看你后面那個goroutine人家夠不夠,雖然你現(xiàn)在是排位第一個,但是也得繼續(xù)等著;沒辦法,你排第一,需求我就得滿足,所以我們都繼續(xù)等,等啥時候資源夠了就給你。

擴展庫實際選用的是第2種策略,即一定要滿足排在最前面的goroutine,這里的考慮在注釋里有提到,如果直接繼續(xù)看后面的goroutine夠不夠,優(yōu)先滿足后面的,在一些情況下會餓死有大資源要求的goroutine,設計上不希望這樣的情況發(fā)生。

簡單說:要的多不是錯,既然你排第一,目前貨不多,那就大家一起阻塞等待,保障你的權(quán)利。

Release

//Releasereleasesthesemaphorewithaweightofn.

func(s*Weighted)Rele

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論