MapReduce的執(zhí)行過程(很詳細很清楚)_第1頁
MapReduce的執(zhí)行過程(很詳細很清楚)_第2頁
MapReduce的執(zhí)行過程(很詳細很清楚)_第3頁
MapReduce的執(zhí)行過程(很詳細很清楚)_第4頁
MapReduce的執(zhí)行過程(很詳細很清楚)_第5頁
已閱讀5頁,還剩6頁未讀 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

MapReduce執(zhí)行過程

2011-07-1217:06:28|

分類:

\o"默認分類"默認分類

|

標簽:mapreduce執(zhí)行過程

|舉報|字號

訂閱1、Map-Reduce的邏輯過程假設我們需要處理一批有關天氣的數(shù)據(jù),其格式如下:按照ASCII碼存儲,每行一條記錄每一行字符從0開始計數(shù),第15個到第18個字符為年第25個到第29個字符為溫度,其中第25位是符號+/-0067011990999991950051507+0000+0043011990999991950051512+0022+0043011990999991950051518-0011+0043012650999991949032412+0111+0043012650999991949032418+0078+0067011990999991937051507+0001+0043011990999991937051512-0002+0043011990999991945051518+0001+0043012650999991945032412+0002+0043012650999991945032418+0078+現(xiàn)在需要統(tǒng)計出每年的最高溫度。Map-Reduce主要包括兩個步驟:Map和Reduce每一步都有key-value對作為輸入和輸出:map階段的key-value對的格式是由輸入的格式所決定的,如果是默認的TextInputFormat,則每行作為一個記錄進程處理,其中key為此行的開頭相對于文件的起始位置,value就是此行的字符文本map階段的輸出的key-value對的格式必須同reduce階段的輸入key-value對的格式相對應對于上面的例子,在map過程,輸入的key-value對如下:(0,0067011990999991950051507+0000+)(33,0043011990999991950051512+0022+)(66,0043011990999991950051518-0011+)(99,0043012650999991949032412+0111+)(132,0043012650999991949032418+0078+)(165,0067011990999991937051507+0001+)(198,0043011990999991937051512-0002+)(231,0043011990999991945051518+0001+)(264,0043012650999991945032412+0002+)(297,0043012650999991945032418+0078+)在map過程中,通過對每一行字符串的解析,得到年-溫度的key-value對作為輸出:(1950,0)(1950,22)(1950,-11)(1949,111)(1949,78)(1937,1)(1937,-2)(1945,1)(1945,2)(1945,78)在reduce過程,將map過程中的輸出,按照相同的key將value放到同一個列表中作為reduce的輸入(1950,[0,22,–11])(1949,[111,78])(1937,[1,-2])(1945,[1,2,78])在reduce過程中,在列表中選擇出最大的溫度,將年-最大溫度的key-value作為輸出:(1950,22)(1949,111)(1937,1)(1945,78)其邏輯過程可用如下圖表示:2、編寫Map-Reduce程序編寫Map-Reduce程序,一般需要實現(xiàn)兩個函數(shù):mapper中的map函數(shù)和reducer中的reduce函數(shù)。一般遵循以下格式:map:(K1,V1)

->

list(K2,V2)publicinterfaceMapper<K1,V1,K2,V2>extendsJobConfigurable,Closeable{

voidmap(K1key,V1value,OutputCollector<K2,V2>output,Reporterreporter)

throwsIOException;}reduce:(K2,list(V))

->

list(K3,V3)

publicinterfaceReducer<K2,V2,K3,V3>extendsJobConfigurable,Closeable{

voidreduce(K2key,Iterator<V2>values,

OutputCollector<K3,V3>output,Reporterreporter)

throwsIOException;}

對于上面的例子,則實現(xiàn)的mapper如下:

