Spark大數(shù)據(jù)分析實(shí)戰(zhàn)_第1頁(yè)
Spark大數(shù)據(jù)分析實(shí)戰(zhàn)_第2頁(yè)
Spark大數(shù)據(jù)分析實(shí)戰(zhàn)_第3頁(yè)
Spark大數(shù)據(jù)分析實(shí)戰(zhàn)_第4頁(yè)
Spark大數(shù)據(jù)分析實(shí)戰(zhàn)_第5頁(yè)
已閱讀5頁(yè),還剩285頁(yè)未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

Spark大數(shù)據(jù)分析實(shí)戰(zhàn)

目錄

第1章Spark簡(jiǎn)介

1.1初識(shí)Spark

1.2Spark生態(tài)系統(tǒng)BDAS

1.3Spark架構(gòu)與運(yùn)行邏輯

1.4彈性分布式數(shù)據(jù)集

1.4.1RDD簡(jiǎn)介

1.4.2RDD算子分類

1.5本章小結(jié)

第2章Spark開發(fā)與環(huán)境配置

2.1Spark應(yīng)用開發(fā)環(huán)境配置

2.1.1使用Intellii開發(fā)Spark程序

2.1.2使用SparkShell進(jìn)行交互式數(shù)據(jù)分析

2.2遠(yuǎn)程調(diào)試Spark程序

2.3Spark編譯

2.4配置Spark源碼閱讀環(huán)境

2.5木章小結(jié)

第3章BDAS簡(jiǎn)介

3.1SOLonSpark

3.1.1為什么使用SparkSOL

3.1.2SparkSQL架構(gòu)分析

3.2SparkStreaming

321SparkStreaming簡(jiǎn)介

3.2.2SparkStreaming架構(gòu)

3.2.3SparkStreaming原理剖析

3.3GraphX

3.3.1GraphX簡(jiǎn)介

3.3.2GraphX的使用簡(jiǎn)介

3.3.3GraphX體系結(jié)構(gòu)

3.4MLlib

3.4.1MLlib簡(jiǎn)介

342MLiib中的聚類和分類

3.5本章小結(jié)

4.1日志分析概述

4.2日志分析指標(biāo)

4.3Lamda架構(gòu)

44構(gòu)建日志分析數(shù)據(jù)流水線

4.41用Flume進(jìn)行日志采集

4.4.2用Kafka將口志匯總

4.4.3用SparkStreaming進(jìn)行實(shí)時(shí)H志分析

444SparkSQL離線口志分析

445用Flask將日志KPI可視化

4.5本章小結(jié)

第5章基于云平臺(tái)和用戶日志的推薦系統(tǒng)

5.1Azure云平臺(tái)簡(jiǎn)介

5.1.1Azure網(wǎng)站模型

5.1.2Azure數(shù)據(jù)存儲(chǔ)

5.1.3AzureQueue消息傳遞

5.2系統(tǒng)架構(gòu)

5.3構(gòu)建Node.js應(yīng)用

5.3.1創(chuàng)建AzureWeb應(yīng)用

5.3.2構(gòu)建本地Node.js網(wǎng)站

533發(fā)布應(yīng)用到云平臺(tái)

5.4數(shù)據(jù)收集與預(yù)處理

5.4.1通過(guò)IS收集用戶行為日志

5.4.2用戶實(shí)時(shí)行為回傳到AzureQueue

5.5SparkStreaming實(shí)時(shí)分析用戶日志

5.5.1構(gòu)建AzureQueue的SparkStreamingReceiver

5.5.2SparkStreaming實(shí)時(shí)處理AzureOueue口志

5.5.3SparkStreaming數(shù)據(jù)存儲(chǔ)于AzureTable

5.6MLlib離線訓(xùn)練模型

561加載訓(xùn)練數(shù)據(jù)

5.6.2使用ratingRDD訓(xùn)練ALS模型

563使用ALS模型進(jìn)行電影推薦

564評(píng)估模型的均方差

5.7本章小結(jié)

第6章Twitter情感分析

6.1系統(tǒng)架構(gòu)

6.2Twitter數(shù)據(jù)收集

6.2.1設(shè)置

6.2.2SparkStreaming接收并輸出Tweet

63數(shù)據(jù)預(yù)處理與Cassandra存儲(chǔ)

631添力口SBT依賴

6.3.2創(chuàng)建CassandraSchema

6.3.3數(shù)據(jù)存儲(chǔ)于Cassandra

64SparkStreaming熱點(diǎn)Twitter分析

6.5SparkStreaming在線情感分析

6.6SparkSOL進(jìn)行Twitter分析

6.6.1讀取Cassandra數(shù)據(jù)

662查看【SON數(shù)據(jù)模式

663SparkSQL分析Twitter

6.7Twitter可視化

6.8本章小結(jié)

7.1新聞數(shù)據(jù)分析

7.2系統(tǒng)架構(gòu)

7.3爬蟲抓取網(wǎng)絡(luò)信息

7.3.1Scrapy簡(jiǎn)介

732創(chuàng)建基于Scrapy的新聞爬蟲

7.3.3爬蟲分布式化

7.4新聞文木數(shù)據(jù)預(yù)處理

7.5新聞聚類

7.5.1數(shù)據(jù)轉(zhuǎn)換為向量(向量空間模型VSM)

752新聞聚類

753詞向量同義詞查詢

7.5.4實(shí)時(shí)熱點(diǎn)新聞分析

7.6SparkElasticSearch構(gòu)建全文檢索弓I擎

7.6.1部署ElasticSearch

7.6.2用ElasticSearch索弓IMongoDB數(shù)據(jù)

7.6.3通過(guò)ElasticSearch檢索數(shù)據(jù)

7.7本章小結(jié)

第8章構(gòu)建分布式的協(xié)同過(guò)濾推薦系統(tǒng)

8.1推薦系統(tǒng)簡(jiǎn)介

8.2協(xié)同過(guò)濾介紹

8.2.1基于用戶的協(xié)同過(guò)濾算法User-basedCF

822基于項(xiàng)目的協(xié)同過(guò)濾算法Item-basedCF

823基于模型的協(xié)同過(guò)濾推薦Model-basedCF

8.3基于Spark的矩陣運(yùn)算實(shí)現(xiàn)協(xié)同過(guò)濾算法

8.3.1Spark中的矩陣類型

8.3.2Spark中的矩陣運(yùn)算

8.3.3實(shí)現(xiàn)User-based協(xié)同過(guò)濾的示例

8.3.4實(shí)現(xiàn)Item-based協(xié)同過(guò)濾的示例

8.3.5基于奇異值分解實(shí)現(xiàn)Model-based協(xié)同過(guò)濾的示例

8.4基于Spark的MLlib實(shí)現(xiàn)協(xié)同過(guò)濾算法

8.4.1MLlib的推薦算法工具

842MLlib協(xié)同過(guò)濾推薦示例

8.5案例:使用MLlib協(xié)同過(guò)濾實(shí)現(xiàn)電影推薦

8.5.1MovieLens數(shù)據(jù)集

852確定最佳的協(xié)同過(guò)濾模型參數(shù)

853利用最佳模型進(jìn)行電影推薦

8.6本章小結(jié)

第9章基于Spark的社交網(wǎng)絡(luò)分析

9.1社交網(wǎng)絡(luò)介紹

9.1.1社交網(wǎng)絡(luò)的類型

9.1.2社交網(wǎng)絡(luò)的相關(guān)概念

9.2社交網(wǎng)絡(luò)中社團(tuán)挖掘算法

921聚類分析和K均值算法簡(jiǎn)介

