版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
項目三實時數(shù)據(jù)庫采集工具Canal和Maxwell
主講人:禤捷鵬Canal、Maxwell學習目標Canal——用Java開發(fā)的基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱&消費的中間件。Maxwell——用Java編寫的MySQL日志實時抓取軟件。知識目標:(1)掌掌握Canal中的數(shù)據(jù)結(jié)構(gòu),如CanalEntry、RowChange和RowData;(2)學會配置Canal的TCP模式進行實時數(shù)據(jù)監(jiān)控;(3)理解Canal-Kafka模式的配置和工作原理;(4)熟悉Maxwell的安裝和啟動過程;(5)理解Maxwell的基本操作流程和監(jiān)控數(shù)據(jù)庫的功能。技能目標:(1)能夠編寫應用程序解析Canal采集到的數(shù)據(jù);(2)能夠配置Canal的TCP模式和Kafka模式;(3)能夠配置MySQL的二進制日志和開啟日志功能;(4)能夠使用Maxwell監(jiān)控MySQL數(shù)據(jù)變化并發(fā)送到Kafka;(5)能夠解析和處理Canal和Maxwell生成的JSON數(shù)據(jù)。任務1Linux系統(tǒng)MySQL5.7數(shù)據(jù)庫安裝CONTENTS任務2MySQL開啟Binlog和數(shù)據(jù)準備任務3Canal下載和安裝任務5實時數(shù)據(jù)監(jiān)控測試之Kafka模式任務4實時數(shù)據(jù)監(jiān)控測試之TCP模式Canal、Maxwell任務6Maxwell初始化和進程啟動任務導航本模塊知識框架:本模塊主要是熟悉基于Linux系統(tǒng)部署MySQL5.7。模塊中MySQL使用RPM進行安裝,需要預先到Oracle官網(wǎng)下載對應的MySQL5.7的RPM包。安裝完成后,配置MySQL的編碼為UTF-8和允許遠程登錄。任務實踐1MySQL下載和上傳在MySQL官網(wǎng)下載對應版本軟件安裝包,選擇適應實驗場景的版本,本機為Centos7.9操作系統(tǒng),Mysql數(shù)據(jù)庫選擇5.7.16版本1.官網(wǎng)下載軟件壓縮包任務實踐1MySQL下載和上傳使用xftp軟件工具上傳壓縮包到node03機器2.上傳壓縮包到node03任務實踐2安裝環(huán)境檢查和安裝MySQL(1)執(zhí)行安裝命令前,先執(zhí)行查詢命令1.檢查是否安裝過MySQL(2)若已安裝,需要依次卸載rpm-qa|grepmysql命令列出來的名稱rpm-e--nodepsmysql5-commonrpm-e--nodepsmysql5rpm-e--nodepsmysql5-libsrpm-e--nodepsmysql5-serverrpm-e--nodepsmysql5-errmsgrpm-e--nodepsmysql5-devel任務實踐2安裝環(huán)境檢查和安裝MySQL(3)mariadb的安裝查看1.檢查是否安裝過MySQL(4)若已安裝,需要依次卸載rpm-qa|grepmariadb命令列出來的名稱rpm-e--nodepsmariadb-commonrpm-e--nodepsmariadb-connector-c任務實踐2安裝環(huán)境檢查和安裝MySQL(1)執(zhí)行安裝命令前,先執(zhí)行查詢命令2.檢查MySQL依賴環(huán)境(2)如果不存在需要到centos安裝盤里進行rpm安裝yum-yinstalllibaionet-tools任務實踐2安裝環(huán)境檢查和安裝MySQL3.由于MySQL安裝過程中,會通過MySQL用戶在/tmp目錄下新建tmp_db文件,所以請給/tmp較大的權(quán)限,執(zhí)行以下命令chmod-R777/tmp4.在MySQL的安裝文件目錄下執(zhí)行(必須按照順序執(zhí)行)rpm-ivhmysql-community-common-5.7.16-1.el7.x86_64.rpmrpm-ivhmysql-community-libs-5.7.16-1.el7.x86_64.rpmrpm-ivhmysql-community-client-5.7.16-1.el7.x86_64.rpmrpm-ivhmysql-community-server-5.7.16-1.el7.x86_64.rpm說明:如在檢查工作時,沒有檢查MySQL依賴環(huán)境在安裝mysql-community-server會報錯。任務實踐3MySQL服務初始化和啟停服務1.MySQL服務初始化為了保證數(shù)據(jù)庫目錄為與文件的所有者為
MySQL登陸用戶,如果當前用戶是root身份運行MySQL服務,需要執(zhí)行下面的命令初始化mysqld--initialize--user=mysql說明:參數(shù)
--initialize選項默認以“安全”模式來初始化,則會為root用戶生成一個密碼并將該密碼標記為過期,登陸后需要設(shè)置一個新的密碼。查看密碼,示例代碼如下所示cat/var/log/mysqld.log|greproot@localhost在執(zhí)行結(jié)果中,root@localhost:后面就是初始化的密碼任務實踐3MySQL服務初始化和啟停服務2.MySQL服務的啟動和關(guān)閉MySQL服務的啟動和關(guān)閉,示例代碼如下所示#啟動MySQL服務命令systemctlstartmysqld.service#關(guān)閉MySQL服務命令systemctlstopmysqld.service#重啟MySQL服務命令systemctlrestartmysqld.service任務實踐4修改MySQL字符集1.修改f文件在node03機器進入到/etc目錄,使用vim編輯打開f,f為mysql數(shù)據(jù)庫的主配置文件,進入編輯模式,修改字符編碼為utf-8任務實踐4修改MySQL字符集2.重新啟動MySQLsystemctlrestartmysqld.service3.進入MySQL數(shù)據(jù)庫mysql-uroot-pEnterpassword:將命令cat/var/log/mysqld.log|greproot@localhost執(zhí)行后得到的初始化密碼輸入到Enterpassword:后即可。4.重置密碼#修改數(shù)據(jù)庫密碼mysql>alteruser'root'@'localhost'identifiedby'123456';#刷新權(quán)限mysql>flushprivileges;5.修改字符集的SQL語句#修改數(shù)據(jù)庫的字符集mysql>alterdatabasemydbcharacterset'utf8';#修改數(shù)據(jù)表的字符集mysql>altertablemytblconverttocharacterset'utf8';任務實踐4修改MySQL字符集6.查看mysql數(shù)據(jù)庫狀態(tài)鞏固與提高在數(shù)據(jù)采集過程中,你認為數(shù)據(jù)的質(zhì)量對于后續(xù)的數(shù)據(jù)分析和決策有何重要性?任務1Linux系統(tǒng)MySQL5.7數(shù)據(jù)庫安裝CONTENTS任務2MySQL開啟Binlog和數(shù)據(jù)準備任務3Canal下載和安裝任務5實時數(shù)據(jù)監(jiān)控測試之Kafka模式任務4實時數(shù)據(jù)監(jiān)控測試之TCP模式Canal、Maxwell任務6Maxwell初始化和進程啟動任務導航本模塊知識框架:任務二主要是登錄成功MySQL數(shù)據(jù)庫,開啟Binlog日志,在MySQL命令行下創(chuàng)建數(shù)據(jù)庫和表。通過本模塊的學習,熟練MySQL數(shù)據(jù)庫DDL語句,能夠獨立的創(chuàng)建Canal測試使用的數(shù)據(jù)庫和表。并為指定的數(shù)據(jù)庫開啟Binlog日志記錄設(shè)置。知識準備MySQL主從復制以及Canal工作原理(一)MySQL主從復制流程(1)Master主庫將改變記錄,寫到二進制日志(BinaryLog)中;(2)Slave從庫向MySQLMaster發(fā)送dump協(xié)議,將Master主庫的binarylogevents拷貝到它的中繼日志(relaylog);(3)Slave從庫讀取并重做中繼日志中的事件,將改變的數(shù)據(jù)同步到自己的數(shù)據(jù)庫。為了更好的帶大家理解mysql數(shù)據(jù)庫的主從復制,我們可以看下架構(gòu)原理執(zhí)行圖如下圖所示:知識準備MySQL主從復制以及Canal工作原理知識準備MySQL主從復制以及Canal工作原理(二)Canal工作原理和應用場景 1.Canal工作原理Canal工作原理其實就是把自身偽裝成Slave,假裝從Master復制數(shù)據(jù)。把修改保存到Binlog中的數(shù)據(jù)進行解析成json數(shù)據(jù)格式,以適配到不同的系統(tǒng)。2.Canal使用場景(1)原始場景:阿里Otter中間件的一部分,Otter是阿里用于進行異地數(shù)據(jù)庫之間的同步框架,Canal是其中一部分,Canal數(shù)據(jù)采集流程如圖所示。知識準備MySQL主從復制以及Canal工作原理(2)典型場景1:我們可以使用Canal工具完成我們數(shù)據(jù)庫和緩存之間的更新,其更新框架圖如圖所示。(3)典型場景2:抓取業(yè)務表的新增變化數(shù)據(jù),用于制作實時統(tǒng)計(本案例采用的場景)。任務實踐1創(chuàng)建數(shù)據(jù)庫和表使用SqlYog創(chuàng)建數(shù)據(jù)庫,如圖所示任務實踐1創(chuàng)建數(shù)據(jù)庫和表創(chuàng)建數(shù)據(jù)表,示例代碼如下所示CREATETABLEuser_info(`id`VARCHAR(255),`name`VARCHAR(255),`sex`VARCHAR(255));任務實踐1開啟Binlog編輯配置f文件,示例代碼如下所示#編輯f配置文件(sudovim/etc/f),添加以下內(nèi)容:#MySQL服務器idserver-id=1log-bin=mysql-bin#binlog記錄級別rowbinlog_format=row#指定數(shù)據(jù)庫binlog-do-db=testdb1重啟MySQL使配置生效sudosystemctlrestartmysqld任務實踐1查看未修改數(shù)據(jù)的狀態(tài)使用Sqlyog創(chuàng)建數(shù)據(jù)庫連接后,數(shù)據(jù)庫的操作工具我們可以看到創(chuàng)建的數(shù)據(jù)庫和表。任務實踐1查看未修改數(shù)據(jù)的狀態(tài)在沒有對數(shù)據(jù)庫操作之前通過ls-l命令查看log日志文件,初始binlog文件如圖所示任務實踐2查看修改數(shù)據(jù)后的狀態(tài)使用Sqlyog工具選擇測試的數(shù)據(jù)庫和表,執(zhí)行insert命令向數(shù)據(jù)庫添加數(shù)據(jù),示例代碼如下#插入數(shù)據(jù)的代碼INSERTINTOuser_infoVALUES('1001','zhangsan','male');在向數(shù)據(jù)庫添加數(shù)據(jù)后,再次通過命令ls-l查看日志目錄,發(fā)現(xiàn)在上一步中查看的相同的日志文件日志大小發(fā)生了變化,說明我們的Binlog開啟成功,Mysql數(shù)據(jù)庫記錄了修改數(shù)據(jù)庫的日志變化,具體變化如圖任務實踐2查看修改數(shù)據(jù)后的狀態(tài)最后創(chuàng)建canal用戶,并賦予readonly權(quán)限,示例代碼如下#在MySQL中執(zhí)行MySQL>GRANTSELECT,REPLICATIONSLAVE,REPLICATIONCLIENTON*.*TO'canal'@'%'IDENTIFIEDBY'canal';鞏固與提高為什么在Canal工具中選擇row格式比較合適?請?zhí)峁┲辽賰蓚€理由。任務1Linux系統(tǒng)MySQL5.7數(shù)據(jù)庫安裝CONTENTS任務2MySQL開啟Binlog和數(shù)據(jù)準備任務3Canal下載和安裝任務5實時數(shù)據(jù)監(jiān)控測試之Kafka模式任務4實時數(shù)據(jù)監(jiān)控測試之TCP模式Canal、Maxwell任務6Maxwell初始化和進程啟動任務導航本模塊知識框架:本模塊構(gòu)建Canal環(huán)境。(1)下載解壓Canal,需要提前創(chuàng)建好解壓目錄。(2)修改perties文件。(3)修改perties屬性配置文件。知識準備Canal是什么Canal是阿里巴巴開源的一款用于MySQL數(shù)據(jù)庫的增量數(shù)據(jù)訂閱和消費的中間件(如圖3-3-1所示),它是一個用于實時數(shù)據(jù)同步和遷移的開源軟件。它主要用于將MySQL或MariaDB數(shù)據(jù)庫的數(shù)據(jù)變更(包括新增、更新和刪除操作)以日志的形式捕獲,并將這些變更同步到其他數(shù)據(jù)源,如其他數(shù)據(jù)庫、消息隊列或數(shù)據(jù)倉庫等。知識準備Canal的功能Canal的主要功能是基于MySQL的binlog日志進行數(shù)據(jù)訂閱和消費。它通過解析MySQL的binlog,將數(shù)據(jù)庫的數(shù)據(jù)變更(如插入、更新、刪除操作)以事件的形式捕獲,并將這些事件發(fā)送到訂閱者,以實現(xiàn)實時的數(shù)據(jù)同步、數(shù)據(jù)分發(fā)和數(shù)據(jù)消費。(一)數(shù)據(jù)訂閱和同步Canal通過解析MySQL的binlog日志,實現(xiàn)對數(shù)據(jù)庫的增量數(shù)據(jù)變更的訂閱和同步。它可以捕獲數(shù)據(jù)庫的插入、更新、刪除等操作,并將這些變更以事件的形式實時推送給訂閱者。(二)數(shù)據(jù)分發(fā)和轉(zhuǎn)換Canal支持將訂閱到的數(shù)據(jù)變更以多種數(shù)據(jù)格式(如JSON、Avro等)進行轉(zhuǎn)換,并將其分發(fā)到多個不同的目標數(shù)據(jù)源,如其他數(shù)據(jù)庫、消息隊列、數(shù)據(jù)倉庫等。(三)數(shù)據(jù)過濾和選擇性訂閱Canal允許用戶根據(jù)需要進行靈活的數(shù)據(jù)過濾和選擇性訂閱,可以指定需要訂閱的數(shù)據(jù)庫、表格,以及過濾條件等。這樣可以提高效率,僅訂閱感興趣的數(shù)據(jù)。任務實踐1下載Canal安裝工具在Canal官網(wǎng)下載頁(/alibaba/canal/tags)中,我們可以找到并下載Canal安裝工具下載好Canal壓縮包后,使用xftp工具,將Canal壓縮包導入到node3節(jié)點中。任務實踐2安裝Canal1.解壓Canal壓縮包到/opt/module/canal目錄下(1)在/opt/module目錄下創(chuàng)建文件夾。[root@node01softwares]#mkdir-p/opt/module/canal(2)在/export/softwares目錄下解壓下載的Canal壓縮包到/export/servers/canal目錄。[root@node01softwares]#tar-zxvfcanal.deployer-1.1.6.tar.gz-C/opt/module/canal任務實踐2安裝Canal2.修改perties文件#編輯配置文件/opt/module/canal/conf/perties##########################################################commonargument##############################################################canal.ip=canal.port=11111canal.metrics.pull.port=11112canal.zkServers=#flushdatatozkcanal.zookeeper.flush.period=1000canal.withoutNetty=false#tcp,kafka,RocketMQcanal.serverMode=tcp
#flushmetacursor/parsepositiontofile任務實踐2安裝Canal3.配置instance讀取一個
MySQL數(shù)據(jù),所以只有一個實例,這個實例的配置文件在Canal解壓目錄中的conf/example/perties目錄下。
(1)配置
MySQL服務器地址,關(guān)鍵配置參數(shù),示例代碼如下canal.instance.MySQL.slaveId=0#enablegtidusetrue/falsecanal.instance.gtidon=false#positioninfocanal.instance.master.address=node01:3306任務實踐2安裝Canal3.配置instance讀取一個
MySQL數(shù)據(jù),所以只有一個實例,這個實例的配置文件在Canal解壓目錄中的conf/example/perties目錄下。
(2)配置連接MySQL的用戶名和密碼,默認前面授權(quán)的canal,同學們需要根據(jù)自己的數(shù)據(jù)庫的用戶名和密碼進行設(shè)置,示例代碼如下#username/passwordcanal.instance.dbUsername=canalcanal.instance.dbPassword=canalcanal.instance.connectionCharset=UTF-8任務實踐2安裝Canal4.啟動Canal并查看相關(guān)日志(1)啟動運行Canal,示例代碼如下所示。cd/opt/module/canal/bin./startup.sh(2)查看Canal相關(guān)日志,示例代碼如下所示。cat/opt/module/canal/logs/example/example.log鞏固與提高Canal作為中間件,可能會面臨數(shù)據(jù)安全和信息泄露的風險。在使用Canal時,你如何保護數(shù)據(jù)的安全性和防止信息泄露?任務1Linux系統(tǒng)MySQL5.7數(shù)據(jù)庫安裝CONTENTS任務2MySQL開啟Binlog和數(shù)據(jù)準備任務3Canal下載和安裝任務5實時數(shù)據(jù)監(jiān)控測試之Kafka模式任務4實時數(shù)據(jù)監(jiān)控測試之TCP模式Canal、Maxwell任務6Maxwell初始化和進程啟動任務導航本模塊知識框架:本模塊主要是學習IDEA中Maven工程的搭建,通過編寫應用程序?qū)崿F(xiàn)Canal中的Message數(shù)據(jù)解析。根據(jù)不同的數(shù)據(jù)庫操作,如update,insert,delete,以及Event的類型,在應用程序中監(jiān)控數(shù)據(jù)的變化,這里使用的是canal的默認模式Tcp模式。知識準備Canal中封裝的數(shù)據(jù)結(jié)構(gòu)知識準備Canal中封裝的數(shù)據(jù)結(jié)構(gòu)(一)CanalEntryCanalEntry是Canal中最常用的數(shù)據(jù)結(jié)構(gòu),代表一條數(shù)據(jù)庫的數(shù)據(jù)變更。(二)RowChangeRowChange對象用于表示一次數(shù)據(jù)庫的數(shù)據(jù)變更,包括一系列的行級別變更操作。它包含以下幾個重要的字段:(1)EventType:表示數(shù)據(jù)操作的類型,包括INSERT、UPDATE和DELETE。(2)rowDataList:表示行級別的數(shù)據(jù)變更操作列表,包含一系列的RowData對象。(三)RowDataRowData對象表示數(shù)據(jù)庫表格中一行的數(shù)據(jù)變更。它包含以下幾個重要的字段:(1)beforeColumns:變更前的列數(shù)據(jù),以List形式存儲每個列的名稱和值。(2)afterColumns:變更后的列數(shù)據(jù),以List形式存儲每個列的名稱和值。通過使用這些數(shù)據(jù)結(jié)構(gòu),我們可以獲取到數(shù)據(jù)庫中的數(shù)據(jù)變更,并對其進行相應的處理和操作。我們可以根據(jù)具體的需求,解析CanalEntry對象獲取相關(guān)的數(shù)據(jù)信息,如數(shù)據(jù)庫名稱、表格名稱、操作類型、列名稱和列值等,進而進行相應的業(yè)務邏輯處理。任務實踐1創(chuàng)建數(shù)據(jù)庫db2并開啟Binlog監(jiān)控(1)登錄Mysql服務器創(chuàng)建一個數(shù)據(jù)庫db2具體操作如下。這里在node03號機器,輸入命令,示例代碼如下所示。mysql-u用戶名-p密碼這里輸入自己的數(shù)據(jù)庫的用戶名和密碼,登錄成功后可以進入到mysql數(shù)據(jù)庫的會話窗口中,在mysql會話窗口中我們可以輸入SQL命令進行數(shù)據(jù)庫的創(chuàng)建和數(shù)據(jù)庫的查看任務實踐1創(chuàng)建數(shù)據(jù)庫db2并開啟Binlog監(jiān)控(2)選擇db2數(shù)據(jù)庫在數(shù)據(jù)庫中創(chuàng)建數(shù)據(jù)庫表user_info表,具體操作如下所示。mysql>showdatabases;+--------------------+|Database|+--------------------+|information_schema||db2||eagle||hive||maxwell||mysql||performance_schema||sys|+--------------------+8rowsinset(0.00sec)
mysql>usedb2;Databasechangedmysql>createtableuser_info(->idvarchar(20),->namevarchar(20),->sexvarchar(10)->);QueryOK,0rowsaffected(0.00sec)任務實踐1創(chuàng)建數(shù)據(jù)庫db2并開啟Binlog監(jiān)控(3)開啟MySQL的Binlog并監(jiān)控db2數(shù)據(jù)庫。#編輯f配置文件(sudovim/etc/f),添加以下內(nèi)容:#MySQL服務器idserver-id=1log-bin=mysql-bin#binlog記錄級別rowbinlog_format=row#指定數(shù)據(jù)庫binlog-do-db=db2任務實踐2IDEA創(chuàng)建Maven工程并編寫CanalClient類(1)在IDEA中創(chuàng)建Maven工程,修改pom.xml文件,增加以下依賴和插件。示例代碼如下所示。<dependencies><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency></dependencies><build><plugins>
<!--java編譯插件--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.2</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin></plugins></build>任務實踐2IDEA創(chuàng)建Maven工程并編寫CanalClient類(2)創(chuàng)建并編寫CanalClient類。CanalClient類主要類似于一個客戶端工具,通過該工具可以實現(xiàn)對Canal服務器數(shù)據(jù)進行實時的檢測,這里我們主要是實現(xiàn)數(shù)據(jù)的打印輸出到控制臺,因為要監(jiān)測數(shù)據(jù)的變化,所以我們這里先要開啟運行Canal監(jiān)測類,一旦數(shù)據(jù)發(fā)生變化,就可以實現(xiàn)數(shù)據(jù)的實時監(jiān)測并打印輸出。示例代碼如下所示。importcom.alibaba.fastjson.JSONObject;Importcom.alibaba.otter.canal.client.CanalConnector;importcom.alibaba.otter.canal.client.CanalConnectors;importtocol.CanalEntry;importtocol.Message;importtobuf.ByteString;importtobuf.InvalidProtocolBufferException;import.InetSocketAddress;importjava.util.List;importjava.util.Random;任務實踐2publicclassCanalClient{publicstaticvoidmain(String[]args)throwsInvalidProtocolBufferException{
//1.獲取canal連接對象CanalConnectorcanalConnector=CanalConnectors.newSingleConnector(newInetSocketAddress("node01",11111),"example","","");while(true){
//2.獲取連接canalConnector.connect();
//3.指定要監(jiān)控的數(shù)據(jù)庫canalConnector.subscribe("db2.*");
//4.獲取MessageMessagemessage=canalConnector.get(100);List<CanalEntry.Entry>entries=message.getEntries();if(entries.size()<=0){
System.out.println("沒有數(shù)據(jù),休息一會");try{Thread.sleep(1000);}catch(InterruptedExceptione){e.printStackTrace();}}else{任務實踐2for(CanalEntry.Entryentry:entries){
//TODO獲取表名StringtableName=entry.getHeader().getTableName();
//TODOEntry類型CanalEntry.EntryTypeentryType=entry.getEntryType();
//TODO判斷entryType是否為ROWDATAif(CanalEntry.EntryType.ROWDATA.equals(entryType)){
//TODO序列化數(shù)據(jù)ByteStringstoreValue=entry.getStoreValue();
//TODO反序列化CanalEntry.RowChangerowChange=CanalEntry.RowChange.parseFrom(storeValue);
//TODO獲取事件類型CanalEntry.EventType eventType =rowChange.getEventType();
//TODO獲取具體的數(shù)據(jù)List<CanalEntry.RowData>rowDatasList=rowChange.getRowDatasList();
//TODO遍歷并打印數(shù)據(jù)for(CanalEntry.RowDatarowData:rowDatasList){List<CanalEntry.Column>beforeColumnsList=rowData.getBeforeColumnsList();JSONObjectbeforeData=newJSONObject();for(CanalEntry.Columncolumn:beforeColumnsList){任務實踐2beforeData.put(column.getName(),column.getValue());}JSONObjectafterData=newJSONObject();List<CanalEntry.Column>afterColumnsList=rowData.getAfterColumnsList();for(CanalEntry.Columncolumn:afterColumnsList){afterData.put(column.getName(),column.getValue());}System.out.println("TableName:"+tableName+",EventType:"+eventType+",After:"+beforeData+",After:"+afterData);}}}}}}}任務實踐2IDEA創(chuàng)建Maven工程并編寫CanalClient類(3)修改db2數(shù)據(jù)庫中的數(shù)據(jù),監(jiān)測控制臺的輸出。示例代碼如下所示。#插入數(shù)據(jù)INSERTINTOuser_infoVALUES('1001','zhangsan','male'),('1002','lisi','female');鞏固與提高Canal的Binlog記錄級別設(shè)置為row的意義是什么?它與Canal的功能和應用有什么關(guān)系?任務1Linux系統(tǒng)MySQL5.7數(shù)據(jù)庫安裝CONTENTS任務2MySQL開啟Binlog和數(shù)據(jù)準備任務3Canal下載和安裝任務5實時數(shù)據(jù)監(jiān)控測試之Kafka模式任務4實時數(shù)據(jù)監(jiān)控測試之TCP模式Canal、Maxwell任務6Maxwell初始化和進程啟動任務導航本模塊知識框架:本模塊主要是對Canal進行配置,監(jiān)控數(shù)據(jù)庫數(shù)據(jù)的變化。通過Kafka-Consumer把獲得變化的json數(shù)據(jù)輸出到控制臺。知識準備Canal之Kafka模式和基本流程在我們使用Canal時,經(jīng)常會使用Kafka作為消息隊列,可以實現(xiàn)將數(shù)據(jù)庫的增量數(shù)據(jù)變更以事件的形式發(fā)布到Kafka中,供其他應用程序進行消費和處理。這種模式被稱為Canal-Kafka模式。在Canal-Kafka模式下,需要配置CanalServer和Kafka的相關(guān)參數(shù)和連接信息,確保CanalServer能夠正確地將數(shù)據(jù)發(fā)送到Kafka并供應用程序消費。同時,應用程序也需要配置合適的Kafka消費者來接收和處理事件數(shù)據(jù)。我們可以通過以下幾個步驟來實現(xiàn)Canal-Kafka模式:(1)CanalServer訂閱數(shù)據(jù)庫的數(shù)據(jù)變更,并解析binlog日志,捕獲增量數(shù)據(jù)變更。(2)CanalServer將捕獲的數(shù)據(jù)變更轉(zhuǎn)換為Canal內(nèi)部定義的數(shù)據(jù)結(jié)構(gòu),如CanalEntry。(3)CanalServer將轉(zhuǎn)換后的數(shù)據(jù)以事件的形式發(fā)送到Kafka的特定主題(Topic)中。(4)Kafka作為一個分布式消息隊列,將事件數(shù)據(jù)持久化存儲,并提供高吞吐量和水平擴展的能力。(5)應用程序作為Kafka的消費者,可以通過訂閱特定的Kafka主題,實時接收數(shù)據(jù)庫的增量數(shù)據(jù)變更事件。(6)應用程序根據(jù)接收到的變更事件,進行相應的邏輯處理和業(yè)務操作。知識準備Canal之Kafka模式和基本流程本任務中,我們主要實踐配置CanalServer與Kafka集群的連接,在CanalServer的配置文件中,添加配置項以連接到Kafka集群。配置項包括Kafka的連接地址、主題名稱等信息。這樣CanalServer才能夠?qū)⒔馕龅脑隽繑?shù)據(jù)發(fā)送到指定的Kafka主題中。任務實踐1創(chuàng)建數(shù)據(jù)庫db3并開啟Binlog監(jiān)控(1)創(chuàng)建數(shù)據(jù)庫db3mysql>createdatabasedb3;(2)選擇db3數(shù)據(jù)庫在數(shù)據(jù)庫中創(chuàng)建數(shù)據(jù)庫表tb_product表,具體操作如下所示。mysql>usedb3;Databasechangedmysql>createtabletb_product(->product_idvarchar(20),->product_namevarchar(20),->product_pricedouble->);QueryOK,0rowsaffected(0.00sec)任務實踐1創(chuàng)建數(shù)據(jù)庫db3并開啟Binlog監(jiān)控(3)開啟MySQL的Binlog并監(jiān)控db3數(shù)據(jù)庫。示例代碼如下所示。#編輯f配置文件(sudovim/etc/f),添加以下內(nèi)容:#MySQL服務器idserver-id=1log-bin=mysql-bin#binlog記錄級別rowbinlog_format=row#指定數(shù)據(jù)庫binlog-do-db=db3任務實踐2修改Canal配置文件(1)修改Canal配置,開啟Kafka監(jiān)測,修改perties文件#canal的輸出model,默認tcp,改為輸出到kafka##########################################################commonargument##############################################################canal.ip=canal.port=11111canal.metrics.pull.port=11112canal.zkServers=#flushdatatozkcanal.zookeeper.flush.period=1000canal.withoutNetty=false#tcp,kafka,RocketMQcanal.serverMode=kafka#flushmetacursor/parsepositiontofile#修改Kafka集群的地址###########################################################MQ###############################################################canal.mq.servers=node01:9092,node02:9092,node03:9092任務實踐2修改Canal配置文件(2)修改perties,示例代碼如下所示。#修改perties輸出到Kafka的主題以及分區(qū)數(shù)#mqconfigcanal.mq.topic=canal_testcanal.mq.partitionsNum=1#hashpartitionconfig#canal.mq.partition=0#canal.mq.partitionHash=mytest.person:id,mytest.role:id注意:默認還是輸出到指定
Kafka主題的一個kafka分區(qū),因為多個分區(qū)并行可能會打亂binlog的順序,如果要提高并行度,首先設(shè)置kafka的分區(qū)數(shù)>1,然后設(shè)置canal.mq.partitionHash屬性。
任務實踐3插入數(shù)據(jù)測試采集結(jié)果1.啟動Canal/opt/module/canal/bin/startup.sh啟動Canal后,輸入命令jps,如果可以看到CanalLauncher,表示CanalServer啟動成功,同時會創(chuàng)建
canal_test主題。2.啟動Kafka消費客戶端#啟動Kafka消費客戶端測試,查看消費情況/opt/module/canal/bin/kafka-console-consumer.sh--bootstrap-servernode01:9092--topiccanal_test任務實踐3插入數(shù)據(jù)測試采集結(jié)果3.修改數(shù)據(jù)庫數(shù)據(jù)#插入數(shù)據(jù)INSERTINTOtb_productVALUES('1001','book',20.0),('1002','pencil',5.0);4.查看Kafka消費者控制臺輸出#Kafka消費者控制臺{"data":[{"id":"1001","name":"zhangsan","sex":"male"},{"id":"1002","name":"lisi","sex":"female"}],"database":"gmall-2021","es":1639360729000,"id":1,"isDdl":false,"MySQLType":{"id":"varchar(255)","name":"varchar(255)","sex":"varchar(255)"},"old":null,"sql":"","sqlType":{"id":12,"name":12,"sex":12},"table":"user_info","ts":1639361038454,"type":"INSERT"}鞏固與提高在實施Canal-Kafka模式時,有哪些道德和法律方面的考慮需要注意?任務1Linux系統(tǒng)MySQL5.7數(shù)據(jù)庫安裝CONTENTS任務2MySQL開啟Binlog和數(shù)據(jù)準備任務3Canal下載和安裝任務5實時數(shù)據(jù)監(jiān)控測試之Kafka模式任務4實時數(shù)據(jù)監(jiān)控測試之TCP模式Canal、Maxwell任務6Maxwell初始化和進程啟動任務導航本模塊知識框架:本模塊主要是通過下載和安裝MaxWell,開啟MySQL的Binlog,并在MySQL數(shù)據(jù)庫中創(chuàng)建用戶和對用戶進行權(quán)限的分配,通過本案例的學習和實操,讓大家能夠熟悉MaxWell的基本操作流程。本模塊主要是MaxWe軟件部署與配置,以及MySQL數(shù)據(jù)庫、表的創(chuàng)建,為后面模塊學習奠定基礎(chǔ)。知識準備安裝和配置Maxwell(一)Maxwell軟件相關(guān)資源(1)Maxwell官網(wǎng)地址:https://maxwells-daemon.io/(2)文檔查看地址:https://maxwells-daemon.io/quickstart/。(二)安裝部署(1)軟件基礎(chǔ)在安裝Maxwell之前,你需要確保系統(tǒng)已經(jīng)安裝了以下依賴:①Java②MySQL客戶端③kafka以上軟件的安裝在本教材中都有涉及,同學們可以自行翻閱前面的知識,回顧并提前安裝好,在本任務中不再贅述。(2)通過xftp工具,將下載好的maxwell-1.29.2.tar.gz上傳到服務器節(jié)點的/opt/software目錄下。(3)解壓maxwell-1.29.2.tar.gz的安裝包到/opt/module下,命令如下:tar-zxvf/opt/software/maxwell-1.29.2.tar.gz-C/opt/module知識準備MySQL環(huán)境準備(1)修改MySQL的配置文件①開啟MySQLBinlog設(shè)置#編輯f配置文件(sudovim/etc/f)在[MySQLd]模塊下添加以下內(nèi)容[MySQLd]server_id=1log-bin=MySQ
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
- 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 法制史自考試題及答案
- 醫(yī)院外科考試實操題庫及答案
- 道橋測量考核試題及答案
- 初中課外讀物試題及答案
- 中國華錄集團有限公司2026屆校園招聘正式開啟備考題庫必考題
- 樂清市人力資源和社會保障局關(guān)于公開選調(diào)2名下屬事業(yè)單位工作人員的考試備考題庫必考題
- 北京市豐臺區(qū)北宮鎮(zhèn)社區(qū)衛(wèi)生服務中心招聘3人一備考題庫附答案
- 古藺縣教育和體育局關(guān)于公開遴選古藺縣城區(qū)公辦幼兒園副園長的考試備考題庫必考題
- 威海銀行2026屆校園招聘備考題庫必考題
- 岳池縣酉溪鎮(zhèn)人民政府關(guān)于公開招聘社區(qū)專職網(wǎng)格員的備考題庫必考題
- 網(wǎng)絡(luò)安全運維與管理規(guī)范(標準版)
- 2026年包頭職業(yè)技術(shù)學院高職單招職業(yè)適應性考試模擬試題含答案解析
- 2026年XX醫(yī)院兒科護理工作計劃
- 液冷系統(tǒng)防漏液和漏液檢測設(shè)計研究報告
- 2025-2026學年貴州省安順市多校高一(上)期末物理試卷(含答案)
- 呼吸機相關(guān)肺炎預防策略指南2026
- 妊娠期缺鐵性貧血中西醫(yī)結(jié)合診療指南-公示稿
- 北京市2025年七年級上學期期末考試數(shù)學試卷三套及答案
- 2026年上海理工大學單招職業(yè)適應性測試題庫附答案
- TCEC電力行業(yè)數(shù)據(jù)分類分級規(guī)范-2024
- 建設(shè)用地報批培訓課件
評論
0/150
提交評論