《Spark編程基礎(chǔ)及項目實踐》課件05Spark-SQL實踐_第1頁
《Spark編程基礎(chǔ)及項目實踐》課件05Spark-SQL實踐_第2頁
《Spark編程基礎(chǔ)及項目實踐》課件05Spark-SQL實踐_第3頁
《Spark編程基礎(chǔ)及項目實踐》課件05Spark-SQL實踐_第4頁
《Spark編程基礎(chǔ)及項目實踐》課件05Spark-SQL實踐_第5頁
已閱讀5頁,還剩48頁未讀 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認(rèn)領(lǐng)

文檔簡介

項目5SparkSQL實踐學(xué)習(xí)目標(biāo)(1)(2)(5)了解SparkSQL的基本概念掌握SparkSQL與Shell交互的方法掌握DataFrame的查詢及輸出操作學(xué)習(xí)目標(biāo)(3)掌握創(chuàng)建DataFrame對象的方法(4)掌握DataFrame查看數(shù)據(jù)的方法任務(wù)1初識SparkSQL5.1.1SparkSQL的前世ApacheHive是一個構(gòu)建在Hadoop上的數(shù)據(jù)倉庫框架,它提供數(shù)據(jù)的概要信息、查詢和分析功能。Hive提供了一個類SQL的語言——HiveQL,它將對關(guān)系數(shù)據(jù)庫的模式操作轉(zhuǎn)換為Hadoop的map/reduce、ApacheTez和Spark執(zhí)行引擎所支持的操作。當(dāng)用戶向Hive輸入一段命令或查詢時,Hive需要與Hadoop交互工作來完成該操作。該命令或查詢首先進入驅(qū)動模塊,由驅(qū)動模塊中的編譯器進行解析編譯,并由優(yōu)化器對該操作進行優(yōu)化計算,然后交給執(zhí)行器去執(zhí)行,執(zhí)行器通常的任務(wù)是啟動一個或多個MapReduce任務(wù)。圖5-1描述了用戶提交一段SQL查詢后,Hive把SQL語句轉(zhuǎn)化成MapReduce任務(wù)執(zhí)行的詳細(xì)過程。前世?5.1.1SparkSQL的前世圖5-1Hive中SQL查詢轉(zhuǎn)化成MapReduce任務(wù)的過程5.1.1SparkSQL的前世Shark即HiveonSpark,為了與Hive兼容(見圖5-2),Shark在HiveQL方面重用了Hive中HiveQL的解析、邏輯執(zhí)行計劃翻譯、執(zhí)行計劃優(yōu)化等邏輯,可以近似認(rèn)為僅將物理執(zhí)行計劃從MapReduce作業(yè)替換成了Spark作業(yè),通過Hive的HiveQL解析,把HiveQL翻譯成Spark上的RDD操作。圖5-2Shark直接繼承了Hive的各個組件5.1.1SparkSQL的前世圖5-3Shark與Hive的性能比較Shark的最大特性就是運行速度快和與Hive完全兼容,且可以在Shell模式下使用rdd2sql()這樣的API,把HQL得到的結(jié)果集繼續(xù)在scala環(huán)境下運算,支持自己編寫簡單的機器學(xué)習(xí)或分析處理函數(shù),對HQL結(jié)果進行進一步分析計算。Shark的出現(xiàn)使得SQL-on-Hadoop的性能比Hive有了10~100倍的提高,如圖5-3所示。5.1.1SparkSQL的前世Shark的設(shè)計導(dǎo)致了兩個問題:(1)執(zhí)行計劃優(yōu)化完全依賴于Hive,不方便添加新的優(yōu)化策略;(2)因為Spark是線程級并行,而MapReduce是進程級并行,因此,Spark在兼容Hive的實現(xiàn)上存在線程安全問題,導(dǎo)致Shark不得不使用另外一套獨立維護的打了補丁的Hive源碼分支。5.1.2SparkSQL架構(gòu)SparkSQL架構(gòu)在Shark原有架構(gòu)上重寫了邏輯執(zhí)行計劃的優(yōu)化部分,解決了Shark存在的問題,如圖5-4所示。SparkSQL在Hive兼容層面僅依賴HiveQL解析和Hive元數(shù)據(jù),也就是說,從HQL被解析成抽象語法樹(AST)起,就全部由SparkSQL接管了。SparkSQL執(zhí)行計劃生成和優(yōu)化都由Catalyst(函數(shù)式關(guān)系查詢優(yōu)化框架)負(fù)責(zé)。5.1.2SparkSQL架構(gòu)圖5-4SparkSQL架構(gòu)5.1.2SparkSQL架構(gòu)SparkSQL增加了DataFrame(帶有Schema信息的RDD),使用戶可以在SparkSQL中執(zhí)行SQL語句,數(shù)據(jù)既可以來自RDD,也可以是Hive、HDFS、Cassandra等外部數(shù)據(jù)源,還可以是JSON格式的數(shù)據(jù)。SparkSQL目前支持Scala、Java、Python等編程語言,支持SQL-92規(guī)范,如圖5-5所示。圖5-5SparkSQL支持的數(shù)據(jù)格式和編程語言5.1.3SparkSQL的優(yōu)勢關(guān)系數(shù)據(jù)庫是建立在關(guān)系模型基礎(chǔ)上的數(shù)據(jù)庫,借助于集合代數(shù)等數(shù)學(xué)概念和方法來處理數(shù)據(jù)庫中的數(shù)據(jù),已經(jīng)流行多年。由于具有規(guī)范的行和列結(jié)構(gòu),存儲在關(guān)系數(shù)據(jù)庫中的數(shù)據(jù)通常被稱為結(jié)構(gòu)化數(shù)據(jù),用來查詢和操作關(guān)系數(shù)據(jù)庫的語言被稱為結(jié)構(gòu)化查詢語言(structuredquerylanguage,SQL)。目前主流的關(guān)系數(shù)據(jù)庫有Oracle、DB2、SQLServer、Sybase、MySQL等。5.1.3SparkSQL的優(yōu)勢關(guān)系SparkSQL的出現(xiàn)填補了這個空白。首先,SparkSQL可以提供DataFrameAPI,可以對內(nèi)部和外部各種數(shù)據(jù)源執(zhí)行各種關(guān)系操作;其次,SparkSQL可以支持大量的數(shù)據(jù)源和數(shù)據(jù)分析算法,組合使用SparkSQL和SparkMLlib,可以融合傳統(tǒng)關(guān)系數(shù)據(jù)庫的結(jié)構(gòu)化數(shù)據(jù)管理能力和機器學(xué)習(xí)算法的數(shù)據(jù)處理能力,有效滿足各種復(fù)雜的應(yīng)用需求。任務(wù)2DataFrame基礎(chǔ)操作5.2.1創(chuàng)建DataFrame對象1.從結(jié)構(gòu)化數(shù)據(jù)文件創(chuàng)建DataFramescala>valdfUsers=sqlContext.read.load("/data/users.parquet")dfUsers:org.apache.spark.sql.DataFrame=[name:string,favorite_color:string,favorite_numbers:array<int>]通常,把結(jié)構(gòu)化數(shù)據(jù)文件存儲在HDFS上,SparkSQL最常見的結(jié)構(gòu)化數(shù)據(jù)文件格式是Parquet文件或JSON文件。SparkSQL可以通過load()方法將HDFS上的格式化文件轉(zhuǎn)換為DataFrame,load默認(rèn)導(dǎo)入的文件格式是Parquet。將HDFS上的Parquet文件users.parquet轉(zhuǎn)換為DataFrame的命令如下。5.2.1創(chuàng)建DataFrame對象1.從結(jié)構(gòu)化數(shù)據(jù)文件創(chuàng)建DataFrame將JSON文件轉(zhuǎn)換為DataFrame,可以使用json()方法,命令如下。scala>valdfPeople=sqlContext.read.json("/data/people.json")dfPeople:org.apache.spark.sql.DataFrame=[age:bigint,name:string]將JSON文件轉(zhuǎn)換為DataFrame,還可以使用format()方法,命令如下。scala>valdfPeople=sqlContext.read.format("json").load("/data/people.json")dfPeople:org.apache.spark.sql.DataFrame=[age:bigint,name:string]5.2.1創(chuàng)建DataFrame對象2.從外部數(shù)據(jù)庫創(chuàng)建DataFrameSparkSQL還可以從外部數(shù)據(jù)庫中創(chuàng)建DataFrame,使用這種方式創(chuàng)建DataFrame需要通過JDBC或ODBC連接的方式訪問數(shù)據(jù)庫。例如,將MySQL數(shù)據(jù)庫test中的people表的數(shù)據(jù)轉(zhuǎn)換成DataFrame,代碼如下。scala>valurl="jdbc:mysql://32/test"url:String=jdbc:mysql://32/testscala>valjdbcDF=sqlContext.read.format("jdbc").options(|Map("url"->url,|"user"->"root",|"passwd"->"",|"dbtable"->"people")).load()jdbcDF:org.apache.spark.sql.DataFrame=[name:string,age:int]5.2.1創(chuàng)建DataFrame對象3.從RDD創(chuàng)建DataFrame將RDD數(shù)據(jù)轉(zhuǎn)化為DataFrame有兩種方式。第一種方式是利用反射機制推斷RDD模式,使用這種方式首先需要定義一個caseclass,只有caseclass才能被Spark隱式地轉(zhuǎn)換為DataFrame。使用SparkContext讀取HDFS上的people.txt文件,得到一個RDD數(shù)據(jù)集,將該RDD數(shù)據(jù)集轉(zhuǎn)換成DataFrame的命令如下。scala>caseclassPerson(name:String,age:Int)definedclassPersonscala>valdata=sc.textFile("/data/people.txt").map(_.split(","))data:org.apache.spark.rdd.RDD[Array[String]]=MapPartitionsRDD[2]atmapat<console>:24scala>valpeople=data.map(p=>Person(p(0),p(1).trim.toInt)).toDF()people:org.apache.spark.sql.DataFrame=[name:string,age:int]5.2.1創(chuàng)建DataFrame對象3.從RDD創(chuàng)建DataFrame通過編程指定Schema需要以下3步。從原來的RDD創(chuàng)建一個元組或列表的RDD。(1)用StructType創(chuàng)建一個和步驟(1)中創(chuàng)建的RDD元組或列表的結(jié)構(gòu)相匹配的Schema。通過sqlContext提供的createDataFrame方法將Schema應(yīng)用到RDD上。(2)(3)5.2.1創(chuàng)建DataFrame對象4.從Hive中的表創(chuàng)建DataFrame從Hive中的表創(chuàng)建DataFrame,可以先聲明一個HiveContext對象,然后使用該對象查詢Hive中的表并轉(zhuǎn)換成DataFrame,命令如下。scala>hiveContext.sql("usetest")res15:org.apache.spark.sql.DataFrame=[result:string]scala>valpeople=hiveContext.sql("select*frompeople")peopleDataFrame:org.apache.spark.sql.DataFrame=[name:string,age:string]5.2.2DataFrame查看數(shù)據(jù)SparkDataFrame派生于RDD類,因此類似于RDD。DataFrame只有在提交Action操作時才會進行計算。DataFrame提供了很多查看及獲取數(shù)據(jù)的操作函數(shù),常用的函數(shù)或方法見表5-1。表5-1SparkDataFrame常用操作函數(shù)或方法5.2.2DataFrame查看數(shù)據(jù)01printSchema:打印數(shù)據(jù)模式在創(chuàng)建完DataFrame之后,可以查看DataFrame中數(shù)據(jù)的模式。查看數(shù)據(jù)模式可以通過printSchema()函數(shù)來查看,它會打印出列的名稱和類型。查看DataFrame對象stocks的數(shù)據(jù)模式命令及結(jié)果如下。scala>stocks.printSchemaroot|--Ordernumber:string(nullable=true)|--Itemid:string(nullable=true)|--Price:double(nullable=true)|--Amount:double(nullable=true)5.2.2DataFrame查看數(shù)據(jù)02show:查看數(shù)據(jù)打印完模式之后,還需要查看加載進DataFrame里面的數(shù)據(jù)是否正確,從創(chuàng)建的DataFrame里面采樣數(shù)據(jù)的方法有很多種,其中最簡單的是使用show()方法。show()方法有4個版本,如表5-2所示。表5-2show()方法的4個版本5.2.2DataFrame查看數(shù)據(jù)02show:查看數(shù)據(jù)下面使用show()方法查看DataFrame對象stocks中的數(shù)據(jù)。show()方法與show(true)方法相同,只顯示前20條記錄,并且最多只顯示20個字符,如圖5-6所示。需要注意的是,圖中只截取結(jié)果前5條記錄(結(jié)果有20條記錄)。圖5-6使用show()方法查看stocks中的數(shù)據(jù)5.2.2DataFrame查看數(shù)據(jù)02show:查看數(shù)據(jù)show()方法默認(rèn)只顯示前20行記錄。若想查看前n行記錄,則可以使用show(numRows:Int)方法。例如,查看movies前5行記錄,查看結(jié)果如圖5-7所示。圖5-7查看stocks前5行記錄5.2.2DataFrame查看數(shù)據(jù)03first/head/take/takeAsList:獲取若干行記錄獲取DataFrame若干行記錄除了使用show()方法之外,還可以使用first()、head()、take()、takeAsList()等方法,如表5-3所示。表5-3DataFrame獲取若干行記錄的方法5.2.2DataFrame查看數(shù)據(jù)03first/head/take/takeAsList:獲取若干行記錄first()和head()方法功能相同,以Row或Array[Row]的形式返回一行或多行數(shù)據(jù)。take()和takeAsList()方法將獲得的數(shù)據(jù)返回到Driver端,為避免Driver發(fā)生OutofMemoryError,使用這兩個方法時需要注意數(shù)據(jù)量。這4個方法的使用如圖5-8所示。圖5-8first、head、take、takeAsList的使用方法5.2.2DataFrame查看數(shù)據(jù)04collect/collectAsList:獲取所有數(shù)據(jù)不同于show()方法,collect()方法可以將DataFrame中的所有數(shù)據(jù)都獲取到并返回一個Array對象,而collectAsList()方法可以獲取所有數(shù)據(jù)到List,其功能和collect()方法類似,只不過返回的結(jié)構(gòu)變成List對象。collect()和collectAsList()方法的用法如圖5-9和圖5-10所示。圖5-9collect()方法的使用5.2.2DataFrame查看數(shù)據(jù)04collect/collectAsList:獲取所有數(shù)據(jù)圖5-10collectAsList()方法的使用5.2.3DataFrame查詢操作表5-4DataFrame常用的查詢方法DataFrame提供了很多查詢的方法,類似于SparkRDD的Transformation操作,DataFrame的查詢操作也是一個懶操作,僅僅生成一個查詢計劃,只有觸發(fā)Action操作才會進行計算并返回查詢結(jié)果。表5-4所示是DataFrame常用的查詢方法。5.2.3DataFrame查詢操作圖5-11where()查詢1)where可以使用where(conditionExpr.String)根據(jù)指定條件進行查詢,參數(shù)中可以使用and或or,該方法的返回結(jié)果仍然為DataFrame類型。如圖5-11所示,查詢stocks對象中訂單號為BYSL00000897且價格為198的商品信息。1.條件查詢5.2.3DataFrame查詢操作圖5-12filter()查詢2)filterDataFrame還可以使用filter篩選符合條件的數(shù)據(jù),filter和where的使用方法一樣。如圖5-12所示,查詢stocks對象中訂單號為BYSL00000897且價格大于198的商品信息。1.條件查詢5.2.3DataFrame查詢操作圖5-13select()查詢2.查詢指定字段的數(shù)據(jù)信息1)select():獲取指定字段值select()方法根據(jù)傳入的String類型字段名獲取指定字段的值,以DataFrame類型返回。圖5-13所示為查詢stocks對象中Itemid及Price字段的信息。5.2.3DataFrame查詢操作圖5-14selectExpr()查詢2.查詢指定字段的數(shù)據(jù)信息selectExpr():對指定字段進行特殊處理例如,查詢stocks對象中的Itemid、Price及Amount字段,其中要求對Amount字段取別名為Total。具體操作命令如圖5-14所示。2)5.2.3DataFrame查詢操作圖5-15col()/apply()獲取指定字段2.查詢指定字段的數(shù)據(jù)信息3)col()/apply()col()/apply()也可以獲取DataFrame的指定字段,但是只能獲取一個字段,并且返回對象為Column類型。col()/apply()的用法如圖5-15所示。5.2.3DataFrame查詢操作圖5-16limit()查詢limit()方法獲取指定DataFrame的前n行記錄,得到一個新的DataFrame對象。不同于take()與head()方法,limit()方法不是Action操作。圖5-16所示為查詢stocks對象的前5行記錄。3.limit()5.2.3DataFrame查詢操作4.orderBy()/sort()sort()方法與orderBy()方法一樣,也是根據(jù)指定字段排序,用法也與orderBy一樣。下面使用sort()方法根據(jù)Price字段對stocks對象進行升序排序,如圖5-18所示。orderBy()方法是根據(jù)指定字段排序,默認(rèn)為升序排列。若要求降序排列,可以使用desc("字段名稱")或$"字段名".desc,也可以在指定字段前面加“-”。圖5-17所示是使用orderBy()方法根據(jù)Ordernumber字段對stocks對象進行降序排序。5.2.3DataFrame查詢操作4.orderBy()/sort()圖5-17orderBy()查詢圖5-18sort()排序查詢5.2.3DataFrame查詢操作5.groupBy()圖5-19groupBy()分組查詢groupBy()方法是根據(jù)字段進行分組操作,groupBy()方法有兩種調(diào)試方式,可以傳入String類型的字段名,也可以傳入Column類型的對象。圖5-19所示為根據(jù)Itemid字段對stocks對象進行分組。5.2.3DataFrame查詢操作5.groupBy()圖5-19groupBy()分組查詢5.2.3DataFrame查詢操作5.groupBy()表5-5GroupedData的常用方法groupBy()方法返回的是GroupedData對象,GroupedData的常用方法見表5-5。5.2.3DataFrame查詢操作5.groupBy()圖5-20GroupedData使用方法示例表5-5中的方法都可以用在groupBy()方法之后,如圖5-20所示,按照Itemid字段對stocks對象進行分組,然后計算分組中的元素個數(shù)。5.2.3DataFrame查詢操作6.join()表5-6常用join()方法有時根據(jù)業(yè)務(wù)需求,需要連接兩個表才可以查出業(yè)務(wù)所需的結(jié)果。DataFrame提供了3種join()方法用于連接兩個表,如表5-6所示。5.2.3DataFrame查詢操作6.join()圖5-21join(right:DataFramejoinExprs:Column)操作使用join(right:DataFramejoinExprs:Column)方法,根據(jù)Ordernumber字段連接stocks和orders對象,如圖5-21所示。5.2.4DataFrame輸出操作查看DataFrameAPI(/docs/1.3.1/api/scala/index.html#org.apache.spark.sql.DataFrame),發(fā)現(xiàn)DataFrame中提供了很多種輸出操作方法。其中,save方法可以將DataFrame保存成文件,save操作有一個可選參數(shù)SaveMode,用這個參數(shù)可以指定如何處理數(shù)據(jù)已經(jīng)存在的情況。持久化的表會一直保留,即使Spark程序重啟也沒有影響,只要連接到同一個Metastore就可以讀取其數(shù)據(jù)。讀取持久化表時,只需要用表名作為參數(shù),調(diào)用SQLContext.table方法即可得到對應(yīng)的DataFrame。5.2.4DataFrame輸出操作將DataFrame保存到一個文件里面的具體操作包括以下幾步。(1)首先創(chuàng)建一個Map對象,用于存儲save()函數(shù)需要用到的一些數(shù)據(jù),這里將指定保存文件路徑及JSON文件的頭信息,如圖5-22所示。圖5-22創(chuàng)建Map對象(2)從DataFrame對象中選擇Itemid、Price和Amount這3列,如圖5-23所示。圖5-23創(chuàng)建copyOFUser對象5.2.4DataFrame輸出操作將DataFrame保存到一個文件里面的具體操作包括以下幾步。(3)用save()函數(shù)保存(2)中的DataFrame數(shù)據(jù)到copyOfStocks.json文件夾中,如圖5-24所示。圖5-24調(diào)用save()函數(shù)mode()函數(shù)可以接收的參數(shù)有Overwrite、Append、Ignore和ErrorIfExists。從名字就可以很好地理解,Overwrite代表覆蓋目錄中之前存在的數(shù)據(jù),Append代表給指定目錄下追加數(shù)據(jù),Ignore代表目錄下已經(jīng)有文件,那就什么都不執(zhí)行,ErrorIfExists代表如果保存目錄下已經(jīng)存在文件,那么拋出相應(yīng)的異常。小結(jié)本項目首先介紹了SparkSQL的基本概念,接著詳細(xì)介紹了SparkSQL的核心抽象編程模型DataFrame,包括創(chuàng)建DataFrame對象、Da

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論