版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
AllibabaGroup阿里巴巴集團(tuán)Flink社區(qū)微信公眾號(hào)102萬(wàn)行代碼,1270個(gè)問(wèn)題,F(xiàn)link新版發(fā)布了什么?從開(kāi)發(fā)到生產(chǎn)上線,如何確定集群規(guī)劃大小?11Demo:基于FlinkSQL構(gòu)建流式應(yīng)用FlinkCheckpoint問(wèn)題排查實(shí)用指南如何分析及處理Flink反壓?FlinkonYARN(上一張圖輕松掌握基礎(chǔ)架構(gòu)與啟動(dòng)流程FlinkonYARN(下常見(jiàn)問(wèn)題與排ApacheFlink與ApacheHive的集成72FlinkBatchSQL1.10實(shí)踐83如何在PyFlink1.10中自定義PythonUDF?90Flink1.10NativeKubernetes原理與實(shí)踐導(dǎo)讀:ApacheFlink是公認(rèn)的新一代開(kāi)源大數(shù)據(jù)計(jì)算引擎,可以支持流處理、批處理和機(jī)器學(xué)習(xí)等多種計(jì)算形態(tài),也是Apache軟件基金會(huì)和GitHub社區(qū)最2019年1月,阿里巴巴實(shí)時(shí)計(jì)算團(tuán)隊(duì)宣布將經(jīng)過(guò)雙十一歷練和集團(tuán)內(nèi)部業(yè)務(wù)打磨的Blink引擎進(jìn)行開(kāi)源并向ApacheFlink貢獻(xiàn)代碼,此后的一年中,阿里巴巴實(shí)時(shí)計(jì)算團(tuán)隊(duì)與ApacheFlink社區(qū)密切合作,持續(xù)推進(jìn)Flink對(duì)Blink的整合。式完成了Blink向Flink的合并。在此基礎(chǔ)之上,F(xiàn)link1.10版本在生產(chǎn)可用性、功能、性能上都有大幅提升。本文將詳細(xì)為大家介紹該版本的重大變更與新增特性。Flink1.10是迄今為止規(guī)模最大的一次版本升級(jí),除標(biāo)志著B(niǎo)link的合并完成外,還實(shí)現(xiàn)了Flink作業(yè)的整體性能及穩(wěn)定性的顯著優(yōu)化、Flink1.10.0版本一共有218名貢獻(xiàn)者,解決了1270個(gè)JIRAissue,經(jīng)由2661個(gè)commit總共提交了超過(guò)102萬(wàn)行102萬(wàn)行代碼,1270個(gè)問(wèn)題,F(xiàn)lin其中阿里巴巴實(shí)時(shí)計(jì)算團(tuán)隊(duì)共提交64.5萬(wàn)行代碼,超過(guò)總代碼量的60%,做出在該版本中,F(xiàn)link對(duì)SQL的DDL進(jìn)行了增強(qiáng),并實(shí)現(xiàn)了生產(chǎn)級(jí)別的Batch支持和Hive兼容,其中TPC-DS10T的性能更是達(dá)到了Hive3.0的7倍之多。在內(nèi)核方面,對(duì)內(nèi)存管理進(jìn)行了優(yōu)化。在生態(tài)方面,增加了PythonUDF和原生Kubernetes集成的支持。后續(xù)章節(jié)將在這些方面分別進(jìn)行詳細(xì)介紹。在舊版本的Flink中,流處理和批處理的內(nèi)存配置是割裂的,并置使用RocksDB存儲(chǔ)狀態(tài)數(shù)據(jù)時(shí),很難限制其內(nèi)存使用,從而在容器環(huán)境下經(jīng)常出在1.10.0中,我們對(duì)TaskExecutor的內(nèi)存模型,尤其是受管理內(nèi)存(Man- 此外,我們還將RocksDBstatebackend使用的內(nèi)存納入了托管范疇,同時(shí)可受控前的內(nèi)存使用情況(share-slot)受控后的內(nèi)存使用情況(share-slot)Batch兼容Hive且生產(chǎn)可用Flink從1.9.0版本開(kāi)始支持Hive集成,但并未完全兼容。在1.10.0中我們對(duì)Hive兼容性做了進(jìn)一步的增強(qiáng),使其達(dá)到生產(chǎn)可用的標(biāo)準(zhǔn)。具體來(lái)說(shuō),F(xiàn)link1.10Meta兼容-支持直接讀取Hivecatalog,覆蓋Hive1.x/2.x/3.x全部版本數(shù)據(jù)格式兼容-支持直接讀取Hive表,同時(shí)也支持寫(xiě)成Hive表的格式;支UDF兼容-支持在FlinkSQL內(nèi)直接調(diào)用Hive的UDF,UDTF和UDAF與此同時(shí),1.10.0版本中對(duì)batch執(zhí)行進(jìn)行了進(jìn)一步的優(yōu)化(FLINK-14133),向量化讀取ORC(FLINK-14135)基于比例的彈性內(nèi)存分配(FLIP-53)Shuffle的壓縮(FLINK-14845) 基于新調(diào)度框架的優(yōu)化(FLINK-14735)在此基礎(chǔ)上將Flink作為計(jì)算引擎訪問(wèn)Hive的meta和數(shù)據(jù),在TPC-DS10Tbenchmark下性能達(dá)到HivSQLDDL增強(qiáng)Flink1.10.0支持在SQL建表語(yǔ)句中定義watermark和計(jì)算列,以water-)PythonUDF支持Flink從1.9.0版本開(kāi)始增加了對(duì)Python的支持(PyFlink但用戶只能使用102萬(wàn)行代碼,1270個(gè)問(wèn)題,F(xiàn)linkJava開(kāi)發(fā)的User-defined-function(UDF),具有一定的局限性。在1.10.0中我們?yōu)镻yFlink增加了原生UDF支持(FLIP-58用戶現(xiàn)在可以在TableAPI/SQLhttps://enjoyment.cool/2020/02/19/Deep-dive-how-to-support-Python-UDF-in-Apache-Flink-1-10/原生Kubernetes集成Kubernetes(K8S)是目前最為流行的容器編排系統(tǒng),也是目前最流行的容器化需要對(duì)容器、算子及kubectl等K8S命令有所了解。在Flink1.10中,我們推出了對(duì)K8S環(huán)境的原生支持(FLINK-9953Flink 的資源管理器會(huì)主動(dòng)和Kubernetes通信,按需申請(qǐng)/projects/flink/flink-docs-stable/release-notes/flink-1.10.html2019年1月,阿里巴巴實(shí)時(shí)計(jì)算團(tuán)隊(duì)宣布Blink開(kāi)源。整整一年之后,F(xiàn)link1.10.0版本的發(fā)布宣告Flink和Blink的整合正式完成。我們踐行著自己的諾言放源碼,更相信社區(qū)的力量,相信社區(qū)是開(kāi)源協(xié)作精神與創(chuàng)新的搖籃。我們也衷心希望有更多的志同道合的小伙伴加入我們,一起把A從開(kāi)發(fā)到生產(chǎn)上線,如何確定集群規(guī)劃大小?在Flink社區(qū)中,最常被問(wèn)到的問(wèn)題之一是:在從開(kāi)發(fā)到生產(chǎn)上線的過(guò)程中如何確定集群的大小。這個(gè)問(wèn)題的標(biāo)準(zhǔn)答案顯然是“視情況而定”,但這并非一個(gè)有用的答案。本文概述了一系列的相關(guān)問(wèn)題,通過(guò)回答這些問(wèn)題,或許你能得出一些數(shù)字作每秒記錄數(shù)和每條記錄的大小狀態(tài)更新的次數(shù)和狀態(tài)后端的訪問(wèn)模式最后,一個(gè)更實(shí)際的問(wèn)題是與客戶之間圍繞停機(jī)時(shí)間、延遲和最大吞吐量的服務(wù)網(wǎng)絡(luò)容量,同時(shí)把使用網(wǎng)絡(luò)的外部服務(wù)也納入考慮,如Kafka、HDFS等。磁盤帶寬,如果您依賴于基于磁盤的狀態(tài)后端,如RocksDB(并考慮其他磁盤使用,如Kafka或HDFS)可用的機(jī)器數(shù)量、CPU和內(nèi)存基于所有這些因素,現(xiàn)在可以為正常運(yùn)行構(gòu)建一個(gè)基線,外加一個(gè)資源緩沖量用于恢復(fù)追趕或處理負(fù)載尖峰。建議您在建立基線時(shí)也考慮檢查點(diǎn)期間(checkpoint- 當(dāng)前在假設(shè)的集群上計(jì)劃作業(yè)部署,將建立資源使用基線的過(guò)程可視化。這些數(shù)字是粗略的值,它們并不全面——在文章的最后將進(jìn)一步說(shuō)明在進(jìn)行計(jì)算過(guò)程中遺漏Flink流計(jì)算作業(yè)和硬件示例在本案例中,我將部署一個(gè)典型的Flink流處理作業(yè),該作業(yè)使用Flink的Kafka數(shù)據(jù)消費(fèi)者從Kafka消息源中讀取數(shù)據(jù)。然后使用帶鍵的總計(jì)窗口運(yùn)算符假設(shè)吞吐量為每秒100萬(wàn)條消息。要了解窗口運(yùn)算符(windowoperator)的狀態(tài)大小,需要知道不同鍵的數(shù)目。在本例中,鍵(keys)是用戶id的數(shù)量,即500000000個(gè)不同的用戶。對(duì)于每個(gè)用戶,需要計(jì)算四個(gè)數(shù)字,存儲(chǔ)為長(zhǎng)整形(8如上圖所示,共有五臺(tái)機(jī)器在運(yùn)行作業(yè),每臺(tái)機(jī)器運(yùn)行一個(gè)Flink任務(wù)管理器機(jī)到運(yùn)行TaskManager的每臺(tái)計(jì)算機(jī)都由一個(gè)10千兆位以太網(wǎng)連接。Kafka緩存代理(brokers)在不同的機(jī)器上分開(kāi)運(yùn)行。每臺(tái)機(jī)器有16個(gè)CPU核。為了簡(jiǎn)化處理,不考慮CPU和內(nèi)存需求。但實(shí)際情況中,根據(jù)應(yīng)用程序邏輯和正在使用的狀態(tài)后端,我們需要注意內(nèi)存。這個(gè)例子使用了一個(gè)基于RocksDB的狀態(tài)后端,它穩(wěn)定并且內(nèi)存需求很低。要了解整個(gè)作業(yè)部署的資源需求,最容易的方法是先關(guān)注一臺(tái)計(jì)算機(jī)和一個(gè) 14>從開(kāi)發(fā)到生產(chǎn)上線,如何確定集群規(guī)機(jī)器視角圖-TaskManagern從上圖來(lái)看,keyBy是一個(gè)單獨(dú)運(yùn)算符,因此計(jì)算資源需求更容易。實(shí)際上,TheKafkasource要計(jì)算單個(gè)Kafka源(source)接收的數(shù)據(jù)量,我們首先計(jì)算K入。這些source每秒接收1000000條消息,每條消息大小為2KB。2KBx1,000,000/s=2GB/s2GB/s÷5臺(tái)機(jī)器=400MB/sKafkasource的計(jì)算過(guò)程TheShuffle/keyByShuffle過(guò)程將具有相同鍵的所有數(shù)據(jù)發(fā)送到一臺(tái)計(jì)算機(jī),因此需要將來(lái)自400MB/s÷5臺(tái)機(jī)器=80MB/s平均而言,我們必須向每臺(tái)計(jì)算機(jī)發(fā)送80MB/s的數(shù)據(jù)。此分析是從一臺(tái)機(jī)器 16>從開(kāi)發(fā)到生產(chǎn)上線,如何確定集群規(guī)400MB/s-80MB=320MB/sTheshuffle的計(jì)算過(guò)程Window窗口輸出和Kafka發(fā)送下一個(gè)要問(wèn)的問(wèn)題是窗口運(yùn)算符發(fā)出多少數(shù)據(jù)并發(fā)送到Kafka接收器。答案是分鐘發(fā)出一次當(dāng)前聚合總值。每個(gè)鍵從聚合中發(fā)出2個(gè)整形(user_id,window_ts)100000000個(gè)keysx40個(gè)字節(jié)=4GB(從每臺(tái)機(jī)器來(lái)看)4GB/分鐘÷60=67MB/秒(由每個(gè)任務(wù)管理器發(fā)出)這意味著每個(gè)任務(wù)管理器平均從窗口運(yùn)算符發(fā)出67MB/s的用戶數(shù)據(jù)。由于每個(gè)任務(wù)管理器上都有一個(gè)Kafka發(fā)送端(和窗口運(yùn)算符在同一個(gè)任務(wù)管理器中并且沒(méi)有進(jìn)一步的重新分區(qū),所以這得到的是Flink向K用戶數(shù)據(jù):從Kafka,分發(fā)到窗口運(yùn)算符并返回到Kafka窗口運(yùn)算器的數(shù)據(jù)發(fā)射預(yù)計(jì)將是“突發(fā)”的,因?yàn)樗鼈兠糠昼姲l(fā)送一次數(shù)據(jù)。實(shí)際上,運(yùn)算符不會(huì)以67mb/s的恒定速率給客戶發(fā)送數(shù)據(jù),而是每分鐘內(nèi)將可用帶 18>從開(kāi)發(fā)到生產(chǎn)上線,如何確定集群規(guī)狀態(tài)訪問(wèn)和檢查點(diǎn)實(shí)際情況中需要計(jì)入從磁盤訪問(wèn)的開(kāi)銷,包括到RocksDB的存儲(chǔ)狀態(tài)和檢查點(diǎn)。要了解磁盤訪問(wèn)成本,請(qǐng)查看窗口運(yùn)算符(windowoperator)如何訪問(wèn)狀態(tài)。Kafka看。Flink正在用1分鐘的滑動(dòng)窗口計(jì)算5分鐘的窗口量。Flink通過(guò)維護(hù)五個(gè)窗口來(lái)實(shí)現(xiàn)滑動(dòng)窗口,每次滑動(dòng)都對(duì)應(yīng)一個(gè)1分鐘的窗口。如前所述,當(dāng)使用窗口實(shí)現(xiàn)即時(shí)聚合時(shí),將為每個(gè)窗口中的每個(gè)鍵(key)維護(hù)40字節(jié)的狀態(tài)。對(duì)于每個(gè)傳入事件,首先需要從磁盤檢索當(dāng)前聚合值(讀取40字節(jié)更新聚合值,然后將新值寫(xiě)回40字節(jié)狀態(tài)x5個(gè)窗口x每臺(tái)計(jì)算機(jī)200000msg/s=40MB/s即需要的每臺(tái)計(jì)算機(jī)的讀或?qū)懘疟P訪問(wèn)權(quán)限。如前所述,磁盤是網(wǎng)絡(luò)相互連接上述考慮是針對(duì)狀態(tài)訪問(wèn)的,當(dāng)新事件到達(dá)窗口運(yùn)算符時(shí),狀態(tài)訪問(wèn)會(huì)持續(xù)進(jìn)行,還需要容錯(cuò)啟用檢查點(diǎn)。如果機(jī)器或其他部分出現(xiàn)故障,需要恢復(fù)窗口內(nèi)容并繼檢查點(diǎn)設(shè)置為每分鐘一個(gè)檢查點(diǎn),每個(gè)檢查點(diǎn)將作業(yè)的整個(gè)狀態(tài)復(fù)制到網(wǎng)絡(luò)連接 40字節(jié)狀態(tài)x5個(gè)窗口x100000000個(gè)keys=20GB20GB÷60=333MB/秒與窗口運(yùn)算類似,檢查點(diǎn)是突發(fā)的,每分鐘一次,它都試圖將數(shù)據(jù)全速發(fā)送到外部存儲(chǔ)器。Checkpointing引發(fā)對(duì)RocksDB的額外狀態(tài)訪問(wèn)(在本案例中,RocksDB位于網(wǎng)絡(luò)連接的磁盤上)。自Flink1.3版本以來(lái),Rock760+760x5+400+2335=10335MB/秒從開(kāi)發(fā)到生產(chǎn)上線,如何確定集群規(guī)劃大小?<2補(bǔ)充一點(diǎn),這些計(jì)算都不包括協(xié)議開(kāi)銷,例如來(lái)自Flink、Kafka或文件系統(tǒng)的TCP、Ethernet和RPC調(diào)用。但這仍然是一個(gè)很好的出發(fā)點(diǎn),可以幫助您了解工基于以上分析,這個(gè)例子,在一個(gè)5節(jié)點(diǎn)集群的典型運(yùn)行中,每臺(tái)機(jī)器都需要保留了大約40%的網(wǎng)絡(luò)容量因?yàn)椴糠直恢饔^對(duì)于40%的凈空是否合適,沒(méi)有一個(gè)一刀切的答案,但是這個(gè)算法應(yīng)該是一個(gè)很好的起點(diǎn)。嘗試上面的計(jì)算,更換機(jī)器數(shù)量、鍵(keys)的數(shù)量或每秒的消息數(shù),Demo:基于FlinkSQL構(gòu)建流式應(yīng)用上周四在Flink中文社區(qū)釘釘群中直播分享了《Demo:基于FlinkSQL構(gòu)建流式應(yīng)用》,直播內(nèi)容偏向?qū)崙?zhàn)演示。這篇文章是對(duì)直播內(nèi)容的一個(gè)總結(jié),并且改善了部分內(nèi)容,比如除Flink外其他組件全部采用DockerCompose安裝流程。讀者也可以結(jié)合視頻和本文一起學(xué)習(xí)。完整分享可以觀看視頻回顧:https://Flink1.10.0于近期剛發(fā)布,釋放了許多令人激動(dòng)的新特性。尤其是FlinkSQL模塊,發(fā)展速度非???,因此本文特意從實(shí)踐的角度出發(fā),帶領(lǐng)大家一起探索使用FlinkSQL如何快速構(gòu)建流式應(yīng)用。本文將基于Kafka,MySQL,Elasticsearch,Kibana,使用FlinkSQL構(gòu)建一個(gè)電商用戶行為的實(shí)時(shí)分析應(yīng)用。本文所有的實(shí)戰(zhàn)演練都將在FlinkSQLCLI上執(zhí)行,全程只涉及SQL純文本,無(wú)需一行Java/Scala代碼,無(wú)需安裝IDE。本實(shí)戰(zhàn)一臺(tái)裝有Docker和Java8的Linux或MacOS計(jì)算機(jī)。使用DockerCompose啟動(dòng)容器本實(shí)戰(zhàn)演示所依賴的組件全都編排到了容器中,因此可以通過(guò)docker-com-DataGen:數(shù)據(jù)生成器。容器啟動(dòng)后會(huì)自動(dòng)開(kāi)始生成用戶行為數(shù)據(jù),并發(fā)送到Kafka集群中。默認(rèn)每秒生成1000條數(shù)據(jù),持續(xù)生成約3小時(shí)。也可以更改 MySQL:集成了MySQL5.7,以及預(yù)先創(chuàng)建好了類目表(category預(yù)先Kafka:主要用作數(shù)據(jù)源。DataGen組件會(huì)自動(dòng)將數(shù)據(jù)灌入這個(gè)容器中。Zookeeper:Kafka容器依賴。Elasticsearch:主要存儲(chǔ)FlinkSQL產(chǎn)出的數(shù)據(jù)。Kibana:可視化Elasticsearch中的數(shù)據(jù)。在啟動(dòng)容器前,建議修改Docker的配置,將資源調(diào)整到4GB以及4核。啟動(dòng)該命令會(huì)以detached模式自動(dòng)啟動(dòng)DockerCompose配置中定義的所有容器。你可以通過(guò)dockerps來(lái)觀察上述的五個(gè)容器是否正常啟動(dòng)了。也可以訪問(wèn)下載安裝Flink本地集群/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz3.通過(guò)如下命令下載依賴jar包,并拷貝到lib/目錄下,也可手動(dòng)下載和拷 6.執(zhí)行bin/sql-client.shembedded啟動(dòng)SQLCLI。便會(huì)看到如下的 使用DDL創(chuàng)建Kafka表Datagen容器在啟動(dòng)后會(huì)往Kafka的user_behaviortopic中持續(xù)不斷地寫(xiě)入數(shù)據(jù)。數(shù)據(jù)包含了2017年11月27日一天的用戶加購(gòu)、喜歡每一行表示一條用戶行為,以JSON的格式由用戶ID、商品ID、商品類目ID、行為類型和時(shí)間組成。該原始數(shù)據(jù)集來(lái)自阿里云天池公開(kāi)數(shù)據(jù)集,特此我們可以在docker-compose.yml所在目錄下運(yùn)行如下命令,查看Kafka集FlinkSQLCLI中執(zhí)行該DDL。如上我們按照數(shù)據(jù)的格式聲明了5個(gè)字段,除此之外,我們還通過(guò)計(jì)算列語(yǔ)法和PROCTIME()內(nèi)置函數(shù)聲明了一個(gè)產(chǎn)生處理時(shí)間的虛擬列。我們還通過(guò)WATERMARK語(yǔ)法,在ts字段上聲明了watermark策略(容忍5秒亂序ts字段因此也成了事件時(shí)間列。關(guān)于時(shí)間屬性以及DDL語(yǔ)法可以閱讀官方文檔了解 時(shí)間屬性:/projects/flink/flink-docs-release-1.10/DDL:/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-在SQLCLI中成功創(chuàng)建Kafka表后,可以通過(guò)showtables;和describeuser_behavior;來(lái)查看目前已注冊(cè)的表,以及表的詳細(xì)信息。我們也可以直接在SQLCLI中運(yùn)行SELECT*FROMuser_behavior;預(yù)覽下數(shù)據(jù)(按q退出)。接下來(lái),我們會(huì)通過(guò)三個(gè)實(shí)戰(zhàn)場(chǎng)景來(lái)更深入地了解FlinkSQL。使用DDL創(chuàng)建Elasticsearch表我們先在SQLCLI中創(chuàng)建一個(gè)ES結(jié)果表,根據(jù)場(chǎng)景需求主要需要保存兩個(gè)數(shù)k提交Query統(tǒng)計(jì)每小時(shí)的成交量就是每小時(shí)共有多少“buy”的用戶行為。因此會(huì)需要用到TUMBLE窗口函數(shù),按照一小時(shí)切窗。然后每個(gè)窗口分別統(tǒng)計(jì)“buy”的個(gè)數(shù),這可這里我們使用HOUR內(nèi)置函數(shù),從一個(gè)TIMESTAMP列中提取出一天中第幾個(gè)小時(shí)的值。使用了INSERTINTO將query的結(jié)果持續(xù)不斷地插入到上文定義的es結(jié)果表中(可以將es結(jié)果表理解成query的物化視圖)。另外可以閱讀該文檔了解更多關(guān)于窗口聚合的內(nèi)容:/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows在FlinkSQLCLI中運(yùn)行上述查詢后,在FlinkWebUI中就能看到提交的任 使用Kibana可視化結(jié)果localhost:5601訪問(wèn)Kibana。首先我們需要先配置一個(gè)indexpattern。點(diǎn)擊左接下來(lái),我們先創(chuàng)建一個(gè)Dashboard用來(lái)展示各個(gè)可視化的視圖。點(diǎn)擊頁(yè)面左擊“CreateNew”創(chuàng)建一個(gè)新的視圖,選擇“Area”面積圖,選擇“buy_cnt_per_hour”索引,按照如下截圖中的配置(左側(cè))畫(huà)出成交量面積圖,并保另一個(gè)有意思的可視化是統(tǒng)計(jì)一天中每一刻的累計(jì)獨(dú)立用戶數(shù)(uv也就是每一刻的uv數(shù)都代表從0點(diǎn)到當(dāng)前時(shí)刻為止的總計(jì)uv數(shù),因此該曲線肯定是單調(diào)遞我們?nèi)匀幌仍赟QLCLI中創(chuàng)建一個(gè)Elasticsearch表,用于存儲(chǔ)結(jié)果匯總數(shù) 為了實(shí)現(xiàn)該曲線,我們可以先通過(guò)OVERWINDOW計(jì)算出每條數(shù)據(jù)的當(dāng)前通過(guò)內(nèi)置的COUNT(DISTINCTuser_id)來(lái)完成,F(xiàn)linkSQL內(nèi)部對(duì)COUNTDISTINCT做了非常多的優(yōu)化,因此可以放心使用。這里我們使用SUBSTR和DATE_FORMAT還有||內(nèi)置函數(shù),將一個(gè)TIME-STAMP字段轉(zhuǎn)換成了10分鐘單位的時(shí)間字符串,如:12:10,12:20。關(guān)于ERWINDOW的更多內(nèi)容可以參考文檔:/projects//flink-docs-release-1.10/dev/table/sql/queries.html#aggregations我們還使用了CREATEVIEW語(yǔ)法將query注冊(cè)成了一個(gè)邏輯視圖,可以方便地在后續(xù)查詢中對(duì)該query進(jìn)行引用,這有利于拆解復(fù)雜query。注意,創(chuàng)建邏輯視圖不會(huì)觸發(fā)作業(yè)的執(zhí)行,視圖的結(jié)果也不會(huì)落地,因此使用起來(lái)非常輕量,沒(méi)有額外開(kāi)銷。由于uv_per_10min每條輸入數(shù)據(jù)都產(chǎn)生一條輸出數(shù)據(jù),因此對(duì)于存儲(chǔ)分鐘只有一個(gè)點(diǎn)會(huì)存儲(chǔ)在Elasticsearch中,對(duì)于Elasticsearch和Kibana可視化最后一個(gè)有意思的可視化是類目排行榜,從而了解哪些類目是支柱類目。不過(guò)由能將其歸約到頂級(jí)類目。所以筆者在mysql容器中預(yù)先準(zhǔn)備了子類目與頂級(jí)類目的在SQLCLI中創(chuàng)建MySQL表,后續(xù)用作維表查詢。 同時(shí)我們?cè)賱?chuàng)建一個(gè)Elasticsearch表,用于存儲(chǔ)類目統(tǒng)計(jì)結(jié)果。第一步我們通過(guò)維表關(guān)聯(lián),補(bǔ)全類目名稱。我們?nèi)匀皇褂肅REATEVIEW將該查詢注冊(cè)成一個(gè)視圖,簡(jiǎn)化邏輯。維表關(guān)聯(lián)使用temp/table/streaming/joins.html#join-with-a-temporal-table最后根據(jù)類目名稱分組,統(tǒng)計(jì)出buy的事件數(shù),并寫(xiě)入Elasticsearch中。提交上述查詢后,在Kibana中創(chuàng)建to到目前為止,我們已經(jīng)完成了三個(gè)實(shí)戰(zhàn)案例及其可視化視圖?,F(xiàn)在可以回到Dashboard頁(yè)面,對(duì)各個(gè)視圖進(jìn)行拖拽編排,讓我們的Dashboard看上去更加正項(xiàng),而用戶行為數(shù)據(jù)中也有很多有意思的信息值得挖掘,感興趣的讀者可以用FlinkSQL對(duì)數(shù)據(jù)進(jìn)行更多維度的分析,并使用Kibana展示更多可視化圖,并觀測(cè)圖形在本文中,我們展示了如何使用FlinkSQL集成Kafka,MySQL,Elastic- search以及Kibana來(lái)快速搭建一個(gè)實(shí)時(shí)分析應(yīng)用。整個(gè)過(guò)程無(wú)需一行Java/Scala代碼,使用SQL純文本即可完成。期望通過(guò)本文,可以讓讀者了解到FlinkSQL的易用和強(qiáng)大,包括輕松連接各種外部系統(tǒng)、對(duì)事件時(shí)間和亂序數(shù)據(jù)處理的原生支持、維表關(guān)聯(lián)、豐富的內(nèi)置函數(shù)等等。希望你能喜歡我們的實(shí)戰(zhàn)演練,并從中獲得樂(lè)趣和Flink會(huì)從最近成功的Checkpoint恢復(fù)。在實(shí)際情況中,我們可能會(huì)遇到Check-1.Checkpoint流程簡(jiǎn)介 Source收到triggercheckpoint的PRC,自己開(kāi)始做snapshot,并往下游Task開(kāi)始同步階段snapshotTask開(kāi)始異步階段snapshotTasksnapshot完成,匯報(bào)給JM2.Checkpoint異常情況排查2.1Checkpoint失敗其中Acknowledged一列表示有多少個(gè)subtask對(duì)這個(gè)Checkpoint進(jìn)行了ack,從圖中我們可以知道第三個(gè)operator總共有5個(gè)subtask,但是只有4個(gè)進(jìn)行了ack;第二列LatestAcknowledgement表示該operator的StateSize表示當(dāng)前Checkpoint的state大小--主要這里如果是增量 Checkpoint失敗大致分為兩種情況:CheckpointDecline和Checkpoint0b60f08bf8984085b59f8d9bc74ce2e1是executionid,85d268e6fbc-當(dāng)前Flink中如果較小的Checkpoint還沒(méi)有對(duì)齊的情況下,收到了更大的這個(gè)日志表示,當(dāng)前Checkpoint19還在對(duì)齊階段,我們收到了Checkpoint在下游task收到被cancelBarrier的時(shí)候,會(huì)打印類似如下的日志:上面三種日志都表示當(dāng)前task接收到上游發(fā)送過(guò)來(lái)的barrierCancel消息,從如果Checkpoint做的非常慢,超過(guò)了timeout還沒(méi)有完成,則整個(gè)Check-point也會(huì)失敗。當(dāng)一個(gè)Checkpoint由于超時(shí)而失敗是表示Chekpoint1由于超時(shí)而失敗,這個(gè)時(shí)候可以可以看這個(gè)日志后面是否有 可以按照2.1.1中的方法找到對(duì)應(yīng)的taskmanager.log查看具體信息。下面的日志如果是DEBUG的話,我們會(huì)在開(kāi)始處標(biāo)記DEBUG我們按照下面的日志把TM端的snapshot分為三個(gè)階段,開(kāi)始做snapshot上面的日志表示當(dāng)前這個(gè)backend的同步階段完成,共使用了上面的日志表示異步階段完成,異步階段使用了369ms2.2Checkpoint慢在2.1節(jié)中,我們介紹了Checkpoint失敗的排查思路,本節(jié)會(huì)分情況介紹Checkpoint經(jīng)常需要做9分鐘(我們希望1分鐘左右就能夠做完而且我們預(yù)期statesize不是非常大。這個(gè)一般發(fā)生較少,但是也有可能,因?yàn)閟ource做snapshot并往下游發(fā)送point,其中全量Checkpoint會(huì)把當(dāng)前的state全部備份一次到持久化存儲(chǔ),而 現(xiàn)在Flink中僅在RocksDBStateBackend中支持增量Checkpoint,如果你已經(jīng)使用RocksDBStateBackend,可以通過(guò)開(kāi)啟增量Checkpoint來(lái)加速,具體我們知道task僅在接受到所有的barrier之后才會(huì)進(jìn)行snapshot,如果作業(yè)存在反壓,或者有數(shù)據(jù)傾斜,則會(huì)導(dǎo)致全部的channel或者某些channel的barrier上圖中我們選擇了一個(gè)task,查看所有subtask的反壓情上圖中我們選擇其中一個(gè)operator,點(diǎn)擊所有的subtask,然后按照RecordsReceived/BytesReceived/TPS從大到小進(jìn)行排序,能看到前面幾個(gè)subtask會(huì)如果存在反壓或者數(shù)據(jù)傾斜的情況,我們需要首先解決反壓或者數(shù)據(jù)傾斜問(wèn)題之從前面我們知道Checkpoint在task端分為barrier對(duì)齊(收齊所有上游發(fā)送如果taskmanager.log中沒(méi)有這個(gè)日志,則表示barrier一直沒(méi)有對(duì)齊,接下來(lái)我們需要了解哪些上游的barrier沒(méi)有發(fā)送下來(lái),如果你使用AtLeastOnce的表示該task收到了channel5來(lái)的barrier,然后看對(duì)應(yīng)Checkpoint,再查看還剩哪些上游的barrier沒(méi)有接受到,對(duì)于ExactlyOnce暫時(shí)沒(méi)有類似的日志,可在task端,所有的處理都是單線程的,數(shù)據(jù)處理和barrier處理都由主線程處理,如果主線程在處理太慢(比如使用RocksDBBackend,state操作慢導(dǎo)致整體 2.使用工具AsyncProfiledump一份火焰圖,查看占用CPU最多的棧;同步階段一般不會(huì)太慢,但是如果我們通過(guò)日志發(fā)現(xiàn)同步階段比較慢的話,對(duì)于非RocksDBBackend我們可以考慮查看是否開(kāi)啟了異步snapshot,如果開(kāi)啟了異步snapshot還是慢,需要看整個(gè)JVM在干嘛,也可以使用前一節(jié)中的工具。對(duì)于RocksDB開(kāi)始snapshot的日志如下:Backend來(lái)說(shuō),主要瓶頸來(lái)自于網(wǎng)絡(luò),這個(gè)階段可以考慮觀察網(wǎng)絡(luò)的metric,或者對(duì)于RocksDB來(lái)說(shuō),則需要從本地讀取文件,寫(xiě)入到遠(yuǎn)程的持久化存儲(chǔ)上,所以不僅需要考慮網(wǎng)絡(luò)的瓶頸,還需要考慮本地磁盤的性能。另外對(duì)于RocksDB-Backend來(lái)說(shuō),如果覺(jué)得網(wǎng)絡(luò)流量不是瓶頸,但是上傳比較慢的話,還可以嘗試考3.總結(jié)在第二部分內(nèi)容中,我們介紹了官方編譯的包的常情況的主要場(chǎng)景,以及相應(yīng)的排查方法,如果排查了上面所有的情況,還是沒(méi)有發(fā)上文提到的一些DEBUG日志,如果flinkdist包是自己編譯的話,則建議將Checkpoint整個(gè)步驟內(nèi)的一些DEBUG改為INFO,能夠通過(guò)日志了解整個(gè)[1]Changethreading-modelinStreamTasktoamailbox-basedapproach[3]RocksDBStateBackend多線程上傳State問(wèn)題。反壓意味著數(shù)據(jù)管道中某個(gè)節(jié)點(diǎn)成為瓶頸,處理速率跟不上上游發(fā)送數(shù)據(jù)的速率,而需要對(duì)上游進(jìn)行限速。由于實(shí)時(shí)計(jì)算應(yīng)用通常使用消息隊(duì)列來(lái)進(jìn)行生產(chǎn)端和消費(fèi)端的解耦,消費(fèi)端數(shù)據(jù)源是pull-based的,所以反壓通常是從某個(gè)節(jié)點(diǎn)傳導(dǎo)至數(shù)關(guān)于Flink的反壓機(jī)制,網(wǎng)上已經(jīng)有不少博客介紹,中文博客推薦這兩篇1。簡(jiǎn)單來(lái)說(shuō),F(xiàn)link拓?fù)渲忻總€(gè)節(jié)點(diǎn)(Task)間的數(shù)據(jù)都以阻塞隊(duì)列的方式傳輸,下游來(lái)不及消費(fèi)導(dǎo)致隊(duì)列被占滿后,上游的生產(chǎn)也會(huì)被阻塞,最終導(dǎo)致數(shù)據(jù)源的攝入被阻塞。而本文將著重結(jié)合官方的博客[4]分享筆者在實(shí)踐中分析和處理Flink反壓反壓并不會(huì)直接影響作業(yè)的可用性,它表明作業(yè)處于亞健康的狀態(tài),有潛在的性能瓶頸并可能導(dǎo)致更大的數(shù)據(jù)處理延遲。通常來(lái)說(shuō),對(duì)于一些對(duì)延遲要求不太高或者數(shù)據(jù)量比較小的應(yīng)用來(lái)說(shuō),反壓的影響可能并不明顯,然而對(duì)于規(guī)模比較大的Flink前者是因?yàn)閏heckpointbarrier是不會(huì)越過(guò)普通數(shù)據(jù)的,數(shù)據(jù)處理后者是因?yàn)闉楸WCEOS(Exactly-Once-Semantics,準(zhǔn)確一次對(duì)于有接受到較快的輸入管道的barrier后,它后面數(shù)據(jù)會(huì)被緩存起來(lái)但不處理,直這兩個(gè)影響對(duì)于生產(chǎn)環(huán)境的作業(yè)來(lái)說(shuō)是十分危險(xiǎn)的,因?yàn)閏heckpoint是大小同樣可能拖慢checkpoint甚至導(dǎo)致OOM(使用Heap-basedStateBackend)或者物理內(nèi)存使用超出容器資源(使用RocksDBStateBackend)的穩(wěn)定性問(wèn)題。因此,我們?cè)谏a(chǎn)中要盡量避免出現(xiàn)反壓的情況(順帶一提,為了緩解反壓給checkpoint造成的壓力,社區(qū)提出了FLIP-76:UnalignedCheckpoints[4]來(lái)解耦要解決反壓首先要做的是定位到造成反壓的節(jié)點(diǎn),這主要有兩種辦法:2.通過(guò)FlinkTaskMetrics。前者比較容易上手,適合簡(jiǎn)單分析,后者則提供了更加豐富的信息,適合用于監(jiān)控系統(tǒng)。因?yàn)榉磯簳?huì)向上游傳導(dǎo),這兩種方式都要求我們從Source節(jié)點(diǎn)到Sink的FlinkWebUI的反壓監(jiān)控提供了SubTask級(jí)別的反壓監(jiān)控,原理是通過(guò)周期性對(duì)Task線程的棧信息采樣,得到線程被阻塞在請(qǐng)求Buff塞)的頻率來(lái)判斷該節(jié)點(diǎn)是否處于反壓狀態(tài)。默認(rèn)配置 OK,0.1至0.5為L(zhǎng)OW,而超過(guò)0.5則為HIGH。1.該節(jié)點(diǎn)的發(fā)送速率跟不上它的產(chǎn)生數(shù)據(jù)速率。這一般會(huì)發(fā)生在一條輸入多條2.下游的節(jié)點(diǎn)接受速率較慢,通過(guò)反壓機(jī)制限制了該節(jié)點(diǎn)的發(fā)如果是第一種狀況,那么該節(jié)點(diǎn)則為反壓的根源節(jié)點(diǎn),它是從SourceTask到值得注意的是,反壓的根源節(jié)點(diǎn)并不一定會(huì)在反壓面板體現(xiàn)出高反壓,因?yàn)榉磯好姘灞O(jiān)控的是發(fā)送端,如果某個(gè)節(jié)點(diǎn)是性能瓶頸并不會(huì)導(dǎo)致它本身出現(xiàn)高反壓,而是導(dǎo)致它的上游出現(xiàn)高反壓??傮w來(lái)看,如果我們找到第一個(gè)出現(xiàn)反壓的節(jié)點(diǎn),那么反那么如果區(qū)分這兩種狀態(tài)呢?很遺憾只通過(guò)反壓面板是無(wú)法直接判斷的,我們還很大,由于要采集所有Task的棧信息,反壓面板的壓力也會(huì)很大甚至不可用。TaskMetricsFlink提供的TaskMetrics是更好的反壓監(jiān)控手段,但也要求更加豐富的背景TaskManager傳輸數(shù)據(jù)時(shí),不同的TaskManager上的兩個(gè)Subtask間通常根據(jù)key的數(shù)量有多個(gè)Channel,這些Channel會(huì)復(fù)用同一個(gè)TaskManager級(jí)別的TCP鏈接,并且共享接收端Subtask級(jí)別的BufferPool。在接收端,每個(gè)Channel在初始階段會(huì)被分配固定數(shù)量的ExclusiveBuffer,這些Buffer會(huì)被用于存儲(chǔ)接受到的數(shù)據(jù),交給Operator使用后再次被釋放。Channel接收端空閑的Buffer數(shù)量稱為Credit,Credit會(huì)被定時(shí)同步給發(fā)送端被后在流量較大時(shí),Channel的ExclusiveBuffer可能會(huì)被寫(xiě)滿,此時(shí)Flink會(huì)向個(gè)Channel需要就去哪里。而在Channel發(fā)送端,一個(gè)Subtask所有的Channel會(huì)共享同一個(gè)BufferPool,這邊就沒(méi)有區(qū)分ExclusiveBuffer和FloatingBuffer。 圖2FlinkCredit-Based網(wǎng)絡(luò)我們?cè)诒O(jiān)控反壓時(shí)會(huì)用到的Metrics主要和Channel接受端的Buffer使用率有關(guān),最為有用的是以下幾個(gè)Metrics:MetrisoutPoolUsage發(fā)送端Buffer的使用率inPoolUsage接收端Buffer的使用率exclusiveBuffersUsage(1.9以上)接收端ExclusiveBuffer的使用率其中inPoolUsage等于floatingBuffersUsage與exclusiveBuffersUsage的反壓傳導(dǎo)至上游。反壓情況可以根據(jù)以下表格進(jìn)行對(duì)號(hào)入outPoolUsage和inPoolUsage同為低或同為高分別表明當(dāng)前Subtask正常或處于被下游反壓,這應(yīng)該沒(méi)有太多疑問(wèn)。而比較有趣的是當(dāng)outPoolUsage和inPoolUsage表現(xiàn)不同時(shí),這可能是出于反壓傳導(dǎo)的中間狀態(tài)或者表明該Subtask如果一個(gè)Subtask的outPoolUsage是高,通常是被下游Task所影響,所以可以排查它本身是反壓根源的可能性。如果一個(gè)Subtask的outPoolUsage是低,但其inPoolUsage是高,則表明它有可能是反壓的根源。因?yàn)橥ǔ7磯簳?huì)傳導(dǎo)至其上游,導(dǎo)致上游某些Subtask的outPoolUsage為高,我們可以根據(jù)這點(diǎn)來(lái)進(jìn)一步判斷。值得注意的是,反壓有時(shí)是短暫的且影響不大,比如來(lái)自某個(gè)Channel的短暫網(wǎng)絡(luò)延遲或者TaskManager的正常GC,這種情況下我們可以不用處理。對(duì)于Flink1.9及以上版本,除了上述的表格,我們還可以根據(jù)floatingBsUsage/exclusiveBuffersUsage以及其上游Task的outPoolUsage來(lái)進(jìn)行進(jìn)一步的分析一個(gè)Subtask和其上游Subtask的數(shù)據(jù)傳輸。 BuffersUsage則表明了反壓是否存在傾斜(floatingBuffersUsage高、exclusive-至此,我們已經(jīng)有比較豐富的手段定位反壓的根源是出現(xiàn)在哪個(gè)節(jié)點(diǎn),但是具體定位到反壓節(jié)點(diǎn)后,分析造成原因的辦法和我們分析一個(gè)普通程序的性能瓶頸的辦法是十分類似的,可能還要更簡(jiǎn)單一點(diǎn),因?yàn)槲覀円^察的主要是TaskThread。在實(shí)踐中,很多情況下的反壓是由于數(shù)據(jù)傾斜造成的,這點(diǎn)我們可以通過(guò)WebUI各個(gè)SubTask的RecordsSent和RecordReceived來(lái)確認(rèn),另外Check-pointdetail里不同SubTask的Statesize也是一個(gè)分析數(shù)據(jù)傾斜的有用指標(biāo)。此外,最常見(jiàn)的問(wèn)題可能是用戶代碼的執(zhí)行效率問(wèn)題(頻繁被阻塞或者性能問(wèn)題)。最有用的辦法就是對(duì)TaskManager進(jìn)行CPUprofile,從中我們可以分析到TaskThread是否跑滿一個(gè)CPU核:如果是的話要分析CPU主要花費(fèi)在哪些函數(shù)里面,比如我們生產(chǎn)環(huán)境中就偶爾遇到卡在Regex的用戶函數(shù)(ReDoS如果不是的話要看TaskThread阻塞在哪里,可能是用戶函數(shù)本身有些同步的調(diào)用,可能是checkpoint或者GC等系統(tǒng)活動(dòng)導(dǎo)致的暫時(shí)系統(tǒng)暫停。當(dāng)然,性能分析的結(jié)果也可能是正常的,只是作業(yè)申請(qǐng)的資源不足而導(dǎo)致了反壓,這就通常要求拓展并行度。值得一提的,在未來(lái)的版本Flink將會(huì)直接在WebUI提供JVM的CPU火焰圖[5],這將大大簡(jiǎn)化性能瓶頸的分析。另外TaskManager的內(nèi)存以及GC問(wèn)題也可能會(huì)導(dǎo)致反壓,包括Task-ManagerJVM各區(qū)內(nèi)存不合理導(dǎo)致的頻繁FullGC甚至失聯(lián)。推薦可以通過(guò)給TaskManager啟用G1垃圾回收器來(lái)優(yōu)化GC,并加上-XX:+PrintGCDetails來(lái)打印GC日志的方式來(lái)觀察GC的問(wèn)題。反壓是Flink應(yīng)用運(yùn)維中常見(jiàn)的問(wèn)題,它不僅意味著性能瓶頸還可能導(dǎo)致作業(yè)的不穩(wěn)定性。定位反壓可以從WebUI的反壓監(jiān)控面板和TaskMetric兩者入手,前者方便簡(jiǎn)單分析,后者適合深入挖掘。定位到反壓節(jié)點(diǎn)后我們可以通過(guò)數(shù)據(jù)分布、CPUProfile和GC指標(biāo)日志等手段來(lái)進(jìn)一步分析反壓背后的具體原因并進(jìn)行針對(duì)性2.一文徹底搞懂Flink網(wǎng)絡(luò)流控與反壓機(jī)制3.Flink輕量級(jí)異步快照ABS實(shí)現(xiàn)原理4.FlinkNetworkStackVol.2:Monitoring,Metrics,andthatBackpressureThing5.SupportforCPUFlameGraphsinnewwebUIFlinkonYARN(上Flink支持Standalone獨(dú)立部署和YARN、Kubernetes、Mesos等集群部署模式,其中YARN集群部署模式在國(guó)內(nèi)的應(yīng)用越來(lái)越廣泛。Flink社區(qū)將推出FlinkonYARN應(yīng)用解讀系列文章,分為上、下兩篇。本文基于FLIP-6重構(gòu)后的資源調(diào)度模型將介紹FlinkonYARN應(yīng)用啟動(dòng)全流程,并進(jìn)行詳細(xì)步驟解析。下篇將根據(jù)社區(qū)大群反饋,解答客戶端和FlinkCluster的常見(jiàn)問(wèn)題,分享相關(guān)問(wèn)題的排查FlinkonYARN流程圖FlinkonYARN集群部署模式涉及YARN和Flink兩大開(kāi)源框架,應(yīng)用啟動(dòng)流程的很多環(huán)節(jié)交織在一起,為了便于大家理解,在一張圖上畫(huà)出了FlinkonYARN基礎(chǔ)架構(gòu)和應(yīng)用啟動(dòng)全流程,并對(duì)關(guān)鍵角色和流程進(jìn)行了介紹說(shuō)明,整個(gè)啟動(dòng)流程又標(biāo)注為橙色)兩個(gè)階段分別闡述,由于分支和細(xì)節(jié)太多,本文會(huì)忽略掉一些,只介紹YARN(上一張圖輕松掌握基礎(chǔ)架構(gòu)與啟動(dòng)流1.執(zhí)行命令:bin/flinkrun-d-myarn-cluster...或bin/yarn-session.sh...來(lái)提交per-job運(yùn)行模式或session運(yùn)行模式的應(yīng)用;2.解析命令參數(shù)項(xiàng)并初始化,啟動(dòng)指定運(yùn)行模式,如果是per-job運(yùn)行模式將如果可以從命令行參數(shù)(-yid)或YARNproperties臨時(shí)文件(${java.io.tmp-dir}/.yarn-properties-${})中獲否則當(dāng)命令行參數(shù)中包含-d(表示detached模式)和-myarn-clus否則當(dāng)命令行參數(shù)項(xiàng)不包含-yq(表示查詢YARN集群可用資源)時(shí),啟動(dòng)session運(yùn)行模式;3.獲取YARN集群信息、新應(yīng)用ID并啟動(dòng)運(yùn)行前檢查;通過(guò)YarnClient向YARNResourceManager(下文縮寫(xiě)為:YARNRM,YARNMaster節(jié)點(diǎn),負(fù)責(zé)整個(gè)集群資源的管理和調(diào)度)請(qǐng)求創(chuàng)建一個(gè)新應(yīng)用(YARNRM收到創(chuàng)建應(yīng)用請(qǐng)求后生成新應(yīng)用ID和container申請(qǐng)的 資源上限后返回并且獲取YARNSlave節(jié)點(diǎn)報(bào)告(YARNRM返回全部slave節(jié)點(diǎn)的ID、狀態(tài)、rack、httpflinkJobManager/TaskManagervcores資源申請(qǐng)需求;(3)指定queue是否存在(不存在也只是打印WARN信息,后續(xù)向YARN提交時(shí)排除異常并退出);(4)當(dāng)預(yù)期應(yīng)用申請(qǐng)的Container資源會(huì)超出YARN資源限制時(shí)拋出異常并退出;(5)當(dāng)預(yù)期應(yīng)用申請(qǐng)不能被滿足時(shí)(例如總資源超出YARN集群可4.將應(yīng)用配置(flink-conf.yaml、logback.xml、perties)和相關(guān)文件(flinkjars、shipfiles、userjars、jobgraph等)上傳至分布如HDFS)的應(yīng)用暫存目錄(/user/${}/.flink/);5.準(zhǔn)備應(yīng)用提交上下文(ApplicationSubmissionContext,包括應(yīng)用的名classpath、資源大小等),注冊(cè)處理部署失敗的shutdownhook(清理應(yīng)用對(duì)應(yīng)的HDFS目錄然后通過(guò)YarnClient向YARNRM提交應(yīng)用;6.循環(huán)等待直到應(yīng)用狀態(tài)為RUNNING,包含兩個(gè)階段:循環(huán)等待應(yīng)用提交成功(SUBMITTED默認(rèn)每隔200ms通過(guò)YarnClient獲取應(yīng)用報(bào)告,如果應(yīng)用狀態(tài)不是NEW和NEW_SAVING則認(rèn)為提交成功并退出循環(huán),每循環(huán)10次會(huì)將當(dāng)前的應(yīng)用狀態(tài)輸出至日志:"Application循環(huán)等待應(yīng)用正常運(yùn)行(RUNNING每隔250ms通過(guò)YarnClient獲取應(yīng)hasbeendeployedsuccessfully."并退出循環(huán),如果等到的是非預(yù)期狀態(tài)如FAILED/FINISHED/KILLED,就會(huì)在輸出YARN返回的診斷信息("TheYARNapplicationunexpectedlyswitchedtostateduringdeployment.DiagnosticsfromYARN:...")之后拋出異常并退出。FlinkCluster啟動(dòng)流程1.YARNRM中的ClientRMService(為普通用戶提供的RPC服務(wù)組件,處理來(lái)自客戶端的各種RPC請(qǐng)求,比如查詢YARN集群信息,提交、終止應(yīng)用等)接收到應(yīng)用提交請(qǐng)求,簡(jiǎn)單校驗(yàn)后將請(qǐng)求轉(zhuǎn)交給RMAppMan-ager(YARNRM內(nèi)部管理應(yīng)用生命周期的組件2.RMAppManager根據(jù)應(yīng)用提交上下文內(nèi)容創(chuàng)建初始狀態(tài)為NEW的應(yīng)用,將應(yīng)用狀態(tài)持久化到RM狀態(tài)存儲(chǔ)服務(wù)(例如ZooKeeper集群,RM狀態(tài)存儲(chǔ)服務(wù)用來(lái)保證RM重啟、HA切換或發(fā)生故障后集群應(yīng)用能夠正常恢復(fù),后續(xù)流程中的涉及狀態(tài)存儲(chǔ)時(shí)不再贅述應(yīng)用狀態(tài)變?yōu)镹EW_SAVING;3.應(yīng)用狀態(tài)存儲(chǔ)完成后,應(yīng)用狀態(tài)變?yōu)镾UBMITTED;RMAppManager開(kāi)始向ResourceScheduler(YARNRM可拔插資源調(diào)度器,YARN自帶三種調(diào)度器FifoScheduler/FairScheduler/CapacityScheduler,其中CapacityScheduler支持功能最多使用最廣泛,F(xiàn)ifoScheduler功能最簡(jiǎn)單基本不可用,今年社區(qū)已明確不再繼續(xù)支持FairScheduler,建議已有用戶不是葉子隊(duì)列、隊(duì)列已停用、超出隊(duì)列最大應(yīng)用數(shù)限制等)則拋出拒絕該應(yīng)用,應(yīng)用狀態(tài)先變?yōu)镕INAL_SAVING觸發(fā)應(yīng)用狀態(tài)存儲(chǔ)流程并在完成后變?yōu)镕AILED;如果提交成功,應(yīng)用狀態(tài)變?yōu)锳CCEPTED;4.開(kāi)始創(chuàng)建應(yīng)用運(yùn)行實(shí)例(ApplicationAttempt,由于一次運(yùn)行實(shí)例中最重要的組件是ApplicationMaster,下文簡(jiǎn)稱AM,它的狀態(tài)代表了Applica-5.初始化應(yīng)用運(yùn)行實(shí)例信息,并向ApplicationMasterService(AM&RM協(xié) 議接口服務(wù),處理來(lái)自AM的請(qǐng)求,主要包括注冊(cè)和心跳)注冊(cè),應(yīng)用實(shí)例狀態(tài)變?yōu)镾UBMITTED;6.RMAppManager維護(hù)的應(yīng)用實(shí)例開(kāi)始初始化AM資源申請(qǐng)信息并重新校驗(yàn)隊(duì)列,然后向ResourceScheduler申請(qǐng)AMContainer(Container是YARN中資源的抽象,包含了內(nèi)存、CPU等多維度資源應(yīng)用實(shí)例狀態(tài)變?yōu)锳CCEPTED;7.ResourceScheduler會(huì)根據(jù)優(yōu)先級(jí)(隊(duì)列/應(yīng)用/請(qǐng)求每個(gè)維度都有優(yōu)先級(jí)配置)從根隊(duì)列開(kāi)始層層遞進(jìn),先后選擇當(dāng)前優(yōu)先級(jí)最高的子隊(duì)列、應(yīng)用直至具體某個(gè)請(qǐng)求,然后結(jié)合集群資源分布等情況作出分配決策,AMContainer分配成功后,應(yīng)用實(shí)例狀態(tài)變?yōu)锳LLOCATED_SAVING,并觸發(fā)應(yīng)用實(shí)例狀態(tài)存儲(chǔ)流程,存儲(chǔ)成功后應(yīng)用實(shí)例狀態(tài)變?yōu)锳LLOCATED;8.RMAppManager維護(hù)的應(yīng)用實(shí)例開(kāi)始通知ApplicationMasterLauncher(AM生命周期管理服務(wù),負(fù)責(zé)啟動(dòng)或清理AMcontainer)啟動(dòng)AMcon-tainer,ApplicationMasterLauncher與YARN稱YARNNM,與YARNRM保持通信,負(fù)責(zé)管理單個(gè)節(jié)點(diǎn)上的全部資源、Container生命周期、附屬服務(wù)等,監(jiān)控節(jié)點(diǎn)健康狀況和Container資源使9.ContainerManager(YARNNM核心組件,管理所有Container的生命周期)接收到AMcontainer啟動(dòng)請(qǐng)求,YARNNM開(kāi)始校驗(yàn)ContainerToken及資源文件,創(chuàng)建應(yīng)用實(shí)例和Container實(shí)例并存儲(chǔ)至本地,結(jié)果返回后應(yīng)用實(shí)例狀態(tài)變?yōu)長(zhǎng)AUNCHED;10.ResourceLocalizationService(資源本地化服務(wù),負(fù)責(zé)Container所需資源的本地化。它能夠按照描述從HDFS上下載Container所需的文件資源,并盡量將它們分?jǐn)偟礁鱾€(gè)磁盤上以防止出現(xiàn)訪問(wèn)熱點(diǎn))初始化各種服務(wù)組件、創(chuàng)建工作目錄、從HDFS下載運(yùn)行所需的各種資源至Container工作目錄(路徑為:${yarn.nodemanager.local-dirs}/usercache/${user}/11.ContainersLauncher(負(fù)責(zé)container的具體操作,包括啟動(dòng)、重啟、恢復(fù)和清理等)將待運(yùn)行Container所需的環(huán)境變量和運(yùn)行命令寫(xiě)到Container工作目錄下的launch_container.sh腳本中,然后運(yùn)行該腳本12.Container進(jìn)程加載并運(yùn)行ClusterEntrypoint(FlinkJobManager入口類,每種集群部署模式和應(yīng)用運(yùn)行模式都有相應(yīng)的實(shí)現(xiàn),例如在YARN集群部署模式下,per-job應(yīng)用運(yùn)行模式實(shí)現(xiàn)類是YarnJobClusterEntrypoint,session應(yīng)用運(yùn)行模式實(shí)現(xiàn)類是YarnSessionClusterEntrypoint),首先初輸出各軟件版本及運(yùn)行環(huán)境信息、命令行參數(shù)項(xiàng)、classpath等信息;注冊(cè)處理各種SIGNAL的handler:記錄到日志注冊(cè)JVM關(guān)閉保障的shutdownhook:避免JVM退出時(shí)被其他shutdown打印YARN運(yùn)行環(huán)境信息:用戶名初始化文件系統(tǒng)創(chuàng)建并啟動(dòng)各類內(nèi)部服務(wù)(包括RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore等)將RPCaddress和port更新到flinkconf配置13.啟動(dòng)ResourceManager(Flink資源管理核心組件,包含YarnRe-sourceManager和SlotManager兩個(gè)子組件,YarnResourceManagerTaskManager資源,注銷應(yīng)用等;SlotManager則負(fù)責(zé)內(nèi)部資源管理,維護(hù)全部Slot信息和狀態(tài))及相關(guān)服務(wù),創(chuàng)建異步AMRMClient,開(kāi)始注冊(cè)AM,注冊(cè)成功后每隔一段時(shí)間(心跳間隔配置項(xiàng):${er-val},默認(rèn)5s)向YARNRM發(fā)送心跳來(lái)發(fā)送資源更新請(qǐng)求和接受資源變更結(jié)果。YARNRM內(nèi)部該應(yīng)用和應(yīng)用運(yùn)行實(shí)例的狀態(tài)都變?yōu)镽UNNING, 并通知AMLivelinessMonitor服務(wù)監(jiān)控AM是否存活狀態(tài),當(dāng)心跳超過(guò)一14.啟動(dòng)Dispatcher(負(fù)責(zé)接收用戶提供的作業(yè),并且負(fù)責(zé)為這個(gè)新提交的作業(yè)拉起一個(gè)新的JobManager)及相關(guān)服務(wù)(包括RESTendpoint等JobGraph文件;在session運(yùn)行模式下,Dispatcher將在接收客戶端提15.根據(jù)JobGraph啟動(dòng)JobManager(負(fù)責(zé)作業(yè)調(diào)度、管理Job和Task的生命周期構(gòu)建ExecutionGraph(JobGraph的并行化版本,調(diào)度層最16.JobManager開(kāi)始執(zhí)行ExecutionGraph,向ResourceManager申請(qǐng)17.ResourceManager將資源請(qǐng)求加入等待請(qǐng)求隊(duì)列,并通過(guò)心跳向YARNRM申請(qǐng)新的Container資源來(lái)啟動(dòng)TaskManager進(jìn)程;后續(xù)流程如果有空閑Slot資源,SlotManager將其分配給等待請(qǐng)求隊(duì)列中匹配的請(qǐng)求,不用再通過(guò)18.YarnResourceManager申請(qǐng)新的TaskManager;18.YARNApplicationMasterService接收到資源請(qǐng)求后,解析出新的資源19.YARNResourceScheduler成功為該應(yīng)用分配資源后更新應(yīng)用信息,ApplicationMasterService接收到FlinkJobManager的下一次心跳時(shí)返20.FlinkResourceManager接收到新分配的Container資源后,準(zhǔn)備好TaskManager啟動(dòng)上下文(ContainerLauncherContext,生成Task-Manager配置并上傳至分布式存儲(chǔ),配置其他依賴和環(huán)境變量等然后向YARNNM申請(qǐng)啟動(dòng)TaskManager進(jìn)程,YARNNM啟動(dòng)Container的21.TaskManager進(jìn)程加載并運(yùn)行YarnTaskExecutorRunner(FlinkTaskManager入口類初始化流程完成后啟動(dòng)TaskExecutor(負(fù)責(zé)執(zhí)行Task相關(guān)操作22.TaskExecutor啟動(dòng)后先向ResourceManager注冊(cè),成功后再向SlotManager匯報(bào)自己的Slot資源與狀態(tài);SlotManager接收到Slot空閑資源后主動(dòng)觸發(fā)Slot分配,從等待請(qǐng)TaskManager請(qǐng)求該Slot資源23.TaskManager收到請(qǐng)求后檢查該Slot是否可分配(不存在則返回異常信24.JobManager檢查Slot分配是否重復(fù),通過(guò)后通知Execution執(zhí)行部署task流程,向TaskExecutor提交task;TaskExecutor啟動(dòng)新的線程運(yùn)行Task。FlinkRelease-1.9SourceCodeFlinkRelease-1.9Documents[FLIP-6-FlinkDeploymentandProcessModel-Standalone,Yarn,Mesos,Kubernetes,etc.]YARN3.2SourceCodeYARN3.2.0DocumentsFlinkonYARN(下常見(jiàn)問(wèn)題與排查思路Flink支持Standalone獨(dú)立部署和YARN、Kubernetes、Mesos等集群部署模式,其中YARN集群部署模式在國(guó)內(nèi)的應(yīng)用越來(lái)越廣泛。Flink社區(qū)將推出FlinkonYARN應(yīng)用解讀系列文章,分為上、下兩篇。上篇分享了基于FLIP-6重構(gòu)后的資源調(diào)度模型介紹FlinkonYARN應(yīng)用啟動(dòng)全流程,本文將根據(jù)社區(qū)大群反饋,解答客戶端和FlinkCluster的常見(jiàn)問(wèn)題,分享相關(guān)問(wèn)題的排查思路。這個(gè)問(wèn)題的迷惑性較大,很多時(shí)候并非指定運(yùn)行的JAR文件問(wèn)題,而是提交過(guò)程中發(fā)生了異常,需要根據(jù)日志信息進(jìn)一步排查。最常見(jiàn)原因是未將依賴的Hadoop JAR文件加到CLASSPATH,找不到依賴類(例如:ClassNotFoundException:org.apache.hadoop.yarn.exceptions.YarnException)導(dǎo)致加載客戶端入口類(FlinkYarnSessionCli)失敗。FlinkonYARN客戶端通常需配置HADOOP_CONF_DIR和HADOOP_CLASSPATH兩個(gè)環(huán)境變量來(lái)讓客戶端能加載到Hadoop配置和依賴JAR文件。示例(已有環(huán)境變量HADOOP_HOME指定Hadoop部署目錄flink-${USER}-client-.log,使用log4j配置:${FLINK_HOME}/conf/log4j-cli.有的客戶端環(huán)境比較復(fù)雜,難以定位日志位置和配置時(shí),可以通過(guò)以下環(huán)境變量配置打開(kāi)log4j的DEBUG日志,跟蹤log4j的初始化和JVM_ARGS=”-Dlog4j.debug=true”為DEBUG后重新運(yùn)行,看是否有DEBUG日志可以幫助排查問(wèn)題。對(duì)于一些沒(méi)有日志或日志信息不完整的問(wèn)題,可能需要開(kāi)展代碼級(jí)調(diào)試,修改源碼重新打包替換的方式太過(guò)繁瑣,推薦使用Java字節(jié)碼注入工具By(1)編寫(xiě)調(diào)試腳本,例如打印Flink實(shí)際使用的Client類,以下腳本表示在CliFrontend#getActiveCust(2)設(shè)置環(huán)境變量,使用bytemanjavaagent: (3)運(yùn)行測(cè)試命令bin/flinkrun-myarn-cluster-p1./examples/stream-ing/WordCount.jar,控制臺(tái)將輸出內(nèi)容:FlinkCluster常見(jiàn)問(wèn)題與排查思路用戶應(yīng)用和框架JAR包版本沖突問(wèn)題該問(wèn)題通常會(huì)拋出NoSuchMethodError/ClassNotFoundException/Incom-patibleClassChangeError等異常,要解決此類問(wèn)題:1.首先需要根據(jù)異常類定位依賴庫(kù),然后可以在項(xiàng)目中執(zhí)行mvndepen-dency:tree以樹(shù)形結(jié)構(gòu)展示全部依賴鏈,再?gòu)闹卸ㄎ粵_突的依賴庫(kù),也可以增加參數(shù)-Dincludes指定要顯示的包,格式為[groupId]:[artifactId]:[--Dincludes=power,javaassist;2.定位沖突包后就要考慮如何排包,簡(jiǎn)單的方案是用exclusion來(lái)排除掉其從他依賴項(xiàng)目中傳遞過(guò)來(lái)的依賴,不過(guò)有的應(yīng)用場(chǎng)景需要多版本共存,不同組件依賴不同版本,就要考慮用MavenShade插件來(lái)解決,詳情請(qǐng)參考MavenShadePlugin。依賴庫(kù)有多版本JAR包共存時(shí)如何確定某類的具體來(lái)源?很多應(yīng)用運(yùn)行CLASSPATH中存在相同依賴庫(kù)的多個(gè)版本JAR包,導(dǎo)致實(shí)際使用的版本跟加載順序有關(guān),排查問(wèn)題時(shí)經(jīng)常需要確定某個(gè)類的來(lái)源JAR,F(xiàn)link支持給JM/TM進(jìn)程配置JVM參數(shù),因此可以通過(guò)下面三個(gè)配置項(xiàng)來(lái)打印加載類及其Flink應(yīng)用運(yùn)行中的JM/TM日志可以在WebUI上查看,但是查問(wèn)題時(shí)通常需要結(jié)合完整日志來(lái)分析排查,因此就需要了解YARN的日志保存機(jī)制,YARN上1.如果應(yīng)用還沒(méi)有結(jié)束,Container日志會(huì)一直保留在其運(yùn)行所在的節(jié)點(diǎn)上,即使Container已經(jīng)運(yùn)行完成仍然可以在所在節(jié)點(diǎn)的配置目錄下找到:2.如果應(yīng)用已結(jié)束并且集群?jiǎn)⒂昧巳罩臼占?yarn.log-aggregation-en-able=true),則通常應(yīng)用結(jié)束后(也有配置可以增量上傳)NM會(huì)將其全部日志上傳至分布式存儲(chǔ)(通常是HDFS)并刪除本地文件,我們可以通志,還可以增加參數(shù)項(xiàng)-containerId-nodeAddress來(lái)查看某containerremote-app-log-dir}/${user}/${yarn.nodemanager.remote-app-log-dir-suffix}/Flink應(yīng)用資源分配問(wèn)題排查思路如果Flink應(yīng)用不能正常啟動(dòng)達(dá)到RUNNING狀處于NEW__SAVING狀態(tài)時(shí)正在進(jìn)行應(yīng)用信息持久化,如果持續(xù)處于這個(gè)如果處于SUBMITTED狀態(tài),可能是RM內(nèi)部發(fā)生一些hold讀寫(xiě)鎖的耗時(shí)操作導(dǎo)致事件堆積,需要根據(jù)YARN集群日志 如果處于ACCEPTED狀態(tài),需要先檢查AM是否正常,跳轉(zhuǎn)到步驟2;如果已經(jīng)是RUNNING狀態(tài),但是資源沒(méi)有全部拿到導(dǎo)致JOB無(wú)法正常Queue’sAMresourcelimitexceeded.原因是達(dá)到了隊(duì)列AM可用資源上限,即隊(duì)列的AM已使用資源和AM新申請(qǐng)資源之和超出了隊(duì)列的AM資源上限,可以適當(dāng)調(diào)整隊(duì)列AM可用資源百分比的配置項(xiàng):yarn.scheduler.capacity..maximum-am-resource-percent。User’sAMresourcelimitexceeded.原因是達(dá)到了應(yīng)用所屬用戶在該隊(duì)新申請(qǐng)資源之和超出了應(yīng)用所屬用戶在該隊(duì)列的AM資源上限,可以適當(dāng)提高用戶可用AM資源比例來(lái)解決該問(wèn)題,相關(guān)配置項(xiàng):yarn.scheduler.capacity..user-limit-factor與yarn.scheduler.capacity..minimum-us-er-limit-percent。AMcontainerislaunched,wRM.大致原因是AM已啟動(dòng),但內(nèi)部初始化未完成,可能有ApplicationisActivated,waitingforresourcestobeassignedforAM.3.確認(rèn)應(yīng)用確實(shí)有YARN未能滿足的資源請(qǐng)求:從應(yīng)用列表頁(yè)點(diǎn)擊問(wèn)題應(yīng)用ID進(jìn)入應(yīng)用頁(yè)面,再點(diǎn)擊下方列表的應(yīng)用實(shí)例ID進(jìn)入應(yīng)用實(shí)例頁(yè)面,看TotalOutstandingResourceRequests列表中是否有Pending資源,如果沒(méi)有,說(shuō)明YARN已分配完畢,退出該檢查流程,轉(zhuǎn)去檢查AM;如果4.調(diào)度器分配問(wèn)題排查,YARN-9050支持在WebUI上或通過(guò)RESTAPI自檢查集群或queue資源,scheduler頁(yè)面樹(shù)狀圖葉子隊(duì)列展開(kāi)查看資源信息:EffectiveMaxResource、UsedResources1)檢查集群資源或所在隊(duì)列資源或其父隊(duì)列資源是否已用完2)檢查葉子隊(duì)列某維度資源是否檢查是否存在資源碎片1)檢查集群Used資源和Reserved資源之和占總資源的比例,當(dāng)集群資源接近用滿時(shí)(例如90%以上可能存片的情況,應(yīng)用的分配速度就會(huì)受影響變慢,因?yàn)榇蟛糠謾C(jī)器都沒(méi)有資源了,機(jī)器可用資源不足會(huì)被reserve,reserved資源達(dá)到一定規(guī)模后可能導(dǎo)致大部分機(jī)器資源被鎖定,后續(xù)分配可能就會(huì)變慢2)檢查NM
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- GB/T 45830-2025聲學(xué)開(kāi)放式辦公空間的聲學(xué)質(zhì)量
- GB/T 45906.6-2025變電站二次系統(tǒng)第6部分:站內(nèi)監(jiān)控系統(tǒng)
- 母親之軀試題及答案
- 機(jī)械制造基礎(chǔ)部分課后習(xí)題答案
- 支氣管擴(kuò)張癥試題及答案
- 信豐縣輔警考試公安基礎(chǔ)知識(shí)考試真題庫(kù)及參考答案
- 加氫工藝?;纷鳂I(yè)證理論試題及答案
- 醫(yī)院管理知識(shí)試題附答案
- 醫(yī)院污水(醫(yī)療廢水)處理培訓(xùn)試題及答案
- 物流環(huán)節(jié)模擬題庫(kù)及答案
- 2026年遼寧省盤錦市高職單招語(yǔ)文真題及參考答案
- 農(nóng)投集團(tuán)安全生產(chǎn)制度
- 近五年貴州中考物理真題及答案2025
- 2026年南通科技職業(yè)學(xué)院高職單招職業(yè)適應(yīng)性測(cè)試備考試題含答案解析
- 2025年黑龍江省大慶市中考數(shù)學(xué)試卷
- 2025年廣西職業(yè)師范學(xué)院招聘真題
- 山東煙草2026年招聘(197人)考試備考試題及答案解析
- 中遠(yuǎn)海運(yùn)集團(tuán)筆試題目2026
- 扦插育苗技術(shù)培訓(xùn)課件
- 妝造店化妝品管理制度規(guī)范
- 婦產(chǎn)科臨床技能:新生兒神經(jīng)行為評(píng)估課件
評(píng)論
0/150
提交評(píng)論