版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)
文檔簡介
6.狀態(tài)和檢查點(diǎn)本章將重點(diǎn)圍繞狀態(tài)、檢查點(diǎn)(Checkpoint)和保存點(diǎn)(Savepoint)三個概念來介紹如何在Flink上進(jìn)行有狀態(tài)的計算。在Flink架構(gòu)體系中,有狀態(tài)計算可以說是Flink非常重要的特性之一。有狀態(tài)計算是指在程序計算過程中,在Flink程序內(nèi)部存儲計算產(chǎn)生的中間結(jié)果,并提供給后續(xù)Function或算子計算結(jié)果使用。檢查點(diǎn)是Flink保證exactly-once的重要特性。通過本節(jié)學(xué)習(xí)您將可以:掌握Flink中幾種常用的狀態(tài)以及具體使用方法。掌握Checkpoint機(jī)制的原理和配置方法。了解Savepoint機(jī)制的原理和使用方法。實(shí)現(xiàn)有狀態(tài)的計算Checkpoint原理及配置方法Savepoint原理及使用方法
什么是有狀態(tài)的計算有狀態(tài)計算的潛在場景數(shù)據(jù)去重:需要記錄哪些數(shù)據(jù)已經(jīng)流入過應(yīng)用,當(dāng)新數(shù)據(jù)流入時,根據(jù)已流入數(shù)據(jù)去重檢查輸入流是否符合某個特定模式:之前流入的數(shù)據(jù)以狀態(tài)的形式緩存下來對一個窗口內(nèi)的數(shù)據(jù)進(jìn)行聚合分析,比如分析一小時內(nèi)某項(xiàng)指標(biāo)75分位值或99分位值Flink分布式計算,一個算子有多個算子子任務(wù)狀態(tài)可以被理解為某個算子子任務(wù)在當(dāng)前實(shí)例上的一個變量,變量記錄了數(shù)據(jù)流的歷史信息,新數(shù)據(jù)流入,可以結(jié)合歷史信息來進(jìn)行計算接收輸入流/獲取對應(yīng)狀態(tài)/更新狀態(tài)狀態(tài)管理的難點(diǎn)要解決問題:實(shí)時性,延遲不能太高數(shù)據(jù)不丟不重、恰好計算一次,尤其發(fā)生故障恢復(fù)后程序的可靠性要高,保證7*24小時穩(wěn)定運(yùn)行難點(diǎn)不能將狀態(tài)直接交由內(nèi)存,因?yàn)閮?nèi)存空間有限用持久化的系統(tǒng)備份狀態(tài),出現(xiàn)故障時,如何從備份中恢復(fù)需要考慮擴(kuò)展到多個節(jié)點(diǎn)時的伸縮性Flink解決了上述問題,提供有狀態(tài)的計算APIManaged
State和Raw
State托管狀態(tài)(ManagedState)是由Flink管理的,F(xiàn)link幫忙存儲、恢復(fù)和優(yōu)化原生狀態(tài)(RawState)是開發(fā)者自己管理的,需要自己序列化Managed
State又細(xì)分為Keyed
State和Operator
StateFlink的幾種狀態(tài)類型
ManagedStateRawState狀態(tài)管理方式FlinkRuntime托管,自動存儲、自動恢復(fù)、自動伸縮用戶自己管理狀態(tài)數(shù)據(jù)結(jié)構(gòu)Flink提供的常用數(shù)據(jù)結(jié)構(gòu),如ListState、MapState等字節(jié)數(shù)組:byte[]使用場景絕大多數(shù)Flink函數(shù)用戶自定義函數(shù)Keyed
State是KeyedStream上的狀態(tài),每個Key共享一個狀態(tài)OperatorState每個算子子任務(wù)共享一個狀態(tài)Keyed
State和Operator
StateKeyed
State相同Key的數(shù)據(jù)可以訪問、更新這個狀態(tài)Operator
State流入這個算子子任務(wù)的所有數(shù)據(jù)可以訪問、更新這個狀態(tài)Keyed
State和Operator
State都是基于本地的,每個算子子任務(wù)維護(hù)著自身的狀態(tài),不能訪問其他算子子任務(wù)的狀態(tài)具體的實(shí)現(xiàn)層面,Keyed
State需要重寫Rich
Function函數(shù)類,Operator
State需要實(shí)現(xiàn)CheckpointedFunction等接口Keyed
State和Operator
State
KeyedStateOperatorState適用算子類型只適用于KeyedStream上的算子可以用于所有算子狀態(tài)分配每個Key對應(yīng)一個狀態(tài)一個算子子任務(wù)對應(yīng)一個狀態(tài)創(chuàng)建和訪問方式重寫RichFunction,通過里面的RuntimeContext訪問實(shí)現(xiàn)CheckpointedFunction等接口橫向擴(kuò)展?fàn)顟B(tài)隨著Key自動在多個算子子任務(wù)上遷移有多種狀態(tài)重新分配的方式支持的數(shù)據(jù)結(jié)構(gòu)ValueState、ListState、MapState等ListState、BroadcastState等修改Flink應(yīng)用的并行度:每個算子的并行算子子任務(wù)數(shù)發(fā)生了變化,整個應(yīng)用需要關(guān)停和啟動一些算子子任務(wù)某份在原來某個算子子任務(wù)的狀態(tài)需要平滑更新到新的算子子任務(wù)上Flink的Checkpoint可以輔助狀態(tài)數(shù)據(jù)在算子子任務(wù)之間遷移算子子任務(wù)生成快照(Snapshot)保存到分布式存儲上子任務(wù)重啟后,相應(yīng)的狀態(tài)在分布式存儲上重建(Restore)Keyed
State與Operator
State的橫向擴(kuò)展方式稍有不同橫向擴(kuò)展問題Flink提供了封裝好的數(shù)據(jù)結(jié)構(gòu)供我們使用,包括ValueState、ListState等主要有:ValueState:單值MapState:Key-Value對ListState:列表ReducingState和AggregatingState:合并Keyed
State由于跟Key綁定,Key自動分布到不同算子子任務(wù),Keyed
State也可以根據(jù)Key分發(fā)到不同算子子任務(wù)上Keyed
State實(shí)現(xiàn)RichFunction函數(shù)類,比如RichFlatMapFunction創(chuàng)建StateDescriptor,StateDescriptor描述狀態(tài)的名字和狀態(tài)的數(shù)據(jù)結(jié)構(gòu),每種類型的狀態(tài)有對應(yīng)的StateDescriptor通過StateDescriptor,從RuntimeContext中獲取狀態(tài)調(diào)用狀態(tài)提供的方法,獲取狀態(tài),更新數(shù)據(jù)Keyed
State//創(chuàng)建StateDescriptor
MapStateDescriptor<String,Integer>behaviorMapStateDescriptor=newMapStateDescriptor<String,Integer>("behaviorMap",Types.STRING,Types.INT);//通過StateDescriptor獲取運(yùn)行時上下文中的狀態(tài)
behaviorMapState=getRuntimeContext().getMapState(behaviorMapStateDescriptor);MapState<UK,UV>:UVget(UKkey)voidput(UKkey,UVvalue)booleancontains(UKkey)…案例:統(tǒng)計電商用戶行為UserBehavior場景下,某個用戶(userId)下某種用戶行為(behavior)的數(shù)量Keyed
State/**
*MapStateFunction繼承并實(shí)現(xiàn)RichFlatMapFunction*兩個泛型分別為輸入數(shù)據(jù)類型和輸出數(shù)據(jù)類型*/
publicstaticclass
MapStateFunction
extends
RichFlatMapFunction<UserBehavior,Tuple3<Long,String,Integer>>{//指向MapState的句柄
privateMapState<String,Integer>behaviorMapState;@Overridepublicvoidopen(Configurationconfiguration){//創(chuàng)建StateDescriptor
MapStateDescriptor<String,Integer>behaviorMapStateDescriptor=newMapStateDescriptor<String,Integer>("behaviorMap",Types.STRING,Types.INT);//通過StateDescriptor獲取運(yùn)行時上下文中的狀態(tài)
behaviorMapState=getRuntimeContext().getMapState(behaviorMapStateDescriptor);}@OverridepublicvoidflatMap(UserBehaviorinput,Collector<Tuple3<Long,String,Integer>>out)throwsException{intbehaviorCnt=1;//behavior有可能為pv、cart、fav、buy等
//判斷狀態(tài)中是否有該behavior
if(behaviorMapState.contains(input.behavior)){behaviorCnt=behaviorMapState.get(input.behavior)+1;}//更新狀態(tài)
behaviorMapState.put(input.behavior,behaviorCnt);out.collect(Tuple3.of(input.userId,input.behavior,behaviorCnt));}}使用MapState記錄某個behavior下的數(shù)量<behavior,
behaviorCnt>UserBehavior案例先基于userId進(jìn)行keyBy再使用有狀態(tài)的MapStateFunction進(jìn)行處理Keyed
Stateenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<UserBehavior>userBehaviorStream=...//生成一個KeyedStream
KeyedStream<UserBehavior,Long>keyedStream=userBehaviorStream.keyBy(user->user.userId);//在KeyedStream上進(jìn)行flatMap
DataStream<Tuple3<Long,String,Integer>>behaviorCountStream=keyedStream.flatMap(newMapStateFunction());狀態(tài):算子子任務(wù)的本地數(shù)據(jù)在Checkpoint過程時寫入存儲,這個過程被稱為備份(Snapshot)初始化或重啟一個Flink作業(yè)時,以一定邏輯從存儲中讀出并變?yōu)樗阕幼尤蝿?wù)的本地數(shù)據(jù),這個過程被稱為重建(Restore)Keyed
State開箱即用:數(shù)據(jù)劃分基于Key,Snapshot和Restore過程可以基于Key在多個算子子任務(wù)之間做數(shù)據(jù)遷移Operator
State每個算子子任務(wù)管理自己的狀態(tài),流入到這個算子子任務(wù)上的所有數(shù)據(jù)可以訪問和修改Operator
State故障重啟后,數(shù)據(jù)流中某個元素不一定流入重啟前的算子子任務(wù)上需要根據(jù)具體業(yè)務(wù)場景設(shè)計Snapshot和Restore的邏輯使用CheckpointedFunction接口類Operator
StateFlink定期執(zhí)行Checkpoint,會將狀態(tài)數(shù)據(jù)Snapshot到存儲上每次執(zhí)行Snapshot,會調(diào)用snapshotState()方法,因此我們要實(shí)現(xiàn)一些Snapshot邏輯,比如將哪些狀態(tài)持久化initializeState()在算子子任務(wù)初始化狀態(tài)時調(diào)用,有兩種被調(diào)用的可能:整個Flink作業(yè)第一次執(zhí)行,狀態(tài)數(shù)據(jù)需要初始化一個默認(rèn)值Flink作業(yè)遇到故障重啟,基于之前已經(jīng)持久化的狀態(tài)恢復(fù)ListState
/
UnionListStateBroadcastStateOperator
Statepublic
interface
CheckpointedFunction{//Checkpoint時會調(diào)用這個方法,我們要實(shí)現(xiàn)具體的snapshot邏輯,比如將哪些本地狀態(tài)持久化
void
snapshotState(FunctionSnapshotContextcontext)
throwsException;//初始化時會調(diào)用這個方法,向本地狀態(tài)中填充數(shù)據(jù)
void
initializeState(FunctionInitializationContextcontext)
throwsException;}CheckpointedFunction源碼狀態(tài)以列表的形式序列化并存儲單個狀態(tài)為S,每個算子子任務(wù)有零到多個狀態(tài),共同組成一個列表ListState[S],Snapshot時將這些狀態(tài)以列表形式寫入存儲包含所有狀態(tài)的大列表,當(dāng)作業(yè)重啟時,將這個大列表重新分布到各個算子子任務(wù)上ListState:將大列表按照Round-Ribon模式均勻分布到各個算子子任務(wù)上,每個算子子任務(wù)得到的是大列表的子集UnionListState:將大列表廣播給所有算子子任務(wù)應(yīng)用場景:Source上保存流入數(shù)據(jù)的偏移量,Sink上對輸出數(shù)據(jù)做緩存Operator
State
–
ListState、UnionListStateOperator
State使用方法重點(diǎn)實(shí)現(xiàn)snapshotState()和initializeState()兩個方法在initializeState()方法里初始化并獲取狀態(tài)注冊StateDescriptor,指定狀態(tài)名字和數(shù)據(jù)類型從FunctionInitializationContext中獲取OperatorStateStore,進(jìn)而獲取Operator
State在snapshotState()方法里實(shí)現(xiàn)一些業(yè)務(wù)邏輯基于ListState實(shí)現(xiàn)可緩存的Sink//重寫CheckpointedFunction中的snapshotState()
//將本地緩存Snapshot到存儲上
@OverridepublicvoidsnapshotState(FunctionSnapshotContextcontext)throwsException{//將之前的Checkpoint清理
checkpointedState.clear();for(Tuple2<String,Integer>element:bufferedElements){//將最新的數(shù)據(jù)寫到狀態(tài)中
checkpointedState.add(element);}}//重寫CheckpointedFunction中的initializeState()
//初始化狀態(tài)
@OverridepublicvoidinitializeState(FunctionInitializationContextcontext)throwsException{//注冊ListStateDescriptor
ListStateDescriptor<Tuple2<String,Integer>>descriptor=newListStateDescriptor<>("buffered-elements",TypeInformation.of(newTypeHint<Tuple2<String,Integer>>(){}));//從FunctionInitializationContext中獲取OperatorStateStore,進(jìn)而獲取ListState
checkpointedState=context.getOperatorStateStore().getListState(descriptor);//如果是作業(yè)重啟,讀取存儲中的狀態(tài)數(shù)據(jù)并填充到本地緩存中
if(context.isRestored()){for(Tuple2<String,Integer>element:checkpointedState.get()){bufferedElements.add(element);}}}Sink先將數(shù)據(jù)放在本地緩存中,并定期通過snapshotState()方法進(jìn)行SnapshotinitializeState()初始化狀態(tài),需判斷是新作業(yè)還是重啟作業(yè)snapshotState()initializeState()Broadcast可以將部分?jǐn)?shù)據(jù)同步到所有實(shí)例上使用場景:一個主數(shù)據(jù)流,一個控制規(guī)則流,主數(shù)據(jù)流比較大,只能分散在多個算子實(shí)例上,控制規(guī)則流數(shù)據(jù)比較小,可以廣播分發(fā)到所有算子實(shí)例上。與Join的區(qū)別:控制規(guī)則流較小,可以放到每個算子實(shí)例里電商用戶行為分析案例:識別用戶行為模式,行為模式包括“反復(fù)猶豫下單類”、“頻繁爬取數(shù)據(jù)類”等,控制流里包含了這些行為模式,使用Flink實(shí)時識別用戶Broadcast
State主邏輯中讀取兩個數(shù)據(jù)流Broadcast
State支持Key-Value形式,需要使用MapStateDescriptor來構(gòu)建再使用broadcast()方法將數(shù)據(jù)廣播到所有算子子任務(wù)上,得到BroadcastStream主數(shù)據(jù)流先進(jìn)行keyBy(),然后與廣播流合并,在KeyedBroadcastProcessFunction中實(shí)現(xiàn)具體業(yè)務(wù)邏輯BroadcastPatternFunction是KeyedBroadcastProcessFunction的具體實(shí)現(xiàn)Broadcast
State//主數(shù)據(jù)流
DataStream<UserBehavior>userBehaviorStream=...//BehaviorPattern數(shù)據(jù)流
DataStream<BehaviorPattern>patternStream=...//BroadcastState只能使用Key->Value結(jié)構(gòu),基于MapStateDescriptor
MapStateDescriptor<Void,BehaviorPattern>broadcastStateDescriptor=newMapStateDescriptor<>("behaviorPattern",Types.VOID,Types.POJO(BehaviorPattern.class));BroadcastStream<BehaviorPattern>broadcastStream=patternStream.broadcast(broadcastStateDescriptor);//生成一個KeyedStream
KeyedStream<UserBehavior,Long>keyedStream=userBehaviorStream.keyBy(user->user.userId);//在KeyedStream上進(jìn)行connect()和process()
DataStream<Tuple2<Long,BehaviorPattern>>matchedStream=keyedStream.connect(broadcastStream).process(newBroadcastPatternFunction());processElement()方法處理主數(shù)據(jù)流中的每條元素,輸出零到多個數(shù)據(jù)processBroadcastElement()方法處理廣播流,可以輸出零到多個數(shù)據(jù),一般用來更新BroadcastStateKeyedBroadcastProcessFunction屬于ProcessFunction系列函數(shù),可以注冊Timer,并在onTimer方法中實(shí)現(xiàn)回調(diào)邏輯。KeyedBroadcastProcessFunction實(shí)現(xiàn)有狀態(tài)的計算Checkpoint原理及配置方法Savepoint原理及使用方法Flink的狀態(tài)是基于本地的,本地狀態(tài)數(shù)據(jù)不可靠Checkpoint機(jī)制:Flink定期將狀態(tài)數(shù)據(jù)保存到存儲上,故障發(fā)生后將狀態(tài)數(shù)據(jù)恢復(fù)。快照(Snapshot)、分布式快照(DistributedSnapshot)和檢查點(diǎn)(Checkpoint)均指的是Flink將狀態(tài)寫入存儲的過程一個簡單的Checkpoint流程:暫停處理新流入數(shù)據(jù),將新數(shù)據(jù)緩存下來將算子子任務(wù)的本地狀態(tài)數(shù)據(jù)拷貝到一個遠(yuǎn)程的持久化存儲上繼續(xù)處理新流入的數(shù)據(jù),包括剛才緩存起來的數(shù)據(jù)Checkpoint機(jī)制檢查點(diǎn)分界線(CheckpointBarrier)被插入到數(shù)據(jù)流中,將數(shù)據(jù)流切分成段。Flink的算子接收到CheckpointBarrier后,對狀態(tài)進(jìn)行快照。每個CheckpointBarrier有一個ID,表示該段數(shù)據(jù)屬于哪次Checkpoint。當(dāng)ID為n的CheckpointBarrier到達(dá)每個算子后,表示要對n-1和n之間狀態(tài)更新做快照。
Checkpoint
Barrier構(gòu)建并行度為2的數(shù)據(jù)流圖Flink的檢查點(diǎn)協(xié)調(diào)器(CheckpointCoordinator)觸發(fā)一次Checkpoint(TriggerCheckpoint),這個請求會發(fā)送給Source的各個子任務(wù)。分布式快照流程各Source算子子任務(wù)接收到這個Checkpoint請求之后,會將自己的狀態(tài)寫入到狀態(tài)后端,生成一次快照向下游廣播CheckpointBarrier分布式快照流程Source算子做完快照后,還會給CheckpointCoodinator發(fā)送一個確認(rèn)(ACK)ACK中包括了一些元數(shù)據(jù),包括備份到State
Backend的狀態(tài)句柄(指向狀態(tài)的指針)Source算子完成了一次Checkpoint分布式快照流程對于下游算子來說,可能有多個與之相連的上游輸入。一個輸入被稱為一條通道。Id為n的Checkpoint
Barrier會被廣播到多個通道。不同通道的Checkpoint
Barrier傳播速度不同。需要進(jìn)行對齊(BarrierAlignment)對齊分四步:1
.算子子任務(wù)在某個輸入通道中收到第一個ID為n的CheckpointBarrier,其他輸入通道中ID為n的CheckpointBarrier還未到達(dá)。2
.算子子任務(wù)將第一個輸入通道的數(shù)據(jù)緩存下來,同時繼續(xù)處理其他輸入通道的數(shù)據(jù),這個過程被稱為對齊。3
.第二個輸入通道的CheckpointBarrier抵達(dá)該算子子任務(wù),該算子子任務(wù)執(zhí)行快照,將狀態(tài)寫入StateBackend,然后將ID為n的CheckpointBarrier向下游所有輸出通道廣播。4
.對于這個算子子任務(wù),快照執(zhí)行結(jié)束,繼續(xù)處理各個通道中新流入數(shù)據(jù),包括剛才緩存起來的數(shù)據(jù)。Checkpoint
Barrier對齊每個算子都要執(zhí)行一遍上述的對齊、快照、確認(rèn)的工作最后的Sink算子發(fā)送確認(rèn)后,說明ID為n的Checkpoint執(zhí)行結(jié)束,CheckpointCoordinator向StateBackend寫入一些本次Checkpoint的元數(shù)據(jù)Checkpoint完成CheckpointBarrier對齊時,必須等待所有上游通道都處理完。假如某個上游通道處理很慢,這可能造成整個數(shù)據(jù)流堵塞。一個算子子任務(wù)不需要等待所有上游通道的CheckpointBarrier,直接將CheckpointBarrier廣播,算子子任務(wù)直接執(zhí)行快照并繼續(xù)處理后續(xù)流入數(shù)據(jù)。Flink必須將那些上下游正在傳輸?shù)臄?shù)據(jù)也作為狀態(tài)保存到快照中。開啟Unaligned
Checkpoint:Unaligned
Checkpoint優(yōu)缺點(diǎn):不需要對齊,Checkpoint速度快傳輸數(shù)據(jù)也要快照,狀態(tài)數(shù)據(jù)大,磁盤負(fù)載加重,重啟后狀態(tài)恢復(fù)時間過長,運(yùn)維管理難度大Unaligned
Checkpointenv.getCheckpointConfig().enableUnalignedCheckpoints();
每次執(zhí)行數(shù)據(jù)快照時,不需要暫停新流入數(shù)據(jù)。Flink啟動一個后臺線程,它創(chuàng)建本地狀態(tài)的一份復(fù)制,這個線程用來將本地狀態(tài)的復(fù)制同步到StateBackend上,一旦數(shù)據(jù)同步完成,再給CheckpointCoordinator發(fā)送確認(rèn)信息。該過程被稱為異步快照(AsynchronousSnapshot)。利用寫入時復(fù)制(Copy-on-Write):如果這份內(nèi)存數(shù)據(jù)沒有任何修改,那沒必要生成一份復(fù)制,如果這份內(nèi)存數(shù)據(jù)有一些更新,那再去申請額外的內(nèi)存空間并維護(hù)兩份數(shù)據(jù),一份是快照時的數(shù)據(jù),一份是更新后的數(shù)據(jù)。是否開啟異步快照可配置。異步快照State
Backend用來持久化狀態(tài)數(shù)據(jù)Flink內(nèi)置三種State
Backend:MemoryStateBackendFsStateBackendRocksDBStateBackendState
Backend基于內(nèi)存,數(shù)據(jù)存儲在Java的堆區(qū)。進(jìn)行分布式快照時,所有算子子任務(wù)會將自己內(nèi)存上的狀態(tài)同步到JobManager的堆上,因此一個作業(yè)的所有狀態(tài)要小于JobManager的內(nèi)存大小,否則將拋出OutOfMemoryError異常。只適合調(diào)試或者實(shí)驗(yàn),不建議在生產(chǎn)環(huán)境下使用。如果不做其他聲明,默認(rèn)情況是使用這種模式作為StateBackend。設(shè)置使用內(nèi)存作為State
Backend,MAX_MEM_STATE_SIZE為設(shè)置的狀態(tài)的最大值:MemoryStateBackendenv.setStateBackend(newMemoryStateBackend(MAX_MEM_STATE_SIZE));基于文件系統(tǒng),數(shù)據(jù)最終持久化到文件系統(tǒng)上文件系統(tǒng)包括本地磁盤、HDFS、Amazon、阿里云等在內(nèi)的云存儲服務(wù),使用時需要提供文件系統(tǒng)的地址,寫明前綴:file://、hdfs://或s3://默認(rèn)開啟異步快照本地的狀態(tài)在TaskManager的堆內(nèi)存上,執(zhí)行快照時狀態(tài)數(shù)據(jù)會寫到文件系統(tǒng)上FsStateBackend//使用HDFS作為StateBackend
env.setStateBackend(newFsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"));//使用阿里云OSS作為StateBackend
env.setStateBackend(newFsStateBackend("oss://<your-bucket>/<object-name>"));//使用Amazon作為StateBackend
env.setStateBackend(newFsStateBackend("s3://<your-bucket>/<endpoint>"));//關(guān)閉AsynchronousSnapshot
env.setStateBackend(newFsStateBackend(checkpointPath,false));本地狀態(tài)存儲在本地RocksDB上,Checkpoint時將RocksDB數(shù)據(jù)再寫到遠(yuǎn)程的存儲上,因此需要配置一個分布式存儲的地址。本地狀態(tài)基于RocksDB,可以突破內(nèi)存空間的限制,可存儲的狀態(tài)量更大。但RocksDB需要序列化和反序列化,讀寫時間成本高。支持增量快照(IncrementalCheckpoint):只對發(fā)生變化的數(shù)據(jù)增量寫到分布式存儲上,而不是將所有的本地狀態(tài)都拷貝過去。非常適合超大規(guī)模的狀態(tài)。但重啟恢復(fù)的時間更長。需要手動開啟:RocksDBStateBackend//開啟IncrementalCheckpoint
booleanenableIncrementalCheckpointing=true;env.setStateBackend(newRocksDBStateBackend(checkpointPath,enableIncrementalCheckpointing));默認(rèn)情況下,Checkpoint機(jī)制是關(guān)閉的,開啟:n表示每隔n毫秒進(jìn)行一次CheckpointCheckpoint耗時可能比較長,n設(shè)置過小,有可能出現(xiàn)一次Checkpoint還沒完成,下次Checkpoint已經(jīng)被觸發(fā),n設(shè)置過大,如果重啟,整個作業(yè)需要從更長的Offset開始重新處理數(shù)據(jù)開啟Checkpoint,使用Checkpoint
Barrier對齊功能,可以提供Exactly-Once語義At-Least-Once語義:不使用Checkpoint
Barrier對齊功能,但某些數(shù)據(jù)可能被處理多次一些其他Checkpoint設(shè)置,在CheckpointConfig中設(shè)置:Checkpoint相關(guān)配置env.enableCheckpointing(n)//使用At-Least-OncecheckpointCfg.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);CheckpointConfigcheckpointCfg=env.getCheckpointConfig();重啟恢復(fù)流程:重啟應(yīng)用,在集群上重新部署數(shù)據(jù)流圖。從持久化存儲上讀取最近一次的Checkpoint數(shù)據(jù),加載到各算子子任務(wù)上。繼續(xù)處理新流入的數(shù)據(jù)。作業(yè)故障重啟,會暫停一段時間,這段時間上游數(shù)據(jù)仍然會繼續(xù)發(fā)送過來,作業(yè)重啟后,需要消化這些未處理的數(shù)據(jù)。重啟恢復(fù)流程由于異常導(dǎo)致故障,異常根源不消除,重啟后仍然出現(xiàn)故障,因此要避免無限次重啟。固定延遲(FixedDelay):作業(yè)每次失敗后,按照設(shè)定的時間間隔進(jìn)行重啟嘗試,重啟次數(shù)不會超過某個設(shè)定值失敗率(FailureRate):計算一個時間段內(nèi)作業(yè)失敗的次數(shù),如果失敗次數(shù)小于設(shè)定值,繼續(xù)重啟,否則不重啟不重啟(NoRestart)在conf/flink-conf.yaml設(shè)置或者在代碼中設(shè)置三種重啟策略實(shí)現(xiàn)有狀態(tài)的計算Checkpoint原理及配置方法Savepoint原理及使用方法Checkpoint和Savepoint生成的數(shù)據(jù)近乎一樣Checkpoint目的是為了故障重啟,使得重啟前后作業(yè)狀態(tài)一致Savepoint目的是手動備份數(shù)據(jù),以便進(jìn)行調(diào)試、遷移、迭代等狀態(tài)數(shù)據(jù)從零積累成本很高迭代:在初版代碼的基礎(chǔ)上,保留狀態(tài)到Savepoint中,方便修改業(yè)務(wù)邏輯遷移:把程序遷移到新的機(jī)房、集群等有計劃地備份、停機(jī),手動管理和刪除狀態(tài)數(shù)據(jù)場景:同一個作業(yè)不斷調(diào)整并行度,以找到最優(yōu)方案進(jìn)行A/B實(shí)驗(yàn),使用相同的狀態(tài)數(shù)據(jù)測試不同的程序版本Savepoint與Checkpoint的區(qū)別每個算子應(yīng)該分配一個唯一ID,Savepoint中的狀態(tài)數(shù)據(jù)以算子ID來存儲和區(qū)分不設(shè)置ID,F(xiàn)link自動為其分配一個ID算子IDDataStream<X>stream=env.//一個帶有OperatorState的Source,例如KafkaSource
.addSource(newStatefulSource()).uid(“source-id”)
//算子ID
.keyBy(...)//一個帶有KeyedState的StatefulMap
.map(newStatefulMapper()).uid(“mapper-id”)
//算子ID
//print是一種無狀態(tài)的Sink
.print();//Flink為其自動分配一個算子ID對某個作業(yè)的狀態(tài)進(jìn)行備份,將Savepoint目錄保存到某個目錄下:從某個Savepoint目錄中恢復(fù)一個作業(yè):備份和恢復(fù)$
./bin/flinksavepoint<jobId>[savepointDirectory]$
./bin/flinkrun-s<savepointPath>[OPTIONS]<xxx.jar>StateProcessorAPI:基于DataSet
API,讀寫和修改Savepoint數(shù)據(jù)Savepoint以一定的Schema存儲,像讀寫數(shù)據(jù)庫一樣讀寫SavepointReaderFunction是一個KeyedStateReaderFunction的實(shí)現(xiàn),需要實(shí)現(xiàn)open()和readKey()方法:open()方法中注冊StateDescriptorreadKey()方法中逐Key讀取數(shù)據(jù),輸出到Collector中從Savepoint中讀數(shù)據(jù)Savepoint中的數(shù)據(jù)存儲形式DataSet<Integer>listState=savepoint.readListState<>("source-id","os1",Types.INT);//ReaderFunction需要繼承并實(shí)現(xiàn)KeyedStateReaderFunction
DataSet<KeyedState>keyedState=savepoint.readKeyedState("mapper-id",newReaderFunction());向Savepoint中寫入狀態(tài),適合作業(yè)冷啟動構(gòu)建BootstrapTransformation操作,是一個狀態(tài)寫入的過程,可以理解為流處理時使用的有狀態(tài)的算子withOperator()向Savepoint中添加算子,參數(shù)分別為:算子ID一個BootstrapTransformationKeyed
State和Operator
State的BootstrapTransformation實(shí)現(xiàn)不同向Savepoint寫數(shù)據(jù)ExecutionEnvironmentbEnv=ExecutionEnvironment.getExecutionEnvironment();//最大并行度
intmaxParallelism=128;StateBackendbackend=...//準(zhǔn)備好寫入狀態(tài)的數(shù)據(jù)
DataSet<Account>accountDataSet=bEnv.fromCollection(accounts);//構(gòu)建一個BootstrapTransformation,將accountDataSet寫入
BootstrapTransformation<Account>transformation=OperatorTransformation.bootstrapWith(accountDataSet).keyBy(acc->acc.id).transform(newAccountBootstrapper());//創(chuàng)建算子,算子ID為accountsSavepoint.create(backend,maxParallelism).withOperator("accounts",transformation).write(savepointPath);bEnv.execute("bootstrap");Operator
State要實(shí)現(xiàn)StateBootstrapFunction實(shí)現(xiàn)processElement()方法,每來一個輸入,processElement()方法會被調(diào)用一次,用于將數(shù)據(jù)寫入Savepoint。Operator
State:StateBootstrapFunction/**
*繼承并實(shí)現(xiàn)StateBootstrapFunction
*泛型參數(shù)為輸入類型*/
public
class
SimpleBootstrapFunction
extends
StateBootstrapFunction<Integer>{privateListState<Integer>state;//每個輸入都會調(diào)用一次processElement,這里將輸入加入到狀態(tài)中
@Overridepublic
void
processElement(Integervalue,Contextctx)
throwsException{state.add(value);}@Overridepublic
void
snapshotState(FunctionSnapshotContextcontext)
throwsException{}//獲取狀態(tài)句柄
@Overridepublic
void
initializeState(FunctionInitializationContextcontext)
throwsException{state=context.getOperatorState().getListState(newListStateDescriptor<>("state",Types.INT));}}KeyedState要實(shí)現(xiàn)KeyedStateBootstrapFunction實(shí)現(xiàn)processElement()方法,每來一個輸入,processElement()方法會被調(diào)用一次,用于將數(shù)據(jù)寫入Savepoint。KeyedState:KeyedStateBootstrapFunction/**
*AccountBootstrapper繼承并實(shí)現(xiàn)了KeyedStateBootstrapFunction
*第一個泛型Integer為Key類型*第二個泛型Account為輸入類型*/
public
class
AccountBootstrapper
extends
KeyedStateBootstrapFunction<Integer,Account>{ValueState<Double>state;//獲取狀態(tài)句柄
@Overridepublic
void
open(Configurationparameters)
{ValueStateDescriptor<Double>descriptor=new
ValueStateDescriptor<>("total",Types.DOUBLE);state=getRuntimeContext().getState(descriptor);}//每個輸入都會調(diào)用一次processElement()@Overridepublic
void
processElement(Accountvalue,Contextctx)
throwsException{state.update(value.amount);}}從已有的Savepoint上修改,保存。removeOperator()將一個算子狀態(tài)數(shù)據(jù)從Savepoint中刪除。withOperator()方法增加了一個算子。write()方法將數(shù)據(jù)寫入一個路徑下。修改SavepointBootstrapTransformation<Integer>transformation=OperatorTransformation.bootstrapWith(data).transform(newModifyProcessFunction());Savepoint.load(bEnv,savepointPath,backend)//刪除名為currency的算子
.removeOperator("currency")//增加名為numbers的算子,使用transformation構(gòu)建其狀態(tài)數(shù)據(jù)
.withOperator("number",transformation)//新的Savepoint會寫到modifyPath路徑下
.write(modifyPath);本章中我們和讀者介紹了,如何進(jìn)行有狀態(tài)的計算。Flink中的狀態(tài)主要包括:KeyedState和OperatorState。狀態(tài)可以借助Checkpoint或Savepoint機(jī)制被持久化保存到存儲空間上,Checkpoint用于故障恢復(fù),Savepoint用于狀態(tài)的迭代更新。7.Flink連接器本章將詳細(xì)介紹Flink的Connector相關(guān)知識。在實(shí)際生產(chǎn)環(huán)境中,數(shù)據(jù)可能存放在不同的系統(tǒng)中,比如文件系統(tǒng)、數(shù)據(jù)庫或消息隊(duì)列。一個完整的Flink作業(yè)包括Source和Sink兩大模塊,Source和Sink肩負(fù)著Flink與外部系統(tǒng)進(jìn)行數(shù)據(jù)交互的重要功能,它們又被稱為連接器(Connector)。通過本節(jié)學(xué)習(xí)您將可以:掌握Flink端到端的Exactly-Once保障。掌握自定義Source和Sink。熟悉Flink中常用的Connector,如文件系統(tǒng)、Kafka等。端到端的Exactly-Once自定義Source和Sink常用流式連接器
端到端Exactly-OnceExactly-Once:某條數(shù)據(jù)投遞到某個流處理系統(tǒng)后,該系統(tǒng)對這條數(shù)據(jù)只處理一次有數(shù)據(jù)重發(fā)(Replay)問題:作業(yè)重啟后,Source必須從某個Offset位置重新發(fā)送數(shù)據(jù)數(shù)據(jù)重發(fā)會導(dǎo)致一條輸入數(shù)據(jù)可能多次影響下游系統(tǒng),有可能產(chǎn)生At-Least-Once的效果,沒有達(dá)到Exactly-Once的效果為了達(dá)到端到端的Exactly-Once,必須:Source有重發(fā)功能Sink支持冪等寫或事務(wù)寫冪等寫(IdempotentWrite):任意多次向一個系統(tǒng)寫入數(shù)據(jù),只對目標(biāo)系統(tǒng)產(chǎn)生一次結(jié)果影響:重復(fù)向一個HashMap里插入同一個Key-Value對,第一次插入時這個HashMap發(fā)生變化,后續(xù)的插入操作不會改變HashMap的結(jié)果。Key-Value必須是可確定性(Deterministic)計算的:Key為name
+
curTimestamp,curTimestamp一直變化,Key非可確定性Key為name+eventTimestamp,Event
Time確定,Key可確定性有短暫的數(shù)據(jù)閃回現(xiàn)象:只有當(dāng)后續(xù)所有數(shù)據(jù)都重發(fā)一遍后,所有應(yīng)該被覆蓋的Key都被最新數(shù)據(jù)覆蓋后,整個系統(tǒng)才達(dá)到數(shù)據(jù)的一致狀態(tài)。冪等寫事務(wù)寫(TransactionWrite):Flink先將待輸出的數(shù)據(jù)保存下來暫時不向外部系統(tǒng)提交,等待Checkpoint結(jié)束的時刻,F(xiàn)link上下游所有算子的數(shù)據(jù)都是一致時,將之前保存的數(shù)據(jù)全部提交(Commit)到外部系統(tǒng):預(yù)寫日志(Write-Ahead-Log,WAL)兩階段提交(Two-Phase-Commit,2PC)Write-Ahead-Log方式使用OperatorState緩存待輸出的數(shù)據(jù)Two-Phase-Commit方式需要外部系統(tǒng)自身就支持事務(wù)(比如Kafka)端到端的Exactly-Once,犧牲了低延遲,數(shù)據(jù)分批次地提交事物寫端到端的Exactly-Once自定義Source和Sink常用流式連接器Flink在1.11對Source進(jìn)行了重構(gòu),改動較大,之前的稱為老Source接口,之后的稱為新Source接口老Source接口實(shí)現(xiàn)SourceFunction:接口類SourceFunctionRich函數(shù)類RichSourceFunction必須實(shí)現(xiàn)兩個方法:run()和cancel()方法:run()方法:Source啟動后開始運(yùn)行,在方法中使用循環(huán),循環(huán)內(nèi)不斷向下游發(fā)送數(shù)據(jù)cancel()方法:停止向下游繼續(xù)發(fā)送數(shù)據(jù)老Source接口//Source啟動后調(diào)用run方法,生成數(shù)據(jù)向下游發(fā)送
void
run(SourceContext<T>ctx)
throwsException;//停止
void
cancel();使用標(biāo)志位isRunning標(biāo)記Source是否在運(yùn)行run()方法內(nèi)一直循環(huán),使用SourceContext.collect()方法收集數(shù)據(jù),發(fā)送到下游停止Source時,要修改標(biāo)志位isRunning主邏輯中調(diào)用:老Source接口private
static
class
SimpleSource
implements
SourceFunction<Tuple2<String,Integer>>{private
intoffset=0;private
booleanisRunning=true;@Overridepublic
void
run(SourceContext<Tuple2<String,Integer>>ctx)
throwsException{while(isRunning){Thread.sleep(500);ctx.collect(newTuple2<>(""+offset,offset));offset++;if(offset==1000){isRunning=false;}}}@Overridepublic
void
cancel()
{isRunning=false;}}自定義Source:將數(shù)字發(fā)送到下游DataStream<Tuple2<String,Integer>>countStream=env.addSource(newSimpleSource());前頁的例子沒有進(jìn)行任何Checkpoint,重啟后從0重新開始,為了整個作業(yè)重啟后可恢復(fù),Source需要支持重發(fā),將Offset作為狀態(tài)記錄下來使用Operator
State記錄Offset,需要繼承CheckpointedFunction接口類,實(shí)現(xiàn)snapshotState()和initializeState()方法整個作業(yè)第一次啟動時,調(diào)用initializeState()方法,offset為0,之后每隔一段時間調(diào)用snapshotState()將狀態(tài)數(shù)據(jù)進(jìn)行Checkpoint可恢復(fù)的Source@Overridepublic
void
snapshotState(FunctionSnapshotContextsnapshotContext)
throwsException{//清除上次狀態(tài)
offsetState.clear();//將最新的offset添加到狀態(tài)中
offsetState.add(offset);}@Overridepublic
void
initializeState(FunctionInitializationContextinitializationContext)
throwsException{//初始化offsetState
ListStateDescriptor<Integer>desc=newListStateDescriptor<Integer>("offset",Types.INT);offsetState=initializationContext.getOperatorStateStore().getListState(desc);Iterable<Integer>iter=offsetState.get();if(iter==null||!iter.iterator().hasNext()){//第一次初始化,從0開始計數(shù)
offset=0;}else{//從狀態(tài)中恢復(fù)offset
offset=iter.iterator().next();}}privateListState<Integer>offsetState;在Source發(fā)送數(shù)據(jù)時也設(shè)置數(shù)據(jù)對應(yīng)的時間戳,并生成Watermark:collectWithTimestamp()方法,發(fā)送數(shù)據(jù)的同時也設(shè)置時間戳emitWatermark()方法,生成Watermark越早設(shè)置時間戳和Watermark,越能保證整個作業(yè)在時間序列上的準(zhǔn)確性和健壯性時間戳和Watermark@Overridepublic
void
run(SourceContext<Tuple2<String,Integer>>ctx)
throwsException{while(isRunning){ Thread.sleep(100);//將系統(tǒng)當(dāng)前時間作為該條數(shù)據(jù)的時間戳發(fā)送出去
ctx.collectWithTimestamp( newTuple2<>(""+offset,offset),System.currentTimeMillis());offset++;//每隔一段時間,發(fā)送一個Watermark
if(offset%100==0){ctx.emitWatermark(newWatermark(System.currentTimeMillis()));}if(offset==1000){isRunning=false;}}}老Source接口只適合流處理,不適合批處理,新的Source接口統(tǒng)一了流批處理,提供了更大規(guī)模并行處理能力三個組件:分片(Split):將數(shù)據(jù)源切分后的一小部分。讀取器(SourceReader):在TaskManager上,負(fù)責(zé)Split的讀取和處理,可分布式地并行運(yùn)行。例如,單個SourceReader可以讀取文件夾里的單個文件,多個SourceReader實(shí)例共同完成讀取整個文件夾的任務(wù)。分片枚舉器(SplitEnumerator):在JobManager上,負(fù)責(zé)發(fā)現(xiàn)和分配Split,按照負(fù)載均衡策略將多個Split分配到多個SourceReader。新Source接口類SinkFunctionRich函數(shù)類RichSinkFunction實(shí)現(xiàn)invoke()方法如果想達(dá)到端到端Exactly-Once,需要實(shí)現(xiàn)冪等寫和事務(wù)寫冪等寫:使用一些Key-Value存儲,并設(shè)計好Key,采用更新插入(Upsert)方式,將舊數(shù)據(jù)覆蓋事務(wù)寫:Write-Ahead-Log、Two-Phase-Commit
自定義Sink//每條數(shù)據(jù)到達(dá)Sink后都會調(diào)用invoke方法,發(fā)送到下游外部系統(tǒng)
//value為待輸出數(shù)據(jù)
void
invoke(INvalue,Contextcontext)在數(shù)據(jù)寫入到下游系統(tǒng)之前,先把數(shù)據(jù)以日志(Log)的形式緩存下來,等收到明確的確認(rèn)提交信息后,再將Log中的數(shù)據(jù)提交到下游系統(tǒng)0、兩次Checkpoint之間的待輸出數(shù)據(jù)組成一個批次,待輸出批次緩存在Sink的Operator
State中1、接收到新的CheckpointBarrier
2、開啟一個新待輸出批次3、Sink向CheckpointCommitter查詢某批次是否已經(jīng)提交。CheckpointCommitter是一個與外部系統(tǒng)緊密相連的插件,里面存儲了各批次數(shù)據(jù)是否已經(jīng)寫入到外部系統(tǒng)4、Sink得知某批次數(shù)據(jù)還未提交,則使用sendValues()方法,發(fā)送待輸出的數(shù)據(jù)到外部系統(tǒng)5、提交成功后,Sink會刪除OperatorState中存儲的這些數(shù)據(jù)Write-Ahead-Log待輸出數(shù)據(jù)直接寫入外部系統(tǒng),與外部系統(tǒng)一起協(xié)作提供事物寫功能0、Sink直接將待發(fā)送數(shù)據(jù)寫到外部系統(tǒng)的第k次事務(wù)(Transaction)中1、接收到新的CheckpointBarrier
2、preCommit()將第k次Transaction的數(shù)據(jù)預(yù)提交到外部系統(tǒng)中,數(shù)據(jù)寫到外部系統(tǒng),但是并未確認(rèn),外部系統(tǒng)也不可見3、beginTransaction()方法,開啟下一次Transaction(Transactionk+1),在這之后的上游算子流入的待輸入數(shù)據(jù)都將流入新的Transaction(k+1)4、第2步和第3步完成后,執(zhí)行commit()方法,確認(rèn)提交Transaction
k,該批次數(shù)據(jù)在外部可見Two-Phase-Commit端到端的Exactly-Once自定義Source和Sink常用流式連接器內(nèi)置I/O(Input/Output)接口flink-connector項(xiàng)目所涉及的ConnectorApacheBahir所提供的Connector
系統(tǒng)類型:消息隊(duì)列、數(shù)據(jù)庫、文件系統(tǒng)具體技術(shù):Kafka、Elasticsearch、HBase、Cassandra、JDBC、Kinesis、Redis
…常用流式連接器基于Socket的Source和Sink無法實(shí)現(xiàn)數(shù)據(jù)重發(fā),適合用來調(diào)試基于內(nèi)存集合的Source打印到標(biāo)準(zhǔn)輸出的Sinkprint()打印到STDOUTprintToErr()打印到STDERR數(shù)據(jù)類型要實(shí)現(xiàn)toString()方法實(shí)際是在TaskManager上執(zhí)行內(nèi)置I/O接口//讀取Socket中的數(shù)據(jù),數(shù)據(jù)流元素之間用\n來切分
env.socketTextStream(hostname,port,"\n");//向Socket中寫數(shù)據(jù),數(shù)據(jù)以SimpleStringSchema序列化
stream.writeToSocket(outputHost,outputPort,newSimpleStringSchema());DataStream<Integer>sourceDataStream=env.fromElements(1,2,3);從內(nèi)存集合讀取數(shù)據(jù)從Socket中讀取數(shù)據(jù)通過文件系統(tǒng)描述符來確定使用什么文件系統(tǒng):hdfs://、s3://周期性檢測功能:每隔一定時間周期性地檢查filePath路徑下的內(nèi)容是否有更新基于文件系統(tǒng)的SourceStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();StringtextPath=...//以UTF-8編碼格式讀取文件
DataStream<String>text=env.readTextFile(textPath)//文件路徑
StringfilePath=...//文件為純文本格式
TextInputFormattextInputFormat=newTextInputFormat(neworg.apache.flink.core.fs.Path(filePath));//每隔100毫秒檢測一遍
DataStream<String>inputStream=env.readFile(textInputFormat,filePath, FileProcessingMode.PROCESS_CONTINUOUSLY,100);簡單接口復(fù)雜接口writeAsText()方法:無法進(jìn)行Checkpoint,逐漸被廢棄StreamingFileSink行式存儲和列式存儲桶:輸出路徑的子文件夾可以按時間分桶基于文件系統(tǒng)的SinkDataStream<Address>stream=env.addSource(...)//使用StreamingFileSink將DataStream輸出為一個文本文件
StreamingFileSink<String>fileSink=StreamingFileSink.forRowFormat(newPath("/file/base/path"),
newSimpleStringEncoder<String>("UTF-8")).build();stream.addSink(fileSink);[base-path]/[bucket-path]/part-[task-id]-[id]/file/base/path└──2020-02-25--15├──part-0-0.inprogress.92c7be6f-8cfc-4ca3-905b-91b0e20ba9a9├──part-1-0.inprogress.18f9fa71-1525-4776-a7bc-fe02ee1f2ddaStreamingFileSink接口桶的文件夾結(jié)構(gòu)Kafka:被廣泛使用的消息隊(duì)列,非常具有代表性可以作為Flink的上游,此時要構(gòu)建Flink的Source;也可以作為Flink的下游,此時要構(gòu)建Flink的Sink不在Flink核心程序中,使用時需要額外在Maven中添加依賴Flink
Kafka
Connector<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.11.0</version></dependency>Kafka是一個Producer,F(xiàn)link作為Kafka的Consumer消費(fèi)Kafka中的數(shù)據(jù)創(chuàng)建FlinkKafkaConsumer需要三個參數(shù):Topic、反序列化方式和Kafka相關(guān)參數(shù)Kafka中傳輸?shù)氖嵌M(jìn)制的數(shù)據(jù),需要提供一個反序列化方式,將數(shù)據(jù)轉(zhuǎn)化為具體的Java或Scala對象開啟Flink
Checkpoint后,Checkpoint會記錄Offset,以進(jìn)行故障恢復(fù)Flink
Kafka
Source//Kafka參數(shù)
Propertiesproperties=newProperties();properties.setProperty("bootstrap.servers","localhost:9092");properties.setProperty("group.id","flink-group");StringinputTopic="Shakespeare";//Source
FlinkKafkaConsumer<String>consumer=newFlinkKafkaConsumer<String>(inputTopic,newSimpleStringSchema(),properties);DataStream<String>stream=env.addSource(consumer);Flink是Producer,向Kafka輸出數(shù)據(jù)創(chuàng)建FlinkKafkaProducer需要四個參數(shù):Topic、序列化方式、Kafka相關(guān)參數(shù)以及投遞保障序列化方式將Java/Scala對象轉(zhuǎn)化為Kafka中的二進(jìn)制數(shù)據(jù)三種投遞保障:NONE:不提供任何保障,數(shù)據(jù)可能會丟失也可能會重復(fù)。AT_LEAST_ONCE:保證不丟數(shù)據(jù),但是有可能會有重復(fù)。EXACTLY_ONCE:基于Kafka提供的事務(wù)寫功能,一條數(shù)據(jù)最終只寫入Kafka一次。FlinkKafkaSinkDataStream<Tuple2<String,Integer>>wordCount=...FlinkKafkaProducer<Tuple2<String,Integer>>producer=newFlinkKafkaProducer<Tuple2<String,Integer>>(outputTopic,newKafkaWordCountSerializationSchema(outputTopic),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 安徽郵電職業(yè)技術(shù)學(xué)院《抽樣技術(shù)與應(yīng)用》2023-2024學(xué)年第二學(xué)期期末試卷
- 廣東酒店管理職業(yè)技術(shù)學(xué)院《建筑設(shè)計初步(一)》2023-2024學(xué)年第二學(xué)期期末試卷
- 吉林工業(yè)職業(yè)技術(shù)學(xué)院《環(huán)境微生物》2023-2024學(xué)年第二學(xué)期期末試卷
- 惠州城市職業(yè)學(xué)院《藝術(shù)與國際關(guān)系學(xué)》2023-2024學(xué)年第二學(xué)期期末試卷
- 四川文軒職業(yè)學(xué)院《工程制圖強(qiáng)化實(shí)踐》2023-2024學(xué)年第二學(xué)期期末試卷
- 河西學(xué)院《中國古代文學(xué)6》2023-2024學(xué)年第二學(xué)期期末試卷
- 遼寧機(jī)電職業(yè)技術(shù)學(xué)院《汽車制造工藝學(xué)》2023-2024學(xué)年第二學(xué)期期末試卷
- 湖南生物機(jī)電職業(yè)技術(shù)學(xué)院《跨境電子商務(wù)運(yùn)營》2023-2024學(xué)年第二學(xué)期期末試卷
- 南京審計大學(xué)《公考申論》2023-2024學(xué)年第二學(xué)期期末試卷
- 2026浙江臺州椒江區(qū)第三中心幼兒園總園及分園教師招聘備考題庫及答案詳解(新)
- 2025年國家能源筆試題及答案
- T/CECS 10378-2024建筑用輻射致冷涂料
- 云南省昆明市云大附小小學(xué)六年級小升初期末英語試卷(含答案)
- 急性發(fā)熱課件
- 農(nóng)村建房合同協(xié)議書電子版(2025年版)
- SJG 46-2023 建設(shè)工程安全文明施工標(biāo)準(zhǔn)
- 部編版小學(xué)語文四年級上冊習(xí)作《我的心兒怦怦跳》精美課件
- DLT 593-2016 高壓開關(guān)設(shè)備和控制設(shè)備
- DB11∕T 190-2016 公共廁所建設(shè)標(biāo)準(zhǔn)
- 房屋過戶提公積金合同
- D-二聚體和FDP聯(lián)合檢測在臨床中的應(yīng)用現(xiàn)狀
評論
0/150
提交評論