版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
第Java線程池submit阻塞獲取結(jié)果的實(shí)現(xiàn)原理詳解目錄前言案例演示和execute區(qū)別原理實(shí)現(xiàn)RunnableFuture類介紹任務(wù)執(zhí)行run()原理任務(wù)結(jié)果獲取get()原理任務(wù)取消cancel()原理總結(jié)
前言
Java線程池中提交任務(wù)運(yùn)行,通常使用execute()方法就足夠了。那如果想要實(shí)現(xiàn)在主線程中阻塞獲取線程池任務(wù)運(yùn)行的結(jié)果,該怎么辦呢?答案是用submit()方法提交任務(wù)。這也是面試中經(jīng)常被問到的一個(gè)知識(shí)點(diǎn),execute()和submit()提交任務(wù)的的區(qū)別是什么?底層是如何實(shí)現(xiàn)的?
案例演示
現(xiàn)在我們通過簡單的例子演示下submit()方法的妙處。
@Test
publicvoidtestSubmit()throwsExecutionException,InterruptedException{
//創(chuàng)建一個(gè)核心線程數(shù)為5的線程池
ThreadPoolExecutorthreadPoolExecutor=newThreadPoolExecutor(5,10,30,TimeUnit.SECONDS,newArrayBlockingQueue(50));
//創(chuàng)建一個(gè)計(jì)算任務(wù)
CallableIntegermyTask=newCallableInteger(){
@Override
publicIntegercall()throwsException{
intresult=0;
for(inti=0;i10000;i++){
result+=i;
Thread.sleep(1000);
returnresult;
("startsubmittask.....");
FutureIntegerfuture=threadPoolExecutor.submit(myTask);
Integersum=future.get();
("getsubmitresult:[{}]",sum);
//usesumdootherthings
運(yùn)行結(jié)果:
主線程的確阻塞等待線程返回。
Future類API
我們看到用submit提交任務(wù)最后返回一個(gè)Future對(duì)象,F(xiàn)uture表示異步計(jì)算的結(jié)果。那它都提供了什么API呢?
方法說明Vget()等待任務(wù)執(zhí)行完成,然后獲取其結(jié)果。Vget(longtimeout,TimeUnitunit)等待獲取任務(wù)執(zhí)行的結(jié)果,如果任務(wù)超過一定時(shí)間沒有執(zhí)行完畢,直接返回,拋出異常,不會(huì)一直等待下去。booleanisDone()如果此任務(wù)已完成,則返回true。完成可能是由于正常終止、異?;蛉∠谒羞@些情況下,該方法都將返回true。booleanisCancelled()如果該任務(wù)在正常完成之前被取消,則返回true。booleancancel(booleanmayInterruptIfRunning)試圖取消此任務(wù)的執(zhí)行。1.如果任務(wù)已經(jīng)完成、已經(jīng)取消或由于其他原因無法取消,則此嘗試將失敗。
如果在調(diào)用cancel時(shí)此任務(wù)尚未啟動(dòng),則此任務(wù)不應(yīng)運(yùn)行。如果任務(wù)已經(jīng)開始,那么mayInterruptIfRunning參數(shù)確定是否應(yīng)該中斷執(zhí)行此任務(wù)的線程以試圖停止該任務(wù)。|
和execute區(qū)別
從功能層面,我們已經(jīng)很明白他們最大區(qū)別,
execute()方式提交任務(wù)沒有返回值,直接線程中池異步運(yùn)行任務(wù)。submit()方式提交任務(wù)有返回值Future,調(diào)用get方法可以阻塞調(diào)用線程,等待任務(wù)運(yùn)行返回的結(jié)果。
那從源碼層面,二者又有什么區(qū)別和聯(lián)系呢?
我們看下submit()提交的入口方法,代碼如下:
//AbstractExecutorService#submit
publicTFutureTsubmit(CallableTtask){
//判空處理
if(task==null)thrownewNullPointerException();
//將提交的任務(wù)包裝成RunnableFuture
RunnableFutureTftask=newTaskFor(task);
//最終還是調(diào)用execute方法執(zhí)行任務(wù)
execute(ftask);
returnftask;
殊途同歸,最終都是調(diào)用execute()方法,只不過submit()方法在調(diào)用前做一層包裝,將任務(wù)包裝成RunnableFuture對(duì)象。
關(guān)于線程池中execute()方法提交的流程和原理實(shí)現(xiàn)不理解的,強(qiáng)烈建議先學(xué)習(xí)這篇文章:Java線程池源碼深度解析。
原理實(shí)現(xiàn)
本節(jié)內(nèi)容我們聚焦在submit()方法的實(shí)現(xiàn)原理。
我們先思考下,如果讓我們?cè)O(shè)計(jì)實(shí)現(xiàn)調(diào)用get阻塞知道線程返回結(jié)果,要考慮哪些方面呢
任務(wù)是否執(zhí)行結(jié)束或者執(zhí)行出錯(cuò)等情況,是不是需要有個(gè)狀態(tài)位標(biāo)記?任務(wù)的執(zhí)行結(jié)果如何保存?如果任務(wù)沒有執(zhí)行結(jié)束,如何阻塞當(dāng)前線程,LockSupport.park()是一種方式。如果有多個(gè)外部線程獲取get,是不是應(yīng)該也要把外部線程存下來,怎么存?因?yàn)楹竺嫒蝿?wù)執(zhí)行完后需要喚醒他們。
帶著這些問題和基本思路我們看下jdk8中是如何實(shí)現(xiàn)的。
RunnableFuture類介紹
submit()方法中調(diào)用newTaskFor()方法獲取RunnableFuture對(duì)象。
//AbstractExecutorService#newTaskFor
protectedTRunnableFutureTnewTaskFor(CallableTcallable){
//調(diào)用FutureTask的構(gòu)造方法返回RunnableFuture對(duì)象
returnnewFutureTaskT(callable);
FutureTask類結(jié)構(gòu)圖如下:
FutureTask是一個(gè)異步計(jì)算任務(wù),包裝了我們外部提交的任務(wù)。
實(shí)現(xiàn)了Runnable接口實(shí)現(xiàn)了Future接口,該接口封裝了任務(wù)結(jié)果的獲取、任務(wù)是否結(jié)束等接口。
RunnableFuture類重要屬性
1.任務(wù)運(yùn)行狀態(tài)state
//存儲(chǔ)當(dāng)前任務(wù)運(yùn)行狀態(tài)
privatevolatileintstate;
//當(dāng)前任務(wù)尚未執(zhí)行
privatestaticfinalintNEW=0;
//當(dāng)前任務(wù)正在結(jié)束,尚未完全結(jié)束,一種臨界狀態(tài)
privatestaticfinalintCOMPLETING=1;
//當(dāng)前任務(wù)正常結(jié)束
privatestaticfinalintNORMAL=2;
//當(dāng)前任務(wù)執(zhí)行過程中發(fā)生了異常
privatestaticfinalintEXCEPTIONAL=3;
//當(dāng)前任務(wù)被取消
privatestaticfinalintCANCELLED=4;
//當(dāng)前任務(wù)中斷中
privatestaticfinalintINTERRUPTING=5;
//當(dāng)前任務(wù)已中斷
privatestaticfinalintINTERRUPTED=6;
可能的狀態(tài)轉(zhuǎn)換有如下幾種:
NEW-COMPLETING-NORMALNEW-COMPLETING-EXCEPTIONALNEW-CANCELLEDNEW-INTERRUPTING-INTERRUPTED
2.真正要執(zhí)行的任務(wù)callble
//存放真正提交的原始任務(wù)
privateCallableVcallable;
3.存放執(zhí)行結(jié)果outcome
返回的結(jié)果或從get()中拋出的異常
privateObjectoutcome;
4.當(dāng)前正在運(yùn)行任務(wù)的線程runner
//當(dāng)前任務(wù)被線程執(zhí)行期間,保存當(dāng)前任務(wù)的線程對(duì)象引用
privatevolatileThreadrunner;
5.調(diào)用get獲取任務(wù)結(jié)果的等待線程集合waiters
//因?yàn)闀?huì)有很多線程去get當(dāng)前任務(wù)的結(jié)果,所以這里使用了一種stack數(shù)據(jù)結(jié)構(gòu)來保存
privatevolatileWaitNodewaiters;
staticfinalclassWaitNode{
volatileThreadthread;
volatileWaitNodenext;
WaitNode(){thread=Thread.currentThread();}
數(shù)據(jù)結(jié)構(gòu)如下圖:
RunnableFuture類構(gòu)造方法
publicFutureTask(CallableVcallable){
if(callable==null)
thrownewNullPointerException();
//設(shè)置要執(zhí)行的任務(wù)
this.callable=callable;
//初始化時(shí)任務(wù)狀態(tài)為NEW
this.state=NEW;
任務(wù)執(zhí)行run()原理
submit()方法最終調(diào)用線程池的execute()方法,而execute()方法會(huì)創(chuàng)建出工人Worker對(duì)象,調(diào)用runWorker()方法,它主要是執(zhí)行外部提交的任務(wù),也就是這里的FutureTask對(duì)象的run()方法,我們重點(diǎn)看下run()方法。
FutureTask#run()開始執(zhí)行任務(wù)。
它主要的功能是完成包裝的callable的call方法執(zhí)行,并將執(zhí)行結(jié)果保存到outcome中,同時(shí)捕獲了call方法執(zhí)行出現(xiàn)的異常,并保存異常信息,而不是直接拋出。
publicvoidrun(){
//狀態(tài)機(jī)不為NEW表示執(zhí)行完成或任務(wù)被取消了,直接返回
//狀態(tài)機(jī)為NEW,同時(shí)將runner設(shè)置為當(dāng)前線程,保證同一時(shí)刻只有一個(gè)線程執(zhí)行run方法,如果設(shè)置失敗也直接返回
if(state!=NEW||
!UNSAFE.compareAndSwapObject(this,runnerOffset,
null,Thread.currentThread()))
return;
try{
CallableVc=callable;
//取出原始的任務(wù)檢測不為空且再次檢查狀態(tài)為NEW(雙重校驗(yàn))
if(c!=nullstate==NEW){
//任務(wù)運(yùn)行的結(jié)果
Vresult;
//任務(wù)是否運(yùn)行是否正常,true:正常,false-異常
booleanran;
try{
//任務(wù)執(zhí)行,將結(jié)果返回給result
result=c.call();
//設(shè)置任務(wù)運(yùn)行正常
ran=true;
}catch(Throwableex){
//任務(wù)運(yùn)行報(bào)錯(cuò)的情況
//設(shè)置結(jié)果為空
result=null;
//設(shè)置任務(wù)運(yùn)行異常標(biāo)記
ran=false;
//任務(wù)執(zhí)行拋出異常時(shí),保存異常信息,而不直接拋出
setException(ex);
//執(zhí)行成功則保存結(jié)果
if(ran)
set(result);
}finally{
//runnermustbenon-nulluntilstateissettledto
//preventconcurrentcallstorun()
//執(zhí)行完成后設(shè)置runner為null
runner=null;
//statemustbere-readafternullingrunnertoprevent
//leakedinterrupts
//獲取任務(wù)狀態(tài)
ints=state;
//如果被置為了中斷狀態(tài)則進(jìn)行中斷的處理
if(s=INTERRUPTING)
handlePossibleCancellationInterrupt(s);
FutureTask#set()方法處理正常執(zhí)行的運(yùn)行結(jié)果
setException()方法主要完成做下面的工作。
將執(zhí)行結(jié)果保存到outcom變量中FutureTask的狀態(tài)從NEW修改為NORMAL喚醒阻塞在waiters隊(duì)列中請(qǐng)求get的所有線程
protectedvoidset(Vv){
//將狀態(tài)由NEW更新為COMPLETING
if(UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){
//保存任務(wù)的結(jié)果
outcome=v;
//更新狀態(tài)的最終狀態(tài)-NORMAL
UNSAFE.putOrderedInt(this,stateOffset,NORMAL);//finalstate
//通用的完成操作,主要作用就是喚醒阻塞在waiters隊(duì)列中請(qǐng)求get的線程
finishCompletion();
FutureTask#setException()方法處理執(zhí)行異常的結(jié)果
setException()方法主要完成做下面的工作。
將異常信息保存到outcom變量中FutureTask的狀態(tài)從NEW修改為EXCEPTIONAL喚醒阻塞在waiters隊(duì)列中請(qǐng)求get的所有線程
//FutureTask#setException
protectedvoidsetException(Throwablet){
//將狀態(tài)由NEW更新為COMPLETING
if(UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){
//將異常信息保存到輸出結(jié)果中
outcome=t;
//更新狀態(tài)機(jī)處理異常的最終狀態(tài)-EXCEPTIONAL
UNSAFE.putOrderedInt(this,stateOffset,EXCEPTIONAL);//finalstate
//通用的完成操作,主要作用就是喚醒阻塞在waiters隊(duì)列中請(qǐng)求get的線程
finishCompletion();
這里的finishCompletion()喚醒我們?cè)诤竺嬷v解,上面的整個(gè)邏輯可以用一張圖表示:
任務(wù)結(jié)果獲取get()原理
其他線程可以調(diào)用get()方法或者超時(shí)阻塞方法get(longtimeout,TimeUnitunit)獲取任務(wù)運(yùn)行的結(jié)果。
FutureTask#get()方法是獲取任務(wù)執(zhí)行結(jié)果的入口方法。
//阻塞獲取任務(wù)結(jié)果
publicVget()throwsInterruptedException,ExecutionException{
ints=state;
//任務(wù)還沒有執(zhí)行完成,通過awaitDone方法進(jìn)行阻塞等待
if(s=COMPLETING)
s=awaitDone(false,0L);
//返回結(jié)果
returnreport(s);
//超時(shí)阻塞獲取任務(wù)結(jié)果
publicVget(longtimeout,TimeUnitunit)
throwsInterruptedException,ExecutionException,TimeoutException{
//判空處理
if(unit==null)
thrownewNullPointerException();
ints=state;
//任務(wù)還沒有執(zhí)行完成,通過awaitDone方法進(jìn)行阻塞等待
if(s=COMPLETING
//如果awaitDone返回的結(jié)果還是小于等于COMPLETING,表示運(yùn)行中,那么直接拋出超時(shí)異常
(s=awaitDone(true,unit.toNanos(timeout)))=COMPLETING)
thrownewTimeoutException();
//返回結(jié)果
returnreport(s);
FutureTask#awaitDone()方法阻塞等待任務(wù)執(zhí)行結(jié)束
該方法主要完成下面的工作:
判斷任務(wù)是否運(yùn)行結(jié)束,結(jié)束的話直接返回運(yùn)行狀態(tài)如果任務(wù)沒有結(jié)果,將請(qǐng)求線程阻塞請(qǐng)求線程阻塞時(shí),會(huì)創(chuàng)建一個(gè)waiter節(jié)點(diǎn),然后加入到阻塞等待的棧中
//線程阻塞等待方法,timed等于true表示阻塞等待有時(shí)間限制nanos,false表示沒有,一直阻塞
privateintawaitDone(booleantimed,longnanos)throwsInterruptedException{
//計(jì)算阻塞超時(shí)時(shí)間點(diǎn)
finallongdeadline=timedSystem.nanoTime()+nanos:0L;
WaitNodeq=null;
//表示q是否添加到waiters棧中,默認(rèn)false
booleanqueued=false;
//自旋操作
for(;;){
//如果阻塞線程被中斷則將當(dāng)前線程從阻塞隊(duì)列中移除
if(Terrupted()){
//從waiters棧中移除WaitNode,
removeWaiter(q);
//返回中斷移除
thrownewInterruptedException();
//獲取任務(wù)的狀態(tài)
ints=state;
//如果任務(wù)的狀態(tài)大于COMPLETING,表示線程運(yùn)行結(jié)束了,直接返回
if(sCOMPLETING){
//任務(wù)已經(jīng)完成時(shí)直接返回結(jié)果
if(q!=null)
q.thread=null;
//返回狀態(tài)
returns;
//如果任務(wù)狀態(tài)是COMPLETING
elseif(s==COMPLETING)
//如果任務(wù)執(zhí)行完成,但還差最后一步最終完成,則讓出CPU給任務(wù)執(zhí)行線程繼續(xù)執(zhí)行
Thread.yield();
//如果任務(wù)狀態(tài)小于COMPLETING,說明任務(wù)還在運(yùn)行中
//如果q為空的情況
elseif(q==null)
//新進(jìn)來的線程添加等待節(jié)點(diǎn)
q=newWaitNode();
//如果任務(wù)還在運(yùn)行中并且當(dāng)前線程節(jié)點(diǎn)還不在waiters棧中,那么就加入
elseif(!queued)
//上一步節(jié)點(diǎn)創(chuàng)建完,還沒將其添加到waiters棧中,因此在下一個(gè)循環(huán)就會(huì)執(zhí)行此處進(jìn)行入棧操作,并將當(dāng)前線程的等待節(jié)點(diǎn)置于棧頂
queued=UNSAFE.compareAndSwapObject(this,waitersOffset,
q.next=waiters,q);
//如果任務(wù)還在運(yùn)行中并且timed為true,表示有超時(shí)限制
elseif(timed){
//如果設(shè)置了阻塞超時(shí)時(shí)間,則進(jìn)行檢查是否達(dá)到阻塞超時(shí)時(shí)間,達(dá)到了則刪除當(dāng)前線程的等待節(jié)點(diǎn)并退出循環(huán)返回,否則繼續(xù)阻塞
nanos=deadline-System.nanoTime();
//如果nanos小于等于0
if(nanos=0L){
//從waiters棧中移除
removeWaiter(q);
//返回狀態(tài)
returnstate;
//超時(shí)阻塞當(dāng)前線程,超過時(shí)間,就會(huì)恢復(fù)
LockSupport.parkNanos(this,nanos);
//如果任務(wù)還在運(yùn)行中并且timed為false,沒有有超時(shí)限制
else
//一直阻塞當(dāng)前線程
LockSupport.park(this);
FutureTask#report方法解析返回任務(wù)結(jié)果
//獲取任務(wù)結(jié)果方法:正常執(zhí)行則直接返回結(jié)果,否則拋出異常
privateVreport(ints)throwsExecutionException{
Objectx=outcome;
//如果狀態(tài)是正常情況
if(s==NORMAL)
//直接返回
return(V)x;
//如果狀態(tài)是取消了,拋出異常
if(s=CANCELLED)
thrownewCancellationException();
thrownewExecutionException((Throwable)x);
FutureTask#finishCompletion()方法用來喚醒前面等待的線程
上一步awaitDone方法會(huì)阻塞調(diào)用的線程,那么任務(wù)運(yùn)行結(jié)束總要喚醒他們?nèi)ツ媒Y(jié)果吧,這個(gè)工作就在finishCompletion()方法中。
privatevoidfinishCompletion(){
//遍歷waiters棧中的每個(gè)元素;
for(WaitNodeq;(q=waiters)!=null;){
//cas設(shè)置waiters中q節(jié)點(diǎn)數(shù)據(jù)為null,成功的話,進(jìn)入到if中
if(UNSAFE.compareAndSwapObject(this,waitersOffset,q,null)){
//自選操作
for(;;){
//獲取節(jié)點(diǎn)中的線程
Threadt=q.thread;
if(t!=null){
q.thread=null;
//喚醒線程
LockSupport.unpark(t);
//獲取下一個(gè)節(jié)點(diǎn)
WaitNodenext=q.next;
if(next==null)
break;
q.next=null;//unlinktohelpgc
q=next;
break;
//鉤子方法,有子類去實(shí)現(xiàn)
done();
//設(shè)置原來的任務(wù)callable為null
callable=null;//toreducefootprint
任務(wù)取消cancel()原理
可以調(diào)用FutureTask#cancel方法取消任務(wù)執(zhí)行,但是要注意下面幾點(diǎn):
任務(wù)取消時(shí)會(huì)先檢查是否允許取消,當(dāng)任務(wù)已經(jīng)完成或者正在完成(正常執(zhí)行并繼續(xù)處理結(jié)果或執(zhí)行異常處理
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2026屆陜西省西安市西工大附中高三語文第一學(xué)期期末檢測模擬試題含解析
- 內(nèi)勤技能培訓(xùn)
- 小學(xué)的活動(dòng)策劃方案(3篇)
- 烘焙食品安全管理制度(3篇)
- 班級(jí)管理制度的理念是(3篇)
- 美術(shù)活動(dòng)海報(bào)策劃方案(3篇)
- 藥企行業(yè)輿情管理制度(3篇)
- 食品溯源管理制度流程表(3篇)
- 中學(xué)學(xué)生社團(tuán)活動(dòng)表彰獎(jiǎng)勵(lì)制度
- 兼培訓(xùn)課件教學(xué)課件
- 特種工安全崗前培訓(xùn)課件
- 新疆維吾爾自治區(qū)普通高中2026屆高二上數(shù)學(xué)期末監(jiān)測試題含解析
- 2026屆福建省三明市第一中學(xué)高三上學(xué)期12月月考?xì)v史試題(含答案)
- 2026年遼寧金融職業(yè)學(xué)院單招職業(yè)技能測試題庫附答案解析
- (正式版)DB51∕T 3342-2025 《爐灶用合成液體燃料經(jīng)營管理規(guī)范》
- 2026北京海淀初三上學(xué)期期末語文試卷和答案
- 2024-2025學(xué)年北京市東城區(qū)五年級(jí)(上)期末語文試題(含答案)
- 人工智能在醫(yī)療領(lǐng)域的應(yīng)用
- 2025學(xué)年度人教PEP五年級(jí)英語上冊(cè)期末模擬考試試卷(含答案含聽力原文)
- 【10篇】新部編五年級(jí)上冊(cè)語文課內(nèi)外閱讀理解專項(xiàng)練習(xí)題及答案
- 南京市雨花臺(tái)區(qū)醫(yī)療保險(xiǎn)管理中心等單位2025年公開招聘編外工作人員備考題庫有完整答案詳解
評(píng)論
0/150
提交評(píng)論