版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領
文檔簡介
BigDataAnalyticswithSpark項目六SparkStreaming處理流數(shù)據(jù)項目概述近年來,隨著電子商務、輿情監(jiān)控、傳感監(jiān)控、互聯(lián)網(wǎng)金融等領域的發(fā)展,對數(shù)據(jù)實時處理的需求日漸增強,SparkStreaming計算框架就是為了實現(xiàn)流式數(shù)據(jù)的實時計算而產(chǎn)生的。本項目內(nèi)容涵蓋SparkStreaming讀取套接字、文件流、Kafka等數(shù)據(jù)源數(shù)據(jù),并進行實時處理,最終將結果輸出到數(shù)據(jù)庫中。項目效果通過本項目實踐,可以實現(xiàn)Kafka收集電商用戶行為數(shù)據(jù)后,編寫SparkStreaming程序處理流式數(shù)據(jù)。例如實時計算過去30秒內(nèi)用戶下訂單數(shù)、加入購物車數(shù)量、放入收藏夾數(shù)量如下所示:還可以將有用的數(shù)據(jù)(如用戶購買行為數(shù)據(jù))寫入到MySQL等數(shù)據(jù)庫,以供后臺使用。目錄任務1初識流數(shù)據(jù)處理模塊SparkStreaming讀取基礎數(shù)據(jù)源到DStream中讀取Kafka數(shù)據(jù)到DStream中DStream的轉換操作任務2任務3任務4DStream的輸出操作任務5任務6SparkStreaming實時處理電商用戶行為數(shù)據(jù)任務1任務3DStream的輸出操作任務5思維導圖初識流數(shù)據(jù)處理模塊SparkStreaming任務1SparkStreaming計算框架基本工作原理編寫一個簡單的SparkStreaming程序實現(xiàn)實時詞頻統(tǒng)計SparkStreaming的5階段。我們?nèi)粘L幚淼臄?shù)據(jù)總體上可以分為括靜態(tài)數(shù)據(jù)和流數(shù)據(jù)(動態(tài)數(shù)據(jù))兩大類;靜態(tài)數(shù)據(jù)是一段較長的時間內(nèi)相對穩(wěn)定的數(shù)據(jù),比如各類管理系統(tǒng)中的歷史數(shù)據(jù),例如企業(yè)的訂單數(shù)據(jù)、教務系統(tǒng)中某課程的期末考試成績等;對于靜態(tài)數(shù)據(jù)一般采用批處理方式進行計算,可以在充裕的時間內(nèi)對海量數(shù)據(jù)進行批量處理(即可以容忍較高的時間延遲),計算得到有價值的信息。HadoopMapReduce就是典型的批處理模型,用戶可以在HDFS和HBase存放大量的靜態(tài)數(shù)據(jù),由MapReduce負責對海量數(shù)據(jù)執(zhí)行批量計算。1.1SparkStreaming的產(chǎn)生流數(shù)據(jù)則是以大量、快速、時變的流形式持續(xù)到達,因此流數(shù)據(jù)是不斷變化的數(shù)據(jù);近年來,web應用、網(wǎng)絡監(jiān)控、傳感監(jiān)測等領域,流數(shù)據(jù)處理日漸興起,成為當前數(shù)據(jù)處理領域的重要一環(huán)。比如電子商務領域,淘寶、京東等電商平臺可以實時收集用戶的搜索、點擊、評論、加入購物車等各種用戶行為,進而迅速發(fā)現(xiàn)用戶的興趣點、預判用戶的購物行為,可以通過推薦算法為用戶推薦其可能感興趣的商品,一方面提高商家的銷售額,另一方面提升消費者滿意度及平臺粘性;交通領域,安裝了大量監(jiān)控設備,可以實時收集車輛通過、交通違法等各種信息,進而對車流路況情況作出預判,提升車輛出行效率。1.1SparkStreaming的產(chǎn)生
流數(shù)據(jù)是時間上無上限的數(shù)據(jù)集合,因此其空間(容量)也沒有具體限制。一般認為流數(shù)據(jù)具有如下特點:(1)數(shù)據(jù)快速持續(xù)到達,潛在大小也許是無窮無盡的;(2)數(shù)據(jù)來源眾多,格式復雜;(3)數(shù)據(jù)量大,但是不十分關注存儲;一旦經(jīng)過處理,要么被丟棄,要么被歸檔存儲;(4)注重數(shù)據(jù)的整體價值,不過分關注個別數(shù)據(jù);(5)數(shù)據(jù)順序顛倒,或者不完整,系統(tǒng)無法控制將要處理的新到達的數(shù)據(jù)元素的順序。1.1SparkStreaming的產(chǎn)生正是由于流數(shù)據(jù)的上述特性,流數(shù)據(jù)不能采用傳統(tǒng)的批處理方式,必須實時計算;實時計算最重要的一個需求是能夠實時得到計算結果,一般要求響應時間為秒級或者毫秒級。在大數(shù)據(jù)時代,數(shù)據(jù)量巨大、數(shù)據(jù)樣式復雜、數(shù)據(jù)來源眾多,這些對實時計算提出了新的挑戰(zhàn),進而催生了,針對流數(shù)據(jù)的實時計算——流計算。1.1SparkStreaming的產(chǎn)生
目前,市場上存在Storm、Flink、S4等流計算框架;其中Storm是Twitter提出的、免費開源的分布式實時計算系統(tǒng),Storm可簡單、高效、可靠地處理大量的流數(shù)據(jù);S4(SimpleScalableStreamingSystem)是Yahoo提出的開源流計算平臺,具有通用的、分布式的、可擴展的、分區(qū)容錯的、可插拔的特點;Flink是由Apache軟件基金會開發(fā)的開源流處理框架,F(xiàn)link以數(shù)據(jù)并行和流水線方式執(zhí)行任意流數(shù)據(jù)程序,F(xiàn)link的流水線可以執(zhí)行批處理和流處理程序。1.1SparkStreaming的產(chǎn)生SparkStreaming是構建在Spark上的實時計算框架,它擴展了Spark處理大規(guī)模流式數(shù)據(jù)的能力。SparkStreaming可結合批處理和交互查詢,適合一些需要對歷史數(shù)據(jù)和實時數(shù)據(jù)進行結合分析的應用場景。SparkStreaming支持從多種數(shù)據(jù)源提取數(shù)據(jù),如Kafka、Flume、Twitter、ZeroMQ、文本文件以及TCP套接字等;并且可以提供一些高級API來表達復雜的處理算法,如map、reduce、join和window等;此外,SparkStreaming支持將處理完的數(shù)據(jù)推送到文件系統(tǒng)、數(shù)據(jù)庫或者實時儀表盤中展示。1.1SparkStreaming的產(chǎn)生對于流數(shù)據(jù),SparkStreaming接收實時輸入的數(shù)據(jù)流后,將數(shù)據(jù)流按照時間片(秒級)為單位進行拆分為一個個小的批次數(shù)據(jù),然后經(jīng)Spark引擎以類似批處理的方式處理每個時間片數(shù)據(jù)。1.2
SparkStreaming的工作原理SparkStreaming將流式計算分解成一系列短小的批處理作業(yè),也就是把SparkStreaming的輸入數(shù)據(jù)按照時間片段(如1秒),分成一段一段的離散數(shù)據(jù)流(稱之為DStream,DiscretizedStream);每一段數(shù)據(jù)都轉換成Spark中的RDD,然后將SparkStreaming中對DStream流處理操作變?yōu)獒槍park中對RDD的批處理操作1.2
SparkStreaming的工作原理在進行實時單詞統(tǒng)計時,DStreamlines中每個時間片的數(shù)據(jù)(存儲句子的RDD)經(jīng)flatMap操作,生成了存儲單詞的RDD;這些新生成的單詞RDD對象就組成了words這個DStream對象。完成核心業(yè)務處理后,還可根據(jù)業(yè)務的需求對結果進一步處理,比如存儲到外部設備中。1.2
SparkStreaming的工作原理利用Netcat工具向9999端口發(fā)送數(shù)據(jù)流(文本數(shù)據(jù)),使用SparkStreaming監(jiān)聽9999端口的數(shù)據(jù)流并進行詞頻統(tǒng)計。(1)運行Netcat工具并測試Netcat是一款著名的網(wǎng)絡工具,它可以用于端口監(jiān)聽、端口掃描、遠程文件傳輸以及實現(xiàn)遠程shell等功能,Ubuntu系統(tǒng)系統(tǒng)自帶Netcat工具;下面用兩個shell窗口模擬兩個人在局域網(wǎng)進行聊天,以此測試Netcat工具是否可以正常使用。打開兩個Shell窗口,分別輸入下圖所示命令,用于監(jiān)聽9999端口;分別在兩個窗口中輸入字符,兩個窗口可以分別收到對方放的數(shù)據(jù);說明Netcat可以正常使用、通訊環(huán)境正常。1.3用spark-shell寫第一個SparkStreaming程序(2)在spark-shell中編寫程序在Linux終端使用如下命令進入sparkshell環(huán)境。注意SparkStreaming至少需要2個線程(一個接受流數(shù)據(jù),一個處理數(shù)據(jù));當在本地運行一個SparkStreaming程序時,不要使用“l(fā)ocal”或者“l(fā)ocal[1]”作為master的URL。這兩種方法中的任何一個都意味著只有一個線程將用于運行本地任務。如果你正在使用一個基于接收器(receiver)的輸入離散流DStream(例如TCPsocket,Kafka,F(xiàn)lume等),則該單獨的線程將用于運行接收器(receiver),而沒有留下任何的線程用于處理接收到的數(shù)據(jù)。因此,在本地運行時,需要使用“l(fā)ocal[N]”作為masterURL,其中的N>運行接收器的數(shù)量。1.3用spark-shell寫第一個SparkStreaming程序cd/usr/local/spark/bin./spark-shell--masterlocal[4]StreamingContext是所有流功能的主要入口點,導入相關包后,創(chuàng)建一個間歇時間為10秒的本地StreamingContext實例ssc。1.3用spark-shell寫第一個SparkStreaming程序利用創(chuàng)建的ssc(StreamingContext對象),我們可以創(chuàng)建一個DStream對象lines;該DStream代表從localhost主機的9999端口流入的數(shù)據(jù)流。lines是從數(shù)據(jù)server接收到的數(shù)據(jù)流,其中每條記錄都是一行文本。接下來,我們就要把這些文本行按空格分割成單詞;與SparkRDD中的ftatMap類似,這里的ftatMap是一個映射算子,lines中的每行都會被ftatMap映射為多個單詞,從而生成新的wordsDStream對象。。1.3用spark-shell寫第一個SparkStreaming程序有了wordsDStream后,使用map方法將其RDD元素轉換為(word,1)鍵值對形式;再使用reduceByKey算子,得到各個單詞出現(xiàn)的頻率、并打印輸出。注意,執(zhí)行以上代碼后,SparkStreaming只是將計算邏輯設置好,此時并未真正的開始處理數(shù)據(jù)。要啟動之前的處理邏輯,我們還要使用start方法啟動流計算并等待程序結束。1.3用spark-shell寫第一個SparkStreaming程序接下來在Linux終端,使用Netcat工具向9999端口發(fā)送文本數(shù)據(jù)。SparkStreaming即可計數(shù)10秒內(nèi)數(shù)據(jù)流的詞頻并輸出。1.3用spark-shell寫第一個SparkStreaming程序下面在IDEA環(huán)境下,使用SparkStreaming完成流數(shù)據(jù)的實時詞頻統(tǒng)計,具體步驟如下:(1)創(chuàng)建Maven工程。在IntelliJIDEA中創(chuàng)建SparkStreaming工程,完畢后,在Maven的porm.xml文件中添加添加SparkStreaming組件相關依賴:<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka_${scala.version}</artifactId><version>1.6.3</version></dependency>1.4用IDEA工具寫第一個SparkStreaming程序(2)編寫程序上述工程中,創(chuàng)建一個名為StreamingTest.scala的ScalaObject文件,文件中寫入如下代碼:importorg.apache.spark._importorg.apache.spark.streaming.StreamingContextimportorg.apache.spark.streaming.SecondsobjectStreamTest{defmain(args:Array[String]):Unit={valconf=newSparkConf().setMaster("local[4]").setAppName("NetworkWordCount")valsc=newSparkContext(conf)//屏蔽控制臺輸出中的INFO日志輸出sc.setLogLevel("WARN")
1.4用IDEA工具寫第一個SparkStreaming程序//創(chuàng)建StreamingContextvalssc=newStreamingContext(sc,Seconds(10))//創(chuàng)建DStream,監(jiān)聽本機的9999端口vallines=ssc.socketTextStream("localhost",9999)//將監(jiān)聽到的文本切割成單詞valwords=lines.flatMap(_.split(""))//將切割后的單詞組成KV形式的鍵值對valpairs=words.map(word=>(word,1))//統(tǒng)計每個單詞的詞頻valwordCounts=pairs.reduceByKey(_+_)wordCounts.print()ssc.start()ssc.awaitTermination()}}1.4用IDEA工具寫第一個SparkStreaming程序(3)運行程序使用nc-lk9999命令打開netcat監(jiān)聽;運行StreamingTest.scala,在netcat窗口中,輸入文本則在IDEA中輸出詞頻統(tǒng)計的結果。1.4用IDEA工具寫第一個SparkStreaming程序通過書寫上述代碼,可以發(fā)現(xiàn)編寫SparkStreaming程序模式相對固定,其基本步驟包括: (1)通過創(chuàng)建輸入DStream來定義輸入源 (2)對DStream進行轉換操作和輸出操作來定義流計算。 (3)streamingContext.start()來開始接收數(shù)據(jù)和處理流程。 (4)streamingContext.awaitTermination()方法,等待處理結束(手動結束或因為錯誤而結束)。 (5)可以通過streamingContext.stop()來手動結束流計算進程。1.5編寫SparkStreaming程序的基本步驟讀取基礎數(shù)據(jù)源到DStream中任務2SparkStreaming對接多種數(shù)據(jù)源,將獲取的流數(shù)據(jù)生成DStream。在IDEA環(huán)境下,SparkStreaming從基礎數(shù)據(jù)源中獲取數(shù)據(jù)并創(chuàng)建DStream。SparkStreaming可以從HDFS文件系統(tǒng)目錄、本地系統(tǒng)的文件目錄讀取數(shù)據(jù)到DStream中,本例將演示在IDEA環(huán)境下,編寫SparkStreaming程序實時讀取HDFS文件目錄中的數(shù)據(jù)。(1)啟動HDFS服務在Linux終端中,使用如下命令,啟動hdfs服務(2)準備數(shù)據(jù)文件準備3個文件文件file1.txt、file2.txt、file3.txt(位于/home/hadoop目錄下),其內(nèi)容如下所示。2.1讀取文件流cd/usr/local/hadoop/sbin./start-dfs.sh(3)編寫程序IDEA工程中,新建一個scala文件StreamReadHdfs.scala,其代碼如下://導入相關包importorg.apache.spark.streaming.{Seconds,StreamingContext}importorg.apache.spark.{SparkConf,SparkContext}importorg.apache.log4j.{Level,Logger}objectStreamReadHdfs{defmain(args:Array[String]):Unit={//設置Level級別,屏蔽控制臺無關日志輸出,便于觀察輸出結果Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)//新建一個SparkConf實例、SparkContext實例valconf=newSparkConf().setMaster("local[4]").setAppName("StreamReadHdfs")valsc=newSparkContext(conf)2.1讀取文件流//新建StreamingContext實例valssc=newStreamingContext(sc,Seconds(10))//創(chuàng)建DStream用于監(jiān)聽hdfs相關目錄vallines=ssc.textFileStream("hdfs://localhost:9000/user/hadoop/spark_streaming")//逐行打印監(jiān)聽的數(shù)據(jù)lines.print()//開始SparkStreaming任務,任務持續(xù)執(zhí)行,直到某種方式停止或發(fā)生異常ssc.start()ssc.awaitTermination()}}2.1讀取文件流(4)運行測試運行StreamReadHdfs.scala程序,SparkStreaming開始監(jiān)聽hdfs文件系統(tǒng)的"hdfs://localhost:9000/user/hadoop/spark_streaming"目錄,沒有新文件輸入時如下所示。在Linux終端中,使用以下命令將file1.txt、file2.txt、file3.txt依次上傳到"hdfs://localhost:9000/user/hadoop/spark_streaming"目錄下。2.1讀取文件流cd/usr/local/hadoop/bin./hdfsdfs-mkdir/user/hadoop/spark_streaming./hdfsdfs-put/home/hadoop/file1.txt/user/hadoop/spark_streaming#上傳file1.txt./hdfsdfs-put/home/hadoop/file2.txt/user/hadoop/spark_streaming#上傳file1.txt./hdfsdfs-put/home/hadoop/file3.txt/user/hadoop/spark_streaming#上傳file1.txt在IDEA的控制臺,可以看到SparkStreaming監(jiān)聽到"hdfs://localhost:9000/user/hadoop/spark_streaming"目錄下,不斷有數(shù)據(jù)流入(上傳新文件),并將數(shù)據(jù)內(nèi)容輸出。2.1讀取文件流SparkStreaming可以方便的讀取套接字流,只需要的調(diào)用StreamingContext類的socketTextStream方法即可,調(diào)用格式如下所示;valssc=newStreamingContext(sc,Seconds(10))vallines=ssc.socketTextStream("localhost",9999)其中,sc為SparkContext實例,“l(fā)ocalhost”表示本機(也可以用主機的IP代替),“9999”為監(jiān)聽的端口號。任務1中已給出具體案例,在此不再重復。2.2讀取套接字流SparkStreaming可以讀取RDD組成的數(shù)據(jù)隊列;這里我們創(chuàng)建一個隊列,將動態(tài)生成的RDD不斷發(fā)送到該隊列中,SparkStreaming持續(xù)讀取隊列中的RDD。在工程中創(chuàng)建StreamReadRDD.scala文件,代碼如下:importorg.apache.log4j.{Level,Logger}importorg.apache.spark.rdd.RDDimportorg.apache.spark.streaming.{Seconds,StreamingContext}importorg.apache.spark.{SparkConf,SparkContext}objectStreamReadRDD{defmain(args:Array[String]):Unit={Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)2.3讀取RDD隊列流valconf=newSparkConf().setMaster("local[4]").setAppName("StreamReadRDD")valsc=newSparkContext(conf)valssc=newStreamingContext(sc,Seconds(2))//創(chuàng)建線程安全的隊列,用于放置RDDvalrddQueue=newscala.collection.mutable.SynchronizedQueue[RDD[Int]]//創(chuàng)建一個線程,通過for循環(huán)向隊列中添加新的RDDvaladdQueueThread=newThread(newRunnable{overridedefrun():Unit={for(i<-1to5){//向隊列rddQueue添加新的RDDrddQueue+=sc.parallelize(i)//線程sleep2000毫秒Thread.sleep(2000)}}})2.3讀取RDD隊列流//創(chuàng)建DStream讀取RDD系列valinputDStream=ssc.queueStream(rddQueue)inputDStream.print()//啟動SparkStreamingssc.start()//啟動addQueueThread線程,不斷向rddQueue隊列中添加新的RDDaddQueueThread.start()ssc.awaitTermination()}}執(zhí)行StreamReadRDD.scala,其輸出結果如右圖所示:2.3讀取RDD隊列流讀取Kafka數(shù)據(jù)到DStream中任務3SparkStreaming與Kafka對接的方法讀取Kafka數(shù)據(jù)到DStream中,完成數(shù)據(jù)實時處理任務。除了套接字流、文件流、RDD隊列流,SparkStreaming還支持Kafka、Flume、Kinesis等高級數(shù)據(jù)源;這一類別的數(shù)據(jù)源需要使用Spark庫外的接口,其中一些還需要比較復雜的依賴關系(例如Kafka和Flume)。因此,為了最小化有關的依賴關系的版本沖突的問題,這些資源本身不能創(chuàng)建DStream的功能,需要通過依賴單獨的類庫實現(xiàn)創(chuàng)建DStream的功能。另外,這些高級數(shù)據(jù)源不能在SparkShell中使用,因此基于這些高級數(shù)據(jù)源的應用程序不能在SparkShell中直接測試;如想要在SparkShell中使用它們,則必須下載帶有它的依賴的相應的Maven組件的JAR,并且將其添加到classpath;本項任務我們將在IDEA中編寫SparkStreaming讀取Kafka高級數(shù)據(jù)源。3.1SparkStreaming支持的高級數(shù)據(jù)源
Kafka是最初由Linkedin公司開發(fā),是一個分布式、支持分區(qū)的(partition)、多副本的(replica)分布式消息系統(tǒng),它的最大的特性就是可以實時的處理大量數(shù)據(jù)以滿足各種需求場景;Kafka用scala語言編寫,目前已成為Apache基金會頂級開源項目。Kafka有如下特點:高吞吐量、低延遲:Kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒??蓴U展性:kafka集群支持熱擴展。持久性、可靠性:消息被持久化到本地磁盤,并且支持數(shù)據(jù)備份防止數(shù)據(jù)丟失。容錯性:允許集群中節(jié)點失?。褐斜A舳鄠€副本)。高并發(fā):支持數(shù)千個客戶端同時讀寫。3.2了解Kafka的工作原理Kafka的工作原理如下所示,消息生產(chǎn)者Producer(向Kafka發(fā)送數(shù)據(jù)的終端)產(chǎn)生數(shù)據(jù)后,通過Zookeeper找到Brocker(一臺Kafka服務器就是一個Broker,一個集群可以由多個Broker組成)后,將數(shù)據(jù)放到Broker上并標記不同的主題topic;消息消費者Customer(從Kafka獲取消息的終端)根據(jù)自身訂閱的topic主題,通過Zookeeper找到相應的Broker,然后消費相關數(shù)據(jù)。3.2了解Kafka的工作原理使用Kafka模擬持續(xù)收集地交通監(jiān)控設備發(fā)來的的監(jiān)控數(shù)據(jù)(數(shù)據(jù)內(nèi)容為:監(jiān)控設備號,最高限速,車牌號,車輛通過時速),利用SparkStreaming讀取Kafka中的數(shù)據(jù),找出超速行駛的的車輛并在控制臺輸出。(1)安裝Kafka進入Kafka的官網(wǎng)/downloads,下載與本機Scala版本一致的Kafka包,此安裝包內(nèi)已經(jīng)附帶Zookeeper,不需要額外安裝Zookeeper。3.3Kafka的安裝與測試在Linux終端下,使用如下命令完成Kafka解壓等工作。(2)啟動Kafka打開一個Linux終端,輸入以下命令,啟動zookeeper服務。打開第二個Linux終端,輸入以下命令,啟動kafka服務。3.3Kafka的安裝與測試cd/usr/local/hadoop/sbin./start-dfs.shcd/usr/local/kafkabin/zookeeper-server-start.shconfig/pertiescd/usr/local/kafkabin/kafka-server-start.shconfig/perties(3)創(chuàng)建主題,測試Kafka是否安裝成功打開第三個Linux終端,使用如下命令添加一個消息主題“mytopic”使用如下命令查看主題mytopic是否創(chuàng)建成功,創(chuàng)建成功則有顯示。接下來使用命令,向主題mytopic中發(fā)送消息。3.3Kafka的安裝與測試cd/usr/local/kafkabin/kafka-topics.sh--create--zookeeperlocalhost:2181--replication-factor1--partitions1--topicmytopicbin/kafka-topics.sh--list--zookeeperlocalhost:2181bin/kafka-console-producer.sh--broker-listlocalhost:9092--topicmytopic打開第四個Linux終端,使用如下命令消費mytopic主題消息,可以顯示相關信息,如圖6-23所示。測試正常后,即可關閉第三、第四個Linux終端;但第一、第二個Linux終端不要關閉。3.3Kafka的安裝與測試cd/usr/local/kafkabin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topicstreamtest--from-beginning(4)準備測試數(shù)據(jù)準備如下格式的Kafka消息:監(jiān)控設備號、最高限速、車牌號、通過時速,如圖6-25所示:(5)使用Kafka控制臺生成消息、測試程序本教程使用的Spark版本為2.2.3,與Kafka包存在一定的兼容問題;編譯代碼時有可能出現(xiàn)如下錯誤:3.4編寫SparkStreaming程序找出超速車輛上述問題主要是Spark2.0以上版本將Logging類轉移到包ernal包中(而非之前的org.apache.spark包)。我們可以從在當前工程下,建立org.apache.spark包,從Spark源碼中將Logging.scala文件復制到org.apache.spark包中。3.4編寫SparkStreaming程序找出超速車輛打開新的Linux終端,輸入如下命令、輸入相應消息,輸入消息及控制臺輸出結果如下;找出了超速車輛粵A8512、粵A4432、粵A2893。3.4編寫SparkStreaming程序找出超速車輛DStream的轉換操作任務4在流計算中,要進行轉換操作。DStream中常見的轉換操作。所謂的DStream無狀態(tài)轉換操作,是指不記錄歷史狀態(tài)信息,每次僅對新的批次數(shù)據(jù)進行處理;無狀態(tài)轉換操作每一個批次的數(shù)據(jù)處理都是獨立的,處理當前批次數(shù)據(jù)時,即不依賴之前的數(shù)據(jù),也不影響后續(xù)的數(shù)據(jù)。例如,任務1中的流數(shù)據(jù)詞頻統(tǒng)計,就采用無狀態(tài)轉換操作,每次僅統(tǒng)計當前批次數(shù)據(jù)中的單詞詞頻,與之前批次數(shù)據(jù)無關,不會利用之前的歷史數(shù)據(jù)。4.1DStream無狀態(tài)轉換操作4.1DStream無狀態(tài)轉換操作右表給出了常見的DStream無狀態(tài)轉換操作。DStream的操作與RDD的轉換操作類似,在流數(shù)據(jù)詞頻統(tǒng)計程序中已用到map等操作,在此不再詳述;但表中的transform方法值得深入探討。transform方法使用戶能夠直接調(diào)用任意的RDD操作方法,極大的豐富了DStream上能夠操作的內(nèi)容。下面演示使用transform方法模擬過濾黑名單車輛;現(xiàn)有一個違章車輛黑名單文件blacklist.txt,記載了車輛車牌號、違法項目。4.1DStream無狀態(tài)轉換操作用Netcat模擬交通監(jiān)控設備獲取的車流信息,信息格式為“監(jiān)控設備號,車牌號,記錄時間”,樣式如下所示;現(xiàn)要求SparkStreaming獲取車流數(shù)據(jù)后,與和名單文件中的車牌號對照,輸出違法車輛信息。4.1DStream無狀態(tài)轉換操作在Linux終端中,使用nc-lk9999命令準備向9999端口發(fā)送數(shù)據(jù);運行TrafficStream.scala,在Netcat終端中輸入數(shù)據(jù),IDEA控制臺輸出結果如下所示(如果數(shù)據(jù)輸入速度超過窗口時間,則輸出結果可能有所不同)。4.1DStream無狀態(tài)轉換操作與無狀態(tài)轉換操作不同,DStream有狀態(tài)轉換操作當前批次的處理需要使用之前批次的數(shù)據(jù)或者中間結果。有狀態(tài)轉換包括基于滑動窗口的轉換和updateStateByKey轉換?;瑒哟翱谵D換操作的計算過程如下所示,對于一個DStream,我們可以事先設定一個滑動窗口的長度(也就是窗口的持續(xù)時間),并且設定滑動窗口的時間間隔(每隔多長時間執(zhí)行一次計算);然后窗口按照指定時間間隔在源DStream上滑動,每次落入窗口的RDD都會形成一個小段的DStream(稱之為windowedDStream,包含若干個RDD),這時,就可以啟動對這個小段DStream的計算。4.2
DStream有狀態(tài)轉換操作由窗口操作的原理可知,任何窗口相關操作都要指定兩個參數(shù):?windowlength——窗口的長度,即窗口覆蓋的時間長度?slidinginterval——窗口每次滑動的距離,窗口啟動的時間間隔注意:上述兩個參數(shù)都必須是DStream批次間隔的整數(shù)倍;常用的窗口操作如下所示。4.2
DStream有狀態(tài)轉換操作上述操作中,window操作是基于源DStream的批次計算后得到新的DStream;例如要讀取套接字流數(shù)據(jù),設置批次間隔1秒,窗口長度為3秒,滑動時間間隔為1秒,截取DStream中的元素構建新的DStream使用Netcat向端口發(fā)送數(shù)據(jù),按照每秒鐘發(fā)一個字母的速度發(fā)送,輸出結果如圖6-33所示;可以看到,第一秒輸出a,第二秒輸出ab,第三秒輸出abc,而第四秒輸出bcd(因為a已經(jīng)滑出當前窗口)。4.2
DStream有狀態(tài)轉換操作窗口操作中的reduceByKeyAndWindow操作與詞頻統(tǒng)計中使用的reduceByKey類似,但reduceByKeyAndWindow針對的是窗口數(shù)據(jù)源(DStream中截取的一段),是對窗口內(nèi)所有數(shù)據(jù)進行計算。例如,設置窗口長度為3秒,滑動時間1秒,進行窗口內(nèi)單詞詞頻統(tǒng)計;使用Netcat向端口發(fā)送數(shù)據(jù),按照每秒鐘發(fā)一個字母的速度發(fā)送,輸出結果如圖6-64所示;可以看到,第一秒輸出(a,1),第二秒輸出(a,2),第三秒輸出(a,1)、(b,1),此時第一個字母已經(jīng)滑出窗口,所以a的數(shù)量減少一個。4.2
DStream有狀態(tài)轉換操作DStream的輸出操作任務5處理完畢的數(shù)據(jù)可以按照業(yè)務要求輸出到文件、數(shù)據(jù)庫、展板中。將DStream寫入到文本文件以及MySQL數(shù)據(jù)庫中。輸出算子可以將DStream的數(shù)據(jù)推送到外部系統(tǒng),如數(shù)據(jù)庫或者文件系統(tǒng);SparkStreaming只有輸出算子調(diào)用時,才會真正觸發(fā)transformation算子的執(zhí)行(與RDD類似)。目前所支持的輸出算子如下所示。5.1DStream寫入到文本文件下面使用saveAsTextFile方法,接收套接字數(shù)據(jù)流(端口號9999)后,進行單詞詞頻統(tǒng)計,統(tǒng)計結果保存到文本文件中;然后,在IDEA中停止程序運行,然后檢查下這些詞頻結果是否被成功地輸出到“file:///home/hadoop/streamsave/file.txt”文件中了,在Linux終端中執(zhí)行如下命令:5.1DStream寫入到文本文件DStream.foreachRDD是一個非常強大的算子,用戶可以基于此算子將DStream數(shù)據(jù)推送到外部系統(tǒng)中。在演示代碼前,用戶需要了解如何高效的使用這個工具;下面列舉常見的錯誤。(1)在Spark驅動程序中建立數(shù)據(jù)庫連接通常,對外部系統(tǒng)寫入數(shù)據(jù)需要一些連接對象(如遠程server的TCP連接),以便發(fā)送數(shù)據(jù)給遠程系統(tǒng)。因此,開發(fā)人員可能會不經(jīng)意地在Spark驅動器(driver)進程中創(chuàng)建一個連接對象,然后又試圖在Sparkworker節(jié)點上使用這個連接。如下例所示:dstream.foreachRDD{rdd=>valconnection=createNewConnection()//這行在驅動器(driver)進程執(zhí)行rdd.foreach{record=>connection.send(record)//而這行將在worker節(jié)點上執(zhí)行}}這段代碼是錯誤的,因為它需要把連接對象序列化,再從驅動器節(jié)點發(fā)送到worker節(jié)點。而這些連接對象通常都是不能跨節(jié)點(機器)傳遞的。5.2DStream寫入到MySQL數(shù)據(jù)庫的方法分析(2)為每一條記錄建立一個數(shù)據(jù)庫連接解決上述錯誤的辦法就是在worker節(jié)點上創(chuàng)建連接對象。然而,有些開發(fā)人員可能會走到另一個極端——為每條記錄都創(chuàng)建一個連接對象,例如:dstream.foreachRDD{rdd=>rdd.foreach{record=>valconnection=createNewConnection()connection.send(record)connection.close()}}一般來說,連接對象是有時間和資源開銷限制的。因此,對每條記錄都進行一次連接對象的創(chuàng)建和銷毀會增加很多不必要的開銷,同時也大大減小了系統(tǒng)的吞吐量。5.2DStream寫入到MySQL數(shù)據(jù)庫的方法分析(3)較為高效的做法一個比較好的解決方案是使用rdd.foreachPartition,為RDD的每個分區(qū)創(chuàng)建一個單獨的連接對象,示例如下:dstream.foreachRDD{rdd=>rdd.foreachPartition{partitionOfRecords=>valconnection=createNewConnection()partitionOfRecords.foreach(record=>connection.send(record))connection.close()}}5.2DStream寫入到MySQL數(shù)據(jù)庫的方法分析下面使用foreachRDD方法,模擬處理出租車監(jiān)控系統(tǒng)發(fā)來的車輛定位數(shù)據(jù)(包含車牌號、經(jīng)度、維度、時間),接收到數(shù)據(jù)流后將其每10秒將其存入MySQL數(shù)據(jù)庫。首先打開一個Linux終端,輸入以下命令啟動MySQL服務并進入MySQL客戶端。進入MySQL客戶端后,使用使用下列語句,創(chuàng)建MySQL數(shù)據(jù)庫stream及數(shù)據(jù)庫表car_position。5.3車輛定位數(shù)據(jù)寫入到MySQL數(shù)據(jù)庫servicemysqlstart#啟動MySQL服務mysql-uroot-p#屏幕會提示你輸入密碼,輸入正確密碼后即進入MySQL客戶端CREATEDATABASEstream;USEstream;CREATETABLEIFNOTEXISTS`car_position`(`id`BIGINTUNSIGNEDAUTO_INCREMENT,`carNO`VARCHAR(50)NOTNULL,`longitude`VARCHAR(40)NOTNULL,`latitude`VARCHAR(40)NOTNULL,`times`VARCHAR(40)NOTNULL,PRIMARYKEY(`id`))ENGINE=InnoDBDEFAULTCHARSET=utf8;運行StreamMySQL.scala,使用Netcat向9999端口發(fā)送數(shù)據(jù),如下所示。在IDEA控制臺可以看到輸出處理后的結果;在MySQL客戶端,輸入命令查詢car_position表的記錄,可以發(fā)現(xiàn)流數(shù)據(jù)成功寫入MySQL,如下所示。5.3車輛定位數(shù)據(jù)寫入到MySQL數(shù)據(jù)庫下面使用foreachRDD方法,模擬處理出租車監(jiān)控系統(tǒng)發(fā)來的車輛定位數(shù)據(jù)(包含車牌號、經(jīng)度、維度、時間),接收到數(shù)據(jù)流后將其每10秒將其存入MySQL數(shù)據(jù)庫。首先打開一個Linux終端,輸入以下命令啟動MySQL服務并進入MySQL客戶端。進入MySQL客戶端后,使用使用下列語句,創(chuàng)建MySQL數(shù)據(jù)庫stream及數(shù)據(jù)庫表car_position。5.3車輛定位數(shù)據(jù)寫入到MySQL數(shù)據(jù)庫servicemysqlstart#啟動MySQL服務mysql-uroot-p#屏幕會提示你輸入密碼,輸入正確密碼后即進入MySQL客戶端CREATEDATABASEstream;USEstream;CREATETABLEIFNOTEXISTS`car_position`(`id`BIGINTUNSIGNEDAUTO_INCREMENT,`carNO`VARCHAR(50)NOTNULL,`longitude`VARCHAR(40)NOTNULL,`latitude`VARCHAR(40)NOTNULL,`times`VARCHAR(40)NOTNULL,PRIMARYKEY(`id`))ENGINE=InnoDBDEFAULTCHARSET=utf8;SparkStreaming實時處理電商用戶行為數(shù)據(jù)任務6模擬解決電商用戶行為數(shù)據(jù)處理問題。定時讀取數(shù)據(jù)到Kafka的某主題下,SparkStream獲取該主題的數(shù)據(jù)后進行處理。處理結果最終寫入MySQL數(shù)據(jù)庫。本任務模擬數(shù)據(jù)取自淘寶用戶行為數(shù)據(jù),數(shù)據(jù)源自阿里云天池數(shù)據(jù)集(/dataset/dataDetail?dataId=649);該數(shù)據(jù)集包含了2017年11月25日至2017年12月3日之間,有行為的約一百萬隨機用戶的所有行為(行為包括點擊、購買、加購、喜歡);即數(shù)據(jù)集的每一行表示一條用戶行為,由用戶ID、商品ID、商品類目ID、行為類型和時間戳組成,并以逗號分隔。關于數(shù)據(jù)集中每一列的詳細描述如下:6.1數(shù)據(jù)集說明與準備工作整個數(shù)據(jù)集包含用戶數(shù)量987994,商品數(shù)量4162024,商品類別9439,用戶行為數(shù)據(jù)(行數(shù))100150807?,F(xiàn)抽取其中的2000行,構建子數(shù)據(jù)集userbehavior2000.csv(樣例數(shù)據(jù)如圖6-68所示)完成本實驗。6.1數(shù)據(jù)集說明與準備工作本項任務整體實現(xiàn)思路如圖6-69所示;
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
- 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2026湖北省定向浙江大學選調(diào)生招錄參考題庫附答案
- 2026湖南省農(nóng)林工業(yè)勘察設計研究院有限公司招聘考試備考題庫附答案
- 2026西藏日喀則市薩嘎縣招聘藝術團演職人員5人備考題庫附答案
- 2026陜西省面向中國農(nóng)業(yè)大學招錄選調(diào)生備考題庫附答案
- 吉安市公安局2026年公開招聘警務輔助人員【58人】備考題庫附答案
- 招6人!湟源縣公安局2025年面向社會公開招聘警務輔助人員考試備考題庫附答案
- 浙江國企招聘-2026臺州椒江城市發(fā)展投資集團有限公司、臺州市高鐵新區(qū)開發(fā)建設有限公司招聘31人備考題庫附答案
- 2026福建省面向鄭州大學選調(diào)生選拔工作備考題庫附答案
- 2026湖南郴州市宜章縣金信建設有限公司面向社會招聘3名工作人員備考題庫附答案
- 慶陽市市直學校2026年公開引進高層次和急需緊缺人才參考題庫附答案
- 湖南省2025-2026學年七年級歷史上學期期末復習試卷(含答案)
- 2026年中國熱帶農(nóng)業(yè)科學院南亞熱帶作物研究所第一批招聘23人備考題庫完美版
- 2026新疆阿合奇縣公益性崗位(鄉(xiāng)村振興專干)招聘44人考試參考試題及答案解析
- 紡織倉庫消防安全培訓
- 器官移植術后排斥反應的風險分層管理
- 虛擬電廠關鍵技術
- 事業(yè)單位清算及財務報告編寫范本
- 護坡綠化勞務合同范本
- 臨床績效的DRG與CMI雙指標調(diào)控
- 護坡施工安全專項方案
- 2026年湛江日報社公開招聘事業(yè)編制工作人員備考題庫及完整答案詳解
評論
0/150
提交評論