版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
大數(shù)據(jù)分析與實戰(zhàn)項目6SparkStreaming處理用戶行為數(shù)據(jù)針對電商平臺用戶的瀏覽、收藏、購物車、下單等基本行為,借助成熟的SparkStreaming模塊,模擬開展實時分析與處理,為電商平臺運維提供參考。黨的二十大作出加快建設(shè)網(wǎng)絡強國、數(shù)字中國的重大部署。隨著移動商務、輿情監(jiān)控、傳感監(jiān)控、在線教育等領(lǐng)域的發(fā)展,對數(shù)據(jù)實時處理的需求日漸增強。情境導入Spark項目分解Spark序號任務任務說明1初探廣告點擊行為使用Necat模擬發(fā)出用戶點擊行為數(shù)據(jù),SparkStreaming程序捕獲數(shù)據(jù),統(tǒng)計各個廣告頁面的點擊量。2識別無效的廣告點擊識別無效點擊,輸出這些無效點擊用戶的ID。3統(tǒng)計一分鐘內(nèi)的訂單數(shù)量統(tǒng)計1分鐘內(nèi)用戶的下單數(shù)量。4電商用戶的行為分析統(tǒng)計1分鐘內(nèi)用戶下單、加購、收藏次數(shù),并將下單行為保存到MySQL數(shù)據(jù)庫表中。能編寫無狀態(tài)轉(zhuǎn)換、基于窗口的有狀態(tài)轉(zhuǎn)換流式處理程序。了解SparkStreaming原理,能夠讀取Socket、File、Kafka等數(shù)據(jù)源創(chuàng)建DStream。能按照要求輸出流式計算的結(jié)果(如輸出到MySQL數(shù)據(jù)庫中。123學習目標Spark項目6
SparkStreaming處理用戶行為數(shù)據(jù)Spark任務1初探廣告點擊行為識別無效的廣告點擊統(tǒng)計1分鐘內(nèi)的訂單數(shù)量任務2任務3電商用戶的行為分析任務4任務分析Spark通常,電商平臺首頁均開設(shè)一定的廣告服務,入駐商家可以購買廣告位進行商品推廣。在重點營銷時段,需時刻關(guān)注廣告位的費效。本任務要求每10秒鐘,統(tǒng)計一次用戶點擊量,據(jù)此調(diào)整營銷策略。認識流數(shù)據(jù)與SparkStreamingSpark數(shù)據(jù)分類:靜態(tài)數(shù)據(jù)、動態(tài)數(shù)據(jù)(流數(shù)據(jù))。靜態(tài)數(shù)據(jù)是一段較長的時間內(nèi)相對穩(wěn)定的數(shù)據(jù),一般采用批處理方式進行計算,可以在充裕的時間內(nèi)對海量數(shù)據(jù)進行批量處理(即可以容忍較高的時間延遲)。流數(shù)據(jù)則是以大量、快速、時變的流形式持續(xù)到達,因此流數(shù)據(jù)是不斷變化的數(shù)據(jù);流數(shù)據(jù)是時間上無上限的數(shù)據(jù)集合,不能采用傳統(tǒng)的批處理方式,必須實時計算。SparkStreaming的工作原理SparkSparkStreaming接收實時輸入的數(shù)據(jù)流后,將數(shù)據(jù)流按時間片(通常為秒級)為單位,拆分為一個個小的批次數(shù)據(jù);然后這些小批次的數(shù)據(jù)交給Spark引擎,以類似批處理的方式處理每個時間片數(shù)據(jù)。初步體驗SparkSQLSpark將輸入數(shù)據(jù)按照實際片段(例如1秒鐘)分割成一段一段的離散數(shù)據(jù)流(稱之為DStream,DiscretizedStream);每一個片段內(nèi)的數(shù)據(jù)都會變成一個RDD,然后將DStream流處理操作轉(zhuǎn)變?yōu)獒槍DD的操作。編寫第1個SparkStreaming程序Spark例子:利用Netcat工具向9999端口發(fā)送數(shù)據(jù)流(文本數(shù)據(jù)),使用SparkStreaming監(jiān)聽9999端口的數(shù)據(jù)流,并實時地進行詞頻統(tǒng)計(每10秒中,計算獲取的單詞數(shù)量)scala>importorg.apache.spark.streaming._//導入包scala>valssc=newStreamingContext(sc,Seconds(10))//創(chuàng)建上下文環(huán)境scala>vallines=ssc.socketTextStream("localhost",9999)//創(chuàng)建DStream,監(jiān)聽9999端口數(shù)據(jù)編寫第1個SparkStreaming程序Sparkscala>valwords=lines.flatMap(x=>x.split(""))//按照空格切分,產(chǎn)生新DStreamscala>valpairs=words.map(x=>(x,1))//轉(zhuǎn)換為(word,1)鍵值對形式scala>valwordsCounts=pairs.reduceByKey(_+_)//計算詞頻,注意:返回一個新的DStreamscala>wordsCounts.print()//使用DStream的print方法,打印結(jié)果scala>ssc.start()//啟動上述計算邏輯,等待結(jié)束。編寫第1個SparkStreaming程序Sparkhadoop@zsz-VirtualBox:~$nc-lk9999#啟動Netcat,發(fā)送下面的文本數(shù)據(jù):IlikeSparkSparkSparkisgoodIlikeSparkSparkispowerful相關(guān)知識小結(jié)SparkSparkStreaming原理:按照時間間隔,將流數(shù)據(jù)切分為片段,每個片段為一個RDD;按照RDD的處理方式進行處理。SparkStreaming編程:(1)讀取數(shù)據(jù)源,創(chuàng)建DStream;(2)執(zhí)行轉(zhuǎn)換操作;(3)輸出結(jié)果;(4)start開啟流計算。根據(jù)知識儲備的相關(guān)知識,讀取Socket流數(shù)據(jù),每10秒鐘,統(tǒng)計一次用戶點擊量。任務實施項目6
SparkStreaming處理用戶行為數(shù)據(jù)Spark任務1初探廣告點擊行為識別無效的廣告點擊統(tǒng)計1分鐘內(nèi)的訂單數(shù)量任務2任務3電商用戶的行為分析任務4任務分析Spark用戶通過APP或者瀏覽器點擊廣告的過程中,可能出現(xiàn)誤操作(例如短時間內(nèi)連續(xù)點擊);某些機構(gòu)出于非法目的,通過機器人點擊;這些均是無效的廣告點擊行為,需要加以去除。若某用戶10秒內(nèi),點擊3次以上的廣告,則判定該用戶的點擊無效),輸出這些無效點擊用戶的ID(每4秒更新1次)。DStream無狀態(tài)轉(zhuǎn)換操作SparkDStream無狀態(tài)轉(zhuǎn)換操作是指不記錄歷史狀態(tài)信息,每次僅對新的批次數(shù)據(jù)進行處理;無狀態(tài)轉(zhuǎn)換操作每一個批次的數(shù)據(jù)處理都是獨立的,處理當前批次數(shù)據(jù)時,即不依賴之前的數(shù)據(jù),也不影響后續(xù)的數(shù)據(jù)。無狀態(tài)轉(zhuǎn)換示例Spark有一份黑名單文件blacklist.txt,記載了若干列入黑名單的IP地址,內(nèi)容如下:355采用Netcat模擬用戶訪問所產(chǎn)生的數(shù)據(jù)流:用戶每訪問平臺一次,在Netcat中輸入一個IP地址。要求采用無狀態(tài)轉(zhuǎn)換方式,計算10秒內(nèi)的平臺有效訪問次數(shù)(忽略黑名單IP的訪問)無狀態(tài)轉(zhuǎn)換示例SparkvalblackIPs=sc.textFile(path)//讀取本地黑名單文件,創(chuàng)建黑名單RDDvalssc=newStreamingContext(sc,Seconds(10))valIPs=ssc.socketTextStream("localhost",9999)//監(jiān)聽9999端口的數(shù)據(jù)valwhiteIPs=IPs.transform{rdd=>rdd.subtract(blackIPs)//點擊數(shù)據(jù)(IP地址)中去掉黑名單RDD中的IP}valcount=whiteIPs.count()//統(tǒng)計剩余的IP地址數(shù)量count.print()//打印輸出結(jié)果DStream有狀態(tài)轉(zhuǎn)換操作Spark有狀態(tài)轉(zhuǎn)換操作在處理當前批次的數(shù)據(jù)時,需要用到之前批次的數(shù)據(jù)或者中間計算結(jié)果;有狀態(tài)轉(zhuǎn)換包括基于滑動窗口的轉(zhuǎn)換和updateStateByKey轉(zhuǎn)換。有狀態(tài)轉(zhuǎn)換操作示例Spark示例:讀取套接字流數(shù)據(jù)(9999端口),設(shè)置批次間隔1秒;窗口長度為3秒,滑動時間間隔為1秒,統(tǒng)計窗口內(nèi)單詞詞頻后打印輸出。
vallinesDS=ssc.socketTextStream("localhost",9999)valwordsDS=linesDS.flatMap(x=>x.split(""))valkvDS=wordsDS.map(x=>(x,1))
//轉(zhuǎn)換為(word,1)形式的鍵值對//使用reduceByKeyAndWindow方法,計算窗口內(nèi)單詞詞頻
val
windowWordCount=kvDS.reduceByKeyAndWindow(
(a:Int,b:Int)=>a+b,Seconds(3),Seconds(1))windowWordCount.print()無狀態(tài)轉(zhuǎn)換:不記錄之前狀態(tài)數(shù)據(jù);有狀態(tài)轉(zhuǎn)換:需要用到之前批次的數(shù)據(jù)或者中間計算結(jié)果;重點學習窗口操作。Spark綜合利用本任務中的知識儲備,讀取用戶點擊數(shù)據(jù),識別出無效的點擊行為,并輸出相應的結(jié)果吧。任務實施相關(guān)知識小結(jié)Spark項目6
SparkStreaming處理用戶行為數(shù)據(jù)Spark任務1初探廣告點擊行為識別無效的廣告點擊統(tǒng)計1分鐘內(nèi)的訂單數(shù)量任務2任務3電商用戶的行為分析任務4任務分析Spark除了監(jiān)聽socket端口方式獲取流數(shù)據(jù),SparkStreaming還可以讀取文件、RDD隊列及Kafka數(shù)據(jù),生成DStream。獲取Kafka中的用戶行為數(shù)據(jù)(數(shù)據(jù)形式為“用戶ID,行為類型”,行為類型”包括:①pv點擊,②buy下單,③cart購物車,④fav收藏),統(tǒng)計1分鐘內(nèi)的用戶下單數(shù)據(jù)。由文件流創(chuàng)建DStreamSparkSparkStreaming可以從HDFS文件系統(tǒng)目錄、本地系統(tǒng)的文件目錄讀取數(shù)據(jù)到DStream中;一旦有目錄中有文件加入,則獲取文件中的內(nèi)容,創(chuàng)建DStream。valssc=newStreamingContext(sc,Seconds(10))
valpath="hdfs://localhost:9000/user/hadoop/spark_streaming"
//創(chuàng)建DStream,監(jiān)聽hdfs相關(guān)目錄vallines=ssc.textFileStream(path)lines.print()
RDD隊列流創(chuàng)建DStreamSparkDStream可以看做離散的RDD序列,因此SparkStreaming可以讀取RDD組成的數(shù)據(jù)隊列;該方式較少使用。valinputDStream=ssc.queueStream(rddQueue)Kafka的原理SparkKafka是一個分布式、支持分區(qū)的(partition)、多副本的(replica)分布式消息系統(tǒng),它可以實時的處理大量數(shù)據(jù)以滿足多種需求場景,廣泛應用于web日志、訪問日志等領(lǐng)域。Kafka的安裝與體驗Spark解壓(安裝)Kafkasudotar-zxvf/home/hadoop/soft/kafka_2.12-3.6.0.tgz-C/usr/localKafka通常需要Zookeeper的支持,啟動Zookeeper/usr/local/kafka/bin/zookeeper-server-start.shconfig/perties打開第二個Linux終端,輸入以下命令,啟動kafka服務/usr/local/kafkabin/kafka-server-start.shconfig/perties創(chuàng)建消息主題(Topic)/usr/local/kafkabin/kafka-topics.sh
--create--topicmytopic--bootstrap-serverlocalhost:9092Kafka的安裝與體驗Spark向Kafka主題發(fā)出消息/usr/local/kafkabin/kafka-console-producer.sh--broker-listlocalhost:9092--topicmytopic可以自行輸入多行文本數(shù)據(jù)后,消費數(shù)據(jù):/usr/local/kafkabin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topicmytpic--from-beginningKafka作為DStream數(shù)據(jù)源SparkvalkafkaParas=Map[String,String](//設(shè)置Kafka相關(guān)參數(shù)"bootstrap.servers"->"localhost:9092","key.deserializer"->"mon.serialization.StringDeserializer","value.deserializer"->"mon.serialization.StringDeserializer","group.id"->"use_a_separate_group_id_for_each_stream",)valtopics=Set("mytopic")//將讀取的Kafka主題Topic寫入到一個Set中Kafka作為DStream數(shù)據(jù)源SparkvalkafkaInputDS=KafkaUtils.createDirectStream(//創(chuàng)建DStreamssc,PreferConsistent,Subscribe[String,String](topics,kafkaParas))SparkStreaming可以從HDFS文件系統(tǒng)目錄、本地系統(tǒng)的文件目錄讀取數(shù)據(jù)到DStream中。DStream可以看做離散的RDD序列,因此SparkStreaming可以讀取RDD組成的數(shù)據(jù)隊列。SparkStreaming還支持Kafka、Flume等高級數(shù)據(jù)源。Spark綜合利用本任務中的知識儲備,完成規(guī)定指標的統(tǒng)計分析。任務實施相關(guān)知識小結(jié)Spark項目6
SparkStreaming處理用戶行為數(shù)據(jù)Spark任務1初探廣告點擊行為識別無效的廣告點擊統(tǒng)計1分鐘內(nèi)的訂單數(shù)量任務2任務3電商用戶的行為分析任務4任務分析Spark某電商平臺用戶行為數(shù)據(jù)集,包含了十萬隨機用戶的行為數(shù)據(jù)。定時讀取用戶行為文件中的行為數(shù)據(jù),寫入到Kafka的某主題下,SparkStream獲取該主題的數(shù)據(jù)后進行處理,統(tǒng)計1分鐘內(nèi)的各種用戶行為(10秒更新一次),過濾出購買行為后寫入MySQL數(shù)據(jù)庫。DStream數(shù)據(jù)保存到文件Spark除了print算子,DStream有若干輸出算子;相關(guān)說明如下:DStream數(shù)據(jù)保存到文件示
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
- 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 廣州地鐵車輛檢修工長理論考試題庫含答案
- 2026年初級經(jīng)濟師之初級經(jīng)濟師人力資源管理考試題庫500道【含答案】
- 天津軌道車輛調(diào)度員調(diào)度模擬操作考核含答案
- 2026年勞務員考試題庫附參考答案(綜合題)
- 2026年房地產(chǎn)經(jīng)紀協(xié)理考試題庫及答案【奪冠系列】
- 2026年勞務員考試題庫附答案【研優(yōu)卷】
- 急診護理倫理與法規(guī)
- 使用權(quán)轉(zhuǎn)讓合同匯編15篇
- 人工智能教育資源共享平臺在小學音樂教學中的應用研究教學研究課題報告
- 教師面試地理面試題及答案
- 人工智能行業(yè)-“人工智能+”行動深度解讀與產(chǎn)業(yè)發(fā)展機遇
- 2025棗莊市生態(tài)環(huán)境修復礦區(qū)復墾政策實施效果與國土空間規(guī)劃
- (一診)達州市2026屆高三第一次診斷性測試思想政治試題(含標準答案)
- 購車意向金合同范本
- 2025廣東廣電網(wǎng)絡校園招聘筆試歷年參考題庫附帶答案詳解
- 江蘇大學《無機與分析化學實驗B》2025-2026學年第一學期期末試卷
- 2025GINA全球哮喘處理和預防策略(更新版)解讀課件
- 2025年中國職場人心理健康調(diào)查研究報告
- 2025四川成都東方廣益投資有限公司下屬企業(yè)招聘9人備考題庫及完整答案詳解1套
- 國家中醫(yī)藥管理局《中醫(yī)藥事業(yè)發(fā)展“十五五”規(guī)劃》全文
- 2025公需課《新質(zhì)生產(chǎn)力與現(xiàn)代化產(chǎn)業(yè)體系》考核試題庫及答案
評論
0/150
提交評論