922社團(tuán)挖掘的衡量指標(biāo)

923基于譜聚類的社團(tuán)挖掘算法

9.3Spark中的K均值算法

9.3.1Spark中與K均值有關(guān)的對(duì)象和方法

9.3.2Spark下K均值算法示例

9.4案例:基于Spark的Facebook社團(tuán)挖掘

941SNAP社交網(wǎng)絡(luò)數(shù)據(jù)集介紹

9.4.2基于Spark的社團(tuán)挖掘?qū)崿F(xiàn)

9.5社交網(wǎng)絡(luò)中的鏈路預(yù)測(cè)算法

951分類學(xué)習(xí)簡(jiǎn)介

952分類器的評(píng)價(jià)指標(biāo)

9.5.3基于Logistic回歸的鏈路預(yù)測(cè)算法

9.6SparkMLlib中的Logistic回歸

961分類器相關(guān)對(duì)象

9.6.2模型驗(yàn)證對(duì)象

9.6.3基于Snark的LogisticI可歸示例

9.7案例:基于Spark的鏈路預(yù)測(cè)算法

9.7.1SNAP符號(hào)社交網(wǎng)絡(luò)Epinions數(shù)據(jù)集

9.7.2基于Spark的鏈路預(yù)測(cè)算法

9.8本章小結(jié)

第10章基于Spark的大規(guī)模新聞主題分析

10.1主題模型簡(jiǎn)介

10.2主題模型LDA

10.2.1LDA模型介紹

10.2.2LDA的訓(xùn)練算法

10.3Spark中的LDA模型

10.3.1MLlib對(duì)LDA的支持

10.3.2Spark中LDA模型訓(xùn)練示例

1。4案例:Newsgroups新聞的主題分析

10.4.1Newsgroups數(shù)據(jù)集介名召

10.4.2交叉驗(yàn)證估計(jì)新聞的主題個(gè)數(shù)

10.4.3基于主題模型的文本聚類算法

10.4.4基于主題模型的文本分類算法

10.5本章小結(jié)

第11章構(gòu)建分布式的搜索引擎

11.1搜索引擎簡(jiǎn)介

11.2搜索排序概述

11.3查詢無(wú)關(guān)模型PageRank

114基于Spark的分存式PageRank實(shí)現(xiàn)

1L4.1PageRank的MapReduce實(shí)現(xiàn)

114.2Spark的分相式圖模型GraphX

11.4.3基于GraphX的PageRank實(shí)現(xiàn)

115窠例:GoogleWebGraph的PageRank計(jì)算

11.6查詢相關(guān)模型RankingSVM

1L7Spark中支持向量機(jī)的實(shí)現(xiàn)

1171Spark中的支持向量機(jī)模型

1172使用Spark測(cè)試數(shù)據(jù)演示支持向量機(jī)的訓(xùn)練

1L8案例:基于MSLR數(shù)據(jù)集的杳詢排序

118.1MicrosoftLearningtoRank數(shù)據(jù)集介紹

11.8.2基于Spark的RankingSVM實(shí)現(xiàn)

11.9本章小結(jié)

第1章Spark簡(jiǎn)介

本章主要介紹Spark框架的概念、生態(tài)系統(tǒng)、架構(gòu)及RDD等,并圍繞Spark的BDAS項(xiàng)

目及其子項(xiàng)目進(jìn)行了簡(jiǎn)要介紹。目前,Spark生態(tài)系統(tǒng)已經(jīng)發(fā)展成為一個(gè)包含多個(gè)子項(xiàng)目

的集合,其中包含SparkSQL、SparkStreaming、GraphX、MLlib等子項(xiàng)目,本章只進(jìn)行

簡(jiǎn)要介紹,后續(xù)章節(jié)會(huì)有詳細(xì)闡述。

1.1初識(shí)Spark

Spark是基于內(nèi)存計(jì)算的大數(shù)據(jù)并行計(jì)算框架,因?yàn)樗趦?nèi)存計(jì)算,所以提高了在大數(shù)

據(jù)環(huán)境下數(shù)據(jù)處理的實(shí)時(shí)性,同時(shí)保證了高容錯(cuò)性和高可伸縮性,允許用戶將Spark部

署在大量廉價(jià)硬件之上,形成集群。

l.Spark執(zhí)行的特點(diǎn)

Hadoop中包含計(jì)算框架MapReduce和分布式文件系統(tǒng)HDFS。

Spark是MapReduce的替代方案,而且兼容HDFS、Hive等分布式存儲(chǔ)層,融入Hadoop

的生態(tài)系統(tǒng),并彌補(bǔ)MapReduce的不足。

(1)中間結(jié)果輸出

Spark將執(zhí)行工作流抽象為通用的有向無(wú)環(huán)圖執(zhí)行計(jì)劃(DAG),可以將多Stage的任務(wù)

串聯(lián)或者并行執(zhí)行,而無(wú)需將Stage的中間結(jié)果輸出到HDFS中,類似的引擎包括Flink、

Dryad>Tezo

(2)數(shù)據(jù)格式和內(nèi)存布局

Spark抽象出分布式內(nèi)存存儲(chǔ)結(jié)構(gòu)彈性分布式數(shù)據(jù)集RDD,可以理解為利用分布式的數(shù)組

來(lái)進(jìn)行數(shù)據(jù)的存儲(chǔ)。RDD能支持粗粒度寫操作,但對(duì)于讀取操作,它可以精確到每條記

錄。Spark的特性是能夠控制數(shù)據(jù)在不同節(jié)點(diǎn)上的分區(qū),用戶可以自定義分區(qū)策略。

(3)執(zhí)行策略

Spark執(zhí)行過(guò)程中不同Stage之間需要進(jìn)行Shuffle?Shuffle是連接有依賴的Stage的橋梁,

上游Stage輸出到下游Stage中必須經(jīng)過(guò)Shuffle這個(gè)環(huán)節(jié),通過(guò)Shuffle將相同的分組數(shù)

據(jù)拆分后聚合到同一個(gè)節(jié)點(diǎn)再處理。SparkShuffle支持基于Hash或基于排序的分布式聚

合機(jī)制。

(4)任務(wù)調(diào)度的開銷

Spark采用了事件驅(qū)動(dòng)的類庫(kù)AKKA來(lái)啟動(dòng)任務(wù),通過(guò)線程池的復(fù)用線程來(lái)避免系統(tǒng)啟動(dòng)

和切換開銷。

2.Spark的優(yōu)勢(shì)

Spark的一站式解決方案有很多的優(yōu)勢(shì),分別如下所述。

(1)打造全棧多計(jì)算范式的高效數(shù)據(jù)流水線

支持復(fù)雜查詢與數(shù)據(jù)分析任務(wù)。在簡(jiǎn)單的“Map”及"Reduce”操作之外,Spark還支持SQL

查詢、流式計(jì)算、機(jī)器學(xué)習(xí)和圖算法。同時(shí),用戶可以在同一個(gè)工作流中無(wú)縫搭配這些計(jì)

算范式。

(2)輕量級(jí)快速處理

Spark代碼量較小,這得益于Scala語(yǔ)言的簡(jiǎn)潔和豐富表達(dá)力,以及Spark通過(guò)External

DataSourceAPI充分利用和集成Hadoop等其他第三方組件的能力。同時(shí)Spark基于內(nèi)存

計(jì)算,可通過(guò)中間結(jié)果緩存在內(nèi)存來(lái)減少磁盤I/O以達(dá)到性能的提升。

(3)易于使用,支持多語(yǔ)言

Spark支持通過(guò)Scala、Java和Python編寫程序,這允許開發(fā)者在自己熟悉的語(yǔ)言環(huán)境下

