Real-Time Data Warehouse: Kafka-Flink-Hive Integration, Principles and Hands-On Code (Theory + Practice)

1. Flink-Hive theory

1.1 Flink-Hive introduction

Flink 1.11 added a major feature for real-time data warehousing: data flowing through Kafka can be written into Hive in near real time. To make this possible, Flink 1.11 introduced the following changes:

- The FileSystem streaming sink was reworked, adding partition-commit and rolling-policy mechanisms.
- The Hive streaming sink was rebuilt on top of the file system streaming sink. The design is documented in FLIP-85 (Filesystem connector in Table) in the Flink community.

1.2 Flink-Hive integration principle

The integration (the original architecture diagram is omitted here) mainly involves the following components:

- Hive Dialect. Flink 1.11 introduced a Hive dialect, so Hive syntax can be written directly in Flink SQL. The Flink SQL planner parses and validates the Hive SQL, converts it into a logical plan and then a physical plan, and finally into a JobGraph.
- Hive Catalog. The HiveCatalog acts as the persistence layer between Flink and Hive: it stores the Flink metadata of different sessions in the Hive Metastore.

1.3 Flink-Hive version support

Flink currently supports Hive 1.x, 2.x and 3.x; the Flink dependencies required for each major Hive version are listed in a compatibility table (the table image is omitted in this copy; see the Flink documentation).

1.4 Hive dialect support in Flink SQL

Flink SQL supports two SQL dialects: default and hive. The dialect can be configured in two ways (the original screenshots are omitted):

- through the SQL client configuration file;
- through a SQL statement.
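As a minimal sketch (not part of the original article): in sql-client-defaults.yaml the dialect is assumed to be set under the configuration section (table.sql-dialect: hive), while inside a SQL session it is switched with SET statements:

```sql
-- Switch to the Hive dialect, e.g. before running Hive DDL such as
-- CREATE TABLE ... STORED AS parquet
SET table.sql-dialect=hive;

-- Switch back to the default Flink dialect for regular Flink SQL
SET table.sql-dialect=default;
```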
2. Kafka-Flink-Hive cluster configuration

Requirement: continuously compute the data arriving in Kafka with Flink SQL and store it in the Hive data warehouse.

2.1 Cluster deployment

The deployed versions are:

- Hadoop: hadoop 2.6.4
- Kafka: kafka_2.11-2.2.0
- Flink: flink 1.13.0
- Hive: hive-2.3.4-bin
- Zookeeper: zookeeper- (version missing in the original)

2.2 Expected query results

The table structure that Flink SQL should expose for the data ingested from Kafka, and the expected partition-based query results after Flink SQL writes that data into Hive, were shown as screenshots in the original and are omitted here.

2.3 Kafka startup commands

Start Kafka:

```bash
nohup ./kafka-server-start.sh ../config/server.properties &
```

List the Kafka topics (to check whether the required topic already exists):

```bash
./kafka-topics.sh --list --bootstrap-server 61:9092
```

Create the Kafka topic:

```bash
kafka-topics.sh --create --bootstrap-server 61:9092 --topic test --partitions 10 --replication-factor 1
```

Start a Kafka console producer to push data in batches:

```bash
kafka-console-producer.sh --broker-list 61:9092 --topic test
```
The batch of sample records pushed into Kafka:

```json
{"user_id": "1", "order_amount": "124.5", "log_ts": "2020-08-24 10:20:15"}
{"user_id": "2", "order_amount": "38.4", "log_ts": "2020-08-24 11:20:15"}
{"user_id": "3", "order_amount": "176.9", "log_ts": "2020-08-25 13:20:15"}
{"user_id": "4", "order_amount": "302", "log_ts": "2020-08-25 14:20:15"}
{"user_id": "5", "order_amount": "124.5", "log_ts": "2020-08-26 14:26:15"}
{"user_id": "6", "order_amount": "38.4", "log_ts": "2020-08-26 15:20:15"}
{"user_id": "7", "order_amount": "176.9", "log_ts": "2020-08-27 16:20:15"}
{"user_id": "8", "order_amount": "302", "log_ts": "2020-08-27 17:20:15"}
{"user_id": "9", "order_amount": "124.5", "log_ts": "2020-08-24 10:20:15"}
{"user_id": "10", "order_amount": "124.6", "log_ts": "2020-08-24 10:21:15"}
{"user_id": "11", "order_amount": "124.7", "log_ts": "2020-08-24 10:22:15"}
{"user_id": "12", "order_amount": "124.8", "log_ts": "2020-08-24 10:23:15"}
{"user_id": "13", "order_amount": "124.9", "log_ts": "2020-08-24 10:24:15"}
{"user_id": "14", "order_amount": "125.5", "log_ts": "2020-08-24 10:25:15"}
{"user_id": "15", "order_amount": "126.5", "log_ts": "2020-08-24 10:26:15"}
```

2.4 Integrating Hive with Flink

Hive installation:
Edit hive-env.sh:

```bash
# Set HADOOP_HOME to point to a specific hadoop install directory
HADOOP_HOME=/root/sd/hadoop-2.6.4

# Hive Configuration Directory can be controlled by:
export HIVE_CONF_DIR=/root/sd/apache-hive-2.3.4-bin/conf

# Folder containing extra libraries required for hive compilation/execution can be controlled by:
export HIVE_AUX_JARS_PATH=/root/sd/apache-hive-2.3.4-bin/lib
```

Since Hive keeps its files in HDFS, HADOOP_HOME must point at the Hadoop installation; the configuration directory and the dependency-jar directory are specified as well.

Edit hive-site.xml:
```xml
<!-- MySQL database used for the metastore connection -->
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://61:3306/hive?createDatabaseIfNotExist=true</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
  <description>Driver class name for a JDBC metastore</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>root</value>
  <description>username to use against metastore database</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>123456</value>
  <description>password to use against metastore database</description>
</property>
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://hlink163:9083</value>
  <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
<property>
  <name>datanucleus.schema.autoCreateAll</name>
  <value>true</value>
</property>
<property>
  <name>hive.server2.logging.operation.log.location</name>
  <value>/root/sd/apache-hive-2.3.4-bin/tmp/operation_logs</value>
  <description>Top level directory where operation logs are stored if logging functionality is enabled</description>
</property>
<property>
  <name>hive.exec.scratchdir</name>
  <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive</value>
  <description>HDFS root scratch dir for Hive jobs which gets created with write all (733) permission. For each connecting user, an HDFS scratch dir: ${hive.exec.scratchdir}/&lt;username&gt; is created, with ${hive.scratch.dir.permission}.</description>
</property>
<property>
  <name>hive.exec.local.scratchdir</name>
  <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive/local</value>
  <description>Local scratch space for Hive jobs</description>
</property>
<property>
  <name>hive.downloaded.resources.dir</name>
  <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive/resources</value>
  <description>Temporary local directory for added resources in the remote file system.</description>
</property>
```

Adding the Flink-Hadoop dependency:
Add the Hadoop dependency in flink-conf.yaml (the original does not show the exact entries; a common approach with Flink 1.13 is simply to make the Hadoop classpath visible to Flink, e.g. by exporting HADOOP_CLASSPATH before starting the cluster).

2.5 Starting the Hive cluster

Start the Hive metastore service:

```bash
hive --service metastore    # listens on port 9083
# Check whether it started successfully:
netstat -ntpl | grep 9083
```

2.6 Starting the Flink cluster

Start the Flink SQL client (from the bin directory):

```bash
./sql-client.sh embedded -d ../conf/sql-client-defaults.yaml
```

In the Flink SQL client, list the Hive catalogs (the myhive catalog is assumed to be registered in sql-client-defaults.yaml; a SQL-based sketch follows this section), then switch to it:

```sql
show catalogs;
-- (output omitted)

use catalog myhive;
show tables;
```
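The original registers myhive through the SQL client's YAML configuration, which is not shown above. As a hedged alternative sketch (not from the original article), an equivalent Hive catalog can be created directly in Flink SQL; the hive-conf-dir value below reuses the HIVE_CONF_DIR path from this setup and must point at the directory containing hive-site.xml:

```sql
-- Sketch only: register a Hive catalog equivalent to "myhive" from SQL
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'default-database' = 'default',
  'hive-conf-dir' = '/root/sd/apache-hive-2.3.4-bin/conf'
);
USE CATALOG myhive;
```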
3. Kafka-Flink-Hive DDL

3.1 Create the Flink table that reads from Kafka (source)

```sql
-- Use the default Flink SQL dialect
SET table.sql-dialect=default;

CREATE TABLE log_kafka (
  user_id      STRING,
  order_amount DOUBLE,
  log_ts       TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = '61:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',
  'json.fail-on-missing-field' = 'false',
  'properties.group.id' = 'flink1'
);
```

The Kafka consumer startup modes include 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'.
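For illustration (this statement is not in the original article; the option names are the standard Flink 1.13 Kafka connector options), the 'timestamp' mode needs scan.startup.timestamp-millis, and 'specific-offsets' needs scan.startup.specific-offsets:

```sql
-- Sketch (not in the original): the same source table, but reading from a
-- fixed point in time; the timestamp value is a placeholder in epoch millis.
CREATE TABLE log_kafka_from_ts (
  user_id      STRING,
  order_amount DOUBLE,
  log_ts       TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = '61:9092',
  'properties.group.id' = 'flink1',
  'format' = 'json',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1598234415000'
);

-- For 'specific-offsets', the extra option would instead look like:
--   'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
```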
3.2 Create the Flink table that writes to Hive (sink)

```sql
SET table.sql-dialect=hive;

CREATE TABLE log_hive (
  user_id      STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1min',
  'sink.semantic' = 'exactly-once',
  'sink.rolling-policy.file-size' = '128MB',
  'sink.rolling-policy.rollover-interval' = '1min',
  'sink.rolling-policy.check-interval' = '1min',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
```

Configuration notes:

- 'sink.partition-commit.trigger' = 'partition-time': the commit time is derived from the partition values together with the watermark, which decides when a partition is committed.
- 'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00': hour-level partition time extraction. In this example dt is the day in yyyy-MM-dd format and hr is the hour (0-23); the timestamp-pattern defines how a complete timestamp is derived from these two partition fields.
- 'sink.partition-commit.delay' = '1min': a one-minute commit delay; the partition is committed once the watermark exceeds the partition time plus one minute.
- 'sink.partition-commit.policy.kind' = 'metastore,success-file': the commit policy first updates the metastore (add partition) and then writes a _SUCCESS file.
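As a concrete example of the commit logic: a record with log_ts = 2020-08-24 10:20:15 lands in partition dt=2020-08-24, hr=10; the extracted partition time is 2020-08-24 10:00:00, so with the one-minute delay that partition is committed once the watermark passes 2020-08-24 10:01:00. The dt/hr values themselves come from the DATE_FORMAT expressions used in the INSERT below; a quick way to sanity-check that mapping (this query is not part of the original article) is:

```sql
-- Default dialect: preview the partition values the sink would receive
SELECT
  user_id,
  DATE_FORMAT(log_ts, 'yyyy-MM-dd') AS dt,
  DATE_FORMAT(log_ts, 'HH')         AS hr
FROM log_kafka;
```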
3.3 Insert the data into Hive

```sql
INSERT INTO TABLE log_hive
SELECT
  user_id,
  order_amount,
  DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
  DATE_FORMAT(log_ts, 'HH')
FROM log_kafka;
```
3.4 Query the result

```sql
-- batch sql, select with partition pruning
SELECT * FROM log_hive WHERE dt='2020-08-25' and hr='16';
```
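As a hedged verification step on the Hive side (not part of the original article), the committed partitions can be inspected from the Hive CLI or beeline before querying the data:

```sql
-- Partitions appear here only after the partition commit has updated the metastore
SHOW PARTITIONS log_hive;

-- Spot-check one committed partition
SELECT count(*) FROM log_hive WHERE dt='2020-08-25' AND hr='16';
```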
4. Kafka-Flink-Hive Table API implementation

4.1 pom.xml configuration

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.flink</groupId>
    <artifactId>flinkhive</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.bin.version>2.11</scala.bin.version>
        <flink.version>1.13.0</flink.version>
        <hadoop.version>2.6.4</hadoop.version>
        <hive.version>2.3.4</hive.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.bin.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.bin.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.bin.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.bin.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.bin.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
    </dependencies>
</project>
```

4.2 Project code

(The original screenshot of the code layout is omitted.) The Scala version:
```scala
import java.time.Duration

import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object KafkaToHive {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.setParallelism(3)

    val tableEnvSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
    // Checkpointing must be enabled: the streaming file/Hive sink finalizes
    // files and commits partitions on checkpoint completion.
    tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))

    val catalogName = "my_catalog"
    val catalog = new HiveCatalog(
      catalogName,              // catalog name
      "default",                // default database
      "./src/main/resources",   // Hive config (hive-site.xml) directory
      "2.3.4"                   // Hive version
    )
    tableEnv.registerCatalog(catalogName, catalog)
    tableEnv.useCatalog(catalogName)

    tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp")
    tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.log_kafka")
    tableEnv.executeSql(
      """
        |CREATE TABLE stream_tmp.log_kafka (
        |  user_id STRING,
        |  order_amount DOUBLE,
        |  log_ts TIMESTAMP(3),
        |  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'test',
        |  'properties.bootstrap.servers' = 'hlink163:9092',
        |  'properties.group.id' = 'flink1',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'format' = 'json',
        |  'json.fail-on-missing-field' = 'false',
        |  'json.ignore-parse-errors' = 'true'
        |)
      """.stripMargin
    )

    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp")
    tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.log_hive")
    tableEnv.executeSql(
      """
        |CREATE TABLE hive_tmp.log_hive (
        |  user_id STRING,
        |  order_amount DOUBLE
        |) PARTITIONED BY (
        |  dt STRING,
        |  hr STRING
        |) STORED AS PARQUET
        |TBLPROPERTIES (
        |  'sink.partition-commit.trigger' = 'partition-time',
        |  'sink.partition-commit.delay' = '1 min',
        |  'format' = 'json',
        |  'sink.partition-commit.policy.kind' = 'metastore,success-file',
        |  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00'
        |)
      """.stripMargin
    )

    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
    tableEnv.executeSql(
      """
        |INSERT INTO hive_tmp.log_hive
        |SELECT
        |  user_id,
        |  order_amount,
        |  DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
        |  DATE_FORMAT(log_ts, 'HH')
        |FROM stream_tmp.log_kafka
      """.stripMargin
    )
  }
}
```
The Java version:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.time.Duration;

/**
 * Class description:
 *
 * @ClassName KafkaToHive
 * @Description:
 * @Author: lyz
 * @Date: 2021/9/6 9:50 PM
 */
public class KafkaToHive {
    public static void main(String[] args) {
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        senv.setParallelism(3);

        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(senv, tableEnvSettings);
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));

        String catalogName = "my_catalog";
        HiveCatalog catalog = new HiveCatalog(
                catalogName,             // catalog name
                "default",               // default database
                "./src/main/resources",  // Hive config (hive-site.xml) directory
                "2.3.4"                  // Hive version
        );
        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp");
        tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.log_kafka");
        tableEnv.executeSql("create table stream_tmp.log_kafka("
                + "user_id String,\n"
                + "order_amount Double,\n"
                + "log_ts Timestamp(3),\n"
                + "WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND"
                + ") WITH ("
                + " 'connector' = 'kafka',\n"
                + " 'topic' = 'test',\n"
                + " 'properties.bootstrap.servers' = '61:9092',\n"
                + " 'properties.group.id' = 'flink1',\n"
                // The original listing is truncated at this point; the remaining
                // statements follow the Scala version above.
                + " 'scan.startup.mode' = 'earliest-offset',\n"
                + " 'format' = 'json',\n"
                + " 'json.fail-on-missing-field' = 'false',\n"
                + " 'json.ignore-parse-errors' = 'true'\n"
                + ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp");
        tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.log_hive");
        tableEnv.executeSql("CREATE TABLE hive_tmp.log_hive (\n"
                + " user_id STRING,\n"
                + " order_amount DOUBLE\n"
                + ") PARTITIONED BY (dt STRING, hr STRING) STORED AS PARQUET\n"
                + "TBLPROPERTIES (\n"
                + " 'sink.partition-commit.trigger' = 'partition-time',\n"
                + " 'sink.partition-commit.delay' = '1 min',\n"
                + " 'sink.partition-commit.policy.kind' = 'metastore,success-file',\n"
                + " 'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00'\n"
                + ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("INSERT INTO hive_tmp.log_hive\n"
                + "SELECT user_id, order_amount,\n"
                + " DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')\n"
                + "FROM stream_tmp.log_kafka");
    }
}
```