分布式計(jì)算系統(tǒng) 課件 Chapter-3-批處理系統(tǒng)MapReduce、Chapter-4-批處理系統(tǒng)Spark_第1頁(yè)
分布式計(jì)算系統(tǒng) 課件 Chapter-3-批處理系統(tǒng)MapReduce、Chapter-4-批處理系統(tǒng)Spark_第2頁(yè)
分布式計(jì)算系統(tǒng) 課件 Chapter-3-批處理系統(tǒng)MapReduce、Chapter-4-批處理系統(tǒng)Spark_第3頁(yè)
分布式計(jì)算系統(tǒng) 課件 Chapter-3-批處理系統(tǒng)MapReduce、Chapter-4-批處理系統(tǒng)Spark_第4頁(yè)
分布式計(jì)算系統(tǒng) 課件 Chapter-3-批處理系統(tǒng)MapReduce、Chapter-4-批處理系統(tǒng)Spark_第5頁(yè)
已閱讀5頁(yè),還剩238頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

第三章批處理系統(tǒng)MapReduceHadoop簡(jiǎn)介Hadoop是Apache軟件基金會(huì)旗下的一個(gè)開源分布式計(jì)算平臺(tái),為用戶提供了系統(tǒng)底層細(xì)節(jié)透明的分布式基礎(chǔ)架構(gòu)Hadoop是基于Java語(yǔ)言開發(fā)的,具有很好的跨平臺(tái)特性,并且可以部署在廉價(jià)的計(jì)算機(jī)集群中Hadoop的核心是分布式文件系統(tǒng)HDFS(HadoopDistributedFileSystem)和MapReduceHadoop被公認(rèn)為行業(yè)大數(shù)據(jù)標(biāo)準(zhǔn)開源軟件,在分布式環(huán)境下提供了海量數(shù)據(jù)的處理能力2Hadoop生態(tài)圈發(fā)展路線3大綱4設(shè)計(jì)思想MPI與MapReduce數(shù)據(jù)模型計(jì)算模型體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例MPI(MessagePassingInterface)編程簡(jiǎn)介MPI是一個(gè)信息傳遞應(yīng)用程序接口,包括協(xié)議和和語(yǔ)義說(shuō)明常用接口MPI_Init(…)并行環(huán)境初始化MPI_Comm_size(…)獲得進(jìn)程個(gè)數(shù)sizeMPI_Comm_rank(…)獲取進(jìn)程的rank值MPI_Send(…)發(fā)送消息MPI_Recv(…)獲取消息MPI_Finalize()退出MPI環(huán)境5MPI編程舉例6運(yùn)行MPI程序7MPI程序工作示意8進(jìn)程0進(jìn)程1進(jìn)程2進(jìn)程3MPI_Recv()MPI_Send()MPI_Send()MPI_Send()MPI的局限性9從用戶編程的角度來(lái)看,程序員需要考慮到進(jìn)程之間的并行問(wèn)題,并且進(jìn)程之間的通信需要用戶在程序中顯式地表達(dá),這無(wú)疑增加了程序員編程的復(fù)雜性。從系統(tǒng)實(shí)現(xiàn)的角度來(lái)看,MPI程序是以多進(jìn)程方式運(yùn)行的。如果在運(yùn)行過(guò)程中某一進(jìn)程因故障導(dǎo)致崩潰,那么除非用戶在編寫程序時(shí)添加了故障恢復(fù)的功能,否則MPI編程框架本身并不能提供容錯(cuò)能力。大綱10設(shè)計(jì)思想MPI與MapReduce數(shù)據(jù)模型計(jì)算模型體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例數(shù)據(jù)模型11將數(shù)據(jù)抽象為一系列鍵值對(duì),在處理過(guò)程中對(duì)鍵值對(duì)進(jìn)行轉(zhuǎn)換大綱12設(shè)計(jì)思想MPI與MapReduce數(shù)據(jù)模型計(jì)算模型體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例抽象為Map和Reduce兩個(gè)過(guò)程Map的過(guò)程將輸入鍵值對(duì)進(jìn)行一次變換,產(chǎn)生若干個(gè)新的鍵值對(duì),Map轉(zhuǎn)換前后的鍵值對(duì)的內(nèi)容通常都會(huì)不同Reduce過(guò)程會(huì)對(duì)相同鍵的鍵值對(duì)進(jìn)行計(jì)算,并可根據(jù)需要將計(jì)算結(jié)果進(jìn)行一次鍵值對(duì)轉(zhuǎn)換后輸出有向無(wú)環(huán)圖(DirectedAcyclicGraph,DAG)邏輯計(jì)算模型13MapReduce物理計(jì)算模型14采用“分而治之”策略,由多個(gè)任務(wù)并行處理MapTaskMapTaskMapTaskMapTaskMapReduceTaskReduceTaskReduceTaskReduce用戶編程容易用戶不需要掌握分布式并行編程細(xì)節(jié)ClassX{map(){//map方法的實(shí)現(xiàn)…}reduce(){//reduce方法的實(shí)現(xiàn)}main(){Jobjob=…//定義分布式作業(yè)job.config=//作業(yè)參數(shù)設(shè)置}}15大綱16設(shè)計(jì)思想體系架構(gòu)架構(gòu)圖應(yīng)用程序執(zhí)行流程工作原理容錯(cuò)機(jī)制編程示例抽象架構(gòu)圖17HadoopMapReduce架構(gòu)圖18JobTracker資源管理通過(guò)監(jiān)控TaskTracker來(lái)管理系統(tǒng)擁有的計(jì)算資源作業(yè)管理負(fù)責(zé)將作業(yè)(Job)拆分成任務(wù)(Task),并進(jìn)行任務(wù)調(diào)度以及跟蹤任務(wù)的運(yùn)行進(jìn)度、資源使用量等信息19TaskTracker20管理本節(jié)點(diǎn)的資源TaskTracker使用slot等量劃分本節(jié)點(diǎn)上的資源量(CPU、內(nèi)存等)執(zhí)行JobTracker的命令接收J(rèn)obTracker發(fā)送過(guò)來(lái)的命令并執(zhí)行(如啟動(dòng)新Task、殺死Task等)向JobTracker匯報(bào)情況通過(guò)心跳將本節(jié)點(diǎn)上資源使用情況和任務(wù)運(yùn)行進(jìn)度匯報(bào)給JobTrackerTask任務(wù)執(zhí)行JobTracker根據(jù)TaskTracker匯報(bào)的信息進(jìn)行調(diào)度,命令存在空閑slot的TaskTracker啟動(dòng)Task進(jìn)程執(zhí)行map或reduce任務(wù)在HadoopMapReduce的實(shí)現(xiàn)中該進(jìn)程的名稱為Child21Client提交作業(yè)用戶編寫的MapReduce程序通過(guò)Client提交到JobTracker用戶可通過(guò)Client提供的一些接口查看作業(yè)運(yùn)行狀態(tài)在HadoopMapReduce的實(shí)現(xiàn)中,該進(jìn)程的名稱為RunJar22MapReduce與HDFS關(guān)系23計(jì)算與存儲(chǔ)相分離計(jì)算向數(shù)據(jù)靠攏,而不是數(shù)據(jù)向計(jì)算靠攏大綱24設(shè)計(jì)思想體系架構(gòu)架構(gòu)圖應(yīng)用程序執(zhí)行流程工作原理容錯(cuò)機(jī)制編程示例應(yīng)用程序執(zhí)行流程25應(yīng)用程序執(zhí)行流程26Client將用戶編寫的MapReduce作業(yè)的配置信息、jar包等信息上傳到共享文件系統(tǒng),通常是HDFS。Client提交作業(yè)給JobTracker,即告知作業(yè)信息的位置。JobTracker讀取作業(yè)的信息,生成一系列Map和Reduce任務(wù),調(diào)度給擁有空閑slot的TaskTracker。TaskTracker根據(jù)JobTacker的指令啟動(dòng)Child進(jìn)程執(zhí)行Map任務(wù),Map任務(wù)將從共享文件系統(tǒng)讀取輸入數(shù)據(jù)。

JobTracker從TaskTracker處獲得Map任務(wù)進(jìn)度信息。一旦Map任務(wù)完成后,JobTacker將Reduce任務(wù)分發(fā)給TaskTracker。TaskTracker根據(jù)JobTacker的指令啟動(dòng)Child進(jìn)程執(zhí)行Reduce任務(wù),Reduce任務(wù)將從Map任務(wù)所在節(jié)點(diǎn)的本地磁盤拉取Map的輸出結(jié)果。JobTracker從TaskTracker處獲得Reduce任務(wù)進(jìn)度信息。當(dāng)Reduce任務(wù)運(yùn)行結(jié)束并將結(jié)果寫入共享文件系統(tǒng),則意味著整個(gè)作業(yè)執(zhí)行完畢。大綱27設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例Map-Shuffle-Reduce28大綱29設(shè)計(jì)思想體系架構(gòu)工作原理數(shù)據(jù)輸入Map階段Shuffle階段Reduce階段數(shù)據(jù)輸出容錯(cuò)機(jī)制編程示例數(shù)據(jù)輸入30從存儲(chǔ)系統(tǒng)中文件與Map任務(wù)可處理的鍵值對(duì)記錄之間的映射輸入文件的格式問(wèn)題:文件分塊存儲(chǔ),可能存在跨塊記錄Splitvs.Block31Split相對(duì)block而言是邏輯概念,包含一些元信息,如數(shù)據(jù)起始位置、數(shù)據(jù)長(zhǎng)度、數(shù)據(jù)所在節(jié)點(diǎn)等信息InputFormat32數(shù)據(jù)邏輯劃分InputFormat根據(jù)預(yù)定義格式將輸入數(shù)據(jù)在邏輯上劃分為若干個(gè)Split(切片)Map任務(wù)讀取的單位是Split,而不是物理的文件塊Split的數(shù)量往往決定了Map任務(wù)的個(gè)數(shù),一個(gè)Split的數(shù)據(jù)一般由一個(gè)Map任務(wù)來(lái)處理鍵值對(duì)解析給定一個(gè)Split,InputFormat將根據(jù)分隔符、大小等元信息將Split中的數(shù)據(jù)解析為相應(yīng)鍵值對(duì)常見的InputFormat33TextInputFormatKeyValueTextInputFormatNLineInputFormatCombineTextInputFormat…自定義InputFormat大綱34設(shè)計(jì)思想體系架構(gòu)工作原理數(shù)據(jù)輸入Map階段Shuffle階段Reduce階段數(shù)據(jù)輸出容錯(cuò)機(jī)制編程示例Map邏輯過(guò)程35[k1,v1]→List([k2,v2])Map物理過(guò)程36Map任務(wù)的數(shù)量MapReduce為每個(gè)split創(chuàng)建一個(gè)Map任務(wù),split的多少?zèng)Q定了Map任務(wù)的數(shù)目mapred.map.tasks設(shè)置程序員期望的map個(gè)數(shù)37大綱38設(shè)計(jì)思想體系架構(gòu)工作原理數(shù)據(jù)輸入Map階段Shuffle階段Reduce階段數(shù)據(jù)輸出容錯(cuò)機(jī)制編程示例Shuffle邏輯過(guò)程39List([k2,v2])

→[k2,List(v2)]Shuffle物理過(guò)程40何時(shí)Shuffle?41當(dāng)系統(tǒng)中的Map任務(wù)完成率達(dá)到設(shè)定閾值時(shí),系統(tǒng)將啟動(dòng)Reduce任務(wù)例如,閾值設(shè)定為60%意味著如果系統(tǒng)中共有100個(gè)Map任務(wù),那么一旦有60個(gè)Map任務(wù)已經(jīng)完成了就可以啟動(dòng)Reduce任務(wù),而不必等到這100個(gè)Map任務(wù)全部完成Reduce任務(wù)不會(huì)等到所有的Map任務(wù)執(zhí)行結(jié)束才拉取Map任務(wù)的輸出結(jié)果,但是拉取的數(shù)據(jù)必然來(lái)自于已經(jīng)完成運(yùn)行的Map任務(wù),即已經(jīng)保存在磁盤上的文件大綱42設(shè)計(jì)思想體系架構(gòu)工作原理數(shù)據(jù)輸入Map階段Shuffle階段Reduce階段數(shù)據(jù)輸出容錯(cuò)機(jī)制編程示例[k2,List(v2)]→[k3,v3]Reduce邏輯過(guò)程43Reduce物理過(guò)程44Reduce任務(wù)的數(shù)量程序指定最優(yōu)的Reduce任務(wù)個(gè)數(shù)取決于集群中可用的reduce任務(wù)槽(slot)的數(shù)目通常設(shè)置比reduce任務(wù)槽數(shù)目稍微小一些的Reduce任務(wù)個(gè)數(shù)45大綱46設(shè)計(jì)思想體系架構(gòu)工作原理數(shù)據(jù)輸入Map階段Shuffle階段Reduce階段數(shù)據(jù)輸出容錯(cuò)機(jī)制編程示例數(shù)據(jù)輸出47每個(gè)Reduce任務(wù)的輸出結(jié)果將以一個(gè)文件的形式保持到指定的目錄當(dāng)中MapReduce輸出結(jié)果是一組文件OutputFormat48與數(shù)據(jù)輸入階段相反,MapReduce需要定義輸出文件的格式,即OutputFormat包括分隔符等元信息從MapReduce程序處理的邏輯鍵值對(duì)數(shù)據(jù)到物理存儲(chǔ)之間的映射MapReduce系統(tǒng)將Reduce任務(wù)處理產(chǎn)生的結(jié)果按OutputFormat定義的格式寫入HDFS等常見的OutputFormat49TextOutputFormatNullOutputFormatLazyOutputFormat…自定義OutputFormat大綱50設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例MapReduce故障類型51主節(jié)點(diǎn)故障JobTracker故障:如宕機(jī)引起從節(jié)點(diǎn)故障TaskTracker故障:如節(jié)點(diǎn)宕機(jī)引起Task故障:如JVM內(nèi)存不夠退出MapReduce容錯(cuò)和HDFS容錯(cuò)是兩回事大綱52設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制JobTracker故障TaskTracker故障Task故障編程示例JobTracker故障53對(duì)于MapReduce1.0的架構(gòu),JobTracker故障意味著所有作業(yè)需要重新執(zhí)行MapReduce1.0沒(méi)有處理JobTracker故障的機(jī)制,因而成為單點(diǎn)瓶頸大綱54設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制JobTracker故障TaskTracker故障Task故障編程示例TaskTracker故障55JobTracker不會(huì)接收到“心跳”JobTracker會(huì)安排其他TaskTracker重新運(yùn)行失敗TaskTracker的任務(wù)這個(gè)過(guò)程對(duì)于用戶來(lái)說(shuō)是透明的,只會(huì)感覺(jué)到該作業(yè)在執(zhí)行某段時(shí)間里變慢了而已大綱56設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制JobTracker故障TaskTracker故障Task故障編程示例Task故障57MapTask故障重新執(zhí)行Map任務(wù)去HDFS重新讀入數(shù)據(jù)Task故障58ReduceTask故障重新執(zhí)行Reduce任務(wù)去哪里重新讀入數(shù)據(jù)?Task故障59典型例子Map任務(wù)或Reduce任務(wù)代碼異常當(dāng)一個(gè)任務(wù)經(jīng)過(guò)最大嘗試次數(shù)運(yùn)行后仍然失敗,那么整個(gè)作業(yè)將被標(biāo)記為失敗大綱60設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例Map方法框架61TextInputFormat→[LongWritable,Text]Reduce方法框架62主方法框架63大綱64設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例詞頻統(tǒng)計(jì)關(guān)系表自然連接及其優(yōu)化網(wǎng)頁(yè)鏈接排名K均值聚類詞頻統(tǒng)計(jì)65輸入:一個(gè)包含大量單詞的文本文件輸出:文件中每個(gè)單詞及其出現(xiàn)次數(shù)(頻數(shù))每個(gè)單詞和其頻數(shù)占一行,單詞和頻數(shù)之間有間隔輸入輸出AnAnMyMeAnAnMyHeMyMyAnMyAn5He1Me1My5解決方案66Map過(guò)程:把文本的每行內(nèi)容轉(zhuǎn)換為鍵值對(duì)[單詞,1]Reduce過(guò)程:?jiǎn)卧~相同的鍵值對(duì)被發(fā)送到同一個(gè)Reduce中對(duì)單詞相同的鍵值對(duì)進(jìn)行計(jì)數(shù)輸出計(jì)數(shù)后的結(jié)果[單詞,頻數(shù)]詞頻統(tǒng)計(jì)運(yùn)行過(guò)程67編寫map方法68編寫reduce方法69編寫主方法70詞頻統(tǒng)計(jì)運(yùn)行過(guò)程71優(yōu)化方案72使用combine方法減少Shuffle數(shù)據(jù)量減少Reduce過(guò)程需要處理的數(shù)據(jù)量編寫combine方法73修改主方法74設(shè)置combine方法大綱75設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例詞頻統(tǒng)計(jì)關(guān)系表自然連接及其優(yōu)化網(wǎng)頁(yè)鏈接排名K均值聚類關(guān)系表的自然連接76輸入:兩個(gè)CSV文件,分別保存雇員表和部門表輸出:雇員表和部門表的自然連接結(jié)果雇員表部門表雇員表?部門表解決方案77Map過(guò)程:把來(lái)自雇員表的每個(gè)元組A轉(zhuǎn)換成鍵值對(duì)[DeptName,雇員A]把來(lái)自部門表的每個(gè)元組B轉(zhuǎn)換成鍵值對(duì)[DeptName,部門B]Reduce過(guò)程:具有相同DeptName值的元組被發(fā)送到同一Reduce中來(lái)自雇員表和部門表的具有相同DeptName值的元組進(jìn)行連接輸出連接后的元組標(biāo)記來(lái)自哪個(gè)關(guān)系表關(guān)系表自然連接的運(yùn)行過(guò)程78編寫自定義ReduceJoinWritable79自定義數(shù)據(jù)類型來(lái)保存標(biāo)識(shí)和元組兩類信息編寫map方法80獲取鍵值對(duì)所屬的文件路徑,并利用路徑對(duì)鍵值對(duì)進(jìn)行分類處理編寫reduce方法81從輸入值中分離元組,并執(zhí)行連接操作編寫主方法82