進(jìn)行工作。它自帶了80多個(gè)算子,同時(shí)允許在Shell中進(jìn)行交互式計(jì)算。用戶可以利用

Spark像書寫單機(jī)程序一樣書寫分布式程序,輕松利用Spark搭建大數(shù)據(jù)內(nèi)存計(jì)算平臺(tái)并

充分利用內(nèi)存計(jì)算,實(shí)現(xiàn)海量數(shù)據(jù)的實(shí)時(shí)處理。

(4)與ExternalDataSource多數(shù)據(jù)源支持

Spark可以獨(dú)立運(yùn)行,除了可以運(yùn)行在當(dāng)下的Yarn集群管理之外,它還可以讀取已有的

任何Hadoop數(shù)據(jù)。它可以運(yùn)行多種數(shù)據(jù)源,比如Parquet、Hive、HBase、HDFS等。這

個(gè)特性讓用戶可以輕易遷移已有的持久化層數(shù)據(jù)。

(5)社區(qū)活躍度高

Spark起源于2009年,當(dāng)下已有超過(guò)600多位工程師貢獻(xiàn)過(guò)代碼。開源系統(tǒng)的發(fā)展不應(yīng)

只看一時(shí)之快,更重要的是一個(gè)活躍的社區(qū)和強(qiáng)大的生態(tài)系統(tǒng)的支持。

同時(shí)也應(yīng)該看到Spark并不是完美的,RDD模型適合的是粗粒度的全局?jǐn)?shù)據(jù)并行計(jì)算;

不適合細(xì)粒度的、需要異步更新的計(jì)算。對(duì)于一些計(jì)算需求,如果要針對(duì)特定工作負(fù)載達(dá)

到最優(yōu)性能,還需要使用一些其他的大數(shù)據(jù)系統(tǒng)。例如,圖計(jì)算領(lǐng)域的GraphLab在特定

計(jì)算負(fù)載性能上優(yōu)于GraphX,流計(jì)算中的Storm在實(shí)時(shí)性要求很高的場(chǎng)合要更勝Spark

Streaming一籌。

1.2Spark生態(tài)系統(tǒng)BDAS

目前,Spark已經(jīng)發(fā)展成為包含眾多子項(xiàng)目的大數(shù)據(jù)計(jì)算平臺(tái)。BDAS是伯克利大學(xué)提出

的基于Spark的數(shù)據(jù)分析棧(BDAS)。其核心框架是Spark,同時(shí)涵蓋支持結(jié)構(gòu)化數(shù)據(jù)

SQL查詢與分析的查詢引擎SparkSQL,提供機(jī)器學(xué)習(xí)功能的系統(tǒng)MLBase及底層的分布

式機(jī)器學(xué)習(xí)庫(kù)MLlib,并行圖計(jì)算框架GraphX,流計(jì)算框架SparkStreaming,近似查詢

引擎BlinkDB,內(nèi)存分布式文件系統(tǒng)Tachyon,資源管理框架Mesos等子項(xiàng)目。這些子項(xiàng)

目在Spark上層提供了更高層、更豐富的計(jì)算范式。

圖1-1展現(xiàn)了BDAS的主要項(xiàng)目結(jié)構(gòu)圖。

MLlib

SparkSparkGraphX

(machine

SQL(graph)

□Learning)

ApacheSpark

圖1-1伯克利數(shù)據(jù)分析棧(BDAS)主要項(xiàng)目結(jié)構(gòu)圖

下面對(duì)BDAS的各個(gè)子項(xiàng)目進(jìn)行更詳細(xì)的介紹。

(1)Spark

Spark是整個(gè)BDAS的核心組件,是一個(gè)大數(shù)據(jù)分布式編程框架,不僅實(shí)現(xiàn)了MapReduce

的算子map函數(shù)和reduce函數(shù)及計(jì)算模型,還提供了更為豐富的算子,例如filter、join、

groupByKey等。Spark將分布式數(shù)據(jù)抽象為RDD(彈性分布式數(shù)據(jù)集),并實(shí)現(xiàn)了應(yīng)用

任務(wù)調(diào)度、RPC、序列化和壓縮,并為運(yùn)行在其上層的組件提供API。其底層采用Scala

這種函數(shù)式語(yǔ)言書寫而成,并且所提供的API深度借鑒函數(shù)式的編程思想,提供與Scala

類似的編程接口。

圖1-2所示即為Spark的處理流程(主要對(duì)象為RDD)。

Spark將數(shù)據(jù)在分布式環(huán)境下分區(qū),然后將作業(yè)轉(zhuǎn)化為有向無(wú)環(huán)圖(DAG),并分階段進(jìn)

行DAG的調(diào)度和任務(wù)的分布式并行處理。

(2)SparkSQL

SparkSQL提供在大數(shù)據(jù)上的SQL查詢功能,類似于Shark在整個(gè)生態(tài)系統(tǒng)的角色,它們

可以統(tǒng)稱為SQLonSpark。之前,由于Shark的查詢編譯和優(yōu)化器依賴Hive,使得Shark

不得不維護(hù)一套Hive分支。而SparkSQL使用Catalyst作為查詢解析和優(yōu)化器,并在底

層使用Spark作為執(zhí)行引擎實(shí)現(xiàn)SQL的算子。用戶可以在Spark上直接書寫SQL,相當(dāng)

于為Spark擴(kuò)充了一套SQL算子,這無(wú)疑更加豐富了Spark的算子和功能。同時(shí)Spark

SQL不斷兼容不同的持久化存儲(chǔ)(如HDFS、Hive等),為其發(fā)展奠定廣闊的空間。

圖1-2Spark的任務(wù)處理流程圖

(3)SparkStreaming

SparkStreaming通過(guò)將流數(shù)據(jù)按指定時(shí)間片累積為RDD,然后將每個(gè)RDD進(jìn)行批處理,

進(jìn)而實(shí)現(xiàn)大規(guī)模的流數(shù)據(jù)處理。其吞吐量能夠超越現(xiàn)有主流流處理框架Storm,并提供豐

富的API用于流數(shù)據(jù)計(jì)算。

(4)GraphX

GraphX基于BSP模型,在Spark之上封裝類似Pregel的接口,進(jìn)行大規(guī)模同步全局的

圖計(jì)算,尤其是當(dāng)用戶進(jìn)行多輪迭代的時(shí)候,基于Spark內(nèi)存計(jì)算的優(yōu)勢(shì)尤為明顯。

(5)MLlib

MLlib是Spark之上的分布式機(jī)器學(xué)習(xí)算法庫(kù),同時(shí)包括相關(guān)的測(cè)試和數(shù)據(jù)生成器。

MLlib支持常見的機(jī)器學(xué)習(xí)問題,例如分類、回歸、聚類以及協(xié)同過(guò)濾,同時(shí)也包括一個(gè)

底層的梯度下降優(yōu)化基礎(chǔ)算法。

1.3Spark架構(gòu)與運(yùn)行邏輯

l.Spark的架構(gòu)

?Driver:運(yùn)行Application的main()函數(shù)并且創(chuàng)建SparkContext。

?Client:用戶提交作業(yè)的客戶端。

?Worker:集群中任何可以運(yùn)行Application代碼的節(jié)點(diǎn),運(yùn)行一個(gè)或多個(gè)Executor進(jìn)程。

?Executor:運(yùn)行在Worker的Task執(zhí)行器,Executor啟動(dòng)線程池運(yùn)行Task,并且負(fù)責(zé)將

數(shù)據(jù)存在內(nèi)存或者磁盤上。每個(gè)Application都會(huì)申請(qǐng)各自的Executor來(lái)處理任務(wù)。

