版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
大數(shù)據(jù)分析與實(shí)戰(zhàn)項(xiàng)目7基于StructuredStreaming的智慧交通數(shù)據(jù)處理某智慧交通建設(shè)項(xiàng)目中,針對(duì)實(shí)時(shí)采集的卡口監(jiān)控?cái)?shù)據(jù),決定采用Spark新一代流計(jì)算引擎StructuredStreaming,完成數(shù)據(jù)的實(shí)時(shí)處理。利用物聯(lián)網(wǎng)、大數(shù)據(jù)等技術(shù),解決“安全監(jiān)管壓力大、交通擁堵整治難、應(yīng)急事件處置慢、分析研判不智能”等痛點(diǎn),成為行業(yè)應(yīng)用熱點(diǎn)。情境導(dǎo)入Spark項(xiàng)目分解Spark序號(hào)任務(wù)任務(wù)說(shuō)明1統(tǒng)計(jì)正常工作的監(jiān)控設(shè)備數(shù)了解StructuredStreaming的工作原理,編寫簡(jiǎn)單的流計(jì)算程序統(tǒng)計(jì)各卡口正常狀態(tài)的監(jiān)控設(shè)備數(shù)。2找出超速通過(guò)卡口的車輛采用Kafka作為數(shù)據(jù)源,采集車輛通過(guò)卡口時(shí)的車速,與本卡口的最高限速進(jìn)行比對(duì),找出超速通過(guò)的車輛。3統(tǒng)計(jì)車輛的平均車速計(jì)算各卡口10分鐘內(nèi)的平均通過(guò)速度(每2分鐘更新一次),要求考慮延遲到達(dá)的數(shù)據(jù)及重復(fù)數(shù)據(jù)。4數(shù)據(jù)處理結(jié)果寫入MySQL為了便于后續(xù)的處理分析,將StructuredStreaming處理完畢的數(shù)據(jù)寫入到MySQL數(shù)據(jù)庫(kù)中。能夠利用DataFrame的相關(guān)操作,完成流數(shù)據(jù)的處理。了解StructuredStreaming原理,根據(jù)Kafka等數(shù)據(jù)源,創(chuàng)建流式DataFrame。使用writestream等方法,將處理后的數(shù)據(jù)輸出到File、Kafka及數(shù)據(jù)庫(kù)。123學(xué)習(xí)目標(biāo)Spark項(xiàng)目7
基于StructuredStreaming的智慧交通數(shù)據(jù)處理Spark任務(wù)1統(tǒng)計(jì)正常工作的監(jiān)控設(shè)備數(shù)找出超速通過(guò)卡口的車輛統(tǒng)計(jì)車輛通過(guò)的平均速度任務(wù)2任務(wù)3數(shù)據(jù)處理的結(jié)果寫入MySQL任務(wù)4任務(wù)分析Spark每個(gè)交通卡口均包含若干監(jiān)控?cái)z像頭,這些攝像頭需要定時(shí)發(fā)送設(shè)備狀態(tài)信息到后臺(tái);使用Netcat工具發(fā)出設(shè)備狀態(tài)數(shù)據(jù)(數(shù)據(jù)樣式為“卡口ID,監(jiān)控設(shè)備ID,狀態(tài)碼”,如狀態(tài)碼為100,則表示設(shè)備正常,其他狀態(tài)碼均為異常),要求使用StructuredStreaming計(jì)算每個(gè)卡口正常工作的監(jiān)控設(shè)備數(shù)量。SparkStreaming的不足Spark(1)用批處理的思想處理流式數(shù)據(jù),延遲高,不能做的真正的實(shí)時(shí)。(2)API基于底層RDD,不直接支持簡(jiǎn)單的SQL。(3)以數(shù)據(jù)處理時(shí)間基準(zhǔn),難以支持EventTime(事件發(fā)生的時(shí)間,簡(jiǎn)稱事件時(shí)間)。(4)批處理、流處理的API不一致。StructuredStreaming編程模型Spark核心思想是將流數(shù)據(jù)視為一張可以不斷添加數(shù)據(jù)的表(UnboundedTable,可以“無(wú)限”擴(kuò)充的無(wú)界表),每個(gè)新到達(dá)的流數(shù)據(jù)會(huì)被添加到這個(gè)表中(作為表的新行)。StructuredStreaming編程模型Spark基于StructuredStreaming的WordCount計(jì)算演示:編寫第一個(gè)StructuredStreaming程序Spark確定輸入源,創(chuàng)建流式DataFrame(無(wú)界表)vallines=spark.readStream.format("socket")
.option("host","localhost").option("port",9999).load()定義流計(jì)算的處理過(guò)程vallinesDS=lines.as[String]//轉(zhuǎn)為Dataset[String],便于后續(xù)處理valwords=linesDS.flatMap(x=>x.split(""))//包含一個(gè)名為value的列valwordCounts=words.groupBy("value").count()//分組、統(tǒng)計(jì)啟動(dòng)流式計(jì)算valquery=wordCounts.writeStream
.outputMode("complete").format("console").start()IDEA下編寫結(jié)構(gòu)化流處理程序Sparkprom.xml中添加如下依賴(SparkSQL、SparkCore)代碼中導(dǎo)入隱式轉(zhuǎn)換:importspark.implicits._代碼最后,添加等待流計(jì)算結(jié)束的語(yǔ)句:query.awaitTermination()相關(guān)知識(shí)小結(jié)SparkStructuredStreaming的思路:新數(shù)據(jù)到達(dá)添加到無(wú)界表中,對(duì)新數(shù)據(jù)進(jìn)行處理(按照DataFrame的方式),處理完畢后添加到Result結(jié)果中。readStream讀取流式數(shù)據(jù),創(chuàng)建流式DataFrame(無(wú)界表)。根據(jù)知識(shí)儲(chǔ)備的相關(guān)知識(shí),讀取Socket模擬的交通流數(shù)據(jù),統(tǒng)計(jì)各卡口正常的設(shè)備數(shù)量。任務(wù)實(shí)施項(xiàng)目7
基于StructuredStreaming的智慧交通數(shù)據(jù)處理Spark任務(wù)1統(tǒng)計(jì)正常工作的監(jiān)控設(shè)備數(shù)找出超速通過(guò)卡口的車輛統(tǒng)計(jì)車輛通過(guò)的平均速度任務(wù)2任務(wù)3數(shù)據(jù)處理的結(jié)果寫入MySQL任務(wù)4任務(wù)分析Spark使用Kafka作為數(shù)據(jù)源,向Kafka某主題中添加數(shù)據(jù)(數(shù)據(jù)樣式為“卡口ID,監(jiān)控設(shè)備ID,車牌號(hào),通過(guò)速度,車輛類型”);每個(gè)卡口都有自己的限速,如果車輛通過(guò)卡口時(shí)速度超過(guò)限速,則輸出相關(guān)信息。由文件生成StructuredStreamingSpark在流數(shù)據(jù)處理應(yīng)用中,經(jīng)常出現(xiàn)這樣場(chǎng)景:Flume(或Sqoop)不斷地將小文件(數(shù)據(jù)文件、服務(wù)器日志等)上傳到HDFS目錄,我們需要監(jiān)控該目錄,對(duì)目錄下的小文件開展實(shí)時(shí)處理。vallines=spark.readStream.format("csv")//csv、json等數(shù)據(jù)格式.load(path)//文件的路徑
lines就是一個(gè)DataFrame,后續(xù)按照SparkSQL中的方式進(jìn)行處理即可。由Kafka生成StructuredStreamingSparkKafka是流式數(shù)據(jù)處理的最主要數(shù)據(jù)源,StructuredStreaming同樣提供了訂閱Kafka主題、消費(fèi)其數(shù)據(jù)的能力。valdf=spark.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092")//指定kafka服務(wù)器.option("subscribe","structuredStreaming")//指定讀取的kafka主題.load()Kafka數(shù)據(jù)以key-value形式存儲(chǔ),獲取其中value即可(轉(zhuǎn)為字符串類型)
vallines=df.selectExpr("CAST(valueASSTRING)")
StructuredStreaming的操作SparkStructuredStreaming的數(shù)據(jù)抽象為DataFrame/Dataset,與SparkSQL一致;因此,SparkSQL中的大多數(shù)操作亦適用于StructuredStreaming;可以在StructuredStreaming的流式DataFrame中使用select、where、filter等檢查查詢操作,也可以使用groupBy、count、sum等聚合操作。輸出模式的選擇SparkStructuredStreaming處理后的結(jié)果,可以有以下3中輸出模式:(1)append模式:StructuredStreaming默認(rèn)的輸出模式;該模式下,只有上一次計(jì)算結(jié)束后,結(jié)果集(result)中新增加的行才會(huì)被輸出;該模式不能包含聚合操作。(2)complete模式:該模式下,輸出當(dāng)前批次為止所有的統(tǒng)計(jì)結(jié)果;該模式下的計(jì)算過(guò)程,必須包含聚合操作。(3)update模式:處理完一個(gè)批次的數(shù)據(jù)后,只輸出與之前批次相比變動(dòng)的內(nèi)容(新增或者更新);如果計(jì)算過(guò)程沒有聚合操作,該模式與append等效。StructuredStreaming可以讀取File、Kafka等數(shù)據(jù)源,生成流式DataFrame;數(shù)據(jù)抽象為DataFrame/Dataset,按照SparkSQL中的方式進(jìn)行處理;輸出模式:append、complete、updateSpark讀取Kafka數(shù)據(jù),結(jié)合每個(gè)卡口的限速,輸出超速車輛的信息。任務(wù)實(shí)施相關(guān)知識(shí)小結(jié)Spark項(xiàng)目7
基于StructuredStreaming的智慧交通數(shù)據(jù)處理Spark任務(wù)1統(tǒng)計(jì)正常工作的監(jiān)控設(shè)備數(shù)找出超速通過(guò)卡口的車輛統(tǒng)計(jì)車輛通過(guò)的平均速度任務(wù)2任務(wù)3數(shù)據(jù)處理的結(jié)果寫入MySQL任務(wù)4任務(wù)分析Spark智慧城市交通建設(shè)中需要及時(shí)了解各監(jiān)控卡口的平均車速(單位時(shí)間內(nèi)通過(guò)該卡口的所有車輛平均時(shí)速),從而合理規(guī)劃交通設(shè)施、安排疏導(dǎo)力量。通過(guò)Socket采集交通監(jiān)控?cái)?shù)據(jù)(數(shù)據(jù)樣式:“時(shí)間戳,卡口ID,設(shè)備ID,車牌號(hào),速度”),計(jì)算各卡口10分鐘內(nèi)的平均通過(guò)速度(每2分鐘更新一次),要求考慮延遲到達(dá)的數(shù)據(jù)及重復(fù)數(shù)據(jù)。基于窗口的聚合Spark但很多時(shí)候,希望以事件時(shí)間為標(biāo)準(zhǔn),開展數(shù)據(jù)的統(tǒng)計(jì)分析;例如因?yàn)榫W(wǎng)絡(luò)延遲等因素,某些物聯(lián)網(wǎng)設(shè)備產(chǎn)生事件(數(shù)據(jù))的時(shí)間要早于Spark接收到該事件(數(shù)據(jù))的時(shí)間,但我們還是希望能夠根據(jù)事件時(shí)間來(lái)處理數(shù)據(jù)。StructuredStreaming提供了window窗口操作,它可以依據(jù)事件時(shí)間,將流式數(shù)據(jù)放置到非重疊的“桶”中;如果要開展聚合操作,則是對(duì)桶內(nèi)的數(shù)據(jù)執(zhí)行聚合。
基于窗口的聚合Spark案例:每輛汽車通過(guò)卡口時(shí)都會(huì)產(chǎn)生帶有時(shí)間戳的數(shù)據(jù)(樣式:“時(shí)間戳,卡口ID,監(jiān)控設(shè)備ID,車牌號(hào),車速”),要求統(tǒng)計(jì)10分鐘內(nèi)通過(guò)的車輛總數(shù)。valvehicles=lines.as[String]//lines為從socket端口讀取的數(shù)據(jù),為DataFrame.map(x=>{valarr=x.split(",")//按照逗號(hào)進(jìn)行切割valdate=sdf.parse(arr(0))//安裝設(shè)定的格式,將字符串轉(zhuǎn)為Datevalts=newTimestamp(date.getTime())//data轉(zhuǎn)換時(shí)間戳,表示事件時(shí)間(ts,arr(1),arr(2),arr(3),arr(4))//轉(zhuǎn)為(時(shí)間戳,卡口ID,監(jiān)控設(shè)備ID,車牌號(hào),車速)}).toDF("timestamp","checkID","equipmentID","carNO","speed")基于窗口的聚合Spark案例:每輛汽車通過(guò)卡口時(shí)都會(huì)產(chǎn)生帶有時(shí)間戳的數(shù)據(jù)(樣式:“時(shí)間戳,卡口ID,監(jiān)控設(shè)備ID,車牌號(hào),車速”),要求統(tǒng)計(jì)10分鐘內(nèi)通過(guò)的車輛總數(shù)。valvehiclesCounts=vehicles.groupBy(window($"timestamp","10minutes","2minutes"))//按照窗口分組.count()//計(jì)算每組內(nèi)的元素?cái)?shù)量(即為車輛數(shù))valresult=vehiclesCounts.select("window.start","window.end","count")//抽取3列.orderBy("start")//按照開始時(shí)間排序遲到數(shù)據(jù)與水印Spark由于網(wǎng)絡(luò)延遲等因素,采用事件時(shí)間方式處理流式數(shù)據(jù)均面臨數(shù)據(jù)遲到問(wèn)題。Spark內(nèi)部可以保留中間狀態(tài),從而不丟棄遲到數(shù)據(jù);但如果一個(gè)流式計(jì)算運(yùn)行時(shí)間較長(zhǎng)(幾天、幾周甚至更長(zhǎng)),系統(tǒng)內(nèi)各中間狀態(tài)積累數(shù)據(jù)量便會(huì)持續(xù)增加,導(dǎo)致內(nèi)存等資源不斷被占用,系統(tǒng)穩(wěn)定性下降為了及時(shí)釋放資源,Spark允許用戶通過(guò)WarterMark水印的方式來(lái)決定保留多長(zhǎng)時(shí)間的舊數(shù)據(jù)狀態(tài),即決定最大延遲閾值。遲到數(shù)據(jù)與水印Spark案例:每輛汽車通過(guò)卡口時(shí)都會(huì)產(chǎn)生帶有時(shí)間戳的數(shù)據(jù)(樣式:“時(shí)間戳,卡口ID,監(jiān)控設(shè)備ID,車牌號(hào),車速”),要求統(tǒng)計(jì)10分鐘內(nèi)通過(guò)的車輛總數(shù)。valvehiclesCounts=vehicles.withWatermark("timestamp","2minutes")//添加水印,最大延遲2分鐘.groupBy(window($"timestamp","10minutes","2minutes")).count()重復(fù)數(shù)據(jù)的處理Spark可能某些因素導(dǎo)致數(shù)據(jù)源多次發(fā)送相同的數(shù)據(jù),或者因?yàn)閭鬏旀溌返纫蛩赝粭l數(shù)據(jù)多次到達(dá),這就需要數(shù)據(jù)去重操作。valvehiclesCounts=vehicles.withWatermark("timestamp","2minutes").dropDuplicates()//完成去重.groupBy(window($"timestamp","10minutes","2minutes")).count()groupBy(window())可以實(shí)現(xiàn)基于窗口的聚合操作;采用水印方式處理遲到數(shù)據(jù)dropDuplicate方法去除重復(fù)數(shù)據(jù);Spark綜合利用本任務(wù)中的知識(shí)儲(chǔ)備,借助窗口、水印,計(jì)算各卡口10分鐘內(nèi)的平均通過(guò)速度。任務(wù)實(shí)施相關(guān)知識(shí)小結(jié)Spark項(xiàng)目7
基于StructuredStreaming的智慧交通數(shù)據(jù)處理Spark任務(wù)1統(tǒng)計(jì)正常工作的監(jiān)控設(shè)備數(shù)找出超速通過(guò)卡口的車輛統(tǒng)計(jì)車輛通過(guò)的平均速度任務(wù)2任務(wù)3數(shù)據(jù)處理的結(jié)果寫入MySQL任務(wù)4任務(wù)分析Spark讀取Socket數(shù)據(jù)源(9999端口),找出超速車輛(假設(shè)城區(qū)限速70公里/小時(shí)),將超速通過(guò)的車輛信息寫入到MySQL數(shù)據(jù)庫(kù)及本地文件系統(tǒng)(json格式)。FileSink輸出數(shù)據(jù)SparkStructuredStreaming計(jì)算結(jié)果可以按照CSV、JSON、Parquet等形式,寫入本地或HDFS目錄中。例如針對(duì)卡口的車輛通行信息,將“時(shí)間戳、卡口ID、監(jiān)控設(shè)備ID、車牌號(hào)、車速”以JSON文件形式保存到本地目錄。vallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()FileSink輸出數(shù)據(jù)Sparkvalvehicles=lines.as[String]//DataFrame轉(zhuǎn)為Dataset.map(x=>{valarr=x.split(",")//String元素按照逗號(hào)切割(arr(0),arr(1),arr(2),arr(3),arr(4).toInt)//生成元組}).toDF("timestamp","checkID","equipmentID","carNO","speed")FileSink輸出數(shù)據(jù)Sparkvalquery=vehicles.writeStream.outputMode("append")//寫入文件時(shí),僅支持append模式.format("json")//文件的格式為json.option("path","file:///home/hadoop/data/filesink")//文件保存的目錄.option("checkpointLocation","streamingsink")//設(shè)置檢查點(diǎn).start()//啟動(dòng)流式計(jì)算KafkaSink輸出數(shù)據(jù)SparkStructuredStreaming清洗處理完畢后,也可以寫入到Kafka主題中,以供其他程序(系統(tǒng))使用。valquery=vehicles.writeStream.outputMode("append")
.format("kafka")//寫入kafka
.option("kafka.bootstrap.servers","localhost:9092")//設(shè)置kafka服務(wù)器
.option("topic","kafkasink")//指定kafka主題.option("checkpointLocation","streamingsink")//指定檢查點(diǎn)路徑.start()foreachBatch、foreach輸出數(shù)據(jù)SparkforeachBatch、foreach具有高度的靈活性,用戶可以自由決定數(shù)據(jù)寫入何處、如何寫;兩者應(yīng)用場(chǎng)景略有不同,foreachBatch以微批為單位進(jìn)行任意的處理、輸出;foreach則是作用在流式DataFrame的每一行上。valquery=vehicles.writeStream.
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 生日蛋糕合同范本
- 苗木園合同范本
- 蔬菜訂合同范本
- 褲子加工合同范本
- 認(rèn)干爸的協(xié)議書
- 設(shè)備售后協(xié)議書
- 設(shè)備索賠協(xié)議書
- 請(qǐng)?jiān)律﹨f(xié)議合同
- 建筑訂金合同范本
- 居間貸款合同協(xié)議
- 混凝土及外加劑知識(shí)培訓(xùn)課件
- 1-視頻交換矩陣
- 收養(yǎng)政策知識(shí)培訓(xùn)內(nèi)容課件
- 2025-2026學(xué)年統(tǒng)編版一年級(jí)上冊(cè)道德與法治教學(xué)計(jì)劃
- 《機(jī)器學(xué)習(xí)》課件-第6章 強(qiáng)化學(xué)習(xí)
- 早產(chǎn)合并新生兒呼吸窘迫綜合征護(hù)理查房
- 警校偵查專業(yè)畢業(yè)論文
- 生態(tài)教育心理干預(yù)-洞察及研究
- 票務(wù)提成管理辦法
- 肺炎克雷伯菌肺炎護(hù)理查房
- 人教版(2024)七年級(jí)上冊(cè)英語(yǔ)Unit1-7各單元語(yǔ)法專項(xiàng)練習(xí)題(含答案)
評(píng)論
0/150
提交評(píng)論