83假如部門表比較小,雇員表非常大雇員表部門表………數(shù)據(jù)分布示列84不考慮備份藍(lán)色:雇員表白色:部門表若數(shù)據(jù)本身無(wú)序,連接將有大量的數(shù)據(jù)移動(dòng)MapReduce層HDFS層hadoop-namenodehadoop-datanode1hadoop-datanode2hadoop-datanode3hadoop-datanode4JobTrackerTaskTrackerTaskTrackerTaskTrackerTaskTrackerMasterSlavesDataNodeDataNodeDataNodeDataNodeNameNodeMapReduce層HDFS層優(yōu)化方案85編程時(shí)將“小表”廣播出去MapReduce層HDFS層hadoop-namenodehadoop-datanode1hadoop-datanode2hadoop-datanode3hadoop-datanode4JobTrackerTaskTrackerTaskTrackerTaskTrackerTaskTrackerMasterSlavesDataNodeDataNodeDataNodeDataNodeNameNodeMapReduce層HDFS層優(yōu)化方案86編寫map方法87讀取廣播的部門表,并與輸入的雇員表執(zhí)行連接操作修改主方法88將Reduce任務(wù)數(shù)設(shè)置為0,并廣播部門表大綱89設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例詞頻統(tǒng)計(jì)關(guān)系表自然連接及其優(yōu)化網(wǎng)頁(yè)鏈接排名K均值聚類網(wǎng)頁(yè)鏈接排名90輸入:保存在文本文件中,一行為一項(xiàng)網(wǎng)頁(yè)信息網(wǎng)頁(yè)信息:(網(wǎng)頁(yè)名網(wǎng)頁(yè)排名值(出站鏈接出站鏈接的權(quán)重...))輸出:網(wǎng)頁(yè)名及其排名值輸入輸出A1.0B1.0D1.0B1.0C1.0C1.0A1.0B1.0D1.0B1.0C1.0A0.21436B0.36332C0.40833D0.13027網(wǎng)頁(yè)鏈接排名91算法思路許多網(wǎng)頁(yè)鏈向該網(wǎng)頁(yè),則該網(wǎng)頁(yè)排名高有一個(gè)高排名值的網(wǎng)頁(yè)鏈向該網(wǎng)頁(yè),則該網(wǎng)頁(yè)排名高圖片來(lái)源:/wiki/File:PageRank-hi-res-2.png網(wǎng)頁(yè)鏈接排名92算法執(zhí)行過(guò)程初始時(shí),每個(gè)網(wǎng)頁(yè)的排名值為1網(wǎng)頁(yè)鏈接排名93