?SparkContext:整個(gè)應(yīng)用的上下文,控制應(yīng)用的生命周期。

?RDD:Spark的基本計(jì)算單元,一組RDD形成執(zhí)行的有向無(wú)環(huán)圖RDDGrapho

?DAGScheduler:根據(jù)Job構(gòu)建基于Stage的DAG工作流,并提交Stage給

TaskSchedulero

?TaskScheduler:將Task分發(fā)給Executor執(zhí)行。

?SparkEnv:線程級(jí)別的上下文,存儲(chǔ)運(yùn)行時(shí)的重要組件的引用。

2.運(yùn)行邏輯

(1)Spark作業(yè)提交流程

如圖1-3所示,Client提交應(yīng)用,Master找到一個(gè)Worker啟動(dòng)Driver,Driver向

Master或者資源管理器申請(qǐng)資源,之后將應(yīng)用轉(zhuǎn)化為RDD有向無(wú)環(huán)圖,再由

DAGScheduler將RDD有向無(wú)環(huán)圖轉(zhuǎn)化為Stage的有向無(wú)環(huán)圖提交給TaskScheduler,由

TaskScheduler提交任務(wù)給Executor進(jìn)行執(zhí)行。任務(wù)執(zhí)行的過(guò)程中其他組件再協(xié)同工作確

保整個(gè)應(yīng)用順利執(zhí)行。

圖1-3Spark架構(gòu)

(2)Spark作業(yè)運(yùn)行邏輯

如圖1-4所示,在Spark應(yīng)用中,整個(gè)執(zhí)行流程在邏輯上運(yùn)算之間會(huì)形成有向無(wú)環(huán)圖。

Action算子觸發(fā)之后會(huì)將所有累積的算子形成一個(gè)有向無(wú)環(huán)圖,然后由調(diào)度器調(diào)度該圖

上的任務(wù)進(jìn)行運(yùn)算。Spark的調(diào)度方式與MapReduce有所不同。Spark根據(jù)RDD之間不

同的依賴關(guān)系切分形成不同的階段(Stage),一個(gè)階段包含一系列函數(shù)進(jìn)行流水線執(zhí)行。

圖中的A、B、C、D、E、F,分別代表不同的RDD,RDD內(nèi)的一個(gè)方框代表一個(gè)數(shù)據(jù)塊。

數(shù)據(jù)從HDFS輸入Spark,形成RDDA和RDDC,RDDC上執(zhí)行map操作,轉(zhuǎn)換為RDD

D,RDDB和RDDE進(jìn)行join操作轉(zhuǎn)換為F,而在B到F的過(guò)程中又會(huì)進(jìn)行Shuffle。最

后RDDF通過(guò)函數(shù)saveAsSequenceFile輸出保存到HDFS中。

TranstbnnationsActions

A

SS

L

aLLC.

J

HH

tcxtF

4

icnccFilc

圖1-4Spark執(zhí)行有I可無(wú)環(huán)圖

1.4彈性分布式數(shù)據(jù)集

本節(jié)將介紹彈性分布式數(shù)據(jù)集RDDoSpark是一個(gè)分布式計(jì)算框架,而RDD是其對(duì)分布

式內(nèi)存數(shù)據(jù)的抽象,可以認(rèn)為RDD就是Spark分布式算法的數(shù)據(jù)結(jié)構(gòu),而RDD之上的

操作是Spark分布式算法的核心原語(yǔ),由數(shù)據(jù)結(jié)構(gòu)和原語(yǔ)設(shè)計(jì)上層算法。Spark最終會(huì)將

算法(RDD上的一連串操作)翻譯為DAG形式的工作流進(jìn)行調(diào)度,并進(jìn)行分布式任務(wù)的

分發(fā)。

1.4.1RDD簡(jiǎn)介

在集群背后,有一個(gè)非常重要的分布式數(shù)據(jù)架構(gòu),即彈性分布式數(shù)據(jù)集(Resilient

DistributedDataset,RDD)。它在集群中的多臺(tái)機(jī)器上進(jìn)行了數(shù)據(jù)分區(qū),邏輯上可以認(rèn)

為是一個(gè)分布式的數(shù)組,而數(shù)組中每個(gè)記錄可以是用戶自定義的任意數(shù)據(jù)結(jié)構(gòu)。RDD是

Spark的核心數(shù)據(jù)結(jié)構(gòu),通過(guò)RDD的依賴關(guān)系形成Spark的調(diào)度順序,通過(guò)對(duì)RDD的操

作形成整個(gè)Spark程序。

(1)RDD創(chuàng)建方式

1)從Hadoop文件系統(tǒng)(或與Hadoop兼容的其他持久化存儲(chǔ)系統(tǒng),如Hive、Cassandra、

HBase)輸入(例如HDFS)創(chuàng)建。

2)從父RDD轉(zhuǎn)換得到新RDDo

3)通過(guò)parallelize或makeRDD將單機(jī)數(shù)據(jù)創(chuàng)建為分布式RDD。

(2)RDD的兩種操作算子

對(duì)于RDD可以有兩種操作算子:轉(zhuǎn)換(Transformation)與行動(dòng)(Action)。

1)轉(zhuǎn)換(Transformation):Transformation操作是延遲計(jì)算的,也就是說(shuō)從一個(gè)RDD

轉(zhuǎn)換生成另一個(gè)RDD的轉(zhuǎn)換操作不是馬上執(zhí)行,需要等到有Action操作的時(shí)候才會(huì)真正

觸發(fā)運(yùn)算。

2)行動(dòng)(Action):Action算子會(huì)觸發(fā)Spark提交作業(yè)(Job),并將數(shù)據(jù)輸出Spark系

統(tǒng)。

(3)RDD的重要內(nèi)部屬性

通過(guò)RDD的內(nèi)部屬性,用戶可以獲取相應(yīng)的元數(shù)據(jù)信息。通過(guò)這些信息可以支持更復(fù)雜

的算法或優(yōu)化。

1)分區(qū)列表:通過(guò)分區(qū)列表可以找到一個(gè)RDD中包含的所有分區(qū)及其所在地址。

2)計(jì)算每個(gè)分片的函數(shù):通過(guò)函數(shù)可以對(duì)每個(gè)數(shù)據(jù)塊進(jìn)行RDD需要進(jìn)行的用戶自定義

函數(shù)運(yùn)算。

3)對(duì)父RDD的依賴列表:為了能夠回溯到父RDD,為容錯(cuò)等提供支持。

4)對(duì)key-valuepair數(shù)據(jù)類型RDD的分區(qū)器,控制分區(qū)策略和分區(qū)數(shù)。通過(guò)分區(qū)函數(shù)可

以確定數(shù)據(jù)記錄在各個(gè)分區(qū)和節(jié)點(diǎn)上的分配,減少分布不平衡。

5)每個(gè)數(shù)據(jù)分區(qū)的地址列表(如HDFS上的數(shù)據(jù)塊的地址)。

如果數(shù)據(jù)有副本,則通過(guò)地址列表可以獲知單個(gè)數(shù)據(jù)塊的所有副本地址,為負(fù)載均衡和容

錯(cuò)提供支持。

(4)Spark計(jì)算工作流

圖1-5中描述了Spark的輸入、運(yùn)行轉(zhuǎn)換、輸出。在運(yùn)行轉(zhuǎn)換中通過(guò)算子對(duì)RDD進(jìn)行轉(zhuǎn)

換。算子是RDD中定義的函數(shù),可以對(duì)RDD中的數(shù)據(jù)進(jìn)行轉(zhuǎn)換和操作。

