版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領
文檔簡介
·SparkStreaming第7章目
錄01流計算概述02SparkStreaming03DStream操作概述04基本輸入源05高級數(shù)據(jù)源06轉(zhuǎn)換操作07輸出操作流計算概述7.1
流計算概述靜態(tài)數(shù)據(jù)和流數(shù)據(jù)批量計算和實時計算流計算概念流計算框架目錄CONTENT流計算處理流程7.1.1靜態(tài)數(shù)據(jù)和流數(shù)據(jù)靜態(tài)數(shù)據(jù)流數(shù)據(jù)7.1.1靜態(tài)數(shù)據(jù)和流數(shù)據(jù)靜態(tài)數(shù)據(jù)數(shù)據(jù)倉庫7.1.1靜態(tài)數(shù)據(jù)和流數(shù)據(jù)流數(shù)據(jù)應用越來越多7.1.1靜態(tài)數(shù)據(jù)和流數(shù)據(jù)網(wǎng)絡監(jiān)控Web應用傳感監(jiān)測7.1.1靜態(tài)數(shù)據(jù)和流數(shù)據(jù)每時每刻都在采集數(shù)據(jù)對數(shù)據(jù)進行實時的分析攝像頭視頻7.1.1靜態(tài)數(shù)據(jù)和流數(shù)據(jù)大量快速時變持續(xù)到達流數(shù)據(jù)7.1.1靜態(tài)數(shù)據(jù)和流數(shù)據(jù)PM2.5檢測電子商務網(wǎng)站用戶點擊流7.1.1靜態(tài)數(shù)據(jù)和流數(shù)據(jù)數(shù)據(jù)快速持續(xù)到達
潛在大小也許無窮無盡數(shù)據(jù)來源眾多,格式復雜數(shù)據(jù)量大注重數(shù)據(jù)的整體價值數(shù)據(jù)順序顛倒,或不完整流數(shù)據(jù)特征7.1.1靜態(tài)數(shù)據(jù)和流數(shù)據(jù)靜態(tài)數(shù)據(jù)流數(shù)據(jù)兩種典型數(shù)據(jù)7.1.1靜態(tài)數(shù)據(jù)和流數(shù)據(jù)實時計算處理邏輯靜態(tài)數(shù)據(jù)價值丟棄流數(shù)據(jù)(動態(tài)數(shù)據(jù))價值處理邏輯批量計算7.1.2
批量計算和實時計算MapReduce無法滿足秒級響應處理大規(guī)模靜態(tài)數(shù)據(jù)處理邏輯靜態(tài)數(shù)據(jù)價值批量計算7.1.2
批量計算和實時計算實時獲取不同數(shù)據(jù)源的海量數(shù)據(jù)獲得有價值的信息經(jīng)過實時分析處理流計算7.1.2
批量計算和實時計算數(shù)據(jù)采集實時分析處理結果反饋7.1.2
批量計算和實時計算數(shù)據(jù)的價值時間的流逝流計算基本理念數(shù)據(jù)過了時間點就沒有價值7.1.3流計算概念流計算基本理念在互聯(lián)網(wǎng)應用產(chǎn)品中尤其典型7.1.3流計算概念用戶點擊流7.1.3流計算概念您可能感興趣7.1.3流計算概念您感興趣的過了十幾分鐘用戶離開網(wǎng)站7.1.3流計算概念數(shù)據(jù)的價值時間的流逝7.1.3流計算概念低延遲高可靠處理引擎可擴展7.1.3流計算概念高性能每秒處理幾十萬條數(shù)據(jù)流計算框架7.1.3流計算概念海量式支持TB級、PB級的數(shù)據(jù)規(guī)模流計算框架7.1.3流計算概念實時性低延遲,達到秒級別、毫秒級別流計算框架7.1.3流計算概念分布式支持大數(shù)據(jù)基本架構,平滑擴展流計算框架7.1.3流計算概念易用性快速進行開發(fā)和部署流計算框架7.1.3流計算概念可靠性可靠地處理流數(shù)據(jù)流計算框架7.1.3流計算概念開源流計算框架商業(yè)級的流計算平臺公司為支持自身業(yè)務開發(fā)的流計算框架三類流計算框架和平臺7.1.4流計算框架IBMStreamBaseIBMInfoSphereStreams商業(yè)級流計算平臺7.1.4流計算框架TwitterStormYahoo!S4開源流計算框架7.1.4流計算框架TwitterStorm7.1.4流計算框架TwitterStorm免費開源簡單高效可靠分布式實時計算系統(tǒng)處理大量的流數(shù)據(jù)7.1.4流計算框架Yahoo!S47.1.4流計算框架通用分布式可擴展分區(qū)容錯可插拔流式系統(tǒng)Yahoo!S47.1.4流計算框架公司為支持自身業(yè)務開發(fā)的流計算框架Dstream銀河流計算處理平臺7.1.4流計算框架Puma概述數(shù)據(jù)實時采集數(shù)據(jù)實時計算實時查詢服務7.1.5 流計算處理流程存儲的數(shù)據(jù)是舊的不具備時效性傳統(tǒng)的數(shù)據(jù)處理流程概述需要用戶主動發(fā)出查詢來獲取結果7.1.5 流計算處理流程流計算處理流程示意圖概述7.1.5 流計算處理流程數(shù)據(jù)實時采集采集多個數(shù)據(jù)源的海量數(shù)據(jù)實時性低延遲穩(wěn)定可靠7.1.5 流計算處理流程日志數(shù)據(jù)數(shù)據(jù)實時采集7.1.5 流計算處理流程開源分布式日志采集系統(tǒng)每秒數(shù)百MB的數(shù)據(jù)采集每秒數(shù)百MB的數(shù)據(jù)傳輸數(shù)據(jù)實時采集7.1.5 流計算處理流程Scribe數(shù)據(jù)實時采集7.1.5 流計算處理流程數(shù)據(jù)實時采集Kafka7.1.5 流計算處理流程數(shù)據(jù)實時采集TimeTunnel7.1.5 流計算處理流程數(shù)據(jù)實時采集ChukwaFlume7.1.5 流計算處理流程流計算處理流程示意圖數(shù)據(jù)實時計算7.1.5 流計算處理流程數(shù)據(jù)實時計算數(shù)據(jù)實時計算流程7.1.5 流計算處理流程流計算處理流程示意圖實時查詢服務7.1.5 流計算處理流程結果流計算框架實時查詢展示儲存實時查詢服務7.1.5 流計算處理流程傳統(tǒng)計算方式主動發(fā)起查詢用戶數(shù)據(jù)庫7.1.5 流計算處理流程流數(shù)據(jù)處理系統(tǒng)實時查詢服務實時數(shù)據(jù)7.1.5 流計算處理流程和結果推送更新結果傳統(tǒng)的數(shù)據(jù)處理系統(tǒng)定時查詢過去某一時刻實時查詢服務7.1.5 流計算處理流程流處理系統(tǒng)傳統(tǒng)的數(shù)據(jù)處理系統(tǒng)數(shù)據(jù)實時的數(shù)據(jù)預先存儲好的靜態(tài)數(shù)據(jù)結果實時結果過去某一時刻的結果用戶得到結果的方式主動將實時結果推送給用戶用戶主動發(fā)出查詢7.1.5 流計算處理流程SparkStreaming7.2
SparkStreaming從“Hadoop+Storm”架構轉(zhuǎn)Spark架構SparkStreaming與Storm的對比SparkStreaming設計SparkStreamingSparkStreamingKafkaFlumeHDFSTCPsocketHDFSDatabasesDashboards圖SparkStreaming支持的輸入、輸出數(shù)據(jù)源7.2.1SparkStreaming設計SparkStreaming圖
SparkStreaming執(zhí)行流程inputdatastreambatchesofinputdatabatchesofprocesseddataSparkEngine7.2.1SparkStreaming設計連續(xù)不斷的數(shù)據(jù)流模仿流計算7.2.1SparkStreaming設計Spark是以線程級別并行實時響應級別高可以實現(xiàn)秒級響應變相實現(xiàn)高效的流計算…批處理批處理批處理批處理批處理模仿流計算交給Spark核心引擎處理7.2.1SparkStreaming設計圖
DStream操作示意圖LinesDStreamLinesfromtime0to1RDD@time1Linesfromtime1to2Linesfromtime2to3Linesfromtime3to4RDD@time2RDD@time3RDD@time4WordsDStreamwordsfromtime0to1flatMapoperationwordsfromtime1to2wordsfromtime2to3wordsfromtime3to4RDD@result1RDD@result2RDD@result3RDD@result47.2.1SparkStreaming設計SparkCoreRDD數(shù)據(jù)抽象SparkSQLDataFrame數(shù)據(jù)抽象SparkStreamingDStream數(shù)據(jù)抽象7.2.1SparkStreaming設計圖
DStream操作示意圖LinesDStreamLinesfromtime0to1RDD@time1Linesfromtime1to2Linesfromtime2to3Linesfromtime3to4RDD@time2RDD@time3RDD@time4WordsDStreamwordsfromtime0to1flatMapoperationwordsfromtime1to2wordsfromtime2to3wordsfromtime3to4RDD@result1RDD@result2RDD@result3RDD@result4第一秒切出的數(shù)據(jù)第二秒切出的數(shù)據(jù)7.2.1SparkStreaming設計圖
DStream操作示意圖LinesDStreamLinesfromtime0to1RDD@time1Linesfromtime1to2Linesfromtime2to3Linesfromtime3to4RDD@time2RDD@time3RDD@time4WordsDStreamwordsfromtime0to1flatMapoperationwordsfromtime1to2wordsfromtime2to3wordsfromtime3to4RDD@result1RDD@result2RDD@result3RDD@result4DStream這個時間段內(nèi)獲得的語句7.2.1SparkStreaming設計SparkStreamingStorm毫秒級響應無法實現(xiàn)可以實現(xiàn)實時計算可用于實時計算可實時計算容錯處理RDD數(shù)據(jù)集更容易、更高效的容錯處理高度容錯計算方式兼容批量和實時數(shù)據(jù)處理實時流計算7.2.2SparkStreaming與Storm的對比企業(yè)批處理需求流處理需求7.2.3從“Hadoop+Storm”架構轉(zhuǎn)向Spark架構圖
采用Hadoop+Storm部署方式的一個案例7.2.3從“Hadoop+Storm”架構轉(zhuǎn)向Spark架構圖
采用Hadoop+Storm部署方式的一個案例7.2.3從“Hadoop+Storm”架構轉(zhuǎn)向Spark架構圖
采用Hadoop+Storm部署方式的一個案例批量處理實時流計算7.2.3從“Hadoop+Storm”架構轉(zhuǎn)向Spark架構圖
用Spark架構滿足批處理和流處理需求批處理7.2.3從“Hadoop+Storm”架構轉(zhuǎn)向Spark架構圖
用Spark架構滿足批處理和流處理需求實時計算7.2.3從“Hadoop+Storm”架構轉(zhuǎn)向Spark架構圖
用Spark架構滿足批處理和流處理需求查詢7.2.3從“Hadoop+Storm”架構轉(zhuǎn)向Spark架構Spark架構Hadoop+Storm架構vs7.2.3從“Hadoop+Storm”架構轉(zhuǎn)向Spark架構Spark架構優(yōu)點一鍵安裝和布置7.2.3從“Hadoop+Storm”架構轉(zhuǎn)向Spark架構硬件集群軟件維護任務監(jiān)控應用開發(fā)難度Spark架構優(yōu)點7.2.3從“Hadoop+Storm”架構轉(zhuǎn)向Spark架構統(tǒng)一的硬件、計算平臺資源池Spark架構優(yōu)點7.2.3從“Hadoop+Storm”架構轉(zhuǎn)向Spark架構DStream操作概述7.3DStream操作概述創(chuàng)建StreamingContext對象SparkStreaming工作機制SparkStreaming程序的基本步驟0201DStream03圖
DStream操作示意圖LinesDStreamLinesfromtime0to1RDD@time1Linesfromtime1to2Linesfromtime2to3Linesfromtime3to4RDD@time2RDD@time3RDD@time4WordsDStreamwordsfromtime0to1flatMapoperationwordsfromtime1to2wordsfromtime2to3wordsfromtime3to4RDD@result1RDD@result2RDD@result3RDD@result47.3.1SparkStreaming工作機制ExecutorExecutorDriverProgramSparkContextClusterManagerWorkerNodeCacheTaskTask(Receiver)WorkerNodeCacheTaskTask(Receiver)HDFS、HBaseInputDStreamInputDStream7.3.1SparkStreaming工作機制InputDStream套接字流從Kafka中讀取的輸入流文件流7.3.1SparkStreaming工作機制InputDStreamreceiver組件接收數(shù)據(jù)輸送數(shù)據(jù)掛接7.3.1SparkStreaming工作機制1創(chuàng)建輸入DStream輸入源定義編寫SparkStreaming程序有固定的步驟7.3.2SparkStreaming程序的基本步驟數(shù)據(jù)源頭對文件進行監(jiān)控文件流通過Kafka拋數(shù)據(jù)Kafka數(shù)據(jù)流構建一個RDD隊列RDD隊列流7.3.2SparkStreaming程序的基本步驟2DStream應用轉(zhuǎn)換操作和輸出操作流計算定義7.3.2SparkStreaming程序的基本步驟3streamingContext.start()開始接收數(shù)據(jù)和處理流程7.3.2SparkStreaming程序的基本步驟4等待處理結束streamingContext.awaitTermination()7.3.2SparkStreaming程序的基本步驟5手動結束流計算進程streamingContext.stop()7.3.2SparkStreaming程序的基本步驟怎么能夠創(chuàng)建StreamingContext對象????為什么創(chuàng)建StreamingContext對象?7.3.3創(chuàng)建StreamingContext對象進行SparkCore的RDD編程7.3.3創(chuàng)建StreamingContext對象進行SparkCore的RDD編程SparkContext對象創(chuàng)建7.3.3創(chuàng)建StreamingContext對象spark-shell7.3.3創(chuàng)建StreamingContext對象對象變量名稱叫scSparkContext對象7.3.3創(chuàng)建StreamingContext對象有SparkSession對象SparkSQL編程7.3.3創(chuàng)建StreamingContext對象spark-shell變量名稱:spark7.3.3創(chuàng)建StreamingContext對象scala>importorg.apache.spark.streaming._scala>valssc=newStreamingContext(sc,Seconds(1))導入包7.3.3創(chuàng)建StreamingContext對象
創(chuàng)建StreamingContext對象importorg.apache.spark._importorg.apache.spark.streaming._valconf=newSparkConf().setAppName("TestDStream").setMaster("local[2]")valssc=newStreamingContext(conf,Seconds(1))7.3.3創(chuàng)建StreamingContext對象
創(chuàng)建StreamingContext對象基本輸入源7.4基本輸入源基本輸入源7.4.1文件流7.4.3RDD隊列流7.4.2套接字流7.4.1文件流$cd/usr/local/spark/mycode$mkdirstreaming$cdstreaming$mkdirlogfile$cdlogfile
1.
在spark-shell中創(chuàng)建文件流7.4.1文件流scala>importorg.apache.spark.streaming._scala>valssc=newStreamingContext(sc,Seconds(20))scala>vallines=ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")scala>valwords=lines.flatMap(_.split(""))scala>valwordCounts=words.map(x=>(x,1)).reduceByKey(_+_)scala>wordCounts.print()scala>ssc.start()scala>ssc.awaitTermination()
進入spark-shell創(chuàng)建文件流,另外打開一個終端窗口,啟動spark-shell7.4.1文件流//這里省略若干屏幕信息-------------------------------------------Time:1479431100000ms-------------------------------------------//這里省略若干屏幕信息-------------------------------------------Time:1479431120000ms-------------------------------------------//這里省略若干屏幕信息-------------------------------------------Time:1479431140000ms-------------------------------------------輸入ssc.start()以后,程序就開始自動進入循環(huán)監(jiān)聽狀態(tài)在“/usr/local/spark/mycode/streaming/logfile”目錄新建一個log.txt,可在監(jiān)聽窗口中顯示詞頻統(tǒng)計結果7.4.1文件流$cd/usr/local/spark/mycode$mkdirstreaming$cdstreaming$mkdir-psrc/main/scala$cdsrc/main/scala$vimTestStreaming.scala
2.
采用獨立應用程序方式創(chuàng)建文件流7.4.1文件流importorg.apache.spark._importorg.apache.spark.streaming._objectWordCountStreaming{defmain(args:Array[String]){valsparkConf=newSparkConf().setAppName("WordCountStreaming").setMaster("local[2]")//設置為本地運行模式,2個線程,一個監(jiān)聽,另一個處理數(shù)據(jù)
valssc=newStreamingContext(sparkConf,Seconds(2))//時間間隔為2秒vallines=ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")//這里采用本地文件,當然你也可以采用HDFS文件valwords=lines.flatMap(_.split(""))valwordCounts=words.map(x=>(x,1)).reduceByKey(_+_)wordCounts.print()ssc.start()ssc.awaitTermination()}}
用vim編輯器新建一個TestStreaming.scala代碼文件7.4.1文件流$cd/usr/local/spark/mycode/streaming$vimsimple.sbt
用vim編輯器新建一個TestStreaming.scala代碼文件name:="SimpleProject"version:="1.0"scalaVersion:="2.12.15"libraryDependencies+="org.apache.spark"%%"spark-core"%"3.2.0"libraryDependencies+="org.apache.spark"%"spark-streaming_2.12"%"3.2.0"%"provided"
在simple.sbt文件中輸入以下代碼7.4.1文件流$cd/usr/local/spark/mycode/streaming$/usr/local/sbt/sbtpackage
執(zhí)行sbt打包編譯的命令如下$cd/usr/local/spark/mycode/streaming$/usr/local/spark/bin/spark-submit--class"WordCountStreaming“/usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.12-1.0.jar
打包成功以后,就可以輸入以下命令啟動這個程序7.4.1文件流Shell窗口"/usr/local/spark/mycode/streaming/logfile"log2.txtHellowordHellowordHelloword保存好文件退出vim編輯器7.4.1文件流監(jiān)聽窗口20秒后按Ctrl+C或Ctrl+D停止監(jiān)聽程序7.4.1文件流監(jiān)聽窗口打印出單詞統(tǒng)計信息ABA:撥號B:接起任一方掛掉電話Socket編程原理7.4.2套接字流1Socket工作原理STEP17.4.2套接字流socket()socket()connect()write()read()close()bind()listen()accept()read()write()read()close()阻塞直到有客戶端連接回應數(shù)據(jù)結束連接請求數(shù)據(jù)建立連接處理請求TCP客戶端TCP服務器端7.4.2套接字流2使用套接字流作為數(shù)據(jù)源實現(xiàn)SparkStreaming編程STEP27.4.2套接字流$cd/usr/local/spark/mycode$mkdirstreaming#如果已經(jīng)存在該目錄,則不用創(chuàng)建$cdstreaming$mkdirsocket$cdsocket$mkdir-psrc/main/scala#如果已經(jīng)存在該目錄,則不用創(chuàng)建$cd/usr/local/spark/mycode/streaming/socket/src/main/scala$vimNetworkWordCount.scala#這里使用vim編輯器創(chuàng)建文件
使用套接字流作為數(shù)據(jù)源7.4.2套接字流SparkStreaming程序socket()connect()write()read()close()TCP客戶端相當于7.4.2套接字流socket()connect()write()read()close()TCP客戶端數(shù)據(jù)發(fā)送TCP服務端發(fā)起請求7.4.2套接字流數(shù)據(jù)發(fā)送TCP服務端SparkStreaming組件進行詞頻統(tǒng)計7.4.2套接字流socket()connect()write()read()close()TCP客戶端構建TCP客戶端7.4.2套接字流importorg.apache.spark._importorg.apache.spark.streaming._objectNetworkWordCount{defmain(args:Array[String]){if(args.length<2){System.err.println("Usage:NetworkWordCount<hostname><port>")System.exit(1)}valsparkConf=new
請在NetworkWordCount.scala文件中輸入如下內(nèi)容7.4.2套接字流SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")valsc=newSparkContext(sparkConf)sc.setLogLevel("ERROR")valssc=newStreamingContext(sc,Seconds(1))vallines=ssc.socketTextStream(args(0),args(1).toInt)valwords=lines.flatMap(_.split(""))valwordCounts=words.map(x=>(x,1)).reduceByKey(_+_)wordCounts.print()ssc.start()ssc.awaitTermination()}}
請在NetworkWordCount.scala文件中輸入如下內(nèi)容7.4.2套接字流<hostname><port>客戶端向服務端發(fā)起連接告訴它向哪個主機哪個端口發(fā)起連接7.4.2套接字流StreamingExamplesStreamingExample.scala代碼文件定義在定義在org.apache.spark.examples.streaming7.4.2套接字流StreamingExamples不需要實例化直接用它的靜態(tài)方法單例對象7.4.2套接字流StreamingExamples.setStreamingLogLevels()StreamingExamples.scala代碼文件定義設置日志log4j的級別格式7.4.2套接字流name:="SimpleProject"version:="1.0"scalaVersion:="2.12.15"libraryDependencies+="org.apache.spark"%"spark-streaming_2.12"%"3.2.0"%"provided"
在“/usr/local/spark/mycode/streaming/socket”目錄創(chuàng)建simple.sbt文件7.4.2套接字流$cd/usr/local/spark/mycode/streaming/socket$/usr/local/sbt/sbtpackage$cd/usr/local/spark/mycode/streaming/socket$/usr/local/spark/bin/spark-submit\>--class"NetworkWordCount"\>./target/scala-2.12/simple-project_2.12-1.0.jar\>localhost9999
在“/usr/local/spark/mycode/streaming/socket”目錄創(chuàng)建simple.sbt文件7.4.2套接字流“NetworkWordCount”客戶端7.4.2套接字流啟動TCP服務端netcat程序打開終端7.4.2套接字流nc程序$nc-lk9999輸入命令7.4.2套接字流$nc-lk9999啟動監(jiān)聽連續(xù)監(jiān)聽輸入命令7.4.2套接字流向客戶端發(fā)數(shù)據(jù)輸入命令$nc-lk99997.4.2套接字流$nc-lk99997.4.2套接字流Helloword$nc-lk99997.4.2套接字流監(jiān)聽窗口-------------------------------------------Time:1479431100000ms-------------------------------------------(hello,1)(world,1)-------------------------------------------Time:1479431120000ms-------------------------------------------(hadoop,1)-------------------------------------------Time:1479431140000ms-------------------------------------------(spark,1)7.4.2套接字流7.4.2套接字流能不能自己去編寫一個TCP服務端?7.4.2套接字流自己編寫程序$cd/usr/local/spark/mycode/streaming/socket/src/main/scala$vimDataSourceSocket.scala
采用自己編寫的程序產(chǎn)生Socket數(shù)據(jù)源7.4.2套接字流importjava.io.{PrintWriter}import.ServerSocketimportscala.io.Source7.4.2套接字流
采用自己編寫的程序產(chǎn)生Socket數(shù)據(jù)源objectDataSourceSocket{defindex(length:Int)={//返回位于0到length-1之間的一個隨機數(shù)valrdm=newjava.util.Randomrdm.nextInt(length)}defmain(args:Array[String]){if(args.length!=3){System.err.println("Usage:<filename><port><millisecond>")System.exit(1)}valfileName=args(0)//獲取文件路徑vallines=Source.fromFile(fileName).getLines.toList//讀取文件中的所有行的內(nèi)容valrowCount=lines.length//計算出文件的行數(shù)
vallistener=newServerSocket(args(1).toInt)//創(chuàng)建監(jiān)聽特定端口的7.4.2套接字流ServerSocket對象while(true){valsocket=listener.accept()newThread(){overridedefrun={println("Gotclientconnectedfrom:"+socket.getInetAddress)valout=newPrintWriter(socket.getOutputStream(),true)while(true){Thread.sleep(args(2).toLong)//每隔多長時間發(fā)送一次數(shù)據(jù)valcontent=lines(index(rowCount))//從lines列表中取出一個元素println(content)out.write(content+'\n')//寫入要發(fā)送給客戶端的數(shù)據(jù)out.flush()//發(fā)送數(shù)據(jù)給客戶端}socket.close()}}.start()}}}$cd/usr/local/spark/mycode/streaming/socket$/usr/local/sbt/sbtpackage
執(zhí)行sbt打包編譯的命令如下7.4.2套接字流7.4.2套接字流創(chuàng)建文本/usr/local/spark/mycode/streaming/word.txtHellowordHellowordHellowordword.txt$cd/usr/local/spark/mycode/streaming/socket$/usr/local/sbt/sbtpackage
執(zhí)行sbt打包編譯的命令如下$cd/usr/local/spark/mycode/streaming/socket$/usr/local/spark/bin/spark-submit\>--class"DataSourceSocket"\>./target/scala-2.12/simple-project_2.12-1.0.jar\>./word.txt99991000
打包成功以后,啟動DataSourceSocket程序7.4.2套接字流$cd/usr/local/spark/mycode/streaming/socket$/usr/local/spark/bin/spark-submit\>--class"NetworkWordCount"\>./target/scala-2.12/simple-project_2.12-1.0.jar\>localhost9999
下面就可以啟動客戶端,即NetworkWordCount程序7.4.2套接字流啟動成功后,你就會看到,屏幕上不斷打印出詞頻統(tǒng)計信息RDD隊列輸入的數(shù)據(jù)RDDRDDRDDSparkStreaming7.4.3
RDD隊列流TestRDDQueueStream.scalaRDD1秒1秒RDD1秒……RDD隊列7.4.3
RDD隊列流1秒……RDDRDDRDD隊列7.4.3
RDD隊列流StreamingRDDRDDRDD隊列每隔2秒對數(shù)據(jù)進行處理7.4.3
RDD隊列流
新建一個TestRDDQueueStream.scala代碼文件packageorg.apache.spark.examples.streamingimportorg.apache.spark.SparkConfimportorg.apache.spark.rdd.RDDimportorg.apache.spark.streaming.StreamingContext._importorg.apache.spark.streaming.{Seconds,StreamingContext}7.4.3
RDD隊列流
新建一個TestRDDQueueStream.scala代碼文件objectQueueStream{defmain(args:Array[String]){valsparkConf=newSparkConf().setAppName("TestRDDQueue").setMaster("local[2]")valssc=newStreamingContext(sparkConf,Seconds(2))valrddQueue=newscala.collection.mutable.SynchronizedQueue[RDD[Int]]()valqueueStream=ssc.queueStream(rddQueue)valmappedStream=queueStream.map(r=>(r%10,1))valreducedStream=mappedStream.reduceByKey(_+_)reducedStream.print()ssc.start()7.4.3
RDD隊列流
新建一個TestRDDQueueStream.scala代碼文件for(i<-1to10){rddQueue+=ssc.sparkContext.makeRDD(1to100,2)Thread.sleep(1000)}ssc.stop()}}7.4.3
RDD隊列流
sbt打包成功后,執(zhí)行下面命令運行程序$cd/usr/local/spark/mycode/streaming/rddqueue$/usr/local/spark/bin/spark-submit\>--class"org.apache.spark.examples.streaming.QueueStream"\>./target/scala-2.12/simple-project_2.12-1.0.jar7.4.3
RDD隊列流
執(zhí)行上面命令以后,程序就開始運行,就可以看到類似下面的結果-------------------------------------------Time:1479522100000ms-------------------------------------------(4,10)(0,10)(6,10)(8,10)(2,10)(1,10)(3,10)(7,10)(9,10)(5,10)7.4.3
RDD隊列流高級數(shù)據(jù)源提綱123Kafka準備工作Spark準備工作Kafka簡介4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)訂閱消息分發(fā)消息7.5.1Kafka
簡介Kafka:消息分發(fā)系統(tǒng)
起到信息傳遞中樞的作用7.5.1Kafka
簡介Kafka是一個分布式的消息分發(fā)系統(tǒng)7.5.1Kafka
簡介Kafka作為一個信息傳遞的樞紐7.5.1Kafka
簡介Kafka集群............BrokerBroker1Broker2Broker37.5.1Kafka
簡介TopicTopic7.5.1Kafka
簡介Topic1Broker1PartitionPartitionBroker2PartitionPartition.........……Partition7.5.1Kafka
簡介ProducerPdoducerBroker2ConsumerBroker2Consumer消息消費者向Kafkabroker讀取消息的客戶端SparkStreaming7.5.1Kafka
簡介ConsumerGroup每個Consumer只屬于某個ConsumerGroup若不指定groupname則屬于默認的groupConsumerGroup1Comsumer1Comsumer2.........ConsumerGroupZhidingComsumer37.5.1Kafka
簡介邏輯概念7.5.1Kafka
簡介7.5.2Kafka準備工作Kafka準備工作安裝Kafka測試Kafka是否正常工作啟動Kafka/blog/1096-2/“/usr/local/kafka”安裝目錄:安裝Kafka7.5.2Kafka準備工作安裝Kafka$cd~/Downloads$sudotar-zxfkafka_2.12-2.6.0.tgz-C/usr/local$cd/usr/local$sudomvkafka_2.12-2.6.0kafka$sudochown-Rhadoop./kafka7.5.2Kafka準備工作啟動Kafka$cd/usr/local/kafka$./bin/zookeeper-server-start.shconfig/perties不能關閉這個終端窗口關閉這個窗口,會使Zookeeper服務停止7.5.2Kafka準備工作打開第二個終端,輸入下面命令啟動Kafka服務不能關閉這個終端窗口關閉這個窗口,會使kafka服務停止$cd/usr/local/kafka$bin/kafka-server-start.shconfig/perties7.5.2Kafka準備工作打開第三個終端$cd/usr/local/kafka$./bin/kafka-topics.sh--create--zookeeperlocalhost:2181\>--replication-factor1--partitions1\>--topicwordsender然后,可以執(zhí)行如下命令,查看名稱為“wordsender”的Topic是否已經(jīng)成功創(chuàng)建:$./bin/kafka-topics.sh--list--zookeeperlocalhost:21817.5.2Kafka準備工作再新開一個終端(記作“監(jiān)控輸入終端”)$cd/usr/local/kafka$bin/kafka-console-consumer.sh\>--bootstrap-serverlocalhost:9092--topicwordsender7.5.2Kafka準備工作jar包啟動spark-shell7.5.3Spark準備工作高級數(shù)據(jù)源......需要獨立的庫默認不存在jar文件7.5.3Spark準備工作spark-streaming-kafka-0-10_2.12-3.2.0.jar
spark-token-provider-kafka-0-10_2.12-3.2.0.jar7.5.3Spark準備工作復制/usr/local/spark/jarsspark-streaming-kafka-0-10_2.12-3.2.0.jar
spark-token-provider-kafka-0-10_2.12-3.2.0.jar7.5.3Spark準備工作Kafka安裝目錄的libs目錄下的所有jar文件7.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源編寫消費者程序編寫生產(chǎn)者程序編譯打包程序運行程序010203047.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源
1.
編寫生產(chǎn)者(producer)程序:編寫KafkaWordProducer程序$cd/usr/local/spark/mycode$mkdirkafka$cdkafka$mkdir-psrc/main/scala$cdsrc/main/scala$vimKafkaWordProducer.scala7.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源
在KafkaWordProducer.scala中輸入以下代碼importjava.util.HashMapimportducer.{KafkaProducer,ProducerConfig,ProducerRecord}importorg.apache.spark.SparkConfimportorg.apache.spark.streaming._importorg.apache.spark.streaming.kafka010._7.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源objectKafkaWordProducer{defmain(args:Array[String]){if(args.length<4){System.err.println("Usage:KafkaWordProducer<metadataBrokerList><topic>"+"<messagesPerSec><wordsPerMessage>")System.exit(1)}valArray(brokers,topic,messagesPerSec,wordsPerMessage)=args//Zookeeperconnectionpropertiesvalprops=newHashMap[String,Object]()props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers)props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"mon.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"mon.serialization.StringSerializer")valproducer=newKafkaProducer[String,String](props)7.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源//Sendsomemessageswhile(true){(1tomessagesPerSec.toInt).foreach{messageNum=>valstr=(1towordsPerMessage.toInt).map(x=>scala.util.Random.nextInt(10).toString).mkString("")print(str)println()valmessage=newProducerRecord[String,String](topic,null,str)producer.send(message)}Thread.sleep(1000)}}}7.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源
2.
編寫消費者程序:在當前目錄下創(chuàng)建KafkaWordCount.scala代碼文件importorg.apache.spark._importorg.apache.spark.SparkConfimportorg.apache.spark.rdd.RDDimportorg.apache.spark.streaming._importorg.apache.spark.streaming.kafka010._importorg.apache.spark.streaming.StreamingContext._importorg.apache.spark.streaming.kafka010.KafkaUtilsimportmon.serialization.StringDeserializerimportorg.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistentimportorg.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe7.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源objectKafkaWordCount{defmain(args:Array[String]){valsparkConf=newSparkConf().setAppName("KafkaWordCount").setMaster("local[2]")valsc=newSparkContext(sparkConf)sc.setLogLevel("ERROR")valssc=newStreamingContext(sc,Seconds(10))ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint")//設置檢查點,如果存放在HDFS上面,則寫成類似ssc.checkpoint("/user/hadoop/checkpoint")這種形式,但是,要啟動HadoopvalkafkaParams=Map[String,Object]("bootstrap.servers"->"localhost:9092","key.deserializer"->classOf[StringDeserializer],"value.deserializer"->classOf[StringDeserializer],"group.id"->"use_a_separate_group_id_for_each_stream","auto.offset.reset"->"latest","mit"->(true:java.lang.Boolean))7.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源valtopics=Array("wordsender")valstream=KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent,Subscribe[String,String](topics,kafkaParams))stream.foreachRDD(rdd=>{valoffsetRange=rdd.asInstanceOf[HasOffsetRanges].offsetRangesvalmaped:RDD[(String,String)]=rdd.map(record=>(record.key,record.value))vallines=maped.map(_._2)valwords=lines.flatMap(_.split(""))valpair=words.map(x=>(x,1))valwordCounts=pair.reduceByKey(_+_)wordCounts.foreach(println)})ssc.startssc.awaitTermination}}7.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源
3.
編譯打包程序:創(chuàng)建simple.sbt文件$cd/usr/local/spark/mycode/kafka/$vimsimple.sbt7.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源
在simple.sbt中輸入以下代碼name:="SimpleProject"version:="1.0"scalaVersion:="2.12.15"libraryDependencies+="org.apache.spark"%%"spark-core"%"3.2.0"libraryDependencies+="org.apache.spark"%%"spark-streaming"%"3.2.0"%"provided"libraryDependencies+="org.apache.spark"%%"spark-streaming-kafka-0-10"%"3.2.0"libraryDependencies+="org.apache.kafka"%"kafka-clients"%"2.6.0"7.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源3.編譯打包程序:進行打包編譯$cd/usr/local/spark/mycode/kafka/$/usr/local/sbt/sbtpackage7.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源
4.
運行程序:打開一個終端,運行“KafkaWordProducer”程序$cd/usr/local/spark/mycode/kafka/$/usr/local/spark/bin/spark-submit\>--class"KafkaWordProducer"\>./target/scala-2.12/simple-project_2.12-1.0.jar\>localhost:9092wordsender357.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源
執(zhí)行上面命令后,屏幕上會不斷滾動出現(xiàn)新的單詞,如下75073282130129280909900686616583677……不要關閉這個終端窗口讓它一直不斷發(fā)送單詞7.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源
新打開一個終端,執(zhí)行下面命令,運行KafkaWordCount程序$cd/usr/local/spark/mycode/kafka/$/usr/local/spark/bin/spark-submit\>--class"KafkaWordCount"\>./target/scala-2.12/simple-project_2.12-1.0.jar7.5.4編寫SparkStreaming程序使用Kafka數(shù)據(jù)源屏幕顯示(4,5)(8,12)(6,14)(0,19)(2,11)(7,20)(5,10)(9,9)(3,9)(1,11)轉(zhuǎn)換操作提綱12DStream有狀態(tài)轉(zhuǎn)換操作DStream無狀態(tài)轉(zhuǎn)換操作???為什么叫無狀態(tài)轉(zhuǎn)換操作?7.6.1DStream無狀態(tài)轉(zhuǎn)換操作t/s當前批次只針對當前一個批次進行統(tǒng)計102030405060707.6.1DStream無狀態(tài)轉(zhuǎn)換操作10203040506070t/st/s7.6.1DStream無狀態(tài)轉(zhuǎn)換操作t/s進行新的統(tǒng)計與前一批次無關不會進行累計102030405060707.6.1DStream無狀態(tài)轉(zhuǎn)換操作7.6.2DStream有狀態(tài)轉(zhuǎn)換操作滑動窗口轉(zhuǎn)換操作updateStateByKey操作0102DStreamtime1time2time3time4time5windowattime1windowattime3windowattime5OriginalDstreamWindowedDstream滑動窗口轉(zhuǎn)換操作7.6.2DStream有狀態(tài)轉(zhuǎn)換操作time1time2time3time4time5windowattime1windowattime3windowattime5OriginalDstreamWindowedDstream滑動窗口轉(zhuǎn)換操作7.6.2DStream有狀態(tài)轉(zhuǎn)換操作設定滑動窗口大小設定滑動窗口時間間隔大小兩個參數(shù)7.6.2DStream有狀態(tài)轉(zhuǎn)換操作設定滑動窗口時間間隔大小每隔多長時間執(zhí)行一次計算7.6.2DStream有狀態(tài)轉(zhuǎn)換操作time1time2time3time4time5函數(shù)func進行聚合數(shù)值time6time7OriginalDstream7.6.2DStream有狀態(tài)轉(zhuǎn)換操作time1time2time3time4time5函數(shù)func進行聚合數(shù)值time6time7OriginalDstream數(shù)值7.6.2DStream有狀態(tài)轉(zhuǎn)換操作time1time2time3time4time5函數(shù)func進行聚合數(shù)值time6time7OriginalDstream數(shù)值數(shù)值7.6.2DStream有狀態(tài)轉(zhuǎn)換操作time1time2time3time4time5數(shù)值time6time7OriginalDstream數(shù)值數(shù)值單元素流7.6.2DStream有狀態(tài)轉(zhuǎn)換操作7.6.2DStream有狀態(tài)轉(zhuǎn)換操作updateStateByKey操作在跨批次之間維護狀態(tài)怎么進行跨批次維護呢????updateStateByKey操作7.6.2DStream有狀態(tài)轉(zhuǎn)換操作有狀態(tài)地進行維護NetworkWordCountStateful.scala7.6.2DStream有狀態(tài)轉(zhuǎn)換操作objectNetworkWordCountStateful{defmain(args:Array[String]){//定義狀態(tài)更新函數(shù)valupdateFunc=(values:Seq[Int],state:Option[Int])=>{valcurrentCount=values.foldLeft(0)(_+_)valpreviousCount=state.getOrElse(0)Some(currentCount+previousCount)}
valsparkConf=newSparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")valsc=newSparkContext(sparkConf)sc.setLogLevel("ERROR")7.6.2DStream有狀態(tài)轉(zhuǎn)換操作valssc=newStreamingContext(sc,Seconds(5))ssc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")//設置檢查點,檢查點具有容錯機制vallines=ssc.socketTextStream("localhost",9999)valwords=lines.flatMap(_.split(""))valwordDstream=words.map(x=>(x,1))valstateDstream=wordDstream.updateStateByKey[Int](updateFunc)stateDstream.print()ssc.start()ssc.awaitTermination()}}7.6.2DStream有狀態(tài)轉(zhuǎn)換操作輸出操作提綱12把DStream寫入到MySQL數(shù)據(jù)庫中把DStream輸出到文本文件中7.7.1把DStream輸出到文本文件中importorg.apache.spark._importorg.apache.spark.streaming._objectNetworkWordCountStatef
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
- 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025蝦苗培育技術創(chuàng)新對誘捕漁業(yè)可持續(xù)發(fā)展影響分析評估報告
- 2025荷蘭機械制造業(yè)市場供需現(xiàn)狀診斷及投資潛力規(guī)劃研究報告
- 2025荷蘭制造業(yè)市場供需格局及投資機會評估規(guī)劃分析研究報告
- 2025荷蘭光伏產(chǎn)業(yè)技術迭代與創(chuàng)新服務平臺建設深度研究報告
- 2025英國智能交通系統(tǒng)研發(fā)技術現(xiàn)狀市場分析研究評估發(fā)展策略報告
- 2025北理工長三院自旋隧穿微機電傳感芯片團隊招聘筆試備考重點題庫及答案解析
- 2025年哈爾濱鐵道職業(yè)技術學院公開招聘教師20人筆試備考重點試題及答案解析
- 2025湖南永州市永華高級中學高中教師招聘筆試備考重點試題及答案解析
- 2025湖北恩施州宣恩縣園投人力資源服務有限公司招聘湖北楚墨文化傳媒有限公司人員1人筆試備考重點試題及答案解析
- 一年級下冊第六單元以內(nèi)的加法和減法一教案
- 2025浙江寧波市梅山鐵路有限公司招聘3人備考考點試題及答案解析
- 2025安徽淮北市消防救援支隊招聘政府專職消防文員17人考試歷年真題匯編帶答案解析
- 2025湖南日報融媒傳播有限公司招聘7人筆試考試參考試題及答案解析
- 2025年法醫(yī)學案例分析與判斷及答案解析
- 股東借款協(xié)議書范本
- CCAA合格評定基礎重點資料
- 護理人文關懷與醫(yī)患溝通技巧
- 北京市順義區(qū)2024-2025學年八年級上學期期末考試英語試卷
- 《化工企業(yè)可燃液體常壓儲罐區(qū)安全管理規(guī)范》解讀課件
- 2025至2030等靜壓行業(yè)發(fā)展研究與產(chǎn)業(yè)戰(zhàn)略規(guī)劃分析評估報告
- 聽障兒童家庭康復訓練
評論
0/150
提交評論