04單元四 大數(shù)據(jù)實時處理Storm_第1頁
04單元四 大數(shù)據(jù)實時處理Storm_第2頁
04單元四 大數(shù)據(jù)實時處理Storm_第3頁
04單元四 大數(shù)據(jù)實時處理Storm_第4頁
04單元四 大數(shù)據(jù)實時處理Storm_第5頁
已閱讀5頁,還剩83頁未讀 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認(rèn)領(lǐng)

文檔簡介

單元四大數(shù)據(jù)實時處理Storm掌握Storm的安裝與配置掌握Storm集群運行模式的使用掌握Storm的Topology、Spout、Bolt組件的實現(xiàn)掌握消息完整性、并行度、分組策略的選用與實現(xiàn)掌握Storm與Flume、Kafka、HDFS、Redis整合的實現(xiàn)熟悉Nimbus、Supervisor、Worker、Executor、Task的作用熟悉StormTrident框架了解Storm與MapReduce的區(qū)別4.1什么是Storm

Storm是一個開源的、分布式、實時流(Stream)處理系統(tǒng),每秒可以處理數(shù)萬條數(shù)據(jù),Storm以Topology(拓?fù)洌┳鳛榱魈幚淼淖鳂I(yè)。是目前大數(shù)據(jù)實時處理主流技術(shù)之一。Storm中的數(shù)據(jù)結(jié)構(gòu)被稱為Tuple(元組),屬于有序元素的列表類型,可以支持所有數(shù)據(jù)類型。Storm中的流被稱為Stream,是Tuple的無序序列。Storm中還包含了若干屬于進程級別的運行組件,如下表。序號組件名稱描述1Spout流的來源。通常,Storm從原始數(shù)據(jù)源(如Kafka隊列,Kestrel隊列等)接受數(shù)據(jù),也可以編寫spout從數(shù)據(jù)源讀取數(shù)據(jù)。常用接口有IRichSpout,BaseRichSpout,KafkaSpout等。2BoltBolt是邏輯處理單元。Spout將數(shù)據(jù)傳遞到Bolt,產(chǎn)生新的輸出流。Bolt可以執(zhí)行過濾,聚合,加入與數(shù)據(jù)源和數(shù)據(jù)庫交互的操作。Bolt處理完數(shù)據(jù),并發(fā)射到一個或多個Bolts?!癐Bolt”是實現(xiàn)Bolt的核心接口。常見的接口有IRichBolt,IBasicBolt等。進程級運行組件Spout和Bolt、Tuple、Stream,四者之間的關(guān)系Storm將集群中的節(jié)點分為兩種類型:Nimbus(主節(jié)點):負(fù)責(zé)在Supervisor間分發(fā)數(shù)據(jù),分配任務(wù)和監(jiān)視故障。Supervisor(工作節(jié)點):管理Worker(工作進程),完成由Nimbus分配的任務(wù)。Worker不會親自運行任務(wù),而是創(chuàng)建Executor(執(zhí)行者),由Executor執(zhí)行指定的Task(任務(wù))組件,Executor和Task屬于線程級別的運行組件。序號組件名稱描述1Executor執(zhí)行者只是工作進程產(chǎn)生的單個線程。執(zhí)行者可以運行一個或多個Task組件(任務(wù))。2Task任務(wù)組件,執(zhí)行實際的數(shù)據(jù)處理。Supervisor、Worker、Executor、Task,四者之間的關(guān)系

Nimbus是無狀態(tài)的,它依賴于ZooKeeper來監(jiān)視Supervisor的狀態(tài),協(xié)調(diào)Supervisor與Nimbus間的交互,并通過心跳機制維護Nimbus、Supervisor的狀態(tài)。Nimbus、Supervisor、ZooKeeper、Worker,四者之間的關(guān)系Stormtopology的工作原理Storm與MapReduce的區(qū)別在于:(1)Storm是實時運算,MapReduce是離線運算;(2)Storm的Topology被啟動后,Topology會一直處于運行狀態(tài),不會因數(shù)據(jù)處理完畢而主動退出;而MapReduce的Job被啟動后,待Job處理完任務(wù),Job便會隨之結(jié)束。(3)MapReduce提供了Map和Reduce等組件,還提供了數(shù)據(jù)的分組與匯總功能。而Storm只提供了Spout和Bolt組件,怎么處理數(shù)據(jù)則交由開發(fā)人員自行設(shè)計完成。4.2Storm集群的安裝

4.2.1 下載與安裝

