Hadoop大數(shù)據(jù)平臺(tái)構(gòu)建與應(yīng)用教程 課件 項(xiàng)目四 Mapreduce分布式計(jì)算編程實(shí)戰(zhàn)_第1頁(yè)
Hadoop大數(shù)據(jù)平臺(tái)構(gòu)建與應(yīng)用教程 課件 項(xiàng)目四 Mapreduce分布式計(jì)算編程實(shí)戰(zhàn)_第2頁(yè)
Hadoop大數(shù)據(jù)平臺(tái)構(gòu)建與應(yīng)用教程 課件 項(xiàng)目四 Mapreduce分布式計(jì)算編程實(shí)戰(zhàn)_第3頁(yè)
Hadoop大數(shù)據(jù)平臺(tái)構(gòu)建與應(yīng)用教程 課件 項(xiàng)目四 Mapreduce分布式計(jì)算編程實(shí)戰(zhàn)_第4頁(yè)
Hadoop大數(shù)據(jù)平臺(tái)構(gòu)建與應(yīng)用教程 課件 項(xiàng)目四 Mapreduce分布式計(jì)算編程實(shí)戰(zhàn)_第5頁(yè)
已閱讀5頁(yè),還剩115頁(yè)未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

項(xiàng)目四Mapreduce分布式計(jì)算編程實(shí)戰(zhàn)CONTENTS目錄01

項(xiàng)目導(dǎo)讀02

知識(shí)目標(biāo)03

技能目標(biāo)04

素質(zhì)(思政)目標(biāo)05

任務(wù)一MapReduce基本案例-單詞統(tǒng)計(jì)CONTENTS目錄06

任務(wù)二MapReduce基本案例-數(shù)據(jù)去重07

任務(wù)三MapReduce基本案例-求平均成績(jī)08

項(xiàng)目總結(jié)09

項(xiàng)目考核項(xiàng)目導(dǎo)讀01項(xiàng)目導(dǎo)讀MapReduce分布式計(jì)算實(shí)戰(zhàn)MapReduce是一個(gè)軟件框架,基于該框架能夠容易地編寫應(yīng)用程序,這些應(yīng)用程序能夠運(yùn)行在由上千個(gè)商用機(jī)器組成的大集群上,并以一種可靠的,具有容錯(cuò)能力的方式并行地處理上TB級(jí)別的海量數(shù)據(jù)集。目前很多公司還都在用這個(gè)計(jì)算引擎,后續(xù)要講的Hive原生支持的計(jì)算引擎也是MapReduce,故本章以3個(gè)經(jīng)典案例來講解Mapreduce分布式計(jì)算編程實(shí)戰(zhàn)。MapReduce采用”分而治之”的思想,把對(duì)大規(guī)模數(shù)據(jù)集的操作,分發(fā)給一個(gè)主節(jié)點(diǎn)管理下的各個(gè)分節(jié)點(diǎn)共同完成,然后通過整合各個(gè)節(jié)點(diǎn)的中間結(jié)果,得到最終結(jié)果。簡(jiǎn)單地說,MapReduce就是“任務(wù)的分解與結(jié)果的匯總”知識(shí)目標(biāo)02知識(shí)目標(biāo)

了解MapReduce優(yōu)缺點(diǎn)

熟悉MapReduce計(jì)算模型

熟悉shuffle

熟悉MapReduce常用編程組件技能目標(biāo)03技能目標(biāo)

掌握Mapper類的編寫

掌握Reducer類的編寫

掌握使用MapReduce解決實(shí)際問題的邏輯思路素質(zhì)(思政)目標(biāo)04素質(zhì)(思政)目標(biāo)

培養(yǎng)嚴(yán)謹(jǐn)細(xì)致的工匠精神

厚植技術(shù)報(bào)國(guó)夢(mèng)

培養(yǎng)邏輯思維能力

培養(yǎng)學(xué)以致用解決問題的能力任務(wù)一MapReduce基本案例-單詞統(tǒng)計(jì)05任務(wù)工單

任務(wù)場(chǎng)景現(xiàn)在需要統(tǒng)計(jì)一系列文本文件中每個(gè)單詞出現(xiàn)的次數(shù),使用Mapreduce編程完成

任務(wù)準(zhǔn)備全班學(xué)生以4人左右為一組,各組選出組長(zhǎng)。請(qǐng)組長(zhǎng)組織組員查找相關(guān)資料,進(jìn)行需求分析和系統(tǒng)設(shè)計(jì)問題1:描述單詞統(tǒng)計(jì)程序的設(shè)計(jì)思路問題2:javamain函數(shù)里面的args參數(shù)分別有什么含義必備知識(shí)技能:一、MapReduce概述MapReduce編程模型簡(jiǎn)介MapReduce是一個(gè)用于大規(guī)模數(shù)據(jù)處理的分布式計(jì)算模型,最初由Google工程師設(shè)計(jì)并實(shí)現(xiàn)的,Google已經(jīng)將完整的MapReduce論文公開發(fā)布了。其中的定義是,MapReduce是一個(gè)編程模型,是一個(gè)用于處理和生成大規(guī)模數(shù)據(jù)集的相關(guān)的實(shí)現(xiàn)。用戶定義一個(gè)map函數(shù)來處理一個(gè)Key-Value對(duì)以生成一批中間的Key-Value對(duì),再定義一個(gè)reduce函數(shù)將所有這些中間的有相同Key的Value合并起來。很多現(xiàn)實(shí)世界中的任務(wù)都可用這個(gè)模型來表達(dá)必備知識(shí)技能:二、MapReduce優(yōu)缺點(diǎn)

優(yōu)點(diǎn)

可伸縮性:MapReduce可以處理大規(guī)模的數(shù)據(jù)集,通過將數(shù)據(jù)分割為多個(gè)小塊進(jìn)行并行處理,可以有效地利用集群的計(jì)算資源。它可以在需要處理更大數(shù)據(jù)集時(shí)進(jìn)行水平擴(kuò)展,而不需要對(duì)現(xiàn)有的代碼進(jìn)行修改

容錯(cuò)性:MapReduce具有高度的容錯(cuò)性。當(dāng)某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),作業(yè)可以自動(dòng)重新分配給其他可用的節(jié)點(diǎn)進(jìn)行處理,從而保證作業(yè)的完成

靈活性:MapReduce允許開發(fā)人員使用自定義的Mapper和Reducer來處理各種類型的數(shù)據(jù)和計(jì)算任務(wù)。它提供了靈活的編程模型,可以根據(jù)具體需求進(jìn)行定制和擴(kuò)展

易于使用:MapReduce提供了高級(jí)抽象,隱藏了底層的并行和分布式處理細(xì)節(jié)。開發(fā)人員只需要關(guān)注數(shù)據(jù)的轉(zhuǎn)換和計(jì)算邏輯,而不需要關(guān)心并發(fā)和分布式算法的實(shí)現(xiàn)細(xì)節(jié)必備知識(shí)技能:二、MapReduce優(yōu)缺點(diǎn)

缺點(diǎn)

適用性有限:MapReduce適用于一些需要進(jìn)行大規(guī)模數(shù)據(jù)處理和分析的場(chǎng)景,但對(duì)于一些需要實(shí)時(shí)計(jì)算和交互式查詢的場(chǎng)景,MapReduce的延遲較高,不太適合

復(fù)雜性:盡管MapReduce提供了高級(jí)抽象,但對(duì)于開發(fā)人員來說,編寫和調(diào)試MapReduce作業(yè)仍然是一項(xiàng)復(fù)雜的任務(wù)。需要熟悉MapReduce的編程模型和框架,并理解分布式計(jì)算的概念和原理

磁盤IO開銷:在MapReduce中,數(shù)據(jù)需要在Map和Reduce階段之間進(jìn)行磁盤IO,這可能會(huì)導(dǎo)致性能瓶頸。盡管可以通過合理的數(shù)據(jù)分區(qū)和調(diào)優(yōu)來減少磁盤IO的開銷,但仍然需要考慮和處理數(shù)據(jù)移動(dòng)和復(fù)制的開銷

綜上所述,MapReduce是一種適用于大規(guī)模數(shù)據(jù)處理的編程模型和計(jì)算框架,具有可伸縮性、容錯(cuò)性、靈活性和易用性等優(yōu)點(diǎn)。然而,它在實(shí)時(shí)計(jì)算和交互式查詢等場(chǎng)景下的適用性有限,同時(shí)開發(fā)和調(diào)試MapReduce作業(yè)的復(fù)雜性也需要考慮必備知識(shí)技能:三、MapReduce核心思想

