版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認(rèn)領(lǐng)
文檔簡介
分布式系統(tǒng)組件介紹
Hadoop原理
分為HDFS與Yarn兩個部分。HDFS有Namenode和Datanode兩個部分。每個節(jié)點占
用一個電腦。Datanode定時向Namenode發(fā)送心跳包,心跳包中包含Datanode的校驗等信
息,用來監(jiān)控Datanode。HDFS將數(shù)據(jù)分為塊,默認(rèn)為64M每個塊信息按照配置的參數(shù)分
別備份在不同的Datanode,而數(shù)據(jù)塊在哪個節(jié)點上,這些信息都存儲到Narnenode上面cYarn
是MapReduce2,可以集成更多的組件,如spark>mpi等。MapReduce包括Jobtraker與
Tasktrakcr兩個部分。其中JobTrakcr是在主節(jié)點上,負(fù)責(zé)整體的調(diào)度。Tasktrakcr在slave
節(jié)點上,當(dāng)提交任務(wù)后,將其交給Jobtraker進行調(diào)度,調(diào)度到該任務(wù)之后就會將jar包發(fā)送
到響應(yīng)的Tasktraker,以實現(xiàn)分布式中移動計算資源而非移動數(shù)據(jù)。因為這些任務(wù)都是并發(fā)
執(zhí)行的,所以每個任務(wù)需要調(diào)用哪個節(jié)點的數(shù)據(jù)必須非常的明確,通常這一部分是通過
Jobtraker進行指揮。
在MapReduce的開始,每個block作為一個Map輸入,Map輸入后就進入shuffle階段。
Shuffle階段主:要分為Map和Reduce兩個階段。在MapShuffle階段,Map輸入按用戶的程
序進行處理,生成的結(jié)果按key排序,然后進入內(nèi)存,溢出后形成一個spill寫入磁盤,這
樣多個spill在這個節(jié)點中進行多路歸并排序(勝者樹)形成一個排序文件寫入磁盤,這期
間那些Map文件交給那些節(jié)點處理由Jobtraker進行調(diào)度,最后生成一個大排序的文件,并
且刪除spilL之后,再將多個節(jié)點上已經(jīng)排序的文件進行行多路歸并排序(一個大文件N
分到n個節(jié)點,每個節(jié)點分為k個spill,?個spill長度s,時間復(fù)雜度是N(logn(n個節(jié)點
多路歸并排序)+k)gk(每個節(jié)點內(nèi)k個spill排序)-logs(每個spill內(nèi)部進行排序)),
N=nks,所以最后的復(fù)雜度亦是NlogN)。
完成MapShuffle階段后通知Jobtraker進入ReduceShuffle階段。在這個階段,因為已
經(jīng)排序,很容易將用戶的程序直接作用到相同key的數(shù)據(jù)上,這些數(shù)據(jù)作為Reduce的輸入
進行處理,最終將輸出的結(jié)果數(shù)據(jù)寫入到HDFS上,并刪除磁盤數(shù)據(jù)。Map一般多,Reduce
少,所以通過Hash的方法將Map文件映射到Reduce上,進行處理,這個階段叫做Pa(ition<,
為了避免譬如所有數(shù)據(jù)相加這種操作使得數(shù)據(jù)負(fù)載移動的數(shù)量少的Reduce階段,造成效率
低下的結(jié)果,我們可以在在MapShuffle階段加一個Combine階段,這個Combine是在每一
臺節(jié)點上將已經(jīng)排序好的文件進行一次Reduce并將結(jié)果作為ReduceShuffle階段的輸入,
這樣可以大大減少數(shù)據(jù)的輸入量。通常Reduce的個數(shù)通過用戶來指定,通常和CPU個數(shù)相
適應(yīng)才能使其效率達到最大。
HBase原理
Hbase是列存儲數(shù)據(jù)庫。其存儲的組織結(jié)構(gòu)就是將相同的列族存儲在?起,因此得名的。
Hbase存儲有行鍵,作為唯一標(biāo)識,列表示為〈列族〉:〈列〉存儲信息,如address:city,address:
provice,然后是時間戳。
Hbase物理模型中,有一個總結(jié)點HMaster,通過其自帶的zookeeper與客戶端相連接。
Hbse作為分布式每一個節(jié)點作為一個RegionServer,維乎Region的狀態(tài)和管理。Region是
數(shù)據(jù)管理的基本單位。最初只有一-個,通過擴充后達到閾值然后分裂,通過Server控制其
規(guī)模。在RegionServer中,每一個store作為一個列族。當(dāng)數(shù)據(jù)插入進來,新數(shù)據(jù)成為
Memstore,寫入內(nèi)存,當(dāng)Memstore達到閾值后,通過Flashcache進程將數(shù)據(jù)寫入storeFile,
也就是當(dāng)內(nèi)存數(shù)據(jù)增多后溢出成一個SloreFile寫入磁盤,這里和Hadoop的spill類似,這
據(jù)后一并寫入。當(dāng)StoreFile數(shù)量過多時,進行合并,將形成一個大的StoreFile并且刪
除掉原來的StoreFile。再當(dāng)SloreFile大小超過一定閾值后,分裂成Region。
HBase有一個ROOT表和META表。META表記錄用戶Region的信息,但是隨著數(shù)據(jù)
增多,META也會增大,進而分裂成多個Region,那我們用ROOT表記錄下META的信
息,是一個二級表,而zookeeper中記錄ROOT表的locationo當(dāng)我們需找找到一條信息時,
先去zookccpcr看找ROOT,從ROOT中查找META找到META位置,在進入META表中
尋找該數(shù)據(jù)所在Region,再讀取該Region的信息。HBase適合大量插入乂同時讀的情況,
其瓶頸是硬盤的傳輸速度而不再是像Oracle一樣瓶頸在硬盤的尋道速度。
Zookeeper原理
Zookeepcr是一個資源管理庫,對節(jié)點進行協(xié)調(diào)、通信、失敗處理、節(jié)點損壞的處理等,
是一個無中心設(shè)計,主節(jié)點通過選舉產(chǎn)生。Zookeeper的節(jié)點是Znode。每一個節(jié)點可以存
放1M的數(shù)據(jù),client訪問服務(wù)器時創(chuàng)建一個Znode,可以是短暫的Znode,其上可以放.上觀
察Waicher對node進行監(jiān)控。Zookeeper有高可用性,每個機器復(fù)制一份數(shù)據(jù),只要有一般
以上的機器可以正常的運行,整個集群就可以工作。比如6臺的集群容忍2臺斷開,超過兩
臺達到一般的數(shù)最就不可以,因此集群通常都是奇數(shù)來節(jié)約資源。
Zookeeper使用zab協(xié)議,是一個無中心協(xié)議,通過選舉的方式產(chǎn)生leader,通過每臺
機器的信息擴散選舉最閑的資源利用較少的節(jié)點作為主控。同時當(dāng)配置數(shù)據(jù)有更改更新時,
在每個節(jié)點上有配置watcher并觸發(fā)讀取更改,。因此能?哆保證一致性。每個節(jié)點通過leader
廣播的方式,使所有follower同步。
Zookeeper可以實現(xiàn)分布式鎖機制。通過waicher監(jiān)控,對每個Znode的鎖都有一個獨
一的編號,按照序號的大小比較,來分配鎖。當(dāng)一個暫時Znode完結(jié)后刪除本節(jié)點,通知
leader完結(jié),之后下一個Znode獲取鎖進行操作。
Kafka原理
Kafka是分布式發(fā)布-訂閱消息系統(tǒng)。它最初由Linkedln公司開發(fā),之后成為Apache項
目的一部分。Kafka是一種快速、可擴展的、設(shè)計內(nèi)在就是分布式的,分區(qū)的和可復(fù)制的提
交日志服務(wù)。它被設(shè)計為一個分布式系統(tǒng),易于向外擴展;它何時為發(fā)布和訂閱提供高吞吐
量;它支持多訂閱者,當(dāng)失敗時能自動平衡消費者;它將消息持久化到磁盤,因此可用于批
量消費,例如ETL,以及實時應(yīng)用程序。broker和生產(chǎn)者、消費者各自都是集群,集群中的
各個實例他們之間是對等的,集群擴充節(jié)點很方便。
Kafka的基本概念包括話題、生產(chǎn)者、消費者、代理或者kafka集群。話題是特定類型
的消息流。消息是字節(jié)的有效負(fù)載,話題是消息的分類名或種子名。生產(chǎn)者是能夠發(fā)布消息
到話題的任何對象。已發(fā)布的消息保存在?組服務(wù)器中,它們被稱為代理或Kafka集群。消
費者可以訂閱一個或多個話題,并從Brokei?拉數(shù)據(jù),從而消費這些已發(fā)布的消息、。
Kafka的存儲布局非常簡單。話題的每個分區(qū)對應(yīng)一個邏輯日志。物理上,一個日志為
相同大小的一組分段文件,每次生產(chǎn)者發(fā)布消息到一個分區(qū),代理就將消息追加到最后一個
段文件中。當(dāng)發(fā)布的消息數(shù)量達到設(shè)定值或者經(jīng)過一定的時間后,段文件真正寫入磁盤中。
寫入完成后,消息公開給消費者。段文件機制和Hadoop中spill類似。消費者始終從特定分
區(qū)順序地獲取消息,如果消費者知道特定消息的偏移量,也就說明消費者已經(jīng)消費了之前的
所有消息。消費者向代理發(fā)出異步拉請求,準(zhǔn)備字節(jié)緩沖區(qū)用于消費。每個異步拉請求都包
含要消費的消息偏移量與其它消息系統(tǒng)不同,Kafka代理是無狀態(tài)的。這意味著消費者必須
維護已消費的狀態(tài)信息。這些信息由消費者自己維護,代理完全不管。消費者可以故意倒回
到老的偏移量再次消費數(shù)據(jù)。這違反了隊列的常見約定,但被證明是許多消費者的基本特征。
kaflca的broker在配置文件中可以配置最多保存多少小時的數(shù)據(jù)和分區(qū)最大的空間占
用,過期的和超量的數(shù)據(jù)會被broker自動清除掉。
Kafka會記錄offset到zk,同時又在內(nèi)存中維護offset,允許快速的checkpoint,如果
consumer比partition多是浪費,因為kafka不允許partition上并行consumer讀取。同時,
consumer比partition少,一個consumer會對應(yīng)多個partition,有可能導(dǎo)致partition中數(shù)據(jù)的
讀取不均勻,也不能保證數(shù)據(jù)間的順序性,kafka只有在一個partition讀取的時候才能保證
時間上是有順序的。增加partition或者consumer或者broker會導(dǎo)致rebalance,所以rebalance
后consumer對應(yīng)的partition會發(fā)生變化。
Spark原理
spark可以很容易和yam結(jié)合,直接調(diào)用HDFS、Hbase上面的數(shù)據(jù),和hadoop結(jié)合。
配置很容易。spark發(fā)展迅猛,框架比hadoop更加靈活實用。減少了延時處理,提高性能效
率實用靈活性。也可以與hadoop切實相互結(jié)合。
spark核心部分分為RDD。SparkSQL、SparkStreamingsMLlib、GraphX、SparkR等
核心組件解決了很多的大數(shù)據(jù)問題,其完美的框架日受歡迎。其相應(yīng)的生態(tài)環(huán)境包括zepplin
等可視化方面,正日益壯大。大型公司爭相實用spark來代替原有hadoop上相應(yīng)的功能模
塊。Spark讀寫過程不像hadoop溢出寫入磁盤,都是基于內(nèi)存,因此速度很快。另外DAG
作業(yè)調(diào)度系統(tǒng)的寬窄依賴讓Spark速度提高。
RDD是彈性分布式數(shù)據(jù)也是spark的核心,完全彈性的,如果數(shù)據(jù)丟失一部分還可以重
建。有自動容錯、位置感知調(diào)度和可伸縮性,通過數(shù)據(jù)檢杳點和記錄數(shù)據(jù)更新金象容錯性檢
查。通過SparkConlexl.texlFile。加載文件變成RDD,然后通過Iransformalion構(gòu)建新的RDD,
通過action將RDD存儲到外部系統(tǒng)。
RDD使用延遲加載,也就是懶加載,只有當(dāng)用到的時候才加載數(shù)據(jù)。如果加載存儲所
有的中間過程會浪費空間,因此要延遲加載。一旦spark看到整個變換鏈,他可以計算僅需
的結(jié)果數(shù)據(jù),如果下面的函數(shù)不需要數(shù)據(jù)那么數(shù)據(jù)也不會再加載。轉(zhuǎn)換RDD是惰性的,只
有在動作中才可以使用它們。
Spark分為driver和executor,driver提交作業(yè),executor是application早worknodc上的
進程,運行task,driver對應(yīng)為sparkcontext。Spark的RDD操作有iransformalion、action
Transformation對RDD進行依賴包裝,RDD所對應(yīng)的依賴都進行DAG的構(gòu)建并保存,在
worknode掛掉之后除了通過備份恢復(fù)還可以通過元數(shù)據(jù)對其保存的依賴再計算一次得到。
當(dāng)作業(yè)提交也就是調(diào)用runJob時,spark會根據(jù)RDD構(gòu)建DAG圖,提交給DAGScheduler,
這個DAGSchcduler是在SparkContext創(chuàng)建時一同初始化的,他會對作業(yè)進行調(diào)度處理。當(dāng)
依賴圖構(gòu)建好以后,從action開始進行解析,每一個操作作為一個task,每遇到shuffle就
切割成為一個taskSet,并把數(shù)據(jù)輸出到磁盤,如果不是shuffle數(shù)據(jù)還在內(nèi)存中存儲。就這
樣再往前推進,直到?jīng)]有算子,然后運行從前面開始,如果沒有action的算子在這里不會執(zhí)
行,直到遇到action為止才開始運行,這就形成了spark的懶加載,taskset提交給TaskSheduler
生成TaskSetManager并且提交給Executor運行,運行結(jié)束后反饋給DAGScheduler完成一個
taskSet,之后再提交下一個,當(dāng)TaskSet運行失敗時就返回DAGScheduler并重新再次創(chuàng)建。
一個job里面可能有多個TaskSet,一個application可能包含多個job。
HDFS介紹
HDFS(HadoopDistributedFileSystem,Hadoop分布式文件系統(tǒng)),它是一個高度容錯性的
系統(tǒng),適合部署在廉價的磯器上。HDFS能提供高吞吐量的數(shù)據(jù)訪問,適合那些有著超大數(shù)
據(jù)集(largedataset)的應(yīng)用程序。
HDFS的設(shè)計特點是:
1、大數(shù)據(jù)文件。
2、文件分塊存儲,HDFS會將一個完整的大文件平均分塊存儲到不同計算器上,讀寫效率
高。
3、流式數(shù)據(jù)討問,一次寫入多次讀寫,不支持修改(hadoop0.21之前支持Append),
4、廉價硬件,HDFS可以應(yīng)用在普通PC機上。
5、硬件故障,HDFS認(rèn)為所有計算機都可能會出問題,通過多個副本解決。
RacklRack2
HDFS讀寫文件流程-讀
2:獷bkxklocations
1:openDistributedNameNode
HDFSFHeSystem
dient3:read
FSOatanamenode
Inputstream
clientJVM
dientnode
4:read5:read
▼
DataNodeDataNodeDataNode
datanodedatanodedatanode
HDFS讀寫文件流程-寫
HDFS常見操作:
hadoopfs-Is/data/查看所有文件
hadoopfs-du-h/data/列出匹配pattern的指定的文件系統(tǒng)空間總量(單位bytes),等
價于unix下的針對目錄的du
hadoopfs-chmod[-R]777/data/修改文件的權(quán)限,-上標(biāo)記遞歸修改。
hadoopfs-mkdir/data/在指定位置創(chuàng)建目錄。
hadoopfs-putxxx.log/data/從本地系統(tǒng)拷貝文件到DFS。
hadoopfs-rmrZdala/xxx.log歸刪掉所有的文件和目錄,等價于unix下的rm-rf<src>o
hadoopfs-get/data/xxx.log從DFS拷貝文件到本地文彳匕系統(tǒng)
hadoopfs-getmerge/data/*Jogdata.log顧名思義,從DFS拷貝多個文件、合并排序為一個
文件到本地文件系統(tǒng)。
hadoopfs-mv<src><dsl>:將制定格式的文件move到指定的目標(biāo)位置。當(dāng)src為多個文
件時,dst必須是個目錄
HDFS實踐?HDFSjavaAPI讀操作
Configurationconf=newConfiguration();
PathuriPath=newPath(uri);
try(
hdfs=FileSystem.get(|uriPath|.toUri(),conf);
ycatch(lOExceptxone)<
e.printStackTrace();
hdfs=null;
y
if(hdfs=null)return;
List<String>configPaths=newLinkedList<String>();
SDatalnputStreamdis=null;
BufferedReaderin=null;
try<_________
if(hdfs.exists(uriPath)){
d±s=hdfs.open(uriPath);
in=newBufferedReader(newInputStreamReader(dis,"LFTF-8"));
Stringline=null;
while((line=in.readLine())!=null){
line=line.trim();
configPaths.addCline):
?
}catch(lOExceptxone)<
e.printStackTrace();
>finally<
if(in!=nulLX
try{
±n.close();
}catch(TOExceptione){
e.printStackTrace();
y
y
_____Hd±s!=_______________________________________________________________
HDFS實踐?HDFSjavaAPI寫操作
//寫入一個文件到HDFS
Conf±gurationconf=newConf±guration();
FileSystemfileS=FileSystem.get(conf);
FileSystemlocaLFile=FileSystem.getLocal(conf);
Pathinput=newPath("/User/ryan/1.txt");
Pathout=newPath("/dara/l.txt");
try<
-FileStatus(]inpirtFiLe=LocalFile.TistStatus(input);
FSDataOutputStreamoutStream=fileS.create(out);
for(inti=0;i<inputFile.length;{
System.out.printLn(inputFile[i]-getPath().getName());
FSDa±aInputS±reamin-locatFilc.opcn(inpu±Filc(i].gc±Pa±h());
bytebuffer[]=newbyte[1024];
intbytesRead=0;
//邊讀邊寫
while((bytesRead=in.read(buffer))=>0){
outStream.write(buffer,0,bytesRead);
in.cTose();
>catch(Exceptione){
e.printStackTrace();
HDFS數(shù)據(jù)采用壓縮,減少磁盤空間占,和傳輸時候的帶寬占用,用(LZO,SNAPPY,GZIP
等),對于可以拆分的數(shù)據(jù)塊大小>=blocksize,不支持分拆文件采用接近blocksize大小
(Mapreduce)<,數(shù)據(jù)分布在各個節(jié)點需要均勻,避免熱點(讀寫性能)
MapReduce經(jīng)驗與教訓(xùn)
數(shù)據(jù)傾斜問題
背景:H前消息統(tǒng)計采用以下流程messageid維度二>taskid_叩pid維度=>1)taskid維度2)
appid維度
問題:由于部分taskid或者部分appid推送量很大,導(dǎo)致以taskid,appid為key的mapreduce
出現(xiàn)部分reducer待處理的數(shù)據(jù)量很多,造成reducer之間處理數(shù)據(jù)量嚴(yán)重不平衡,任務(wù)在最
后99%時候卡在個別reducertask上。
解決:依據(jù)數(shù)據(jù)特性在以taskid,appid為key的mapreduce中添加combiner,讓數(shù)據(jù)在map
完成之后直接在本地機器上進行一次合并,減少數(shù)據(jù)本身數(shù)量級,進而降低部分reducer負(fù)
載過高問題
數(shù)據(jù)大量重復(fù)在reduce中部分日志出現(xiàn)大量重復(fù),但是一個reducer進行行處理的時候出現(xiàn)
內(nèi)存使用開銷很大或者部分reducer出現(xiàn)數(shù)據(jù)傾斜,解決:采用combiner在本地做一次數(shù)據(jù)
去重?zé)?/p>
shuffle階段出現(xiàn)內(nèi)存不足
例如任務(wù)提示:Error:java.lang.OutOfMcmoryError:Javaheapspaceat
org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOulpu(Copier.shuffleInMemory(Red
uceTask.java:1640)
解決://設(shè)置sort完成后reduce計算階段用來緩存數(shù)據(jù)的百分比
conf.setFloat("mapred.job.shuffle.input.buffer.percent",0.30f);
conf.sctFloat("maprcducc.job.shufflc.input.buffcr.pcrccnt",0.30f);
一次計算很多天數(shù)據(jù)占用集群大部分資源并且無法保證任務(wù)成功完成
背景:
標(biāo)注LBS數(shù)據(jù)時,一次性將I個月行為數(shù)據(jù)放入在一個MRJob中執(zhí)行,導(dǎo)致占用掉Hado。
集群大部分資源,和其他任務(wù)搶占資源,執(zhí)行3個小時最后IO超時異常。
解決:
避免蠻力計算,將超大規(guī)模數(shù)據(jù)分拆多次計算,得到降低數(shù)量級后的中間結(jié)果,然后再合并
計算。好處:1、延長時叵,保證程序穩(wěn)定。2、程序即使異常數(shù)據(jù)也不用從頭開始計算,可
以利用中間結(jié)果。
問題:
輸入的數(shù)據(jù)中部分文件不可分割,導(dǎo)致單個reducer負(fù)載過高,task卡住不進行下去。
解決:
I、一個是數(shù)據(jù)在導(dǎo)入之前分割好來
2、轉(zhuǎn)化為可以分割的格式。
Hive簡介
Hive是基于Hadoop的一個數(shù)據(jù)倉庫工具,可以將結(jié)構(gòu)化的數(shù)據(jù)文件映射為一張數(shù)據(jù)庫表,
并提供簡單的sql查詢功能,可以將sql語句轉(zhuǎn)換為MapReduce任務(wù)進行運行。
優(yōu)點是學(xué)習(xí)成本低,可以通過類SQL語句快速實現(xiàn)簡單的MapReduce統(tǒng)計,不必開發(fā)專門
的M叩Reduce應(yīng)用,十分適合數(shù)據(jù)倉庫的統(tǒng)計分析。
H2c體系結(jié)構(gòu)
Hive實踐
分區(qū)添加
altertableuscr_qucry_poi_qunaraddpartition(day=20141125)location
7data_resul(/lbs/user_query_poi_qunar/20141125,;
統(tǒng)計
useuser;
droptableuser_qu
ROWFORMATDELIMITED
FIELDSTERMINATEDBYT
LOCATION7data_result/lbs/usery_poi_qunar_airport_cid_count;
CREATEEXTERNALTABLEuser_query_poi_qunar_airport_cid_coun((namesiring,count
bigint)
er_query_poi_qunar_airpon_cid_count';
insertoverwritetableuser_queiy_poi_qunar_airport_cid_countselect
count(distinct(cid)),concat(poi_name,"\t",city)fromuser_query_poi_qunarwhereday=20141125
andtag='bt_airport'groupbyconcat(poi_name,"\t",city);
UDF
例如我們有個自定義jar支持md5方法,udf.jar
1、添加jar
hive>addjar/app/yuankai/hadoopjars/gexin_hive_udf.jar;
2、關(guān)聯(lián)內(nèi)置函數(shù)hive>createtemporaryfunctionmd5ascom.igexin.hive.udf.Md5,;
3、執(zhí)行內(nèi)置函數(shù)hivc>sclcctmd5('tcst')fromtestlimit1;
其它內(nèi)置函數(shù)類實現(xiàn)方式:
publicclassXXXUDFextendsUDF{
publicTextevahiate(Object...args){〃實現(xiàn)操作returnnewText("yourresult");}}
comigexinhiveudf
orgapache.hadoophiveqlexec.UDF
orgapachehadoopioText
bl■cSelectlntsalledAppUDF{
Textevaluate(Objectargs){
(args.length4){
//com.tencent.pao;116;20141119;20141119;20141119#com.tencent.qq;112;20141118
String[]apps((String)args[0])spliti"#")
(appslength0)0
Stringpackageld(String)args[1]
startDayLongparseLongt(String)args[2])
endDayLongparseLongt(String)args[3])
(Stringappapps){
//com.tencent.pao;116;20141119;20141119;20141119
String[]arrappsplit。';"1)
(arr.length4){
(packageld.equals(arr[0])){
installed_dayLongparseLongi(String)arr[2])
(installed_daystartDayinstall.ed_dayendDay){
returnneuText(app);
}
)
}
}
}
returnnewTextC,H);
}
}
Hive實踐-遠(yuǎn)程調(diào)用
javasqlSQLException
HiveJdbcClient(
privat*finalStringdriverName"org.apache.hive.jdbc.HiveDriver"
privatefinalst?iStringjdbcUrl-jdbc:hive2://192.168.2.249:19999/defauXf
privatefinalStringjdbcUrl_user"user"
privatefinalstaticStringjdbcurl_pwd"yuanKai”
pu()li'.sf.:i.voidnain(String[]args)thrczSQLException{
tr(
Class.forA/ame(driverName)
)(ClassNotFoundExceptione){
Systemoutprintln(HClassNotFoundException:"egetMessage())
eprintStackTrace()
System.exit(l);
)
ConnectionconDriverManagergetConnection(jdbcUrIjdbcUrl_userjdbcllrl_pwd)
StatementstmtconcreateStatement()
Stringsqlnull;
ResultSet-null;|
stmtexecuteC'usereport_ods_mdp")
StringsubTaskidargs(0]
StringstartActionargs(1)
StringendActionargs(2)
StringstartDayargs(3]
StringendDayargs(4]
sql“selectdistinct(token_md5)fromreport_ods_mdp.msg_aswheretask_id=
subTaskidandaction_id>='*startAction"andaction_id<="endAction
?'andday>="startDay"andday<="endDay
resstmt.executeOuery(sql);
whili(res.next()){
Systemoutprintln(resgetString(l))
〃保存獲取到Cd到對應(yīng)表格中
//...
用戶是不是在某個時間內(nèi)安裝了的內(nèi)置函數(shù)
\addjar/app/yuankai/hadoopjars/hive-udf.jar;
createtemporaryfunctiongx_selectinstallas'com.igexin.hive.udf.SelectlntsallcdApp';
使用舉例:
select*fromuser_app_infowhere
gx.selectinstalKinstalled:'com.tencent.pao\'^OMIII9";'2O141119")!=nulllimiti;
褊一部分:產(chǎn)生背景
產(chǎn)生背景
?為了滿足客戶個性化的需求,Hive被設(shè)計成一個很開放的系統(tǒng),很多內(nèi)容都支持用戶定制,包括:
?文件格式:TextFile.SequenceFile
?內(nèi)存中的數(shù)據(jù)格式:JavaInteger/String,HadoopIntWritable/Text
?用戶提供的map/reduce腳本:不管什么語言,利用stdin/stdout傳輸數(shù)據(jù)
?用戶自定義函數(shù)
自定義函數(shù)
?雖然Hive提供了很多函數(shù),但是有些還是難以滿足我們的需求。因此Hive提供了自定義函數(shù)開發(fā)
?自定義函數(shù)包括三種UDF、JADF、UDTF
?UDF(User-Defined-Functon)
?UDAF(User-DefinedAggregationFuncation)
?UDTF(User-DefinedTable-GeneratingFunctions)用來解決輸入一行輸出多行(On-to-many
maping)的需求。
HIVE中使用定義的函數(shù)的三種方式
?在HIVE會話中add自定義函數(shù)的jar文件,然后創(chuàng)建function,繼而使用函數(shù)
?在進入HIVE會話之前先自動執(zhí)行創(chuàng)建function,不用用戶手工創(chuàng)建
?把自定義的函數(shù)寫到系統(tǒng)函數(shù)中,使之成為HIVE的一個默認(rèn)函數(shù),這樣就不需要createtemporary
function
第二部分:UDF
UDF用法
?UDF(User-Defined-Function)
?UDF函數(shù)可以直接應(yīng)用于select語句,對查詢結(jié)構(gòu)做格式化處理后,再輸出內(nèi)容
?編寫UDF函數(shù)的時候需要注意一下幾點
?自定義UDF需要繼承org.apache.hadoop.hive.ql.UDF
?需要實現(xiàn)evaluate函數(shù)
?evaluate函數(shù)支持重載
?UDF只能實現(xiàn)一進一出的操作,如果需要實現(xiàn)多進一出,則需要實現(xiàn)UDAF
UDF用法代碼示例
importorg.apache.Hadoop.hive.ql.exec.UDF
publicclassHellowordextendsUDF(
publicStringevaluate(){
return"helloworld!";
}
publicStringevaluate(Stringstr){
return"helloworld:"+str;
}
)
開發(fā)步驟
?開發(fā)代碼
?把程序打包放到目標(biāo)機器上去
?進入hive客戶端
?添加jar包:hive>addjar/run/jar/udf_test.jar;
?創(chuàng)建臨時函數(shù):hive〉CREATETEMPORARYFUNCTIONmy_addAS'com.hive.udf.Add'
?查詢HQL語句:
?SELECTmy_add(8,9)FROMscores;
?SELECTmy_add(scores.math,scores.art)FROMscores;
?銷毀臨時函數(shù):hive>DROPTEMPORARYFUNCTIONmy_add;
?細(xì)節(jié)
?在使用UDF的時候,會自動進行類型轉(zhuǎn)換,例如:
SELECTmy_add(8,9.1)FROMscores;
?結(jié)果是17.1,UDF將類型為Int的參數(shù)轉(zhuǎn)化成double。類型的飲食轉(zhuǎn)換是通過UDFResolver來進行
控制的
第三部分:UDAF
UDAF
?Hive查詢數(shù)據(jù)時,有些聚類函數(shù)在HQL沒有自帶,需要用戶自定義實現(xiàn)
?用戶自定義聚合函數(shù):Sum,Averagen-1
?UDAF(User-DefinedAggregationFuncation)
用法
??下兩個包是必須的importorg.apache.hadoop.hive.ql.exec.UDAF和
org.apache,hadoop.hive.ql.exec.UDAFEvaluator
開發(fā)步驟
?函數(shù)類需要繼承UDAF類,為部類Evaluator實UDAFEvaluator接口
?Evaluator"需要實現(xiàn)init、iterate、terminatePartiaKmerge>terminate這幾個函數(shù)
a)init函數(shù)實現(xiàn)接口UDAFEvaluator的init函數(shù)。
b)iterate接收傳入的參數(shù),并進行內(nèi)部的輪轉(zhuǎn)。其返回類型為boolean。
c)terminatePartial無參數(shù),其為iterate函數(shù)輪轉(zhuǎn)結(jié)束后,返回輪轉(zhuǎn)數(shù)據(jù),terminatePartial類似于
hadoop的Combiner,
d)merge接收terminatePartial的返回結(jié)果,進行數(shù)據(jù)merge操作,其返回類型為boolean。
e)terminate返回最終的聚親函數(shù)結(jié)果。
執(zhí)行步驟
?執(zhí)行求平均數(shù)函數(shù)的步驟
a)將java文件編譯成Avg_test.jar。
b)進入hive客戶端添加jar包:
hive>addjar/run/jar/Avg_test.jaro
c)創(chuàng)建臨時函數(shù):
hive>createtemporaryfunctionavg_test'hive.udaf.Avg';
d)查詢語句:
hivoselectavg_test(scoies.math)fromscores;
e)銷毀臨時函數(shù):
hive>droptemporaryfunctionavg_test;
UDAF代碼示例
publicclassMyAvgextendsUDAF{
publicstaticclassAvgEvauatorimplementsUDAFEvalustor(
}
publicvoidinit()
publicbooleaniterate(Doubleo){}
publicAvgStateterminatePartial(){}
publicbooleanterminateFartial(Doubleo){}
publicDoubleterminate(){}
)
第四部分:UDTF
UDTF
?UDTF(User-DefinedTable-GeneratingFunctions)用來解決輸入?行輸出多行(On-to-many
maping)的需求。
開發(fā)步驟
?UDTF步驟:
?必須繼承org.apache.Hadoop.hive.ql.udf.generic.GenericUDTF
?實現(xiàn)initialize,process,close三個方法
?UDTF首先會
?調(diào)用initialize方法,此方法返回UDTF的返回行的信息(返回個數(shù),類型)
初始化完成后,會調(diào)用process方法,對傳入的參數(shù)進行處理,可以通過forword。方法把結(jié)果返回
?最后close。方法調(diào)用,對辭要清理的方法進行清理
使用方法
?UDTF有兩種使用方法,一種直接放到select后面,一種和lateralview一起使用
?直接select中使用:selectexplode_map(properties)as(coll,col2)fromsrc;
?不可以添加其他字段使用:selecta,explode_map(properties)as(coll,col2)fromsrc
?不可以嵌套調(diào)用:selectexplode_map(explode_map(properties))fromsrc
?不可以和groupby/clusterby/distributeby/sortby一起使用:selectexplode_map(properties)
as(collzcol2)fromsrcgroupbycoll,col2
?和lateralview一起使用:selectsrc.id,mytable.coll,mytable.col2fromsrclateralview
explode_map(properties)mytableascoll,col2;
此方法史為方便H常使用。執(zhí)行過程相當(dāng)于單獨執(zhí)行了兩次抽取,然后union到一個表里。
lateralview
?LateralView語法
?lateralview:LATERALVIEWudtf(expression)tableAliasAScolumnAliascolumnAlias)*
fromClause:FROMbaseTable(lateralview)*
?LateralView用于UDTF(user-definedtablegeneratingfunctions)中將行轉(zhuǎn)成列,例如explode().
?目前LateralView不支持有上而下的優(yōu)化。如果使用Where子句,查詢可能將不被編譯。解決方法見:
此時,在查詢之前執(zhí)行sethive.optimize.ppd=false;
?例子
?pageAdy。它有兩個列
stringpageidArray<int>adidlist
"front_page"[1,2,3]
"contact_page"[3,4,5]
?SELECTpageid,adidFROMpageAdsLATERALVIEWexplode(adidjist)adTableASadid;
?將輸出如下結(jié)果
stringpageidintadid
"front_page"1
Mcontact_page"3
代碼示例
publicclassMyUDTFextendsGenericUDTF{
publicStructObjectlnspectorinitialize(ObjectInspector[]args){}
publicvoidprocess(Object[]args)throwsHiveException(}
}
實現(xiàn):切分"key:value;key:value;”這種字符串,返回結(jié)果為key,value兩個字段
Hive使用經(jīng)驗
/結(jié)果顯示設(shè)置列名
sethive.cli.print.header=true;
〃壓縮設(shè)置
seterniediate=true;
setprcss.output=false;
〃設(shè)置并行開啟和數(shù)量
sethive.exec.parallel=true;
sethive.exec.parallel.thread.number=10;
1)使用外表,避免drop的時候?qū)⒃磾?shù)據(jù)刪除。
2)對于較大兩個表格進行關(guān)聯(lián)查詢的時候,盡可能將數(shù)據(jù)預(yù)先處理
(例如篩選到單獨表格),然后再join。
3)在數(shù)據(jù)入庫的時候,依照時間或者某個屬性進行分區(qū),方便查詢
時能夠在一個分區(qū)內(nèi)檢索,提高效率。
4)復(fù)雜字段使用自定義內(nèi)置函數(shù),避免超級復(fù)雜的sql。
分桶
CREATETABLEbucketeduser(idINT)nameSTRING)
CLUSTEREDBY(id)INTO4BUCKETS;
對于每一個表(table)或者分區(qū),Hive可以進一步組織成桶,也就是說桶是更為細(xì)粒度的
數(shù)據(jù)范圍劃分。Hive也是針對某一列進行桶的組織。Hive采用對列值哈希,然后除以桶的
個數(shù)求余的方式?jīng)Q定該條記錄存放在哪個桶當(dāng)中。
把表(或者分區(qū))組織成桶(Bucket)有兩個理由:
(1)獲得更高的查詢處理效率。桶為表加上了額外的結(jié)構(gòu),Hive在處理有些查詢時能利用
這個結(jié)構(gòu)。具體而言,連接兩個在(包含連接列的)相同列上劃分了桶的表,可以使用M叩端
連接(Map-sidejoin)高效的實現(xiàn)。比如JOIN操作。對于JOIN操作兩個表有一個相同的
列,如果對這兩個表都進行了桶操作。那么將保存相同列值的桶進行JOIN操作就可以,可
以大大較少JOIN的數(shù)據(jù)量。
(2)使取樣(sampling)更高效。在處理大規(guī)模數(shù)據(jù)集時,在開發(fā)和修改查詢的階段,如
果能在數(shù)據(jù)集的一小部分?jǐn)?shù)據(jù)上試運行查詢,會帶來很多方便。
分區(qū)
分區(qū)是以字段的形式在表結(jié)構(gòu)中存在,通過describetable命令可以查看到字段存在,但是
該字段不存放實際的數(shù)據(jù)內(nèi)容,僅僅是分區(qū)的表示(偽列)。
(1)靜態(tài)分區(qū)
createtableifnotexistssopdm.wyp2(idstring,telstring)
partitionedby(ageint)
rowformatdelimited
fieldsterminatedby
storedastextfile;
-overwrite是覆蓋,into是追加
insertintotablesopdm.w/p2
partition(age='25')
selectid,name,telfromsopdm.wyp;
(2)動態(tài)分區(qū)
--設(shè)置為true表示開啟動態(tài)分區(qū)功能(默認(rèn)為false)
sethive.exec.dynamic.partition=true;
-設(shè)置為nonstrict,表示允許所有分區(qū)都是動態(tài)的(默認(rèn)為strict)
sethive.exec.dynamic.partition.mode=nonstrict;
-insertoverwrite是覆蓋,insertinto是追加
sethive.exec.dynamic.partition.mode=nonstrict;
insertoverwritetablesopdm.wyp2
partition(age)
selectid,name,tel,agefromsopdm.wyp;
Hive服務(wù):
hive命令行模式,直接輸入#/hive/bin/hive的執(zhí)行程序,或者輸
入#hive-servicecli
2、hiveweb界面的(端口號9999)啟動方式
#hive-servicehwi&
用于通過瀏覽器來訪問hive
http://hadoopO:9999/hwi/
3、hive遠(yuǎn)程服務(wù)(端口號10000)啟動方式(java訪問必須打開)
#hive一servicehiveserver&
與數(shù)據(jù)庫中的Table在概念上是類似
每一個Table在Hive中都有一個相應(yīng)的目錄存儲數(shù)據(jù)。例如,一個表test,它
在HDFS中的路徑為:/warehouse/testowarehouse是在hive-site.xml中
由${hive.metastore,warehouse,dir}
指定的數(shù)據(jù)倉庫的目錄所有的Table數(shù)據(jù)(不包括ExternalTable)都保存在這個目
錄中。刪除表時,元數(shù)據(jù)與數(shù)據(jù)都會被刪除
內(nèi)部表和外部表的區(qū)別:
內(nèi)部表:的創(chuàng)建過程和數(shù)據(jù)加載過程(這兩個過程可以在同一個語句中完成),在加載數(shù)據(jù)
的過程中,實際數(shù)據(jù)會被移動到數(shù)據(jù)倉庫目錄中;之后對數(shù)據(jù)對訪問將會直接在數(shù)據(jù)倉庫目
錄中完成。刪除表時,表中的數(shù)據(jù)和元數(shù)據(jù)將會被同時刪
除。
外部表:只有一個過程,加載數(shù)據(jù)和創(chuàng)建表同時完成,并不會移動到數(shù)據(jù)倉庫H錄中,只是
與外部數(shù)據(jù)建立一個鏈接,當(dāng)刪除一個外部表時,僅刪除該鏈接
HiveSql:
創(chuàng)建內(nèi)部表createtableinner_table(keystring);〃如果不指定分隔符,默
認(rèn)分隔符為
createtableinner_table(keystring)rowformatdelimitedfieldsterm
inatedby〃指定分隔符
createtableinner_table(keystring)rowformatdelimitedfieldsterm
inatedbystoredasSEQUENCEFILE;〃用哪種方式存儲數(shù)據(jù),SEQUENCEFILE
是hadoop自帶的文件壓縮格式
查看表結(jié)構(gòu)
describeinner_table;或者descinner_table;
加載數(shù)據(jù)
loaddatalocalinpath*/root/test,txt'intotableinnertable;
loaddatalocalinpath1/root/test.txt*overwi'iterintotableinnertab
le;〃數(shù)據(jù)有誤,重新加載數(shù)據(jù)
查看數(shù)據(jù)select*frominner_table
selectcount(*)frominnertable
刪除表droptableinrer_table
重命名表altertableinnertablerenametonew_table_name;
修改字段altertableinnertablechangekeykey1;
添加字段altertableinner_tableaddcolumns(valuestring);
分區(qū)相關(guān):
創(chuàng)建分區(qū)表
createtablepartition_tab1e(namestring,salaryfloat,genderstring,level
string)partitionedby(dtstring,depstring)rowformatdelimitedfie
Idsterminatedbystoredastextfile;
查看表有哪些分區(qū)SHOWPARTITIONSpartition.table;
查看表結(jié)構(gòu)describepartitiontable;或
者dosepartitiontable;
加載數(shù)據(jù)到分區(qū)
loaddatalocalinp
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 湖北省隨州市部分高中2025-2026學(xué)年高一上學(xué)期期末聯(lián)考物理答案
- 貴州省貴陽市2025-2026學(xué)年上學(xué)期期末九年級物理試卷(含答案)
- 過程裝備密封技術(shù)
- 會東事業(yè)單位招聘2022年考試全真模擬試題4套及答案解析(附后)
- 鋼結(jié)構(gòu)成型技術(shù)操作要點
- 事業(yè)編考試判斷推理題及答案
- 社區(qū)護士考試題及答案
- 社保業(yè)務(wù)知識試題及答案
- 禽病防治試題庫及答案
- 甘肅省定西市岷縣2025-2026學(xué)年三年級上學(xué)期學(xué)情監(jiān)測數(shù)學(xué)試卷(含答案)
- 2024年風(fēng)電、光伏項目前期及建設(shè)手續(xù)辦理流程匯編
- 不良資產(chǎn)合作戰(zhàn)略框架協(xié)議文本
- 先進班級介紹
- 2025年浙江省輔警考試真題及答案
- 2025中國熱帶農(nóng)業(yè)科學(xué)院科技信息研究所第一批招聘4人備考題庫(第1號)附答案
- 雨課堂學(xué)堂在線學(xué)堂云《婚姻家庭法(武漢科大 )》單元測試考核答案
- 安徽寧馬投資有限責(zé)任公司2025年招聘派遣制工作人員考試筆試模擬試題及答案解析
- 學(xué)堂在線 雨課堂 學(xué)堂云 研究生學(xué)術(shù)與職業(yè)素養(yǎng)講座 章節(jié)測試答案
- 2025光纖供貨合同模板
- 2025年山東省濟南市歷下區(qū)中考一模英語試題(原卷版+解析版)
- 制造部年終總結(jié)
評論
0/150
提交評論