版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)
文檔簡介
項目6SparkStreaming學(xué)習(xí)目標(biāo)(1)(2)(3)學(xué)習(xí)SparkStreaming的概念。學(xué)習(xí)SparkStreaming的基礎(chǔ)知識。應(yīng)用SparkStreaming進(jìn)行流處理操作。學(xué)習(xí)目標(biāo)任務(wù)1初識SparkStreaming6.1.1SparkStreaming概述SparkStreaming是SparkAPI核心的擴(kuò)展內(nèi)容,除了SparkAPI內(nèi)部的API和函數(shù),Spark項目還包含另一個主要的子項目SparkStreaming。它是一套并發(fā)流處理庫,可實(shí)現(xiàn)對實(shí)時數(shù)據(jù)流的可擴(kuò)展、高吞吐量和容錯流處理。數(shù)據(jù)流是一種連續(xù)的數(shù)據(jù)記錄。與SparkStreaming函數(shù)或處理相關(guān)的輸入數(shù)據(jù)流可以來自不同的消息隊列系統(tǒng),如Kafka、Flume、Kinesis、ZeroMQ等,并且輸入數(shù)據(jù)也可以來自傳統(tǒng)的TCP套接字。看這里!6.1.1SparkStreaming概述從消息隊列中獲取數(shù)據(jù)流以后,可以使用高級復(fù)雜的算法進(jìn)行處理,這些算法函數(shù)包括map()、reduce()、join()、windows()等,這些算法通常應(yīng)用于機(jī)器學(xué)習(xí)、圖像處理算法等領(lǐng)域。最后,處理后的數(shù)據(jù)可以存儲到文件系統(tǒng)(HDFS)、數(shù)據(jù)庫(Databases)和實(shí)時儀表板(Dashboards)中,如圖6-1所示。圖6-1SparkStreaming的輸入與輸出6.1.2SparkStreaming的運(yùn)行原理一般情況下,處理數(shù)據(jù)流有兩種通用的方法:單獨(dú)處理每條記錄,并在記錄出現(xiàn)時就立刻處理;第1種根據(jù)記錄數(shù)量或時間長度將多個記錄組合切分為小批量任務(wù)。SparkStreaming使用的是第2種方法。第2種6.1.2SparkStreaming的運(yùn)行原理在SparkStreaming內(nèi)部工作機(jī)制中,SparkStreaming接收實(shí)時輸入數(shù)據(jù)流,并將其分成批量輸入數(shù)據(jù)任務(wù),然后由Spark引擎處理,輸入數(shù)據(jù)批生成最終批量數(shù)據(jù)流,這些批量數(shù)據(jù)流是指一個小批的作業(yè)序列,每個小批量作業(yè)表示為SparkRDD,在內(nèi)部DStream表示為一系列SparkRDD,如圖6-2所示。圖6-2SparkStreaming數(shù)據(jù)處理過程6.1.2SparkStreaming的運(yùn)行原理圖6-3SparkStreaming高層次架構(gòu)SparkStreaming使用“微批次”的架構(gòu),把流式計算當(dāng)作一系列連續(xù)的小規(guī)模批處理來對待。SparkStreaming從各種輸入源中讀取數(shù)據(jù),并把數(shù)據(jù)分組為小的批次。新的批次按均勻的時間間隔創(chuàng)建出來。在每個時間區(qū)間開始時,一個新的批次就被創(chuàng)建出來,在該區(qū)間內(nèi)收到的數(shù)據(jù)都會被添加到這個批次中。在時間區(qū)間結(jié)束時,批次停止增長。處理的結(jié)果可以以批處理的方式傳給外部系統(tǒng),如圖6-3所示。6.1.3SparkStreaming快速體驗案例實(shí)訓(xùn)方法的步驟大概分為以下幾步。(1)(2)(3)定義輸入源。創(chuàng)建StreamingContext對象,這是流處理的關(guān)鍵點(diǎn)。指定流計算需要的指令(間隔時間,劃分RDD規(guī)則,處理數(shù)據(jù)規(guī)則)。(4)調(diào)用StreamingContext的start()方法開始接收并處理數(shù)據(jù)。(5)處理過程會一直持續(xù)到StreamingContext調(diào)用stop()方法。6.1.3SparkStreaming快速體驗案例(1)創(chuàng)建StreamingContext對象在以下代碼中,對于輸入數(shù)據(jù)流使用兩個執(zhí)行線程,并且輸入數(shù)據(jù)流進(jìn)行時間間隔為1秒的批處理,從而創(chuàng)建一個StreamingContext,將它作為流處理的關(guān)鍵入口點(diǎn)。創(chuàng)建StreamingContext時需要傳入SparkConf(Spark配置對象)和批處理間隔時長,也可以使用已經(jīng)創(chuàng)建好的SparkContext生成新StreamingContext對象。importorg.apache.spark._importorg.apache.spark.streaming._importorg.apache.spark.streaming.StreamingContext._//notnecessarysinceSpark1.3//配置Spark參數(shù),設(shè)置使用Master主節(jié)點(diǎn),配置兩個核心valconf=newSparkConf().setMaster("local[2]").setAppName("NetworkWordCount")valssc=newStreamingContext(conf,Seconds(1))6.1.3SparkStreaming快速體驗案例(2)創(chuàng)建用來表示TCP套接字輸入的數(shù)據(jù)流DStream對象,數(shù)據(jù)流是連接localhost:9999獲取的。上述代碼已經(jīng)創(chuàng)建好了StreamingContext,StreamingContext是用來提供從輸入源創(chuàng)建DStream的方法的切入點(diǎn),然后使用StreamingContext對象的socketTextStream()方法獲得指定網(wǎng)絡(luò)源的輸入流。vallines=ssc.socketTextStream("localhost",9999)6.1.3SparkStreaming快速體驗案例(2)創(chuàng)建用來表示TCP套接字輸入的數(shù)據(jù)流DStream對象,數(shù)據(jù)流是連接localhost:9999獲取的。提示?。ocketTextStream()方法用來從指定的網(wǎng)絡(luò)源上創(chuàng)建一個輸入流,官方API給出的解釋如圖6-4所示。圖6-4socketTextStream()方法文檔6.1.3SparkStreaming快速體驗案例(3)上述代碼中的lines可以理解為從數(shù)據(jù)服務(wù)器接收到的數(shù)據(jù)流,但是這個數(shù)據(jù)流中有很多條數(shù)據(jù),每一條數(shù)據(jù)都是一行文本,然后為了處理方便,使用flatmap操作將ReceiverInputDStream轉(zhuǎn)換為可以處理單詞的另一種數(shù)據(jù)形式DStream[U]。flatmap是一種DStream操作,很多DStream可以通過使用flatmap來生成多個新數(shù)據(jù),以此來創(chuàng)建新的DStream。轉(zhuǎn)換的方法是使用flatMap()方法,“_”表示所有內(nèi)容,按照空格分割為單詞,從而生成新的DStream[U],代碼如下。valwords=lines.flatMap(_.split(""))6.1.3SparkStreaming快速體驗案例(4)輸入數(shù)據(jù)流已經(jīng)被分離成以單詞為單位的離散流,與Spark核心部分中RDD提取的操作接口類似,使用map和reduceByKey操作將word進(jìn)一步映射為(word,1)形式的DStream,然后統(tǒng)計單詞的個數(shù)并將其輸出。//計算每一批數(shù)據(jù)的單詞數(shù)量valpairs=words.map(word=>(word,1))valwordCounts=pairs.reduceByKey(_+_)wordCounts.print()6.1.3SparkStreaming快速體驗案例(5)至此,代碼編寫成功。為了能夠在Spark環(huán)境中運(yùn)行代碼,需要對該代碼進(jìn)行編譯打包工作。常用的編譯打包工具包括SBT、Maven等。這里使用SBT將上述編寫好的Scala代碼進(jìn)行編譯打包操作,詳細(xì)操作如下:單擊Terminal按鈕,然后進(jìn)入IDEA的Terminal界面,在該界面中使用SBT工具的基本命令“sbtpackage”,將代碼編譯成jar包。第一次編譯的過程會比較慢,如圖6-5所示。圖6-5使用SBT編譯jar包6.1.3SparkStreaming快速體驗案例圖6-6編譯成功當(dāng)出現(xiàn)success的字樣后,說明編譯成功,同時提示“packagingC:\\Users\\an\\......\\target\\scala-2.11\\wordcount_2.11-0.1.jar”,這說明已經(jīng)在該目錄下編譯出了jar包,可以在該目錄下查看是否有wordcount_2.11-0.1.jar包,如圖6-6和圖6-7所示。圖6-7編譯好的jar包6.1.3SparkStreaming快速體驗案例(6)在Spark環(huán)境下提交運(yùn)行編譯好的jar包。首先,使用netcat作為數(shù)據(jù)服務(wù)器運(yùn)行,運(yùn)行的命令為“nc-lk9999”。然后,打開另一個終端窗口,將SBT打包好的jar包復(fù)制到指定的目錄下,這里復(fù)制到/home/workspace下,然后,使用Spark環(huán)境提供的spark-submit腳本進(jìn)行提交操作,Spark腳本在目錄“/usr/spark/spark-2.4.0-bin-hadoop2.7/bin”下。6.1.3SparkStreaming快速體驗案例使用以下命令將應(yīng)用進(jìn)行提交運(yùn)行,同時可以看到終端界面在不斷地更新RDD,如果這段時間沒有輸入內(nèi)容,那么輸出就為空。./spark-submit\--class"SparkTest"\--masterlocal[2]/home/workspace/com.spark-1.0.jar在netcat數(shù)據(jù)服務(wù)器界面輸入“helloworldhellotom”,在運(yùn)行代碼的終端界面可以看到出現(xiàn)了單詞的統(tǒng)計數(shù)量,如圖6-8和圖6-9所示。圖6-8數(shù)據(jù)服務(wù)器終端輸入圖6-9Spark統(tǒng)計界面輸出任務(wù)2理解SparkStreaming6.2.1DStream簡介DiscretizedStream(離散流)或DStream是SparkStreaming對流式數(shù)據(jù)的基本抽象。它表示連續(xù)的數(shù)據(jù)流,這些連續(xù)的數(shù)據(jù)流可以是從數(shù)據(jù)源接收的輸入數(shù)據(jù)流,也可以是通過對輸入數(shù)據(jù)流執(zhí)行轉(zhuǎn)換操作而生成的經(jīng)過處理的數(shù)據(jù)流。它是一串RDD序列,每個RDD代表數(shù)據(jù)流中的一個時間片內(nèi)的數(shù)據(jù)。如圖6-10所示,DStream中的每個RDD都包含一定時間間隔內(nèi)的數(shù)據(jù),時間間隔的大小是由批處理間隔這個參數(shù)決定的。批處理間隔一般為500ms到幾秒,由應(yīng)用開發(fā)者配置。每個輸入批次都形成一個RDD,以Spark作業(yè)的方式處理并生成其他RDD。圖6-10DStream(一個持續(xù)的RDD序列)6.2.1DStream簡介可以從外部輸入源創(chuàng)建DStream,也可以對其他DStream應(yīng)用進(jìn)行轉(zhuǎn)化操作得到新的DStream。DStream支持許多Spark的RDD支持的轉(zhuǎn)化操作。這些底層RDD轉(zhuǎn)換由Spark引擎計算。DStream操作隱藏了大部分細(xì)節(jié),并為開發(fā)人員提供了更高級別的API以方便使用,如圖6-11所示。圖6-11DSteam轉(zhuǎn)換圖6.2.2DStream接收輸入源方法SparkStreaming提供了兩類內(nèi)置流媒體源。StreamingContext中提供了可對接的API,通過調(diào)用API可以直接讀取數(shù)據(jù)。例如,文件系統(tǒng)和TCP套接字連接。(1)基本來源:在未提供API的情況下讀取資源。例如,Kafka、Flume、Kinesis等資源可通過額外的實(shí)用程序獲得。(2)高級資源:6.2.2DStream接收輸入源方法對于已經(jīng)提供了API的基本輸入源,其常用接收函數(shù)如表6-1所示。表6-1輸入源常用接收函數(shù)6.2.2DStream接收輸入源方法SparkStreaming會監(jiān)控dataDirectory目錄,并處理該目錄下生成的任何文件,但是需要注意以下幾點(diǎn)。當(dāng)文件進(jìn)入目錄中,該文件將不會發(fā)生改變,對于窗口文件中的更改將不會重新讀取文件,會忽略更新。該目錄下所有的文件必須使用相同的數(shù)據(jù)格式。(1)(3)(2)若使用通配符來標(biāo)識目錄(如hdfs://namenode:8040/logs/2016-*),重命名整個目錄以匹配路徑,則會將該目錄添加到受監(jiān)視目錄列表中。6.2.3DStream轉(zhuǎn)換操作DStream支持很多轉(zhuǎn)換操作,它可以對RDD進(jìn)行各種轉(zhuǎn)換,因為DStream是由RDD組成的,SparkStreaming提供了可以在DSteam上使用的轉(zhuǎn)換集合,這些轉(zhuǎn)換操作應(yīng)用于DStream中的每個RDD的每個元素上,這些常用的轉(zhuǎn)換操作見表6-2。表6-2DStream常用轉(zhuǎn)換函數(shù)6.2.3DStream轉(zhuǎn)換操作表6-2DStream常用轉(zhuǎn)換函數(shù)6.2.3DStream轉(zhuǎn)換操作updateStateByKey操作允許在使用新信息不斷更新時保持任意狀態(tài)。要使用它,必須執(zhí)行以下兩個步驟。(1)定義狀態(tài),狀態(tài)可以是任意的數(shù)據(jù)類型。(2)定義狀態(tài)更新函數(shù),用一個函數(shù)指定如何使用先前的狀態(tài)和從輸入流中的新值來更新狀態(tài)。6.2.4DStream窗口操作滑動窗口轉(zhuǎn)換操作是允許在滑動數(shù)據(jù)窗口范圍內(nèi)進(jìn)行應(yīng)用轉(zhuǎn)換,而脫離了全部數(shù)據(jù)流的轉(zhuǎn)換。滑動窗口轉(zhuǎn)換的計算過程如圖6-12所示,任何窗口操作都需要指定如下兩個參數(shù)。圖6-12滑動窗口轉(zhuǎn)換的計算過程(1)windowLength:窗口時間間隔(窗口的持續(xù)時間),圖6-12中的窗口長度為3。(2)slidInterval:滑動時間間隔(每隔多長時間執(zhí)行一次計算),圖6-12中的滑動時間間隔為2。6.2.4DStream窗口操作一些常見的窗口操作見表6-3,所有這些操作的參數(shù)都包括windowLength和slideInterval。官方API文檔中提供了DStream轉(zhuǎn)換的完整操作。表6-3一些常見的窗口操作6.2.5DStream輸出操作DStream輸出操作允許數(shù)據(jù)被輸出到外部系統(tǒng)。例如,數(shù)據(jù)庫或文件系統(tǒng)。由于輸出操作實(shí)際上是轉(zhuǎn)換后的數(shù)據(jù),因此它們會觸發(fā)所有DStream轉(zhuǎn)換的實(shí)際執(zhí)行(類似于RDD的操作)。目前,定義了表6-4所示的輸出操作。表6-4輸出操作表任務(wù)3SparkStreaming實(shí)戰(zhàn)6.3.1統(tǒng)計本地文本單詞個數(shù)本實(shí)訓(xùn)是實(shí)時統(tǒng)計指定目錄下文本文件中的單詞個數(shù),我們指定在/tmp/test目錄下每隔15秒鐘統(tǒng)計一次新增加的文本中的單詞數(shù)量。該實(shí)訓(xùn)內(nèi)容主要應(yīng)用于Internet中日志的統(tǒng)計、網(wǎng)站流量的統(tǒng)計等功能。實(shí)訓(xùn)的步驟大致如下?!な褂肐DEA工具創(chuàng)建名為WordCount的Scala文件,主要完成文本文件的單詞個數(shù)統(tǒng)計功能?!ざxmain()方法。·創(chuàng)建StreamingContext對象,這是流處理關(guān)鍵點(diǎn)?!ざx輸入源?!ぶ付鬓D(zhuǎn)換需要的指令,并做出相應(yīng)處理(間隔時間,劃分RDD規(guī)則,處理數(shù)據(jù)規(guī)則)。·調(diào)用StreamingContext的start()方法開始接收并處理數(shù)據(jù)。·處理過程會一直持續(xù)到StreamingContext調(diào)用stop()方法。6.3.1統(tǒng)計本地文本單詞個數(shù)(1)使用IDEA工具創(chuàng)建名為WordCount的Scala文件,具體操作為:右擊scala文件夾,在彈出的快捷菜單中執(zhí)行New→ScalaClass命令,新建一個Scala類文件,名稱為WordCount,如圖6-13所示。圖6-13新建一個Scala類文件6.3.1統(tǒng)計本地文本單詞個數(shù)(2)編輯Scala代碼,創(chuàng)建StreamingContext對象。StreamingContext對象是SparkStreaming應(yīng)用程序與集群進(jìn)行交互的唯一入口,它封裝了Spark集群的環(huán)境信息和應(yīng)用程序的一些屬性信息。在創(chuàng)建StreamingContext對象之前,首先創(chuàng)建sparkConf,也就是Spark的配置文件,該對象通常需要指明應(yīng)用程序的運(yùn)行模式(本例中使用local[3])、設(shè)定應(yīng)用程序名稱(本例中為WordCount)、設(shè)定批處理時間間隔[本例中設(shè)定Seconds(15),即15秒],其中批處理時間間隔需要根據(jù)用戶的需求和集群的處理能力進(jìn)行適當(dāng)?shù)脑O(shè)置。6.3.1統(tǒng)計本地文本單詞個數(shù)(3)定義輸入源,SparkStream需要根據(jù)源類型選擇相應(yīng)的創(chuàng)建DStream的方法,處理輸入源的方法有很多,這里使用textFileStream方法,表示從本地系統(tǒng)的/tmp/test目錄下讀取文件,一旦這個目錄下有新的文件進(jìn)來,StreamingContext就會讀入這個文件并生成名為line_dstream的RDD。6.3.1統(tǒng)計本地文本單詞個數(shù)(4)使用SBT對代碼進(jìn)行打包,單擊Terminal按鈕,然后進(jìn)入IDEA的Terminal界面,在該界面中使用SBT工具的基本命令“sbtpackage”,將代碼編譯成jar包。第一次編譯過程會比較慢,見圖6-5。
當(dāng)出現(xiàn)“success”的字樣后,就說明編譯成功,同時提示“packagingC:\\Users\\an\\......\\target\\scala-2.11\\wordcount_2.11-0.1.jar”,這說明已經(jīng)在該目錄下編譯出了jar包,可以在該目錄下查看是否有wordcount_2.11-0.1.jar包,見圖6-6和圖6-7。6.3.1統(tǒng)計本地文本單詞個數(shù)(5)將編譯好的jar包復(fù)制到Spark主節(jié)點(diǎn)下指定的目錄下,然后執(zhí)行命令“./spark-submit--class"WordCount"/home/workspace/wordcount_2.11-0.1.jar”,該命令可以填寫除了class外的很多參數(shù),具體的參數(shù)使用情況,可以參考官方文檔中使用spark-submit啟動應(yīng)用程序部分的內(nèi)容:/docs/latest/submitting-applications.html。運(yùn)行結(jié)果如下。6.3.1統(tǒng)計本地文本單詞個數(shù)(6)這里沒有顯示結(jié)果就是因為代碼中監(jiān)控的文件夾下沒有文件,在/tmp/路徑下新建一個名為words.txt的文件,該文件內(nèi)容由多個單詞組成,中間由空格分割,如圖6-14所示。然后使用命令“mvwords.txt/tmp/test/”將words.txt文件移動到監(jiān)控的文件目錄下,然后發(fā)現(xiàn)監(jiān)控的結(jié)果顯示出了數(shù)據(jù),運(yùn)行結(jié)果如下。圖6-14文本words.txt的內(nèi)容
通過上述實(shí)例可以看出,該程序的確監(jiān)控了/tmp/test目錄下的文件情況,如果在之前的基礎(chǔ)上再將一個文件復(fù)制到監(jiān)控目錄下,那么新的運(yùn)行結(jié)果如下。-------------------------------------Time:1553072028000ms-------------------------------------(c,5)(d,5)(a,5)(b,5)6.3.2有狀態(tài)操作累計統(tǒng)計單詞個數(shù)本實(shí)訓(xùn)的目的是了解檢查點(diǎn)的設(shè)置、初始化RDD的使用及自定義實(shí)現(xiàn)累計的函數(shù)function()。實(shí)訓(xùn)步驟大致如下?!な褂肐DEA工具創(chuàng)建名為CumulativeWord的Scala文件;定義main()方法;創(chuàng)建StreamingContext對象,這是流處理關(guān)鍵點(diǎn)?!ぴO(shè)置檢查點(diǎn)checkpoint?!こ跏蓟疪DD、定義輸入源、定義分割方式?!ぷ远x累加函數(shù)mappingFunc()?!な褂胢apWithState()方法更新pairDStream?!ふ{(diào)用StreamingContext的start()方法開始接收并處理數(shù)據(jù)?!ぬ幚磉^程會一直持續(xù)到StreamingContext調(diào)用stop()方法。6.3.2有狀態(tài)操作累計統(tǒng)計單詞個數(shù)(1)使用IDEA工具創(chuàng)建名為CumulativeWord的Scala文件,具體操作為:右擊scala文件夾,在彈出的快捷菜單中執(zhí)行New→ScalaClass命令,新建一個Scala類文件,名為CumulativeWord。編輯Scala代碼,首先創(chuàng)建SparkConf(Spark的配置文件),該對象通常需要指明應(yīng)用程序的運(yùn)行模式(本例中使用local[2])、設(shè)定應(yīng)用程序名稱(本例中為NetworkWordCount)、設(shè)定批處理時間間隔[本例中設(shè)定Seconds(1),即1秒]。然后,創(chuàng)建StreamingContext對象。6.3.2有狀態(tài)操作累計統(tǒng)計單詞個數(shù)具體代碼如下。importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,State,StateSpec,StreamingContext}objectCumulativeWord{defmain(args:Array[String]):Unit={valconf=newSparkConf().setMaster("local[2]").setAppName("NetworkWordCount")valssc=newStreamingContext(conf,Seconds(1))//setcheckpoint6.3.2有狀態(tài)操作累計統(tǒng)計單詞個數(shù)(2)因為StreamingContext需要保存之前的信息,也就是有狀態(tài)的,所以需要設(shè)置檢查點(diǎn)checkpoint,檢查點(diǎn)是為了保證流應(yīng)用程序全天運(yùn)行,當(dāng)設(shè)備出現(xiàn)故障或崩潰時防止數(shù)據(jù)丟失,為了提高其容錯性,能夠方便地將它從故障中恢復(fù)。檢查點(diǎn)分為以下兩種類型。①元數(shù)據(jù)檢查點(diǎn):將定義流式計算的信息保存到容錯存儲(如HDFS)中,元數(shù)據(jù)包括:配置SparkConf(用于創(chuàng)建流的配置)、DStream操作(定義DStream的操作集)和不完整的批次(未完成的批次)。②數(shù)據(jù)檢查點(diǎn):將生成的RDD保存到存儲中。在RDD的轉(zhuǎn)換過程中,如果生成的RDD依賴于之前批次的RDD,就會導(dǎo)致恢復(fù)故障的時間增加,為了解決這樣的問題,將之前批次的RDD周期性地保存到存儲中,從而提高故障恢復(fù)速度。6.3.2有狀態(tài)操作累計統(tǒng)計單詞個數(shù)通過設(shè)置目錄來啟動檢查點(diǎn),檢查點(diǎn)信息保存到該文件系統(tǒng)中,使用streamingContext.checkpoint(checkpointDirectory)代碼,其中參數(shù)checkpointDirectory為檢查點(diǎn)的目錄。設(shè)置檢查點(diǎn)的具體代碼如下。ssc.checkpoint(".")6.3.2有狀態(tài)操作累計統(tǒng)計單詞個數(shù)(3)初始化RDD為list類型,list類型中是鍵值對形式,其中包含一個元素為("hello",0),創(chuàng)建用來表示TCP套接字輸入的數(shù)據(jù)流DStream對象,數(shù)據(jù)流是從localhost:9999獲取的,定義DStream分割條件為按照空格進(jìn)行分割,將分割的單詞設(shè)置為(word,1)的形式。//InitialstateRDDformapWithStateoperationvalinitialRDD=ssc.sparkContext.parallelize(List(("hello",0)))//CreateaDStreamthatwillconnecttohostname:port,likelocalhost:9999vallines=ssc.socketTextStream("localhost",9999)//Spliteachlineintowordsvalwords=lines.flatMap(_.split(""))//Counteachwordineachbatchvalpairs=words.map(word=>(word,1))6.3.2有狀態(tài)操作累計統(tǒng)計單詞個數(shù)(4)自定義函數(shù)mappingFunc(),該函數(shù)將作為參數(shù)傳入pairDStream的mapWithState()方法中。mappingFunc的主要作用是將每次輸入保存到state中,將之前的輸入值與目前的輸入值相加,然后更新state。//UpdatethecumulativecountusingmapWithState//ThiswillgiveaDStreammadeofstate(whichisthecumulativecountofthewords)valmappingFunc=(word:String,one:Option[Int],state:State[Int])=>{valsum=one.getOrElse(0)+state.getOption.getOrElse(0)valoutput=(word,sum)state.update(sum)output}6.3.2有狀態(tài)操作累計統(tǒng)計單詞個數(shù)(5)使用pairDStream中的mapWithState方法更新pairDStream,該方法需要一個參數(shù):StateSpec對象,為了創(chuàng)建該對象,傳入之前定義好的mappingFunc和initialRDD。如果對該方法的使用仍然存在疑惑,請參考源碼或官方文檔。valwordCounts=pairs.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))//PrintthefirsttenelementsofeachRDDgeneratedinthisDStreamtotheconsolewordCounts.print()ssc.start()//Startthecomputationssc.awaitTermination()}}6.3.2有狀態(tài)操作累計統(tǒng)計單詞個數(shù)(6)使用SBT進(jìn)行打包,然后執(zhí)行命令“./spark-submit--class"CumulativeWord"/home/workspace/cumulativewords_2.11-0.1.jar”運(yùn)行該jar包,操作流程請參考6.1.3,連續(xù)輸入3個“helloworld”,如圖6-15所示。圖6-15輸入內(nèi)容6.3.3windows劃窗統(tǒng)計熱搜詞我們使用windows劃窗技術(shù),模擬熱點(diǎn)搜索詞滑動統(tǒng)計,每隔10秒,統(tǒng)計最近60秒的搜索詞的搜索頻次,并打印出排名最靠前的3個搜索詞及出現(xiàn)次數(shù)。實(shí)訓(xùn)方法的步驟大致如下?!な褂肐DEA工具創(chuàng)建名為WindowsWordNum的Scala文件;定義main()方法;創(chuàng)建StreamingContext對象,這是流處理關(guān)鍵點(diǎn)?!こ跏蓟疪DD、定義輸入源、定義分割方式?!ざx劃窗操作函數(shù)及其兩個重要參數(shù):窗口長度和滑動間隔?!澊昂瘮?shù)操作后的結(jié)果進(jìn)行組織排序,得到熱度前3的單詞?!ご蛴〗Y(jié)果且處理過程會一直持續(xù)到StreamingContext調(diào)用stop()方法。6.3.3windows劃窗統(tǒng)計熱搜詞(1)使用IDEA工具創(chuàng)建名為WindowsWordNum的Scala文件,具體操作為:右擊scala文件夾,在彈出的快捷菜單中執(zhí)行New→ScalaClass命令,新建一個Scala類文件,名為WindowsWordNum。編輯Scala代碼,首先創(chuàng)建SparkConf(Spark的配置文件),該對象通常需要指明應(yīng)用程序的運(yùn)行模式(本例中使用local[2])、設(shè)定應(yīng)用程序名稱(本例中為WindowsWordNum)、設(shè)定批處理時間間隔[本例中設(shè)定Seconds(5),即5秒]。然后,創(chuàng)建StreamingContext對象。具體代碼如下。6.3.3windows劃窗統(tǒng)計熱搜詞(2)創(chuàng)建用來表示TCP套接字輸入的數(shù)據(jù)流DStream對象,設(shè)置數(shù)據(jù)流是通過本機(jī)的9999端口進(jìn)行傳輸(localhost:9999),然后將接收到的數(shù)據(jù)集分割成DStream,首先按照空格將所有的搜索數(shù)據(jù)分割成單詞,然后將單詞以(word,1)的形式進(jìn)行轉(zhuǎn)換。valsearchLogsDStream=ssc.socketTextStream("locahost",9999)valsearchWordsDStream=searchLogsDStream.map{searchLog=>searchLog.split("")(1)}valsearchWordPairDStream=searchWordsDStream.map{searchWord=>(searchWord,1)}6.3.3windows劃窗統(tǒng)計熱搜詞(3)使用DStream中的劃窗操作,將DStream進(jìn)行轉(zhuǎn)換,這里對方法reduceByKeyAndWindow的參數(shù)進(jìn)行詳細(xì)的解析。reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numTasks])。①func:窗口內(nèi)值變換函數(shù),該實(shí)踐中的方法是(v1:Int,v2:Int)=>v1+v2,意思是將兩個值進(jìn)行相加。②invFunc:反函數(shù),這里省略。③windowLength:窗口長度,該實(shí)踐中長度為60s。④slideInterval:滑動間隔,該實(shí)踐中長度為10s。6.3.3windows劃窗統(tǒng)計熱搜詞(4)使用DStream的transform轉(zhuǎn)換操作,將之前統(tǒng)計的searchWordCountsDStream中的RDD進(jìn)行轉(zhuǎn)換,轉(zhuǎn)換為一種單詞的統(tǒng)計數(shù)量在前、單詞在后的形式,然后取得統(tǒng)計數(shù)量在前3名的熱搜單詞數(shù)。valfinalDStream=searchWordCountsDStream.transform(searchWordCountsRDD=>{valcountSearchWordsRDD=sea
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 司法鑒定所財務(wù)制度
- 科創(chuàng)板對財務(wù)制度
- 食品會計財務(wù)制度
- 小微廠財務(wù)制度
- 農(nóng)家書屋三個制度
- 公路工程施工監(jiān)理招標(biāo)投標(biāo)制度
- 企業(yè)設(shè)備質(zhì)量管理制度(3篇)
- 國貿(mào)理發(fā)活動策劃方案(3篇)
- 2026江西九江市田家炳實(shí)驗中學(xué)臨聘教師招聘2人備考題庫有完整答案詳解
- 2026山東泰安市屬事業(yè)單位初級綜合類崗位招聘備考題庫及答案詳解(奪冠系列)
- 車輛工程系畢業(yè)論文
- 500萬的咨詢合同范本
- 七年級語文文言文閱讀理解專項訓(xùn)練
- 中藥熱熨敷技術(shù)及操作流程圖
- 臨床提高吸入劑使用正確率品管圈成果匯報
- 娛樂場所安全管理規(guī)定與措施
- 電影項目可行性分析報告(模板參考范文)
- 老年協(xié)會會員管理制度
- LLJ-4A車輪第四種檢查器
- 大索道竣工結(jié)算決算復(fù)審報告審核報告模板
- 2025年南充市中考理科綜合試卷真題(含標(biāo)準(zhǔn)答案)
評論
0/150
提交評論