MapReduce原理與應(yīng)用MapReduce的思想核心是“分而治之”,適用于大規(guī)模復(fù)雜的任務(wù)處理場(chǎng)景(大規(guī)模數(shù)據(jù)處理場(chǎng)景)。Map負(fù)責(zé)“分”,即把復(fù)雜的任務(wù)分解為若干個(gè)“簡(jiǎn)單的任務(wù)”來并行處理??梢赃M(jìn)行拆分的前提是這些小任務(wù)可以并行計(jì)算,彼此間沒有依賴關(guān)系。Reduce負(fù)責(zé)“合”,即對(duì)Map階段的結(jié)果進(jìn)行全局匯總必備知識(shí)技能:四、MapReduce計(jì)算模型

MapReduce計(jì)算模型解析我們知道MapReduce計(jì)算模型主要由三個(gè)階段構(gòu)成:Map、Shuffle、Reduce。Map是映射,負(fù)責(zé)數(shù)據(jù)的過濾和分區(qū),將原始數(shù)據(jù)轉(zhuǎn)化為鍵值對(duì);Reduce是合并,將具有相同Key值的Value進(jìn)行處理后再輸出新的鍵值對(duì)作為最終結(jié)果。為了讓Reduce可以并行處理Map的結(jié)果,必須對(duì)Map的輸出進(jìn)行一定的排序與分割,然后再交給對(duì)應(yīng)的Reduce,而這個(gè)將Map輸出進(jìn)行進(jìn)一步整理并交給Reduce的過程就是Shuffle任務(wù)實(shí)施:一、準(zhǔn)備數(shù)據(jù)步驟1準(zhǔn)備輸入數(shù)據(jù)。在Windows的D盤根目錄底下新建一個(gè)目錄,名為input。在input目錄底下新建一個(gè)名為words.txt的文件,內(nèi)容如下Helloworld單擊此處添加項(xiàng)正文hellohadoop單擊此處添加項(xiàng)正文hellobob單擊此處添加項(xiàng)正文hadooptom單擊此處添加項(xiàng)正文tomspark單擊此處添加項(xiàng)正文任務(wù)實(shí)施:二、編寫代碼

步驟1編寫Mapper類。在src-java下面創(chuàng)建包c(diǎn)n.jscfa.wordcount,在該包下新建一個(gè)Java文件WordCountMapper.java,清空該文件并輸入如下代碼packagecn.jscfa.wordcountimportorg.apache.hadoop.io.IntWritableimportorg.apache.hadoop.io.LongWritableimportorg.apache.hadoop.io.Textimportorg.apache.hadoop.mapreduce.Mapper任務(wù)實(shí)施:二、編寫代碼importjava.io.IOException

