講義03-mapreduce入門講義note03_W_第1頁
講義03-mapreduce入門講義note03_W_第2頁
講義03-mapreduce入門講義note03_W_第3頁
講義03-mapreduce入門講義note03_W_第4頁
講義03-mapreduce入門講義note03_W_第5頁
已閱讀5頁,還剩11頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

1、Hadoop Day 031. MapReduce 介紹MapReduce思想在生活中處處可見?;蚨嗷蛏俣荚佑|過這種思想。MapReduce的思 想核心是“分而治之”,適用于大量復(fù)雜的任務(wù)處理場景(大規(guī)模數(shù)據(jù)處理場景)。 Map負(fù)責(zé)“分”,即把復(fù)雜的任務(wù)分解為若干個“簡單的任務(wù)”來并行處理??梢赃M(jìn)行拆分的前提是這些小任務(wù)可以并行計算,彼此間幾乎沒有依賴關(guān)系。 Reduce負(fù)責(zé)“合”,即對map階段的結(jié)果進(jìn)行全局匯總。MapReduce運(yùn)行在yarn集群 1. ResourceManager2. NodeManager這兩個階段合起來正是MapReduce思想的體現(xiàn)。 還有一個比較形象的語言解

2、釋MapReduce:我們要數(shù)圖書館中的所有書。你數(shù)1號書架,我數(shù)2號書架。這就是“Map”。我們?nèi)嗽蕉啵?數(shù)書就更快。 現(xiàn)在我們到一起,把所有人的統(tǒng)計數(shù)加在一起。這就是“Reduce”。 1.1. MapReduce 設(shè)計構(gòu)思和框架結(jié)構(gòu)MapReduce是一個分布式運(yùn)算程序的編程框架,核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個完整的分布式運(yùn)算程序,并發(fā)運(yùn)行在Hadoop集群上。 既然是做計算的框架,那么表現(xiàn)形式就是有個輸入(input),MapReduce操作這個輸 入(input),通過本身定義好的計算模型,得到一個輸出(output)。 Hadoop MapReduce構(gòu)

3、思:分而治之對相互間不具有計算依賴關(guān)系的大數(shù)據(jù),實現(xiàn)并行最自然的辦法就是采取分而治之的策略。并行計算的第一個重要問題是如何劃分計算任務(wù)或者計算數(shù)據(jù)以便對劃分 的子任務(wù)或數(shù)據(jù)塊同時進(jìn)行計算。不可分拆的計算任務(wù)或相互間有依賴關(guān)系 的數(shù)據(jù)無法進(jìn)行并行計算! 統(tǒng)一構(gòu)架,隱藏系統(tǒng)層細(xì)節(jié) 如何提供統(tǒng)一的計算框架,如果沒有統(tǒng)一封裝底層細(xì)節(jié),那么程序員則需要考慮諸如數(shù)據(jù)存儲、劃分、分發(fā)、結(jié)果收集、錯誤恢復(fù)等諸多細(xì)節(jié);為此, MapReduce設(shè)計并提供了統(tǒng)一的計算框架,為程序員隱藏了絕大多數(shù)系統(tǒng)層面的處理細(xì)節(jié)。 MapReduce最大的亮點在于通過抽象模型和計算框架把需要做什么(what need to do

4、)與具體怎么做(how to do)分開了,為程序員提供一個抽象和高層的編程接口和框架。程序員僅需要關(guān)心其應(yīng)用層的具體計算問題,僅需編寫少量的處理應(yīng)用本身計算問題的程序代碼。如何具體完成這個并行計算任務(wù)所相關(guān)的諸多系統(tǒng)層細(xì)節(jié)被隱藏起來,交給計算框架去處理:從分布代碼的執(zhí)行,到大到數(shù)千小到單個節(jié)點集群的自動調(diào)度使用。 構(gòu)建抽象模型:Map和ReduceMapReduce借鑒了函數(shù)式語言中的思想,用Map和Reduce兩個函數(shù)提供了高層 的并行編程抽象模型 Map: 對一組數(shù)據(jù)元素進(jìn)行某種重復(fù)式的處理; Reduce: 對Map的中間結(jié)果進(jìn)行某種進(jìn)一步的結(jié)果整理。 Map和Reduce為程序員提供

