
Real-Time Data Warehouse: Kafka-Flink-Hive Integration Principles and Hands-On Code (Theory + Practice)

1. Flink-Hive Theory

1.1 Introduction to Flink-Hive

Flink 1.11 added a major feature for real-time data warehousing: data flowing in from Kafka can be written into Hive in real time. To support this, Flink 1.11 mainly made the following changes: the FileSystem streaming sink was reworked to add partition-commit and rolling-policy mechanisms, and the Hive streaming sink was changed to reuse the filesystem streaming sink.

The design is described in FLIP-85 (Filesystem connector in Table), available from the Flink community.

1.2 Flink-Hive Integration Principle

The Flink-Hive integration mainly involves the following components:

Hive Dialect. Flink 1.11 introduced a Hive dialect, so Hive syntax can be written directly in Flink SQL. After a Hive SQL statement is written, the Flink SQL planner parses and validates it, converts it into a logical plan and then a physical plan, and finally produces a JobGraph.

HiveCatalog. HiveCatalog serves as the persistence layer between Flink and Hive: it stores Flink metadata from different sessions in the Hive Metastore.

1.3 Flink-Hive Version Support

Flink currently supports Hive 1.x, 2.x, and 3.x; each major Hive version maps to its own set of Flink dependencies.

1.4 Flink SQL Support for the Hive Dialect

Flink SQL supports two SQL dialects, default and hive. The dialect can be configured in two ways: through the SQL client configuration, or through a SET statement in SQL.

2. Kafka-Flink-Hive Cluster Configuration

Requirement: continuously compute over the data in Kafka with Flink SQL and store the results in the Hive data warehouse.

2.1 Cluster Deployment

The deployed components and versions are:

Hadoop: hadoop-2.6.4
Kafka: kafka_2.11-2.2.0
Flink: flink-1.13.0
Hive: hive-2.3.4-bin
Zookeeper: zookeeper-

2.2 Expected Query Results

Flink SQL should be able to query the records flowing in from Kafka (fields user_id, order_amount, and log_ts), insert them into Hive in real time, and the resulting Hive data should be queryable by partition.

2.3 Kafka Startup Commands

Start the Kafka broker:

nohup ./kafka-server-start.sh ../config/server.properties &

List the existing topics (to check whether the required topic already exists):

./kafka-topics.sh --list --bootstrap-server 61:9092

Create the Kafka topic:

kafka-topics.sh --create --bootstrap-server 61:9092 --topic test --partitions 10 --replication-factor 1

Start a Kafka console producer so data can be pushed in batches:

kafka-console-producer.sh --broker-list 61:9092 --topic test

Sample records pushed into the topic:

{"user_id": "1", "order_amount": "124.5", "log_ts": "2020-08-24 10:20:15"}
{"user_id": "2", "order_amount": "38.4", "log_ts": "2020-08-24 11:20:15"}
{"user_id": "3", "order_amount": "176.9", "log_ts": "2020-08-25 13:20:15"}
{"user_id": "4", "order_amount": "302", "log_ts": "2020-08-25 14:20:15"}
{"user_id": "5", "order_amount": "124.5", "log_ts": "2020-08-26 14:26:15"}
{"user_id": "6", "order_amount": "38.4", "log_ts": "2020-08-26 15:20:15"}
{"user_id": "7", "order_amount": "176.9", "log_ts": "2020-08-27 16:20:15"}
{"user_id": "8", "order_amount": "302", "log_ts": "2020-08-27 17:20:15"}
{"user_id": "9", "order_amount": "124.5", "log_ts": "2020-08-24 10:20:15"}
{"user_id": "10", "order_amount": "124.6", "log_ts": "2020-08-24 10:21:15"}
{"user_id": "11", "order_amount": "124.7", "log_ts": "2020-08-24 10:22:15"}
{"user_id": "12", "order_amount": "124.8", "log_ts": "2020-08-24 10:23:15"}
{"user_id": "13", "order_amount": "124.9", "log_ts": "2020-08-24 10:24:15"}
{"user_id": "14", "order_amount": "125.5", "log_ts": "2020-08-24 10:25:15"}
{"user_id": "15", "order_amount": "126.5", "log_ts": "2020-08-24 10:26:15"}

2.4 Integrating Hive with Flink

Hive installation: edit hive-env.sh

# Set HADOOP_HOME to point to a specific hadoop install directory
HADOOP_HOME=/root/sd/hadoop-2.6.4

# Hive Configuration Directory can be controlled by:
export HIVE_CONF_DIR=/root/sd/apache-hive-2.3.4-bin/conf

# Folder containing extra libraries required for hive compilation/execution can be controlled by:
export HIVE_AUX_JARS_PATH=/root/sd/apache-hive-2.3.4-bin/lib

Because Hive's files are stored in HDFS, HADOOP_HOME must point to the Hadoop installation; the Hive configuration directory and the directory holding extra dependency jars are specified as well.

Edit hive-site.xml:

<!-- database used by the MySQL connection of the metastore -->

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://61:3306/hive?createDatabaseIfNotExist=true</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
  <description>Driver class name for a JDBC metastore</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>root</value>
  <description>username to use against metastore database</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>123456</value>
  <description>password to use against metastore database</description>
</property>
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://hlink163:9083</value>
  <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
<property>
  <name>datanucleus.schema.autoCreateAll</name>
  <value>true</value>
</property>
<property>
  <name>hive.server2.logging.operation.log.location</name>
  <value>/root/sd/apache-hive-2.3.4-bin/tmp/operation_logs</value>
  <description>Top level directory where operation logs are stored if logging functionality is enabled</description>
</property>
<property>
  <name>hive.exec.scratchdir</name>
  <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive</value>
  <description>HDFS root scratch dir for Hive jobs which gets created with write all (733) permission. For each connecting user, an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, with ${hive.scratch.dir.permission}.</description>
</property>
<property>
  <name>hive.exec.local.scratchdir</name>
  <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive/local</value>
  <description>Local scratch space for Hive jobs</description>
</property>
<property>
  <name>hive.downloaded.resources.dir</name>
  <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive/resources</value>
  <description>Temporary local directory for added resources in the remote file system.</description>
</property>

Add the Flink-Hadoop dependency: configure the Hadoop dependency in flink-conf.yaml so that Flink can access HDFS and the Hive metastore.

2.5 Starting the Hive Cluster

Start the Hive metastore service (it listens on port 9083):

hive --service metastore

Check whether it started successfully:

netstat -ntpl | grep 9083

2.6 Starting the Flink Cluster

Start the Flink SQL client (from the bin directory):

./sql-client.sh embedded -d ../conf/sql-client-defaults.yaml
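The myhive catalog used below is normally declared in the catalogs section of sql-client-defaults.yaml. If it is not, an equivalent catalog can also be registered directly from SQL; a minimal sketch, not part of the original setup, assuming the Hive configuration directory from the installation above:

-- register a Hive catalog from the SQL client (hive-conf-dir must point at the directory holding hive-site.xml)
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'default-database' = 'default',
  'hive-conf-dir' = '/root/sd/apache-hive-2.3.4-bin/conf'
);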

In the Flink SQL CLI, list the available catalogs:

show catalogs;

The listing should include the myhive catalog. Switch to it and check its tables:

use catalog myhive;
show tables;

3. Kafka-Flink-Hive DDL

3.1 Create the Flink table that reads from Kafka (source)

Switch to the default Flink SQL dialect, then create the source table:

SET table.sql-dialect=default;
CREATE TABLE log_kafka (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = '61:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',
  'json.fail-on-missing-field' = 'false',
  'properties.group.id' = 'flink1'
);

The available Kafka scan startup modes are 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp', and 'specific-offsets'.
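For illustration, a variant of the source table that starts reading from a fixed point in time rather than from the earliest offset; a sketch only, with a hypothetical table name and a placeholder epoch-millisecond timestamp:

-- hypothetical variant of log_kafka: inherit its schema and options, override only the startup mode
CREATE TABLE log_kafka_from_ts WITH (
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1598234415000'  -- placeholder timestamp
) LIKE log_kafka;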

3.2 Create the Flink table that writes to Hive (sink)

SET table.sql-dialect=hive;
CREATE TABLE log_hive (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1min',
  'sink.semantic' = 'exactly-once',
  'sink.rolling-policy.file-size'='128MB',
  'sink.rolling-policy.rollover-interval'='1min',
  'sink.rolling-policy.check-interval'='1min',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

Configuration notes:

'sink.partition-commit.trigger'='partition-time' -- commit a partition based on the time extracted from the partition values combined with the watermark.

'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00' -- hour-level partition time extraction: dt is a yyyy-MM-dd day and hr is the hour of day (0-23); the pattern defines how a complete timestamp is derived from these two partition fields.

'sink.partition-commit.delay'='1min' -- a delay at minute granularity: the partition is committed once the watermark passes the partition time plus one minute. For example, the partition dt='2020-08-24', hr='10' is extracted as 2020-08-24 10:00:00 and is committed once the watermark passes 2020-08-24 10:01:00.

'sink.partition-commit.policy.kind'='metastore,success-file' -- the commit policy: first update the metastore (add the partition), then write a _SUCCESS file.
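If the source stream has no event-time watermark, processing time can be used to drive the commit instead; a hedged sketch of that change expressed as Hive-dialect DDL (the property values are illustrative, not part of the original setup):

SET table.sql-dialect=hive;
-- switch the sink table to processing-time based partition commits (no watermark needed)
ALTER TABLE log_hive SET TBLPROPERTIES (
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '0s'
);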

3.3 Insert the data into Hive

INSERT INTO TABLE log_hive
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM log_kafka;

3.4 Query the results

-- batch sql, select with partition pruning
SELECT * FROM log_hive WHERE dt='2020-08-25' and hr='16';
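The query above is a one-off batch read. Since Flink 1.11 the committed Hive partitions can also be consumed as a stream from Flink SQL; a hedged sketch using dynamic table options (the option values, such as the start offset, are placeholders rather than part of the original setup):

SET table.dynamic-table-options.enabled=true;

-- continuously monitor the Hive table and read new partitions as they are committed
SELECT * FROM log_hive
/*+ OPTIONS(
  'streaming-source.enable' = 'true',
  'streaming-source.monitor-interval' = '1 min',
  'streaming-source.consume-start-offset' = '2020-08-24'
) */;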

4. Kafka-Flink-Hive Table API Implementation

4.1 pom.xml configuration

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.flink</groupId>
    <artifactId>flinkhive</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.bin.version>2.11</scala.bin.version>
        <flink.version>1.13.0</flink.version>
        <hadoop.version>2.6.4</hadoop.version>
        <hive.version>2.3.4</hive.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.bin.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.bin.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.bin.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.bin.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.bin.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
    </dependencies>
</project>

4.2 Code

Scala version:

import java.time.Duration

import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object KafkaToHive {

  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.setParallelism(3)

    val tableEnvSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
    tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))

    val catalogName = "my_catalog"
    val catalog = new HiveCatalog(
      catalogName,              // catalog name
      "default",                // default database
      "./src/main/resources",   // Hive config (hive-site.xml) directory
      "2.3.4"                   // Hive version
    )
    tableEnv.registerCatalog(catalogName, catalog)
    tableEnv.useCatalog(catalogName)

    tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp")
    tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.log_kafka")
    tableEnv.executeSql(
      """
        |CREATE TABLE stream_tmp.log_kafka (
        |  user_id STRING,
        |  order_amount DOUBLE,
        |  log_ts TIMESTAMP(3),
        |  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'test',
        |  'properties.bootstrap.servers' = 'hlink163:9092',
        |  'properties.group.id' = 'flink1',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'format' = 'json',
        |  'json.fail-on-missing-field' = 'false',
        |  'json.ignore-parse-errors' = 'true'
        |)
      """.stripMargin
    )

    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp")
    tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.log_hive")
    tableEnv.executeSql(
      """
        |CREATE TABLE hive_tmp.log_hive (
        |  user_id STRING,
        |  order_amount DOUBLE
        |) PARTITIONED BY (
        |  dt STRING,
        |  hr STRING
        |) STORED AS PARQUET
        |TBLPROPERTIES (
        |  'sink.partition-commit.trigger' = 'partition-time',
        |  'sink.partition-commit.delay' = '1 min',
        |  'format' = 'json',
        |  'sink.partition-commit.policy.kind' = 'metastore,success-file',
        |  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00'
        |)
      """.stripMargin
    )

    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
    tableEnv.executeSql(
      """
        |INSERT INTO hive_tmp.log_hive
        |SELECT
        |  user_id,
        |  order_amount,
        |  DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
        |FROM stream_tmp.log_kafka
      """.stripMargin
    )
  }
}

Java version:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.time.Duration;

/**
 * Class description:
 *
 * @ClassName KafkaToHive
 * @Description:
 * @Author: lyz
 * @Date: 2021/9/6 9:50 PM
 */
public class KafkaToHive {

    public static void main(String[] args) {
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        senv.setParallelism(3);

        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(senv, tableEnvSettings);
        // tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));

        String catalogName = "my_catalog";
        HiveCatalog catalog = new HiveCatalog(
                catalogName,              // catalog name
                "default",                // default database
                "./src/main/resources",   // Hive config (hive-site.xml) directory
                "2.3.4"                   // Hive version
        );
        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp");
        tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.log_kafka");
        tableEnv.executeSql("create table stream_tmp.log_kafka("
                + "user_id String,\n"
                + "order_amount Double,\n"
                + "log_ts Timestamp(3),\n"
                + "WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND"
                + ") WITH ("
                + "'connector' = 'kafka',\n"
                + "'topic' = 'test',\n"
                + "'properties.bootstrap.servers' = '61:9092',\n"
                + "'properties.group.id' = 'flink1',\n"
                + "'scan.startup.mode' = 'earliest-offset',\n"
                + "'format' = 'json',\n"
                + "'json.fail-on-missing-field' = 'false',\n"
                + "'json.ignore-parse-errors' = 'true'\n"
                + ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp");
        tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.log_hive");
        tableEnv.executeSql("create table hive_tmp.log_hive("
                + "user_id String,\n"
                + "order_amount Double\n"
                + ") PARTITIONED BY (dt String, hr String)\n"
                + "STORED AS PARQUET\n"
                + "TBLPROPERTIES (\n"
                + "'sink.partition-commit.trigger' = 'partition-time',\n"
                + "'sink.partition-commit.delay' = '1 min',\n"
                + "'sink.partition-commit.policy.kind' = 'metastore,success-file',\n"
                + "'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00'\n"
                + ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("INSERT INTO hive_tmp.log_hive\n"
                + "SELECT user_id, order_amount,\n"
                + "DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')\n"
                + "FROM stream_tmp.log_kafka");
    }
}
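Once the job is running, a quick way to confirm that partitions are being created and committed is to check the table from the Hive side; a minimal sketch, run from the Hive CLI or Beeline (the partition values come from the sample data above):

USE hive_tmp;
-- list the partitions committed by the streaming job
SHOW PARTITIONS log_hive;
-- spot-check the rows in one committed partition
SELECT * FROM log_hive WHERE dt = '2020-08-24' AND hr = '10' LIMIT 10;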
