版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
廈門大學(xué)數(shù)據(jù)庫(kù)實(shí)驗(yàn)室
MapReduce連接報(bào)告人:李雨倩導(dǎo)師:林子雨
2014.07.12連接簡(jiǎn)介MapReduce連接策略連接連接是關(guān)系運(yùn)算,可以用于合并關(guān)系。在數(shù)據(jù)庫(kù)中,一般是表連接操作;在MapReduce中,連接可以用于合并兩個(gè)或多個(gè)數(shù)據(jù)集。例如,用戶基本信息和用戶活動(dòng)詳情。用戶基本信息來(lái)自于OLTP數(shù)據(jù)庫(kù)。用戶活動(dòng)詳情來(lái)自于日志文件。連接的類型最常用的兩個(gè)連接類型是內(nèi)連接和外連接。內(nèi)連接比較兩個(gè)關(guān)系中所有的數(shù)組,然后生成一個(gè)滿足連接條件的結(jié)果集。外連接外連接并不需要兩個(gè)關(guān)系的數(shù)組都滿足連接條件。在連接條件不滿足的時(shí)候,外連接可以將一方的數(shù)據(jù)保留在結(jié)果集中。內(nèi)連接左外連接右外連接全連接連接關(guān)系圖連接實(shí)例連接簡(jiǎn)介MapReduce連接策略連接連接是關(guān)系運(yùn)算,可以用于合并關(guān)系。在數(shù)據(jù)庫(kù)中,一般是表連接操作;在MapReduce中,連接可以用于合并兩個(gè)或多個(gè)數(shù)據(jù)集。例如,用戶基本信息和用戶活動(dòng)詳情。用戶基本信息來(lái)自于OLTP數(shù)據(jù)庫(kù)。用戶活動(dòng)詳情來(lái)自于日志文件。MapReduce的連接welcometousethesePowerPointtemplates,NewContentdesign,10yearsexperienceMapReduce連接的應(yīng)用場(chǎng)景用戶的人口統(tǒng)計(jì)信息的聚合操作(例如:青少年和中年人的習(xí)慣差異)當(dāng)用戶超過(guò)一定時(shí)間沒(méi)有使用網(wǎng)站后,發(fā)郵件提醒他們。分析用戶的瀏覽習(xí)慣,讓系統(tǒng)可以提示用戶有哪些網(wǎng)站特性還沒(méi)有使用到,形成一個(gè)反饋循環(huán)。MapReduce中的連接策略重分區(qū)連接復(fù)制連接半連接——reduce端連接。使用場(chǎng)景:連接兩個(gè)或多個(gè)大型數(shù)據(jù)集?!猰ap端連接。使用場(chǎng)景:待連接的數(shù)據(jù)集中有一個(gè)數(shù)據(jù)集小到可以完全放在緩存中。——map端連接。使用場(chǎng)景:待連接的數(shù)據(jù)集中有一個(gè)數(shù)據(jù)集非常大,但同時(shí)這個(gè)數(shù)據(jù)集可以被過(guò)濾成小到可以放在緩存中。重分區(qū)連接重分區(qū)連接利用MapReduce的排序-合并機(jī)制來(lái)分組數(shù)據(jù)。它被實(shí)現(xiàn)為使用一個(gè)單獨(dú)的MapReduce任務(wù),并支持多路連接(這里的多路指的是多個(gè)數(shù)據(jù)集)。Map階段負(fù)責(zé)從多個(gè)數(shù)據(jù)集中讀取數(shù)據(jù),決定每個(gè)數(shù)據(jù)的連接值,將連接值作為輸出鍵。輸出值則包含將在reduce階段被合并的值。Reduce階段,一個(gè)reducer接收map函數(shù)傳來(lái)的一個(gè)輸出鍵的所有輸出值,并將數(shù)據(jù)分為多個(gè)分區(qū)。在此之后,reducer對(duì)所有的分區(qū)進(jìn)行笛卡爾積連接運(yùn)算,并生成全部的結(jié)果集。在如下示例中,用戶數(shù)據(jù)中有用戶姓名,年齡和所在州$cattest-data/ch4/users.txt
anne22NY
joe39CO
alison35NY
mike69VA
marie27OR
jim21OR
bob71CA
mary53NY
dave36VA
dude50CA用戶活動(dòng)日志中有用戶姓名,進(jìn)行的動(dòng)作,來(lái)源IP。這個(gè)文件一般都要比用戶數(shù)據(jù)要大得多。$cattest-data/ch4/user-logs.txt
jimlogout93.24.237.12
mikenew_tweet87.124.79.252
bobnew_tweet58.133.120.100
mikelogout55.237.104.36
jimnew_tweet93.24.237.12
marieview_user122.158.130.90$hadoopfs-puttest-data/ch4/user-logs.txtuser-logs.txt$bin/run.shcom.manning.hip.ch4.joins.improved.SampleMainusers.txt,user-logs.txtoutput$hadoopfs-catoutput/part*
bob71CAnew_tweet58.133.120.100
jim21ORlogout93.24.237.12
jim21ORnew_tweet93.24.237.12
jim21ORlogin198.184.237.49
marie27ORlogin58.133.120.100
marie27ORview_user122.158.130.90
mike69VAnew_tweet87.124.79.252
mike69VAlogout55.237.104.36重分區(qū)連接過(guò)濾(
filter)指的是將map極端的輸入數(shù)據(jù)中不需要的部分丟棄。投影(
project)是關(guān)系代數(shù)的概念。投影用于減少發(fā)送給reducer的字段。優(yōu)化重分區(qū)連接傳統(tǒng)重分區(qū)方法的實(shí)現(xiàn)空間效率低下。它需要將連接的所有的輸出值都讀取到內(nèi)存中,然后進(jìn)行多路連接。事實(shí)上,如果僅僅將小數(shù)據(jù)集讀取到內(nèi)存中,然后用小數(shù)據(jù)集來(lái)遍歷大數(shù)據(jù)集,進(jìn)行連接,這樣將更加高效。下圖是優(yōu)化后的重分區(qū)連接的流程圖。Map輸出的組合鍵和組合值上圖說(shuō)明了map輸出的組合鍵和組合值。二次排序?qū)?huì)根據(jù)連接鍵(joinkey)進(jìn)行分區(qū),但會(huì)用整個(gè)組合鍵來(lái)進(jìn)行排序。組合鍵包括一個(gè)標(biāo)識(shí)源數(shù)據(jù)集(較大或較?。┑恼沃?,因此可以根據(jù)這個(gè)整形值來(lái)保證較小源數(shù)據(jù)集的值先于較大源數(shù)據(jù)的值被reducer接收。優(yōu)化重分區(qū)連接上圖是實(shí)現(xiàn)的類圖。類圖中包含兩個(gè)部分,一個(gè)通用框架和一些類的實(shí)現(xiàn)樣例。使用這個(gè)連接框架需要實(shí)現(xiàn)抽象類OptimizedDataJoinMapperBase和OptimizedDataJoinReducerBase。OptimizedDataJoinMapperBaseprotectedabstractTextgenerateInputTag(StringinputFile);protectedabstractbooleanisInputSmaller(StringinputFile);publicvoidconfigure(JobConfjob){this.inputFile=job.get("map.input.file");this.inputTag=generateInputTag(this.inputFile);if(isInputSmaller(this.inputFile)){smaller=newBooleanWritable(true);outputKey.setOrder(0);}else{smaller=newBooleanWritable(false);outputKey.setOrder(1);}}這個(gè)類的作用是辨認(rèn)出較小的數(shù)據(jù)集,并生成輸出鍵和輸出值。Configure方法在mapper創(chuàng)建期調(diào)用。Configure方法的作用之一是標(biāo)識(shí)每一個(gè)數(shù)據(jù)集,讓reducer可以區(qū)分?jǐn)?shù)據(jù)的源數(shù)據(jù)集。另一個(gè)作用是辨認(rèn)當(dāng)前的輸入數(shù)據(jù)是否是較小的數(shù)據(jù)集。OptimizedDataJoinMapperBase(續(xù))protectedabstractOptimizedTaggedMapOutputgenerateTaggedMapOutput(Objectvalue);protectedabstractStringgenerateGroupKey(Objectkey,OptimizedTaggedMapOutputaRecord);publicvoidmap(Objectkey,Objectvalue,OutputCollectoroutput,Reporterreporter)throwsIOException{OptimizedTaggedMapOutputaRecord=generateTaggedMapOutput(value);if(aRecord==null){return;}aRecord.setSmaller(smaller);StringgroupKey=generateGroupKey(aRecord);if(groupKey==null){return;}outputKey.setKey(groupKey);output.collect(outputKey,aRecord);}
Map方法首先調(diào)用自定義的方法(generateTaggedMapOutput)來(lái)生成OutputValue對(duì)象。這個(gè)對(duì)象包含了在連接中需要使用的值,和一個(gè)標(biāo)識(shí)較大或較小數(shù)據(jù)集的布爾值。如果map方法可以調(diào)用自定義的方法(generateGroupKey)來(lái)得到可以在連接中使用的鍵,那么這個(gè)鍵就作為map的輸出鍵。OptimizedDataJoinReducerBasepublicvoidreduce(Objectkey,Iteratorvalues,OutputCollectoroutput,Reporterreporter)throwsIOException{CompositeKeyk=(CompositeKey)key;Listsmaller=newArrayList();while(values.hasNext()){Objectvalue=values.next();OptimizedTaggedMapOutputcloned=((OptimizedTaggedMapOutput)value).clone(job);if(cloned.isSmaller().get()){smaller.add(cloned);}else{joinAndCollect(k,smaller,cloned,output,reporter);}}}Map端處理后已經(jīng)可以保證較小源數(shù)據(jù)集的值將會(huì)先于較大源數(shù)據(jù)集的值被接收。這里就可以將所有的較小源數(shù)據(jù)集的值放到緩存中。在開始接收較大源數(shù)據(jù)集的值的時(shí)候,就開始和緩存中的值做連接操作。OptimizedDataJoinRuducerBase(續(xù))protectedabstractOptimizedTaggedMapOutputcombine(Stringkey,OptimizedTaggedMapOutputvalue1,OptimizedTaggedMapOutputvalue2);privatevoidjoinAndCollect(CompositeKeykey,Listsmaller,OptimizedTaggedMapOutputvalue,OutputCollectoroutput,Reporterreporter)throwsIOException{if(smaller.size()<1){OptimizedTaggedMapOutputcombined=combine(key.getKey(),null,value);collect(key,combined,output,reporter);}else{for(OptimizedTaggedMapOutputsmall:smaller){OptimizedTaggedMapOutputcombined=combine(key.getKey(),small,value);collect(key,combined,output,reporter);}}}方法joinAndCollect包含了兩個(gè)數(shù)據(jù)集的值,并輸出它們。優(yōu)化重分區(qū)連接實(shí)例例如,需要連接用戶詳情數(shù)據(jù)和用戶活動(dòng)日志。第一步,判斷兩個(gè)數(shù)據(jù)集中哪一個(gè)比較小。對(duì)于一般的網(wǎng)站來(lái)說(shuō),用戶詳情數(shù)據(jù)會(huì)比較小,用戶活動(dòng)日志會(huì)比較大。首先,實(shí)現(xiàn)抽象類OptimizedDataJoinMapperBase。這個(gè)將在map端被調(diào)用。這個(gè)類將創(chuàng)建map的輸出鍵和輸出值。同時(shí),它還將提示整個(gè)框架,當(dāng)前處理的文件是不是比較小的那個(gè)。Map端實(shí)現(xiàn)代碼publicclassSampleMapextendsOptimizedDataJoinMapperBase{privatebooleansmaller;@OverrideprotectedTextgenerateInputTag(StringinputFile){//tagtherowwithinputfilename(datasource)smaller=inputFile.contains("users.txt");returnnewText(inputFile);}@OverrideprotectedStringgenGroupKey(Objectkey,OutputValueoutput){returnkey.toString();}@OverrideprotectedbooleanisInputSmaller(StringinputFile){returnsmaller;}@OverrideprotectedOutputValuegenMapOutputValue(Objecto){returnnewTextTaggedOutputValue((Text)o);}}Reduce端實(shí)現(xiàn)代碼第二步,你需要實(shí)現(xiàn)抽象類OptimizedDataJoinReducerBase。它將在reduce端被調(diào)用。在這個(gè)類中,將從map端傳入不同數(shù)據(jù)集的輸出鍵和輸出值,然后返回reduce的輸出數(shù)組。publicclassSampleReduceextendsOptimizedDataJoinReducerBase{privateTextTaggedOutputValueoutput=newTextTaggedOutputValue();privateTexttextOutput=newText();@OverrideprotectedOutputValuecombine(Stringkey,OutputValuesmallValue,OutputValuelargeValue){if(smallValue==null||largeValue==null){returnnull;}Object[]values={smallValue.getData(),largeValue.getData()};textOutput.set(StringUtils.join(values,"\t"));output.setData(textOutput);returnoutput;}任務(wù)的主代碼最后,任務(wù)的主代碼需要指明InputFormat類,并設(shè)置二次排序。job.setInputFormat(KeyValueTextInputFormat.class);job.setMapOutputKeyClass(CompositeKey.class);job.setMapOutputValueClass(TextTaggedOutputValue.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setPartitionerClass(CompositeKeyPartitioner.class);job.setOutputKeyComparatorClass(CompositeKeyComparator.class);job.setOutputValueGroupingComparator(CompositeKeyOnlyComparator.class);MapReduce中的連接策略重分區(qū)連接復(fù)制連接半連接——reduce端連接。使用場(chǎng)景:連接兩個(gè)或多個(gè)大型數(shù)據(jù)集?!猰ap端連接。使用場(chǎng)景:待連接的數(shù)據(jù)集中有一個(gè)數(shù)據(jù)集小到可以完全放在緩存中?!猰ap端連接。使用場(chǎng)景:待連接的數(shù)據(jù)集中有一個(gè)數(shù)據(jù)集非常大,但同時(shí)這個(gè)數(shù)據(jù)集可以被過(guò)濾成小到可以放在緩存中。復(fù)制連接復(fù)制連接得名于它的具體實(shí)現(xiàn):連接中最小的數(shù)據(jù)集將會(huì)被復(fù)制到所有的map主機(jī)節(jié)點(diǎn)。復(fù)制連接有一個(gè)假設(shè)前提:在被連接的數(shù)據(jù)集中,有一個(gè)數(shù)據(jù)集足夠小到可以緩存在內(nèi)存中。MapReduce的復(fù)制連接的工作原理如下:使用分布式緩存將這個(gè)小數(shù)據(jù)集復(fù)制到所有運(yùn)行map任務(wù)的節(jié)點(diǎn)。用各個(gè)map任務(wù)初始化方法將這個(gè)小數(shù)據(jù)集裝載到一個(gè)哈希表中。逐條用大數(shù)據(jù)集中的記錄遍歷這個(gè)哈希表,逐個(gè)判斷是否符合連接條件輸出符合連接條件的結(jié)果。復(fù)制連接一個(gè)復(fù)制連接通用框架該復(fù)制連接框架可以支持任意類型的數(shù)據(jù)集。這個(gè)框架中同樣提供了一個(gè)優(yōu)化的小功能:動(dòng)態(tài)監(jiān)測(cè)分布式緩存內(nèi)容和輸入塊的大小,并判斷哪個(gè)更大。如果輸入塊較小,那么就需要將map的輸入塊放到內(nèi)存緩沖中,然后在mapper的cleanup方法中執(zhí)行連接操作。下圖為該框架的類圖。并且提供了連接類(GenericReplicatedJoin)的具體實(shí)現(xiàn),假設(shè)前提:每個(gè)數(shù)據(jù)文件的第一個(gè)標(biāo)記是連接鍵。連接框架的算法Mapper的setup方法判斷在map的輸入塊和分布式緩存的內(nèi)容中哪個(gè)大。如果分布式緩存的內(nèi)容比較小,那么它將被裝載到內(nèi)存緩存中。Map函數(shù)開始連接操作。如果輸入塊比較小,map函數(shù)將輸入塊的鍵\值對(duì)裝載到內(nèi)存緩存中。Map的cleanup方法將從分布式緩存中讀取記錄,逐條記錄和在內(nèi)存緩存中的鍵\值對(duì)進(jìn)行連接操作。。GenericReplicatedJoin以下代碼為GenericReplicatedJoin中的setup方法,它是在map的初始化階段調(diào)用的。這個(gè)方法判斷分布式緩存中的文件和輸入塊哪個(gè)大。如果文件比較小,則將文件裝載到HashMap中。protectedvoidsetup(Contextcontext)throwsIOException,InterruptedException{distributedCacheFiles=DistributedCache.getLocalCacheFiles(context.getConfiguration());intdistCacheSizes=0;for(PathdistFile:distributedCacheFiles){FiledistributedCacheFile=newFile(distFile.toString());distCacheSizes+=distributedCacheFile.length();}if(context.getInputSplit()instanceofFileSplit){FileSplitsplit=(FileSplit)context.getInputSplit();longinputSplitSize=split.getLength();distributedCacheIsSmaller=(distCacheSizes<inputSplitSize);}else{distributedCacheIsSmaller=true;}if(distributedCacheIsSmaller){for(PathdistFile:distributedCacheFiles){FiledistributedCacheFile=newFile(distFile.toString());DistributedCacheFileReaderreader=getDistributedCacheReader();reader.init(distributedCacheFile);for(Pairp:(Iterable<Pair>)reader){addToCache(p);}reader.close();}}}GenericReplicatedJoin(續(xù))以下代碼為GenericReplicatedJoin中的Map方法。它將會(huì)根據(jù)setup方法是否將了分布式緩存的內(nèi)容裝載到內(nèi)存的緩存中來(lái)選擇行為。如果分布式緩存的內(nèi)容被裝載到內(nèi)存中,那么map方法就將輸入塊的記錄和內(nèi)存中的緩存做連接操作。如果分布式緩存的內(nèi)容沒(méi)有被裝載到內(nèi)存中,那么map方法就將輸入塊的記錄裝載到內(nèi)存中,然后在cleanup方法中使用。protectedvoidmap(Objectkey,Objectvalue,Contextcontext)throwsIOException,InterruptedException{Pairpair=readFromInputFormat(key,value);if(distributedCacheIsSmaller){joinAndCollect(pair,context);}else{addToCache(pair);}}publicvoidjoinAndCollect(Pairp,Contextcontext)throwsIOException,InterruptedException{List<Pair>cached=cachedRecords.get(p.getKey());if(cached!=null){for(Paircp:cached){Pairresult;if(distributedCacheIsSmaller){result=join(p,cp);}else{result=join(cp,p);}if(result!=null){context.write(result.getKey(),result.getData());}}}}publicPairjoin(PairinputSplitPair,PairdistCachePair){StringBuildersb=newStringBuilder();if(inputSplitPair.getData()!=null){sb.append(inputSplitPair.getData());}sb.append("\t");if(distCachePair.getData()!=null){sb.append(distCachePair.getData());}returnnewPair<Text,Text>(newText(inputSplitPair.getKey().toString()),newText(sb.toString()));}GenericReplicatedJoin(續(xù))當(dāng)所有的記錄都被傳輸給map方法,MapReduce將會(huì)調(diào)用cleanup方法。如果分布式緩存的內(nèi)容比輸入塊大,連接將會(huì)在cleanup中進(jìn)行。連接的對(duì)象是map函數(shù)的緩存中的輸入塊的記錄和分布式緩存中的記錄。protectedvoidcleanup(Contextcontext)throwsIOException,InterruptedException{if(!distributedCacheIsSmaller){for(PathdistFile:distributedCacheFiles){FiledistributedCacheFile=newFile(distFile.toString());DistributedCacheFileReaderreader
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年江蘇省鎮(zhèn)江市中考語(yǔ)文真題卷含答案解析
- 幼兒園保育工作計(jì)劃總結(jié)
- 2025年楚雄市高壓電工證理論考試練習(xí)題含答案
- 安環(huán)部員工2025年度工作總結(jié)模版
- 小學(xué)六年級(jí)語(yǔ)文教師教學(xué)工作總結(jié)
- 腳手架工程量計(jì)算方法
- 2025年市場(chǎng)監(jiān)督管理局業(yè)務(wù)考試復(fù)習(xí)題集及答案解析
- 花卉栽培試題庫(kù)及答案
- 2025年社區(qū)公共衛(wèi)生服務(wù)培訓(xùn)試題集含答案
- 電工三級(jí)(高級(jí)工)試題含答案
- 2025年大學(xué)大一(法學(xué))法理學(xué)試題及答案
- 膽囊癌課件教學(xué)課件
- 廣西2025年高等職業(yè)教育考試全區(qū)模擬測(cè)試 能源動(dòng)力與材料 大類試題及逐題答案解說(shuō)
- 2026江蘇省公務(wù)員考試公安機(jī)關(guān)公務(wù)員(人民警察)歷年真題匯編附答案解析
- 孕婦貧血教學(xué)課件
- 超市冷庫(kù)應(yīng)急預(yù)案(3篇)
- 5年(2021-2025)山東高考生物真題分類匯編:專題17 基因工程(解析版)
- 2025年10月自考00610高級(jí)日語(yǔ)(二)試題及答案
- 新華資產(chǎn)招聘筆試題庫(kù)2025
- 2025年中國(guó)潛孔鉆機(jī)行業(yè)細(xì)分市場(chǎng)研究及重點(diǎn)企業(yè)深度調(diào)查分析報(bào)告
- 食品經(jīng)營(yíng)場(chǎng)所及設(shè)施設(shè)備清洗消毒和維修保養(yǎng)制度
評(píng)論
0/150
提交評(píng)論