Java線程池submit阻塞獲取結(jié)果的實(shí)現(xiàn)原理詳解_第1頁
Java線程池submit阻塞獲取結(jié)果的實(shí)現(xiàn)原理詳解_第2頁
Java線程池submit阻塞獲取結(jié)果的實(shí)現(xiàn)原理詳解_第3頁
Java線程池submit阻塞獲取結(jié)果的實(shí)現(xiàn)原理詳解_第4頁
Java線程池submit阻塞獲取結(jié)果的實(shí)現(xiàn)原理詳解_第5頁
已閱讀5頁,還剩12頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

第Java線程池submit阻塞獲取結(jié)果的實(shí)現(xiàn)原理詳解目錄前言案例演示和execute區(qū)別原理實(shí)現(xiàn)RunnableFuture類介紹任務(wù)執(zhí)行run()原理任務(wù)結(jié)果獲取get()原理任務(wù)取消cancel()原理總結(jié)

前言

Java線程池中提交任務(wù)運(yùn)行,通常使用execute()方法就足夠了。那如果想要實(shí)現(xiàn)在主線程中阻塞獲取線程池任務(wù)運(yùn)行的結(jié)果,該怎么辦呢?答案是用submit()方法提交任務(wù)。這也是面試中經(jīng)常被問到的一個(gè)知識(shí)點(diǎn),execute()和submit()提交任務(wù)的的區(qū)別是什么?底層是如何實(shí)現(xiàn)的?

案例演示

現(xiàn)在我們通過簡單的例子演示下submit()方法的妙處。

@Test

