JavaFlink窗口觸發(fā)器Trigger的用法詳解_第1頁
JavaFlink窗口觸發(fā)器Trigger的用法詳解_第2頁
JavaFlink窗口觸發(fā)器Trigger的用法詳解_第3頁
JavaFlink窗口觸發(fā)器Trigger的用法詳解_第4頁
JavaFlink窗口觸發(fā)器Trigger的用法詳解_第5頁
已閱讀5頁,還剩4頁未讀 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領

文檔簡介

第JavaFlink窗口觸發(fā)器Trigger的用法詳解目錄定義Trigger源碼TriggerResult源碼Flink預置的TriggerEventTimeTrigger源碼ProcessingTimeTrigger源碼常見窗口的Trigger滾動窗口滑動窗口會話窗口全局窗口

定義

Trigger確定窗口(由窗口分配器形成)何時準備好由窗口函數(shù)處理。每個WindowAssigner都帶有一個默認值Trigger。如果默認觸發(fā)器不符合您的需求,您可以使用trigger()。

Trigger源碼

publicabstractclassTriggerT,WextendsWindowimplementsSerializable{

只要有元素落?到當前窗?,就會調(diào)?該?法

*@paramelement收到的元素

*@paramtimestamp元素抵達時間.

*@paramwindow元素所屬的window窗口.

*@paramctx?個上下?對象,通常?該對象注冊timer(ProcessingTime/EventTime)回調(diào).

publicabstractTriggerResultonElement(Tvar1,longvar2,Wvar4,Trigger.TriggerContextvar5)throwsException;

*processing-time定時器回調(diào)函數(shù)

*@paramtime定時器觸發(fā)的時間.

*@paramwindow定時器觸發(fā)的窗口對象.

*@paramctx?個上下?對象,通常?該對象注冊timer(ProcessingTime/EventTime)回調(diào).

publicabstractTriggerResultonProcessingTime(longvar1,Wvar3,Trigger.TriggerContextvar4)throwsException;

*event-time定時器回調(diào)函數(shù)

*@paramtime定時器觸發(fā)的時間.

*@paramwindow定時器觸發(fā)的窗口對象.

*@paramctx?個上下?對象,通常?該對象注冊timer(ProcessingTime/EventTime)回調(diào).

publicabstractTriggerResultonEventTime(longvar1,Wvar3,Trigger.TriggerContextvar4)throwsException;

*當多個窗口合并到?個窗?的時候,調(diào)用該方法法,例如系統(tǒng)SessionWindow

*@paramwindow合并后的新窗口對象

*@paramctx?個上下?對象,通常用該對象注冊timer(ProcessingTime/EventTime)回調(diào)以及訪問狀態(tài)

publicvoidonMerge(Wwindow,Trigger.OnMergeContextctx)throwsException{

thrownewUnsupportedOperationException("Thistriggerdoesnotsupportmerging.");

*當窗口被刪除后執(zhí)?所需的任何操作。例如:可以清除定時器或者刪除狀態(tài)數(shù)據(jù)

publicabstractvoidclear(Wvar1,Trigger.TriggerContextvar2)throwsException;

TriggerResult源碼

publicenumTriggerResult{

//表示對窗口不執(zhí)行任何操作。即不觸發(fā)窗口計算,也不刪除元素。

CONTINUE(false,false),

//觸發(fā)窗口計算,輸出結果,然后將窗口中的數(shù)據(jù)和窗口進行清除。

FIRE_AND_PURGE(true,true),

//觸發(fā)窗口計算,但是保留窗口元素

FIRE(true,false),

//不觸發(fā)窗口計算,丟棄窗口,并且刪除窗口的元素。

PURGE(false,true);

privatefinalbooleanfire;

privatefinalbooleanpurge;

privateTriggerResult(booleanfire,booleanpurge){

this.purge=purge;

this.fire=fire;

publicbooleanisFire(){

returnthis.fire;

publicbooleanisPurge(){

returnthis.purge;

一旦觸發(fā)器確定窗口已準備好進行處理,就會觸發(fā),返回狀態(tài)可以是FIRE或FIRE_AND_PURGE。其中FIRE是觸發(fā)窗口計算并保留窗口內(nèi)容,而FIRE_AND_PURGE是觸發(fā)窗口計算并刪除窗口內(nèi)容。默認情況下,預實現(xiàn)的觸發(fā)器只是簡單地FIRE不清除窗口狀態(tài)。

Flink預置的Trigger

EventTimeTrigger:通過對比EventTime和窗口的Endtime確定是否觸發(fā)窗口計算,如果EventTime大于WindowEndTime則觸發(fā),否則不觸發(fā),窗口將繼續(xù)等待。ProcessTimeTrigger:通過對比ProcessTime和窗口EndTme確定是否觸發(fā)窗口,如果ProcessTime大于EndTime則觸發(fā)計算,否則窗口繼續(xù)等待。ContinuousEventTimeTrigger:根據(jù)間隔時間周期性觸發(fā)窗口或者Window的結束時間小于當前EndTime觸發(fā)窗口計算。ContinuousProcessingTimeTrigger:根據(jù)間隔時間周期性觸發(fā)窗口或者Window的結束時間小于當前ProcessTime觸發(fā)窗口計算。CountTrigger:根據(jù)接入數(shù)據(jù)量是否超過設定的闕值判斷是否觸發(fā)窗口計算。DeltaTrigger:根據(jù)接入數(shù)據(jù)計算出來的Delta指標是否超過指定的Threshold去判斷是否觸發(fā)窗口計算。PurgingTrigger:可以將任意觸發(fā)器作為參數(shù)轉(zhuǎn)換為Purge類型的觸發(fā)器,計算完成后數(shù)據(jù)將被清理。NeverTrigger:任何時候都不觸發(fā)窗口計算

主要看看EventTimeTrigger和ProcessingTimeTrigger的源碼。

EventTimeTrigger源碼

publicclassEventTimeTriggerextendsTriggerObject,TimeWindow{

privatestaticfinallongserialVersionUID=1L;

privateEventTimeTrigger(){

publicTriggerResultonElement(Objectelement,longtimestamp,TimeWindowwindow,TriggerContextctx)throwsException{

if(window.maxTimestamp()=ctx.getCurrentWatermark()){

returnTriggerResult.FIRE;

}else{

ctx.registerEventTimeTimer(window.maxTimestamp());

returnTriggerResult.CONTINUE;

publicTriggerResultonEventTime(longtime,TimeWindowwindow,TriggerContextctx){

returntime==window.maxTimestamp()TriggerResult.FIRE:TriggerResult.CONTINUE;

publicTriggerResultonProcessingTime(longtime,TimeWindowwindow,TriggerContextctx)throwsException{

returnTriggerResult.CONTINUE;

publicvoidclear(TimeWindowwindow,TriggerContextctx)throwsException{

ctx.deleteEventTimeTimer(window.maxTimestamp());

publicbooleancanMerge(){

returntrue;

publicvoidonMerge(TimeWindowwindow,OnMergeContextctx){

longwindowMaxTimestamp=window.maxTimestamp();

if(windowMaxTimestampctx.getCurrentWatermark()){

ctx.registerEventTimeTimer(windowMaxTimestamp);

publicStringtoString(){

return"EventTimeTrigger()";

publicstaticEventTimeTriggercreate(){

returnnewEventTimeTrigger();

ProcessingTimeTrigger源碼

publicclassProcessingTimeTriggerextendsTriggerObject,TimeWindow{

privatestaticfinallongserialVersionUID=1L;

privateProcessingTimeTrigger(){

publicTriggerResultonElement(Objectelement,longtimestamp,TimeWindowwindow,TriggerContextctx){

ctx.registerProcessingTimeTimer(window.maxTimestamp());

returnTriggerResult.CONTINUE;

publicTriggerResultonEventTime(longtime,TimeWindowwindow,TriggerContextctx)throwsException{

returnTriggerResult.CONTINUE;

publicTriggerResultonProcessingTime(longtime,TimeWindowwindow,TriggerContextctx){

returnTriggerResult.FIRE;

publicvoidclear(TimeWindowwindow,TriggerContextctx)throwsException{

ctx.deleteProcessingTimeTimer(window.maxTimestamp());

publicbooleancanMerge(){

returntrue;

publicvoidonMerge(TimeWindowwindow,OnMergeContextctx){

longwindowMaxTimestamp=window.maxTimestamp();

if(windowMaxTimestampctx.getCurrentProcessingTime()){

ctx.registerProcessingTimeTimer(windowMaxTimestamp);

publicStringtoString(){

return"ProcessingTimeTrigger()";

publicstaticProcessingTimeTriggercreate(){

returnnewProcessingTimeTrigger();

在onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會注冊一個ProcessingTime定時器,時間參數(shù)是window.maxTimestamp(),也就是窗口的最終時間,當時間到達這個窗口最終時間,定時器觸發(fā)并調(diào)用onProcessingTime()方法,在onProcessingTime()方法中,returnTriggerResult.FIRE即返回FIRE,觸發(fā)窗口中數(shù)據(jù)的計算,但是會保留窗口元素。

需要注意的是ProcessingTimeTrigger類只會在窗口的最終時間到達的時候觸發(fā)窗口函數(shù)的計算,計算完成后并不會清除窗口中的數(shù)據(jù),這些數(shù)據(jù)存儲在內(nèi)存中,除非調(diào)用PURGE或FIRE_AND_PURGE,否則數(shù)據(jù)將一直存在內(nèi)存中。實際上,F(xiàn)link中提供的Trigger類,除了PurgingTrigger類,其他的都不會對窗口中的數(shù)據(jù)進行清除。

常見窗口的Trigger

滾動窗口

TumblingEventTimeWindows:EventTimeTrigger

publicclassTumblingEventTimeWindowsextendsWindowAssignerObject,TimeWindow{

publicTriggerObject,TimeWindowgetDefaultTrigger(StreamExecutionEnvironmentenv){

returnEventTimeTrigger.create();

TumblingProcessingTimeWindows:ProcessingTimeTrigger

publicclassTumblingProcessingTimeWindowsextendsWindowAssignerObject,TimeWindow{

publicTriggerObject,TimeWindowgetDefaultTrigger(StreamExecutionEnvironmentenv){

returnProcessingTimeTrigger.create();

滑動窗口

SlidingEventTimeWindows:EventTimeTrigger

publicclassSlidingEventTimeWindowsextendsWindowAssignerObject,TimeWindow{

publicTriggerObject,TimeWindowgetDefaultTrigger(StreamExecutionEnvironmentenv){

returnEventTimeTrigger.create();

SlidingProcessingTimeWindows:ProcessingTimeTrigger

publicclassSlidingProcessingTimeWindowsextendsWindowAssignerObject,TimeWindow{

publicTriggerObject,TimeWindowgetDefaultTrigger(StreamExecutionEnvironmentenv){

returnProcessingTimeTrigger.create();

會話窗口

Even

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論