版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
HBase的協(xié)處理器開發(fā)編碼實例Observer協(xié)處理器通常在一個特定的事件(諸如Get或Put)之前或之后發(fā)生,相當于RDBMS中的觸發(fā)器。Endpoint協(xié)處理器則類似于RDBMS中的存儲過程,因為它可以讓你在RegionServer上對數(shù)據(jù)執(zhí)行自定義計算,而不是在客戶端上執(zhí)行計算。1協(xié)處理器簡介如果要統(tǒng)計HBase中的數(shù)據(jù),比如統(tǒng)計某個字段的最大值、統(tǒng)計滿足某種條件的記錄數(shù)、統(tǒng)計各種記錄的特點并按照記錄特點分類等等,常規(guī)的做法是把HBase中整個表的數(shù)據(jù)Scan出來,或者加一個Filter,進行一些初步的過濾,然后在客戶端進行統(tǒng)計處理。但是這么做會有很大的副作用,比如占用大量的網(wǎng)絡(luò)帶寬(大數(shù)據(jù)量尤為明顯),RPC的壓力也是不容小覷的。HBase作為列式數(shù)據(jù)庫最經(jīng)常被人詬病的特性包括:無法輕易建立“二級索引”,難以執(zhí)行求和、計數(shù)、排序等操作。比如,在舊版本的(《0.92)HBase中,統(tǒng)計數(shù)據(jù)表的總行數(shù),需要使用Counter方法,執(zhí)行一次MapReduceJob才能得到。雖然HBase在數(shù)據(jù)存儲層中集成了MapReduce,能夠有效進行數(shù)據(jù)表的分布式計算,然而在很多情況下,做一些簡單的相加或者聚合計算的時候,如果直接將計算過程放置在server端,能夠減少網(wǎng)絡(luò)開銷,從而獲得很好的性能提升。于是,HBase在0.92之后引入了協(xié)處理器(coprocessors),實現(xiàn)了一些激動人心的新特性:能夠輕易建立二次索引、復(fù)雜過濾器以及訪問控制等。簡單理解來說,協(xié)處理器是HBase讓用戶的部分邏輯在數(shù)據(jù)存放端即HBase服務(wù)端進行計算的機制,它允許用戶在HBase服務(wù)端運行自己的代碼。2協(xié)處理器的分類協(xié)處理器分為兩種類型:系統(tǒng)協(xié)處理器可以全局導(dǎo)入RegionServer上的所有數(shù)據(jù)表,表協(xié)處理器是用戶可以指定一張表使用的協(xié)處理器。協(xié)處理器框架為了更好支持其行為的靈活性,提供了兩個不同方面的插件。一個是觀察者(Observer),類似于關(guān)系數(shù)據(jù)庫的觸發(fā)器。另一個是終端(Endpoint),動態(tài)的終端有點像存儲過程。Observer的設(shè)計意圖是允許用戶通過插入代碼來重載協(xié)處理器框架的upcall方法,而具體的事件觸發(fā)的callback方法由HBase的核心代碼來執(zhí)行。協(xié)處理器框架處理所有的callback調(diào)用細節(jié),協(xié)處理器自身只需要插入添加或者改變的功能。Endpoint是動態(tài)RPC插件的接口,它的實現(xiàn)代碼被安裝在服務(wù)器端,從而能夠通過HBaseRPC喚醒。客戶端類庫提供了非常方便的方法來調(diào)用這些動態(tài)接口,它們可以在任意時候調(diào)用一個終端,它們的實現(xiàn)代碼會被目標Region遠程執(zhí)行,結(jié)果會返回到終端。用戶可以結(jié)合使用這些強大的插件接口,為HBase添加全新的特性。3ProtocolBuffer的使用由于下面的Endpoint編碼示例使用了Google公司的混合語言數(shù)據(jù)標準ProtocolBuffer,所以首先了解一下這個常用于RPC系統(tǒng)的工具。3.1ProtocolBuffer介紹ProtocolBuffer是一種輕便高效的結(jié)構(gòu)化數(shù)據(jù)存儲格式,可以用于結(jié)構(gòu)化數(shù)據(jù)串行化,很適合做數(shù)據(jù)存儲或RPC數(shù)據(jù)交換格式。它可用于通訊協(xié)議、數(shù)據(jù)存儲等領(lǐng)域的語言無關(guān)、平臺無關(guān)、可擴展的序列化結(jié)構(gòu)數(shù)據(jù)格式。目前提供了C++、Java、Python三種語言的API。為什么要使用ProtocolBuffer呢?先看一個在實際開發(fā)中經(jīng)常會遇到的系統(tǒng)場景:我們的客戶端程序是使用Java開發(fā)的,可能運行自不同的平臺,如Linux、Windows或者是Android,而我們的服務(wù)器程序通常是基于Linux平臺并使用C++開發(fā)完成的。在這兩種程序之間進行數(shù)據(jù)通訊時存在多種方式用于設(shè)計消息格式,如:1、直接傳遞C/C++語言中字節(jié)對齊的結(jié)構(gòu)體數(shù)據(jù),只要結(jié)構(gòu)體的聲明為定長格式,那么該方式對于C/C++程序而言就非常方便了,僅需將接收到的數(shù)據(jù)按照結(jié)構(gòu)體類型強行轉(zhuǎn)換即可。事實上對于變長結(jié)構(gòu)體也不會非常麻煩。在發(fā)送數(shù)據(jù)時,也只需定義一個結(jié)構(gòu)體變量并設(shè)置各個成員變量的值之后,再以char*的方式將該二進制數(shù)據(jù)發(fā)送到遠端。反之,該方式對于Java開發(fā)者而言就會非常繁瑣,首先需要將接收到的數(shù)據(jù)存于ByteBuffer之中,再根據(jù)約定的字節(jié)序逐個讀取每個字段,并將讀取后的值再賦值給另外一個值對象中的域變量,以便于程序中其他代碼邏輯的編寫。對于該類型程序而言,聯(lián)調(diào)的基準是必須客戶端和服務(wù)器雙方均完成了消息報文構(gòu)建程序的編寫后才能展開,而該設(shè)計方式將會直接導(dǎo)致Java程序開發(fā)的進度過慢。即便是Debug階段,也會經(jīng)常遇到Java程序中出現(xiàn)各種域字段拼接的小錯誤。2、使用SOAP協(xié)議(WebService)作為消息報文的格式載體,由該方式生成的報文是基于文本格式的,同時還存在大量的XML描述信息,因此將會大大增加網(wǎng)絡(luò)IO的負擔。又由于XML解析的復(fù)雜性,這也會大幅降低報文解析的性能??傊?,使用該設(shè)計方式將會使系統(tǒng)的整體運行性能明顯下降。對于以上兩種方式所產(chǎn)生的問題,ProtocolBuffer均可以很好的解決,不僅如此,ProtocolBuffer還有一個非常重要的優(yōu)點就是可以保證同一消息報文新舊版本之間的兼容性。3.2安裝ProtocolBuffer//在https://developers.google/protocol-buffers/docs/downloads下載protobuf-2.6.1.tar.gz后解壓至指定目錄$tar-xvfprotobuf-2.6.1.tar.gz-Capp///刪除壓縮包$rmprotobuf-2.6.1.tar.gz//安裝c++編譯器相關(guān)包$sudoapt-getinstallg++//編譯安裝protobuf$cdapp/protobuf-2.6.1/$。/configure$make$makecheck$sudomakeinstall//添加到lib$vim~/.bashrcexportLD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib$source~/.bashrc//驗證是否安裝成功$protoc--version3.3編寫proto文件首先需要編寫一個proto文件,定義程序中需要處理的結(jié)構(gòu)化數(shù)據(jù)。proto文件非常類似java或者C語言的數(shù)據(jù)定義。如下代碼給出了示例中定義RPC接口的to文件內(nèi)容:[plain]viewplaincopy//定義常用選項optionjava_package=“com.hbase.demo.endpoint”;//指定生成Java代碼的包名optionjava_outer_classname=“Sum”;//指定生成Java代碼的外部類名稱optionjava_generic_services=true;//基于服務(wù)定義產(chǎn)生抽象服務(wù)代碼optionoptimize_for=SPEED;//指定優(yōu)化級別//定義請求包messageSumRequest{requiredstringfamily=1;//列族requiredstringcolumn=2;//列名}//定義回復(fù)包messageSumResponse{requiredint64sum=1[default=0];//求和結(jié)果}//定義RPC服務(wù)serviceSumService{//獲取求和結(jié)果rpcgetSum(SumRequest)returns(SumResponse);}3.4編譯proto文件//將proto文件編譯生成java代碼$protocto--java_out=。///生成的文件Sum.java如下圖所示:4Endpoint編碼示例業(yè)務(wù)邏輯如求和、排序等功能放在服務(wù)端,在服務(wù)端完成計算后將結(jié)果發(fā)送給客戶端,可以減少數(shù)據(jù)的傳輸量。下面的示例將在HBase的服務(wù)端生成一個RPC服務(wù),即在服務(wù)端對指定表的指定列值進行求和計算,并將計算結(jié)果返回給客戶端??蛻舳苏{(diào)用該RPC服務(wù),獲取響應(yīng)結(jié)果后輸出。4.1服務(wù)端代碼首先,將通過ProtocolBuffer生成的RPC接口文件Sum.java導(dǎo)入項目,然后在項目中新建類SumEndPoint編寫服務(wù)端代碼:[java]viewplaincopypackagecom.hbase.demo.endpoint;importjava.io.IOException;importjava.util.ArrayList;importjava.util.List;importorg.apache.hadoop.hbase.Cell;importorg.apache.hadoop.hbase.CellUtil;importorg.apache.hadoop.hbase.Coprocessor;importorg.apache.hadoop.hbase.CoprocessorEnvironment;importorg.apache.hadoop.hbase.client.Scan;importorg.apache.hadoop.hbase.coprocessor.CoprocessorException;importorg.apache.hadoop.hbase.coprocessor.CoprocessorService;importorg.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;importtobuf.ResponseConverter;importorg.apache.hadoop.hbase.regionserver.InternalScanner;importorg.apache.hadoop.hbase.util.Bytes;importtobuf.RpcCallback;importtobuf.RpcController;importtobuf.Service;importcom.hbase.demo.endpoint.Sum.SumRequest;importcom.hbase.demo.endpoint.Sum.SumResponse;importcom.hbase.demo.endpoint.Sum.SumService;/***@authordeveloper*說明:hbase協(xié)處理器endpooint的服務(wù)端代碼*功能:繼承通過protocolbuffer生成的rpc接口,在服務(wù)端獲取指定列的數(shù)據(jù)后進行求和操作,最后將結(jié)果返回客戶端*/publicclassSumEndPointextendsSumServiceimplementsCoprocessor,CoprocessorService{privateRegionCoprocessorEnvironmentenv;//定義環(huán)境@OverridepublicServicegetService(){returnthis;}@OverridepublicvoidgetSum(RpcControllercontroller,SumRequestrequest,RpcCallback《SumResponse》done){//定義變量SumResponseresponse=null;InternalScannerscanner=null;//設(shè)置掃描對象Scanscan=newScan();scan.addFamily(Bytes.toBytes(request.getFamily()));scan.addColumn(Bytes.toBytes(request.getFamily()),Bytes.toBytes(request.getColumn()));//掃描每個region,取值后求和try{scanner=env.getRegion().getScanner(scan);List《Cell》results=newArrayList《Cell》();booleanhasMore=false;Longsum=0L;do{hasMore=scanner.next(results);for(Cellcell:results){sum+=Long.parseLong(newString(CellUtil.cloneValue(cell)));}results.clear();}while(hasMore);//設(shè)置返回結(jié)果response=SumResponse.newBuilder().setSum(sum).build();}catch(IOExceptione){ResponseConverter.setControllerException(controller,e);}finally{if(scanner!=null){try{scanner.close();}catch(IOExceptione){e.printStackTrace();}}}//將rpc結(jié)果返回給客戶端done.run(response);}//協(xié)處理器初始化時調(diào)用的方法@Overridepublicvoidstart(CoprocessorEnvironmentenv)throwsIOException{if(envinstanceofRegionCoprocessorEnvironment){this.env=(RegionCoprocessorEnvironment)env;}else{thrownewCoprocessorException(“noloadregion”);}}//協(xié)處理器結(jié)束時調(diào)用的方法@Overridepublicvoidstop(CoprocessorEnvironmentenv)throwsIOException{}}4.2客戶端代碼在項目中新建類SumClient作為調(diào)用RPC服務(wù)的客戶端測試程序,代碼如下:[java]viewplaincopypackagecom.hbase.demo.endpoint;importjava.io.IOException;importjava.util.Map;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.hbase.HBaseConfiguration;importorg.apache.hadoop.hbase.TableName;importorg.apache.hadoop.hbase.client.Connection;importorg.apache.hadoop.hbase.client.ConnectionFactory;importorg.apache.hadoop.hbase.client.HTable;importorg.apache.hadoop.hbase.client.coprocessor.Batch;importorg.apache.hadoop.hbase.ipc.BlockingRpcCallback;importtobuf.ServiceException;importcom.hbase.demo.endpoint.Sum.SumRequest;importcom.hbase.demo.endpoint.Sum.SumResponse;importcom.hbase.demo.endpoint.Sum.SumService;/***@authordeveloper*說明:hbase協(xié)處理器endpooint的客戶端代碼*功能:從服務(wù)端獲取對hbase表指定列的數(shù)據(jù)的求和結(jié)果*/publicclassSumClient{publicstaticvoidmain(String[]args)throwsServiceException,Throwable{longsum=0L;//配置HBseConfigurationconf=HBaseConfiguration.create();conf.set(“hbase.zookeeper.quorum”,“l(fā)ocalhost”);conf.set(“perty.clientPort”,“2222”);//建立一個數(shù)據(jù)庫的連接Connectionconn=ConnectionFactory.createConnection(conf);//獲取表HTabletable=(HTable)conn.getTable(TableName.valueOf(“sum_table”));//設(shè)置請求對象finalSumRequestrequest=SumRequest.newBuilder().setFamily(“info”).setColumn(“score”).build();//獲得返回值Map《byte[],Long》result=table.coprocessorService(SumService.class,null,null,newBatch.Call《SumService,Long》(){@OverridepublicLongcall(SumServiceservice)throwsIOException{BlockingRpcCallback《SumResponse》rpcCallback=newBlockingRpcCallback《SumResponse》();service.getSum(null,request,rpcCallback);SumResponseresponse=(SumResponse)rpcCallback.get();returnresponse.hasSum()?response.getSum():0L;}});//將返回值進行迭代相加for(Longv:result.values()){sum+=v;}//結(jié)果輸出System.out.println(“sum:”+sum);//關(guān)閉資源table.close();conn.close();}}4.3加載Endpoint//將Sum類和SumEndPoint類打包后上傳到HDFS$hadoopfs-putendpoint_sum.jar/input//修改hbase配置文件,添加配置$vimapp/hbase-1.2.0-cdh5.7.1/conf/hbase-site.xml[html]viewplaincopy《property》《name》hbase.table.sanity.checks《/name》《value》false《/value》《/property》//重啟hbase$stop-hbase.sh$start-hbase.sh//啟動hbaseshell$hbaseshell//創(chuàng)建表sum_table》create‘sum_table’,‘info’//插入測試數(shù)據(jù)》put‘sum_table’,‘rowkey01’,‘info:score’,‘95’》put‘sum_table’,‘rowkey02’,‘info:score’,‘98’》put‘sum_table’,‘rowkey02’,‘info:age’,‘20’//查看數(shù)據(jù)》scan‘sum_table’//加載協(xié)處理器》disable‘sum_table’》alter‘sum_table’,METHOD=》‘table_att’,‘coprocessor’=》‘hdfs://localhost:9000/input/endpoint_sum.jar|com.hbase.demo.endpoint.SumEndPoint|100’》enable‘sum_table’//如果要卸載協(xié)處理器,可以先查看表中協(xié)處理器名,然后通過命令卸載》disable‘sum_table’》describe‘sum_table’》alter‘sum_table’,METHOD=》‘table_att_unset’,NAME=》‘coprocessor$1’》enable‘sum_table’4.4測試在eclipse中運行客戶端程序SumClient,輸出結(jié)果為193,正好符合預(yù)期,如下圖所示:5Observer編碼示例一般來說,對數(shù)據(jù)庫建立索引,往往需要單獨的數(shù)據(jù)結(jié)構(gòu)來存儲索引的數(shù)據(jù)。在hbase表中,除了使用rowkey索引數(shù)據(jù)外,還可以另外建立一張索引表,查詢時先查詢索引表,然后用查詢結(jié)果查詢數(shù)據(jù)表。下面這個示例演示如何使用Observer協(xié)處理器生成HBase表的二級索引:將數(shù)據(jù)表ob_table中列info:name的值作為索引表index_ob_table的rowkey,將數(shù)據(jù)表ob_table中列info:score的值作為索引表index_ob_table中列info:score的值建立二級索引,當用戶向數(shù)據(jù)表中插入數(shù)據(jù)時,索引表將自動插入二級索引,從而為查詢業(yè)務(wù)數(shù)據(jù)提供了便利。5.1代碼在項目中新建類PutObserver作為Observer協(xié)處理器應(yīng)用邏輯類,代碼如下:[java]viewplaincopypackagecom.hbase.demo.observer;importjava.io.IOException;importjava.util.List;importorg.apache.hadoop.hbase.Cell;importorg.apache.hadoop.hbase.CellUtil;importorg.apache.hadoop.hbase.TableName;importorg.apache.hadoop.hbase.client.Durability;importorg.apache.hadoop.hbase.client.HTableInterface;importorg.apache.hadoop.hbase.client.Put;importorg.apache.hadoop.hbase.coprocessor.BaseRegionObserver;importorg.apache.hadoop.hbase.coprocessor.ObserverContext;importorg.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;importorg.apache.hadoop.hbase.regionserver.wal.WALEdit;importorg.apache.hadoop.hbase.util.Bytes;/***@authordeveloper*說明:hbase協(xié)處理器observer的應(yīng)用邏輯代碼*功能:在應(yīng)用了該observer的hbase表中,所有的put操作,都會將每行數(shù)據(jù)的info:name列值作為rowkey、info:score列值作為value*寫入另一張二級索引表index_ob_table,可以提高對于特定字段的查詢效率*/@SuppressWarnings(“deprecation”)publicclassPutObserverextendsBaseRegionObserver{@OverridepublicvoidpostPut(ObserverContext《RegionCoprocessorEnvironment》e,Putput,WALEditedit,Durabilitydurability)throwsIOException{//獲取二級索引表HTableInterfacetable=e.getEnvironment().getTable(TableName.valueOf(“index_ob_table”));//獲取值List《Cell》cellList1=put.get(Bytes.toBytes(“info”),Bytes.toBytes(“name”));List《Cell》cellList2=put.get(Bytes.toBytes(“info”),Bytes.toBytes(“score”));//將數(shù)據(jù)插入二級索引表for(Cellcell1:cellList1){//列info:name的值作為二級索引表的rowkeyPutindexPut=newPut(CellUtil.cloneValue(cell1));for(Cellcell2:cellList2){//列info:score的值作為二級索引表中列info:score的值indexPut.add(Bytes.toBytes(“info”),Bytes.toBytes(“score”),CellUtil.cloneValue(cell2));}//數(shù)據(jù)插入二級索引表table.put(indexPut);}//關(guān)閉資源table.close();}}5.2加載Observer//將PutObserver類打包后上傳到HDFS$hadoopfs-putovserver_put.jar/input//啟動hbaseshell$hbaseshell//創(chuàng)建數(shù)據(jù)表ob_table》create‘ob_table’,‘info’//創(chuàng)建二級索引表ob_table》create‘index_ob_table’,‘info’//加載協(xié)處理器》disable‘ob_table’》alter‘ob_table’,METHOD=》‘table_att’,‘coprocessor’=》‘hdfs://localhost:9000/input/observer_put.jar|com.hbase.demo.observer.PutObserver|100’》enable‘ob_table’//查看數(shù)據(jù)表ob_table》describe‘ob_table’5.3測試//在
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2026河南鄭州政務(wù)大廳招聘12人考試參考試題及答案解析
- 2026中旅西南重慶旅游發(fā)展有限公司招聘14人考試備考題庫及答案解析
- 2026河南新鄉(xiāng)市誠城卓人學(xué)校教師招聘考試備考題庫及答案解析
- 2026贛州有色冶金研究所有限公司招聘11人考試參考試題及答案解析
- 2026年六安裕安區(qū)江家店鎮(zhèn)公開招考村級后備干部5名筆試備考試題及答案解析
- 2026江蘇宿遷市公安局招聘輔警21人考試參考題庫及答案解析
- 2026北京興賓通人力資源管理有限公司北京市大興區(qū)教委招聘勞務(wù)派遣人員7人考試備考題庫及答案解析
- 2025內(nèi)外貿(mào)一體化認證服務(wù)指南-動力電池產(chǎn)業(yè)
- 2026年煙臺市青年干部人才“菁英計劃”選聘-中國石油大學(xué)(華東)考試參考題庫及答案解析
- 2026年哈爾濱鐵道職業(yè)技術(shù)學(xué)院單招綜合素質(zhì)筆試參考題庫帶答案解析
- 2026年及未來5年市場數(shù)據(jù)中國船舶智能化市場深度分析及投資戰(zhàn)略咨詢報告
- 鋼結(jié)構(gòu)廠房拆除施工方案設(shè)計
- 2026共青團中央所屬單位高校畢業(yè)生招聘66人考試筆試備考題庫及答案解析
- 多維并舉:河南省周口市農(nóng)村養(yǎng)老服務(wù)體系的困境與突破
- 煤礦安全規(guī)程機電部分課件
- 第二章第三節(jié)中國的河流中國第一大河長江課件-湘教版地理八年級上冊
- 美術(shù)培訓(xùn)策劃書
- 2025中國企業(yè)出海競爭力指數(shù)報告
- 人教版七年級英語上冊期末復(fù)習(xí)真題分類練習(xí) 專題05 完形填空(15空)20題(原卷版)
- 2026屆湖南省長沙市長郡集團物理八年級第一學(xué)期期末復(fù)習(xí)檢測模擬試題含解析
- 駕駛證“三力”測試20題-駕考題庫
評論
0/150
提交評論