publicvoidtestSubmit()throwsExecutionException,InterruptedException{

//創(chuàng)建一個(gè)核心線程數(shù)為5的線程池

ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(5,10,30,TimeUnit.SECONDS,newArrayBlockingQueue(50));

//創(chuàng)建一個(gè)計(jì)算任務(wù)

CallableIntegermyTask=newCallableInteger(){

@Override

publicIntegercall()throwsException{

intresult=0;

for(inti=0;i10000;i++){

result+=i;

Thread.sleep(1000);

returnresult;

("startsubmittask.....");

FutureIntegerfuture=threadPoolExecutor.submit(myTask);

Integersum=future.get();

("getsubmitresult:[{}]",sum);

//usesumdootherthings

運(yùn)行結(jié)果:

主線程的確阻塞等待線程返回。

Future類API

我們看到用submit提交任務(wù)最后返回一個(gè)Future對(duì)象,F(xiàn)uture表示異步計(jì)算的結(jié)果。那它都提供了什么API呢?

方法說明Vget()等待任務(wù)執(zhí)行完成,然后獲取其結(jié)果。Vget(longtimeout,TimeUnitunit)等待獲取任務(wù)執(zhí)行的結(jié)果,如果任務(wù)超過一定時(shí)間沒有執(zhí)行完畢,直接返回,拋出異常,不會(huì)一直等待下去。booleanisDone()如果此任務(wù)已完成,則返回true。完成可能是由于正常終止、異?;蛉∠谒羞@些情況下,該方法都將返回true。booleanisCancelled()如果該任務(wù)在正常完成之前被取消,則返回true。booleancancel(booleanmayInterruptIfRunning)試圖取消此任務(wù)的執(zhí)行。1.如果任務(wù)已經(jīng)完成、已經(jīng)取消或由于其他原因無法取消,則此嘗試將失敗。

如果在調(diào)用cancel時(shí)此任務(wù)尚未啟動(dòng),則此任務(wù)不應(yīng)運(yùn)行。如果任務(wù)已經(jīng)開始,那么mayInterruptIfRunning參數(shù)確定是否應(yīng)該中斷執(zhí)行此任務(wù)的線程以試圖停止該任務(wù)。|

和execute區(qū)別

從功能層面,我們已經(jīng)很明白他們最大區(qū)別,

execute()方式提交任務(wù)沒有返回值,直接線程中池異步運(yùn)行任務(wù)。submit()方式提交任務(wù)有返回值Future,調(diào)用get方法可以阻塞調(diào)用線程,等待任務(wù)運(yùn)行返回的結(jié)果。

那從源碼層面,二者又有什么區(qū)別和聯(lián)系呢?

我們看下submit()提交的入口方法,代碼如下:

//AbstractExecutorService#submit

publicTFutureTsubmit(CallableTtask){

//判空處理

if(task==null)thrownewNullPointerException();

//將提交的任務(wù)包裝成RunnableFuture

RunnableFutureTftask=newTaskFor(task);

//最終還是調(diào)用execute方法執(zhí)行任務(wù)

execute(ftask);

returnftask;

殊途同歸,最終都是調(diào)用execute()方法,只不過submit()方法在調(diào)用前做一層包裝,將任務(wù)包裝成RunnableFuture對(duì)象。

關(guān)于線程池中execute()方法提交的流程和原理實(shí)現(xiàn)不理解的,強(qiáng)烈建議先學(xué)習(xí)這篇文章:Java線程池源碼深度解析。

原理實(shí)現(xiàn)

本節(jié)內(nèi)容我們聚焦在submit()方法的實(shí)現(xiàn)原理。

我們先思考下,如果讓我們?cè)O(shè)計(jì)實(shí)現(xiàn)調(diào)用get阻塞知道線程返回結(jié)果,要考慮哪些方面呢

任務(wù)是否執(zhí)行結(jié)束或者執(zhí)行出錯(cuò)等情況,是不是需要有個(gè)狀態(tài)位標(biāo)記?任務(wù)的執(zhí)行結(jié)果如何保存?如果任務(wù)沒有執(zhí)行結(jié)束,如何阻塞當(dāng)前線程,LockSupport.park()是一種方式。如果有多個(gè)外部線程獲取get,是不是應(yīng)該也要把外部線程存下來,怎么存?因?yàn)楹竺嫒蝿?wù)執(zhí)行完后需要喚醒他們。

帶著這些問題和基本思路我們看下jdk8中是如何實(shí)現(xiàn)的。

RunnableFuture類介紹

submit()方法中調(diào)用newTaskFor()方法獲取RunnableFuture對(duì)象。

//AbstractExecutorService#newTaskFor

protectedTRunnableFutureTnewTaskFor(CallableTcallable){

//調(diào)用FutureTask的構(gòu)造方法返回RunnableFuture對(duì)象

returnnewFutureTaskT(callable);

FutureTask類結(jié)構(gòu)圖如下:

FutureTask是一個(gè)異步計(jì)算任務(wù),包裝了我們外部提交的任務(wù)。

實(shí)現(xiàn)了Runnable接口實(shí)現(xiàn)了Future接口,該接口封裝了任務(wù)結(jié)果的獲取、任務(wù)是否結(jié)束等接口。

RunnableFuture類重要屬性

1.任務(wù)運(yùn)行狀態(tài)state

//存儲(chǔ)當(dāng)前任務(wù)運(yùn)行狀態(tài)

privatevolatileintstate;

//當(dāng)前任務(wù)尚未執(zhí)行

privatestaticfinalintNEW=0;

//當(dāng)前任務(wù)正在結(jié)束,尚未完全結(jié)束,一種臨界狀態(tài)

privatestaticfinalintCOMPLETING=1;

//當(dāng)前任務(wù)正常結(jié)束

privatestaticfinalintNORMAL=2;

//當(dāng)前任務(wù)執(zhí)行過程中發(fā)生了異常

privatestaticfinalintEXCEPTIONAL=3;

//當(dāng)前任務(wù)被取消

privatestaticfinalintCANCELLED=4;

//當(dāng)前任務(wù)中斷中

privatestaticfinalintINTERRUPTING=5;

//當(dāng)前任務(wù)已中斷

privatestaticfinalintINTERRUPTED=6;

可能的狀態(tài)轉(zhuǎn)換有如下幾種:

NEW-COMPLETING-NORMALNEW-COMPLETING-EXCEPTIONALNEW-CANCELLEDNEW-INTERRUPTING-INTERRUPTED

2.真正要執(zhí)行的任務(wù)callble

//存放真正提交的原始任務(wù)

privateCallableVcallable;

3.存放執(zhí)行結(jié)果outcome

返回的結(jié)果或從get()中拋出的異常

privateObjectoutcome;

4.當(dāng)前正在運(yùn)行任務(wù)的線程runner

//當(dāng)前任務(wù)被線程執(zhí)行期間,保存當(dāng)前任務(wù)的線程對(duì)象引用

privatevolatileThreadrunner;

5.調(diào)用get獲取任務(wù)結(jié)果的等待線程集合waiters

//因?yàn)闀?huì)有很多線程去get當(dāng)前任務(wù)的結(jié)果,所以這里使用了一種stack數(shù)據(jù)結(jié)構(gòu)來保存

privatevolatileWaitNodewaiters;

staticfinalclassWaitNode{

volatileThreadthread;

volatileWaitNodenext;

WaitNode(){thread=Thread.currentThread();}

數(shù)據(jù)結(jié)構(gòu)如下圖:

RunnableFuture類構(gòu)造方法

publicFutureTask(CallableVcallable){

if(callable==null)

thrownewNullPointerException();

//設(shè)置要執(zhí)行的任務(wù)

this.callable=callable;

//初始化時(shí)任務(wù)狀態(tài)為NEW

this.state=NEW;

任務(wù)執(zhí)行run()原理

submit()方法最終調(diào)用線程池的execute()方法,而execute()方法會(huì)創(chuàng)建出工人Worker對(duì)象,調(diào)用runWorker()方法,它主要是執(zhí)行外部提交的任務(wù),也就是這里的FutureTask對(duì)象的run()方法,我們重點(diǎn)看下run()方法。

FutureTask#run()開始執(zhí)行任務(wù)。

它主要的功能是完成包裝的callable的call方法執(zhí)行,并將執(zhí)行結(jié)果保存到outcome中,同時(shí)捕獲了call方法執(zhí)行出現(xiàn)的異常,并保存異常信息,而不是直接拋出。

publicvoidrun(){

//狀態(tài)機(jī)不為NEW表示執(zhí)行完成或任務(wù)被取消了,直接返回

//狀態(tài)機(jī)為NEW,同時(shí)將runner設(shè)置為當(dāng)前線程,保證同一時(shí)刻只有一個(gè)線程執(zhí)行run方法,如果設(shè)置失敗也直接返回

if(state!=NEW||

!UNSAFE.compareAndSwapObject(this,runnerOffset,

null,Thread.currentThread()))

return;

try{

CallableVc=callable;

//取出原始的任務(wù)檢測不為空且再次檢查狀態(tài)為NEW(雙重校驗(yàn))

if(c!=nullstate==NEW){

//任務(wù)運(yùn)行的結(jié)果

Vresult;

//任務(wù)是否運(yùn)行是否正常,true:正常,false-異常

booleanran;

try{

//任務(wù)執(zhí)行,將結(jié)果返回給result

result=c.call();

//設(shè)置任務(wù)運(yùn)行正常

ran=true;

}catch(Throwableex){

//任務(wù)運(yùn)行報(bào)錯(cuò)的情況

//設(shè)置結(jié)果為空

result=null;

//設(shè)置任務(wù)運(yùn)行異常標(biāo)記

ran=false;

//任務(wù)執(zhí)行拋出異常時(shí),保存異常信息,而不直接拋出

setException(ex);

//執(zhí)行成功則保存結(jié)果

if(ran)

set(result);

}finally{

//runnermustbenon-nulluntilstateissettledto

//preventconcurrentcallstorun()

//執(zhí)行完成后設(shè)置runner為null

runner=null;

//statemustbere-readafternullingrunnertoprevent

//leakedinterrupts

//獲取任務(wù)狀態(tài)

ints=state;

//如果被置為了中斷狀態(tài)則進(jìn)行中斷的處理

if(s=INTERRUPTING)

handlePossibleCancellationInterrupt(s);

FutureTask#set()方法處理正常執(zhí)行的運(yùn)行結(jié)果

setException()方法主要完成做下面的工作。

將執(zhí)行結(jié)果保存到outcom變量中FutureTask的狀態(tài)從NEW修改為NORMAL喚醒阻塞在waiters隊(duì)列中請(qǐng)求get的所有線程

protectedvoidset(Vv){

//將狀態(tài)由NEW更新為COMPLETING

if(UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){

//保存任務(wù)的結(jié)果

outcome=v;

//更新狀態(tài)的最終狀態(tài)-NORMAL

UNSAFE.putOrderedInt(this,stateOffset,NORMAL);//finalstate

//通用的完成操作,主要作用就是喚醒阻塞在waiters隊(duì)列中請(qǐng)求get的線程

finishCompletion();

FutureTask#setException()方法處理執(zhí)行異常的結(jié)果

setException()方法主要完成做下面的工作。

將異常信息保存到outcom變量中FutureTask的狀態(tài)從NEW修改為EXCEPTIONAL喚醒阻塞在waiters隊(duì)列中請(qǐng)求get的所有線程

//FutureTask#setException

protectedvoidsetException(Throwablet){

//將狀態(tài)由NEW更新為COMPLETING

if(UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){

//將異常信息保存到輸出結(jié)果中

outcome=t;

//更新狀態(tài)機(jī)處理異常的最終狀態(tài)-EXCEPTIONAL

UNSAFE.putOrderedInt(this,stateOffset,EXCEPTIONAL);//finalstate

//通用的完成操作,主要作用就是喚醒阻塞在waiters隊(duì)列中請(qǐng)求get的線程

finishCompletion();

這里的finishCompletion()喚醒我們?cè)诤竺嬷v解,上面的整個(gè)邏輯可以用一張圖表示:

任務(wù)結(jié)果獲取get()原理

其他線程可以調(diào)用get()方法或者超時(shí)阻塞方法get(longtimeout,TimeUnitunit)獲取任務(wù)運(yùn)行的結(jié)果。

FutureTask#get()方法是獲取任務(wù)執(zhí)行結(jié)果的入口方法。

//阻塞獲取任務(wù)結(jié)果

publicVget()throwsInterruptedException,ExecutionException{

ints=state;

//任務(wù)還沒有執(zhí)行完成,通過awaitDone方法進(jìn)行阻塞等待

if(s=COMPLETING)

s=awaitDone(false,0L);

//返回結(jié)果

returnreport(s);

//超時(shí)阻塞獲取任務(wù)結(jié)果

publicVget(longtimeout,TimeUnitunit)

throwsInterruptedException,ExecutionException,TimeoutException{

//判空處理

if(unit==null)

thrownewNullPointerException();

ints=state;

//任務(wù)還沒有執(zhí)行完成,通過awaitDone方法進(jìn)行阻塞等待

if(s=COMPLETING

//如果awaitDone返回的結(jié)果還是小于等于COMPLETING,表示運(yùn)行中,那么直接拋出超時(shí)異常

(s=awaitDone(true,unit.toNanos(timeout)))=COMPLETING)

thrownewTimeoutException();

//返回結(jié)果

returnreport(s);

FutureTask#awaitDone()方法阻塞等待任務(wù)執(zhí)行結(jié)束

該方法主要完成下面的工作:

判斷任務(wù)是否運(yùn)行結(jié)束,結(jié)束的話直接返回運(yùn)行狀態(tài)如果任務(wù)沒有結(jié)果,將請(qǐng)求線程阻塞請(qǐng)求線程阻塞時(shí),會(huì)創(chuàng)建一個(gè)waiter節(jié)點(diǎn),然后加入到阻塞等待的棧中

//線程阻塞等待方法,timed等于true表示阻塞等待有時(shí)間限制nanos,false表示沒有,一直阻塞

privateintawaitDone(booleantimed,longnanos)throwsInterruptedException{

//計(jì)算阻塞超時(shí)時(shí)間點(diǎn)

finallongdeadline=timedSystem.nanoTime()+nanos:0L;

WaitNodeq=null;

//表示q是否添加到waiters棧中,默認(rèn)false

booleanqueued=false;

//自旋操作

for(;;){

//如果阻塞線程被中斷則將當(dāng)前線程從阻塞隊(duì)列中移除

if(Terrupted()){

//從waiters棧中移除WaitNode,

removeWaiter(q);

//返回中斷移除

thrownewInterruptedException();

//獲取任務(wù)的狀態(tài)

ints=state;

//如果任務(wù)的狀態(tài)大于COMPLETING,表示線程運(yùn)行結(jié)束了,直接返回

if(sCOMPLETING){

//任務(wù)已經(jīng)完成時(shí)直接返回結(jié)果

if(q!=null)

q.thread=null;

//返回狀態(tài)

returns;

//如果任務(wù)狀態(tài)是COMPLETING

elseif(s==COMPLETING)

//如果任務(wù)執(zhí)行完成,但還差最后一步最終完成,則讓出CPU給任務(wù)執(zhí)行線程繼續(xù)執(zhí)行

Thread.yield();

//如果任務(wù)狀態(tài)小于COMPLETING,說明任務(wù)還在運(yùn)行中

//如果q為空的情況

elseif(q==null)

//新進(jìn)來的線程添加等待節(jié)點(diǎn)

q=newWaitNode();

//如果任務(wù)還在運(yùn)行中并且當(dāng)前線程節(jié)點(diǎn)還不在waiters棧中,那么就加入

elseif(!queued)

//上一步節(jié)點(diǎn)創(chuàng)建完,還沒將其添加到waiters棧中,因此在下一個(gè)循環(huán)就會(huì)執(zhí)行此處進(jìn)行入棧操作,并將當(dāng)前線程的等待節(jié)點(diǎn)置于棧頂

queued=UNSAFE.compareAndSwapObject(this,waitersOffset,

q.next=waiters,q);

//如果任務(wù)還在運(yùn)行中并且timed為true,表示有超時(shí)限制

elseif(timed){

//如果設(shè)置了阻塞超時(shí)時(shí)間,則進(jìn)行檢查是否達(dá)到阻塞超時(shí)時(shí)間,達(dá)到了則刪除當(dāng)前線程的等待節(jié)點(diǎn)并退出循環(huán)返回,否則繼續(xù)阻塞

nanos=deadline-System.nanoTime();

//如果nanos小于等于0

if(nanos=0L){

//從waiters棧中移除

removeWaiter(q);

//返回狀態(tài)

returnstate;

//超時(shí)阻塞當(dāng)前線程,超過時(shí)間,就會(huì)恢復(fù)

LockSupport.parkNanos(this,nanos);

//如果任務(wù)還在運(yùn)行中并且timed為false,沒有有超時(shí)限制

else

//一直阻塞當(dāng)前線程

LockSupport.park(this);

FutureTask#report方法解析返回任務(wù)結(jié)果

//獲取任務(wù)結(jié)果方法:正常執(zhí)行則直接返回結(jié)果,否則拋出異常

privateVreport(ints)throwsExecutionException{

Objectx=outcome;

//如果狀態(tài)是正常情況

if(s==NORMAL)

//直接返回

return(V)x;

//如果狀態(tài)是取消了,拋出異常

if(s=CANCELLED)

thrownewCancellationException();

thrownewExecutionException((Throwable)x);

FutureTask#finishCompletion()方法用來喚醒前面等待的線程

上一步awaitDone方法會(huì)阻塞調(diào)用的線程,那么任務(wù)運(yùn)行結(jié)束總要喚醒他們?nèi)ツ媒Y(jié)果吧,這個(gè)工作就在finishCompletion()方法中。

privatevoidfinishCompletion(){

//遍歷waiters棧中的每個(gè)元素;

for(WaitNodeq;(q=waiters)!=null;){

//cas設(shè)置waiters中q節(jié)點(diǎn)數(shù)據(jù)為null,成功的話,進(jìn)入到if中

if(UNSAFE.compareAndSwapObject(this,waitersOffset,q,null)){

//自選操作

for(;;){

//獲取節(jié)點(diǎn)中的線程

Threadt=q.thread;

if(t!=null){

q.thread=null;

//喚醒線程

LockSupport.unpark(t);

//獲取下一個(gè)節(jié)點(diǎn)

WaitNodenext=q.next;

if(next==null)

break;

q.next=null;//unlinktohelpgc

q=next;

break;

//鉤子方法,有子類去實(shí)現(xiàn)

done();

//設(shè)置原來的任務(wù)callable為null

callable=null;//toreducefootprint

任務(wù)取消cancel()原理

可以調(diào)用FutureTask#cancel方法取消任務(wù)執(zhí)行,但是要注意下面幾點(diǎn):

任務(wù)取消時(shí)會(huì)先檢查是否允許取消,當(dāng)任務(wù)已經(jīng)完成或者正在完成(正常執(zhí)行并繼續(xù)處理結(jié)果或執(zhí)行異常處理

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論