單擊此處添加項(xiàng)正文/**

單擊此處添加項(xiàng)正文Mapper類實(shí)現(xiàn)

*這里就是MapReduce程序Map階段業(yè)務(wù)邏輯實(shí)現(xiàn)的類Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>*KEYIN

表示mapper數(shù)據(jù)輸入時(shí)key的數(shù)據(jù)類型,在默認(rèn)讀取數(shù)據(jù)組件下,叫作InputFormat,它的行為是每行讀取待處理的數(shù)據(jù)KEYIN數(shù)據(jù)類型解釋

*讀取一行,就返回一行給MR程序,這種情況下KEYIN就表示每一行的起始偏移,因此數(shù)據(jù)類型是Long*VALUEIN

表示mapper數(shù)據(jù)輸入的時(shí)候value的數(shù)據(jù)類型,在默認(rèn)讀取數(shù)據(jù)組件下,VALUEIN就表示讀取的一行內(nèi)容,因此數(shù)據(jù)類型是String任務(wù)實(shí)施:二、編寫代碼01*KEYOUT表示Map階段數(shù)據(jù)輸出的時(shí)候key的數(shù)據(jù)類型,在本案例中輸出的key是單詞,因此數(shù)據(jù)類型是String02*VALUEOUT表示mapper階段數(shù)據(jù)輸出的時(shí)候value的數(shù)據(jù)類型,在本案例中輸出的value是單詞的次數(shù),因此數(shù)據(jù)類型是Integer03數(shù)據(jù)序列化效率問題*這里所說的數(shù)據(jù)類型String,Long都是JDK自帶的類型,數(shù)據(jù)在分布式系統(tǒng)中跨網(wǎng)絡(luò)傳輸就需要將數(shù)據(jù)序列化,默認(rèn)JDK序列化時(shí)效率低下,因此04HadoopSerializationTypes*使用Hadoop封裝的序列化類型。long--LongWritableString--TextIntegerIntWritable05*/單擊此處添加項(xiàng)正文06WordCountMapperClasspublicclassWordCountMapperextendsMapper<LongWritable,Text,Text,IntWritable>{任務(wù)實(shí)施:二、編寫代碼

/***這里就是Map階段具體業(yè)務(wù)邏輯實(shí)現(xiàn)的方法。該方法的調(diào)用取決于讀取數(shù)據(jù)的組件有沒有給MR傳入數(shù)據(jù)*如果有數(shù)據(jù)傳入,每一個(gè)<k,v>對(duì),map就會(huì)被調(diào)用一次*/@OverrideMapFunctioninHadoopprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{任務(wù)實(shí)施:二、編寫代碼

//拿到傳入進(jìn)來的一行內(nèi)容,把數(shù)據(jù)類型轉(zhuǎn)換為String

//key-k1,value-v1

//key=0,value="helloworld"

Stringline=value.toString()

//將這行內(nèi)容按照分隔符切割

String[]words=line.split("")任務(wù)實(shí)施:二、編寫代碼//words[0]="hello";words[1]="world"http://遍歷數(shù)組,每出現(xiàn)一個(gè)單詞就標(biāo)記一個(gè)數(shù)值1例如:〈單詞,1>for(Stringword:words){//使用MR上下文context,把Map階段處理的數(shù)據(jù)發(fā)送給Reduce階段作為輸入數(shù)據(jù)context.write(newText(word),newIntWritable(1))//k2="hello",v2=1;k2="world",v2=1任務(wù)實(shí)施:二、編寫代碼

//<hello,1><world,1>HadoopSparkDataTransmission//第一行hadoophadoopspark發(fā)送出去的是<hadoop,1><hadoop,1><spark,1>步驟2編寫Combiner類,在cn.jscfa.wordcount包下新建java文件WordCountCombiner.java,清空該文件并輸入如下代碼packagecn.jscfa.wordcountimportorg.apache.hadoop.io.IntWritableimportorg.apache.hadoop.io.Text任務(wù)實(shí)施:二、編寫代碼

importorg.apache.hadoop.mapreduce.Reducerimportjava.io.IOExceptionWordCountCombinerClasspublicclassWordCountCombinerextendsReducer<Text,IntWritable,Text,IntWritable>{@OverrideReduceFunctionExplanationprotectedvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{//輸入k2="hello"v2=<1,1,1><"hello",<1,1,1>任務(wù)實(shí)施:二、編寫代碼//1.局部匯總//key="hello";values=[1,1,1]intcount=0//求和V2for(IntWritablev:values){count+=v.get();count=count+v.get()任務(wù)實(shí)施:二、編寫代碼

//count為values這個(gè)數(shù)組的所有元素之和

context.write(key,newIntWritable(count))

//k2="hello",v2=3;<hello,3>

//輸入

//<k2,v2>

//<hello,1>任務(wù)實(shí)施:二、編寫代碼

//<world,1>//<hello,1>//<hadoop,1>//<hello,1>//<bob,1>//<hadoop,1>任務(wù)實(shí)施:二、編寫代碼

01//<tom,1>單擊此處添加項(xiàng)正文

02//<tom,1>單擊此處添加項(xiàng)正文

03//<spark,1>單擊此處添加項(xiàng)正文

04//<hello,<1,1,1>單擊此處添加項(xiàng)正文

05步驟3編寫Reducer類,在cn.jscfa.wordcount包下新建java文件WordCountReducer.java,清空該文件并輸入如下代碼

06packagecn.jscfa.wordcount單擊此處添加項(xiàng)正文任務(wù)實(shí)施:二、編寫代碼

importorg.apache.hadoop.io.IntWritableimportorg.apache.hadoop.io.Textimportorg.apache.hadoop.mapreduce.Reducerimportjava.io.IOException編程模型繼承Reducer//都要繼承基類Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>這就是我們所說的編程模型/**任務(wù)實(shí)施:二、編寫代碼

*這里是MR程序Reducer階段處理的類*KEYIN:就是Reducer階段輸入的數(shù)據(jù)key類型,對(duì)應(yīng)Mapper階段輸出KEY類型*VALUEIN就是Reducer階段輸入的數(shù)據(jù)value類型,對(duì)應(yīng)Mapper階段輸出VALUE類型,在本案例中就是個(gè)數(shù)*KEYOUT:就是Reducer階段輸出的數(shù)據(jù)key類型,在本案例中,就是單詞Text*VALUEOUT:reducer階段輸出的數(shù)據(jù)value類型,在本案例中,就是單詞的總次數(shù)*/任務(wù)實(shí)施:二、編寫代碼

WordCountReducerClasspublicclassWordCountReducerextendsReducer<Text,IntWritable,Text,IntWritable>{/***這里是REDUCE階段具體業(yè)務(wù)類的實(shí)現(xiàn)方法HadoopSparkDataTransmission*第一行hadoophadoopspark發(fā)送出去的是<hadoop,1><hadoop,1><spark,1>*Reduce階段接收所有來自Map階段處理的數(shù)據(jù)之后,按照Key的字典序進(jìn)行排序*按照key是否相同作為一組去調(diào)用reduce方法任務(wù)實(shí)施:二、編寫代碼

*本方法的key就是這一組相同的kv對(duì)共同的Key*迭代器:<hadoop,[1,1]>*/@OverrideReduceFunctionExplanationprotectedvoidreduce(Textkey,Iterable<IntWritable>valueIOExceptionandInterruptedExceptionhandlinginContextContextcontext)throwsIOException,InterruptedException{任務(wù)實(shí)施:二、編寫代碼//定義一個(gè)計(jì)數(shù)器intcount=0//遍歷一組迭代器,把每個(gè)數(shù)量1累加起來就構(gòu)成了單詞總次數(shù)//for(IntWritableiw:value){count+=iw.get()任務(wù)實(shí)施:二、編寫代碼

context.write(key,newIntWritable(count))//輸入k2,v2;輸出k3,v3步驟4編寫主類,在cn.jscfa.wordcount包下新建java文件WordCountDriver.java,清空該文件并輸入如下代碼packagecn.jscfa.wordcountimportorg.apache.hadoop.conf.Configurationimportorg.apache.hadoop.fs.Path任務(wù)實(shí)施:二、編寫代碼

importorg.apache.hadoop.io.IntWritable單擊此處添加項(xiàng)正文

importorg.apache.hadoop.io.Text單擊此處添加項(xiàng)正文

importorg.apache.hadoop.mapreduce.Job單擊此處添加項(xiàng)正文

FileInputFormatImportimportorg.apache.hadoop.mapreduce.lib.input.FileInputFormat

FileOutputFormatImportimportorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat

/**單擊此處添加項(xiàng)正文任務(wù)實(shí)施:二、編寫代碼

*Driver類就是MR程序運(yùn)行的主類,本類中組裝了一些程序運(yùn)行時(shí)所需要的信息*比如:使用的Mapper類是什么,Reducer類,輸入、輸出數(shù)據(jù)存放在哪里*/publicclassWordCountDriver{JavaProgramStartpublicstaticvoidmain(String[]args)throwsException{if(args==null||args.length==0){任務(wù)實(shí)施:二、編寫代碼args=newString[2]args[0]="D:\\wordcount\\input"args[1]="D:\\wordcount\\output"Configurationconf=newConfiguration()conf.set("","local")Jobwcjob=Job.getInstance(conf)任務(wù)實(shí)施:二、編寫代碼單擊此處添加正文

//指定MRJobjar包運(yùn)行主類

wcjob.setJarByClass(WordCountDriver.class)

wcjob.setMapperClass(WordCountMapper.class)

wcjob.setReducerClass(WordCountReducer.class)

//設(shè)置我們業(yè)務(wù)邏輯Mapper類的輸出key和value的數(shù)據(jù)類型任務(wù)實(shí)施:二、編寫代碼

wcjob.setMapOutputValueClass(IntWritable.class)

//設(shè)置我們業(yè)務(wù)邏輯Reducer類的輸出key和value的數(shù)據(jù)類型

wcjob.setOutputKeyClass(Text.class)

wcjob.setOutputValueClass(IntWritable.class)

//設(shè)置Combiner組件

wcjob.setCombinerClass(WordCountCombiner.class)任務(wù)實(shí)施:二、編寫代碼

//指定要處理的數(shù)據(jù)所在的位置單擊此處添加項(xiàng)正文

InputPathsSettingFileInputFormat.setInputPaths(wcjob,newPath(args[0]))

//指定處理完成之后的結(jié)果所保存的位置單擊此處添加項(xiàng)正文

SetOutputPathFileOutputFormat.setOutputPath(wcjob,newPath(args[1]))

//提交程序并且監(jiān)控打印程序執(zhí)行情況單擊此處添加項(xiàng)正文任務(wù)實(shí)施:二、編寫代碼

booleanres=wcjob.waitForCompletion(true)

System.exit(res?0:1)任務(wù)實(shí)施:三、運(yùn)行程序

WordCount結(jié)果分析本地運(yùn)行,在主類WordCountDriver里面用鼠標(biāo)右鍵單擊,單擊運(yùn)行子命令,程序開始執(zhí)行,在D:\wordcount目錄下生成了一個(gè)output目錄,在output目錄下面有4個(gè)文件,分別是“_SUCCESS.crc”“part-r-00000.crc”“_SUCCESS”“part-r-00000”,結(jié)果存放在“part-r-00000”文件里面,其內(nèi)容如下

bob1單擊此處添加項(xiàng)正文

hadoop2單擊此處添加項(xiàng)正文

hello3單擊此處添加項(xiàng)正文任務(wù)實(shí)施:三、運(yùn)行程序

spark1tom2world1任務(wù)二MapReduce基本案例-數(shù)據(jù)去重06任務(wù)工單

任務(wù)場(chǎng)景現(xiàn)在需要?jiǎng)h掉目錄下所有文本文件中的重復(fù)行,重復(fù)行只保留一個(gè),使用Mapreduce編程完成

任務(wù)準(zhǔn)備全班學(xué)生以4人左右為一組,各組選出組長(zhǎng)。請(qǐng)組長(zhǎng)組織組員查找相關(guān)資料,進(jìn)行需求分析和系統(tǒng)設(shè)計(jì)問題1:描述數(shù)據(jù)去重的設(shè)計(jì)思路問題2:描述map階段工作流程問題3:描述reduce階段工作流程必備知識(shí)技能:一、MapshuffleMapshuffle在Map端的shuffle過程是對(duì)Map的結(jié)果進(jìn)行分區(qū)、排序、分割,然后將屬于同一劃分(分區(qū))的輸出合并在一起一并寫在磁盤上,最終得到一個(gè)分區(qū)有序的文件,分區(qū)有序的含義是map輸出的鍵值對(duì)按分區(qū)進(jìn)行排列,具有相同partition值的鍵值對(duì)存儲(chǔ)在一起,每個(gè)分區(qū)里面的鍵值對(duì)又按key值進(jìn)行升序排列(默認(rèn))必備知識(shí)技能:一、Mapshuffle

Partition對(duì)于map輸出的每一個(gè)鍵值對(duì),系統(tǒng)都會(huì)給定一個(gè)partition,partition值默認(rèn)是通過計(jì)算key的hash值后對(duì)Reducetask的數(shù)量取模獲得如果一個(gè)鍵值對(duì)的partition值為1,意味著這個(gè)鍵值對(duì)會(huì)交給第一個(gè)Reducer處理我們知道每一個(gè)Reducer的輸出都是有序的,但是將所有Reducer的輸出合并到一起卻并非全局有序的,如果要做到全局有序,我們?cè)撛趺醋瞿??最?jiǎn)單的方式,只設(shè)置一個(gè)Reducetask,但是這樣完全發(fā)揮不出集群的優(yōu)勢(shì),而且能應(yīng)對(duì)的數(shù)據(jù)量也很受限。最佳的方式是自己定義一個(gè)Partitioner,用輸入數(shù)據(jù)的最大值除以系統(tǒng)Reducetask數(shù)量的商作為分割邊界,也就是說分割數(shù)據(jù)的邊界為此商的1倍、2倍至numPartitions-1倍,這樣就能保證執(zhí)行partition后的數(shù)據(jù)是整體有序的另一種需要我們自己定義一個(gè)Partitioner的情況是各個(gè)Reducetask處理的鍵值對(duì)數(shù)量極不平衡。對(duì)于某些數(shù)據(jù)集,由于很多不同的key的hash值都一樣,導(dǎo)致這些鍵值對(duì)都被分給同一個(gè)Reducer處理,而其他的Reducer處理的鍵值對(duì)很少,從而拖延整個(gè)任務(wù)的進(jìn)度。當(dāng)然,編寫自己的Partitioner必須保證具有相同key值的鍵值對(duì)分發(fā)到同一個(gè)Reducer必備知識(shí)技能:一、Mapshuffle

CollectorMap的輸出結(jié)果是由collector處理的,每個(gè)Map任務(wù)不斷地將鍵值對(duì)輸出到在內(nèi)存中構(gòu)造的一個(gè)環(huán)形數(shù)據(jù)結(jié)構(gòu)中。使用樹形數(shù)據(jù)結(jié)構(gòu)是為了更有效地使用內(nèi)存空間,在內(nèi)存中放置盡可能多的數(shù)據(jù)這個(gè)數(shù)據(jù)結(jié)構(gòu)其實(shí)就是個(gè)字節(jié)數(shù)組,叫Kvbuffer,名如其義,但是這里面不光放置了數(shù)據(jù),還放置了一些索引數(shù)據(jù),給放置索引數(shù)據(jù)的區(qū)域起了一個(gè)Kvmeta的別名,在Kvbuffer的一塊區(qū)域上穿了一個(gè)IntBuffer(字節(jié)序采用的是平臺(tái)自身的字節(jié)序)的馬甲。數(shù)據(jù)區(qū)域和索引數(shù)據(jù)區(qū)域在Kvbuffer中是相鄰不重疊的兩個(gè)區(qū)域,用一個(gè)分界點(diǎn)來劃分兩者,分界點(diǎn)不是一成不變的,而是每次Spill之后都會(huì)更新一次。初始的分界點(diǎn)是0,數(shù)據(jù)的存儲(chǔ)方向是向上增長(zhǎng),索引數(shù)據(jù)的存儲(chǔ)方向是向下增長(zhǎng)在HadoopMapReduce的MapOutputBuffer內(nèi)存管理機(jī)制中,kvbuffer、Kvmeta、bufindex和Kvindex構(gòu)成了一個(gè)精妙協(xié)同的四元體系。kvbuffer作為主數(shù)據(jù)緩沖區(qū),通過bufindex指針以前向增長(zhǎng)的方式存儲(chǔ)序列化后的鍵值對(duì)原始數(shù)據(jù),bufindex隨著每個(gè)鍵值對(duì)的寫入而按實(shí)際數(shù)據(jù)字節(jié)數(shù)遞增,當(dāng)達(dá)到緩沖區(qū)邊界時(shí)執(zhí)行環(huán)形回繞。與此同時(shí),Kvmeta作為元數(shù)據(jù)索引區(qū),采用固定格式的四元組結(jié)構(gòu)記錄每個(gè)鍵值對(duì)的定位信息,包括值起始位置、鍵起始位置、分區(qū)標(biāo)識(shí)符和值長(zhǎng)度。核心組件Kvindex作為元數(shù)據(jù)寫入指針,采用與bufindex相反的逆向遞減機(jī)制:初始時(shí)指向元數(shù)據(jù)區(qū)末端,每當(dāng)需要添加新的鍵值對(duì)索引時(shí),指針首先遞減四個(gè)整型單位,為新的四元組預(yù)留存儲(chǔ)空間,隨后依次填充各個(gè)字段值。這種bufindex正向增長(zhǎng)與Kvindex反向遞減的雙指針機(jī)制形成了對(duì)稱平衡的內(nèi)存布局,通過實(shí)時(shí)計(jì)算兩指針的相對(duì)位置監(jiān)控緩沖區(qū)使用狀態(tài)。當(dāng)bufindex與Kvindex指針相遇或達(dá)到預(yù)設(shè)閾值時(shí),系統(tǒng)自動(dòng)觸發(fā)溢出寫磁盤操作,確保內(nèi)存資源的有效循環(huán)利用。整個(gè)機(jī)制通過雙指針的精確協(xié)同控制和空間的高效劃分,為MapReduce框架的大規(guī)模數(shù)據(jù)處理提供了穩(wěn)定可靠的內(nèi)存管理基礎(chǔ),在保證性能的同時(shí)實(shí)現(xiàn)了資源的動(dòng)態(tài)優(yōu)化分配Kvbuffer的大小可以通過io.sort.mb設(shè)置,默認(rèn)大小為100M。但不管怎么設(shè)置,Kvbuffer的容量都是有限的,鍵值對(duì)和索引不斷地增加,加著加著,Kvbuffer總有不夠用的那天,那怎么辦?把數(shù)據(jù)從內(nèi)存刷到磁盤上再接著往內(nèi)存寫數(shù)據(jù),把Kvbuffer中的數(shù)據(jù)刷到磁盤上的過程就叫Spill,多么明了的叫法,內(nèi)存中的數(shù)據(jù)滿了就自動(dòng)地Spill到具有更大空間的磁盤關(guān)于Spill觸發(fā)的條件,也就是Kvbuffer用到什么程度開始Spill,還是要講究一下的。如果把Kvbuffer用得死死的,一點(diǎn)縫都不剩的時(shí)候再開始Spill,那Map任務(wù)就需要等Spill完成騰出空間之后才能繼續(xù)寫數(shù)據(jù);如果Kvbuffer只是滿到一定程度,比如80%的時(shí)候就開始Spill,那在Spill的同時(shí),Map任務(wù)還能繼續(xù)寫數(shù)據(jù),如果Spill夠快,Map可能都不需要為空閑空間而發(fā)愁。兩利相權(quán)取其重,一般選擇后者。Spill的門限可以通過io.sort.spill.percent設(shè)置,默認(rèn)是0.8必備知識(shí)技能:一、MapshuffleSort當(dāng)Spill觸發(fā)后,SortAndSpill先把Kvbuffer中的數(shù)據(jù)按照partition值和key兩個(gè)關(guān)鍵字升序排序,移動(dòng)的只是索引數(shù)據(jù),排序結(jié)果是Kvmeta中數(shù)據(jù)以partition為單位聚集在一起,同一partition內(nèi)的按照key有序必備知識(shí)技能:一、Mapshuffle

SpillSpill線程為這次Spill過程創(chuàng)建一個(gè)磁盤文件:從所有的本地目錄中輪詢查找能存儲(chǔ)這么大空間的目錄,找到之后在其中創(chuàng)建一個(gè)類似于“spill12.out”的文件。Spill線程根據(jù)排過序的Kvmeta挨個(gè)partition地把數(shù)據(jù)吐到這個(gè)文件中,一個(gè)partition對(duì)應(yīng)的數(shù)據(jù)吐完之后順序地吐下一個(gè)partition,直到把所有的partition遍歷完。一個(gè)partition在文件中對(duì)應(yīng)的數(shù)據(jù)也叫段(segment)。在這個(gè)過程中如果用戶配置了combiner類,那么在寫之前會(huì)先調(diào)用combineAndSpill(),對(duì)結(jié)果進(jìn)行進(jìn)一步合并后再寫出。Combiner會(huì)優(yōu)化MapReduce的中間結(jié)果,所以它在整個(gè)模型中會(huì)多次使用。那哪些場(chǎng)景才能使用Combiner呢?Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計(jì)算結(jié)果。所以從我的想法來看,Combiner只應(yīng)該用于那種Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結(jié)果的場(chǎng)景。比如累加、最大值等。Combiner的使用一定得慎重,如果用得好,它對(duì)job執(zhí)行效率有幫助,反之會(huì)影響reduce的最終結(jié)果所有的partition對(duì)應(yīng)的數(shù)據(jù)都放在這個(gè)文件里,雖然是順序存放的,但是怎么直接知道某個(gè)partition在這個(gè)文件中存放的起始位置呢?強(qiáng)大的索引又出場(chǎng)了。有一個(gè)三元組記錄某個(gè)partition對(duì)應(yīng)的數(shù)據(jù)在這個(gè)文件中的索引:起始位置、原始數(shù)據(jù)長(zhǎng)度、壓縮之后的數(shù)據(jù)長(zhǎng)度,一個(gè)partition對(duì)應(yīng)一個(gè)三元組。然后把這些索引信息存放在內(nèi)存中,如果內(nèi)存中放不下了,后續(xù)的索引信息就需要寫到磁盤文件中了:從所有的本地目錄中輪詢查找能存儲(chǔ)這么大空間的目錄,找到之后在其中創(chuàng)建一個(gè)類似于“spill12.out.index”的文件,文件中不光存儲(chǔ)了索引數(shù)據(jù),還存儲(chǔ)了crc32的校驗(yàn)數(shù)據(jù)。spill12.out.index不一定在磁盤上創(chuàng)建,如果內(nèi)存(默認(rèn)1M空間)中能放得下就放在內(nèi)存中,即使在磁盤上創(chuàng)建了,它和spill12.out文件也不一定在同一個(gè)目錄下。每一次Spill過程就會(huì)至少生成一個(gè)out文件,有時(shí)還會(huì)生成index文件,Spill的次數(shù)也烙印在文件名中。索引文件和數(shù)據(jù)文件的對(duì)應(yīng)關(guān)系如下圖4-2-1所示在Spill線程如火如荼地進(jìn)行SortAndSpill工作的同時(shí),Map任務(wù)不會(huì)因此而停歇,而是一如既往地進(jìn)行著數(shù)據(jù)輸出。Map還是把數(shù)據(jù)寫到kvbuffer中,那問題就來了:只顧著悶頭按照bufindex指針向上增長(zhǎng),kvmeta只顧著按照Kvindex向下增長(zhǎng),是保持指針起始位置不變繼續(xù)跑呢,還是另謀出路?如果保持指針起始位置不變,很快bufindex和Kvindex就碰頭了,碰頭之后再重新開始或者移動(dòng)內(nèi)存都比較麻煩,不可取。Map任務(wù)取kvbuffer中剩余空間的中間位置,用這個(gè)位置設(shè)置為新的分界點(diǎn),bufindex指針移動(dòng)到這個(gè)分界點(diǎn),Kvindex移動(dòng)到這個(gè)分界點(diǎn)的-16位置,然后兩者就可以和諧地按照自己既定的軌跡放置數(shù)據(jù)了,當(dāng)Spill完成,空間騰出之后,不需要做任何改動(dòng)繼續(xù)前進(jìn)。分界點(diǎn)的轉(zhuǎn)換如下圖4-2-2所示。Map任務(wù)總要把輸出的數(shù)據(jù)寫到磁盤上,即使輸出數(shù)據(jù)量很小,在內(nèi)存中全部能裝得下,在最后也會(huì)把數(shù)據(jù)刷到磁盤上必備知識(shí)技能:一、Mapshuffle

MergeMap任務(wù)如果輸出數(shù)據(jù)量很大,可能會(huì)進(jìn)行好幾次Spill,out文件和Index文件會(huì)產(chǎn)生很多,分布在不同的磁盤上。最后把這些文件進(jìn)行合并的merge過程閃亮登場(chǎng)Merge過程怎么知道產(chǎn)生的Spill文件都在哪了呢?從所有的本地目錄上掃描得到產(chǎn)生的Spill文件,然后把路徑存儲(chǔ)在一個(gè)數(shù)組里。Merge過程又怎么知道Spill的索引信息呢?沒錯(cuò),也是從所有的本地目錄上掃描得到Index文件,然后把索引信息存儲(chǔ)在一個(gè)列表里。到這里,又遇到了一個(gè)值得納悶的地方。在之前Spill過程中的時(shí)候?yàn)槭裁床恢苯影堰@些信息存儲(chǔ)在內(nèi)存中呢,何必又多了這步掃描的操作?特別是Spill的索引數(shù)據(jù),當(dāng)內(nèi)存超限之后把數(shù)據(jù)寫到磁盤,現(xiàn)在又要從磁盤把這些數(shù)據(jù)讀出來?在MapReduce的Spill過程中不將索引信息完全保存在內(nèi)存中,而采用磁盤索引機(jī)制,本質(zhì)上是受限于內(nèi)存資源的有限性——當(dāng)Map任務(wù)需要處理海量數(shù)據(jù)并可能經(jīng)歷多次Spill時(shí),若將所有Spill的索引數(shù)據(jù)都保留在內(nèi)存中,其累積占用的內(nèi)存空間將隨Spill次數(shù)線性增長(zhǎng),極易導(dǎo)致內(nèi)存耗盡而使任務(wù)失??;雖然從磁盤重新讀取索引需要額外的I/O操作,但這種設(shè)計(jì)通過將內(nèi)存占用控制在穩(wěn)定范圍內(nèi),確保了系統(tǒng)在處理任意規(guī)模數(shù)據(jù)時(shí)的可靠性和可擴(kuò)展性,這是在大數(shù)據(jù)場(chǎng)景下為保障任務(wù)成功執(zhí)行而做出的必要權(quán)衡。(對(duì)于內(nèi)存空間較大的土豪來說,用內(nèi)存來省卻這兩個(gè)I/O步驟還是值得考慮的。)為merge過程創(chuàng)建一個(gè)叫file.out的文件和一個(gè)叫file.out.Index的文件用來存儲(chǔ)最終的輸出和索引,一個(gè)partition一個(gè)partition的進(jìn)行合并輸出。對(duì)于某個(gè)partition來說,從索引列表中查詢這個(gè)partition對(duì)應(yīng)的所有索引信息,每個(gè)對(duì)應(yīng)一個(gè)段插入到段列表中。也就是這個(gè)partition對(duì)應(yīng)一個(gè)段列表,記錄所有的Spill文件中對(duì)應(yīng)的這個(gè)partition那段數(shù)據(jù)的文件名、起始位置、長(zhǎng)度等等對(duì)這個(gè)partition對(duì)應(yīng)的所有的segment進(jìn)行合并,目標(biāo)是合并成一個(gè)segment。當(dāng)這個(gè)partition對(duì)應(yīng)很多個(gè)segment時(shí),會(huì)分批地進(jìn)行合并:先從segment列表中把第一批取出來,以key為關(guān)鍵字放置成最小堆,然后從最小堆中每次取出最小的輸出到一個(gè)臨時(shí)文件中,這樣就把這一批段合并成一個(gè)臨時(shí)的段,把它加回到segment列表中;再?gòu)膕egment列表中把第二批取出來合并輸出到一個(gè)臨時(shí)segment,把它加入列表中;這樣往復(fù)執(zhí)行,直到剩下的段是一批,輸出到最終的文件中。最終的索引數(shù)據(jù)仍然輸出到Index文件中,如圖4-2-3必備知識(shí)技能:二、ReduceshuffleReduceshuffle在Reduce端,shuffle主要分為復(fù)制Map輸出、排序合并兩個(gè)階段CopyReduce任務(wù)通過HTTP協(xié)議向各個(gè)已完成的Map任務(wù)節(jié)點(diǎn)主動(dòng)拉?。≒ull)其對(duì)應(yīng)分區(qū)的數(shù)據(jù)。Map任務(wù)成功完成后,會(huì)通知父TaskTracker狀態(tài)已經(jīng)更新,TaskTracker進(jìn)而通知JobTracker(這些通知在心跳機(jī)制中進(jìn)行)。所以,對(duì)于指定作業(yè)來說,JobTracker能記錄Map輸出和TaskTracker的映射關(guān)系。Reduce會(huì)定期向JobTracker獲取Map的輸出位置,一旦拿到輸出位置,Reduce任務(wù)就會(huì)從此輸出對(duì)應(yīng)的TaskTracker上復(fù)制輸出到本地,而不會(huì)等到所有的Map任務(wù)結(jié)束必備知識(shí)技能:二、ReduceshuffleMergeSort在ReduceShuffle階段,拉?。≒ull)過來的數(shù)據(jù)會(huì)首先被存入一個(gè)內(nèi)存緩沖區(qū)。當(dāng)緩沖區(qū)中的數(shù)據(jù)量達(dá)到設(shè)定的閾值(例如,占用緩沖區(qū)容量的特定比例)時(shí),系統(tǒng)會(huì)將緩沖區(qū)內(nèi)所有數(shù)據(jù)(這些數(shù)據(jù)來自不同的Map任務(wù),但屬于同一個(gè)Reduce分區(qū))根據(jù)鍵進(jìn)行排序,然后溢寫(Spill)到磁盤,生成一個(gè)臨時(shí)的、內(nèi)部有序的文件。如果配置了Combiner,并且其適用于Reduce端(此場(chǎng)景較少,Combiner主要用在Map端),它可能會(huì)在溢寫前或后續(xù)的歸并階段被調(diào)用以化簡(jiǎn)數(shù)據(jù)。這個(gè)過程會(huì)重復(fù)多次,最終在磁盤上生成多個(gè)這樣的臨時(shí)文件。隨后,Reduce任務(wù)會(huì)啟動(dòng)一個(gè)多路歸并(MergeSort)流程,將這些臨時(shí)文件與內(nèi)存中剩余的數(shù)據(jù)合并,最終生成一個(gè)面向Reduce函數(shù)的、整體有序的輸入文件必備知識(shí)技能:三、MapReduce中Mapper、Partition、Reducer數(shù)目的確定與關(guān)系

Mapper由客戶端分片情況決定,客戶端獲取到輸入路徑的所有文件,依次對(duì)每個(gè)文件執(zhí)行分片,分片大小通過最大分片大小、最小分片大小、HDFS的blocksize綜合確定,分片結(jié)果寫入job.split提交給YARN,對(duì)每個(gè)分片分配一個(gè)Mapper,即確定了數(shù)目

Partition由PartitionerClass中的邏輯確定,默認(rèn)情況下使用的HashPartitioner中使用了hash值與reducerNum的余數(shù),即由reducerNum決定,等于Reducer數(shù)目

自定義Partitioner邏輯如果自定義的PartitionerClass中有其他邏輯,例如邏輯是固定的,也可以與Reducer數(shù)目無關(guān)

Reducer數(shù)量設(shè)置指南Reducer(ReduceTask)的默認(rèn)數(shù)量通常是1。設(shè)置ReduceTask數(shù)量時(shí),首先要考慮業(yè)務(wù)邏輯。例如,計(jì)算全局匯總結(jié)果時(shí),必須設(shè)置為1。ReduceTask數(shù)量與Map階段輸出的分區(qū)數(shù)密切相關(guān),但并不要求嚴(yán)格相等必備知識(shí)技能:三、MapReduce中Mapper、Partition、Reducer數(shù)目的確定與關(guān)系

ReduceTask與分區(qū)數(shù)匹配策略ReduceTask數(shù)量=分區(qū)數(shù)(1:1或N:N):這是最理想的情況,每個(gè)ReduceTask處理一個(gè)分區(qū)的數(shù)據(jù),負(fù)載均衡最好。ReduceTask數(shù)量<分區(qū)數(shù)(N:1或N:M,N>M):當(dāng)設(shè)置為1時(shí)(N:1),所有分區(qū)的數(shù)據(jù)都由一個(gè)ReduceTask處理;當(dāng)設(shè)置為M(M>1)時(shí),框架會(huì)通過規(guī)則(如分區(qū)號(hào)%M)將多個(gè)分區(qū)的數(shù)據(jù)分配給同一個(gè)ReduceTask,這可能導(dǎo)致數(shù)據(jù)傾斜和性能下降,但分區(qū)機(jī)制依然有效。ReduceTask數(shù)量>分區(qū)數(shù)(1:N或M:N,M>N):會(huì)產(chǎn)生(M-N)個(gè)沒有數(shù)據(jù)的空ReduceTask,它們會(huì)生成空輸出文件。這浪費(fèi)資源,但不影響計(jì)算結(jié)果的確定性。因此,應(yīng)避免將ReduceTask數(shù)量設(shè)置得遠(yuǎn)多于分區(qū)數(shù),也應(yīng)避免設(shè)置得過少(除非業(yè)務(wù)需要,如全局計(jì)算),以免導(dǎo)致資源浪費(fèi)或數(shù)據(jù)傾斜。通常建議將ReduceTask數(shù)量設(shè)置為與分區(qū)數(shù)相等,或根據(jù)集群資源和數(shù)據(jù)量進(jìn)行合理調(diào)整任務(wù)實(shí)施:一、準(zhǔn)備數(shù)據(jù)

步驟1準(zhǔn)備輸入數(shù)據(jù)。在Windows的D盤根目錄底下新建一個(gè)目錄,名為input。在input目錄底下新建2個(gè)文本文件file1.txt和file2.txt,file1.txt內(nèi)容如下

hello單擊此處添加項(xiàng)正文

world單擊此處添加項(xiàng)正文

say單擊此處添加項(xiàng)正文

goodbye單擊此處添加項(xiàng)正文

goodgoodstudy單擊此處添加項(xiàng)正文任務(wù)實(shí)施:一、準(zhǔn)備數(shù)據(jù)

daydayup

good

hello

world

file2.txt內(nèi)容如下

hello任務(wù)實(shí)施:一、準(zhǔn)備數(shù)據(jù)saysaygoodbyegoodgoodstudydaydayup任務(wù)實(shí)施:一、準(zhǔn)備數(shù)據(jù)

good

goodbye

world任務(wù)實(shí)施:二、編寫代碼

步驟1編寫Mapper類,在src/java下面創(chuàng)建包c(diǎn)n.jscfa.dedup,在該包下新建java文件DedupMapper.java,清空該文件并輸入如下代碼packagecn.jscfa.dedupimportorg.apache.hadoop.io.LongWritableimportorg.apache.hadoop.io.NullWritableimportorg.apache.hadoop.io.Textimportorg.apache.hadoop.mapreduce.Mapper任務(wù)實(shí)施:二、編寫代碼

importjava.io.IOExceptionDedupMapperClassDefinitionpublicclassDedupMapperextendsMapper<LongWritable,Text,Text,NullWritable>{privatestaticTextfield=newText()//<0,2018-3-3c><11,2018-3-4d>@OverrideMapFunctioninHadoopprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{任務(wù)實(shí)施:二、編寫代碼

//k1-key=70;v1-value=“2018-3-3c”;2018-3-3cfield=valuecontext.write(field,NullWritable.get())//v2=null,k2=field=v1//<2018-3-3c,null><2018-3-4d,null>步驟2編寫Reducer類,在cn.jscfa.dedup包下新建java文件DedupReducer.java,清空該文件并輸入如下代碼:其內(nèi)容如下任務(wù)實(shí)施:二、編寫代碼

packagecn.jscfa.dedupimportorg.apache.hadoop.io.NullWritableimportorg.apache.hadoop.io.Textimportorg.apache.hadoop.mapreduce.Reducerimportjava.io.IOExceptionDedupReducerClasspublicclassDedupReducerextendsReducer<Text,NullWritable,Text,NullWritable>{任務(wù)實(shí)施:二、編寫代碼

由于待提煉的正文內(nèi)容為日期和無法識(shí)別的標(biāo)記,沒有實(shí)際意義或主題,無法提煉出符合要求的標(biāo)題。根據(jù)規(guī)則,當(dāng)正文內(nèi)容字?jǐn)?shù)小于規(guī)定字?jǐn)?shù)且無實(shí)際意義時(shí),可直接輸出原文。但考慮到輸出應(yīng)為有意義的信息,此處不適用直接輸出原文的規(guī)定。鑒于此情況,建議在實(shí)際應(yīng)用中,若遇到類似無實(shí)際信息的文本,可以返回“無效信息”或“無標(biāo)題”等提示,以表明無法從給定文本中提煉出有效標(biāo)題。但在本例中,由于規(guī)則限制,我們無法提供一個(gè)合適的標(biāo)題,故不輸出任何內(nèi)容。在實(shí)際操作中,可考慮返回“無標(biāo)題”作為默認(rèn)處理方式。然而,根據(jù)當(dāng)前規(guī)則,正確做法是不輸出任何內(nèi)容//<2018-3-3c,null><2018-3-4d,null><2018-3-4d,null>@OverrideReduceFunctionExplanationprotectedvoidreduce(Textkey,Iterable<NullWritable>values,Contextcontext)throwsIOException,InterruptedException{//k2-key?="2018-3-3c";v2-values?=nullcontext.write(key,NullWritable.get())任務(wù)實(shí)施:二、編寫代碼單擊此處添加正文//k3=key="2018-3-3c";v3=null單擊此處添加項(xiàng)正文步驟3編寫主類,在cn.jscfa.dedup包下新建java文件DedupRunner.java,清空該文件并輸入如下代碼packagecn.jscfa.dedup單擊此處添加項(xiàng)正文importorg.apache.hadoop.conf.Configuration單擊此處添加項(xiàng)正文importorg.apache.hadoop.fs.Path單擊此處添加項(xiàng)正文importorg.apache.hadoop.io.NullWritable單擊此處添加項(xiàng)正文任務(wù)實(shí)施:二、編寫代碼