在瀏覽器地址欄中輸入Storm官方網(wǎng)址“/downloads.html”(截止到2021年8月,Storm的最新版本是2.2.0版),在打開的頁面中,單擊“apache-storm-2.2.0.tar.gz”超鏈接。在跳轉(zhuǎn)后的頁面中,指向下載鏈接,單擊鼠標(biāo)右鍵,在彈出的快捷菜單中,單擊復(fù)制鏈接地址菜單項。4.2.2 配置Storm集群 進入主節(jié)點node1的storm的“conf”目錄。打開“conf”目錄下的“storm.yaml”文件,添加以下內(nèi)容。注意格式的對齊與縮進#Zookeeper集群的主機列表storm.zookeeper.servers:-"node1主機名"-"node2主機名"-"node3主機名"#冒號后要加空格,設(shè)置Storm的HA集群所需,Nimbus的節(jié)點列表,至少需要兩個成員nimbus.seeds:["node1主機名","node2主機名"]#storm的webui默認(rèn)端口8080,與其它程序默認(rèn)端口容易產(chǎn)生沖突,所以這里需要修改為8081ui.port:8081#冒號后要加空格,Storm集群運行時,需要存儲狀態(tài)的路徑storm.local.dir:"/opt/storm/tmp"(未完)接上一頁#workers進程的端口,每個worker進程會使用一個端口來接收消息supervisor.slots.ports:-6700-6701-6702-6703#冒號后要加空格,設(shè)置Supervisors運行狀態(tài)的監(jiān)控路徑,開啟對Supervisors的狀態(tài)監(jiān)控(可選)storm.health.check.dir:"healthchecks"#冒號后要加空格,指定監(jiān)控的頻率(可選)storm.health.check.timeout.ms:5000將配置后的storm程序復(fù)制到節(jié)點node2、node3。[root@node1app]#scp-rapache-storm-2.2.0/root@node2:$PWD[root@node1app]#scp-rapache-storm-2.2.0/root@node3:$PWD并在所有節(jié)點的環(huán)境配置文件中添加Storm的配置信息。4.2.3 啟動集群,測試安裝效果 進入主節(jié)點node1的storm下的“bin”目錄,創(chuàng)建storm啟動腳本文件“start-storm.sh”和關(guān)停腳本文件“stop-storm.sh”(腳本代碼內(nèi)容參見書中內(nèi)容),并修改腳本文件的權(quán)限。在所有節(jié)點上,啟動ZooKeeper服務(wù)。zkServer.shstart[root@node1bin]#chmodu+xstart-storm.sh[root@node1bin]#chmodu+xstop-storm.sh在主節(jié)點node1上,執(zhí)行以下命令,啟動Storm集群。[root@node1bin]#start-storm.sh啟動過程會消耗一定時間,待啟動完畢后,在各節(jié)點上執(zhí)行jps命令,Storm集群正常啟動后在主節(jié)點node1中的進程情況如下。......3575Supervisor3640LogviewerServer3482Nimbus3534UIServer......在瀏覽器中訪問“http://node1:8081”或“http://node2:8081”,可打開Storm的WEB頁面。4.3Storm應(yīng)用開發(fā)基礎(chǔ) 4.3.1 通話日志統(tǒng)計項目 項目特點:(1)Spout和Bolt的使用;(2)本地運行模式的使用;(3)對通話記錄進行統(tǒng)計分析。項目結(jié)構(gòu)項目中各個類的功能如下:(1)FakeCallLogReaderSpout類,繼承于BaseRichSpout基類,用于配置數(shù)據(jù)源,本項目中主要用于產(chǎn)生通訊數(shù)據(jù),并將數(shù)據(jù)發(fā)給下游的Bolt,即CallLogCreatorBolt類。數(shù)據(jù)由“來電號碼+接聽號碼+通話時長”三部分組成;(2)CallLogCreatorBolt類,用于將從FakeCallLogReaderSpout接收到的通訊數(shù)據(jù)整合成通訊號碼對+通話時長,并發(fā)給下游的Bolt,即CallLogCounterBolt類;(3)CallLogCounterBolt類,用于將從CallLogCreatorBolt接收到的通訊數(shù)據(jù)統(tǒng)計出通話總時長;(4)CallLogCreatorBolt類和CallLogCounterBolt類,均繼承于BaseRichBolt基類;(5)App類是程序執(zhí)行的入口類,負(fù)責(zé)組裝topology對象。其中BaseRichSpout類是FakeCallLogReaderSpout.java的基類,用于數(shù)據(jù)生成的組件,該類的主要方法有:(1)open():為Spout提供執(zhí)行環(huán)境。項目可以運行此方法來實現(xiàn)一些初始化工作。open(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector)①conf參數(shù):為此spout提供storm配置。②context參數(shù):提供有關(guān)拓?fù)渲械膕pout位置,其任務(wù)ID,輸入和輸出信息的完整信息。③collector參數(shù):將使用它的emit方法發(fā)出,將由bolts處理的元組。(2)nextTuple():負(fù)責(zé)數(shù)據(jù)生成的具體業(yè)務(wù),并將生成的數(shù)據(jù)以SpoutOutputCollector的emit方法發(fā)出。nextTuple方法會被Storm定期調(diào)用,如同一個循環(huán)調(diào)用。(3)close():當(dāng)Spout將要關(guān)閉時調(diào)用此方法。(4)declareOutputFields():聲明Tuple的輸出模式。(5)ack():確認(rèn)處理了特定Tuple。(6)fail():指定不處理和不重新處理特定Tuple。其中BaseRichBolt類是CallLogCreatorBolt.java的基類,用于將上游發(fā)出的Tuple作為輸入的數(shù)據(jù),處理后,產(chǎn)生新的Tuple作為輸出數(shù)據(jù),一個Stormtopology程序中,可以有多個Bolt。BaseRichBolt有以下主要方法:(1)prepare():為bolt提供要執(zhí)行的環(huán)境。執(zhí)行器將運行此方法來初始化bolt。prepare(Mapconf,TopologyContextcontext,OutputCollectorcollector)①conf參數(shù):為此bolt提供Storm配置。②context參數(shù):提供有關(guān)拓?fù)渲械腷olt位置,其任務(wù)ID,輸入和輸出信息等信息。③collector參數(shù):用于發(fā)出處理的Tuple。(2)execute():處理輸入的Tuple,一次處理單個Tuple。Tuple數(shù)據(jù)通過Tuple類的getValue方法獲取Tuple的值,多個Tuple可以被處理為單個輸出Tuple,處理后的Tuple可以通過使用OutputCollector類發(fā)給下游的Bolt。(3)cleanup():當(dāng)bolt要關(guān)閉時調(diào)用。(4)declareOutputFields():聲明Tuple的輸出模式。

項目的拓?fù)浣Y(jié)構(gòu),TopologyBuilder類使用createTopology來創(chuàng)建Topology(拓?fù)洌?,通過setSpout、setBolt方法來配置spout和bolt。一個完整的Topology遵循以下步驟來完成:創(chuàng)建Builder→配置Spout→配置Bolt→配置分組策略→提交Topology,整個過程在項目的App.java類中有具體代碼體現(xiàn)。4.3.2 創(chuàng)建一個簡易的詞頻統(tǒng)計項目 項目特點:(1)集群運行模式,對比本地運行模式;(2)StormUI頁面操作;(3)Storm運行日志查看;(4)Storm消息的完整性處理。項目結(jié)構(gòu)(1)SpoutWords類,繼承于BaseRichSpout基類,用于配置數(shù)據(jù)源,給下游的Bolt發(fā)送數(shù)據(jù);(2)BoltSplit類,用于拆分句子中的單詞;BoltCount類,用于統(tǒng)計單詞出現(xiàn)的總次數(shù);BoltReport類,用于匯總報告,這三個類均繼承于BaseRichBolt基類;(3)WordCountTopology類,程序的執(zhí)行入口,負(fù)責(zé)組裝topology對象。集群模式下的運行按順序,啟動好ZooKeeper集群和storm集群。然后按以下命令格式,將jar文件提交到storm集群運行。storm+空格+jar+空格+mvn命令編譯項目后產(chǎn)生的jar文件+空格+帶包名全稱的項目主類名+空格+指定topology名稱。例如:stormjarhelloworld-0.0.1-SNAPSHOT-shaded.jarorg.my.storm.WordCountTopologytest在反饋的信息中,如果輸出有以下內(nèi)容,則表明項目的topology提交Storm集群成功。03:51:33.634[main]INFOo.a.s.StormSubmitter-Finishedsubmittingtopology:testStorm日志的使用