publicclassMaxTemperatureMapperextendsMapReduceBaseimplementsMapper<LongWritable,Text,Text,IntWritable>{

@Override

publicvoidmap(LongWritablekey,Textvalue,OutputCollector<Text,IntWritable>output,Reporterreporter)throwsIOException{

Stringline=value.toString();

Stringyear=line.substring(15,19);

intairTemperature;

if(line.charAt(25)=='+'){

airTemperature=Integer.parseInt(line.substring(26,30));

}else{

airTemperature=Integer.parseInt(line.substring(25,30));

}

output.collect(newText(year),newIntWritable(airTemperature));

}}實現(xiàn)的reducer如下:publicclassMaxTemperatureReducerextendsMapReduceBaseimplementsReducer<Text,IntWritable,Text,IntWritable>{

publicvoidreduce(Textkey,Iterator<IntWritable>values,OutputCollector<Text,IntWritable>output,Reporterreporter)throwsIOException{

intmaxValue=Integer.MIN_VALUE;

while(values.hasNext()){

maxValue=Math.max(maxValue,values.next().get());

}

output.collect(key,newIntWritable(maxValue));

}}

欲運行上面實現(xiàn)的Mapper和Reduce,則需要生成一個Map-Reduce得任務(Job),其基本包括以下三部分:輸入的數(shù)據(jù),也即需要處理的數(shù)據(jù)Map-Reduce程序,也即上面實現(xiàn)的Mapper和Reducer此任務的配置項JobConf欲配置JobConf,需要大致了解Hadoop運行job的基本原理:Hadoop將Job分成task進行處理,共兩種task:maptask和reducetaskHadoop有兩類的節(jié)點控制job的運行:JobTracker和TaskTrackerJobTracker協(xié)調整個job的運行,將task分配到不同的TaskTracker上TaskTracker負責運行task,并將結果返回給JobTrackerHadoop將輸入數(shù)據(jù)分成固定大小的塊,我們稱之inputsplitHadoop為每一個inputsplit創(chuàng)建一個task,在此task中依次處理此split中的一個個記錄(record)Hadoop會盡量讓輸入數(shù)據(jù)塊所在的DataNode和task所執(zhí)行的DataNode(每個DataNode上都有一個TaskTracker)為同一個,可以提高運行效率,所以inputsplit的大小也一般是HDFS的block的大小。Reducetask的輸入一般為MapTask的輸出,ReduceTask的輸出為整個job的輸出,保存在HDFS上。在reduce中,相同key的所有的記錄一定會到同一個TaskTracker上面運行,然而不同的key可以在不同的TaskTracker上面運行,我們稱之為partitionpartition的規(guī)則為:(K2,V2)–>Integer,也即根據(jù)K2,生成一個partition的id,具有相同id的K2則進入同一個partition,被同一個TaskTracker上被同一個Reducer進行處理。publicinterfacePartitioner<K2,V2>extendsJobConfigurable{

intgetPartition(K2key,V2value,intnumPartitions);}下圖大概描述了Map-Reduce的Job運行的基本原理:

下面我們討論JobConf,其有很多的項可以進行配置:setInputFormat:設置map的輸入格式,默認為TextInputFormat,key為LongWritable,value為TextsetNumMapTasks:設置map任務的個數(shù),此設置通常不起作用,map任務的個數(shù)取決于輸入的數(shù)據(jù)所能分成的inputsplit的個數(shù)setMapperClass:設置Mapper,默認為IdentityMappersetMapRunnerClass:設置MapRunner,maptask是由MapRunner運行的,默認為MapRunnable,其功能為讀取inputsplit的一個個record,依次調用Mapper的map函數(shù)setMapOutputKeyClass和setMapOutputValueClass:設置Mapper的輸出的key-value對的格式setOutputKeyClass和setOutputValueClass:設置Reducer的輸出的key-value對的格式setPartitionerClass和setNumReduceTasks:設置Partitioner,默認為HashPartitioner,其根據(jù)key的hash值來決定進入哪個partition,每個partition被一個reducetask處理,所以partition的個數(shù)等于reducetask的個數(shù)setReducerClass:設置Reducer,默認為IdentityReducersetOutputFormat:設置任務的輸出格式,默認為TextOutputFormatFileInputFormat.addInputPath:設置輸入文件的路徑,可以使一個文件,一個路徑,一個通配符??梢员徽{用多次添加多個路徑FileOutputFormat.setOutputPath:設置輸出文件的路徑,在job運行前此路徑不應該存在當然不用所有的都設置,由上面的例子,可以編寫Map-Reduce程序如下:publicclassMaxTemperature{

publicstaticvoidmain(String[]args)throwsIOException{

if(args.length!=2){

System.err.println("Usage:MaxTemperature<inputpath><outputpath>");

System.exit(-1);

}

JobConfconf=newJobConf(MaxTemperature.class);

conf.setJobName("Maxtemperature");

FileInputFormat.addInputPath(conf,newPath(args[0]));

FileOutputFormat.setOutputPath(conf,newPath(args[1]));

conf.setMapperClass(MaxTemperatureMapper.class);

conf.setReducerClass(MaxTemperatureReducer.class);

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);

JobClient.runJob(conf);

}}3、Map-Reduce數(shù)據(jù)流(dataflow)Map-Reduce的處理過程主要涉及以下四個部分:客戶端Client:用于提交Map-reduce任務jobJobTracker:協(xié)調整個job的運行,其為一個Java進程,其mainclass為JobTrackerTaskTracker:運行此job的task,處理inputsplit,其為一個Java進程,其mainclass為TaskTrackerHDFS:hadoop分布式文件系統(tǒng),用于在各個進程間共享Job相關的文件3.1、任務提交JobClient.runJob()創(chuàng)建一個新的JobClient實例,調用其submitJob()函數(shù)。向JobTracker請求一個新的jobID檢測此job的output配置計算此job的inputsplits將Job運行所需的資源拷貝到JobTracker的文件系統(tǒng)中的文件夾中,包括jobjar文件,job.xml配置文件,inputsplits通知JobTracker此Job已經(jīng)可以運行了提交任務后,runJob每隔一秒鐘輪詢一次job的進度,將進度返回到命令行,直到任務運行完畢。

3.2、任務初始化

當JobTracker收到submitJob調用的時候,將此任務放到一個隊列中,job調度器將從隊列中獲取任務并初始化任務。初始化首先創(chuàng)建一個對象來封裝job運行的tasks,status以及progress。在創(chuàng)建task之前,job調度器首先從共享文件系統(tǒng)中獲得JobClient計算出的inputsplits。其為每個inputsplit創(chuàng)建一個maptask。每個task被分配一個ID。

3.3、任務分配

TaskTracker周期性的向JobTracker發(fā)送heartbeat。在heartbeat中,TaskTracker告知JobTracker其已經(jīng)準備運行一個新的task,JobTracker將分配給其一個task。在JobTracker為TaskTracker選擇一個task之前,JobTracker必須首先按照優(yōu)先級選擇一個Job,在最高優(yōu)先級的Job中選擇一個task。TaskTracker有固定數(shù)量的位置來運行maptask或者reducetask。默認的調度器對待maptask優(yōu)先于reducetask當選擇reducetask的時候,JobTracker并不在多個task之間進行選擇,而是直接取下一個,因為reducetask沒有數(shù)據(jù)本地化的概念。

3.4、任務執(zhí)行

TaskTracker被分配了一個task,下面便要運行此task。首先,TaskTracker將此job的jar從共享文件系統(tǒng)中拷貝到TaskTracker的文件系統(tǒng)中。TaskTracker從distributedcache中將job運行所需要的文件拷貝到本地磁盤。其次,其為每個task創(chuàng)建一個本地的工作目錄,將jar解壓縮到文件目錄中。其三,其創(chuàng)建一個TaskRunner來運行task。TaskRunner創(chuàng)建一個新的JVM來運行task。被創(chuàng)建的childJVM和TaskTracker通信來報告運行進度。

3.4.1、Map的過程MapRunnable從inputsplit中讀取一個個的record,然后依次調用Mapper的map函數(shù),將結果輸出。map的輸出并不是直接寫入硬盤,而是將其寫入緩存memorybuffer。當buffer中數(shù)據(jù)的到達一定的大小,一個背景線程將數(shù)據(jù)開始寫入硬盤。在寫入硬盤之前,內存中的數(shù)據(jù)通過partitioner分成多個partition。在

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論