?輸入:在Spark程序運(yùn)行中,數(shù)據(jù)從外部數(shù)據(jù)空間(例如,HDFS、Scala集合或數(shù)據(jù))

輸入到Spark,數(shù)據(jù)就進(jìn)入了Spark運(yùn)行時(shí)數(shù)據(jù)空間,會(huì)轉(zhuǎn)化為Spark中的數(shù)據(jù)塊,通過(guò)

BlockManager進(jìn)行管理。

?運(yùn)行:在Spark數(shù)據(jù)輸入形成RDD后,便可以通過(guò)變換算子fliter等,對(duì)數(shù)據(jù)操作并將

RDD轉(zhuǎn)化為新的RDD,通過(guò)行動(dòng)(Action)算子,觸發(fā)Spark提交作業(yè)。如果數(shù)據(jù)需要

復(fù)用,可以通過(guò)Cache算子,將數(shù)據(jù)緩存到內(nèi)存。

?輸出:程序運(yùn)行結(jié)束數(shù)據(jù)會(huì)輸出Spark運(yùn)行時(shí)空間,存儲(chǔ)到分布式存儲(chǔ)中(如

saveAsTextFile輸出到HDFS)或Scala數(shù)據(jù)或集合中(collect輸出到Scala集合,count

返回ScalaInt型數(shù)據(jù))。

外部數(shù)據(jù)空間

圖1-5Spark算子和數(shù)據(jù)空間

Spark的核心數(shù)據(jù)模型是RDD,但RDD是個(gè)抽象類,具體由各子類實(shí)現(xiàn),如

MappedRDD、ShuffledRDD等子類。Spark將常用的大數(shù)據(jù)操作都轉(zhuǎn)化成為RDD的子類。

1.4.2RDD算子分類

本節(jié)將主要介紹Spark算子的作用,以及算子的分類。

Spark算子大致可以分為以下兩類。

1)Transformation變換算子:這種變換并不觸發(fā)提交作業(yè),完成作業(yè)中間過(guò)程處理。

2)Action行動(dòng)算子:這類算子會(huì)觸發(fā)SparkContext提交Job作業(yè)。

下面分別對(duì)兩類算子進(jìn)行詳細(xì)介紹。

l.Transformations算子

下文將介紹常用和較為重要的Transformation算子。

(1)map

將原來(lái)RDD的每個(gè)數(shù)據(jù)項(xiàng)通過(guò)map中的用戶自定義函數(shù)f映射轉(zhuǎn)變?yōu)橐粋€(gè)新的元素。源

碼中map算子相當(dāng)于初始化一個(gè)RDD,新RDD叫做MappedRDD(this,sc.clean

(f))o

圖1-7中每個(gè)方框表示一個(gè)RDD分區(qū),左側(cè)的分區(qū)經(jīng)過(guò)用戶自定義函數(shù)f:T->U映射為

右側(cè)的新RDD分區(qū)。但是,實(shí)際只有等到Action算子觸發(fā)后這個(gè)f函數(shù)才會(huì)和其他函數(shù)

在一個(gè)stage中對(duì)數(shù)據(jù)進(jìn)行運(yùn)算。在圖1-6中的第一個(gè)分區(qū),數(shù)據(jù)記錄VI輸入f,通過(guò)f

轉(zhuǎn)換輸出為轉(zhuǎn)換后的分區(qū)中的數(shù)據(jù)記錄VI。

(2)flatMap

將原來(lái)RDD中的每個(gè)元素通過(guò)函數(shù)f轉(zhuǎn)換為新的元素,并將生成的RDD的每個(gè)集合中的

元素合并為一個(gè)集合,內(nèi)部創(chuàng)建FlatMappedRDD(this,sc.clean(f))。

in=4iri

IZMZI

圖1-6map算子對(duì)RDD轉(zhuǎn)換

圖1-7表示RDD的一個(gè)分區(qū)進(jìn)行flatMap函數(shù)操作,flatMap中傳入的函數(shù)為f:T->U,

T和U可以是任意的數(shù)據(jù)類型。將分區(qū)中的數(shù)據(jù)通過(guò)用戶自定義函數(shù)f轉(zhuǎn)換為新的數(shù)據(jù)。

外部大方框可以認(rèn)為是一個(gè)RDD分區(qū),小方框代表一個(gè)集合。VI、V2、V3在一個(gè)集合

作為RDD的一個(gè)數(shù)據(jù)項(xiàng),可能存儲(chǔ)為數(shù)組或其他容器,轉(zhuǎn)換為V1、V'2、V3后,將原

來(lái)的數(shù)組或容器結(jié)合拆散,拆散的數(shù)據(jù)形成為RDD中的數(shù)據(jù)項(xiàng)。

圖1-7flapMap算子對(duì)RDD轉(zhuǎn)換

(3)mapPartitions

mapPartitions函數(shù)獲取到每個(gè)分區(qū)的迭代器,在函數(shù)中通過(guò)這個(gè)分區(qū)整體的迭代器對(duì)整

個(gè)分區(qū)的元素進(jìn)行操作。內(nèi)部實(shí)現(xiàn)是生成MapPartitionsRDD。圖1-8中的方框代表一個(gè)

RDD分區(qū)。

圖1-8中,用戶通過(guò)函數(shù)f(iter)=>iter.filter(_>=3)對(duì)分區(qū)中所有數(shù)據(jù)進(jìn)行過(guò)濾,大

于和等于3的數(shù)據(jù)保留。一個(gè)方塊代表一個(gè)RDD分區(qū),含有1、2、3的分區(qū)過(guò)濾只剩下

元素3o

n-P^r~i

EMU

圖1-8mapPartitions算子對(duì)RDD轉(zhuǎn)換

(4)union

使用union函數(shù)時(shí)需要保證兩個(gè)RDD元素的數(shù)據(jù)類型相同,返回的RDD數(shù)據(jù)類型和被

合并的RDD元素?cái)?shù)據(jù)類型相同。并不進(jìn)行去重操作,保存所有元素,如果想去重可以使

用distinct()<>同時(shí)Spark還提供更為簡(jiǎn)潔的使用union的API,通過(guò)++符號(hào)相當(dāng)于

union函數(shù)操作。

圖1-9中左側(cè)大方框代表兩個(gè)RDD,大方框內(nèi)的小方框代表RDD的分區(qū)。右側(cè)大方框代

表合并后的RDD,大方框內(nèi)的小方框代表分區(qū)。合并后,VI、V2、V3V8形成一個(gè)分

區(qū),其他元素同理進(jìn)行合并。

(5)cartesian

對(duì)兩個(gè)RDD內(nèi)的所有元素進(jìn)行笛卡爾積操作。操作后,內(nèi)部實(shí)現(xiàn)返回CartesianRDD。圖

1-10中左側(cè)大方框代表兩個(gè)RDD,大方框內(nèi)的小方框代表RDD的分區(qū)。右側(cè)大方框代表

合并后的RDD,大方框內(nèi)的小方框代表分區(qū)。

例如:VI和另一個(gè)RDD中的Wl、W2、Q5進(jìn)行笛卡爾積運(yùn)算形成(VI,Wl)、(VI,

W2)、(VI,Q5)。

圖1-9union算子對(duì)RDD轉(zhuǎn)換

圖1-10cartesian算子對(duì)RDD轉(zhuǎn)換

(6)groupBy

groupBy:將元素通過(guò)函數(shù)生成相應(yīng)的Key,數(shù)據(jù)就轉(zhuǎn)化為Key-Value格式,之后將Key

相同的元素分為一組。

