分布式系統(tǒng)組件原理介紹_第1頁
分布式系統(tǒng)組件原理介紹_第2頁
分布式系統(tǒng)組件原理介紹_第3頁
分布式系統(tǒng)組件原理介紹_第4頁
分布式系統(tǒng)組件原理介紹_第5頁
已閱讀5頁,還剩27頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認(rèn)領(lǐng)

文檔簡介

分布式系統(tǒng)組件介紹

Hadoop原理

分為HDFS與Yarn兩個部分。HDFS有Namenode和Datanode兩個部分。每個節(jié)點占

用一個電腦。Datanode定時向Namenode發(fā)送心跳包,心跳包中包含Datanode的校驗等信

息,用來監(jiān)控Datanode。HDFS將數(shù)據(jù)分為塊,默認(rèn)為64M每個塊信息按照配置的參數(shù)分

別備份在不同的Datanode,而數(shù)據(jù)塊在哪個節(jié)點上,這些信息都存儲到Narnenode上面cYarn

是MapReduce2,可以集成更多的組件,如spark>mpi等。MapReduce包括Jobtraker與

Tasktrakcr兩個部分。其中JobTrakcr是在主節(jié)點上,負(fù)責(zé)整體的調(diào)度。Tasktrakcr在slave

節(jié)點上,當(dāng)提交任務(wù)后,將其交給Jobtraker進行調(diào)度,調(diào)度到該任務(wù)之后就會將jar包發(fā)送

到響應(yīng)的Tasktraker,以實現(xiàn)分布式中移動計算資源而非移動數(shù)據(jù)。因為這些任務(wù)都是并發(fā)

執(zhí)行的,所以每個任務(wù)需要調(diào)用哪個節(jié)點的數(shù)據(jù)必須非常的明確,通常這一部分是通過

Jobtraker進行指揮。

在MapReduce的開始,每個block作為一個Map輸入,Map輸入后就進入shuffle階段。

Shuffle階段主:要分為Map和Reduce兩個階段。在MapShuffle階段,Map輸入按用戶的程

序進行處理,生成的結(jié)果按key排序,然后進入內(nèi)存,溢出后形成一個spill寫入磁盤,這

樣多個spill在這個節(jié)點中進行多路歸并排序(勝者樹)形成一個排序文件寫入磁盤,這期

間那些Map文件交給那些節(jié)點處理由Jobtraker進行調(diào)度,最后生成一個大排序的文件,并

且刪除spilL之后,再將多個節(jié)點上已經(jīng)排序的文件進行行多路歸并排序(一個大文件N

分到n個節(jié)點,每個節(jié)點分為k個spill,?個spill長度s,時間復(fù)雜度是N(logn(n個節(jié)點

多路歸并排序)+k)gk(每個節(jié)點內(nèi)k個spill排序)-logs(每個spill內(nèi)部進行排序)),

N=nks,所以最后的復(fù)雜度亦是NlogN)。

完成MapShuffle階段后通知Jobtraker進入ReduceShuffle階段。在這個階段,因為已

經(jīng)排序,很容易將用戶的程序直接作用到相同key的數(shù)據(jù)上,這些數(shù)據(jù)作為Reduce的輸入

進行處理,最終將輸出的結(jié)果數(shù)據(jù)寫入到HDFS上,并刪除磁盤數(shù)據(jù)。Map一般多,Reduce