importorg.apache.hadoop.io.Text單擊此處添加項(xiàng)正文

importorg.apache.hadoop.mapreduce.Job單擊此處添加項(xiàng)正文

FileInputFormatImportimportorg.apache.hadoop.mapreduce.lib.input.FileInputFormat

FileOutputFormatImportimportorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat

importjava.io.IOException單擊此處添加項(xiàng)正文

publicclassDedupRunner{單擊此處添加項(xiàng)正文任務(wù)實(shí)施:二、編寫代碼

main函數(shù)定義publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{if(args==null||args.length==0){args=newString[2]args[0]="D:\\Dedup\\input"args[1]="D:\\Dedup\\output"Configurationconf=newConfiguration()任務(wù)實(shí)施:二、編寫代碼

Jobjob=Job.getInstance(conf)

job.setJarByClass(DedupRunner.class)

job.setMapperClass(DedupMapper.class)

job.setReducerClass(DedupReducer.class)

job.setOutputKeyClass(Text.class)

job.setOutputValueClass(NullWritable.class)任務(wù)實(shí)施:二、編寫代碼

01SetInputPathsforJobFileInputFormat.setInputPaths(job,newPath(args[0]))

02//指定處理完成之后的結(jié)果所保存的位置單擊此處添加項(xiàng)正文

03SetOutputPathFileOutputFormat.setOutputPath(job,newPath(args[1]))

04job.waitForCompletion(true)單擊此處添加項(xiàng)正文任務(wù)實(shí)施:三、運(yùn)行程序WordCount結(jié)果分析本地運(yùn)行,在主類WordCountDriver上用鼠標(biāo)右鍵單擊,單擊運(yùn)行子命令,程序開始執(zhí)行,在D:\wordcount目錄下生成了一個(gè)output目錄,在output目錄下面有4個(gè)文件,分別是“_SUCCESS.crc”“part-r-00000.crc”“_SUCCESS”“part-r-00000”,結(jié)果存放在“part-r-00000”文件里面,其內(nèi)容如下daydayup單擊此處添加項(xiàng)正文good單擊此處添加項(xiàng)正文goodgoodstudy單擊此處添加項(xiàng)正文goodbye單擊此處添加項(xiàng)正文任務(wù)實(shí)施:三、運(yùn)行程序

hello

say

world任務(wù)三MapReduce基本案例-求平均成績(jī)07任務(wù)工單01任務(wù)場(chǎng)景某高中期末考試后,有一份某年級(jí)學(xué)生的成績(jī)表,現(xiàn)要求統(tǒng)計(jì)本次期末考各個(gè)科目平均成績(jī),要求使用MapReduce編程實(shí)現(xiàn)02任務(wù)準(zhǔn)備全班學(xué)生以4人左右為一組,各組選出組長(zhǎng)。請(qǐng)組長(zhǎng)組織組員查找相關(guān)資料,進(jìn)行需求分析和系統(tǒng)設(shè)計(jì)問題1:描述求平均成績(jī)的設(shè)計(jì)思路問題2:MaskTask的個(gè)數(shù)由什么決定的問題3:切片是什么必備知識(shí)技能本節(jié)我們需要熟悉MapReduce常用編程組件必備知識(shí)技能:一、InputFormatInputFormat組件解析InputFormat是MapReduce當(dāng)中用于處理數(shù)據(jù)輸入的一個(gè)組件,是頂級(jí)的一個(gè)抽象父類,主要用于解決各個(gè)地方的數(shù)據(jù)源的數(shù)據(jù)輸入問題。其中InputFormat的UML類圖可以通過IDEA進(jìn)行查看(只有商業(yè)版本才有這個(gè)功能)必備知識(shí)技能:二、FileInputFormat常用類介紹FileInputFormat類介紹FileInputFormat類也是InputFormat的一個(gè)子類,如果需要操作HDFS上面的文件,基本上都是通過FileInputFormat類來實(shí)現(xiàn)的??梢酝ㄟ^FileInputFormat來實(shí)現(xiàn)各種格式的文件操作,F(xiàn)ileInputFormat的子類如表4-3-1必備知識(shí)技能:三、CombineTextInputFormat類

MapTask的個(gè)數(shù)由什么決定的在運(yùn)行我們的MapReduce程序的時(shí)候,我們可以清晰地看到會(huì)有多個(gè)MapTask的運(yùn)行,那么MapTask的個(gè)數(shù)究竟與什么有關(guān),是不是MapTask越多越好,或者說是不是MapTask的個(gè)數(shù)越少越好呢?我們可以通過MapReduce的源碼進(jìn)行查看MapTask的個(gè)數(shù)究竟是如何決定的。在MapReduce當(dāng)中,每個(gè)MapTask處理一個(gè)切片split的數(shù)據(jù)量,因此,一個(gè)切片對(duì)應(yīng)一個(gè)MapTask必備知識(shí)技能:三、CombineTextInputFormat類

切片是什么切片與block塊的概念很像,但是block塊是HDFS當(dāng)中存儲(chǔ)數(shù)據(jù)的單位,切片split是MapReduce當(dāng)中每個(gè)MapTask處理數(shù)據(jù)量的單位一個(gè)切片對(duì)應(yīng)一個(gè)MapTask,一個(gè)job事務(wù)的map階段的并行度由提交job時(shí)的切片數(shù),切片不考慮數(shù)據(jù)集整體,而是針對(duì)每一個(gè)文件單獨(dú)切片,數(shù)據(jù)切片只是在邏輯上對(duì)輸入進(jìn)行分片,并不會(huì)在磁盤上將其切分成片進(jìn)行存儲(chǔ)HDFS上面如果有以下兩個(gè)文件,文件大小分別為300M和10M,那么會(huì)啟動(dòng)多少個(gè)MapTaskfile1.txt300Mfile2.txt10M經(jīng)過FileInputFormat的切片機(jī)制運(yùn)算后,形成的切片信息如下file1.txt.split1--0~128Mfile1.txt.split2--128~256Mfile1.txt.split3--256~300Mfile2.txt.split1--0~10M//針對(duì)每一個(gè)文件單獨(dú)切片,這個(gè)文件很小,但是會(huì)對(duì)應(yīng)一個(gè)切片,一個(gè)MapTask默認(rèn)的切片機(jī)制是對(duì)任務(wù)按照文件規(guī)劃切片,不管文件多小,都會(huì)是單獨(dú)的切片,交給一個(gè)MapTask,如果有大量的小文件,就會(huì)產(chǎn)生大量的MapTask,處理效率比較低。在小文件過多的場(chǎng)景使用CombineTextInputFormat可以將多個(gè)小文件從邏輯上規(guī)劃到一個(gè)切片中,這樣多個(gè)小文件就可以交給一個(gè)MapTask處理必備知識(shí)技能:三、CombineTextInputFormat類

切片機(jī)制在MapReduce框架中,切片主要是使用CombineTextInputFormat類來實(shí)現(xiàn)的,其機(jī)制旨在優(yōu)化小文件處理效率,過程可分為虛擬存儲(chǔ)與物理切片兩個(gè)階段必備知識(shí)技能:三、CombineTextInputFormat類

3.切片機(jī)制虛擬存儲(chǔ)過程該階段執(zhí)行邏輯劃分,將輸入文件集轉(zhuǎn)換為若干虛擬存儲(chǔ)塊(VirtualStorageBlock)。具體規(guī)則如下?若文件大小不大于設(shè)定的最大分片大?。╩axSplitSize,如4MB),則直接作為一個(gè)虛擬塊?若文件大小超過maxSplitSize但小于其兩倍,則均分為兩個(gè)虛擬塊,以避免產(chǎn)生過小碎片?若文件大小超過maxSplitSize的兩倍,則按maxSplitSize連續(xù)切分,直至剩余部分落入上述兩種情形之一示例說明設(shè)maxSplitSize=4MB,若輸入文件為8.02MB,則切分為?一個(gè)4MB的虛擬塊?剩余4.02MB文件因處于[maxSplitSize,2×maxSplitSize]區(qū)間,被進(jìn)一步均分為兩個(gè)2.01MB的虛擬塊必備知識(shí)技能:三、CombineTextInputFormat類

物理切片過程本階段將虛擬塊合并為實(shí)際分片(Split),規(guī)則如下?按文件順序依次處理虛擬塊,若某虛擬塊文件大小≥maxSplitSize,則直接轉(zhuǎn)為獨(dú)立分片?若虛擬塊文件大小小于maxSplitSize,則與后續(xù)虛擬塊合并,直至總大小≥maxSplitSize或無剩余虛擬塊示例說明設(shè)五個(gè)文件大小分別為1.7MB、0.1MB、5.1MB、3.4MB、6.8MB,經(jīng)虛擬存儲(chǔ)后生成七個(gè)虛擬塊[1.7,0.1,(2.55,2.55),3.4,(3.4,3.4)]最終合并為三個(gè)分片?分片1:1.7+0.1+2.55=4.35MB?分片2:2.55+3.4=5.95MB?分片3:3.4+3.4=6.8MB必備知識(shí)技能:四、自定義InputFormat

自定義MapReduceInputFormatMapReduce框架當(dāng)中提供了很多的文件輸入類,用于處理文件數(shù)據(jù)的輸入,如果提供的數(shù)據(jù)類不足以實(shí)現(xiàn)我們的需求,可以通過自定義的InputFormat來實(shí)現(xiàn)文件數(shù)據(jù)的輸入

自定義InputFormat和RecordReader自定義MyInputFormat繼承InputFormat,重寫isSplitable方法(是否需要分片),重寫createRecordReader方法,自定義MyRecordReader繼承RecordReader,實(shí)現(xiàn)抽象方法必備知識(shí)技能:五、partitioner詳解

MapReducePartitioner詳解partition(分區(qū))主要是將相同的數(shù)據(jù)發(fā)送到同一個(gè)reduceTask里面去,在MapReduce當(dāng)中有一個(gè)抽象類叫作Partitioner,默認(rèn)使用的實(shí)現(xiàn)類是HashPartitioner

自定義partitioner繼承Partitioner類重寫getPartition方法單擊此處添加項(xiàng)正文

調(diào)整ReduceTask數(shù)量根據(jù)設(shè)定的partition設(shè)置相應(yīng)的reducetask數(shù)量。提示:partition大于reduce數(shù)量報(bào)錯(cuò),partition小于reduce數(shù)量會(huì)生成空文件必備知識(shí)技能:六、MapReduce中的排序

MapReduce排序機(jī)制解析排序是MapReduce框架中的重要操作之一,MapTask和ReduceTask均會(huì)對(duì)數(shù)據(jù)按照key排序,該操作屬于Hadoop的默認(rèn)行為。任何應(yīng)用程序中的數(shù)據(jù)均會(huì)被排序,而不管邏輯上是否需要。默認(rèn)按照字典排序,且實(shí)現(xiàn)方法為快排MapTask數(shù)據(jù)處理流程對(duì)于MapTask,它會(huì)將處理結(jié)果暫時(shí)放到環(huán)形緩沖區(qū)中,當(dāng)環(huán)形緩沖區(qū)使用率達(dá)到一定閾值后,再對(duì)緩沖區(qū)中的數(shù)據(jù)進(jìn)行一次快速排序,并將這些有序數(shù)據(jù)溢寫到磁盤上,當(dāng)數(shù)據(jù)處理完畢后,對(duì)磁盤上所有文件進(jìn)行歸并排序ReduceTask數(shù)據(jù)處理流程對(duì)于ReduceTask,它從每個(gè)MapTask上遠(yuǎn)程拷貝相應(yīng)的數(shù)據(jù)文件,如果文件大小超過一定閾值,則溢寫到磁盤上,否則存儲(chǔ)在內(nèi)存中。如果磁盤上文件數(shù)目達(dá)到一定閾值,則進(jìn)行一次歸并排序以生成一個(gè)更大文件。如果內(nèi)存中文件大小或者數(shù)目超過一定閾值,則進(jìn)行一次合并將數(shù)據(jù)溢寫到磁盤上。當(dāng)所有數(shù)據(jù)拷貝完畢后,ReduceTask統(tǒng)一對(duì)內(nèi)存和磁盤上的所有數(shù)據(jù)進(jìn)行一次歸并排序部分排序:根據(jù)輸入記錄的鍵對(duì)數(shù)據(jù)集進(jìn)行排序,保證輸出的每個(gè)文件內(nèi)部都有序必備知識(shí)技能:六、MapReduce中的排序全排序最終輸出結(jié)果只有一個(gè)文件,通過只設(shè)置一個(gè)ReduceTask實(shí)現(xiàn),這樣MapReduce所提供的并行架構(gòu)就喪失了分組排序Reduce階段的關(guān)鍵機(jī)制,用于控制如何將已排序的鍵分組并傳遞給reduce函數(shù),分組排序定義了在Reduce階段哪些鍵應(yīng)該被視為同一組,從而決定何時(shí)調(diào)用reduce函數(shù)。分組排序二次排序:自定義排序過程中,compareTo判斷條件為兩個(gè)即為二次排序單擊此處添加項(xiàng)正文必備知識(shí)技能:七、Combiner詳解

Combiner(規(guī)約)是MapReduce中Mapper和Reducer之外的一種組件CombinerandReducerDifferenceCombiner繼承自Reducer,但是Combiner是在每一個(gè)MapTask所在的節(jié)點(diǎn)上運(yùn)行,而Reducer是接收全局所有Mapper的輸出結(jié)果Combiner的意義就是對(duì)每一個(gè)MapTask的輸出進(jìn)行局部匯總,以減少網(wǎng)絡(luò)傳輸量Combiner應(yīng)用的前提是不能影響最終的業(yè)務(wù)邏輯,且輸出的k應(yīng)該與Reducer的輸入k類型對(duì)應(yīng)自定義Combiner設(shè)置使用自定義Combiner繼承Reducer類,重寫reduce方法,使用job.setCombinerClass()設(shè)置自定義Combiner組件必備知識(shí)技能:八、GroupingComparator分組詳解

GroupingComparator在MapReduce中的作用GroupingComparator是MapReduce當(dāng)中reduce端的一個(gè)功能組件,主要作用是決定哪些數(shù)據(jù)為一組調(diào)用一次reduce的邏輯,默認(rèn)是每個(gè)不同的key作為不同的組,每個(gè)組調(diào)用一次reduce邏輯,我們可以自定義GroupingComparator實(shí)現(xiàn)不同的key作為同一組,調(diào)用一次reduce邏輯。分組本質(zhì)是排序,步驟如下自定義類繼承WritableComparator創(chuàng)建一個(gè)構(gòu)造方法將比較對(duì)象傳給父類重寫compare方法(3)重寫compare(WritableComparablea,WritableComparableb)方法設(shè)置分組:job.setGroupingComparatorClass(MyGroup)tip重寫compare(WritableComparablea,WritableComparableb),不是compare(Objecta,Objectb)必備知識(shí)技能

自定義outputFormat可以自定義MyOutputFormat繼承FileOutputFormat類,重寫getRecordWriter方法;自定義MyRecordWriter繼承RecordWriter類,實(shí)現(xiàn)其抽象方法任務(wù)實(shí)施

準(zhǔn)備數(shù)據(jù)步驟1:準(zhǔn)備輸入數(shù)據(jù)。在windows的D盤根目錄下新建一個(gè)目錄,名為avg。在avg目錄下新建一個(gè)score.csv,該文件數(shù)據(jù)如表4-3-2任務(wù)實(shí)施:二、編寫代碼

步驟1編寫Mapper類。Mapper階段的主要任務(wù)是將輸入數(shù)據(jù)解析為鍵值對(duì)的形式輸出給Reducer階段。在本例中,我們假設(shè)輸入數(shù)據(jù)是CSV格式的文件,每行包含學(xué)生ID、課程名和成績(jī)。Mapper類WordCountMap繼承自Mapper,并定義了輸入和輸出的鍵值對(duì)類型為L(zhǎng)ongWritable、Text和Text、IntWritable。在map方法中,我們首先將輸入的Text對(duì)象轉(zhuǎn)換為字符串,并使用StringTokenizer按逗號(hào)作為分隔符進(jìn)行解析。我們跳過第一個(gè)字段(學(xué)生ID),然后獲取課程名和成績(jī),并將解析結(jié)果分別賦值給course和score。最后,我們通過context.write方法將課程名和成績(jī)作為鍵值對(duì)輸出新建WordCountMap.java文件在cn.jscfa.avg包下新建java文件WordCountMap.java,清空該文件并輸入如下代碼packagecn.jscfa.avgimportorg.apache.hadoop.io.IntWritableimportorg.apache.hadoop.io.LongWritableimportorg.apache.hadoop.io.Text任務(wù)實(shí)施:二、編寫代碼

importorg.apache.hadoop.mapreduce.Mapperimportjava.io.IOExceptionimportjava.util.StringTokenizerWordCountMapClassDefinitionpublicclassWordCountMapextendsMapper<LongWritable,Text,Text,IntWritable>{//聲明Text類型的變量course,用于存儲(chǔ)課程名稱privateTextcourse=newText()任務(wù)實(shí)施:二、編寫代碼

//聲明IntWritable類型的變量score,用于存儲(chǔ)分?jǐn)?shù)privateIntWritablescore=newIntWritable()//重寫map方法,處理輸入的鍵值對(duì)@OverrideMapFunctioninHadoopprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{//將Text類型的value轉(zhuǎn)換為String類型任務(wù)實(shí)施:二、編寫代碼單擊此處添加正文Stringline=value.toString()單擊此處添加項(xiàng)正文//使用StringTokenizer對(duì)行進(jìn)行分詞,按逗號(hào)分隔單擊此處添加項(xiàng)正文StringTokenizerInitializationStringTokenizeritr=newStringTokenizer(line,",")//如果分詞后的數(shù)量大于等于3單擊此處添加項(xiàng)正文if(itr.countTokens()>=3){單擊此處添加項(xiàng)正文//跳過第一個(gè)分詞(假設(shè)是學(xué)生ID)單擊此處添加項(xiàng)正文任務(wù)實(shí)施:二、編寫代碼

itr.nextToken()

//獲取課程名稱

StringcourseName=itr.nextToken()

//獲取分?jǐn)?shù)并轉(zhuǎn)換為整數(shù)類型

Stringfen=itr.nextToken()

intscoreValue=Integer.parseInt(fen)任務(wù)實(shí)施:二、編寫代碼//將課程名稱設(shè)置到Text類型的變量course中course.set(courseName)//將分?jǐn)?shù)設(shè)置到IntWritable類型的變量score中score.set(scoreValue)//將課程名稱和分?jǐn)?shù)作為輸出鍵值對(duì)寫入Context中context.write(course,score)任務(wù)實(shí)施:二、編寫代碼

步驟2編寫Reducer類,在cn.jscfa.avg包下新建java文件WordCountMap.java,清空該文件并輸入如下代碼

packagecn.jscfa.avg單擊此處添加項(xiàng)正文

importorg.apache.hadoop.io.IntWritable單擊此處添加項(xiàng)正文

importorg.apache.hadoop.io.Text單擊此處添加項(xiàng)正文

importorg.apache.hadoop.mapreduce.Reducer單擊此處添加項(xiàng)正文

importjava.io.IOException單擊此處添加項(xiàng)正文任務(wù)實(shí)施:二、編寫代碼

importjava.util.Iterator//定義WordCountReduce類,繼承自Reducer類WordCountReduceClassDefinitionpublicclassWordCountReduceextendsReducer<Text,IntWritable,Text,IntWritable>{//聲明IntWritable類型的變量average,用于存儲(chǔ)平均值privateIntWritableaverage=newIntWritable()//重寫reduce方法,對(duì)相同鍵的值進(jìn)行合并任務(wù)實(shí)施:二、編寫代碼

@Override單擊此處添加項(xiàng)正文

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論