函數(shù)實(shí)現(xiàn)如下:

1)將用戶函數(shù)預(yù)處理:

valcleanF=sc.clean(f)

2)對(duì)數(shù)據(jù)map進(jìn)行函數(shù)操作,最后再進(jìn)行g(shù)roupByKey分組操作。

this.mapft=>(cleanF(t),tJ).groupByKey(p)

其中,P確定了分區(qū)個(gè)數(shù)和分區(qū)函數(shù),也就決定了并行化的程度。

圖1-11中方框代表一個(gè)RDD分區(qū),相同key的元素合并到一個(gè)組。例如VI和V2合并

為V,Value為VI,V2。形成V,Seq(VI,V2)。

圖1-11groupBy算子對(duì)RDD轉(zhuǎn)換

(7)filter

filter函數(shù)功能是對(duì)元素進(jìn)行過(guò)濾,對(duì)每個(gè)元素應(yīng)用f函數(shù),返回值為true的元素在RDD

中保留,返回值為false的元素將被過(guò)濾掉。內(nèi)部實(shí)現(xiàn)相當(dāng)于生成FilteredRDD(this,

sc.clean(f))。

下面代碼為函數(shù)的本質(zhì)實(shí)現(xiàn):

deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))

圖1-12中每個(gè)方框代表一個(gè)RDD分區(qū),T可以是任意的類型。通過(guò)用戶自定義的過(guò)濾函

數(shù)f,對(duì)每個(gè)數(shù)據(jù)項(xiàng)操作,將滿足條件、返回結(jié)果為true的數(shù)據(jù)項(xiàng)保留。例如,過(guò)濾掉

V2和V3保留了VI,為區(qū)分命名為VI。

(8)sample

sample將RDD這個(gè)集合內(nèi)的元素進(jìn)行采樣,獲取所有元素的子集。用戶可以設(shè)定是否有

放回的抽樣、百分比、隨機(jī)種子,進(jìn)而決定采樣方式。

內(nèi)部實(shí)現(xiàn)是生成SampledRDD(withReplacement,fraction,seed)。

函數(shù)參數(shù)設(shè)置:

?withReplacement=true,表示有放回的抽樣。

?withReplacement=false,表示無(wú)放回的抽樣。

圖1-13中的每個(gè)方框是一個(gè)RDD分區(qū)。通過(guò)sample函數(shù),采樣50%的數(shù)據(jù)。VI、V2、

UI、U2U4采樣出數(shù)據(jù)VI和UI、U2形成新的RDD。

f:T->Boolean

VI

V2

V3

U1

U2

U3

U4

圖1-12filter算子對(duì)RDD轉(zhuǎn)換

fraction=O.5,sccd==9

UI

U2

U3

U4

圖1-13sample算子對(duì)RDD轉(zhuǎn)換

(9)cache

cache將RDD元素從磁盤緩存到內(nèi)存。相當(dāng)于persist(MEMORY_ONLY)函數(shù)的功能。

EDISKMUMEMORY

nn^nn

圖1-14Cache算子對(duì)RDD轉(zhuǎn)換

圖1-14中每個(gè)方框代表一個(gè)RDD分區(qū),左側(cè)相當(dāng)于數(shù)據(jù)分區(qū)都存儲(chǔ)在磁盤,通過(guò)cache

算子將數(shù)據(jù)緩存在內(nèi)存。

(10)persist

persist函數(shù)對(duì)RDD進(jìn)行緩存操作。數(shù)據(jù)緩存在哪里依據(jù)StorageLevel這個(gè)枚舉類型進(jìn)行

確定。有以下幾種類型的組合(見圖1?14),DISK代表磁盤,MEMORY代表內(nèi)存,SER

代表數(shù)據(jù)是否進(jìn)行序列化存儲(chǔ)。

下面為函數(shù)定義,StorageLevel是枚舉類型,代表存儲(chǔ)模式,用戶可以通過(guò)圖1?14按需

進(jìn)行選擇。

persist(newLevel:StorageLevel]

圖1-15中列出persist函數(shù)可以進(jìn)行緩存的模式。例如,MEMORY_AND_DISK_SER代表

數(shù)據(jù)可以存儲(chǔ)在內(nèi)存和磁盤,并且以序列化的方式存儲(chǔ),其他同理。

valDISK_OMLY:StorageLevel

valDISK__ONLY_2:StorageLevel

vallEMORY__Ain)_DISK:StorageLevel

vallE10RY_A!ro_DISK_2:StorageLevel

val■E1ORY_A!TO_DISK_SER:StorageLevel

valMEMORY_AND_DISK_SER_2:StoraaeL已uel

val!E10RY_0!iLY:StorazeLevel

val1EMORY_OBLY_2:StorageLevel

val1E1ORY_ONLY_SER:StorageLevel

val1EMORY_ONLY_SER_2:StorageLevel

valNONE:StorageLevel

valOFF_HEAP:StorageLevel

圖1-15persist算子對(duì)RDD轉(zhuǎn)換

圖1-16中方框代表RDD分區(qū)。disk代表存儲(chǔ)在磁盤,mem代表存儲(chǔ)在內(nèi)存。數(shù)據(jù)最初

全部存儲(chǔ)在磁盤,通過(guò)persist(MEMORY_AND_DISK)將數(shù)據(jù)緩存到內(nèi)存,但是有的分

區(qū)無(wú)法容納在內(nèi)存,將含有VI、V2、V3的分區(qū)存儲(chǔ)到磁盤。

(11)mapValues

mapValues:針對(duì)(Key,Value)型數(shù)據(jù)中的Value進(jìn)行Map操作,而不對(duì)Key進(jìn)行處

理。

圖1-17中的方框代表RDD分區(qū)。a=>a+2代表對(duì)(VI,1)這樣的KeyValue數(shù)據(jù)對(duì),

數(shù)據(jù)只對(duì)Value中的1進(jìn)行加2操作,返回結(jié)果為3。

Pcn;ist(MEMORY_AND_D!SK.)

V2(disk)[V2(disk)

V3(disk)JV3(disk)|

UI(disk)|UI(mein)

U2(disk)I</z|U2(mem)

圖1-16Persist算子對(duì)RDD轉(zhuǎn)換

mapValues(a=>a+2)

圖1-17mapValues算子RDD對(duì)轉(zhuǎn)換

(12)combineByKey

下面代碼為combineByKey函數(shù)的定義:

combineByKey[C](createCombiner:(V]C,mergeValue:(C,VJC,mergeCombiners:(C,C)C,

partitioneriPartitioner,mapSideCombine:Boolean=true,

serializer:Serializer=null):RDD[(K,C)]

說(shuō)明:

?createCombiner:V=>C,C不存在的情況下,比如通過(guò)V創(chuàng)建seqCo

?mergeValue:(C,V)=>C,當(dāng)C已經(jīng)存在的情況下,需要merge,比如把itemV加到

seqC中,或者疊加。

?mergeCombiners:(C,C)=>C,合并兩個(gè)C。

?partitioner:Partitioner,Shuffle時(shí)需要的Partitionero

?mapSideCombine:Boolean=true,為了減小傳輸量,很多combine可以在map端先做,

比如疊加,可以先在一個(gè)partition中把所有相同的key的value疊加,再shuffle<>

?serializerClass:String=null,傳輸需要序列化,用戶可以自定義序列化類:

例如,相當(dāng)于將元素為(Int,Int)的RDD轉(zhuǎn)變?yōu)榱?Int,Seq[Int])類型元素的RDD。

圖1?18中的方框代表RDD分區(qū)。如圖,通過(guò)combineByKey,將(VI,2),(VI,1)