少,所以通過Hash的方法將Map文件映射到Reduce上,進行處理,這個階段叫做Pa(ition<,

為了避免譬如所有數(shù)據(jù)相加這種操作使得數(shù)據(jù)負(fù)載移動的數(shù)量少的Reduce階段,造成效率

低下的結(jié)果,我們可以在在MapShuffle階段加一個Combine階段,這個Combine是在每一

臺節(jié)點上將已經(jīng)排序好的文件進行一次Reduce并將結(jié)果作為ReduceShuffle階段的輸入,

這樣可以大大減少數(shù)據(jù)的輸入量。通常Reduce的個數(shù)通過用戶來指定,通常和CPU個數(shù)相

適應(yīng)才能使其效率達到最大。

HBase原理

Hbase是列存儲數(shù)據(jù)庫。其存儲的組織結(jié)構(gòu)就是將相同的列族存儲在?起,因此得名的。

Hbase存儲有行鍵,作為唯一標(biāo)識,列表示為〈列族〉:〈列〉存儲信息,如address:city,address:

provice,然后是時間戳。

Hbase物理模型中,有一個總結(jié)點HMaster,通過其自帶的zookeeper與客戶端相連接。

Hbse作為分布式每一個節(jié)點作為一個RegionServer,維乎Region的狀態(tài)和管理。Region是

數(shù)據(jù)管理的基本單位。最初只有一-個,通過擴充后達到閾值然后分裂,通過Server控制其

規(guī)模。在RegionServer中,每一個store作為一個列族。當(dāng)數(shù)據(jù)插入進來,新數(shù)據(jù)成為

Memstore,寫入內(nèi)存,當(dāng)Memstore達到閾值后,通過Flashcache進程將數(shù)據(jù)寫入storeFile,

也就是當(dāng)內(nèi)存數(shù)據(jù)增多后溢出成一個SloreFile寫入磁盤,這里和Hadoop的spill類似,這

據(jù)后一并寫入。當(dāng)StoreFile數(shù)量過多時,進行合并,將形成一個大的StoreFile并且刪

除掉原來的StoreFile。再當(dāng)SloreFile大小超過一定閾值后,分裂成Region。

HBase有一個ROOT表和META表。META表記錄用戶Region的信息,但是隨著數(shù)據(jù)

增多,META也會增大,進而分裂成多個Region,那我們用ROOT表記錄下META的信

息,是一個二級表,而zookeeper中記錄ROOT表的locationo當(dāng)我們需找找到一條信息時,

先去zookccpcr看找ROOT,從ROOT中查找META找到META位置,在進入META表中

尋找該數(shù)據(jù)所在Region,再讀取該Region的信息。HBase適合大量插入乂同時讀的情況,

其瓶頸是硬盤的傳輸速度而不再是像Oracle一樣瓶頸在硬盤的尋道速度。

Zookeeper原理

Zookeepcr是一個資源管理庫,對節(jié)點進行協(xié)調(diào)、通信、失敗處理、節(jié)點損壞的處理等,

是一個無中心設(shè)計,主節(jié)點通過選舉產(chǎn)生。Zookeeper的節(jié)點是Znode。每一個節(jié)點可以存

放1M的數(shù)據(jù),client訪問服務(wù)器時創(chuàng)建一個Znode,可以是短暫的Znode,其上可以放.上觀

察Waicher對node進行監(jiān)控。Zookeeper有高可用性,每個機器復(fù)制一份數(shù)據(jù),只要有一般

以上的機器可以正常的運行,整個集群就可以工作。比如6臺的集群容忍2臺斷開,超過兩

臺達到一般的數(shù)最就不可以,因此集群通常都是奇數(shù)來節(jié)約資源。

Zookeeper使用zab協(xié)議,是一個無中心協(xié)議,通過選舉的方式產(chǎn)生leader,通過每臺

機器的信息擴散選舉最閑的資源利用較少的節(jié)點作為主控。同時當(dāng)配置數(shù)據(jù)有更改更新時,

在每個節(jié)點上有配置watcher并觸發(fā)讀取更改,。因此能?哆保證一致性。每個節(jié)點通過leader

廣播的方式,使所有follower同步。

Zookeeper可以實現(xiàn)分布式鎖機制。通過waicher監(jiān)控,對每個Znode的鎖都有一個獨

一的編號,按照序號的大小比較,來分配鎖。當(dāng)一個暫時Znode完結(jié)后刪除本節(jié)點,通知

leader完結(jié),之后下一個Znode獲取鎖進行操作。

Kafka原理

Kafka是分布式發(fā)布-訂閱消息系統(tǒng)。它最初由Linkedln公司開發(fā),之后成為Apache項

目的一部分。Kafka是一種快速、可擴展的、設(shè)計內(nèi)在就是分布式的,分區(qū)的和可復(fù)制的提

交日志服務(wù)。它被設(shè)計為一個分布式系統(tǒng),易于向外擴展;它何時為發(fā)布和訂閱提供高吞吐

量;它支持多訂閱者,當(dāng)失敗時能自動平衡消費者;它將消息持久化到磁盤,因此可用于批

量消費,例如ETL,以及實時應(yīng)用程序。broker和生產(chǎn)者、消費者各自都是集群,集群中的

各個實例他們之間是對等的,集群擴充節(jié)點很方便。

Kafka的基本概念包括話題、生產(chǎn)者、消費者、代理或者kafka集群。話題是特定類型

的消息流。消息是字節(jié)的有效負(fù)載,話題是消息的分類名或種子名。生產(chǎn)者是能夠發(fā)布消息

到話題的任何對象。已發(fā)布的消息保存在?組服務(wù)器中,它們被稱為代理或Kafka集群。消

費者可以訂閱一個或多個話題,并從Brokei?拉數(shù)據(jù),從而消費這些已發(fā)布的消息、。

Kafka的存儲布局非常簡單。話題的每個分區(qū)對應(yīng)一個邏輯日志。物理上,一個日志為

相同大小的一組分段文件,每次生產(chǎn)者發(fā)布消息到一個分區(qū),代理就將消息追加到最后一個

段文件中。當(dāng)發(fā)布的消息數(shù)量達到設(shè)定值或者經(jīng)過一定的時間后,段文件真正寫入磁盤中。

寫入完成后,消息公開給消費者。段文件機制和Hadoop中spill類似。消費者始終從特定分

區(qū)順序地獲取消息,如果消費者知道特定消息的偏移量,也就說明消費者已經(jīng)消費了之前的

所有消息。消費者向代理發(fā)出異步拉請求,準(zhǔn)備字節(jié)緩沖區(qū)用于消費。每個異步拉請求都包

含要消費的消息偏移量與其它消息系統(tǒng)不同,Kafka代理是無狀態(tài)的。這意味著消費者必須

維護已消費的狀態(tài)信息。這些信息由消費者自己維護,代理完全不管。消費者可以故意倒回

到老的偏移量再次消費數(shù)據(jù)。這違反了隊列的常見約定,但被證明是許多消費者的基本特征。

kaflca的broker在配置文件中可以配置最多保存多少小時的數(shù)據(jù)和分區(qū)最大的空間占

用,過期的和超量的數(shù)據(jù)會被broker自動清除掉。

Kafka會記錄offset到zk,同時又在內(nèi)存中維護offset,允許快速的checkpoint,如果

consumer比partition多是浪費,因為kafka不允許partition上并行consumer讀取。同時,

consumer比partition少,一個consumer會對應(yīng)多個partition,有可能導(dǎo)致partition中數(shù)據(jù)的

讀取不均勻,也不能保證數(shù)據(jù)間的順序性,kafka只有在一個partition讀取的時候才能保證

時間上是有順序的。增加partition或者consumer或者broker會導(dǎo)致rebalance,所以rebalance

后consumer對應(yīng)的partition會發(fā)生變化。

Spark原理

spark可以很容易和yam結(jié)合,直接調(diào)用HDFS、Hbase上面的數(shù)據(jù),和hadoop結(jié)合。

配置很容易。spark發(fā)展迅猛,框架比hadoop更加靈活實用。減少了延時處理,提高性能效

率實用靈活性。也可以與hadoop切實相互結(jié)合。

spark核心部分分為RDD。SparkSQL、SparkStreamingsMLlib、GraphX、SparkR等

核心組件解決了很多的大數(shù)據(jù)問題,其完美的框架日受歡迎。其相應(yīng)的生態(tài)環(huán)境包括zepplin

等可視化方面,正日益壯大。大型公司爭相實用spark來代替原有hadoop上相應(yīng)的功能模

塊。Spark讀寫過程不像hadoop溢出寫入磁盤,都是基于內(nèi)存,因此速度很快。另外DAG

作業(yè)調(diào)度系統(tǒng)的寬窄依賴讓Spark速度提高。

RDD是彈性分布式數(shù)據(jù)也是spark的核心,完全彈性的,如果數(shù)據(jù)丟失一部分還可以重

建。有自動容錯、位置感知調(diào)度和可伸縮性,通過數(shù)據(jù)檢杳點和記錄數(shù)據(jù)更新金象容錯性檢

查。通過SparkConlexl.texlFile。加載文件變成RDD,然后通過Iransformalion構(gòu)建新的RDD,

通過action將RDD存儲到外部系統(tǒng)。

RDD使用延遲加載,也就是懶加載,只有當(dāng)用到的時候才加載數(shù)據(jù)。如果加載存儲所

有的中間過程會浪費空間,因此要延遲加載。一旦spark看到整個變換鏈,他可以計算僅需

的結(jié)果數(shù)據(jù),如果下面的函數(shù)不需要數(shù)據(jù)那么數(shù)據(jù)也不會再加載。轉(zhuǎn)換RDD是惰性的,只

有在動作中才可以使用它們。

Spark分為driver和executor,driver提交作業(yè),executor是application早worknodc上的

進程,運行task,driver對應(yīng)為sparkcontext。Spark的RDD操作有iransformalion、action

Transformation對RDD進行依賴包裝,RDD所對應(yīng)的依賴都進行DAG的構(gòu)建并保存,在

worknode掛掉之后除了通過備份恢復(fù)還可以通過元數(shù)據(jù)對其保存的依賴再計算一次得到。

當(dāng)作業(yè)提交也就是調(diào)用runJob時,spark會根據(jù)RDD構(gòu)建DAG圖,提交給DAGScheduler,

這個DAGSchcduler是在SparkContext創(chuàng)建時一同初始化的,他會對作業(yè)進行調(diào)度處理。當(dāng)

依賴圖構(gòu)建好以后,從action開始進行解析,每一個操作作為一個task,每遇到shuffle就

切割成為一個taskSet,并把數(shù)據(jù)輸出到磁盤,如果不是shuffle數(shù)據(jù)還在內(nèi)存中存儲。就這

樣再往前推進,直到?jīng)]有算子,然后運行從前面開始,如果沒有action的算子在這里不會執(zhí)