網(wǎng)頁(yè)鏈接排名94

網(wǎng)頁(yè)鏈接排名95

網(wǎng)頁(yè)鏈接排名96

網(wǎng)頁(yè)鏈接排名97

迭代計(jì)算98迭代計(jì)算的特征由一系列迭代步驟(Step)的循環(huán)構(gòu)成,每個(gè)步驟執(zhí)行的操作完全相同,直到最大迭代次數(shù)一個(gè)步驟的輸出是下一個(gè)步驟的輸入迭代計(jì)算在MapReduce中的實(shí)現(xiàn)一個(gè)步驟的計(jì)算過(guò)程由一個(gè)MapReduce作業(yè)來(lái)實(shí)現(xiàn),迭代次數(shù)決定了MapReduce作業(yè)的個(gè)數(shù)每一步驟結(jié)束時(shí)將結(jié)果寫入HDFS,下一步將該結(jié)果再次從HDFS讀出解決方案99

一次迭代一個(gè)MapReduce作業(yè)解決方案100Map過(guò)程:把每項(xiàng)網(wǎng)頁(yè)信息轉(zhuǎn)換為鍵值對(duì)[網(wǎng)頁(yè)名,{PAGE_INFO,網(wǎng)頁(yè)信息}]計(jì)算每個(gè)網(wǎng)頁(yè)對(duì)其出站鏈接的貢獻(xiàn)值,即將每項(xiàng)網(wǎng)頁(yè)信息轉(zhuǎn)換成鍵值對(duì)[出站鏈接,{PR_L,貢獻(xiàn)值}]Reduce過(guò)程:計(jì)算每個(gè)網(wǎng)頁(yè)的排名值如果不是最后一次迭代,則輸出更新排名值后的網(wǎng)頁(yè)信息。否則,輸出網(wǎng)頁(yè)名及其排名值區(qū)分網(wǎng)頁(yè)信息和貢獻(xiàn)值單次迭代運(yùn)行過(guò)程101編寫ReducePageRankWritable102與關(guān)系表中類似,自定義類型以對(duì)不同信息進(jìn)行標(biāo)識(shí)編寫map方法103計(jì)算當(dāng)前網(wǎng)頁(yè)對(duì)出站鏈接的貢獻(xiàn)值,并以出站鏈接的網(wǎng)頁(yè)名為鍵進(jìn)行輸出以輸入的網(wǎng)頁(yè)信息的網(wǎng)絡(luò)名稱為鍵輸出網(wǎng)頁(yè)信息編寫reduce方法104從輸入值中分離網(wǎng)頁(yè)信息和貢獻(xiàn)值,計(jì)算排名值并更新網(wǎng)頁(yè)信息編寫主方法105

