版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
SparkStreaming概述1SparkStreaming基本概念目錄流計(jì)算簡(jiǎn)介2SparkStreaming工作原理3SparkStreaming運(yùn)行機(jī)制4數(shù)據(jù)從時(shí)間上可分為靜態(tài)數(shù)據(jù)和動(dòng)態(tài)數(shù)據(jù),動(dòng)態(tài)數(shù)據(jù)又被稱為流數(shù)據(jù)。靜態(tài)數(shù)據(jù)是一段較長(zhǎng)的時(shí)間內(nèi)相對(duì)穩(wěn)定的數(shù)據(jù)。如:管理信息系統(tǒng)中的數(shù)據(jù)。流數(shù)據(jù)是一組順序、大量、快速、連續(xù)到達(dá)的數(shù)據(jù)序列,可被視為一個(gè)隨時(shí)間延續(xù)而無(wú)限增長(zhǎng)的動(dòng)態(tài)數(shù)據(jù)集合。如:電商網(wǎng)站用戶點(diǎn)擊流數(shù)據(jù)。流數(shù)據(jù)1.靜態(tài)數(shù)據(jù)與流數(shù)據(jù)數(shù)據(jù)快速持續(xù)到達(dá)后臺(tái)、處理平臺(tái),也許無(wú)窮無(wú)盡。數(shù)據(jù)來(lái)源眾多,格式復(fù)雜。數(shù)據(jù)到達(dá)次序獨(dú)立不受應(yīng)用系統(tǒng)所控制。注重?cái)?shù)據(jù)的整體價(jià)值,不過(guò)分關(guān)注個(gè)別數(shù)據(jù)。數(shù)據(jù)量大但不十分關(guān)注存儲(chǔ)。流數(shù)據(jù)2.流數(shù)據(jù)特征批處理是對(duì)靜態(tài)數(shù)據(jù)進(jìn)行處理。對(duì)數(shù)據(jù)緩存,主動(dòng)發(fā)起,批量計(jì)算,允許時(shí)延。流計(jì)算是對(duì)流數(shù)據(jù)進(jìn)行處理。數(shù)據(jù)持續(xù)到達(dá),事件觸發(fā),實(shí)時(shí)處理,快速響應(yīng)。流計(jì)算1.批處理與流計(jì)算流計(jì)算2.流計(jì)算與批處理的比較對(duì)比項(xiàng)流計(jì)算批處理數(shù)據(jù)類型在線實(shí)時(shí)數(shù)據(jù)離線靜態(tài)數(shù)據(jù)數(shù)據(jù)范圍對(duì)滾動(dòng)時(shí)間窗口內(nèi)的數(shù)據(jù)或僅對(duì)最近的數(shù)據(jù)記錄進(jìn)行查詢或處理對(duì)數(shù)據(jù)集中的所有或大部分?jǐn)?shù)據(jù)進(jìn)行查詢或處理數(shù)據(jù)大小單條記錄或包含幾條記錄的微批量數(shù)據(jù)大批量數(shù)據(jù)時(shí)間要求非常快,秒級(jí)或毫秒級(jí)延遲分鐘至小時(shí)級(jí)的延遲結(jié)果分析簡(jiǎn)單的響應(yīng)函數(shù)、聚合和滾動(dòng)指標(biāo)復(fù)雜的分析高性能。處理大數(shù)據(jù)的基本要求,如每秒處理幾十萬(wàn)條數(shù)據(jù)。海量式。支持TB級(jí)甚至PB級(jí)的數(shù)據(jù)規(guī)模。實(shí)時(shí)性。保證較低的延遲時(shí)間,達(dá)到秒級(jí)甚至毫秒級(jí)。分布式。支持大數(shù)據(jù)的基本架構(gòu),必須能夠平滑擴(kuò)展。易用性。能夠快速地進(jìn)行開(kāi)發(fā)和部署??煽啃浴D芸煽康靥幚砹鲾?shù)據(jù)。流計(jì)算2.流計(jì)算系統(tǒng)基本要求ApacheStorm是由Twitter開(kāi)源的、免費(fèi)的、分布式、實(shí)時(shí)計(jì)算框架。Storm可簡(jiǎn)單、高效、可靠地處理大量的流數(shù)據(jù),Storm相對(duì)Flink和Spark而言延遲最低,一般為幾毫秒到幾十毫秒,但數(shù)據(jù)吞吐量較低,建設(shè)成本高。開(kāi)源流計(jì)算框架當(dāng)今主流的開(kāi)源流計(jì)算框架有Storm、Flink、SparkStreaming等。1.
StormFlink是由Apache軟件基金會(huì)開(kāi)發(fā)的開(kāi)源流處理框架,F(xiàn)link將所有的任務(wù)當(dāng)作流進(jìn)行處理,將批數(shù)據(jù)視為流數(shù)據(jù)的一種特例。計(jì)算任務(wù)作為流處理看待時(shí)輸入數(shù)據(jù)流是無(wú)界的,計(jì)算任務(wù)作為批處理看待時(shí)輸入數(shù)據(jù)流被定義為有界的。Flink延遲較低,一般在幾十到幾百毫秒,數(shù)據(jù)吞吐量非常高,且能夠保證消息傳輸不丟失不重復(fù),建設(shè)成本低。目前Flink在流計(jì)算方面擁有一定的優(yōu)勢(shì),國(guó)內(nèi)互聯(lián)網(wǎng)廠商主要使用Flink作為流計(jì)算工具。開(kāi)源的流計(jì)算框架2.
FlinkSpark通過(guò)SparkStreaming或SparkStructuredStreaming支持流計(jì)算。Spark的流計(jì)算其本質(zhì)仍然是批處理,屬于微批處理,將流數(shù)據(jù)按照時(shí)間分割成一個(gè)個(gè)小批次進(jìn)行處理。SparkStreaming延遲一般在1秒左右,但SparkStructuredStreaming開(kāi)始支持持續(xù)處理流計(jì)算模式,在數(shù)據(jù)到達(dá)時(shí)立即進(jìn)行計(jì)算,將延遲降低到毫秒級(jí),不過(guò)目前尚處于測(cè)試階段。Spark吞吐量與Flink相當(dāng),Spark主要優(yōu)勢(shì)是有強(qiáng)大的批處理功能和圖計(jì)算功能,便于企業(yè)統(tǒng)一部署大數(shù)據(jù)生態(tài)環(huán)境。同時(shí)Spark擁有比Flink更為活躍的社區(qū),Spark流計(jì)算功能一直在不斷地完善和發(fā)展。開(kāi)源的流計(jì)算框架3.
SparkStreaming1SparkStreaming基本概念目錄流計(jì)算簡(jiǎn)介2SparkStreaming工作原理3SparkStreaming運(yùn)行機(jī)制4SparkStreaming是SparkCoreAPI的一個(gè)擴(kuò)展,可以實(shí)現(xiàn)高吞吐量的、具備容錯(cuò)機(jī)制的實(shí)時(shí)流式數(shù)據(jù)的處理。SparkStreaming從數(shù)據(jù)源獲取數(shù)據(jù)進(jìn)行編程處理,并將處理結(jié)果推送到文件系統(tǒng)、數(shù)據(jù)庫(kù)或儀表盤中。SparkStreaming實(shí)時(shí)數(shù)據(jù)流的處理如下圖。SparkStreaming基本概念1.SparkStreaming是什么2.SparkStreaming常用術(shù)語(yǔ)離散流(DiscretizedStream)或DStream:SparkStreaming對(duì)內(nèi)部持續(xù)的實(shí)時(shí)數(shù)據(jù)流的抽象描述,即處理的一個(gè)實(shí)時(shí)數(shù)據(jù)流,在SparkStreaming中對(duì)應(yīng)于一個(gè)DStream實(shí)例。時(shí)間片或批處理時(shí)間間隔(BatchInterval):拆分流數(shù)據(jù)的時(shí)間單元,一般為500毫秒或1秒。批數(shù)據(jù)(BatchData):一個(gè)時(shí)間片內(nèi)所包含的流數(shù)據(jù),表示成一個(gè)RDD。輸入數(shù)據(jù)流(inputDStream):一個(gè)inputDStream是一個(gè)特殊的DStream,將sparkstreaming連接到一個(gè)外部數(shù)據(jù)源來(lái)讀取數(shù)據(jù)。接收器(Receiver):長(zhǎng)時(shí)間(可能7*24小時(shí))運(yùn)行在Excutor之上,每個(gè)Receiver負(fù)責(zé)一個(gè)inuptDStream(比如讀取一個(gè)kafka消息的輸入流)。每個(gè)Receiver,加上inputDStream會(huì)占用一個(gè)core/slot。SparkStreaming基本概念3.SparkStreaming主要優(yōu)點(diǎn)使用簡(jiǎn)單
SparkStreaming支持Java、Scala和Python語(yǔ)言,提供了很多高級(jí)操作算子,可以像編寫批處理作業(yè)程序一樣編寫流式作業(yè)程序。與SparkCore、SparkSQL完美融合
SparkStreaming建立在SparkCore之上,可以在SparkStreaming中使用與RDD相同的代碼進(jìn)行批處理,構(gòu)建強(qiáng)大的交互應(yīng)用程序,而不僅僅適用于數(shù)據(jù)分析。SparkStreaming基本概念1SparkStreaming基本概念目錄流計(jì)算簡(jiǎn)介2SparkStreaming工作原理3SparkStreaming運(yùn)行機(jī)制4SparkStreaming工作原理1.工作原理SparkEngine從輸入數(shù)據(jù)流接收數(shù)據(jù),按照時(shí)間片(如1秒)將數(shù)據(jù)拆分成一個(gè)個(gè)批次數(shù)據(jù),然后將每個(gè)批次數(shù)據(jù)交給Spark計(jì)算引擎(SparkEngine)以類似批處理的方式進(jìn)行處理,最終得到結(jié)果數(shù)據(jù)流。輸入數(shù)據(jù)流拆分成批次結(jié)果數(shù)據(jù)流由上可知,SparkStreaming其實(shí)是對(duì)RDD進(jìn)行微批量處理,核心還是對(duì)RDD的操作。所以SparkStreaming并不是真正意義上的流處理,最多實(shí)現(xiàn)秒級(jí)響應(yīng),無(wú)法做到毫秒級(jí)。DStream是SparkStreaming的數(shù)據(jù)抽象。一個(gè)DStream由多個(gè)RDD組成。Streaming將DStream按照時(shí)間片拆分成一個(gè)個(gè)RDD批次,構(gòu)成一個(gè)RDD序列。DStream數(shù)據(jù)流如下圖。SparkStreaming工作原理2.剖析DStream1SparkStreaming基本概念目錄流計(jì)算簡(jiǎn)介2SparkStreaming工作原理3SparkStreaming運(yùn)行機(jī)制4SparkStreaming運(yùn)行機(jī)制跟Spark工作機(jī)制非常相似,因?yàn)镾parkStreaming其實(shí)質(zhì)就是spark對(duì)RDD的操作。每個(gè)WorkerNode中會(huì)有一個(gè)接收器Receiver,長(zhǎng)時(shí)間(可能7*24小時(shí))運(yùn)行在Excutor之上。每個(gè)Receiver會(huì)對(duì)接一個(gè)inuptDStream,inputDStream負(fù)責(zé)源源不斷的輸入數(shù)據(jù)。SparkStreaming運(yùn)行機(jī)制Receiver主要任務(wù)就是負(fù)責(zé)對(duì)接inputDStream,接收并處理流數(shù)據(jù)。SparkStreaming運(yùn)行機(jī)制inputDStream對(duì)接的數(shù)據(jù)流可以有多種套接字流:通過(guò)socket不斷發(fā)送數(shù)據(jù)。文件流:監(jiān)聽(tīng)文本,文件一旦發(fā)生改變就傳輸文本。Kafka:接收Kafka的數(shù)據(jù)流。SparkStreaming運(yùn)行機(jī)制靜態(tài)數(shù)據(jù)與動(dòng)態(tài)數(shù)據(jù)流計(jì)算與批處理主流的開(kāi)源流數(shù)據(jù)框架:
Storm、Flink、SparkStreamingSparkStreaming基本概念
什么是SparkStreaming、DStream、時(shí)間片與時(shí)間間隔、微批處理、inputDStreamSparkStreaming工作原理:流處理轉(zhuǎn)化為批處理SparkStreaming運(yùn)行機(jī)制
通過(guò)Receiver負(fù)責(zé)對(duì)接inputDStream,接收并處理流數(shù)據(jù)小結(jié)pyspark.streaming模塊1pyspark.streaming模塊核心類目錄pyspark.streaming模塊簡(jiǎn)介2DStream基礎(chǔ)操作3pyspark.streaming模塊是PySpark用于處理流計(jì)算的模塊,是SparkStreaming對(duì)Python的API接口。
在Python環(huán)境中可以通過(guò)調(diào)用pyspark.streaming模塊中多個(gè)功能不同的類對(duì)Spark進(jìn)行操作,完成流式數(shù)據(jù)的計(jì)算、分析與處理。pyspark.streaming模塊簡(jiǎn)介
pyspark.streaming模塊包含三個(gè)類,分別是:StreamingContext類:SparkStreaming程序的主要入口DStream類:SparkStreaming數(shù)據(jù)操作StreamingListener類:對(duì)流處理時(shí)間的監(jiān)聽(tīng),設(shè)置對(duì)應(yīng)的時(shí)間處理函數(shù)pyspark.streaming模塊簡(jiǎn)介為了對(duì)接Kafka、Kinesis和Flume這些目前主流且與ApacheHadoop平臺(tái)聯(lián)系較為緊密的流數(shù)據(jù)模塊,pyspark.streaming還提供了:pyspark.streaming.kafka模塊pyspark.streaming.kinesis模塊pyspark.streaming.flume模塊pyspark.streaming模塊簡(jiǎn)介1pyspark.streaming模塊核心類目錄pyspark.streaming模塊簡(jiǎn)介2DStream基礎(chǔ)操作3主要功能SparkStreaming主程序的入口。SparkStreaming編程從創(chuàng)建StreamingContext對(duì)象開(kāi)始。提供了大量的根據(jù)不同數(shù)據(jù)源創(chuàng)建輸入Dstream
的API。提供了啟動(dòng)流開(kāi)始、等待流停止、控制流結(jié)束、事件監(jiān)聽(tīng)、提供容錯(cuò)機(jī)制等的API。負(fù)責(zé)與Spark集群進(jìn)行連接,在現(xiàn)有SparkContext的基礎(chǔ)上構(gòu)建數(shù)據(jù)流環(huán)境。pyspark.streaming模塊核心類1.StreamingContext類主要方法StreamingCon-text(sparkContext,batchDuration=None,jssc=None):構(gòu)造函數(shù),返回Dstream對(duì)象addStreamingListener(streamingListener):添加一個(gè)流處理事件監(jiān)聽(tīng)器awaitTermination(timeout=None):等待停止流計(jì)算,參數(shù)timeout為超時(shí)設(shè)置checkpoint(directory):容錯(cuò)機(jī)制檢查點(diǎn),檢查點(diǎn)數(shù)據(jù)存儲(chǔ)在參數(shù)目錄中queueS-tream(rdds,oneAtATime=True,default=None):從RDD或列表隊(duì)列創(chuàng)建輸入流,參數(shù)rdds為rdd隊(duì)列,oneAtATime為每次選擇一個(gè)rdd或選擇所有rdd一次。default如果沒(méi)有更多的rdds,則為默認(rèn)rddsocketTextStream(hostname,port,storageLevel=(True,True,False,False,2)):從TCP源hostname:port創(chuàng)建輸入。使用TCPsock-et接收數(shù)據(jù),并將接收字節(jié)解釋為UTF8編碼的分隔行pyspark.streaming模塊核心類主要方法start():?jiǎn)?dòng)流計(jì)算stop():停止流計(jì)算textFileStream(directory):創(chuàng)建一個(gè)輸入流,用于監(jiān)視與Hadoop兼容的文件系統(tǒng)中的新文件,并將其作為文本文件讀取。必須將文件從同一文件系統(tǒng)中的另一個(gè)位置“移動(dòng)”到受監(jiān)視的目錄中pyspark.streaming模塊核心類從SparkContext對(duì)象創(chuàng)建StreamingContext對(duì)象典型代碼frompysparkimportSparkContext,SparkConffrompyspark.streamingimportStreamingContextconf=SparkConf()conf.setAppName('TestDstream')conf.setMaster('local[2]')sc=SparkContext(conf=conf)ssc=StreamingContext(sc,1)pyspark.streaming模塊核心類代碼說(shuō)明從一個(gè)SparkContext對(duì)象創(chuàng)建一個(gè)StreamingContext對(duì)象,因而需同時(shí)導(dǎo)入SparkConf和SparkContext對(duì)象。使用SparkConf的setMaster()方法配置Spark的運(yùn)行模式,若選定本地單機(jī)運(yùn)行方式local[],根據(jù)SparkStreaming運(yùn)行機(jī)制,local[]中參數(shù)至少為2,即至少2個(gè)線程,本次為local[2]。語(yǔ)句ssc=StreamingContext(sc,1)的參數(shù)中,sc表示從名為sc的SparkContext對(duì)象去創(chuàng)建StreamingContext對(duì)象;1表示對(duì)數(shù)據(jù)流按1秒時(shí)間間隔進(jìn)行切分,每1秒中的數(shù)據(jù)切分成一個(gè)RDD。
pyspark.streaming.DStream相關(guān)API同spark.RDD非常相似,可實(shí)現(xiàn)對(duì)DStream的多種操作。數(shù)據(jù)來(lái)源可以是實(shí)時(shí)數(shù)據(jù)或上游DStream。DStream類主要APIcountByValue():統(tǒng)計(jì)每個(gè)RDD中不同鍵值對(duì)的次數(shù),返回一個(gè)新的DStreamforeachRdd(func):將函數(shù)func應(yīng)用于DStream的每個(gè)RDD中filter(f):返回一個(gè)僅滿足函數(shù)f元素的新DStreamreduce(func):
每個(gè)RDD通過(guò)reduce函數(shù)處理生成新的RDD,返回一個(gè)新的DStreamjoin():通過(guò)兩個(gè)DStream的公共RDDs進(jìn)行連接操作,返回一個(gè)新的DStreamWindow():滑動(dòng)窗口操作,返回一個(gè)窗口的統(tǒng)計(jì)值,詳見(jiàn)后續(xù)窗口操作pyspark.streaming模塊核心類2.DStream類1pyspark.streaming模塊核心類目錄pyspark.streaming模塊簡(jiǎn)介2DStream基礎(chǔ)操作3DStream基礎(chǔ)操作SparkStreaming編程其實(shí)質(zhì)是對(duì)DStream進(jìn)行一系列操作,一般步驟如下。創(chuàng)建StreamingContext對(duì)象,構(gòu)建SparkStreamingContext環(huán)境。使用輸入源創(chuàng)建輸入DStream。根據(jù)業(yè)務(wù)邏輯對(duì)DStream應(yīng)用轉(zhuǎn)換操作和輸出操作進(jìn)行流計(jì)算,以實(shí)現(xiàn)用戶處理目標(biāo)。啟動(dòng)SparkStreaming。調(diào)用StreamingContext對(duì)象的start()方法啟動(dòng)SparkStreaming,開(kāi)始接收數(shù)據(jù)和處理數(shù)據(jù)。結(jié)束SparkStreaming。調(diào)用StreamingContext對(duì)象的awaitTermination()方法等待流計(jì)算進(jìn)程結(jié)束,或調(diào)用StreamingContext對(duì)象的stop()方法手動(dòng)結(jié)束流計(jì)算進(jìn)程。其中步驟(2)與(3)屬于DStream基礎(chǔ)操作,也是SparkStreaming編程主要任務(wù)。SparkStreaming的所有操作都基于數(shù)據(jù)流,而輸入源是流計(jì)算一系列操作的起點(diǎn)。SparkStreaming提供了兩種內(nèi)置的輸入源支持,分別為基本輸入源和高級(jí)數(shù)據(jù)源?;据斎朐矗豪肧treamingContextAPI能從一些輸入源中直接創(chuàng)建輸入DStream,這樣的輸入源稱為基本輸入源?;据斎朐窗ㄎ募?、網(wǎng)絡(luò)Socket流及RDD隊(duì)列流等。高級(jí)輸入源:不能直接利用StreamingContextAPI創(chuàng)建輸入DStream,在使用時(shí)需引入第三方依賴庫(kù)。高級(jí)數(shù)據(jù)源包括Kafka、Flume、Kinesis等,DStream基礎(chǔ)操作1.輸入源(1)基本輸入源之文件流在文件流的應(yīng)用場(chǎng)景中,通過(guò)編寫SparkStreaming應(yīng)用程序,對(duì)文件系統(tǒng)中的某個(gè)目錄一直進(jìn)行監(jiān)控。一旦發(fā)現(xiàn)有新的文件生成,SparkStreaming自動(dòng)讀取和處理。
創(chuàng)建文件流輸入DStream:使用StreamingContext.textFileStream()方法。
示例語(yǔ)句:pages=ssc.textFileStream('file:///D:/streaming')
示例說(shuō)明:使用textFileStream()創(chuàng)建文件流輸入DStream,并賦給pages變量,監(jiān)控本地“D:\streaming”文件夾,一旦“D:\streaming”有新的文本文件產(chǎn)生,SparkStreaming立即讀取新產(chǎn)生的文本文件內(nèi)容并處理。DStream基礎(chǔ)操作1.輸入源注意事項(xiàng)不同文件系統(tǒng)的書寫格式是不同的,HDFS是“HDFS://path”,本地文件系統(tǒng)則是“file:///path”。文件必須具有相同的數(shù)據(jù)格式,創(chuàng)建的文件必須在參數(shù)指定的目錄下。文件流不需要運(yùn)行Receiver,不需要為接收文件數(shù)據(jù)分配CPU內(nèi)核。DStream基礎(chǔ)操作創(chuàng)建Socket流輸入DStream:使用StreamingContext.socketTextStream(hostname,port)方法。其中參數(shù)hostname代表主機(jī)IP,參數(shù)port代表主機(jī)端口號(hào)。
示例語(yǔ)句:lines=ssc.socketTextStream('localhost',9999)示例說(shuō)明:使用socketTextStream()創(chuàng)建socket流輸入DStream,并賦給lines變量,作為TCP客戶端,源源不斷接收來(lái)自本機(jī)、端口號(hào)為9999的服務(wù)器數(shù)據(jù)交給用戶自定義的SparkStreaming應(yīng)用程序。DStream基礎(chǔ)操作(2)基本輸入源之網(wǎng)絡(luò)Socket流注意事項(xiàng)若使用Socket流輸入源,需要啟動(dòng)一個(gè)TCP服務(wù)器端。啟動(dòng)一個(gè)TCP服務(wù)器端有如下兩種方法。使用NetCat程序啟動(dòng)一個(gè)Socket服務(wù)器端,讓服務(wù)器端接收客戶端的請(qǐng)求,并向客戶端不斷地發(fā)送數(shù)據(jù)流。Windows操作系統(tǒng)可從網(wǎng)上下載安裝NetCat并在cmd命令窗口使用。使用Python編寫模擬Socket流輸入源程序。更多相關(guān)知識(shí)可查閱PythonSocket客戶端/服務(wù)器編程。DStream基礎(chǔ)操作在PythonIDLE環(huán)境編寫一個(gè)名為sockettest.py程序,模擬Socket流從鍵盤源源不斷輸入英文語(yǔ)句,并作為Socket服務(wù)器端,為一個(gè)SparkStreaming程序提供Socket流輸入源。
DStream基礎(chǔ)操作測(cè)試SparkStreaming應(yīng)用程序時(shí)往往直接使用RDD隊(duì)列作為輸入DStream,因而RDD隊(duì)列流的主要應(yīng)用場(chǎng)景是對(duì)SparkStreaming應(yīng)用程序進(jìn)行調(diào)試。在PySpark中,使用StreamingContext.queueStream(rdds)方法,從RDD或Python的列表中創(chuàng)建輸入DStream,其中參數(shù)rdds是一個(gè)RDD隊(duì)列。整個(gè)RDD隊(duì)列將被視為數(shù)據(jù)流,每一個(gè)推送到這個(gè)隊(duì)列中的RDD,都將作為一個(gè)DStream進(jìn)行加工處理。DStream基礎(chǔ)操作(3)基本輸入源之RDD隊(duì)列流生產(chǎn)環(huán)境中,SparkStreaming更多地從一些高級(jí)數(shù)據(jù)源(如Kafka)接收來(lái)自上游的數(shù)據(jù),然后經(jīng)過(guò)處理輸出結(jié)果。SparkStreaming沒(méi)有自帶創(chuàng)建高級(jí)數(shù)據(jù)源DStream的API,使用時(shí)需引入第三方依賴庫(kù)。以Kafka為例,不同的Spark版本號(hào)需對(duì)應(yīng)不同的Kafka版本號(hào),Kafka版本號(hào)一定要與已安裝好的的Scala版本號(hào)一致,同時(shí)搭配對(duì)應(yīng)版本spark-streaming-kafka-assembly的jar包。DStream基礎(chǔ)操作(4)高級(jí)數(shù)據(jù)源SparkStreaming從輸入DStream開(kāi)始,將連續(xù)的數(shù)據(jù)流切分成一個(gè)個(gè)批次,然后對(duì)每個(gè)批次內(nèi)的DStream數(shù)據(jù)進(jìn)行處理,實(shí)質(zhì)是對(duì)DStream執(zhí)行各種轉(zhuǎn)換操作。根據(jù)是否需要記錄DStream歷史狀態(tài)信息,DStream轉(zhuǎn)換操作分為DStream無(wú)狀態(tài)轉(zhuǎn)換操作和DStream有狀態(tài)轉(zhuǎn)換操作兩種。(1)DStream無(wú)狀態(tài)轉(zhuǎn)換操作DStream進(jìn)行轉(zhuǎn)換操作時(shí),若數(shù)據(jù)對(duì)象僅為SparkStreaming一個(gè)批次內(nèi)的數(shù)據(jù),則稱為DStream無(wú)狀態(tài)轉(zhuǎn)換。DStream無(wú)狀態(tài)轉(zhuǎn)換每次只計(jì)算當(dāng)前時(shí)間批次的內(nèi)容,處理結(jié)果不依賴之前批次的歷史數(shù)據(jù)。DStream基礎(chǔ)操作2.DStream轉(zhuǎn)換操作常用DStream無(wú)狀態(tài)轉(zhuǎn)換操作算子map(func):對(duì)當(dāng)前DStream的每個(gè)元素,采用func函數(shù)進(jìn)行轉(zhuǎn)換,得到一個(gè)新的DStream。flatMap(func):與map()相似,對(duì)當(dāng)前DStream的每個(gè)元素,采用func函數(shù)進(jìn)行轉(zhuǎn)換,得到一個(gè)新的DStream,但每個(gè)輸入項(xiàng)可用被映射為0個(gè)或多個(gè)輸出項(xiàng)。filter(func):過(guò)濾操作,返回滿足函數(shù)func的新的DStream。union(otherStream):合并當(dāng)前DStream和其他DStream的元素,返回一個(gè)新的DStream。count():統(tǒng)計(jì)當(dāng)前DStream中每個(gè)RDD的元素?cái)?shù)量。DStream基礎(chǔ)操作常用DStream無(wú)狀態(tài)轉(zhuǎn)換操作算子reduce(func):利用函數(shù)func聚集當(dāng)前DStream中每個(gè)RDD的元素,返回一個(gè)包含單元素RDD的新DStream。countByValue():應(yīng)用于元素類型為K的DStream上,返回一個(gè)(K,V)鍵值對(duì)類型的新DStream,每個(gè)鍵的值是在原DStream的每個(gè)RDD中的出現(xiàn)次數(shù)。reduceByKey(func,[numTasks]):當(dāng)在一個(gè)由(K,V)鍵值對(duì)組成的DStream上執(zhí)行該操作時(shí),返回一個(gè)新的由(K,V)鍵值對(duì)組成的DStream,每一個(gè)key的值均由給定的reduce函數(shù)(func)聚集起來(lái)。join(otherStream,[num-Tasks]):應(yīng)用于兩個(gè)DStream(一個(gè)包含(K,V)鍵值對(duì),一個(gè)包含(K,W)鍵值對(duì)),返回一個(gè)包含(K,(V,W))鍵值對(duì)的新DStream。DStream基礎(chǔ)操作常用DStream無(wú)狀態(tài)轉(zhuǎn)換操作算子cogroup(otherStream,[numTasks]):應(yīng)用于兩個(gè)DStream(一個(gè)包含(K,V)鍵值對(duì),一個(gè)包含(K,W)鍵值對(duì)),返回一個(gè)包含(K,Seq[V],Seq[W])的元組。transform(func):將當(dāng)前的DStream的每個(gè)RDD轉(zhuǎn)換為新的RDD,返回一個(gè)新的DStream。該函數(shù)操作靈活,可用于實(shí)現(xiàn)DStreamAPI中沒(méi)有提供的操作。DStream基礎(chǔ)操作現(xiàn)通過(guò)兩個(gè)實(shí)例講解輸入源與無(wú)狀態(tài)轉(zhuǎn)換操作算子的使用。實(shí)例1:?jiǎn)卧~統(tǒng)計(jì)單詞統(tǒng)計(jì)實(shí)例講解基本輸入源之網(wǎng)絡(luò)流與無(wú)狀態(tài)轉(zhuǎn)換算子flatMap()、map()、reduceByKey()的使用。前面已在PythonIDLE集成環(huán)境編寫了一個(gè)Socket服務(wù)器端程序,現(xiàn)在在JupyterNotebook中編寫SparkStreaming程序作為Socket客戶端,對(duì)接收到的英文語(yǔ)句進(jìn)行單詞統(tǒng)計(jì)處理
。DStream基礎(chǔ)操作實(shí)例1:?jiǎn)卧~統(tǒng)計(jì)程序運(yùn)行可分為四步:在PythonIDLE環(huán)境程序運(yùn)行sockettest.py程序,啟動(dòng)Socket服務(wù)器端,監(jiān)聽(tīng)本機(jī)9999號(hào)端口,開(kāi)始等待狀態(tài)。在JupyterNotebook中啟動(dòng)流計(jì)算。一旦在Jupyter中開(kāi)始輸出當(dāng)前日期與時(shí)間,則表示流計(jì)算已正常啟動(dòng),將當(dāng)前工作窗口切換到PythonIDLE環(huán)境。sockettest.py程序已經(jīng)監(jiān)聽(tīng)到在9999號(hào)端口有來(lái)自客戶端的請(qǐng)求,等待用戶輸入。如下圖。此時(shí)可從鍵盤輸入英文語(yǔ)句,或從一篇英語(yǔ)文章復(fù)制一段,輸入回車鍵結(jié)束一次輸入??梢圆煌5剌斎耄鳛镾ocket流輸入源,直至按住鍵盤Ctrl+C鍵終止。DStream基礎(chǔ)操作輸入實(shí)例如左圖。切換到Jupyter,觀察程序的運(yùn)行情況。SparkStreaming程序每10秒鐘處理一次??梢詠?lái)回在PythonIDLE環(huán)境和Jupyter中進(jìn)行切換,一邊輸入一邊觀察程序的運(yùn)行處理情況。程序運(yùn)行結(jié)果如右圖。DStream基礎(chǔ)操作實(shí)例1:?jiǎn)卧~統(tǒng)計(jì)
DStream基礎(chǔ)操作實(shí)例2:計(jì)算網(wǎng)頁(yè)熱度simulator.py程序在PythonIDLE中運(yùn)行,作為SparkStreaming文件流輸入源,源源不斷產(chǎn)生文本文件,模擬器程序運(yùn)行情況部分截圖如左圖。simulator.py程序運(yùn)行時(shí)結(jié)果生成一堆文本文件,如右圖。DStream基礎(chǔ)操作實(shí)例2:計(jì)算網(wǎng)頁(yè)熱度PySpark中編寫DStream一個(gè)批次的網(wǎng)頁(yè)熱度計(jì)算程序,遵循SparkStreaming編程一般步驟,具體編寫過(guò)程如下。創(chuàng)建StreamingContext對(duì)象,構(gòu)建SparkStreamingContext環(huán)境。需注意的是為了提高SparkStreaming程序性能,每個(gè)批次的間隔選取需依據(jù)具體業(yè)務(wù)邏輯,本例中ssc=StreamingContext(sc,10),設(shè)定DStream批次間隔為10s。frompysparkimportSparkContext,SparkConffrompyspark.streamingimportStreamingContextconf=SparkConf()conf.setAppName('pagehot')conf.setMaster('local[2]')sc=SparkContext(conf=conf)ssc=StreamingContext(sc,10)DStream基礎(chǔ)操作實(shí)例2:計(jì)算網(wǎng)頁(yè)熱度創(chuàng)建輸入DStream。本例利用textFileStream(‘file:///D:/streaming’)創(chuàng)建文件流輸入DStream存放到pages變量中,監(jiān)控目錄D:\streaming下的數(shù)據(jù),一旦在D:\streaming產(chǎn)生新文件,網(wǎng)頁(yè)熱度計(jì)算程序立馬捕捉新文件并讀取新文件內(nèi)容,代碼如下。pages=ssc.textFileStream('file:///D:/streaming’)DStream基礎(chǔ)操作實(shí)例2:計(jì)算網(wǎng)頁(yè)熱度根據(jù)業(yè)務(wù)邏輯對(duì)DStream應(yīng)用轉(zhuǎn)換操作和輸出操作進(jìn)行流計(jì)算。本例首先使用map()算子調(diào)用自定義函數(shù)calculate_hot()對(duì)pagesDStream進(jìn)行轉(zhuǎn)換。pagesDStream內(nèi)容為形如“041.html,7,5,0.9,-1”的記錄數(shù)據(jù),代表某個(gè)網(wǎng)頁(yè)用戶的瀏覽行為。calculate_hot()函數(shù)對(duì)pagesDStream每一行數(shù)據(jù)按照“,”分隔符進(jìn)行分割,生成一個(gè)列表,列表的第0個(gè)元素為網(wǎng)頁(yè)號(hào),第1到第4個(gè)元素依次為“用戶等級(jí)”、“訪問(wèn)次數(shù)”、“停留時(shí)間”以及“是否點(diǎn)贊”等具體值,代入計(jì)算公式,得到某個(gè)網(wǎng)頁(yè)的熱度值,保留兩位小數(shù)。pagesDStream經(jīng)過(guò)map()無(wú)狀態(tài)轉(zhuǎn)換操作后生成pagehotDStream,pagehot數(shù)據(jù)類型為key-value(鍵值對(duì)),其中網(wǎng)頁(yè)號(hào)為key,value為網(wǎng)頁(yè)熱度值。接著對(duì)pagehotDStream進(jìn)行reduceByKey()操作得到pagehot_countDStream,統(tǒng)計(jì)在一個(gè)批次內(nèi)(本例為10s,由StreamingContext(sc,10)設(shè)置的時(shí)間)某個(gè)網(wǎng)頁(yè)的熱度。最后使用pagehot_count.pprint()方法對(duì)pagehot_count進(jìn)行輸出操作,結(jié)果輸出到控制臺(tái)。DStream基礎(chǔ)操作實(shí)例2:計(jì)算網(wǎng)頁(yè)熱度使用StreamingContext對(duì)象的start()方法啟動(dòng)流計(jì)算。使用StreamingContext對(duì)象的awaitTermination()方法等待流計(jì)算進(jìn)程結(jié)束,或調(diào)用StreamingContext對(duì)象的stop()方法手動(dòng)結(jié)束流計(jì)算進(jìn)程。
注:程序運(yùn)行說(shuō)明
在JupyterNotebook中啟動(dòng)流計(jì)算運(yùn)行網(wǎng)頁(yè)熱度計(jì)算程序,一旦在Jupyter中,開(kāi)始輸出當(dāng)前日期與時(shí)間,意味著流計(jì)算已正常啟動(dòng),切換到PythonIDLE環(huán)境。在Python3IDLE中運(yùn)行simulator.py模擬器程序,模擬器每隔5秒生成一個(gè)文本文件作為SparkStreaming文件流輸入源,SparkStreaming進(jìn)行實(shí)時(shí)運(yùn)算處理。DStream基礎(chǔ)操作實(shí)例2:計(jì)算網(wǎng)頁(yè)熱度(2)DStream有狀態(tài)轉(zhuǎn)換操作DStream有狀態(tài)轉(zhuǎn)換操作包括滑動(dòng)窗口轉(zhuǎn)換操作和updateStateByKey操作,屬于跨批次操作,需要記錄DStream每個(gè)批次的歷史數(shù)據(jù)狀態(tài)信息?;瑒?dòng)窗口轉(zhuǎn)換操作創(chuàng)建窗口:使用window(windowLength,slideInterval)方法。參數(shù)說(shuō)明:windowLength(窗口長(zhǎng)度),指一次加載處理多長(zhǎng)時(shí)間的數(shù)據(jù)。slideInterval(窗口滑動(dòng)間隔),指多長(zhǎng)時(shí)間窗口向前滑動(dòng)一次即執(zhí)行一次處理,兩者值的設(shè)置必須為DStream一個(gè)批次時(shí)間的整數(shù)倍。DStream基礎(chǔ)操作2.DStream轉(zhuǎn)換操作假設(shè)源DStream批次時(shí)間為10s,即10秒數(shù)據(jù)流切分為一個(gè)RDD,現(xiàn)需要每隔20s(2個(gè)批次間隔),對(duì)前30s(3個(gè)批次間隔)的數(shù)據(jù)進(jìn)行整合計(jì)算[time1+time2+time3],此時(shí)可使用滑動(dòng)窗口計(jì)算,滑動(dòng)窗口計(jì)算過(guò)程如下圖所示。DStream基礎(chǔ)操作滑動(dòng)窗口長(zhǎng)度為3個(gè)批次,窗口首先覆蓋time1、time2、time3這3個(gè)批次的數(shù)據(jù),將窗口內(nèi)time1、time2、time3的3個(gè)RDD聚合起來(lái)進(jìn)行計(jì)算處理然后經(jīng)過(guò)2個(gè)批次時(shí)間單位滑動(dòng)窗口,此時(shí)窗口覆蓋time3、time4、time5三個(gè)批次的數(shù)據(jù),并對(duì)其進(jìn)行同樣的操作處理。常見(jiàn)的DStream滑動(dòng)窗口轉(zhuǎn)換操作算子window(windowLength,slideInterval)取某個(gè)滑動(dòng)窗口所覆蓋的DStream數(shù)據(jù),返回一個(gè)新的DStream。reduceByWindow(func,windowLength,slideIn-terval)對(duì)滑動(dòng)窗口內(nèi)的每個(gè)RDD中的元素進(jìn)行聚合操作,返回由多個(gè)單元素DRR組成的新DStream。reduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks])應(yīng)用到一個(gè)(K,V)對(duì)組成的DStream上,返回一個(gè)由(K,V)對(duì)組成的新的DStream。每一個(gè)key的值均由給定的reduce函數(shù)聚集起來(lái)。注意:在默認(rèn)情況下,這個(gè)算子利用了Spark默認(rèn)的并發(fā)任務(wù)數(shù)去分組??梢杂胣umTasks參數(shù)設(shè)置不同的任務(wù)數(shù)。DStream基礎(chǔ)操作常見(jiàn)的DStream滑動(dòng)窗口轉(zhuǎn)換操作算子reduceByKeyAndWindow(func,invFunc,win-dowLength,slideInterval,[numTasks])更加高效的reduceByKeyAndWindow(),其中每個(gè)窗口的reduce值使用前一個(gè)窗口的reduce值遞增地計(jì)算。這是通過(guò)減少進(jìn)入滑動(dòng)窗口的新數(shù)據(jù)和“反向減少”離開(kāi)窗口的舊數(shù)據(jù)來(lái)實(shí)現(xiàn)的。例如,在滑動(dòng)窗口時(shí)“添加”和“減去”key的數(shù)量。但是,只能用于“可逆reduce函數(shù)”,即那些reduce函數(shù)都有一個(gè)對(duì)應(yīng)的“逆向reduce函數(shù)”(以invfunc參數(shù)傳入)。countByWindow(windowLength,slideInterval)計(jì)算一個(gè)滑動(dòng)窗口中的元素的數(shù)量。countByValueAndWindow(windowLength,slideInterval,[numTasks])應(yīng)用到一個(gè)(K,V)對(duì)組成的DStream上,返回一個(gè)由(K,V)對(duì)組成的新的DStream。每個(gè)key的值都是它們?cè)诨瑒?dòng)窗口中出現(xiàn)的頻率。DStream基礎(chǔ)操作滑動(dòng)窗口應(yīng)用場(chǎng)景滑動(dòng)窗口轉(zhuǎn)換操作每次可計(jì)算最近多少時(shí)間內(nèi)的數(shù)據(jù),在實(shí)際的應(yīng)用可以是每隔1分鐘計(jì)算最近24小時(shí)的熱搜排行榜,每隔10秒鐘計(jì)算最近10分鐘的廣告點(diǎn)擊量等,其典型應(yīng)用場(chǎng)景有微博熱點(diǎn),微博熱點(diǎn)一般是最近30分鐘內(nèi)最熱門的頭條。
DStream基礎(chǔ)操作在“每隔10秒計(jì)算最近1分鐘的網(wǎng)頁(yè)熱度”的代碼中,使用reduceByKeyAndWindow(lambdav1,v2:round(v1+v2,2),60,10)語(yǔ)句進(jìn)行窗口聚合計(jì)算,計(jì)算結(jié)果保留2位小數(shù),設(shè)置窗口長(zhǎng)度為60秒,滑動(dòng)窗口間隔為10秒,均為當(dāng)前批次5秒的整數(shù)倍。程序運(yùn)行每10秒顯示一次最近60秒的計(jì)算結(jié)果。另外,對(duì)DStream有狀態(tài)轉(zhuǎn)換操作,還需要啟用數(shù)據(jù)checkpoint容錯(cuò)機(jī)制,將實(shí)時(shí)計(jì)算過(guò)程中產(chǎn)生的RDD的數(shù)據(jù)保存到可靠的存儲(chǔ)系統(tǒng)中,從而進(jìn)行周期性的RDDcheckpoint,一旦運(yùn)算失敗可利用存儲(chǔ)在checkpoint目錄中數(shù)據(jù)進(jìn)行快速恢復(fù)。如代碼中使用“ssc.checkpoint('D://checkpoint1')”語(yǔ)句啟用checkpoint,其中參數(shù)“D://checkpoint1”為本地D盤checkpoint1文件目錄,checkpoint數(shù)據(jù)會(huì)寫入D:\checkpoint1目錄中。
DStream基礎(chǔ)操作滑動(dòng)窗口應(yīng)用示例功能改進(jìn)滑動(dòng)窗口轉(zhuǎn)換操作的局限性。滑動(dòng)窗口操作僅窗口內(nèi)的數(shù)據(jù)批次聚合,無(wú)法對(duì)較長(zhǎng)時(shí)間、窗口外跨批次之間數(shù)據(jù)進(jìn)行聚合計(jì)算??蓪?shí)現(xiàn)較長(zhǎng)時(shí)間、多個(gè)批次的數(shù)據(jù)聚合處理。數(shù)據(jù)類型和處理過(guò)程
處理的數(shù)據(jù)為key-value(鍵值對(duì))類型數(shù)據(jù)。處理過(guò)程分為兩步,第一步定義狀態(tài),為每一個(gè)key定義一個(gè)狀態(tài),狀態(tài)可以是任意的數(shù)據(jù)類型,可以是一個(gè)自定義的對(duì)象。第二步定義狀態(tài)更新函數(shù),通過(guò)更新函數(shù)對(duì)key的狀態(tài)不斷更新。應(yīng)用場(chǎng)景對(duì)實(shí)時(shí)計(jì)算中的歷史數(shù)據(jù)進(jìn)行統(tǒng)計(jì),例如統(tǒng)計(jì)不同時(shí)間段用戶平均消費(fèi)金額、消費(fèi)次數(shù)、消費(fèi)總額、統(tǒng)計(jì)網(wǎng)站的不同時(shí)間段的訪問(wèn)量等。
DStream基礎(chǔ)操作②updateStateByKey()updateStateByKey()對(duì)全局的key的狀態(tài)進(jìn)行統(tǒng)計(jì),并在每一個(gè)批次間隔,返回之前的全部歷史數(shù)據(jù),包括新增的key、改變的key和沒(méi)有變化的key的狀態(tài)信息。由于不斷地更新每個(gè)key的狀態(tài),當(dāng)前生成的RDD依賴于之前批次的RDD,這將導(dǎo)致RDD依賴鏈隨著時(shí)間的推移而擴(kuò)大。為了避免RDD依賴鏈不受限制的增長(zhǎng)給恢復(fù)時(shí)帶來(lái)的麻煩,updateStateByKey()方法在使用時(shí)一定要做checkpoint,在可靠的存儲(chǔ)介質(zhì)中創(chuàng)建checkpoint目錄,定期保存狀態(tài)轉(zhuǎn)換操作的中間RDD,以切斷這種RDD依賴鏈的影響。
DStream基礎(chǔ)操作②updateStateByKey()示例:使用updateStateByKey()方法計(jì)算某個(gè)時(shí)間段網(wǎng)頁(yè)熱度程序編寫遵循SparkStreaming編程一般步驟創(chuàng)建StreamingContext對(duì)象,構(gòu)建SparkStreamingContext環(huán)境。其中“ssc=StreamingContext(sc,1)”語(yǔ)句設(shè)置批次間隔時(shí)間為1秒。創(chuàng)建輸入DStream。使用“pages3=ssc.socketTextStream(localhost‘,9999)”語(yǔ)句,創(chuàng)建一個(gè)輸入DStream存放到變量pages3,作為客戶端將接收來(lái)自本地主機(jī)(localhost)的端口號(hào)為9999的數(shù)據(jù)。另外,使用“ssc.checkpoint(‘D://checkpoint2‘)”語(yǔ)句啟用checkpoint容錯(cuò)檢查機(jī)制,checkpoint數(shù)據(jù)將寫在本地D:\checkpoint2目錄中。
DStream基礎(chǔ)操作②updateStateByKey()示例:使用updateStateByKey()方法計(jì)算某個(gè)時(shí)間段網(wǎng)頁(yè)熱度根據(jù)業(yè)務(wù)邏輯對(duì)DStream應(yīng)用轉(zhuǎn)換操作和輸出操作進(jìn)行流計(jì)算。啟動(dòng)流計(jì)算。等待流結(jié)束。
DStream基礎(chǔ)操作示例:使用updateStateByKey()方法計(jì)算某個(gè)時(shí)間段網(wǎng)頁(yè)熱度程序運(yùn)行過(guò)程在Windows命令窗口運(yùn)行NetCat啟動(dòng)一個(gè)Socket服務(wù)器端,讓該服務(wù)器端接收客戶端的請(qǐng)求,并向客戶端不斷地發(fā)送數(shù)據(jù)流。具體過(guò)程為運(yùn)行cmd指令打開(kāi)Windows命令窗口,在命令窗口內(nèi)運(yùn)行如下代碼生成一個(gè)Socket服務(wù)器端。
C:\Users\hongyou>nc-l-p9999–v#參數(shù)“-l-p9999”表示啟動(dòng)監(jiān)聽(tīng)模式,作為Socket服務(wù)器端,nc會(huì)監(jiān)聽(tīng)本機(jī)(localhost)9999號(hào)端口,參數(shù)-v會(huì)在屏幕兩次提示監(jiān)聽(tīng)狀態(tài)。在JupyterNotebook中啟動(dòng)流計(jì)算。一旦在Jupyter中開(kāi)始輸出當(dāng)前日期與時(shí)間,則流計(jì)算已正常啟動(dòng),可將當(dāng)前工作窗口切換到cmd命令窗口環(huán)境。
DStream基礎(chǔ)操作示例:使用updateStateByKey()方法計(jì)算某個(gè)時(shí)間段網(wǎng)頁(yè)熱度從test.log文件中,不斷復(fù)制一段又一段代碼,模擬技術(shù)博客網(wǎng)站的日志文件數(shù)據(jù),輸入的內(nèi)容將作為輸入源被接收。Netcat數(shù)據(jù)輸入如下圖。JupyterNotebook中同時(shí)運(yùn)行“使用updateStateByKey()方法計(jì)算某個(gè)時(shí)間段網(wǎng)頁(yè)熱度”代碼,JupyterNotebook作為客戶端接收來(lái)自Socket服務(wù)器端的數(shù)據(jù),本例從本地的9999端口接收數(shù)據(jù),進(jìn)行某個(gè)時(shí)間段網(wǎng)頁(yè)熱度計(jì)算統(tǒng)計(jì)。
DStream基礎(chǔ)操作示例:使用updateStateByKey()方法計(jì)算某個(gè)時(shí)間段網(wǎng)頁(yè)熱度通過(guò)觀察發(fā)現(xiàn),程序運(yùn)行時(shí)間延遲越來(lái)越長(zhǎng),原因如下。使用updateStateByKey()方法獲得的是整個(gè)狀態(tài)的數(shù)據(jù),每次狀態(tài)更新對(duì)所有數(shù)據(jù)都調(diào)用自定義的函數(shù)進(jìn)行一次計(jì)算,不管數(shù)據(jù)在這個(gè)批次是否有變化,在批次間隔輸出全部數(shù)據(jù)。隨著時(shí)間推移,數(shù)據(jù)量不斷增長(zhǎng),需要維護(hù)的狀態(tài)越來(lái)越大,會(huì)非常影響性能。如果不能在當(dāng)前批次將數(shù)據(jù)處理完成,很容易造成數(shù)據(jù)堆積,影響程序穩(wěn)定運(yùn)行甚至宕掉。
DStream基礎(chǔ)操作在SparkStreaming中,DStream的輸出操作是真正觸發(fā)DStream上所有轉(zhuǎn)換操作進(jìn)行計(jì)算的操作。與RDD中的Action操作類似,只有DStream輸出操作,DStream中的數(shù)據(jù)才能與外部進(jìn)行交互,如將數(shù)據(jù)寫入到分布式文件系統(tǒng)、數(shù)據(jù)庫(kù)及其他應(yīng)用中。PySpark中,與DStream輸出相關(guān)的APIpprint():在DStream的每個(gè)批數(shù)據(jù)中打印前10條元素,這個(gè)操作在開(kāi)發(fā)和調(diào)試中都非常有用。saveAsText-Files(prefix,[suffix]):保存DStream的內(nèi)容為一個(gè)文本文件。每一個(gè)批間隔的文件的文件名基于prefix和suffix生成?!皃refix-TIME_IN_MS[.suffix]”。foreachRDD(func):將函數(shù)func用于產(chǎn)生于DStream的每個(gè)RDD。其中參數(shù)傳入的函數(shù)func應(yīng)該實(shí)現(xiàn)將每一個(gè)RDD中數(shù)據(jù)推送到外部系統(tǒng),如將RDD存入文件或通過(guò)網(wǎng)絡(luò)將其寫入數(shù)據(jù)庫(kù)。DStream基礎(chǔ)操作3.DStream輸出操作在DStream上調(diào)用saveAsTextFiles()方法將DStream輸出到一個(gè)文本文件。如使用語(yǔ)句“saveAsTextFiles('file:///D:/data/output’)”將DStream結(jié)果寫入到D盤data目錄中。在這個(gè)目錄下會(huì)生成很多output打頭的子目錄,這是因?yàn)榱饔?jì)算過(guò)程在不停地進(jìn)行中,每計(jì)算一次計(jì)算產(chǎn)生一個(gè)子目錄。進(jìn)入某個(gè)目錄下,可以看到類似part-00000的文件,里面包含了流計(jì)算過(guò)程輸出的結(jié)果。在單詞統(tǒng)計(jì)中,將計(jì)算結(jié)果保存為文本文件,可使用語(yǔ)句“saveAsTextFiles(‘file:///D:/test1/output’)”
將DStream內(nèi)容寫入到D盤test1目錄中。DStream基礎(chǔ)操作3.DStream輸出操作程序運(yùn)行過(guò)程同前述,在D盤test1目錄中生成很多output開(kāi)頭的子目錄,這是因?yàn)榱饔?jì)算過(guò)程在不停地進(jìn)行中,每個(gè)批次計(jì)算產(chǎn)生一個(gè)子目錄,如左圖。進(jìn)入某個(gè)目錄下,可以看到類似part-00000的文件,里面包含了流計(jì)算過(guò)程輸出的結(jié)果。如右圖。DStream基礎(chǔ)操作PySpark中的pyspark.streaming模塊,包括其模塊簡(jiǎn)介和核心類StreamContext類、DStream類
。DStream基本操作。包括從基本輸入源創(chuàng)建輸入DStream、DStream轉(zhuǎn)換操作和DStream輸出操作。關(guān)于輸入源
輸入源包括基本輸入源和高級(jí)輸入源,本節(jié)重點(diǎn)介紹基本輸入源,包括文件流、網(wǎng)絡(luò)Socket流和RDD隊(duì)列流。關(guān)于DStream轉(zhuǎn)換操作包括無(wú)狀態(tài)轉(zhuǎn)換操作和有狀態(tài)轉(zhuǎn)換操作。無(wú)狀態(tài)轉(zhuǎn)換操作只會(huì)記錄當(dāng)前批次數(shù)據(jù)的狀態(tài)。重點(diǎn)介紹了常用的無(wú)狀態(tài)DStream轉(zhuǎn)換操作算子,如:map(func)、flatMap(func)、filter(func)、rdeuceByKey(func,[numTasks])等。有狀態(tài)轉(zhuǎn)換操作主要介紹窗口轉(zhuǎn)換操作和updateStateByKey操作。小結(jié)StructuredStreaming
結(jié)構(gòu)化流式處理StructuredStreaming是Spark2.0版本開(kāi)始推出的一種實(shí)時(shí)流框架,建立在SparkSQL之上,是一個(gè)可伸縮的、容錯(cuò)的流處理引擎。StructuredStreaming通過(guò)一致的API整合了批處理和流處理,可以像編寫批處理程序一樣編寫流處理程序。StructuredStreaming結(jié)構(gòu)化流式處理1StructuredStreaming編程模型目錄StructuredStreaming概述2StructuredStreaming基礎(chǔ)操作3StructuredStreaming編程步驟4StructuredStreaming基于SparkSQL引擎構(gòu)建的可擴(kuò)展且容錯(cuò)的StreamProcessingEngine(流處理引擎),可以在靜態(tài)數(shù)據(jù)Dataset/DataFrame上像批處理計(jì)算一樣進(jìn)行流計(jì)算。可以使用SparkSQL中的Dataset/DataFrameAPI對(duì)數(shù)據(jù)流進(jìn)行聚合、滑動(dòng)窗口計(jì)算、流式數(shù)據(jù)與離線數(shù)據(jù)的連接等操作。此外,StructuredStreaming通過(guò)使用檢查點(diǎn)和預(yù)寫日志確保數(shù)據(jù)從端到端只被執(zhí)行一次。StructuredStreaming包括微批處理和持續(xù)處理兩種處理模型,默認(rèn)使用微批處理處理模型。在微批處理模型中,StructuredStreaming將輸入數(shù)據(jù)流視為一系列小批次作業(yè)進(jìn)行處理,從而實(shí)現(xiàn)端到端的延遲低至100毫秒。自Spark2.3版本起,引入了持續(xù)處理模型,將端到端的延遲進(jìn)一步降至1毫秒。對(duì)開(kāi)發(fā)使用者來(lái)說(shuō),無(wú)須考慮StructuredStreaming以哪種方式計(jì)算,StructuredStreaming在底層會(huì)自動(dòng)實(shí)現(xiàn)快速、可伸縮、容錯(cuò)等處理。StructuredStreaming概述相比于SparkStreaming,StructuredStreaming優(yōu)點(diǎn)如下。同樣能支持多種數(shù)據(jù)源的輸入和輸出。以結(jié)構(gòu)化的方式操作流式數(shù)據(jù),能夠像使用SparkSQL處理離線的批處理一樣,處理流數(shù)據(jù),代碼更簡(jiǎn)潔,寫法更簡(jiǎn)單。基于Event-Time(事件時(shí)間,指事件發(fā)生的時(shí)間),相比于SparkStreaming的Processing-Time更精確,更符合業(yè)務(wù)場(chǎng)景。解決了SparkStreaming存在的代碼升級(jí),DAG圖變化引起的任務(wù)失敗,無(wú)法斷點(diǎn)續(xù)傳的問(wèn)題。StructuredStreaming概述1StructuredStreaming編程模型目錄StructuredStreaming概述2StructuredStreaming基礎(chǔ)操作3StructuredStreaming編程步驟4StructuredStreaming的核心思想是將實(shí)時(shí)數(shù)據(jù)流抽象成一張無(wú)邊界的表,輸入的每一條數(shù)據(jù)當(dāng)成輸入表的一個(gè)新行,如下圖。數(shù)據(jù)流中的數(shù)據(jù)一行一行地添加到無(wú)界表中,這樣可以將流計(jì)算等同于在一個(gè)靜態(tài)表上的批處理查詢,Spark會(huì)在不斷添加數(shù)據(jù)的無(wú)界輸入表上運(yùn)行計(jì)算,并進(jìn)行增量查詢。StructuredStreaming編程模型在無(wú)界表上對(duì)輸入的數(shù)據(jù)進(jìn)行查詢將生成結(jié)果表,系統(tǒng)每隔一定的周期會(huì)觸發(fā)對(duì)無(wú)界表的計(jì)算和結(jié)果表的更新。最后,將結(jié)果表的結(jié)果寫入到外部存儲(chǔ)介質(zhì)。StructuredStreaming編程模型如下圖,設(shè)定批長(zhǎng)度為1s,每一秒從輸入源讀取數(shù)據(jù)到輸入無(wú)界表,然后觸發(fā)查詢計(jì)算,將結(jié)果寫入結(jié)果表中。第1行是時(shí)間,每秒都會(huì)觸發(fā)一次流計(jì)算。第2行是輸入數(shù)據(jù),對(duì)輸入數(shù)據(jù)執(zhí)行查詢后產(chǎn)生的結(jié)果最終會(huì)被更新到結(jié)果表中。第4行是外部存儲(chǔ),輸出模式是完全模式。StructuredStreaming編程模型1StructuredStreaming編程模型目錄StructuredStreaming概述2StructuredStreaming基礎(chǔ)操作3StructuredStreaming編程步驟4在PySpark,StructuredStreaming基礎(chǔ)操作主要包括:
從輸入源創(chuàng)建流式DataFrame的輸入操作。
根據(jù)業(yè)務(wù)流程對(duì)流式DataFrame進(jìn)行各種轉(zhuǎn)換操作。
將運(yùn)算結(jié)果進(jìn)行輸出操作。
使用窗口聚合操作對(duì)一段時(shí)間內(nèi)的數(shù)據(jù)進(jìn)行統(tǒng)計(jì)運(yùn)算。StructuredStreaming基礎(chǔ)操作1.輸入操作SparkSession.readStream():讀取流數(shù)據(jù)創(chuàng)建流式DataFrame。readStream可使用format()方法定義輸入源,使用option()方法進(jìn)行輸入源的可選參數(shù)設(shè)置,使用load()方法載入數(shù)據(jù)。常見(jiàn)的內(nèi)置輸入源:File源、Kafka源、Socket源和Rate源。StructuredStreaming基礎(chǔ)操作輸入源File源以文件流的形式讀取目錄中寫入的文件。支持的文件格式為text,CSV,JSON,ORC,Parquet。需注意的是,文件必須以原子方式放置在給定的目錄中,這在大多數(shù)文件系統(tǒng)中可以通過(guò)文件移動(dòng)操作實(shí)現(xiàn)。File源的選項(xiàng)(option)包括如下幾個(gè)。path:輸入目錄的路徑,所有格式通用。maxFilesPerTrigger:每個(gè)觸發(fā)器中要處理的最大新文件數(shù)(默認(rèn)無(wú)最大值)。latestFirst:是否首先處理最新的文件,當(dāng)有大量積壓的文件時(shí)很有用,默認(rèn)false。fileNameOnly:是否僅根據(jù)文件名而不是完整路徑檢查新文件,默認(rèn)false。StructuredStreaming基礎(chǔ)操作Socket源從一個(gè)本地或遠(yuǎn)程主機(jī)的某個(gè)端口服務(wù)上讀取數(shù)據(jù),數(shù)據(jù)的編碼為UTF-8。由于Socket源無(wú)法提供端到端的容錯(cuò)保障,一般用于測(cè)試或?qū)W習(xí)。Socket源的選項(xiàng)(option)包括如下幾個(gè)。host:要連接的主機(jī),必須指定。port:要連接的端口,必須指定。StructuredStreaming基礎(chǔ)操作Kafka源是流處理理想的輸入源,因?yàn)樗梢员WC實(shí)時(shí)和容錯(cuò)。通用流程如下。應(yīng)用數(shù)據(jù)輸入-->Kafka-->SparkStreaming-->其他的數(shù)據(jù)庫(kù)Kafka源的選項(xiàng)(option)包括如下幾個(gè)。assign:指定所消費(fèi)的Kafka主題和分區(qū)
。subscribe:訂閱的Kafka主題,為逗號(hào)分隔的主題列表。subscribePattern:訂閱的Kafka主題正則表達(dá)式,可匹配多個(gè)主題。kafka.bootstrap.servers:Kafka服務(wù)器的列表,逗號(hào)分隔的host:port列表。startingOffsets:起始位置偏移量。endingOffsets:結(jié)束位置偏移量。failOnDataLoss:布爾值,表示是否在Kafka數(shù)據(jù)可能丟失時(shí),觸發(fā)流計(jì)算失敗。一般應(yīng)當(dāng)禁止,以免誤報(bào)。StructuredStreaming基礎(chǔ)操作Rate源可每秒生成特定個(gè)數(shù)的數(shù)據(jù)行,每個(gè)數(shù)據(jù)行包括時(shí)間戳和值字段。時(shí)間戳是消息發(fā)送的時(shí)間,值是從開(kāi)始到當(dāng)前消息發(fā)送的總個(gè)數(shù),從0開(kāi)始。Rate源一般用作調(diào)試或性能基準(zhǔn)測(cè)試。Rate源的選項(xiàng)(option)包括如下幾個(gè)。rowsPerSecond:每秒生成多少行數(shù)據(jù),默認(rèn)值為1。rampUpTime:生成速度達(dá)到rowsPerSecond需要多少啟動(dòng)時(shí)間,使用比秒更精細(xì)的粒度將會(huì)被截?cái)酁檎麛?shù)秒,默認(rèn)為0秒。numPartitions:使用的分區(qū)數(shù),默認(rèn)為Spark的默認(rèn)分區(qū)數(shù)。StructuredStreaming基礎(chǔ)操作2.轉(zhuǎn)換操作可在流式DataFrame使用select()、where()、groupBy()等API進(jìn)行類似于在靜態(tài)DataFrame上的查詢、投影、聚合操作,也可在流式DataFrame使用map(),filter(),flatMap()等API進(jìn)行類似于在RDD的操作。不支持讀取前n行這樣的操作,如不支持limit()、take(n)、show()等,也不支持distinct、count()等操作。StructuredStreaming基礎(chǔ)操作一個(gè)流式DataFrame可以和一個(gè)靜態(tài)DataFrame進(jìn)行join()操作,連接方式為Inner(內(nèi)連接)或LeftOuter(左外連接),連接后得到一個(gè)新的流式DataFrame。此外,一個(gè)流式DataFrame也可以和另一個(gè)流式DataFrame以Inner方式進(jìn)行join()操作,連接機(jī)制是通過(guò)追溯被進(jìn)行連接的流式DataFrame已經(jīng)接收到的流數(shù)據(jù)和主動(dòng)進(jìn)行連接的流式DataFrame的當(dāng)前批次進(jìn)行key(鍵值)的配對(duì)。為了避免追溯過(guò)去太久的數(shù)據(jù)造成性能瓶頸,可以通過(guò)設(shè)置watermark(水位線)來(lái)清空過(guò)去太久的歷史數(shù)據(jù)的狀態(tài),數(shù)據(jù)被清空狀態(tài)后將允許不被配對(duì)查詢。另外,sort()操作僅在聚合后在完整輸出模式下支持,不支持兩個(gè)流之間的任何join()操作。StructuredStreaming基礎(chǔ)操作3.輸出操作使用DataFrame的writeStream()方法保存流計(jì)算的結(jié)果。writeStream()有一些可以設(shè)定的選項(xiàng),如使用outputmode()方法配置輸出模式從而控制輸出內(nèi)容,使用format()方法配置輸出接收器類型,使用可選項(xiàng)queryName()標(biāo)識(shí)查詢的名稱,使用可選項(xiàng)triggerinterval()設(shè)定觸發(fā)間隔,在option()中有參數(shù)checkpointlocation,可以設(shè)置檢查點(diǎn)保存輸出的結(jié)果以保證數(shù)據(jù)的可靠性,最后使用start()方法啟動(dòng)流計(jì)算。StructuredStreaming基礎(chǔ)操作outputmode()內(nèi)參數(shù)指定輸出模式,輸出模式用于控制輸出接收器內(nèi)容,主要有3種輸出模式,分別為Append模式、Complete模式和Update模式,且不同類型的流式查詢支持不同的輸出模式。CompleteMode(完全模式):更新后的整個(gè)結(jié)果表將被寫入外部存儲(chǔ)。如何處理整個(gè)表的寫入由輸出接收器決定。AppendMode(追加模式):默認(rèn)模式。自上次觸發(fā)后,只將結(jié)果表中追加的新行寫入輸出接收器。適用結(jié)果表中的現(xiàn)有行不期望被改變的查詢,如select()、where()、map()、flatmap()、filter()、join()等操作支持該模式。UpdateMode(更新模式):自上次觸發(fā)后,只有在結(jié)果表中更新、增加的行才寫入輸出接收器。該模式只輸出自上次觸發(fā)以來(lái)更改的行。如果查詢不包括聚合,該模式等同于追加模式。StructuredStreaming基礎(chǔ)操作format()內(nèi)參數(shù)指定輸出接收器類型。StructuredStreaming內(nèi)置的輸出接收器包括File接收器、Kafka接收器、Console接收器、Memory接收器和Foreach接收器等,其中Console接收器和Memory接收器僅用作測(cè)試。具體說(shuō)明如下。File接收器,將計(jì)算結(jié)果以文件的形式輸出到指定目錄中。默認(rèn)文件格式為Parquet,也支持ORC、JSON和CSV等格式文件。支持的輸出模式:Append選項(xiàng)path:必須指定輸出目錄的路徑。容錯(cuò):是。數(shù)據(jù)只會(huì)被處理一次。StructuredStreaming基礎(chǔ)操作Kafka接收器,將計(jì)算結(jié)果輸出到Kafka的一個(gè)或多個(gè)主題。支持的輸出模式:Append、Complete、Update。選項(xiàng)kafka.bootstrap.servers:Kafka服務(wù)器的列表,逗號(hào)分隔的host:port列表。Topic:主題容錯(cuò):是,數(shù)據(jù)至少被處理一次。StructuredStreaming基礎(chǔ)操作Console接收器,將計(jì)算結(jié)果輸出到控制臺(tái),一般用于小量數(shù)據(jù)的調(diào)試。支持的輸出模式:Append、Complete、Update。選項(xiàng)numRows:每次觸發(fā)后打印多少行,默認(rèn)為20truncate:如果行太長(zhǎng)是否截?cái)?,默認(rèn)為“是”容錯(cuò):否。StructuredStreaming基礎(chǔ)操作Memory接收器,將計(jì)算結(jié)果作為內(nèi)存中的表存儲(chǔ)在內(nèi)存中,也用于小量數(shù)據(jù)的調(diào)試。支持的輸出模式:Append、Complete。選項(xiàng):無(wú)。容錯(cuò):否。StructuredStreaming基礎(chǔ)操作Foreach接收器,參數(shù)是一個(gè)foreach的方法,用戶可以通過(guò)實(shí)現(xiàn)這個(gè)方法實(shí)現(xiàn)一些自定義的功能。支持的輸出模式:Append、Complete、Update。選項(xiàng):無(wú)。容錯(cuò):是,數(shù)據(jù)至少被處理一次。StructuredStreaming基礎(chǔ)操作如果機(jī)器發(fā)生故障導(dǎo)致宕機(jī),可以使用檢查點(diǎn)(checkpoint)恢復(fù)先前查詢的進(jìn)度和狀態(tài),并從中斷處繼續(xù)。配置檢查點(diǎn)后,查詢將保存所有進(jìn)度信息和運(yùn)行聚合到檢查點(diǎn)目錄中。檢查點(diǎn)目錄必須是與HDFS兼容的文件系統(tǒng)中的路徑。StructuredStreaming基礎(chǔ)操作4.窗口聚合操作數(shù)據(jù)產(chǎn)生的時(shí)間也被稱為事件時(shí)間(eventtime),一般嵌入到流數(shù)據(jù)中作為一個(gè)字段。與事件時(shí)間對(duì)應(yīng)的是處理時(shí)間(processingtime),指數(shù)據(jù)被處理的時(shí)間。StructuredStreaming滑動(dòng)窗口聚合操作基于事件時(shí)間,一般使用事件時(shí)間作為窗口切分的依據(jù),例如每秒鐘的成
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2024年行唐縣招教考試備考題庫(kù)及答案解析(奪冠)
- 2025年惠州衛(wèi)生職業(yè)技術(shù)學(xué)院?jiǎn)握新殬I(yè)技能測(cè)試題庫(kù)帶答案解析
- 2025年湖北三峽職業(yè)技術(shù)學(xué)院馬克思主義基本原理概論期末考試模擬題含答案解析(奪冠)
- 2024年貴州民族大學(xué)馬克思主義基本原理概論期末考試題含答案解析(奪冠)
- 2025年龍江縣招教考試備考題庫(kù)含答案解析(必刷)
- 2025年惠民縣招教考試備考題庫(kù)及答案解析(奪冠)
- 2025年山西醫(yī)藥學(xué)院馬克思主義基本原理概論期末考試模擬題及答案解析(必刷)
- 2025年江西信息應(yīng)用職業(yè)技術(shù)學(xué)院馬克思主義基本原理概論期末考試模擬題及答案解析(必刷)
- 2025年屏山縣幼兒園教師招教考試備考題庫(kù)帶答案解析(奪冠)
- 2025年陽(yáng)朔縣幼兒園教師招教考試備考題庫(kù)帶答案解析
- 2026年無(wú)錫工藝職業(yè)技術(shù)學(xué)院?jiǎn)握芯C合素質(zhì)考試題庫(kù)附答案解析
- 2026年中考語(yǔ)文一輪復(fù)習(xí)課件:記敘文類閱讀技巧及示例
- 2025腫瘤靶向藥物皮膚不良反應(yīng)管理專家共識(shí)解讀課件
- 腳手架施工安全技術(shù)交底標(biāo)準(zhǔn)模板
- 海姆立克急救課件 (完整版)
- 淘寶主體變更合同范本
- 2025中好建造(安徽)科技有限公司第二次社會(huì)招聘13人筆試歷年參考題庫(kù)附帶答案詳解
- 《交易心理分析》中文
- 護(hù)理創(chuàng)新實(shí)踐與新技術(shù)應(yīng)用
- 2025年海南事業(yè)單位聯(lián)考筆試筆試考題(真題考點(diǎn))及答案
- 2025中國(guó)電信股份有限公司重慶分公司社會(huì)成熟人才招聘筆試考試參考題庫(kù)及答案解析
評(píng)論
0/150
提交評(píng)論