03_Extended Material: MySQLSource

Chapter 1 Custom MySQLSource

1.1 About custom sources

A source is the component that receives data into a Flume agent. Source components can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy. Flume already ships with many source types, but they do not always meet the needs of real projects, in which case we have to write a custom source. For example, to monitor MySQL in real time and ship its data to HDFS or another storage framework, we need to implement our own MySQLSource.

The official documentation also describes the interface for custom sources:
Official documentation: /FlumeDeveloperGuide.html#source

1.2 Components of the custom MySQLSource

Figure 6-1 Components of the custom MySQLSource

1.3 Steps for implementing the custom MySQLSource

According to the official documentation, a custom MySQLSource must extend the AbstractSource class and implement the Configurable and PollableSource interfaces, overriding the following methods:

getBackOffSleepIncrement()     // not used for now
getMaxBackOffSleepInterval()   // not used for now
configure(Context context)     // initialize the context
process()                      // fetch the data, wrap it into events, and write them to the channel; this method is called in a loop. Because fetching data from MySQL involves fairly complex logic, a dedicated class, SQLSourceHelper, handles all interaction with MySQL.
stop()                         // release the related resources

1.4 Code implementation

1.4.1 Add the pom dependencies

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>
    </dependencies>

1.4.2 Add the configuration files

Add jdbc.properties and log4j.properties to the classpath.

jdbc.properties:

    dbDriver=com.mysql.jdbc.Driver
    dbUrl=jdbc:mysql://hadoop102:3306/mysqlsource?useUnicode=true&characterEncoding=utf-8
    dbUser=root
    dbPassword=000000

log4j.properties:

    # console
    log4j.rootLogger=info,myconsole,myfile
    log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
    log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout
    #log4j.appender.myconsole.layout.ConversionPattern=%d %t %-5p %c - %m%n

    #log4j.rootLogger=error,myfile
    log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.myfile.File=/tmp/flume.log
    log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.myfile.layout.ConversionPattern=%d %t %-5p %c - %m%n

1.4.3 SQLSourceHelper

1) Properties:

Property                   Description (default value in parentheses)
runQueryDelay              Interval between queries (10000)
batchSize                  Cache size (?00)
startFrom                  Starting id for the query (0)
currentIndex               Current id for the query; the metadata table is read before every query
recordSize                 Number of rows returned by the query
table                      Name of the monitored table
columnsToSelect            Columns to select (*)
customQuery                Query supplied by the user
query                      The query statement
defaultCharsetResultSet    Character encoding (UTF-8)

2) Methods:

Method                                               Description
SQLSourceHelper(Context context)                     Constructor; initializes the properties and obtains the JDBC connection
InitConnection(String url, String user, String pw)   Obtains the JDBC connection
checkMandatoryProperties()                           Checks that the required properties are set (more checks can be added in real projects)
buildQuery()                                         Builds the SQL statement for the current state; returns a String
executeQuery()                                       Runs the SQL query; returns a List<List<Object>>
getAllRows(List<List<Object>> queryResult)           Converts the query result to strings for later processing
updateOffset2DB(int size)                            Writes the offset to the metadata table after each query
execSql(String sql)                                  Executes the given SQL statement
getStatusDBIndex(int startFrom)                      Reads the offset from the metadata table
queryOne(String sql)                                 Runs the SQL statement that actually reads the offset from the metadata table
close()                                              Releases the resources

3) Code:

    package com.atguigu.source;

    import org.apache.flume.Context;
    import org.apache.flume.conf.ConfigurationException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.IOException;
    import java.sql.*;
    import java.text.ParseException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    public class SQLSourceHelper {

        private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);

        private int runQueryDelay,   // interval between two queries
                startFrom,           // starting id
                currentIndex,        // current id
                recordSize = 0,      // running count of rows returned, used as the offset
                maxRow;              // maximum number of rows per query

        private String table,            // table to read from
                columnsToSelect,         // columns requested by the user
                customQuery,             // query supplied by the user
                query,                   // query that was built
                defaultCharsetResultSet; // character set

        // Context used to read the configuration file
        private Context context;

        // Default values; they can be overridden in the Flume job configuration
        private static final int DEFAULT_QUERY_DELAY = 10000;
        private static final int DEFAULT_START_VALUE = 0;
        private static final int DEFAULT_MAX_ROWS = 2000;
        private static final String DEFAULT_COLUMNS_SELECT = "*";
        private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";

        private static Connection conn = null;
        private static PreparedStatement ps = null;
        private static String connectionURL, connectionUserName, connectionPassword;

        // Load the static resources from jdbc.properties
        static {
            Properties p = new Properties();
            try {
                p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("jdbc.properties"));
                connectionURL = p.getProperty("dbUrl");
                connectionUserName = p.getProperty("dbUser");
                connectionPassword = p.getProperty("dbPassword");
                Class.forName(p.getProperty("dbDriver"));
            } catch (IOException | ClassNotFoundException e) {
                LOG.error(e.toString());
            }
        }

        // Obtain the JDBC connection
        private static Connection InitConnection(String url, String user, String pw) {
            try {
                Connection conn = DriverManager.getConnection(url, user, pw);
                if (conn == null)
                    throw new SQLException();
                return conn;
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return null;
        }

        // Constructor
        SQLSourceHelper(Context context) throws ParseException {
            // Initialize the context
            this.context = context;

            // Parameters with defaults: read from the Flume job configuration, fall back to the default
            this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
            this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
            this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
            this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);

            // Parameters without defaults: read from the Flume job configuration
            this.table = context.getString("table");
            this.customQuery = context.getString("custom.query");
            connectionURL = context.getString("connection.url");
            connectionUserName = context.getString("connection.user");
            connectionPassword = context.getString("connection.password");
            conn = InitConnection(connectionURL, connectionUserName, connectionPassword);

            // Validate the configuration; throw if a parameter without a default is missing
            checkMandatoryProperties();
            // Read the current id
            currentIndex = getStatusDBIndex(startFrom);
            // Fix: seed the running offset from the stored value so that a restart
            // does not start counting from 0 again
            recordSize = currentIndex;
            // Build the query
            query = buildQuery();
        }

        // Validate the configuration (table, query, and database connection parameters)
        private void checkMandatoryProperties() {
            if (table == null) {
                throw new ConfigurationException("property table not set");
            }
            if (connectionURL == null) {
                throw new ConfigurationException("connection.url property not set");
            }
            if (connectionUserName == null) {
                throw new ConfigurationException("connection.user property not set");
            }
            if (connectionPassword == null) {
                throw new ConfigurationException("connection.password property not set");
            }
        }

        // Build the SQL statement
        private String buildQuery() {
            String sql;
            // Read the current id
            currentIndex = getStatusDBIndex(startFrom);
            LOG.info(currentIndex + "");
            if (customQuery == null) {
                sql = "SELECT " + columnsToSelect + " FROM " + table;
            } else {
                sql = customQuery;
            }
            StringBuilder execSql = new StringBuilder(sql);
            // Use id as the offset. Fix: always append the condition to the base query
            // instead of overwriting the trailing digits of the previous query, which
            // produced invalid SQL once the id gained a digit (for example 9 to 10).
            if (!sql.toLowerCase().contains("where")) {
                execSql.append(" where ");
            } else {
                execSql.append(" and ");
            }
            execSql.append("id").append(">").append(currentIndex);
            return execSql.toString();
        }

        // Run the query
        List<List<Object>> executeQuery() {
            try {
                // Rebuild the SQL before every query because the id changes.
                // Fix: keep it in a local variable so customQuery is not overwritten.
                String sql = buildQuery();
                // Collection that holds the results
                List<List<Object>> results = new ArrayList<>();
                ps = conn.prepareStatement(sql);
                ResultSet result = ps.executeQuery();
                while (result.next()) {
                    // Collection that holds one row (several columns)
                    List<Object> row = new ArrayList<>();
                    for (int i = 1; i <= result.getMetaData().getColumnCount(); i++) {
                        row.add(result.getObject(i));
                    }
                    results.add(row);
                }
                LOG.info("execSql:" + sql + "\nresultSize:" + results.size());
                return results;
            } catch (SQLException e) {
                LOG.error(e.toString());
                // Reconnect
                conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
            }
            return null;
        }

        // Convert the result set to strings: each row is a list, and each list becomes one string
        List<String> getAllRows(List<List<Object>> queryResult) {
            List<String> allRows = new ArrayList<>();
            if (queryResult == null || queryResult.isEmpty())
                return allRows;
            StringBuilder row = new StringBuilder();
            for (List<Object> rawRow : queryResult) {
                for (Object value : rawRow) {
                    if (value == null) {
                        row.append(",");
                    } else {
                        row.append(value.toString()).append(",");
                    }
                }
                allRows.add(row.toString());
                row = new StringBuilder();
            }
            return allRows;
        }

        // Update the offset metadata; called after every result set. The offset of each
        // query must be recorded so that the job can resume after an interruption. The id is the offset.
        void updateOffset2DB(int size) {
            // source_tab is the key: insert if absent, update if present (one record per source table)
            String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('"
                    + this.table + "','" + (recordSize += size)
                    + "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";
            LOG.info("updateStatus Sql:" + sql);
            execSql(sql);
        }

        // Execute a SQL statement
        private void execSql(String sql) {
            try {
                ps = conn.prepareStatement(sql);
                LOG.info("exec:" + sql);
                ps.execute();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        // Read the offset for the current id
        private Integer getStatusDBIndex(int startFrom) {
            // Look up the current id in the flume_meta table
            String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");
            if (dbIndex != null) {
                return Integer.parseInt(dbIndex);
            }
            // No record: this is the first query or the table holds no data yet, so return the initial value
            return startFrom;
        }

        // Run a statement that returns a single value (the current id)
        private String queryOne(String sql) {
            ResultSet result = null;
            try {
                ps = conn.prepareStatement(sql);
                result = ps.executeQuery();
                while (result.next()) {
                    return result.getString(1);
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return null;
        }

        // Release the resources
        void close() {
            try {
                if (ps != null) ps.close();
                if (conn != null) conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        int getCurrentIndex() {
            return currentIndex;
        }

        void setCurrentIndex(int newValue) {
            currentIndex = newValue;
        }

        int getRunQueryDelay() {
            return runQueryDelay;
        }

        String getQuery() {
            return query;
        }

        String getConnectionURL() {
            return connectionURL;
        }

        private boolean isCustomQuerySet() {
            return (customQuery != null);
        }

        Context getContext() {
            return context;
        }

        public String getConnectionUserName() {
            return connectionUserName;
        }

        public String getConnectionPassword() {
            return connectionPassword;
        }

        String getDefaultCharsetResultSet() {
            return defaultCharsetResultSet;
        }
    }

1.4.4 MySQLSource

Code:

    package com.atguigu.source;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.PollableSource;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.source.AbstractSource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.text.ParseException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;

    public class SQLSource extends AbstractSource implements Configurable, PollableSource {

        // Logger
        private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);

        // The SQL helper
        private SQLSourceHelper sqlSourceHelper;

        @Override
        public long getBackOffSleepIncrement() {
            return 0;
        }

        @Override
        public long getMaxBackOffSleepInterval() {
            return 0;
        }

        @Override
        public void configure(Context context) {
            try {
                // Initialize
                sqlSourceHelper = new SQLSourceHelper(context);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }

        @Override
        public Status process() throws EventDeliveryException {
            try {
                // Query the table
                List<List<Object>> result = sqlSourceHelper.executeQuery();
                // Collection that holds the events
                List<Event> events = new ArrayList<>();
                // Collection that holds the event headers
                HashMap<String, String> header = new HashMap<>();
                // If rows came back, wrap them into events.
                // Fix: executeQuery() returns null after a SQL error, so check for null first.
                if (result != null && !result.isEmpty()) {
                    List<String> allRows = sqlSourceHelper.getAllRows(result);
                    Event event = null;
                    for (String row : allRows) {
                        event = new SimpleEvent();
                        event.setBody(row.getBytes());
                        event.setHeaders(header);
                        events.add(event);
                    }
                    // Write the events to the channel
                    this.getChannelProcessor().processEventBatch(events);
                    // Update the offset in the metadata table
                    sqlSourceHelper.updateOffset2DB(result.size());
                }
                // Wait before the next poll
                Thread.sleep(sqlSourceHelper.getRunQueryDelay());
                return Status.READY;
            } catch (InterruptedException e) {
                LOG.error("Error processing row", e);
                return Status.BACKOFF;
            }
        }

        @Override
        public synchronized void stop() {
            LOG.info("Stopping sql source {} ...", getName());
            try {
                // Release the resources
                sqlSourceHelper.close();
            } finally {
                super.stop();
            }
        }
    }

1.5 Testing

1.5.1 Prepare the jars

1) Copy the MySQL driver jar into Flume's lib directory:

    [atguigu@hadoop102 flume]$ cp /opt/sorfware/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar /opt/module/flume/lib/

2) Package the project and copy its jar into Flume's lib directory.

1.5.2 Prepare the configuration file

1) Create the configuration file and open it:

    [atguigu@hadoop102 job]$ touch mysql.conf
    [atguigu@hadoop102 job]$ vim mysql.conf

2) Add the following content:

    # Name the components on this agent
    a1.sources
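As a supplement to buildQuery() in section 1.4.3, the id-as-offset polling idea can be tried without a database. The following is only a minimal sketch under stated assumptions: the class name OffsetQuerySketch, the method signatures, and the table name student are all hypothetical and not part of the tutorial's code. It always derives the statement from the base query, so the condition stays valid when the offset gains a digit.

```java
import java.util.Locale;

/** Hypothetical, database-free sketch of the id-offset query logic. */
class OffsetQuerySketch {

    /**
     * Builds the incremental query: only rows whose id is greater than the
     * stored offset are selected. A custom query, if given, is used as the base.
     */
    static String buildQuery(String table, String columns, String customQuery, int currentIndex) {
        String base = (customQuery == null)
                ? "SELECT " + columns + " FROM " + table
                : customQuery;
        // Join with AND when the base query already has a WHERE clause.
        boolean hasWhere = base.toLowerCase(Locale.ROOT).contains(" where ");
        return base + (hasWhere ? " AND " : " WHERE ") + "id > " + currentIndex;
    }

    /** Advances the offset by the number of rows just read (assumes contiguous ids). */
    static int advance(int offset, int rowsRead) {
        return offset + rowsRead;
    }

    public static void main(String[] args) {
        int offset = 9;
        // prints: SELECT * FROM student WHERE id > 9
        System.out.println(buildQuery("student", "*", null, offset));
        offset = advance(offset, 1);
        // prints: SELECT * FROM student WHERE id > 10
        System.out.println(buildQuery("student", "*", null, offset));
    }
}
```

Treating the row count as the offset is only correct when ids are contiguous and start right after start.from; with gaps in the id column, storing the largest id actually read would be the safer choice.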