將每一次迭代的輸出設(shè)置為下一次迭代的輸入,循環(huán)提交作業(yè)以執(zhí)行迭代運(yùn)算

大綱106設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例詞頻統(tǒng)計(jì)關(guān)系表自然連接及其優(yōu)化網(wǎng)頁(yè)鏈接排名K均值聚類K均值聚類107輸入:兩個(gè)文本文件,分別保存數(shù)據(jù)集和聚類中心集數(shù)據(jù)集:每行為一個(gè)二維數(shù)據(jù)點(diǎn)及其類別標(biāo)簽聚類中心集:每行為一個(gè)二維數(shù)據(jù)點(diǎn)輸出:數(shù)據(jù)點(diǎn)及其類別標(biāo)簽數(shù)據(jù)集聚類中心集聚類結(jié)果0,0-11,2-13,1-18,8-19,10-110,7-11,23,10,01.01,21.010,72.03,11.08,82.09,102.0K均值聚類108算法執(zhí)行過(guò)程設(shè)定聚類中心數(shù)k。

例如,k=2K均值聚類109算法執(zhí)行過(guò)程設(shè)定聚類中心數(shù)k。

例如,k=2選取聚類中心K均值聚類110算法執(zhí)行過(guò)程設(shè)定聚類中心數(shù)k。

例如,k=2選取聚類中心尋找每個(gè)數(shù)據(jù)點(diǎn)距

離最近的聚類中心K均值聚類111算法執(zhí)行過(guò)程設(shè)定聚類中心數(shù)k。

例如,k=2選取聚類中心尋找每個(gè)數(shù)據(jù)點(diǎn)距

離最近的聚類中心計(jì)算同屬一個(gè)聚類

中心的數(shù)據(jù)點(diǎn)的新

的聚類中心K均值聚類112算法執(zhí)行過(guò)程設(shè)定聚類中心數(shù)k。

例如,k=2選取聚類中心尋找每個(gè)數(shù)據(jù)點(diǎn)距

離最近的聚類中心計(jì)算同屬一個(gè)聚類

中心的數(shù)據(jù)點(diǎn)的新

的聚類中心重復(fù)3-4直到滿足迭

代終止條件Na?ve解決方案113算法執(zhí)行過(guò)程設(shè)定聚類中心數(shù)k。

例如,k=2選取聚類中心尋找每個(gè)數(shù)據(jù)點(diǎn)距

離最近的聚類中心計(jì)算同屬一個(gè)聚類

中心的數(shù)據(jù)點(diǎn)的新

的聚類中心重復(fù)3-4直到滿足迭

代終止條件一次迭代兩個(gè)MapReduce作業(yè)一個(gè)MapReduce作業(yè)一個(gè)MapReduce作業(yè)Na?ve解決方案114第一個(gè)MapReduce作業(yè)Map過(guò)程:將數(shù)據(jù)點(diǎn)分配到不同的Reduce任務(wù),即將數(shù)據(jù)點(diǎn)轉(zhuǎn)換為鍵值對(duì)[Reduce任務(wù)標(biāo)識(shí),point數(shù)據(jù)點(diǎn)]將聚類中心發(fā)送到所有的Reduce任務(wù),即將聚類中心轉(zhuǎn)換為鍵值對(duì)[Reduce任務(wù)標(biāo)識(shí),center聚類中心]Reduce過(guò)程:分離數(shù)據(jù)點(diǎn)和聚類中心,計(jì)算每個(gè)數(shù)據(jù)點(diǎn)與聚類中心的距離,為數(shù)據(jù)點(diǎn)添加類別標(biāo)簽以[類別號(hào),數(shù)據(jù)點(diǎn)]形式的鍵值對(duì)輸出聚類結(jié)果Na?ve解決方案115第二個(gè)MapReduce作業(yè)Map過(guò)程:讀取第一個(gè)MapReduce作業(yè)產(chǎn)生的聚類結(jié)果,并將結(jié)果中的數(shù)據(jù)點(diǎn)轉(zhuǎn)換為[類別號(hào),數(shù)據(jù)點(diǎn)]的鍵值對(duì)Reduce過(guò)程:計(jì)算新的聚類中心,并輸出[新聚類中心,空值]的鍵值對(duì)Na?ve解決方案的單次迭代運(yùn)行過(guò)程116特點(diǎn):每個(gè)數(shù)據(jù)點(diǎn)都需要與所有聚類中心進(jìn)行計(jì)算單次迭代運(yùn)行過(guò)程117“廣播”聚類中心集解決方案118算法執(zhí)行過(guò)程設(shè)定聚類中心數(shù)k。

例如,k=2選取聚類中心尋找每個(gè)數(shù)據(jù)點(diǎn)距

離最近的聚類中心計(jì)算同屬一個(gè)聚類