在瀏覽器中訪問“http://node1:8081”或“http://node2:8081”,打開Storm的WEBUI頁面,可以查看Storm集群運行狀態(tài)。單擊Storm的WEBUI頁面上“TopologySummary”區(qū)域的“test”,可以查看“testTopology”的運行狀態(tài)。

進入“TopologySummary”頁面后,找到“Bolts(Alltime)”區(qū)域,區(qū)域中的“bolt-report”對應(yīng)的是Storm項目中創(chuàng)建的BoltReportJava類,“bolt-count”對應(yīng)的是項目中的BoltCountJava類,這些id名稱均是來至WordCountTopology類中的代碼設(shè)置。因為testTopology運行需要一段時間,因此需要間隔5-10秒刷新一次頁面,以便查看最新的數(shù)字,如果“bolt-report”的“Executed”(已執(zhí)行量)中的數(shù)字與“bolt-count”的“Emitted”(已發(fā)射量)中的數(shù)字相等,則表明當(dāng)前“testTopology”的業(yè)務(wù)已完成。

繼續(xù)往下,找到“WorkerResources”區(qū)域,單擊區(qū)域右上角的“ToggleComponents”按鈕,查看“bolt-report”是在集群中的哪個節(jié)點的哪個端口下運行。

然后回到頁面的最上方,在搜索輸入框前顯示有“testTopology”在集群中的完整名稱,將“testTopology”的完整名稱填入輸入框中后,然后單擊“Search”按鈕。

在打開的日志查看頁面,找到“bolt-report”的運行節(jié)點及端口號,單擊這一行中的“testTopology”在集群中的完整名稱。

在接下來打開的“Work”日志頁面,通過頁面左上角的“Prev”和“Next”兩個按鈕,可以翻頁查看日志內(nèi)容,找到“testTopology”的運行結(jié)果。4.3.3 消息完整性處理

在程序中,首先是從Spout開始,Spout會將消息封裝成Tuple發(fā)送出去,下游的Bolt會繼續(xù)將消息封裝成新的Tuple發(fā)送出去,后面的Bolt從前面的Bolt的Tuple中讀取消息,并再次封裝成新的Tuple發(fā)送出去,直至最后一個Bolt。從上游的Bolt到下游的Bolt,每一層都會產(chǎn)生新的Tuple,整個過程如同構(gòu)成了一棵消息樹。

但這棵消息樹并不是可靠的,在處理過程中,如果發(fā)生消息未處理完,或者異常等情況,都會影響到最終的運算結(jié)果。為了提高消息處理的完整性,storm提供了重新發(fā)送未被處理的消息機制,保證發(fā)送出去的每個Tuple都得到完整的處理。實現(xiàn)消息處理的完整性,需要做兩件事:(1)創(chuàng)建Tuple時(即,消息樹上的一個新節(jié)點)需要通知Storm。在發(fā)送新Tuple時,把上游的Tuple與新數(shù)據(jù)同時作為參數(shù),Storm會進行消息的錨定(anchoring)。具體使用方法如下:collector.emit(tuple,newValues(word));//代表了錨定collector.emit(newValues(word));//代表了未錨定,storm無法跟蹤這個消息,也就無法提供可靠性(2)處理完一個Tuple,也需要通知Storm。通過OutputCollector中的ack和fail方法來通知Storm,Storm中的Acker負(fù)責(zé)跟蹤每個消息的處理狀態(tài)。具體使用方法如下:collector.ack(tuple);collector.fail(tuple);

每一個處理Tuple的bolt,都通過ack和fail方法來告知storm,上游的Tuple是否被處理成功。如果一個Tuple被處理成功,就調(diào)用ack方法;如果失敗,則調(diào)用fail方法。消息完整性處理的API方法優(yōu)化