5、了一個清晰的操作接口抽象描述。MapReduce處理的數(shù)據(jù)類型是鍵值對。 MapReduce中定義了如下的Map和Reduce兩個抽象的編程接口,由用戶去編程 實現(xiàn):Map:(k1; v1) (k2; v2)Reduce:(k2; v2) (k3; v3)MapReduce 框架結(jié)構(gòu) 一個完整的mapreduce程序在分布式運(yùn)行時有三類實例進(jìn)程: 1.2.3.MRAppMaster負(fù)責(zé)整個程序的過程調(diào)度及狀態(tài)協(xié)調(diào)負(fù)責(zé)map階段的整個數(shù)據(jù)處理流程MapTaskReduceTask 負(fù)責(zé)reduce階段的整個數(shù)據(jù)處理流程2. MapReduce 編程規(guī)范 MapReduce 的開發(fā)一共有八個步驟,

6、 其中 Map 階段分為 2 個步驟,Shuffle 階段 4個步驟,Reduce 階段分為 2 個步驟 Map 階段 2 個步驟 1. 設(shè)置 InputFormat 類, 將數(shù)據(jù)切分為 Key-Value(K1和V1) 對, 輸入到第二步 2. 自定義 Map 邏輯, 將第一步的結(jié)果轉(zhuǎn)換成另外的 Key-Value(K2和V2) 對, 輸出結(jié)果 Shuffle 階段 4 個步驟.對輸出的 Key-Value 對進(jìn)行分區(qū) 對不同分區(qū)的數(shù)據(jù)按照相同的 Key 排序 (可選) 對分組過的數(shù)據(jù)初步規(guī)約, 降低數(shù)據(jù)的網(wǎng)絡(luò)拷貝對數(shù)據(jù)進(jìn)行分組, 相同 Key 的 Value 放入一個集合中

7、Reduce 階段 2 個步驟 1.對多個 Map 任務(wù)的結(jié)果進(jìn)行排序以及合并, 編寫 Reduce 函數(shù)實現(xiàn)自己的邏輯, 對輸入的 Key-Value 進(jìn)行處理, 轉(zhuǎn)為新的 Key-Value(K3和V3)輸出設(shè)置 OutputFormat 處理并保存 Reduce 輸出的 Key-Value 數(shù)據(jù)2. 3.WordCount需求: 在一堆給定的文本文件中統(tǒng)計輸出每一個單詞出現(xiàn)的總次數(shù) Step 1. 數(shù)據(jù)格式準(zhǔn)備1.創(chuàng)建一個新的文件 1.向其中放入以下內(nèi)容并保存 1.上傳到 HDFShdfs dfs mkdir /wordcount/hdfs dfs put wordcount.txt/w