數(shù)據(jù)合并為(VI,Seq(2,1))。

(13)reduceByKey

reduceByKey是比combineByKey更簡(jiǎn)單的一種情況,只是兩個(gè)值合并成一個(gè)值,(Int,

IntV)to(Int,IntC),比如疊加。所以createCombinerreduceBykey彳艮簡(jiǎn)單,就是直

接返回v,而mergeValue和mergeCombiners邏輯是相同的,沒有區(qū)別。

combineByKey

U「以有多種實(shí)現(xiàn).此處是groupByKey的實(shí)現(xiàn)

Vl,Scpf2,l)

V2,Scp(2)

V3,Scp(l)

圖1-18comBineByKey算子對(duì)RDD轉(zhuǎn)換

函數(shù)實(shí)現(xiàn):

defreduceByKeyfpartitioner:Partitioner,func:(V,V)=>V):RDD[(K,V)]=

{combineByKey[V]([v:V)=>v,func,func,partitioner)}

圖1-19中的方框代表RDD分區(qū)。通過(guò)用戶自定義函數(shù)(A,B)=>(A+B)函數(shù),將相

同key的數(shù)據(jù)(VI,2)和(VI,1)的value相加運(yùn)算,結(jié)果為(VI,3)。

reduceByKc((A,B)=>(A+B))

圖1-19reduceByKey算子對(duì)RDD轉(zhuǎn)換

(14)join

join對(duì)兩個(gè)需要連接的RDD進(jìn)行cogroup函數(shù)操作,將相同key的數(shù)據(jù)能夠放到一個(gè)分

區(qū),在cogroup操作之后形成的新RDD對(duì)每個(gè)key下的元素進(jìn)行笛卡爾積的操作,返回

的結(jié)果再展平,對(duì)應(yīng)key下的所有元組形成一個(gè)集合。最后返回RDD[(K,(V,

W))]o

下面代碼為join的函數(shù)實(shí)現(xiàn),本質(zhì)是通過(guò)cogroup算子先進(jìn)行協(xié)同劃分,再通過(guò)

flatMapValues將合并的數(shù)據(jù)打散。

this.cogroup(other,partitioner).f?latMapValues{case(vs,ws)=>for(v<-vs;w>-ws)yield(v,w)}

圖1-20是對(duì)兩個(gè)RDD的join操作示意圖。大方框代表RDD,小方框代表RDD中的分區(qū)。

函數(shù)對(duì)相同key的元素,如VI為key做連接后結(jié)果為(VI,(1,1))和(VI,(1,

2))o

2.Actions算子

本質(zhì)上在Action算子中通過(guò)SparkContext進(jìn)行了提交作業(yè)的runjob操作,觸發(fā)了RDD

DAG的執(zhí)行。

圖1-20join算子對(duì)RDD轉(zhuǎn)換

例如,Action算子collect函數(shù)的代碼如下,感興趣的讀者可以順著這個(gè)入口進(jìn)行源碼剖

析:

/***ReturnanarraythatcontainsalloftheelementsinthisRDD.*/defcollect():Array[T]

={/*提交Job*/valresults=sc.runjob(this,(iter:Iterator[T]]=>iter.toArray)

Array.concat(results:_*)}

下面將介紹常用和較為重要的Action算子。

(1)foreach

foreach對(duì)RDD中的每個(gè)元素都應(yīng)用f函數(shù)操作,不返回RDD和Array,而是返回Uinto

圖1-21表示foreach算子通過(guò)用戶自定義函數(shù)對(duì)每個(gè)數(shù)據(jù)項(xiàng)進(jìn)行操作。本例中自定義函

數(shù)為printin(),控制臺(tái)打印所有數(shù)據(jù)項(xiàng)。

Foreach(_>printin(_))

、U'l

7U,2

圖1-21foreach算子對(duì)RDD轉(zhuǎn)換

(2)saveAsTextFile

函數(shù)將數(shù)據(jù)輸出,存儲(chǔ)到HDFS的指定目錄。

下面為saveAsTextFile函數(shù)的內(nèi)部實(shí)現(xiàn),其內(nèi)部通過(guò)調(diào)用saveAsHadoopFile進(jìn)行實(shí)現(xiàn):

this.mapfx=>(NullWritable.getQ,new

Text(x.toString))),saveAsHadoopFile[TextOutputFormat[NullWritable,Text]](path)

將RDD中的每個(gè)元素映射轉(zhuǎn)變?yōu)?null,x.toString),然后再將其寫入HDFS。

圖1-22中左側(cè)方框代表RDD分區(qū),右側(cè)方框代表HDFS的Block。通過(guò)函數(shù)將RDD的

每個(gè)分區(qū)存儲(chǔ)為HDFS中的一個(gè)Blocko

(3)collect

collect相當(dāng)于toArray,toArray已經(jīng)過(guò)時(shí)不推薦使用,collect將分布式的RDD返回為一

個(gè)單機(jī)的scalaArray數(shù)組。在這個(gè)數(shù)組上運(yùn)用scala的函數(shù)式操作。

圖1-23中左側(cè)方框代表RDD分區(qū),右側(cè)方框代表單機(jī)內(nèi)存中的數(shù)組。通過(guò)函數(shù)操作,

將結(jié)果返回到Driver程序所在的節(jié)點(diǎn),以數(shù)組形式存儲(chǔ)。

RDDHDFS

m=^in20000

|2|__粕|Part-00001

圖1-22saveAsHadoopFile算子對(duì)RDD轉(zhuǎn)換

collect()

圖1-23Collect算子對(duì)RDD轉(zhuǎn)換

(4)count

count返回整個(gè)RDD的元素個(gè)數(shù)。

內(nèi)部函數(shù)實(shí)現(xiàn)為:

defcountO:Long=sc.runJob(this,Utils.getIteratorSize_).sum

圖1-24中,返回?cái)?shù)據(jù)的個(gè)數(shù)為5。一個(gè)方塊代表一個(gè)RDD分區(qū)。

圖1-24count對(duì)RDD算子轉(zhuǎn)換

1.5本章小結(jié)

本章首先介紹了Spark分布式計(jì)算平臺(tái)的基本概念、原理以及Spark生態(tài)系統(tǒng)BDAS之

上的典型組件。Spark為用戶提供了系統(tǒng)底層細(xì)節(jié)透明、編程接口簡(jiǎn)潔的分布式計(jì)算平臺(tái)。

Spark具有內(nèi)存計(jì)算、實(shí)時(shí)性高、容錯(cuò)性好等突出特點(diǎn)。同時(shí)本章介紹了Spark的計(jì)算模

型,Spark會(huì)將應(yīng)用程序整體翻譯為一個(gè)有向無(wú)環(huán)圖進(jìn)行調(diào)度和執(zhí)行。相比MapReduce,

Spark提供了更加優(yōu)化和復(fù)雜的執(zhí)行流。讀者還可以深入了解Spark的運(yùn)行機(jī)制與Spark

算子,這樣能更加直觀地了解API的使用。Spark提供了更加豐富的函數(shù)式算子,這樣就

為Spark上層組件的開發(fā)奠定了堅(jiān)實(shí)的基礎(chǔ)。

相信讀者已經(jīng)想了解如何開發(fā)Spark程序,接下來(lái)將就Spark的開發(fā)環(huán)境配置進(jìn)行闡述。

第2章Spark開發(fā)與環(huán)境配:閆

用戶進(jìn)行Spark應(yīng)用程序開發(fā),一般在用戶本地進(jìn)行單機(jī)開發(fā)調(diào)試,之后再將作業(yè)提交