為了簡化消息完整性編程,Storm提供了BaseBasicBolt類,此類提供的collector不是OutputCollector對象,而是BasicOutputCollector對象。在BasciOutputCollector的底層已經(jīng)實現(xiàn)了消息的錨定和結(jié)果通知。因此,項目中只需要使用BaseBasicBolt來創(chuàng)建Bolt類。在發(fā)送消息的時候,只需要在調(diào)用collector.emit(newValues)時,設(shè)置Values值,無需再指定上游的tuple。如果發(fā)送成功,也不需要再寫任何代碼;如果失敗,則拋出一個FailedException異常。publicclassBoltSplitextendsBaseBasicBolt{@Overridepublicvoidexecute(Tupleinput,BasicOutputCollectorcollector){try{......collector.emit(newValues(word));//把每個單詞發(fā)射出去}catch(FailedExceptione){collector.reportError(e);}}......}4.3.4 基于《老人與?!酚⑽男≌f的詞頻統(tǒng)計項目 項目特點:(1)被統(tǒng)計的數(shù)據(jù)來源于磁盤上的文本文件;(2)統(tǒng)計后的結(jié)果保存到磁盤的文本文件;(3)為減少運算延遲,給項目配置并行度。項目的結(jié)構(gòu)與配置與前面的HelloWorld項目一致,項目中各個類的功能如下:(1)SpoutWords類,繼承于BaseRichSpout基類,讀取指定目錄下的文件內(nèi)容,給下游的Bolt發(fā)送讀取的句子數(shù)據(jù);(2)BoltSplit類,用于拆分句子中的單詞;BoltCount類,用于過濾單詞中出現(xiàn)的標(biāo)點符號,并統(tǒng)計單詞出現(xiàn)的總次數(shù);BoltReport類,用于將統(tǒng)計的結(jié)果保存到磁盤文件中,這三個類均繼承于BaseRichBolt基類;(3)WordCountTopology類,程序的執(zhí)行入口。負(fù)責(zé)組裝topology對象,為topology配置并行度,配置《老人與海》文件的所在位置。并發(fā)機制的配置在WordCountTopology類中進行并發(fā)機制的配置:(1)通過conf.setNumWorkers(5);,為storm集群配置5個工作進程;(2)通過topologyBuilder.setSpout("spout",sentenceSpout,2);,為spout配置兩個工作線程;(3)通過topologyBuilder.setBolt("bolt-split",splitSentenceBolt,2).setNumTasks(4).shuffleGrouping("spout");,為splitbolt配置兩個工作線程,四個工作任務(wù),每個線程平均兩個工作任務(wù);(4)通過topologyBuilder.setBolt("bolt-count",wordCountBolt,4).fieldsGrouping("bolt-split",newFields("words"));,為countbolt配置四個工作線程;(5)topologyBuilder.setBolt("bolt-report",reportBolt).globalGrouping("bolt-count");,reportbolt沒有顯式指定數(shù)量,默認(rèn)是配置一個工作線程。4.3.5 商品交易額統(tǒng)計項目 項目特點:(1)Storm的分組策略的使用;(2)分組策略效果的對比。項目的結(jié)構(gòu)項目中各個類的功能如下:(1)MySpout類,繼承于BaseRichSpout基類,用于生成模擬的用戶ID,以及用戶的交易額數(shù)據(jù),給下游的Bolt發(fā)送數(shù)據(jù);(2)MyBolt類,以用戶為單位,用以統(tǒng)計出每個用戶的總交易額;(3)App類,程序的執(zhí)行入口,負(fù)責(zé)組裝topology對象,配置分組策略;(4)MyCustomStreamingGrouping類,用于實現(xiàn)自定義的分組策略。storm的分組策略:指在Spout與Bolt、Bolt與Bolt之間進行Tuple的傳遞方式。Storm的分組策略對結(jié)果有著直接的影響,采用不同的分組策略,會導(dǎo)致最終的結(jié)果不一樣。Storm分組策略的種類。序號策略名稱功能1ShuffleGrouping隨機分組,隨機分配stream里面的tuple,不保證每個bolttask接收到的tuple數(shù)目相同。2FieldsGrouping按字段分組,相同fields的會被分發(fā)到同一個Bolt,比如,按“userid”這個字段來分組,那么具有相同“userid”字段值的tuple會被分到相同的Bolttask里,而不同的"userid"則會被分配到不同的task。適用對時序比較敏感的應(yīng)用,如計費系統(tǒng)。3PartialKeygrouping與FieldsGrouping分組相似,也是用指定的字段進行分組,但可以在多個下游Bolt之間進行負(fù)載均衡,適用于輸入數(shù)據(jù)有傾斜時,可以更好的利用資源。4AllGrouping廣播發(fā)送,將所有的tuple分配到全部Bolttask上,即每一個task都會接收到一份完全相同的tuple拷貝。5GlobalGrouping全局分組,tuple被分配到bolt的其中一個task上,且這個task的id值是最低的一個。6NoneGrouping不分組,指stream不關(guān)心到底怎樣分組。這種分組類似于Shufflegrouping,隨機分配tuple到每個bolttask中,每個bolttask每次分配到的tuple數(shù)量不確定。7DirectGrouping指向型分組,這是一種比較特別的分組方法,用這種分組意味著tuple的發(fā)送者指定由消息接收者的哪個task處理這個消息。只有被聲明為DirectStream的消息流可以聲明這種分組方法。而且這種tuple必須使用emitDirect方法來發(fā)射。消息處理者可以通過TopologyContext來獲取處理消息的taskid(OutputCollector.emit方法也會返回taskid)8Localorshufflegrouping本地或隨機分組。如果目標(biāo)bolt有一個或者多個task與源bolt的task在同一個工作進程中,tuple將會被隨機發(fā)送給這些在同進程中的tasks。否則,和普通的ShuffleGrouping行為一致。9customGroupingCustomStreamGrouping`接口的自定義分組策略類,由開發(fā)人員自己定義分組策略。以分別使用ShuffleGrouping、AllGrouping分組策略為例,進行對比(其它分組策略的使用,請參照書中內(nèi)容)://采用ShuffleGrouping分組策略。上游的spout將tuple隨機發(fā)送給下游的boltbuilder.setBolt("id_sum_bolt",newMyBolt(),3).shuffleGrouping("id_order_spout");

項目在Storm集群中運行后,打開StormUI界面,進入各自的“bolt”的日志查看頁面,會發(fā)現(xiàn)只有其中一個“bolt”的日志記錄了三個用戶的訂單總金額,而其它兩個“bolt”的日志則沒有記錄。//采用AllGrouping分組策略builder.setBolt("id_sum_bolt",newMyBolt(),3).allGrouping("id_order_spout");在storm集群的三個“bolt”的日志中均有一份相同的用戶訂單總金額記錄。node1節(jié)點bolt日志記錄三名用戶的訂單總金額node2節(jié)點bolt日志記錄三名用戶的訂單總金額node3節(jié)點bolt日志記錄三名用戶的訂單總金額分組策略運行結(jié)果總結(jié):(1)ShuffleGrouping:三個bolt線程,同時執(zhí)行,但對于同一個tuple數(shù)據(jù),只有一個bolt會接收到,并且是隨機。(2)AllGrouping:三個bolt線程,同時執(zhí)行,但對于同一個tuple數(shù)據(jù),3個bolt都會接收到。(3)GlobalGrouping:三個bolt線程,同時執(zhí)行,但對于同一個tuple數(shù)據(jù),只有taskid值是最小的一個bolt會接收到,其它2個bolt不會接收到。(4)customGrouping:由開發(fā)人員自己自定義分組策略,如果開發(fā)人員在代碼中沒有明確定義分組策略,則所有的tuple數(shù)據(jù)會集中分配到同一個bolt中。(5)FieldsGrouping:按照字段分組,同一個字段的值只能發(fā)送給同一個Bolt。4.4基于《鋼鐵是怎樣煉成的》中文小說的字詞統(tǒng)計項目,整合hdfs+Storm

該項目只在集群模式下運行,其特點是:(1)整合hdfs+Storm,從hdfs讀取原始數(shù)據(jù);(2)借助第三方庫jieba對中文句子進行符合語意的分詞操作;(3)整合hdfs+Storm,將統(tǒng)計后的結(jié)果寫到hdfs;(4)通過Sqoop將統(tǒng)計結(jié)果導(dǎo)出到MySQL;(5)采用Python訪問MySQL進行重復(fù)數(shù)據(jù)的清洗;(6)最后采用Python實現(xiàn)統(tǒng)計結(jié)果的可視化分析。項目結(jié)構(gòu)由于該項目的數(shù)據(jù)來源于HDFS,最后的統(tǒng)計結(jié)果也是保存到HDFS中,因此項目中的Spout組件和最后負(fù)責(zé)匯總輸出的Bolt組件會直接在項目的執(zhí)行主類WordCountTopology.java中配置,不再需要另行創(chuàng)建。項目中各個類的功能如下:(1)BoltSplit類,用于對中文句子進行符合語意的分詞操作;BoltCount類,用于過濾非中文的字符及標(biāo)點符號內(nèi)容,統(tǒng)計每個字詞出現(xiàn)的總次數(shù);(2)WordCountTopology類,程序的執(zhí)行入口,創(chuàng)建HdfsSpout組件對象,讀取存放在HDFS中的《鋼鐵是怎樣煉成的》文件內(nèi)容,HdfsBolt組件對象,將統(tǒng)計結(jié)果存儲到HDFS中,對HDFS和Storm進行整合。4.4.2 整合hdfs+Storm

publicclassWordCountTopology{

publicstaticvoidmain(String[]args)throwsException{......

//整合HDFS,創(chuàng)建讀取HDFS數(shù)據(jù)的spout實例

HdfsSpoutspout=newHdfsSpout().withOutputFields(TextFileReader.defaultFields)//指定文件讀取器.setReaderType("text")//指定讀取的文件類型.setHdfsUri("hdfs://node1:9820")//指定hdfs地址(未完).setSourceDir("/data/in")//指定數(shù)據(jù)源文件所在的目錄.setArchiveDir("/data/done")//指定讀取完成后,數(shù)據(jù)源文件轉(zhuǎn)移的目錄

//指定讀取過程中發(fā)生錯誤時,錯誤文件存放的目錄.setBadFilesDir("/data/badfiles");

......

//整合HDFS,創(chuàng)建保存數(shù)據(jù)的Bolt實例HdfsBoltreportBolt=newHdfsBolt().withFsUrl("hdfs://node1:9820")//指定hdfs地址.withFileNameFormat(fileNameFormat).withRecordFormat(format).withRotationPolicy(rotationPolicy).withSyncPolicy(syncPolicy); ......}}

(接上一頁)4.4.3 數(shù)據(jù)可視化

數(shù)據(jù)可視化,分本地和Web兩種,這里以簡單快捷實現(xiàn)為目的,介紹兩種數(shù)據(jù)可視化方法。1)基于matplotlib庫的本地柱狀圖可視化importpymysqlasmysqlimportmatplotlib.pyplotaspltfrompylabimportmpl#查詢數(shù)據(jù)表......words=list()#用于存儲字詞amounts=list()#用于存儲字詞數(shù)量......mpl.rcParams['font.sans-serif']=['SimHei']#設(shè)置中文字體環(huán)境plt.bar(words,amounts,align='center')#創(chuàng)建柱狀圖plt.title("《鋼鐵是怎樣煉成的》一文中頻次Top20字詞")#設(shè)置標(biāo)題plt.ylabel('出現(xiàn)頻次')#設(shè)置y軸文本標(biāo)簽plt.xlabel('字詞')#設(shè)置x軸文本標(biāo)簽plt.grid(True,linestyle='-.')#設(shè)置背景網(wǎng)格plt.show()2)基于pyecharts的Web版詞云可視化

pyecharts是一款將Python和Echarts相結(jié)合的數(shù)據(jù)可視化工具庫。開發(fā)前,需要首先分別安裝pyecharts和詞云庫wordcloud。frompyecharts.chartsimportWordCloudfrompyecharts.globalsimportSymbolTypeimportpyecharts.optionsasoptsimportpymysqlasmysql#查詢數(shù)據(jù)表......words=list()......

#創(chuàng)建詞云,保存為HTML格式c=(

WordCloud().add("",words,word_size_range=[20,100],shape=SymbolType.DIAMOND).set_global_opts(title_opts=opts.TitleOpts(title='《鋼鐵是怎樣煉成的》一文詞云',title_textstyle_opts=opts.TextStyleOpts(font_size=23)),tooltip_opts=opts.TooltipOpts(is_show=True)).render("盤符:\\非中文路徑\\index.html")#按此格式設(shè)置生成后的文件名,及路徑)4.5基于HDFS+Storm+Redis整合項目

4.5.1 Storm+Redis整合技術(shù) 1.基于Redis的存儲

本節(jié)的案例項目以詞頻統(tǒng)計為例,采用RedisStoreBolt類,以及RedisStoreMapper接口,將統(tǒng)計結(jié)果保存到Redis中。創(chuàng)建后的項目結(jié)構(gòu)如下:項目中各個類的功能如下:(1)Spout類,繼承于BaseRichSpout基類,用于配置數(shù)據(jù)源,給下游的Bolt發(fā)送數(shù)據(jù);(2)CounterBolt類,用于統(tǒng)計單詞出現(xiàn)的次數(shù);(3)SetupRedisStoreMapper類,實現(xiàn)RedisStoreMapper接口,進行Redis表結(jié)構(gòu)、表類型、表名等信息的配置;(4)WordCountTopology主類,程序的執(zhí)行入口,創(chuàng)建RedisStoreBolt類對象,實現(xiàn)Redis+Storm整合。publicclassSetupRedisStoreMapperimplementsRedisStoreMapper{privatestaticfinallongserialVersionUID=2538872152713105116L;@OverridepublicStringgetKeyFromTuple(ITupletuple){returntuple.getStringByField("word");//配置redis表中的鍵名}@OverridepublicStringgetValueFromTuple(ITupletuple){returntuple.getStringByField("count");//配置redis表中的值名}@OverridepublicRedisDataTypeDescriptiongetDataTypeDescription(){//配置Redis的映射表類型為HASH,及表名count-resultreturnnewRedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH,"count-result");}}SetupRedisStoreMapper.java文件。App.java文件,項目的執(zhí)行入口類,實現(xiàn)Redis+Storm的整合

Stringhost="node3";//指定Redis服務(wù)端所在的主機名intport=6379;//指定Redis服務(wù)端的端口號//創(chuàng)建Redis訪問池對象,配置Redis相關(guān)參數(shù)JedisPoolConfigpoolConfig=newJedisPoolConfig.Builder().setHost(host)

.setPort(port).build();Spoutspout=newSpout();

......SetupRedisStoreMapperstoreMapper=newSetupRedisStoreMapper();

//整合Redis,實現(xiàn)數(shù)據(jù)的保存RedisStoreBoltstoreBolt=newRedisStoreBolt(poolConfig,storeMapper);TopologyBuilderbuilder=newTopologyBuilder();

......

//接收上游CounterBolt發(fā)送過來的數(shù)據(jù),保存到Redis中builder.setBolt("store-bolt",storeBolt,1).shuffleGrouping("count-bolt"); ......擴展知識:還可以使用Redis進行消息內(nèi)容的過濾,感興趣的同學(xué)可查看書中節(jié)內(nèi)容。4.5.2 實現(xiàn)HDFS+Storm+Redis的整合

本節(jié)項目是4.4節(jié)項目與4.5.1節(jié)項目的整合,運行在集群模式下。區(qū)別于4.4節(jié)項目的是:(1)使用Storm+Redis整合,將統(tǒng)計結(jié)果保存到Redis;(2)利用Redis的鍵值對的特性,避免重復(fù)數(shù)據(jù)的存儲;(3)采用Python訪問Redis;(4)最后實現(xiàn)統(tǒng)計結(jié)果的可視化分析。項目結(jié)構(gòu)項目中各個類的功能已經(jīng)在4.4節(jié)項目與4.5.1節(jié)項目中有過描述,因此不再贅述。在WordCountTopology類中實現(xiàn)HDFS+Storm+Redis的整合

......HdfsSpoutspout=newHdfsSpout()

.withOutputFields(TextFileReader.defaultFields)

.setReaderType("text").setHdfsUri("hdfs://node1:9820")

.setSourceDir("/data/in").setArchiveDir("/data/done").setBadFilesDir("/data/badfiles");//創(chuàng)建整合HDFS的spout實例builder.setSpout("hdfs-spout",spout,1);//配置spout......//整合RedisStringhost="node3";intport=6379;JedisPoolConfigpoolConfig=newJedisPoolConfig.Builder().setHost(host).setPort(port).build();SetupRedisStoreMapperstoreMapper=newSetupRedisStoreMapper();RedisStoreBoltstoreBolt=newRedisStoreBolt(poolConfig,storeMapper);//配置與Redis關(guān)聯(lián)的Bolt,上游為countboltbuilder.setBolt("bolt-store",storeBolt,1).globalGrouping("bolt-count");......4.6基于Flume+Kafka+Storm+Redis整合的項目

本節(jié)項目的特點是,在集群模式下:(1)通過Flume+Kafka整合,采集數(shù)據(jù)到Kafka;(2)通過KafkaSpout來從Kafka中獲取數(shù)據(jù);(3)通過RedisBolt將分析后的數(shù)據(jù)存于Redis中;(4)最后通過Django+Reids,實現(xiàn)數(shù)據(jù)的實時動態(tài)/離線靜態(tài)兩種可視化分析。本節(jié)用兩個項目來分別實現(xiàn)數(shù)據(jù)的實時動態(tài)/離線靜態(tài)可視化分析:(1)kafka-storm-redis-one項目:對實時采集的日志數(shù)據(jù)進行分析處理,從17種商品數(shù)據(jù)中獲取某一類商品的實時銷售數(shù)據(jù),并實時動態(tài)地可視化數(shù)據(jù)的銷售走勢;(2)kafka-storm-redis-all項目:對促銷期內(nèi)的17種商品的銷售數(shù)據(jù)進行分類匯總,并離線靜態(tài)可視化對比17種商品的銷售情況。4.6.1 kafka-storm-redis-one項目

項目結(jié)構(gòu)項目中的各java文件作用依次如下:(1)App.java:項目的執(zhí)行入口文件,負(fù)責(zé)Topology的配置與搭建;(2)OrdersInfo.java:商品信息實體類;(3)PickDataBolt.java:負(fù)責(zé)從采集的17種商品數(shù)據(jù)中,篩選出連衣裙的商品數(shù)據(jù);(4)SetupRedisStoreMapper.java:負(fù)責(zé)將獲取的連衣裙商品數(shù)據(jù)映射為Redis數(shù)據(jù)結(jié)構(gòu)。App.java文件通過KafkaSpout來從Kafka中獲取數(shù)據(jù)......//指定kafka集群節(jié)點及端口

finalStringbrokerUrl="node1:9092,node2:9092,node3:9092";finalStringTOPIC_STREAM="test_stream";//指定Spout接收kafka數(shù)據(jù)的數(shù)據(jù)流名稱//配置接收kafka數(shù)據(jù)的Stormtuple轉(zhuǎn)換器對象,tuple數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu),//由"key","value"組成,并指定數(shù)據(jù)流名稱ByTopicRecordTranslator<String,String>trans=newByTopicRecordTranslator<>((r)->newValues(r.key(),r.value()),

newFields("key","value"),TOPIC_STREAM);//設(shè)置Spout接收數(shù)據(jù)失敗時,回退重試的相關(guān)參數(shù),依次是:首次回退延遲時間、//回退頻次間隔時間、最大重試次數(shù)、重試時最長等待時間KafkaSpoutRetryServiceksrs=newKafkaSpoutRetryExponentialBackoff(

TimeInterval.microSeconds(500),

TimeIliSeconds(2),

Integer.MAX_VALUE,TimeInterval.seconds(10));(未完)接上一頁//創(chuàng)建Spout,整合kafka和StormKafkaSpoutConfig<String,String>spoutConfig=KafkaSpoutConfig//第二個參數(shù),必須與Kafka的當(dāng)前Topic名相同.builder(brokerUrl,newString[]{"kafka_test"})//設(shè)置kafka角色,為數(shù)據(jù)消費者.setProp(ConsumerConfig.GROUP_ID_CONFIG,"Consumer").setRetry(ksrs).setRecordTranslator(trans)//設(shè)置獲取數(shù)據(jù)的頻率時長,并將采樣策略設(shè)置為EARLIEST,//從kafka的partition中的第一個offset開始采集數(shù)據(jù).setOffsetCommitPeriodMs(10_000).setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST).setMaxUncommittedOffsets(250).build();//設(shè)置一次批量采樣偏移量的最高上限操作部分操作前,啟動好ZooKeeper集群+Storm集群+Kafka集群+Redis服務(wù)端,并確認(rèn)Kafka集群的Topic清單中是否存在名稱為“kafka_test”的Topic,在主節(jié)點node1上,運行編寫好的Flume腳本文件。另開一個主節(jié)點node1的連接窗口,運行模擬日志生成程序。再開一個主節(jié)點node1的連接窗口,提交本節(jié)項目的jar文件到Storm集群。產(chǎn)生結(jié)果數(shù)據(jù)后,進行可視化處理。數(shù)據(jù)實時動態(tài)可視化-----基于Django框架的Web可視化技術(shù)安裝Django框架pipinstalldjango-i/simple創(chuàng)建Django項目django-adminstartprojectwebpyecharts創(chuàng)建項目的新應(yīng)用pythonmanage.pystartappshow安裝rest_framework模塊pipinstalldjangorestframework-i/simple/在項目的settings.py文件的“INSTALLED_APPS”列表中,添加剛剛創(chuàng)建的應(yīng)用,以及“rest_framework”模塊在settings.py文件中進行Redis的連接配置。CACHES={'default':{#默認(rèn)存儲信息,在0號庫'BACKEND':'django_redis.cache.RedisCache','LOCATION':'redis://node3:6379/0','OPTIONS':{'CLIENT_CLASS':'django_redis.client.DefaultClient','CONNECTION_POOL_KWARGS':{'max_connections':100},}},

......}“show”應(yīng)用目錄下的views.py文件,處理數(shù)據(jù)請求......#定義函數(shù),創(chuàng)建折線圖defline_base():

......

time_list=list()#用于存儲每次交易的時間pay_list=list()#用于存儲每次交易的交易額

#從Redis中讀取數(shù)據(jù)......#定義并配置Pyecharts的折線對象line=(

Line().add_xaxis(xaxis_data=time_list)#將時間列表配置給x軸.add_yaxis(series_name='銷售總金額',#配置節(jié)點說明標(biāo)簽y_axis=pay_list,#將銷售額列表配置給y軸......)returnline(未完)#創(chuàng)建繼承于rest_frameworkAPI模塊的視圖類,用于響應(yīng)對折線數(shù)據(jù)的請求classLineTagViews(APIView):defget(self,request,*args,**kwargs):returnJsonResponse(json.loads(line_base()))#創(chuàng)建視圖類,用于響應(yīng)對折線模板頁的請求classLineViews(APIView):defget(self,request,*args,**kwargs):returnHttpResponse(content=open('./templates/line.html').read())接上一頁:在“show”應(yīng)用所在路徑下,創(chuàng)建應(yīng)用路由urls.py文件......urlpatterns=[#訪問地址采用正則表達模式識別url(r'^line_tag/$',views.LineTagViews.as_view(),name='va'),url(r'^index/$',views.LineViews.as_view(),name='va'),]在項目路由文件urls.py中,添加“show”應(yīng)用路由在項目的根目錄下,新建目錄“templates”,在“templates”目錄下,創(chuàng)建模板文件line.html(這兩個名稱必須與“show”應(yīng)用的views.py文件中指定的路徑和文件名稱保持一致),在文件中,添加以下代碼。<!DOCTYPEhtml><htmllang="en">......(未完)......<body><!--設(shè)置商品的實時動態(tài)折線圖在頁面上的顯示區(qū)域大小-->

<divid="bar"style="width:900px;height:400px;"></div><!--編寫js代碼,實現(xiàn)折線圖在頁面上的生成--><script>varchart=echarts.init(document.getElementById('bar'),'white',{renderer:'canvas'});//初始化pyecharts對象$(function(){//設(shè)置頁面導(dǎo)入時的執(zhí)行函數(shù)fetchData(chart);//調(diào)用函數(shù),向后端請求折線數(shù)據(jù)setInterval(fetchData,2000);//每2秒請求一次新數(shù)據(jù)});接上一頁(未完)functionfetchData(){//定義請求折線數(shù)據(jù)函數(shù)$.ajax({//配置ajax對象,發(fā)起ajax請求type:"GET",url:":8000/va/line_tag",//配置請求地址dataType:"json",success:function(result){varoptions=result.data;//獲取請求的折線數(shù)據(jù)

chart.setOption(options);//將數(shù)據(jù)配置給pyecharts對象}});}</script></body></html>接上一頁待Django服務(wù)器啟動完畢后,在瀏覽器中訪問“:8000/show/index/”地址,頁面中顯示有實時的商品交易動態(tài)信息。只要數(shù)據(jù)不斷被Flume采集,被Storm處理,此頁面就會不斷實時地顯示連衣裙商品交易的數(shù)據(jù)。4.6.2 kafka-storm-redis-all項目

項目結(jié)構(gòu)項目中的各java文件作用依次如下:(1)App.java是項目的執(zhí)行入口文件,負(fù)責(zé)Topology的配置與搭建,與kafka-storm-redis-one項目中非常相似,因此不再贅述;(2)OrdersInfo.java作用與kafka-storm-redis-one項目中一致,不再贅述;(3)CountBolt.java負(fù)責(zé)從統(tǒng)計匯總17種商品在活動期間內(nèi)的銷售數(shù)據(jù);(4)SetupRedisStoreMapper.java負(fù)責(zé)將獲取的商品數(shù)據(jù)映射為Redis數(shù)據(jù)結(jié)構(gòu)。數(shù)據(jù)離線靜態(tài)可視化

繼續(xù)使用前面創(chuàng)建的Django項目,以此項目為基礎(chǔ),實現(xiàn)數(shù)據(jù)的離線靜態(tài)可視化。在Django項目中添加一個新應(yīng)用,在新應(yīng)用的views.py文件,添加以下代碼。......defbar_base():

......name_list=list()#用于存儲17種商品各自的名稱pay_list=list()#用于存儲17種商品各自的銷售總額amount_list=list()#用于存儲17種商品各自的銷量#迭代讀取Redis庫中total-result的所有記錄

......

c_pay=(#銷售額柱狀圖對象

Bar().add_xaxis(name_list)#x軸裝配商品名稱列表數(shù)據(jù).add_yaxis('銷售額',pay_list)#y軸裝配銷售額列表數(shù)據(jù)

......)(未完)

c_amount=(#銷量柱狀圖對象Bar().add_xaxis(name_list).add_yaxis('銷量',amount_list,color='#2f4553') ......)#返回柱狀圖對象列表

return[c_pay,c_amount]#創(chuàng)建視圖類,用于響應(yīng)對銷售額數(shù)據(jù)的請求classBarPayTagViews(APIView):defget(self,request,*args,**kwargs):returnJsonResponse(json.loads(bar_base()[0]))#創(chuàng)建視圖類,用于響應(yīng)對銷量數(shù)據(jù)的請求classBarAmountTagViews(APIView):defget(self,request,*args,**kwargs):returnJsonResponse(json.loads(bar_base()[1]))......接上一頁創(chuàng)建模板文件bar.html,在模板文件中,添加以下內(nèi)容<!DOCTYPEhtml><htmllang="en">......<body><!--用于銷售額可視化標(biāo)簽--><divid="bar_pay"></div><!--用于銷量可視化標(biāo)簽--><divid="bar_amount"></div><script>//初始化pyecharts對象,用于銷售額可視化

varchart_pay=echarts.init(document.getElementById('bar_pay'),'white',{renderer:'canvas'});//初始化pyecharts對象,用于銷量可視化varchart_amount=echarts.init(document.getElementById('bar_amount'),'white',{renderer:'canvas'});......在Web瀏覽器中訪問“:8000/va/bar/”,頁面顯示商品的靜態(tài)銷售信息4.7StormTrident框架的使用(擴展知識)

4.7.1 什么是StormTrident

Trident是對Storm的實時流處理的高級抽象,是編寫StormTopology的一套高級框架。Trident以Batch(一組tuples)為單位進行數(shù)據(jù)的處理,當(dāng)Trident開始處理數(shù)據(jù)時,會將數(shù)據(jù)切分成若干個Batch,每個Batch中包含了若干個StormTuple。這樣的設(shè)計,可大大減少開發(fā)Storm程序的工作量,而且一個batch可以將原本需要多次讀寫操作的任務(wù),轉(zhuǎn)變成只執(zhí)行1次讀/寫請求就可以完成。4.7.2 TridentTuple數(shù)據(jù)處理

在Trident框架下,Tuple的數(shù)據(jù)類型是TridentTuple,是一個命名的值列表,Storm提供了多種方法來處理Batch中的Tuple數(shù)據(jù)。Functions操作:BaseFunction類的execute()方法,接收一個Tuple(需指定接收Tuple中的哪個字段),輸出0個或多個tuples。對接收的Tuple中的第一、第二字段進行了運算(如何運算,由開發(fā)人員決定),并通過最后一行代碼將運算結(jié)果發(fā)送給下游,如果在execute()方法中不輸出Tuple,可省略。publicclassSumFunction

extendsBaseFunction{privatestaticfinallongserialVersionUID=5L;publicvoidexecute(TridentTupletuple,TridentCollectorcollector){//通過位置索引,獲取tuple中第一、二字段的值intnumber1=tuple.getInteger(0);intnumber2=tuple.getInteger(1);intsum=number1+number2;collector.emit(newValues(sum));//將運算結(jié)果發(fā)送出去}}

TridentTopology通過newStream()方法,在拓?fù)渲袆?chuàng)建一個數(shù)據(jù)流對象,從輸入源中讀取數(shù)據(jù),在each()方法中調(diào)用SumFunction類,例如:在以下代碼中,“a”和“b”被稱為Tuple輸入字段名稱,“sum”被稱為函數(shù)字段名稱,輸入字段用于選擇Tuple的子集,作為運算的輸入,而函數(shù)字段則作為運算后發(fā)出的字段名。TridentTopologytopology=newTridentTopology();StreamdummyStream=topology.newStream("spout1",spout);dummyStream.each(newFields("a","b"),newSumFunction(),newFields("sum"));發(fā)出的新字段值會被追加到原始輸入Tuple的后面。Filters(過濾)操作:BaseFilter類的isKeep()方法,接收一個Tuple(需指定接收Tuple中的哪個字段),輸出布爾值,決定是否保留這個Tuple。當(dāng)返回值為true時,保留這個Tuple,當(dāng)返回值為false時,則過濾掉這個Tuple。projection(投影)操作:投影操作指保留指定字段的數(shù)據(jù),例如有一個Tuple中包含有三個字段:[“a”,“b”,“c”]。現(xiàn)在只保留其中的“a”字段,則可以在代碼中直接調(diào)用projection投影操作。Trident的Repartitioning(重定向):重定向操作如同storm的分組策略,共有以下六種:shuffle:通過隨機分配算法來均衡Tuple到各個分區(qū)broadcast:每個Tuple都被廣播到所有的分區(qū)partitionBy:根據(jù)指定的字段列表進行劃分,確保相同字段列表的數(shù)據(jù)被劃分到同一個分區(qū)global:所有的Tuple都被發(fā)送到一個分區(qū),這個分區(qū)用來處理整個StreambatchGlobal:一個Batch中的所有Tuple都被發(fā)送到同一個分區(qū),不同的Batch會去往不同的分區(qū)partition:通過一個自定義的分區(qū)函數(shù)CustomStreamGrouping來進行分區(qū)Trident中的Aggragation(聚合):Trident中的的Aggragation類似于MapReduce中的Combiner,但是,是在發(fā)送Tuple之前進行聚合。Trident中的聚合操作分兩種:(1)partitionAggregate()的操作是在Partition上,一個Batch被分成多個Partition后,每個Partition都會單獨運行partitionAggregate()中指定的聚合操作。(2)aggregate()隱含了一個global分區(qū)操作,也就是它做的是全局聚合操作。它針對的是整個Batch的聚合運算。(未完)Trident還提供了三種aggregator接口,分別是:ReducerAggregator,CombinerAggregator,Aggregator。(1)使用ReducerAggregator或Aggregator時,Stream首先被重定向到一個分區(qū)中,然后其中的聚合函數(shù)便在這個分區(qū)上運行。(2)CombinerAggregator,它是先在Partition上做部分聚合,然后再將這些部分聚合結(jié)果通過global分區(qū)到一個總的分區(qū),在這個總的分區(qū)上對結(jié)果進行匯總。Trident的groupBy()分組:根據(jù)指定的字段將相同字段的Tuple重定向到一起,匯聚成一個group,每個group就像一個Batch一樣被處理(如右圖中的示例)。接上一頁4.7.3 用Trident框架實現(xiàn)一個簡易的選擇性詞頻統(tǒng)計項目

該項目的特點如下:(1)創(chuàng)建TridentTopology類型的topology對象。(2)TridentState對象:在本例中代表了所有單詞的統(tǒng)計,用于對外提供給DRPC服務(wù)進行查詢。(3)newStream方法:在拓?fù)渲袆?chuàng)建一個新的數(shù)據(jù)流以便從輸入源中讀取數(shù)據(jù)。(4)parallelismHint():設(shè)置并行處理的數(shù)量。(5)each():配合運行函數(shù)(或過濾器)處理數(shù)據(jù),例如:each(newFields(“sentence”),newSplit(),newFields(“word”))。(6)groupBy()分組操作:按指定的字段進行分組。(7)persistentAggregate()聚合函數(shù):實現(xiàn)的是將數(shù)據(jù)存入指定的存儲介質(zhì)中。(8)stateQuery():提供對已生成的TridentState對象的查詢過濾。(9)分別在本地、集群,兩種模式下,進行了DRPC查詢。DRPC(DistributedRemoteProcedureCall分布式遠程過程調(diào)用),適用于在Trident框架的應(yīng)用開發(fā)中被調(diào)用,用于讀取Topology數(shù)據(jù)。其對TridentTopology信息的請求過程如下圖。項目結(jié)構(gòu)項目中各個類的功能如下:(1)TridentUtil.java類:繼承于BaseFunction類,負(fù)責(zé)將句子拆分成單詞;(2)TridentWordCount.java類:項目的執(zhí)行入口文件,負(fù)責(zé)Trident的搭建,以及TridentTopology、DRPC

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論