版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領
文檔簡介
實驗十八Spark實驗:SparkSparkStreaming版本的WordCountMapReduce版本的WordCountSparkStreamingSparkStreamingjar包程序,能正確的計SparkStreaming計算流程:SparkStreaming是將流式計算分解成一系列短小的批處理作業(yè)。這里的批段一段的數(shù)據(jù)(DiscretizedStream,每一段數(shù)據(jù)都轉(zhuǎn)換成Spark中的RDD(ResilientDatasetSparkRDDTransformationRDD經(jīng)過操作變成中間結(jié)果保存在內(nèi)存中。整個流式計算根據(jù)業(yè)務的需求可以對中間的結(jié)果進行疊加,或者到外部設備。如圖18-1所示:18-k中RDD的RDD都是一個不可變的分布式可重算的數(shù)據(jù)集,其記錄著確定性的操作繼承關系(lineage,所以只要輸入數(shù)據(jù)是可容錯的,那么任意一個RDD的分區(qū)對于SparkStreaming來說,其RDD的關系如下圖所示,圖中的每一個橢圓形表RDDRDDPartition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream,而每一行最后一個RDD則表示每一個BatchSizeRDDRDDlineage相SparkStreamingHDFS(多份拷貝)或是來自于網(wǎng)絡的數(shù)據(jù)流(SparkStreaming會將網(wǎng)絡輸入數(shù)據(jù)的每一個數(shù)據(jù)流拷貝兩份到其他的機器)RDDPartition出錯,都可以并行地在其他機器上將缺失的Partition計算出來。這個容錯恢復方式比連續(xù)計算模型(如Storm)的效率更18-2所示:18-實時性:對于實時性的討論,會牽涉到流式處理框架的應用場景。SparkStreaming將SparkJobSparkDAG圖分解,以及Spark的任務集的調(diào)度過程。對于目前版本的SparkStreaming而言,其最小的BatchSize的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右,所以SparkStreaming能夠滿足除對實時性要求非常高(如高頻實時)之外的所有流式準實時計算場景。擴展性與吞吐量:Spark目前在EC2上已能夠線性擴展到100個節(jié)點(每個節(jié)點4Corerecords/sStorm2~5倍,圖4是Berkeley利用WordCountGrep兩個用例所做的測試,在Grep這個測試中,SparkStreaming670krecords/sStorm115krecords/s18-3所示:18-SparkStreamingSparkStreamingSpark的編程如出一轍,對于編程的理解也非常類似。對于Spark來說,編程就是對于RDD的操作;而對于SparkStreaming來說,就是對DStream的WordCountSparkStreaming中的輸入操valssc=newStreamingContext("Spark://…","WordCount",Seconds(1),[Homes],SparkStreamingDStreamSparkStreaming進行初始化生成StreamingContext。參數(shù)中比較重要的是第一個和第三個,第一個參數(shù)是指定valssc=newStreamingContext("Spark://…","WordCount",Seconds(1),[Homes],SparkStreamingSparkStreaming已支持了豐富的輸入接口,大致分為兩類:一類是磁盤輸入,如以batchsize作為時間間隔HDFS文件系統(tǒng)的某個,將中內(nèi)容的變化作為SparkStreaming的輸入;另一類就是網(wǎng)絡流的方式,目前支持Kafka、Flume、和TCPsocket。在WordCount例子中,假定通過網(wǎng)絡socket作為輸入流,某個特定的端口,最后得出輸入DStream(linesvallinesvallines=valwords=lines.flatMap(_.split("valwordCounts=wordsmap(x=>(x,1))reduceByKey(_+SparkStreamingSparkRDD的操作極為類似,SparkStreaming也就是DStreamvalwords=lines.flatMap(_.split("valwordCounts=wordsmap(x=>(x,1))reduceByKey(_+另外,SparkStreaming有特定的窗口操作,窗口操作涉及兩個參數(shù):一個是滑動窗口DurationDuration是batchsize的倍數(shù)。例如以過去5秒鐘為一個輸入窗口,每1秒統(tǒng)計一下WordCount,那么我們會將過去5秒鐘的每一秒鐘的WordCount都進行統(tǒng)計,然后進行疊加,得出這個窗valvalwordCounts=wordsmap(x=>(x,1)).reduceByKeyAndWindow(_+_,但上面這種方式還不夠高效。如果我們以增量的方式來計算就更加高效,例如,計算t+45秒窗口的ordCount,那么我們可以將t+35上[t+3,t+4]的統(tǒng)計量,在減去t2,t1]的統(tǒng)計量,這種方法可以復用中間三秒的統(tǒng)計量,84所示:valvalwordCounts=wordsmap(x=>(x,1)).reduceByKeyAndWindow(_+_,_-_,18-wordCounts=SparkStreaming的輸出操作:對于輸出操作,Spark提供了將數(shù)據(jù)打印到屏幕及輸入到文件中。在WordCount中DStreamwordCounts輸入到wordCounts=SparkStreaming啟動:經(jīng)過上述的操作,SparkStreaming還沒有進行工作,我們還需要調(diào)用Start操作,SparkStreaming才開始相應的端口,然后收取數(shù)據(jù),并進行統(tǒng)計SparkStreaming在互聯(lián)網(wǎng)應用中,流量統(tǒng)計作為一種常用的應用模式,需要在不同粒度上對不同數(shù)據(jù)進行統(tǒng)計,既有實時,又需要涉及到聚合、去重、連接等較為復雜的統(tǒng)計需求。傳統(tǒng)上,若是使用HadoopMapReduce框架,雖然可以容易地實現(xiàn)較為復雜的統(tǒng)計需求,但實時性卻無法得到保證;反之若是采用Storm這樣的流式框架,實時性雖可以得到保證,但需求的實現(xiàn)復雜度也大大提高了。SparkStreaming在兩者之間找到了一個平衡點,KafkaSparkStreaming搭建實時流量統(tǒng)計框架。數(shù)據(jù)暫存:Kafka作為分布式消息隊列,既有非常優(yōu)秀的吞吐量,又有較高的可靠性Kafka作為日志傳遞中間件來接收日志,抓取客戶端發(fā)送的流量日SparkStreamingSparkStreaming集群。SparkStreamingKafka集群對接,SparkStreamingKafka集群中獲取流量日志并進行處理。SparkStreamingKafka集群中獲取數(shù)據(jù)并將其存batch窗口到來時,便對這些數(shù)據(jù)進行處理。結(jié)果:為了便于前端展示和頁面請求,處理得到的結(jié)果將寫入到數(shù)據(jù)庫中相比于傳統(tǒng)的處理框架,Kafka+SparkStreaming的架構(gòu)有以下幾個優(yōu)點。Spark框架SparkStreamingSparkAPI和高靈活性,可以精簡地寫出較為復雜的算法。編程模型的高度一致使得上手SparkStreaming相當容易,同時也可以保證業(yè)務邏輯在實時處理和批處理上的復用。SparkStreaming提供了一套高效、可容錯的準實時大規(guī)模流式處理框架,它能和批處理及即時查詢放在同一個軟件棧中。如果你學會了Spark編程,那么也就學會了SparkStreaming編程,如果理解了Spark的調(diào)度和,SparkStreaming也類似。按照目前的發(fā)展趨勢,SparkStreaming一定將會得到更大范圍的使用。登錄大數(shù)據(jù)實驗,創(chuàng)建Spark集群,并點擊搭建Spark集群按鈕,等待按鈕后18-5所示:18-使用jpsHadoopSparkHadoopjpsjava[root@master[root@master~]#jps3711NameNode4174395747384635打開InliJIDEA準備編寫 ing代碼點擊File->New->Module->Maven->Next–>輸入GroupId和AriifactId->Next->輸入Modulename 新建一個maven的Module。<?xmlversion="1.0"encoding="UTF-<project4.0.0"" 下的pom.xml文件,在<project>中<?xmlversion="1.0"encoding="UTF-<project4.0.0""<groupId>org.apache <!--<artifactId>Spark在src/main/java的 下,點擊java 新建一個package命名為spark.streaming.test,然后在包下新建一個SparkStreaming的javaclass。SparkStreamingpackagepackageimport importorg.apache.spark.SparkConf;importorg.apache.spark.api.javafunction.FlatMapFunction;importorg.apache.spark.api.javafunction.Function2;importorg.apache.spark.api.javafunction.PairFunction;importorg.apache.spark.api.java.StorageLevels;importimportorg.apache.spark.streaming.api.java.JavaDStream;importimportimportjava.util.I importjava.utilpublicclassSparkStreamingprivatestaticfinalPatternSPACE pile("publicstaticvoidmain(String[]args)throwsInterruptedException{if(args.length<2){}SparkConfsparkConf=newSparkConf().setAppName("JavaNetworkWordCount");JavaStreamingContextssc=newJavaStreamingContext(sparkConf,Durations.seconds(1));JavaReceiverInputDStream<String>lines=ssc.socketTextStream(args[0],Integer.parseInt(args[1]),StorageLevels.MEMORYANDDISKSER);JavaDStream<String>words=linesflatMap(newFlatMapFunction<String,String>(){public ble<String>call(StringreturnLists}JavaPairDStream<String,Integer>wordCounts=wordsmapToPair(newPairFunction<String,String,Integer>(){publicTuple2<String,Integer>call(Strings){returnnewTuple2<String,Integer>(s,}})reduceByKey(newFunction2<Integer,Integer,Integer>(){publicIntegercall(Integeri1,Integeri2){returni1+i2;}}}點擊File->ProjectStructure->Aritifacts–>點擊加號->JAR->frommoduleswithdependences->module–>選擇MainClass->Ok->Outputdirectory–>Ok。去掉除’guava-14.0.1.jar’和‘guice-3.0.jar’JAROk。Build->BuildAritifactsjarmaster上去。SSHmasternclk9999設置路由器。 [root@master~]#nc-lk [root@master~]#cd[root@masterspark]#bin
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
- 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2026政協(xié)鎮(zhèn)寧自治縣委員會辦公室公益性崗位人員招聘1人備考題庫(貴州)附答案詳解
- 2026上海市第六人民醫(yī)院公開招聘工作人員備考題庫及完整答案詳解一套
- 2025湖北武漢市漢口學院保安招聘1人備考題庫及答案詳解(奪冠系列)
- 2026廣東廣州市越秀區(qū)兒童福利會招聘1人備考題庫及答案詳解參考
- 2026云南金江滄源水泥工業(yè)有限公司專業(yè)技術崗招聘5人備考題庫完整答案詳解
- 2025聊城東阿經(jīng)濟開發(fā)區(qū)管理委員會公開招聘工作人員補充說明的備考題庫及完整答案詳解
- 2026內(nèi)蒙古鄂爾多斯市烏審旗公立醫(yī)院院長選聘3人備考題庫及參考答案詳解
- 2026江西贛州市人才集團有限公司第一批次人才招聘7人備考題庫附答案詳解
- 2026年景德鎮(zhèn)市珠山區(qū)實驗幼兒園春季專任教師招聘2人備考題庫完整答案詳解
- 2026中國21世紀議程管理中心面向社會招聘2人備考題庫及參考答案詳解一套
- THHPA 001-2024 盆底康復管理質(zhì)量評價指標體系
- JGT138-2010 建筑玻璃點支承裝置
- 垃圾清運服務投標方案(技術方案)
- 顱鼻眶溝通惡性腫瘤的治療及護理
- 光速測量實驗講義
- 斷橋鋁合金門窗施工組織設計
- 新蘇教版六年級科學上冊第一單元《物質(zhì)的變化》全部教案
- 四川山體滑坡地質(zhì)勘察報告
- 青島啤酒微觀運營
- 工程結(jié)算書(設備及安裝類)
- GB/T 19142-2016出口商品包裝通則
評論
0/150
提交評論