版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領
文檔簡介
第JavaFlink窗口觸發(fā)器Trigger的用法詳解目錄定義Trigger源碼TriggerResult源碼Flink預置的TriggerEventTimeTrigger源碼ProcessingTimeTrigger源碼常見窗口的Trigger滾動窗口滑動窗口會話窗口全局窗口
定義
Trigger確定窗口(由窗口分配器形成)何時準備好由窗口函數(shù)處理。每個WindowAssigner都帶有一個默認值Trigger。如果默認觸發(fā)器不符合您的需求,您可以使用trigger()。
Trigger源碼
publicabstractclassTriggerT,WextendsWindowimplementsSerializable{
只要有元素落?到當前窗?,就會調(diào)?該?法
*@paramelement收到的元素
*@paramtimestamp元素抵達時間.
*@paramwindow元素所屬的window窗口.
*@paramctx?個上下?對象,通常?該對象注冊timer(ProcessingTime/EventTime)回調(diào).
publicabstractTriggerResultonElement(Tvar1,longvar2,Wvar4,Trigger.TriggerContextvar5)throwsException;
*processing-time定時器回調(diào)函數(shù)
*@paramtime定時器觸發(fā)的時間.
*@paramwindow定時器觸發(fā)的窗口對象.
*@paramctx?個上下?對象,通常?該對象注冊timer(ProcessingTime/EventTime)回調(diào).
publicabstractTriggerResultonProcessingTime(longvar1,Wvar3,Trigger.TriggerContextvar4)throwsException;
*event-time定時器回調(diào)函數(shù)
*@paramtime定時器觸發(fā)的時間.
*@paramwindow定時器觸發(fā)的窗口對象.
*@paramctx?個上下?對象,通常?該對象注冊timer(ProcessingTime/EventTime)回調(diào).
publicabstractTriggerResultonEventTime(longvar1,Wvar3,Trigger.TriggerContextvar4)throwsException;
*當多個窗口合并到?個窗?的時候,調(diào)用該方法法,例如系統(tǒng)SessionWindow
*@paramwindow合并后的新窗口對象
*@paramctx?個上下?對象,通常用該對象注冊timer(ProcessingTime/EventTime)回調(diào)以及訪問狀態(tài)
publicvoidonMerge(Wwindow,Trigger.OnMergeContextctx)throwsException{
thrownewUnsupportedOperationException("Thistriggerdoesnotsupportmerging.");
*當窗口被刪除后執(zhí)?所需的任何操作。例如:可以清除定時器或者刪除狀態(tài)數(shù)據(jù)
publicabstractvoidclear(Wvar1,Trigger.TriggerContextvar2)throwsException;
TriggerResult源碼
publicenumTriggerResult{
//表示對窗口不執(zhí)行任何操作。即不觸發(fā)窗口計算,也不刪除元素。
CONTINUE(false,false),
//觸發(fā)窗口計算,輸出結果,然后將窗口中的數(shù)據(jù)和窗口進行清除。
FIRE_AND_PURGE(true,true),
//觸發(fā)窗口計算,但是保留窗口元素
FIRE(true,false),
//不觸發(fā)窗口計算,丟棄窗口,并且刪除窗口的元素。
PURGE(false,true);
privatefinalbooleanfire;
privatefinalbooleanpurge;
privateTriggerResult(booleanfire,booleanpurge){
this.purge=purge;
this.fire=fire;
publicbooleanisFire(){
returnthis.fire;
publicbooleanisPurge(){
returnthis.purge;
一旦觸發(fā)器確定窗口已準備好進行處理,就會觸發(fā),返回狀態(tài)可以是FIRE或FIRE_AND_PURGE。其中FIRE是觸發(fā)窗口計算并保留窗口內(nèi)容,而FIRE_AND_PURGE是觸發(fā)窗口計算并刪除窗口內(nèi)容。默認情況下,預實現(xiàn)的觸發(fā)器只是簡單地FIRE不清除窗口狀態(tài)。
Flink預置的Trigger
EventTimeTrigger:通過對比EventTime和窗口的Endtime確定是否觸發(fā)窗口計算,如果EventTime大于WindowEndTime則觸發(fā),否則不觸發(fā),窗口將繼續(xù)等待。ProcessTimeTrigger:通過對比ProcessTime和窗口EndTme確定是否觸發(fā)窗口,如果ProcessTime大于EndTime則觸發(fā)計算,否則窗口繼續(xù)等待。ContinuousEventTimeTrigger:根據(jù)間隔時間周期性觸發(fā)窗口或者Window的結束時間小于當前EndTime觸發(fā)窗口計算。ContinuousProcessingTimeTrigger:根據(jù)間隔時間周期性觸發(fā)窗口或者Window的結束時間小于當前ProcessTime觸發(fā)窗口計算。CountTrigger:根據(jù)接入數(shù)據(jù)量是否超過設定的闕值判斷是否觸發(fā)窗口計算。DeltaTrigger:根據(jù)接入數(shù)據(jù)計算出來的Delta指標是否超過指定的Threshold去判斷是否觸發(fā)窗口計算。PurgingTrigger:可以將任意觸發(fā)器作為參數(shù)轉(zhuǎn)換為Purge類型的觸發(fā)器,計算完成后數(shù)據(jù)將被清理。NeverTrigger:任何時候都不觸發(fā)窗口計算
主要看看EventTimeTrigger和ProcessingTimeTrigger的源碼。
EventTimeTrigger源碼
publicclassEventTimeTriggerextendsTriggerObject,TimeWindow{
privatestaticfinallongserialVersionUID=1L;
privateEventTimeTrigger(){
publicTriggerResultonElement(Objectelement,longtimestamp,TimeWindowwindow,TriggerContextctx)throwsException{
if(window.maxTimestamp()=ctx.getCurrentWatermark()){
returnTriggerResult.FIRE;
}else{
ctx.registerEventTimeTimer(window.maxTimestamp());
returnTriggerResult.CONTINUE;
publicTriggerResultonEventTime(longtime,TimeWindowwindow,TriggerContextctx){
returntime==window.maxTimestamp()TriggerResult.FIRE:TriggerResult.CONTINUE;
publicTriggerResultonProcessingTime(longtime,TimeWindowwindow,TriggerContextctx)throwsException{
returnTriggerResult.CONTINUE;
publicvoidclear(TimeWindowwindow,TriggerContextctx)throwsException{
ctx.deleteEventTimeTimer(window.maxTimestamp());
publicbooleancanMerge(){
returntrue;
publicvoidonMerge(TimeWindowwindow,OnMergeContextctx){
longwindowMaxTimestamp=window.maxTimestamp();
if(windowMaxTimestampctx.getCurrentWatermark()){
ctx.registerEventTimeTimer(windowMaxTimestamp);
publicStringtoString(){
return"EventTimeTrigger()";
publicstaticEventTimeTriggercreate(){
returnnewEventTimeTrigger();
ProcessingTimeTrigger源碼
publicclassProcessingTimeTriggerextendsTriggerObject,TimeWindow{
privatestaticfinallongserialVersionUID=1L;
privateProcessingTimeTrigger(){
publicTriggerResultonElement(Objectelement,longtimestamp,TimeWindowwindow,TriggerContextctx){
ctx.registerProcessingTimeTimer(window.maxTimestamp());
returnTriggerResult.CONTINUE;
publicTriggerResultonEventTime(longtime,TimeWindowwindow,TriggerContextctx)throwsException{
returnTriggerResult.CONTINUE;
publicTriggerResultonProcessingTime(longtime,TimeWindowwindow,TriggerContextctx){
returnTriggerResult.FIRE;
publicvoidclear(TimeWindowwindow,TriggerContextctx)throwsException{
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
publicbooleancanMerge(){
returntrue;
publicvoidonMerge(TimeWindowwindow,OnMergeContextctx){
longwindowMaxTimestamp=window.maxTimestamp();
if(windowMaxTimestampctx.getCurrentProcessingTime()){
ctx.registerProcessingTimeTimer(windowMaxTimestamp);
publicStringtoString(){
return"ProcessingTimeTrigger()";
publicstaticProcessingTimeTriggercreate(){
returnnewProcessingTimeTrigger();
在onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會注冊一個ProcessingTime定時器,時間參數(shù)是window.maxTimestamp(),也就是窗口的最終時間,當時間到達這個窗口最終時間,定時器觸發(fā)并調(diào)用onProcessingTime()方法,在onProcessingTime()方法中,returnTriggerResult.FIRE即返回FIRE,觸發(fā)窗口中數(shù)據(jù)的計算,但是會保留窗口元素。
需要注意的是ProcessingTimeTrigger類只會在窗口的最終時間到達的時候觸發(fā)窗口函數(shù)的計算,計算完成后并不會清除窗口中的數(shù)據(jù),這些數(shù)據(jù)存儲在內(nèi)存中,除非調(diào)用PURGE或FIRE_AND_PURGE,否則數(shù)據(jù)將一直存在內(nèi)存中。實際上,F(xiàn)link中提供的Trigger類,除了PurgingTrigger類,其他的都不會對窗口中的數(shù)據(jù)進行清除。
常見窗口的Trigger
滾動窗口
TumblingEventTimeWindows:EventTimeTrigger
publicclassTumblingEventTimeWindowsextendsWindowAssignerObject,TimeWindow{
publicTriggerObject,TimeWindowgetDefaultTrigger(StreamExecutionEnvironmentenv){
returnEventTimeTrigger.create();
TumblingProcessingTimeWindows:ProcessingTimeTrigger
publicclassTumblingProcessingTimeWindowsextendsWindowAssignerObject,TimeWindow{
publicTriggerObject,TimeWindowgetDefaultTrigger(StreamExecutionEnvironmentenv){
returnProcessingTimeTrigger.create();
滑動窗口
SlidingEventTimeWindows:EventTimeTrigger
publicclassSlidingEventTimeWindowsextendsWindowAssignerObject,TimeWindow{
publicTriggerObject,TimeWindowgetDefaultTrigger(StreamExecutionEnvironmentenv){
returnEventTimeTrigger.create();
SlidingProcessingTimeWindows:ProcessingTimeTrigger
publicclassSlidingProcessingTimeWindowsextendsWindowAssignerObject,TimeWindow{
publicTriggerObject,TimeWindowgetDefaultTrigger(StreamExecutionEnvironmentenv){
returnProcessingTimeTrigger.create();
會話窗口
Even
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
- 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 老年終末期尿失禁的護理干預方案循證評價
- 老年糖尿病患者的跌倒預防策略-1
- 我國上市公司海外并購績效的多維度剖析與提升策略研究
- 電氣電子產(chǎn)品環(huán)保檢測員風險評估考核試卷含答案
- 老年癡呆癥篩查的倫理委員會職責
- 老年癡呆照護:法律風險與人文關懷
- 2026云南昭通綏江縣農(nóng)業(yè)農(nóng)村局招聘城鎮(zhèn)公益性崗位工作人員的1人備考題庫及答案詳解1套
- 2026年威海臨港經(jīng)濟技術開發(fā)區(qū)鎮(zhèn)屬事業(yè)單位公開招聘初級綜合類崗位人員備考題庫(2人)及參考答案詳解1套
- 2026廣西來賓市象州縣第四幼兒園招聘幼兒園教師崗位見習生2人備考題庫及參考答案詳解一套
- 人體胚胎發(fā)育:FGF 作用課件
- 北京市順義區(qū)2025-2026學年八年級上學期期末考試英語試題(原卷版+解析版)
- 中學生冬季防溺水主題安全教育宣傳活動
- 2026年藥廠安全生產(chǎn)知識培訓試題(達標題)
- 2026年陜西省森林資源管理局局屬企業(yè)公開招聘工作人員備考題庫及參考答案詳解1套
- 冷庫防護制度規(guī)范
- 承包團建燒烤合同范本
- 口腔種植牙科普
- 2025秋人教版七年級全一冊信息科技期末測試卷(三套)
- 搶工補償協(xié)議書
- 廣東省廣州市番禺區(qū)2026屆高一數(shù)學第一學期期末聯(lián)考試題含解析
- 2026年廣東省佛山市高三語文聯(lián)合診斷性考試作文題及3篇范文:可以“重讀”甚至“重構”這些過往
評論
0/150
提交評論