行,直到遇到action為止才開始運行,這就形成了spark的懶加載,taskset提交給TaskSheduler

生成TaskSetManager并且提交給Executor運行,運行結(jié)束后反饋給DAGScheduler完成一個

taskSet,之后再提交下一個,當(dāng)TaskSet運行失敗時就返回DAGScheduler并重新再次創(chuàng)建。

一個job里面可能有多個TaskSet,一個application可能包含多個job。

HDFS介紹

HDFS(HadoopDistributedFileSystem,Hadoop分布式文件系統(tǒng)),它是一個高度容錯性的

系統(tǒng),適合部署在廉價的磯器上。HDFS能提供高吞吐量的數(shù)據(jù)訪問,適合那些有著超大數(shù)

據(jù)集(largedataset)的應(yīng)用程序。

HDFS的設(shè)計特點是:

1、大數(shù)據(jù)文件。

2、文件分塊存儲,HDFS會將一個完整的大文件平均分塊存儲到不同計算器上,讀寫效率

高。

3、流式數(shù)據(jù)討問,一次寫入多次讀寫,不支持修改(hadoop0.21之前支持Append),

4、廉價硬件,HDFS可以應(yīng)用在普通PC機上。

5、硬件故障,HDFS認(rèn)為所有計算機都可能會出問題,通過多個副本解決。

RacklRack2

HDFS讀寫文件流程-讀

2:獷bkxklocations

1:openDistributedNameNode

HDFSFHeSystem

dient3:read

FSOatanamenode

Inputstream

clientJVM

dientnode

4:read5:read

DataNodeDataNodeDataNode

datanodedatanodedatanode