到集群生產(chǎn)環(huán)境中運(yùn)行。下面將介紹Spark開發(fā)環(huán)境的配置,如何編譯和進(jìn)行源碼閱讀

環(huán)境的配置。

用戶可以在官網(wǎng)上下載最新的AS軟件包,網(wǎng)址為:/o

2.1Spark應(yīng)用開發(fā)環(huán)境配置

Spark的開發(fā)可以通過(guò)Intel?或者EclipseIDE進(jìn)行,在環(huán)境配置的開始階段,還需要安

裝相應(yīng)的Scala插件。

2.1.1使用Intellij開發(fā)Spark程序

本節(jié)介紹如何使用IntellijIDEA構(gòu)建Spark開發(fā)環(huán)境和源碼閱讀環(huán)境。由于Intellij對(duì)

Scala的支持更好,目前Spark開發(fā)團(tuán)隊(duì)主要使用Intellij作為開發(fā)環(huán)境。

1.配置開發(fā)環(huán)境

(1)安裝JDK

用戶可以自行安裝JDK8。官網(wǎng)地址:

/technetwork/java/javase/downloads/index.htmlo

下載后,如果在Windows下直接運(yùn)行安裝程序,會(huì)自動(dòng)配置環(huán)境變量,安裝成功后,在

CMD的命令行下輸入Java,有Java版本的日志信息提示則證明安裝成功。

如果在Linux下安裝,下載JDK包解壓縮后,還需要配置環(huán)境變量。

在/etc/profile文件中,配置環(huán)境變量:

exportJAVA_HOME=/usr/java/jdkl.8exportJAVA_BlN=/usr/java/jdkl.8/binexport

PATH=$PATH:$JAVA_HOME/binexport

CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$]AVA_HOME/lib/tools.jarexportJAVA_HOME

JAVA_BINPATHCLASSPATH

(2)安裝Scala

Spark內(nèi)核采用Scala進(jìn)行開發(fā),上層通過(guò)封裝接口提供Java和Python的API,在進(jìn)行

開發(fā)前需要配置好Scala的開發(fā)包。

Spark對(duì)Scala的版本有約束,用戶可以在Spark的官方下載界面看到相應(yīng)的Scala版本

號(hào)。下載指定的Scala包,官網(wǎng)地址:http://www.scala-lang.Org/download/o

(3)安裝IntellijIDEA

用戶可以下載安裝最新版本的Intellij,官網(wǎng)地址:

http://www.jetbrains.eom/idea/download/o

目前Intellij最新的版本中已經(jīng)可以支持新建SBT工程,安裝Scala插件,可以很好地支

持Scala開發(fā)。

(4)Intellij中安裝Scala插件

在Intellij菜單中選擇"Configure",在下拉菜單中選擇"Plugins",再選擇"Browse

repositories",輸入"Scala”搜索插件(如圖2-1所示),在彈出的對(duì)話框中單擊"install"按

鈕,重啟Intellijo

2.配置Spark應(yīng)用開發(fā)環(huán)境

1)用戶在IntellijIDEA中創(chuàng)建ScalaProject,SparkTesto

2)選擇菜單中的"File"-?"projectstructure"->"Libraries”命令,單擊導(dǎo)入"spark-

,,

assembly_2.10-1.0.0-incubating-hadoop2.2.0.jaro

只需導(dǎo)入該jar包,該包可以通過(guò)在Spark的源碼工程下執(zhí)行"sbt/sbtassembly"命令生成,

這個(gè)命令相當(dāng)于將Spark的所有依賴包和Spark源碼打包為一個(gè)整體。

在"assembly/target/scala-2.10.4/"目錄下生成:spark-assembly-1.0.O-incubating-

hadoop2.2.0.jaro

3)如果IDE無(wú)法識(shí)別Scala庫(kù),則需要以同樣方式將Scala庫(kù)的jar包導(dǎo)入。之后就可以

開始開發(fā)Spark程序。如圖2-2所示,本例將Spark默認(rèn)的示例程序SparkPi復(fù)制到文件。

圖2-1輸入"Scala"搜索插件

圖2-2編寫程序

3.運(yùn)行Spark程序

(1)本地運(yùn)行

編寫完scala程序后,可以直接在Intellij中,以本地Local模式運(yùn)行(如圖2-3所示),

方法如下。

圖2-3以local模式運(yùn)行

在Intellij中的選擇"Run"T"DebugConfiguration"->''EditConfigurations”命令。在

"Programarguments"文本框中輸入main函數(shù)的輸入?yún)?shù)local。然后右鍵選擇需要運(yùn)行

的類,單擊“Run”按鈕運(yùn)行。

(2)集群上運(yùn)行Spark應(yīng)用jar包

如果想把程序打成jar包,通過(guò)命令行的形式運(yùn)行在Spark集群中,并按照以下步驟操作。

1)選擇"File"—"ProjectStructure”,在彈出的對(duì)話框中選擇"Artifact"->"Jar"->"From

Moduleswithdependencies”命令。

2)在選擇"FromModuleswithdependencies"之后彈出的對(duì)話框中,選擇Main函數(shù),同

時(shí)選擇輸出jar位置,最后單擊“0K”按鈕。

具體如圖2-4?圖2-6所示。

在圖2-5中選擇需要執(zhí)行的Main函數(shù)。

在圖2-6界面選擇依賴的jar包。

圖2-4生成jar包第一步

圖2-5生成jar包第二步

圖2-6生成jar包第三步

在主菜單選擇"Build”1"BuildArtifact”命令,編譯生成jar包。

3)將生成的jar包SparkTest.jar在集群的主節(jié)點(diǎn),通過(guò)下面命令執(zhí)行:

java-jarSparkTestjar

用戶可以通過(guò)上面的流程和方式通過(guò)Intellij作為集成開發(fā)環(huán)境進(jìn)行Spark程序的開發(fā)。

2.1.2使用SparkShell進(jìn)行交互式數(shù)據(jù)分析

如果是運(yùn)行SparkShell,那么會(huì)默認(rèn)創(chuàng)建一個(gè)SparkContext,命名為sc,所以不需要在

SparkShell創(chuàng)建新的SparkContext,SparkContext是應(yīng)用程序的上下文,調(diào)度整個(gè)應(yīng)用

并維護(hù)元數(shù)據(jù)信息。在運(yùn)行SparkShell之前,可以設(shè)定參數(shù)MASTER,將Spark應(yīng)用提

交到MASTER指向的相應(yīng)集群或者本地模式執(zhí)行,集群方式運(yùn)行的作業(yè)將會(huì)分布式地運(yùn)

行,本地模式執(zhí)行的作業(yè)將會(huì)通過(guò)單機(jī)多線程方式運(yùn)行??梢酝ㄟ^(guò)參數(shù)ADDJARS把

JARS添加到classpath,用戶可以通過(guò)這種方式添加所需的第三方依賴庫(kù)。

如果想spakr-sheli在本地4核的CPU運(yùn)行,需要如下方式啟動(dòng):

$MASTER=local[4]./spark-shell

這里的4是指啟動(dòng)4個(gè)工作線程。

如果要添加JARS,代碼如下:

$MASTER=local[4]ADD_JARS=code.jar./spark-shell

在spark-shell中,輸入下面代碼,讀取dir文件:

scala>valtext=sc.textFile("dir")

輸出文件中有多少數(shù)據(jù)項(xiàng),則可用:

scala>text.count

按鍵,即可運(yùn)行程序。

通過(guò)以上介紹,用戶可以了解如何使用SparkShell進(jìn)行交互式數(shù)據(jù)分析。

對(duì)于邏輯較為復(fù)

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論