中心的數(shù)據(jù)點(diǎn)的新

的聚類中心重復(fù)3-4直到滿足迭

代終止條件一次迭代一個(gè)MapReduce作業(yè)在Map階段實(shí)現(xiàn),其中采用廣播方式讀取聚類中心在Reduce階段實(shí)現(xiàn)解決方案119Map過(guò)程:廣播方式讀取聚類中心集計(jì)算每個(gè)數(shù)據(jù)點(diǎn)與各聚類中心之間的距離,為數(shù)據(jù)點(diǎn)添加類別標(biāo)簽,即將數(shù)據(jù)點(diǎn)轉(zhuǎn)換為[類別號(hào),數(shù)據(jù)點(diǎn)]的鍵值對(duì)Reduce過(guò)程:為屬于同一聚類中心的數(shù)據(jù)點(diǎn)計(jì)算新的聚類中心,并輸出[新聚類中心,空值]的鍵值對(duì)編寫map方法120讀取廣播的聚類中心集,為每個(gè)數(shù)據(jù)點(diǎn)添加類別標(biāo)簽編寫reduce方法121從輸入值中解析數(shù)據(jù)點(diǎn)并計(jì)算聚類中心編寫主方法122廣播聚類中心集,并在最后一次迭代時(shí)將Reduce任務(wù)數(shù)設(shè)置為0迭代計(jì)算的性能分析123一個(gè)步驟的計(jì)算過(guò)程由一個(gè)MapReduce作業(yè)來(lái)實(shí)現(xiàn),迭代次數(shù)決定了MapReduce作業(yè)的個(gè)數(shù)每一步驟結(jié)束時(shí)將結(jié)果寫入HDFS,下一步將該結(jié)果再次從HDFS讀出StepStepStepStepStepClient課后閱讀124論文Dean,J.,&Ghemawat,S.(2004).MapReduce:SimplifiedDataProcessingonLargeClusters.InOSDI(pp.137–149).本章小結(jié)設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例125第四章批處理系統(tǒng)SparkSpark簡(jiǎn)介Spark由美國(guó)加州伯克利大學(xué)(UCBerkeley)的AMP實(shí)驗(yàn)室于2009年開發(fā)最初是基于內(nèi)存計(jì)算的批處理系統(tǒng),用于構(gòu)建大型的、低延遲的數(shù)據(jù)分析應(yīng)用程序逐步發(fā)展成為內(nèi)外存同時(shí)使用的批處理系統(tǒng),并增加了SparkStreaming支持實(shí)時(shí)流計(jì)算,以及StructuredStreaming支持批流融合127Spark簡(jiǎn)介2013年Spark加入Apache孵化器項(xiàng)目后發(fā)展迅猛Spark在2014年打破了Hadoop保持的基準(zhǔn)排序紀(jì)錄Spark/206個(gè)節(jié)點(diǎn)/23分鐘/100TB數(shù)據(jù)Hadoop/2000個(gè)節(jié)點(diǎn)/72分鐘/100TB數(shù)據(jù)Spark用十分之一的計(jì)算資源,獲得了比Hadoop快3倍的速度128Spark軟件棧129Spark

CoreStructured

Streaming+Spark

SQL

Spark

StreamingMLlib(machinelearning)