HDFS讀寫文件流程-寫

HDFS常見操作:

hadoopfs-Is/data/查看所有文件

hadoopfs-du-h/data/列出匹配pattern的指定的文件系統(tǒng)空間總量(單位bytes),等

價于unix下的針對目錄的du

hadoopfs-chmod[-R]777/data/修改文件的權(quán)限,-上標(biāo)記遞歸修改。

hadoopfs-mkdir/data/在指定位置創(chuàng)建目錄。

hadoopfs-putxxx.log/data/從本地系統(tǒng)拷貝文件到DFS。

hadoopfs-rmrZdala/xxx.log歸刪掉所有的文件和目錄,等價于unix下的rm-rf<src>o

hadoopfs-get/data/xxx.log從DFS拷貝文件到本地文彳匕系統(tǒng)

hadoopfs-getmerge/data/*Jogdata.log顧名思義,從DFS拷貝多個文件、合并排序為一個

文件到本地文件系統(tǒng)。

hadoopfs-mv<src><dsl>:將制定格式的文件move到指定的目標(biāo)位置。當(dāng)src為多個文

件時,dst必須是個目錄

HDFS實踐?HDFSjavaAPI讀操作

Configurationconf=newConfiguration();

PathuriPath=newPath(uri);

try(

hdfs=FileSystem.get(|uriPath|.toUri(),conf);

ycatch(lOExceptxone)<

e.printStackTrace();

hdfs=null;

y

if(hdfs=null)return;

List<String>configPaths=newLinkedList<String>();

SDatalnputStreamdis=null;

BufferedReaderin=null;

try<_________

if(hdfs.exists(uriPath)){

d±s=hdfs.open(uriPath);

in=newBufferedReader(newInputStreamReader(dis,"LFTF-8"));

Stringline=null;

while((line=in.readLine())!=null){

line=line.trim();

configPaths.addCline):

?

}catch(lOExceptxone)<

e.printStackTrace();

>finally<

if(in!=nulLX

try{

±n.close();

}catch(TOExceptione){

e.printStackTrace();

y

y

_____Hd±s!=_______________________________________________________________

HDFS實踐?HDFSjavaAPI寫操作

//寫入一個文件到HDFS

Conf±gurationconf=newConf±guration();

FileSystemfileS=FileSystem.get(conf);

FileSystemlocaLFile=FileSystem.getLocal(conf);

Pathinput=newPath("/User/ryan/1.txt");

Pathout=newPath("/dara/l.txt");

try<

-FileStatus(]inpirtFiLe=LocalFile.TistStatus(input);

FSDataOutputStreamoutStream=fileS.create(out);

for(inti=0;i<inputFile.length;{

System.out.printLn(inputFile[i]-getPath().getName());

FSDa±aInputS±reamin-locatFilc.opcn(inpu±Filc(i].gc±Pa±h());

bytebuffer[]=newbyte[1024];

intbytesRead=0;

//邊讀邊寫

while((bytesRead=in.read(buffer))=>0){

outStream.write(buffer,0,bytesRead);

in.cTose();

>catch(Exceptione){

e.printStackTrace();

HDFS數(shù)據(jù)采用壓縮,減少磁盤空間占,和傳輸時候的帶寬占用,用(LZO,SNAPPY,GZIP

等),對于可以拆分的數(shù)據(jù)塊大小>=blocksize,不支持分拆文件采用接近blocksize大小

(Mapreduce)<,數(shù)據(jù)分布在各個節(jié)點需要均勻,避免熱點(讀寫性能)

MapReduce經(jīng)驗與教訓(xùn)

數(shù)據(jù)傾斜問題

背景:H前消息統(tǒng)計采用以下流程messageid維度二>taskid_叩pid維度=>1)taskid維度2)

appid維度

問題:由于部分taskid或者部分appid推送量很大,導(dǎo)致以taskid,appid為key的mapreduce

出現(xiàn)部分reducer待處理的數(shù)據(jù)量很多,造成reducer之間處理數(shù)據(jù)量嚴(yán)重不平衡,任務(wù)在最

后99%時候卡在個別reducertask上。

解決:依據(jù)數(shù)據(jù)特性在以taskid,appid為key的mapreduce中添加combiner,讓數(shù)據(jù)在map

完成之后直接在本地機器上進行一次合并,減少數(shù)據(jù)本身數(shù)量級,進而降低部分reducer負(fù)

載過高問題

數(shù)據(jù)大量重復(fù)在reduce中部分日志出現(xiàn)大量重復(fù),但是一個reducer進行行處理的時候出現(xiàn)

內(nèi)存使用開銷很大或者部分reducer出現(xiàn)數(shù)據(jù)傾斜,解決:采用combiner在本地做一次數(shù)據(jù)

去重?zé)?/p>

shuffle階段出現(xiàn)內(nèi)存不足

例如任務(wù)提示:Error:java.lang.OutOfMcmoryError:Javaheapspaceat

org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOulpu(Copier.shuffleInMemory(Red

uceTask.java:1640)

解決://設(shè)置sort完成后reduce計算階段用來緩存數(shù)據(jù)的百分比

conf.setFloat("mapred.job.shuffle.input.buffer.percent",0.30f);

conf.sctFloat("maprcducc.job.shufflc.input.buffcr.pcrccnt",0.30f);

一次計算很多天數(shù)據(jù)占用集群大部分資源并且無法保證任務(wù)成功完成

背景:

標(biāo)注LBS數(shù)據(jù)時,一次性將I個月行為數(shù)據(jù)放入在一個MRJob中執(zhí)行,導(dǎo)致占用掉Hado。

集群大部分資源,和其他任務(wù)搶占資源,執(zhí)行3個小時最后IO超時異常。

解決:

避免蠻力計算,將超大規(guī)模數(shù)據(jù)分拆多次計算,得到降低數(shù)量級后的中間結(jié)果,然后再合并

計算。好處:1、延長時叵,保證程序穩(wěn)定。2、程序即使異常數(shù)據(jù)也不用從頭開始計算,可

以利用中間結(jié)果。

問題:

輸入的數(shù)據(jù)中部分文件不可分割,導(dǎo)致單個reducer負(fù)載過高,task卡住不進行下去。

解決:

I、一個是數(shù)據(jù)在導(dǎo)入之前分割好來

2、轉(zhuǎn)化為可以分割的格式。

Hive簡介

Hive是基于Hadoop的一個數(shù)據(jù)倉庫工具,可以將結(jié)構(gòu)化的數(shù)據(jù)文件映射為一張數(shù)據(jù)庫表,

并提供簡單的sql查詢功能,可以將sql語句轉(zhuǎn)換為MapReduce任務(wù)進行運行。

優(yōu)點是學(xué)習(xí)成本低,可以通過類SQL語句快速實現(xiàn)簡單的MapReduce統(tǒng)計,不必開發(fā)專門

的M叩Reduce應(yīng)用,十分適合數(shù)據(jù)倉庫的統(tǒng)計分析。

H2c體系結(jié)構(gòu)

Hive實踐

分區(qū)添加

altertableuscr_qucry_poi_qunaraddpartition(day=20141125)location

7data_resul(/lbs/user_query_poi_qunar/20141125,;

統(tǒng)計

useuser;

droptableuser_qu

ROWFORMATDELIMITED

FIELDSTERMINATEDBYT

LOCATION7data_result/lbs/usery_poi_qunar_airport_cid_count;

CREATEEXTERNALTABLEuser_query_poi_qunar_airport_cid_coun((namesiring,count

bigint)

er_query_poi_qunar_airpon_cid_count';

insertoverwritetableuser_queiy_poi_qunar_airport_cid_countselect

count(distinct(cid)),concat(poi_name,"\t",city)fromuser_query_poi_qunarwhereday=20141125

andtag='bt_airport'groupbyconcat(poi_name,"\t",city);

UDF

例如我們有個自定義jar支持md5方法,udf.jar

1、添加jar

hive>addjar/app/yuankai/hadoopjars/gexin_hive_udf.jar;

2、關(guān)聯(lián)內(nèi)置函數(shù)hive>createtemporaryfunctionmd5ascom.igexin.hive.udf.Md5,;

3、執(zhí)行內(nèi)置函數(shù)hivc>sclcctmd5('tcst')fromtestlimit1;

其它內(nèi)置函數(shù)類實現(xiàn)方式:

publicclassXXXUDFextendsUDF{

publicTextevahiate(Object...args){〃實現(xiàn)操作returnnewText("yourresult");}}

comigexinhiveudf

orgapache.hadoophiveqlexec.UDF

orgapachehadoopioText

bl■cSelectlntsalledAppUDF{

Textevaluate(Objectargs){

(args.length4){

//com.tencent.pao;116;20141119;20141119;20141119#com.tencent.qq;112;20141118

String[]apps((String)args[0])spliti"#")

(appslength0)0

Stringpackageld(String)args[1]

startDayLongparseLongt(String)args[2])

endDayLongparseLongt(String)args[3])

(Stringappapps){

//com.tencent.pao;116;20141119;20141119;20141119

String[]arrappsplit。';"1)

(arr.length4){

(packageld.equals(arr[0])){

installed_dayLongparseLongi(String)arr[2])

(installed_daystartDayinstall.ed_dayendDay){

returnneuText(app);

}

)

}

}

}

returnnewTextC,H);

}

}

Hive實踐-遠(yuǎn)程調(diào)用

javasqlSQLException

HiveJdbcClient(

privat*finalStringdriverName"org.apache.hive.jdbc.HiveDriver"

privatefinalst?iStringjdbcUrl-jdbc:hive2://192.168.2.249:19999/defauXf

privatefinalStringjdbcUrl_user"user"

privatefinalstaticStringjdbcurl_pwd"yuanKai”

pu()li'.sf.:i.voidnain(String[]args)thrczSQLException{

tr(

Class.forA/ame(driverName)

)(ClassNotFoundExceptione){

Systemoutprintln(HClassNotFoundException:"egetMessage())

eprintStackTrace()

System.exit(l);

)

ConnectionconDriverManagergetConnection(jdbcUrIjdbcUrl_userjdbcllrl_pwd)

StatementstmtconcreateStatement()

Stringsqlnull;

ResultSet-null;|

stmtexecuteC'usereport_ods_mdp")

StringsubTaskidargs(0]

StringstartActionargs(1)

StringendActionargs(2)

StringstartDayargs(3]

StringendDayargs(4]

sql“selectdistinct(token_md5)fromreport_ods_mdp.msg_aswheretask_id=

subTaskidandaction_id>='*startAction"andaction_id<="endAction

?'andday>="startDay"andday<="endDay

resstmt.executeOuery(sql);

whili(res.next()){

Systemoutprintln(resgetString(l))

〃保存獲取到Cd到對應(yīng)表格中

//...

用戶是不是在某個時間內(nèi)安裝了的內(nèi)置函數(shù)

\addjar/app/yuankai/hadoopjars/hive-udf.jar;

createtemporaryfunctiongx_selectinstallas'com.igexin.hive.udf.SelectlntsallcdApp';

使用舉例:

select*fromuser_app_infowhere

gx.selectinstalKinstalled:'com.tencent.pao\'^OMIII9";'2O141119")!=nulllimiti;

褊一部分:產(chǎn)生背景

產(chǎn)生背景

?為了滿足客戶個性化的需求,Hive被設(shè)計成一個很開放的系統(tǒng),很多內(nèi)容都支持用戶定制,包括:

?文件格式:TextFile.SequenceFile

?內(nèi)存中的數(shù)據(jù)格式:JavaInteger/String,HadoopIntWritable/Text

?用戶提供的map/reduce腳本:不管什么語言,利用stdin/stdout傳輸數(shù)據(jù)

?用戶自定義函數(shù)

自定義函數(shù)

?雖然Hive提供了很多函數(shù),但是有些還是難以滿足我們的需求。因此Hive提供了自定義函數(shù)開發(fā)

?自定義函數(shù)包括三種UDF、JADF、UDTF

?UDF(User-Defined-Functon)

?UDAF(User-DefinedAggregationFuncation)

?UDTF(User-DefinedTable-GeneratingFunctions)用來解決輸入一行輸出多行(On-to-many

maping)的需求。

HIVE中使用定義的函數(shù)的三種方式

?在HIVE會話中add自定義函數(shù)的jar文件,然后創(chuàng)建function,繼而使用函數(shù)

?在進入HIVE會話之前先自動執(zhí)行創(chuàng)建function,不用用戶手工創(chuàng)建

?把自定義的函數(shù)寫到系統(tǒng)函數(shù)中,使之成為HIVE的一個默認(rèn)函數(shù),這樣就不需要createtemporary

function

第二部分:UDF

UDF用法

?UDF(User-Defined-Function)

?UDF函數(shù)可以直接應(yīng)用于select語句,對查詢結(jié)構(gòu)做格式化處理后,再輸出內(nèi)容

?編寫UDF函數(shù)的時候需要注意一下幾點

?自定義UDF需要繼承org.apache.hadoop.hive.ql.UDF

?需要實現(xiàn)evaluate函數(shù)

?evaluate函數(shù)支持重載

?UDF只能實現(xiàn)一進一出的操作,如果需要實現(xiàn)多進一出,則需要實現(xiàn)UDAF

UDF用法代碼示例

importorg.apache.Hadoop.hive.ql.exec.UDF

publicclassHellowordextendsUDF(

publicStringevaluate(){

return"helloworld!";

}

publicStringevaluate(Stringstr){

return"helloworld:"+str;

}

)

開發(fā)步驟

?開發(fā)代碼

?把程序打包放到目標(biāo)機器上去

?進入hive客戶端

?添加jar包:hive>addjar/run/jar/udf_test.jar;

?創(chuàng)建臨時函數(shù):hive〉CREATETEMPORARYFUNCTIONmy_addAS'com.hive.udf.Add'

?查詢HQL語句:

?SELECTmy_add(8,9)FROMscores;

?SELECTmy_add(scores.math,scores.art)FROMscores;

?銷毀臨時函數(shù):hive>DROPTEMPORARYFUNCTIONmy_add;

?細(xì)節(jié)

?在使用UDF的時候,會自動進行類型轉(zhuǎn)換,例如:

SELECTmy_add(8,9.1)FROMscores;

?結(jié)果是17.1,UDF將類型為Int的參數(shù)轉(zhuǎn)化成double。類型的飲食轉(zhuǎn)換是通過UDFResolver來進行

控制的

第三部分:UDAF

UDAF

?Hive查詢數(shù)據(jù)時,有些聚類函數(shù)在HQL沒有自帶,需要用戶自定義實現(xiàn)

?用戶自定義聚合函數(shù):Sum,Averagen-1

?UDAF(User-DefinedAggregationFuncation)

用法

??下兩個包是必須的importorg.apache.hadoop.hive.ql.exec.UDAF和

org.apache,hadoop.hive.ql.exec.UDAFEvaluator

開發(fā)步驟

?函數(shù)類需要繼承UDAF類,為部類Evaluator實UDAFEvaluator接口

?Evaluator"需要實現(xiàn)init、iterate、terminatePartiaKmerge>terminate這幾個函數(shù)

a)init函數(shù)實現(xiàn)接口UDAFEvaluator的init函數(shù)。

b)iterate接收傳入的參數(shù),并進行內(nèi)部的輪轉(zhuǎn)。其返回類型為boolean。

c)terminatePartial無參數(shù),其為iterate函數(shù)輪轉(zhuǎn)結(jié)束后,返回輪轉(zhuǎn)數(shù)據(jù),terminatePartial類似于

hadoop的Combiner,

d)merge接收terminatePartial的返回結(jié)果,進行數(shù)據(jù)merge操作,其返回類型為boolean。

e)terminate返回最終的聚親函數(shù)結(jié)果。

執(zhí)行步驟

?執(zhí)行求平均數(shù)函數(shù)的步驟

a)將java文件編譯成Avg_test.jar。

b)進入hive客戶端添加jar包:

hive>addjar/run/jar/Avg_test.jaro

c)創(chuàng)建臨時函數(shù):

hive>createtemporaryfunctionavg_test'hive.udaf.Avg';

d)查詢語句:

hivoselectavg_test(scoies.math)fromscores;

e)銷毀臨時函數(shù):

hive>droptemporaryfunctionavg_test;

UDAF代碼示例

publicclassMyAvgextendsUDAF{

publicstaticclassAvgEvauatorimplementsUDAFEvalustor(

}

publicvoidinit()

publicbooleaniterate(Doubleo){}

publicAvgStateterminatePartial(){}

publicbooleanterminateFartial(Doubleo){}

publicDoubleterminate(){}

)

第四部分:UDTF

UDTF

?UDTF(User-DefinedTable-GeneratingFunctions)用來解決輸入?行輸出多行(On-to-many

maping)的需求。

開發(fā)步驟

?UDTF步驟:

?必須繼承org.apache.Hadoop.hive.ql.udf.generic.GenericUDTF

?實現(xiàn)initialize,process,close三個方法

?UDTF首先會

?調(diào)用initialize方法,此方法返回UDTF的返回行的信息(返回個數(shù),類型)

初始化完成后,會調(diào)用process方法,對傳入的參數(shù)進行處理,可以通過forword。方法把結(jié)果返回

?最后close。方法調(diào)用,對辭要清理的方法進行清理

使用方法

?UDTF有兩種使用方法,一種直接放到select后面,一種和lateralview一起使用

?直接select中使用:selectexplode_map(properties)as(coll,col2)fromsrc;

?不可以添加其他字段使用:selecta,explode_map(properties)as(coll,col2)fromsrc

?不可以嵌套調(diào)用:selectexplode_map(explode_map(properties))fromsrc

?不可以和groupby/clusterby/distributeby/sortby一起使用:selectexplode_map(properties)

as(collzcol2)fromsrcgroupbycoll,col2

?和lateralview一起使用:selectsrc.id,mytable.coll,mytable.col2fromsrclateralview

explode_map(properties)mytableascoll,col2;

此方法史為方便H常使用。執(zhí)行過程相當(dāng)于單獨執(zhí)行了兩次抽取,然后union到一個表里。

lateralview

?LateralView語法

?lateralview:LATERALVIEWudtf(expression)tableAliasAScolumnAliascolumnAlias)*

fromClause:FROMbaseTable(lateralview)*

?LateralView用于UDTF(user-definedtablegeneratingfunctions)中將行轉(zhuǎn)成列,例如explode().

?目前LateralView不支持有上而下的優(yōu)化。如果使用Where子句,查詢可能將不被編譯。解決方法見:

此時,在查詢之前執(zhí)行sethive.optimize.ppd=false;

?例子

?pageAdy。它有兩個列

stringpageidArray<int>adidlist

"front_page"[1,2,3]

"contact_page"[3,4,5]

?SELECTpageid,adidFROMpageAdsLATERALVIEWexplode(adidjist)adTableASadid;

?將輸出如下結(jié)果

stringpageidintadid

"front_page"1

Mcontact_page"3

代碼示例

publicclassMyUDTFextendsGenericUDTF{

publicStructObjectlnspectorinitialize(ObjectInspector[]args){}

publicvoidprocess(Object[]args)throwsHiveException(}

}

實現(xiàn):切分"key:value;key:value;”這種字符串,返回結(jié)果為key,value兩個字段

Hive使用經(jīng)驗

/結(jié)果顯示設(shè)置列名

sethive.cli.print.header=true;

〃壓縮設(shè)置

seterniediate=true;

setprcss.output=false;

〃設(shè)置并行開啟和數(shù)量

sethive.exec.parallel=true;

sethive.exec.parallel.thread.number=10;

1)使用外表,避免drop的時候?qū)⒃磾?shù)據(jù)刪除。

2)對于較大兩個表格進行關(guān)聯(lián)查詢的時候,盡可能將數(shù)據(jù)預(yù)先處理

(例如篩選到單獨表格),然后再join。

3)在數(shù)據(jù)入庫的時候,依照時間或者某個屬性進行分區(qū),方便查詢

時能夠在一個分區(qū)內(nèi)檢索,提高效率。

4)復(fù)雜字段使用自定義內(nèi)置函數(shù),避免超級復(fù)雜的sql。

分桶

CREATETABLEbucketeduser(idINT)nameSTRING)

CLUSTEREDBY(id)INTO4BUCKETS;

對于每一個表(table)或者分區(qū),Hive可以進一步組織成桶,也就是說桶是更為細(xì)粒度的

數(shù)據(jù)范圍劃分。Hive也是針對某一列進行桶的組織。Hive采用對列值哈希,然后除以桶的

個數(shù)求余的方式?jīng)Q定該條記錄存放在哪個桶當(dāng)中。

把表(或者分區(qū))組織成桶(Bucket)有兩個理由:

(1)獲得更高的查詢處理效率。桶為表加上了額外的結(jié)構(gòu),Hive在處理有些查詢時能利用

這個結(jié)構(gòu)。具體而言,連接兩個在(包含連接列的)相同列上劃分了桶的表,可以使用M叩端

連接(Map-sidejoin)高效的實現(xiàn)。比如JOIN操作。對于JOIN操作兩個表有一個相同的

列,如果對這兩個表都進行了桶操作。那么將保存相同列值的桶進行JOIN操作就可以,可

以大大較少JOIN的數(shù)據(jù)量。

(2)使取樣(sampling)更高效。在處理大規(guī)模數(shù)據(jù)集時,在開發(fā)和修改查詢的階段,如

果能在數(shù)據(jù)集的一小部分?jǐn)?shù)據(jù)上試運行查詢,會帶來很多方便。

分區(qū)

分區(qū)是以字段的形式在表結(jié)構(gòu)中存在,通過describetable命令可以查看到字段存在,但是

該字段不存放實際的數(shù)據(jù)內(nèi)容,僅僅是分區(qū)的表示(偽列)。

(1)靜態(tài)分區(qū)

createtableifnotexistssopdm.wyp2(idstring,telstring)

partitionedby(ageint)

rowformatdelimited

fieldsterminatedby

storedastextfile;

-overwrite是覆蓋,into是追加

insertintotablesopdm.w/p2

partition(age='25')

selectid,name,telfromsopdm.wyp;

(2)動態(tài)分區(qū)

--設(shè)置為true表示開啟動態(tài)分區(qū)功能(默認(rèn)為false)

sethive.exec.dynamic.partition=true;

-設(shè)置為nonstrict,表示允許所有分區(qū)都是動態(tài)的(默認(rèn)為strict)

sethive.exec.dynamic.partition.mode=nonstrict;

-insertoverwrite是覆蓋,insertinto是追加

sethive.exec.dynamic.partition.mode=nonstrict;

insertoverwritetablesopdm.wyp2

partition(age)

selectid,name,tel,agefromsopdm.wyp;

Hive服務(wù):

hive命令行模式,直接輸入#/hive/bin/hive的執(zhí)行程序,或者輸

入#hive-servicecli

2、hiveweb界面的(端口號9999)啟動方式

#hive-servicehwi&

用于通過瀏覽器來訪問hive

http://hadoopO:9999/hwi/

3、hive遠(yuǎn)程服務(wù)(端口號10000)啟動方式(java訪問必須打開)

#hive一servicehiveserver&

與數(shù)據(jù)庫中的Table在概念上是類似

每一個Table在Hive中都有一個相應(yīng)的目錄存儲數(shù)據(jù)。例如,一個表test,它

在HDFS中的路徑為:/warehouse/testowarehouse是在hive-site.xml中

由${hive.metastore,warehouse,dir}

指定的數(shù)據(jù)倉庫的目錄所有的Table數(shù)據(jù)(不包括ExternalTable)都保存在這個目

錄中。刪除表時,元數(shù)據(jù)與數(shù)據(jù)都會被刪除

內(nèi)部表和外部表的區(qū)別:

內(nèi)部表:的創(chuàng)建過程和數(shù)據(jù)加載過程(這兩個過程可以在同一個語句中完成),在加載數(shù)據(jù)

的過程中,實際數(shù)據(jù)會被移動到數(shù)據(jù)倉庫目錄中;之后對數(shù)據(jù)對訪問將會直接在數(shù)據(jù)倉庫目

錄中完成。刪除表時,表中的數(shù)據(jù)和元數(shù)據(jù)將會被同時刪

除。

外部表:只有一個過程,加載數(shù)據(jù)和創(chuàng)建表同時完成,并不會移動到數(shù)據(jù)倉庫H錄中,只是

與外部數(shù)據(jù)建立一個鏈接,當(dāng)刪除一個外部表時,僅刪除該鏈接

HiveSql:

創(chuàng)建內(nèi)部表createtableinner_table(keystring);〃如果不指定分隔符,默

認(rèn)分隔符為

createtableinner_table(keystring)rowformatdelimitedfieldsterm

inatedby〃指定分隔符

createtableinner_table(keystring)rowformatdelimitedfieldsterm

inatedbystoredasSEQUENCEFILE;〃用哪種方式存儲數(shù)據(jù),SEQUENCEFILE

是hadoop自帶的文件壓縮格式

查看表結(jié)構(gòu)

describeinner_table;或者descinner_table;

加載數(shù)據(jù)

loaddatalocalinpath*/root/test,txt'intotableinnertable;

loaddatalocalinpath1/root/test.txt*overwi'iterintotableinnertab

le;〃數(shù)據(jù)有誤,重新加載數(shù)據(jù)

查看數(shù)據(jù)select*frominner_table

selectcount(*)frominnertable

刪除表droptableinrer_table

重命名表altertableinnertablerenametonew_table_name;

修改字段altertableinnertablechangekeykey1;

添加字段altertableinner_tableaddcolumns(valuestring);

分區(qū)相關(guān):

創(chuàng)建分區(qū)表

createtablepartition_tab1e(namestring,salaryfloat,genderstring,level

string)partitionedby(dtstring,depstring)rowformatdelimitedfie

Idsterminatedbystoredastextfile;

查看表有哪些分區(qū)SHOWPARTITIONSpartition.table;

查看表結(jié)構(gòu)describepartitiontable;或

者dosepartitiontable;

加載數(shù)據(jù)到分區(qū)

loaddatalocalinp

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論