8、ordcount/Step 2. Mapperpublic class WordCountMapper extendsMapper Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException String line = value.toString(); String split = line.split(,); for (String word : split) context.write(new Text(word),new L

9、ongWritable(1);Step 3. Reducerhello,world,hadoop hive,sqoop,flume,hello kitty,tom,jerry,world hadoopcd /export/servers vim wordcount.txtpublic class WordCountReducer extendsReducer /*自定義我們的reduce邏輯 所有的key都是我們的單詞,所有的values都是我們單詞出現(xiàn)的次數(shù)*paramparam paramkeyvalues contextthrows IOExceptionthrows Interrupt

10、edException*/Overrideprotected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException long count = 0;for (LongWritable value : values) count += value.get();context.write(key,new LongWritable(count);Step 4. 定義主類, 描述 Job 并提交 Jobpublic class JobMain extends Con

11、figured implements Tool Overridepublic int run(String args) throws Exception Job job = Job.getInstance(super.getConf(), JobMain.class.getSimpleName();/打包到集群上面運(yùn)行時候,必須要添加以下配置,指定程序的main函數(shù)job.setJarByClass(JobMain.class);/第一步:讀取輸入文件解析成key,value對job.setInputFormatClass(TextInputFormat.class); TextInputFo

12、rmat.addInputPath(job,newPath(hdfs:/50:8020/wordcount);/第二步:設(shè)置我們的mapper類job.setMapperClass(WordCountMapper.class);/設(shè)置我們map階段完成之后的輸出類型job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class);/第三步,第四步,第五步,第六步,省略 /第七步:設(shè)置我們的reduce類job.setReducerClass(WordCountRedu

13、cer.class);/設(shè)置我們reduce階段完成之后的輸出類型job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class);/第八步:設(shè)置輸出類以及輸出路徑j(luò)ob.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,newPath(hdfs:/50:8020/wordcount_out); boolean b = job.waitForCompletion(tru

14、e); return b?0:1;/* 程序main函數(shù)的入口類 * param args* throws Exception*/public static void main(String args) throws Exception Configuration configuration = new Configuration();Tool tool=new JobMain();int run = ToolRunner.run(configuration, tool, args);System.exit(run);4. MapReduce 運(yùn)行模式 集群運(yùn)行模式 1.2.3.將 MapRe

15、duce 程序提交給 Yarn 集群, 分發(fā)到很多的節(jié)點上并發(fā)執(zhí)行處理的數(shù)據(jù)和輸出結(jié)果應(yīng)該位于 HDFS 文件系統(tǒng) 提交集群的實現(xiàn)步驟: 將程序打成JAR包,然后在集群的任意一個節(jié)點上用hadoop命 令啟動 yarn jar hadoop_hdfs_operate1.0SNAPSHOT.jarcn.itcast.hdfs.demo1.JobMain5. MapReduce 分區(qū)在 MapReduce 中, 通過我們指定分區(qū), 會將同一個分區(qū)的數(shù)據(jù)發(fā)送到同一個 Reduce 當(dāng)中進(jìn)行處理 例如: 為了數(shù)據(jù)的統(tǒng)計, 可以把一批類似的數(shù)據(jù)發(fā)送到同一個 Reduce 當(dāng)中, 在同一個Reduce 當(dāng)

16、中統(tǒng)計相同類型的數(shù)據(jù), 就可以實現(xiàn)類似的數(shù)據(jù)分區(qū)和統(tǒng)計等其實就是相同類型的數(shù)據(jù), 有共性的數(shù)據(jù), 送到一起去處理 Reduce 當(dāng)中默認(rèn)的分區(qū)只有一個 自定義 Partitioner主要的邏輯就在這里, 這也是這個案例的意義, 通過 Partitioner 將數(shù)據(jù)分發(fā)給不同的Reducer/* 這里的輸入類型與我們map階段的輸出類型相同 */public class MyPartitioner extends Partitioner/* 返回值表示我們的數(shù)據(jù)要去到哪個分區(qū) * 返回值只是一個分區(qū)的標(biāo)記,標(biāo)記所有相同的數(shù)據(jù)去到指定的分區(qū) */ Overridepublic int getPar

17、tition(Text text, NullWritable nullWritable,String result = text.toString().split(t)5; System.out.println(result);if (Integer.parseInt(result) 15) return 1;elsereturn 0;int i)Step 4. Main 入口public class PartitionMainextends Configured implements Toolpublic static void main(String args) throwsExcepti

18、onint run = ToolRunner.run(new Configuration(), newPartitionMain(), args);System.exit(run);Overridepublic int run(String args) throws Exception Job job = Job.getInstance(super.getConf(), PartitionMain.class.getSimpleName();job.setJarByClass(PartitionMain.class); job.setInputFormatClass(TextInputForm

19、at.class); job.setOutputFormatClass(TextOutputFormat.class); TextInputFormat.addInputPath(job,newPath(hdfs:/50:8020/partitioner); TextOutputFormat.setOutputPath(job,newPath(hdfs:/50:8020/outpartition); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class);

20、job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setReducerClass(MyReducer.class);/* 設(shè)置我們的分區(qū)類,以及我們的reducetask的個數(shù),注意reduceTask的個數(shù)一定要與我們的 * 分區(qū)數(shù)保持一致 */ job.setPartitionerClass(MyPartitioner.class); job.setNumReduceTas

21、ks(2);boolean b = job.waitForCompletion(true); return b?0:1;6. MapReduce序列化和排序序列化 (Serialization) 是指把結(jié)構(gòu)化對象轉(zhuǎn)化為字節(jié)流 反序列化 (Deserialization) 是序列化的逆過程. 把字節(jié)流轉(zhuǎn)為結(jié)構(gòu)化對象. 當(dāng)要在進(jìn)程間傳遞對象或持久化對象的時候, 就需要序列化對象成字節(jié)流, 反之當(dāng)要將接收到或從磁盤讀取的字節(jié)流轉(zhuǎn)換為對象, 就要進(jìn)行反序列化 Java 的序列化 (Serializable) 是一個重量級序列化框架, 一個對象被序列化后, 會附帶很多額外的信息 (各種校驗信息, hea

22、der, 繼承體系等), 不便于在網(wǎng)絡(luò)中高效傳輸. 所以, Hadoop 自己開發(fā)了一套序列化機(jī)制(Writable), 精簡高效. 不用像 Java 對象類一樣傳輸多層的父子關(guān)系, 需要哪個屬性就傳輸哪個屬性值, 大大的減少網(wǎng)絡(luò)傳輸?shù)拈_銷Writable 是 Hadoop 的序列化格式, Hadoop 定義了這樣一個 Writable 接口. 一個類要支持可序列化只需實現(xiàn)這個接口即可 另外 Writable 有一個子接口是 WritableComparable, WritableComparable 是既可實現(xiàn)序列化, 也可以對key進(jìn)行比較, 我們這里可以通過自定義 Key 實現(xiàn)Writa

23、bleComparable 來實現(xiàn)我們的排序功能 數(shù)據(jù)格式如下aa b a b ba19378105要求:第一列按照字典順序進(jìn)行排列 第一列相同的時候, 第二列按照升序進(jìn)行排列解決思路:將 Map 端輸出的 中的 key 和 value 組合成一個新的 key (newKey),value值不變 , 在針對 newKey 排序的時候, 如果 key 相同, 就再這里就變成對value進(jìn)行排序Step 1. 自定義類型和比較器public class PairWritable implements WritableComparable/ 組合key,第一部分是我們第一列,第二部分是我們第二列 p

24、rivate String first; private int second; public PairWritable() public PairWritable(String first, int second) this.set(first, second);/* 方便設(shè)置字段 */public void set(String first, int second) this.first = first; this.second = second;/* 反序列化 */ Overridepublic void readFields(DataInput input) throwsthis.fi

25、rst = input.readUTF(); this.second = input.readInt();/* 序列化 */ OverrideIOExceptionpublic void write(DataOutput output) throws IOException output.writeUTF(first); output.writeInt(second);/* 重寫比較器 */public int compareTo(PairWritable o) /每次比較都是調(diào)用該方法的對象與傳遞的參數(shù)進(jìn)行比較,說白了就是第一行與第二行比較完了之后的結(jié)果與第三行比較, /得出來的結(jié)果再去與第

26、四行比較,依次類推 System.out.println(o.toString();System.out.println(this.toString();int comp = pareTo(o.first); if (comp != 0) return comp; else / 若第一個字段相等,則比較第二個字段 returnInteger.valueOf(this.second).compareTo ( Integer.valueOf(o.getSecond();public int getSecond() return second;public void se

27、tSecond(int second) this.second = second;public String getFirst() return first;public void setFirst(String first) this.first = first;Overridepublic String toString() return PairWritable+first= + first + , second= + second +;+Step 2. MapperStep 3. ReducerStep 4. Main 入口public class SortReducer extend

28、s Reducer private Text outPutKey = new Text(); Overridepublic void reduce(PairWritable key, Iterable values, Context context) throws IOException, InterruptedException /迭代輸出 for(IntWritable value : values) outPutKey.set(key.getFirst(); context.write(outPutKey, value);public class SortMapper extends M

29、apper private PairWritable mapOutKey = new PairWritable(); private IntWritable mapOutValue = new IntWritable();Overridepublicvoid map(LongWritable key, Text value, Context context) throws IOException, InterruptedException String lineValue = value.toString(); String strs = lineValue.split(t);/設(shè)置組合key和value = mapOutKey.set(strs0, Integer.valueOf(strs1); mapOutValue.set(Integer.valueOf(strs1); context.write(mapOutKey, mapOutValue);public class SecondarySortextends Configured implements Tool Overridepublic int run(String args) throws Exception Configu

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論