GraphX(graph)大綱130設(shè)計(jì)思想MapReduce的局限性數(shù)據(jù)模型計(jì)算模型體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例MapReduce編程范型編程容易,不需要掌握分布式并行編程細(xì)節(jié),很容易把程序運(yùn)行在分布式系統(tǒng)上但是基礎(chǔ)算子太少:例如,如何實(shí)現(xiàn)join?ClassX{map(){//map函數(shù)的實(shí)現(xiàn)…}reduce(){//reduce函數(shù)的實(shí)現(xiàn)}main(){Jobjob=…//定義分布式作業(yè)job.config=//作業(yè)參數(shù)設(shè)置}}131單個(gè)MapReduce作業(yè)Map端的結(jié)果要先寫入本地磁盤,再由Reduce端來(lái)拉取文件A文件BMap+Reduce讀磁盤Shuffle階段Map端的結(jié)果需寫磁盤寫磁盤132多個(gè)MapReduce作業(yè)例子:迭代計(jì)算過(guò)程中每一迭代步結(jié)束時(shí)將結(jié)果寫入HDFS,下一步將該結(jié)果再次從HDFS讀出StepStepStepStepStepClient133MapReduce局限性編程框架的表達(dá)能力有限,用戶編程復(fù)雜僅map和reduce函數(shù),無(wú)法直接用join等操作單個(gè)作業(yè)中需要Shuffle的數(shù)據(jù)以阻塞方式傳輸,磁盤IO開銷大、延遲高Shuffle數(shù)據(jù)需要先寫磁盤多個(gè)作業(yè)之間銜接涉及IO開銷,應(yīng)用程序的延遲高特別是迭代計(jì)算,迭代中間結(jié)果的反復(fù)讀寫,使得整個(gè)應(yīng)用程序的延遲非常高134大綱135設(shè)計(jì)思想MapReduce的局限性數(shù)據(jù)模型計(jì)算模型體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例RDD概念136Resilient

Distributed

DatasetRDD是一個(gè)數(shù)據(jù)集(Dataset):與MapReduce不同,Spark操作對(duì)象是抽象的數(shù)據(jù)集,而不是文件RDD是分布式的(Distributed):每個(gè)RDD可分成多個(gè)分區(qū),每個(gè)分區(qū)就是一個(gè)數(shù)據(jù)集片段,一個(gè)RDD的不同分區(qū)可以存到集群中不同的節(jié)點(diǎn)上RDD具有彈性(Resilient):即具備可恢復(fù)的容錯(cuò)特性大綱137設(shè)計(jì)思想MapReduce的局限性數(shù)據(jù)模型計(jì)算模型體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例RDD操作算子138創(chuàng)建:從本地內(nèi)存或外部數(shù)據(jù)源創(chuàng)建RDD,提供了數(shù)據(jù)輸入的功能RDD操作算子139轉(zhuǎn)換(Transformation):描述RDD的轉(zhuǎn)換邏輯,提供對(duì)RDD進(jìn)行變換的功能RDD操作算子140行動(dòng)(Action):標(biāo)志轉(zhuǎn)換結(jié)束,觸發(fā)DAG生成邏輯計(jì)算模型:OperatorDAG141從算子操作的角度來(lái)描述計(jì)算的過(guò)程邏輯計(jì)算模型:RDDLineage142從RDD變換的角度來(lái)描述計(jì)算過(guò)程RDDLineageRDDLineage(DAG拓?fù)浣Y(jié)構(gòu))RDD讀入外部數(shù)據(jù)源進(jìn)行創(chuàng)建RDD經(jīng)過(guò)一系列的轉(zhuǎn)換(Transformation)操作,每一次都會(huì)產(chǎn)生不同的RDD,供給下一個(gè)轉(zhuǎn)換操作使用最后一個(gè)RDD經(jīng)過(guò)“動(dòng)作”操作進(jìn)行轉(zhuǎn)換,并輸出到外部數(shù)據(jù)源Spark系統(tǒng)保留RDDLineage的信息為什么?143非嚴(yán)格情況下兩個(gè)概念可以混用RDD只讀、不可變RDD是只讀的記錄分區(qū)的集合本質(zhì)上一個(gè)只讀的對(duì)象集合RDD經(jīng)創(chuàng)建后,不能進(jìn)行修改RDD不可變(Immutable)通過(guò)在其他RDD上執(zhí)行確定的轉(zhuǎn)換操作(如map、join和groupby)而得到新的RDD,而不是改變?cè)械腞DD遵循了函數(shù)式編程的特性變量的值是不可變的144物理計(jì)算模型:OperatorDAG145分布式架構(gòu)中,DAG中的操作算子實(shí)際上由若干個(gè)實(shí)例任務(wù)(Task)來(lái)實(shí)現(xiàn)問(wèn)題:右圖需要啟動(dòng)多少個(gè)Task?物理計(jì)算模型:RDDLineage146每個(gè)Task通常負(fù)責(zé)處理RDD的一個(gè)分區(qū)大綱147設(shè)計(jì)思想體系架構(gòu)架構(gòu)圖應(yīng)用程序執(zhí)行流程工作原理容錯(cuò)機(jī)制編程示例抽象架構(gòu)圖148DriverClusterManagerExecutorWorkerNodeTaskTaskExecutorWorkerNodeTaskTaskClusterManager149集群管理器:負(fù)責(zé)管理整個(gè)系統(tǒng)的資源、監(jiān)控工作節(jié)點(diǎn)根據(jù)Spark部署方式的不同在Standalone方式(即不使用Yarn或Mesos等其它資源管理系統(tǒng))中,集群管理器包含Master和Worker在Yarn方式中集群管理器包括ResourceManager和NodeManagerExecutor150執(zhí)行器:負(fù)責(zé)任務(wù)執(zhí)行Executor是運(yùn)行在工作節(jié)點(diǎn)上的一個(gè)進(jìn)程,它啟動(dòng)若干個(gè)線程Task或線程組TaskSet來(lái)進(jìn)行執(zhí)行任務(wù)在Standalone部署方式下,Executor進(jìn)程的名稱為CoarseGrainedExecutorBackend問(wèn)題:MapReduce中Task是線程還是進(jìn)程?Driver151驅(qū)動(dòng)器:負(fù)責(zé)啟動(dòng)應(yīng)用程序的主方法并管理作業(yè)運(yùn)行Spark的架構(gòu)實(shí)現(xiàn)了資源管理和作業(yè)管理兩大功能的分離ClusterManager負(fù)責(zé)集群資源管理Driver負(fù)責(zé)作業(yè)管理Standalone架構(gòu)圖152架構(gòu)圖比較153Standalone架構(gòu)圖抽象架構(gòu)圖SparkSubmit(Client)/MasterClusterManagerWorkerCoarseGrainedExecutorBackendExecutor物理節(jié)點(diǎn)(從節(jié)點(diǎn))WorkerNode?DriverStandalone中的Driver154邏輯上,Driver獨(dú)立于主節(jié)點(diǎn)、從節(jié)點(diǎn)以及客戶端但根據(jù)應(yīng)用程序的Client或Cluster運(yùn)行方式,Driver會(huì)以不同的形式存在Client方式:Driver和客戶端以同一個(gè)進(jìn)程存在Cluster方式:系統(tǒng)將由某一Worker啟動(dòng)一個(gè)進(jìn)程作為Driver客戶端提交應(yīng)用程序時(shí)可以選擇Client或Cluster方式StandaloneClient模式155Driver和客戶端以同一個(gè)進(jìn)程存在StandaloneCluster模式156某一Worker啟動(dòng)一個(gè)名為DriverWrapper的進(jìn)程作為DriverSparkvs.MapReduce157二者架構(gòu)比較MapReduceSpark系統(tǒng)進(jìn)程JobTrackerMasterTaskTrackerWorkerChildCoarseGrainedExecutorBackend工作線程/Task任務(wù)代碼Task基礎(chǔ)接口map/reduceRDDAPI大綱158設(shè)計(jì)思想體系架構(gòu)架構(gòu)圖應(yīng)用程序執(zhí)行流程工作原理容錯(cuò)機(jī)制編程示例抽象執(zhí)行流程159Driver(SparkContext)ExecutorWorkerNodeTaskTaskExecutorWorkerNodeTaskTask(1)(2)(3)(4)(3)(4)(5)(5)ClusterManager應(yīng)用程序執(zhí)行流程160啟動(dòng)Driver,以Standalone模式為例如果使用Client部署方式,客戶端直接啟動(dòng)Driver,并向Master注冊(cè)如果使用Cluster部署方式,客戶端將應(yīng)用程序提交給Master,由Master選擇一個(gè)Worker啟動(dòng)Driver進(jìn)程(DriverWrapper)構(gòu)建基本運(yùn)行環(huán)境,即由Driver創(chuàng)建SparkContext,向ClusterManager進(jìn)行資源申請(qǐng),并由Driver進(jìn)行任務(wù)分配和監(jiān)控應(yīng)用程序執(zhí)行流程(續(xù))161ClusterManager通知工作節(jié)點(diǎn)啟動(dòng)Executor進(jìn)程,該進(jìn)程內(nèi)部以多線程方式運(yùn)行任務(wù)Executor進(jìn)程向Driver注冊(cè)SparkContext構(gòu)建DAG并進(jìn)行任務(wù)劃分,從而交給Executor進(jìn)程中的線程來(lái)執(zhí)行任務(wù)大綱162設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例Driver內(nèi)部工作過(guò)程163ExecutorWorkerNodeTaskTaskExecutorWorkerNodeTaskTaskClusterManagerDriver(SparkContext)解析器DAGTask調(diào)度器Stage(TaskSet)DAG調(diào)度器RDD對(duì)象大綱164設(shè)計(jì)思想體系架構(gòu)工作原理Stage劃分Stage內(nèi)部數(shù)據(jù)傳輸Stage之間數(shù)據(jù)傳輸應(yīng)用與作業(yè)容錯(cuò)機(jī)制編程示例RDD依賴關(guān)系窄依賴表現(xiàn)為一個(gè)父RDD的分區(qū)對(duì)應(yīng)于一個(gè)子RDD的分區(qū)或多個(gè)父RDD的分區(qū)對(duì)應(yīng)于一個(gè)子RDD的分區(qū)寬依賴則表現(xiàn)為存在一個(gè)父RDD的一個(gè)分區(qū)對(duì)應(yīng)一個(gè)子RDD的多個(gè)分區(qū)165通過(guò)依賴關(guān)系進(jìn)行Stage劃分分析各個(gè)RDD的偏序關(guān)系生成DAG,再通過(guò)分析各個(gè)RDD中的分區(qū)之間的依賴關(guān)系來(lái)決定如何劃分Stage具體劃分方法:在DAG中進(jìn)行反向解析,遇到寬依賴就斷開遇到窄依賴就把當(dāng)前的RDD加入到Stage中為什么將窄依賴盡可能劃分在同一個(gè)Stage?166基于RDDLineage的Stage劃分167寬依賴基于OperatorDAG的Stage劃分168Stage類型ShuffleMapStage輸入/輸出輸入可以是從外部獲取數(shù)據(jù),也可以是另一個(gè)ShuffleMapStage的輸出以Shuffle為輸出,作為另一個(gè)Stage開始特點(diǎn)不是最終的Stage,在它之后還有其他Stage它的輸出一定需要經(jīng)過(guò)Shuffle過(guò)程,并作為后續(xù)Stage的輸入在一個(gè)DAG里可能有該類型的Stage,也可能沒(méi)有該類型Stage169Stage類型ResultStage輸入/輸出其輸入可以是從外部獲取數(shù)據(jù),也可以是另一個(gè)ShuffleMapStage的輸出輸出直接產(chǎn)生結(jié)果或存儲(chǔ)特點(diǎn)最終的Stage在一個(gè)DAG里必定有該類型Stage因此,一個(gè)DAG含有一個(gè)或多個(gè)Stage,其中至少含有一個(gè)ResultStage170ShuffleMapStagevs.ResultStage171大綱172設(shè)計(jì)思想體系架構(gòu)工作原理Stage劃分Stage內(nèi)部數(shù)據(jù)傳輸Stage之間數(shù)據(jù)傳輸應(yīng)用與作業(yè)容錯(cuò)機(jī)制編程示例Stage內(nèi)部的特點(diǎn)173所有依賴關(guān)系都是窄依賴,可以實(shí)現(xiàn)pipeline方式進(jìn)行數(shù)據(jù)傳輸流水線(Pipeline)方式174SparkPipelinevs.MapReduceShuffle175與MapReduce中Shuffle方式不同,流水線方式不要求物化前序算子的所有計(jì)算結(jié)果分區(qū)7通過(guò)map操作生成的分區(qū)9,并不需要物化分區(qū)9,而且可以不用等待分區(qū)8到分區(qū)10這個(gè)map操作的計(jì)算結(jié)束,繼續(xù)進(jìn)行union操作,得到分區(qū)13如果采用MapReduce中的Shuffle方式,那么意味著分區(qū)7、8經(jīng)map計(jì)算得到分區(qū)9、10并將這兩個(gè)分區(qū)進(jìn)行物化之后,才可以進(jìn)行unionShuffleMapTaskvs.ResultTask176大綱177設(shè)計(jì)思想體系架構(gòu)工作原理Stage劃分Stage內(nèi)部數(shù)據(jù)傳輸Stage之間數(shù)據(jù)傳輸應(yīng)用與作業(yè)容錯(cuò)機(jī)制編程示例Stage之間的特點(diǎn)178所有依賴關(guān)系都是寬依賴,不可以實(shí)現(xiàn)pipeline方式進(jìn)行數(shù)據(jù)傳輸,只能ShuffleStage之間的Shuffle179Stage之間的數(shù)據(jù)傳輸需要進(jìn)行Shuffle,該過(guò)程與MapReduce中的Shuffle類似Shuffle過(guò)程可能發(fā)生在兩個(gè)ShuffleMapStage之間,或者ShuffleMapStage與ResultStage之間從Task的層面來(lái)看,該過(guò)程表現(xiàn)為兩組ShuffleMapTask之間,或一組ShuffleMapTask與一組ResultTask之間的數(shù)據(jù)傳輸SparkShuffle180ShuffleWritevs.ShuffleRead181在ShuffleWrite階段,ShuffleMapTask需要將輸出RDD的記錄按照partition函數(shù)劃分到相應(yīng)的bucket當(dāng)中并物化到本地磁盤形成ShuffleblockFile,之后才可以在ShuffleRead階段被拉取在ShuffleRead階段,ShuffleMapTask或ResultTask根據(jù)partiton函數(shù)讀取相應(yīng)的ShuffleblockFile,存入buffer并進(jìn)行繼續(xù)后續(xù)的計(jì)算Shufflevs.Pipeline182Stage之間:ShuffleStage內(nèi)部:PipelinePipelineShuffleShufflePipeline大綱183設(shè)計(jì)思想體系架構(gòu)工作原理Stage劃分Stage內(nèi)部數(shù)據(jù)傳輸Stage之間數(shù)據(jù)傳輸應(yīng)用與作業(yè)容錯(cuò)機(jī)制編程示例應(yīng)用與作業(yè)Application:用戶編寫的Spark應(yīng)用程序Job:一個(gè)Job包含多個(gè)RDD及作用于相應(yīng)RDD轉(zhuǎn)換操作,其中最后一個(gè)為actionMapReducevs.SparkMapReduce中一個(gè)應(yīng)用就是一個(gè)作業(yè)Spark中的一個(gè)應(yīng)用可以由多個(gè)作業(yè)來(lái)構(gòu)成184名稱MapReduceSpark應(yīng)用JobApplication作業(yè)Job/DAG作業(yè)與任務(wù)Stage:一個(gè)Job會(huì)分為多組Task,每組Task被稱為Stage,或者也被稱為TaskSetJob的基本調(diào)度單位代表了一組關(guān)聯(lián)的、相互之間沒(méi)有Shuffle依賴關(guān)系的任務(wù)組成的任務(wù)集Task:運(yùn)行在Executor上的工作單元185應(yīng)用、作業(yè)與任務(wù)邏輯執(zhí)行角度一個(gè)Application=一個(gè)或多個(gè)DAG一個(gè)DAG=一個(gè)或多個(gè)Stage一個(gè)Stage=若干窄依賴的RDD操作物理執(zhí)行角度一個(gè)Application=一個(gè)或多個(gè)Job一個(gè)Job=一個(gè)或多個(gè)TaskSet一個(gè)TaskSet=多個(gè)沒(méi)有Shuffle關(guān)系的Task186Job=DAGStage=TaskSet邏輯概念與物理概念邏輯物理ApplicationDAGJobStageTaskSetOperatorTask187大綱188設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例故障類型Master故障:ZooKeeper配置多個(gè)MasterWorker故障Executor故障Driver故障:重啟189大綱190設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制RDD持久化故障恢復(fù)檢查點(diǎn)編程示例RDD存儲(chǔ)機(jī)制191由于計(jì)算過(guò)程中會(huì)不斷地產(chǎn)生新的RDD,所以系統(tǒng)不能將所有的RDD都存在內(nèi)存一旦達(dá)到相應(yīng)存儲(chǔ)空間的閾值,Spark會(huì)使用置換算法(例如,LRU)將部分RDD的內(nèi)存空間騰出如果不做任何聲明,這些RDD會(huì)被直接丟棄。但是,某些RDD在后續(xù)很可能會(huì)被再次使用RDD存儲(chǔ)機(jī)制RDD提供的持久化接口persist(StorageLevel)接受StorageLevel類型參數(shù),可配置各種級(jí)別持久化后的RDD將會(huì)保留在工作節(jié)點(diǎn)的中,可重復(fù)使用cache():緩存相當(dāng)于persist(MEMORY_ONLY)192StorageLevelMEMORY_ONLYMEMORY_AND_DISKMEMORY_ONLY_SERMEMORY_AND_DISK_SERDISK_ONLYMEMORY_ONLY_2,MEMORY_AND_DISK_2193StorageLevelMEMORY_ONLY:在JVM中緩存Java的對(duì)象。如果內(nèi)存不足,直接丟棄某些partitionMEMORY_AND_DISK:在JVM中緩存Java的對(duì)象。如果內(nèi)存不足,則將某些partitions寫入到磁盤中MEMORY_ONLY_SER:在內(nèi)存為每個(gè)partition存儲(chǔ)一個(gè)byte數(shù)組,數(shù)組內(nèi)容為當(dāng)前partition中Java對(duì)象的序列化結(jié)果194StorageLevelMEMORY_AND_DISK_SER:與MEMORY_AND_DISK類似,但每個(gè)分區(qū)存儲(chǔ)的是Java對(duì)象序列化后組成的byte數(shù)組DISK_ONLY:將每個(gè)分區(qū)的數(shù)據(jù)序列化到磁盤中195StorageLevel196MEMORY_ONLY_2:與MEMORY_ONLY相同,但是每個(gè)分區(qū)備份到兩臺(tái)機(jī)器上MEMORY_AND_DISK_2:與MEMORY_AND_DISK相同,但是每個(gè)分區(qū)備份到兩臺(tái)機(jī)器上大綱197設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制RDD持久化故障恢復(fù)檢查點(diǎn)編程示例Lineage機(jī)制RDDdependencies198Lineage機(jī)制紅色部分丟失,需重算紫色部分199基于RDDLineage恢復(fù)利用RDDLineage的故障恢復(fù)重新計(jì)算丟失分區(qū)重算過(guò)程在不同節(jié)點(diǎn)之間可以并行與數(shù)據(jù)庫(kù)恢復(fù)的比較RDDLineage:記錄粗粒度的操作數(shù)據(jù)復(fù)制或者日志:記錄細(xì)粒度的操作200大綱201設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制RDD持久化故障恢復(fù)檢查點(diǎn)編程示例檢查點(diǎn)機(jī)制前述機(jī)制的不足之處Lineage可能非常長(zhǎng)RDD持久化機(jī)制保存到集群內(nèi)機(jī)器的磁盤,并不完全可靠檢查點(diǎn)機(jī)制將RDD寫入外部可靠的(本身具有容錯(cuò)機(jī)制)分布式文件系統(tǒng),例如HDFS在實(shí)現(xiàn)層面,寫檢查點(diǎn)的過(guò)程是一個(gè)獨(dú)立的作業(yè),在用戶作業(yè)結(jié)束后運(yùn)行202大綱203設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例SparkAPI204SparkCoreSparkSQLDatasetDataFrameRDDStructuredStreamingSparkStreamingDStreamMllib(DataFrame-based)GraphX(DataFrame-based)MLlib(RDD-based)GraphX(RDD-based)SQLRDDAPI編程框架205大綱206設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例詞頻統(tǒng)計(jì)關(guān)系表自然連接及其優(yōu)化網(wǎng)頁(yè)鏈接排名K均值聚類檢查點(diǎn)詞頻統(tǒng)計(jì)207輸入:一個(gè)包含大量單詞的文本文件輸出:文件中每個(gè)單詞及其出現(xiàn)次數(shù)(頻數(shù))每個(gè)單詞和其頻數(shù)占一行,單詞和頻數(shù)之間有間隔輸入輸出AnAnMyMeAnAnMyHeMyMyAnMyAn5He1Me1My5轉(zhuǎn)換操作算子回顧208map、flatMap、groupByKey“Hadoopisgood”“Sparkisfast”“Sparkisbetter”{“Hadoop”,“is”,“good”}{“Spark”,“is”,“fast”}{“Spark”,“is”,“better”}map(line=>line.split(“”))“Hadoopisgood”“Sparkisfast”“Sparkisbetter”“Hadoop”“is”“good”“Spark”“is”“fast”“Spark”“is”“better”flatMap(line=>line.split(“”))[“Hadoop”,1][“is”,1][“good”,1][“Spark”,1][“is”,1][“fast”,1][“Spark”,1][“is”,1][“better”,1]groupByKey()[“Hadoop”,1][“is”,{1,1,1}][“good”,1][“Spark”,{1,1}][“fast”,1][“better”,1]解決方案209flatMap將每行文本按分隔符拆分成單詞map將每個(gè)單詞映射為[單詞,1]鍵值對(duì)groupByKey將[單詞,1]鍵值對(duì)按單詞進(jìn)行分組map對(duì)組內(nèi)每個(gè)單詞的頻數(shù)進(jìn)行求和RDDLineage210Spark提供比MapReduce更豐富的算子,更易于編程。代碼211優(yōu)化方案212減少Shuffle數(shù)據(jù)量(a)優(yōu)化前(b)優(yōu)化后優(yōu)化方案的代碼213MapReduce要求用戶自定義combine方法,而Spark內(nèi)置combine機(jī)制,簡(jiǎn)化了用戶編程。大綱214設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例詞頻統(tǒng)計(jì)關(guān)系表自然連接及其優(yōu)化網(wǎng)頁(yè)鏈接排名K均值聚類檢查點(diǎn)關(guān)系表的自然連接215輸入:兩個(gè)CSV文件,分別保存雇員表和部門表輸出:雇員表和部門表的自然連接結(jié)果雇員表部門表雇員表?部門表轉(zhuǎn)換操作算子回顧216cogroupcogroup[1,“Spark”][2,“MapReduce”][1,10][1,20][2,30][2,40][1,{{“Spark”},{10,20}}][2,{{“MapReduce”},{30,40}}]解決方案217map將來(lái)自部門表的每個(gè)元組映射為[DeptName,

Manager]鍵值對(duì)將來(lái)自雇員表的每個(gè)元組映射為[DeptName,

Name

EmpId]鍵值對(duì)cogroup將雇員表和部門表按DeptName進(jìn)行協(xié)同分組flatMap對(duì)組內(nèi)具有相同DeptName的元組兩兩組合,得到自然連接結(jié)果RDDLineage218MapReduce要求用戶顯示標(biāo)記每個(gè)元組來(lái)自哪張表,而Spark可由2個(gè)算子分別處理2張關(guān)系表得到不同的RDD。代碼219

220假如部門表比較小,雇員表非常大雇員表的Shuffle開銷非常大優(yōu)化方案將“小表”廣播出去,避免“大表”進(jìn)行Shuffle小表大表小表大表(a)Shuffle(b)廣播優(yōu)化方案的代碼221大綱222設(shè)計(jì)思想體系架構(gòu)工作原理容錯(cuò)機(jī)制編程示例詞頻統(tǒng)計(jì)關(guān)系表自然連接及其優(yōu)化網(wǎng)頁(yè)鏈接排名K均值聚類檢查點(diǎn)網(wǎng)頁(yè)鏈接排名223輸入:保存在文本文件中,一行為一項(xiàng)網(wǎng)頁(yè)信息網(wǎng)頁(yè)信息:(網(wǎng)頁(yè)名網(wǎng)頁(yè)排名值(出站鏈接出站鏈接的權(quán)重...))輸出:網(wǎng)頁(yè)名及其排名值輸入輸出A1.0B1.0D1.0B

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論