版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)
文檔簡介
RDD
編程第5章目
錄01RDD編程基礎(chǔ)02鍵值對RDD03數(shù)據(jù)讀寫04綜合案例RDD
編程基礎(chǔ)5.1RDD編程基礎(chǔ)RDD創(chuàng)建0102RDD操作03持久化分區(qū)04一個綜合實例055.1.1RDD創(chuàng)建0102從文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建RDD通過并行集合(數(shù)組)創(chuàng)建RDD5.1.1RDD創(chuàng)建底層文件系統(tǒng)數(shù)據(jù)Spark采用textFile()方法來從文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建RDD5.1.1RDD創(chuàng)建textFile()RDDSpark的SparkContext通過textFile()讀取數(shù)據(jù)生成內(nèi)存中的RDDSparkContext5.1.1RDD創(chuàng)建.textFile()方法支持的數(shù)據(jù)類型AmazonS3等等分布式文件系統(tǒng)HDFS本地文件系統(tǒng)5.1.1RDD創(chuàng)建(1)從本地文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建RDDscala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")lines:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/mycode/rdd/word.txtMapPartitionsRDD[12]attextFileat<console>:275.1.1RDD創(chuàng)建5.1.1RDD創(chuàng)建(2)從分布式文件系統(tǒng)HDFS中加載數(shù)據(jù)scala>vallines=sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")scala>vallines=sc.textFile("/user/hadoop/word.txt")scala>vallines=sc.textFile("word.txt")三條語句是完全等價的,可以使用其中任意一種方式5.1.1RDD創(chuàng)建通過集合或數(shù)組創(chuàng)建RDD5.1.1RDD創(chuàng)建可調(diào)用SparkContext的parallelize方法,在Driver中已經(jīng)存在的集合(數(shù)組)上創(chuàng)建scala>valarray=Array(1,2,3,4,5)array:Array[Int]=Array(1,2,3,4,5)scala>valrdd=sc.parallelize(array)rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[13]atparallelizeat<console>:29通過并行集合(數(shù)組)創(chuàng)建RDD的實例5.1.1RDD創(chuàng)建通過并行集合(數(shù)組)創(chuàng)建RDD:
也可以從列表中創(chuàng)建scala>vallist=List(1,2,3,4,5)list:List[Int]=List(1,2,3,4,5)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[14]atparallelizeat<console>:295.1.1RDD創(chuàng)建5.1.2RDD操作RDD操作轉(zhuǎn)換操作行動操作惰性機(jī)制5.1.2RDD操作轉(zhuǎn)換操作5.1.2RDD操作轉(zhuǎn)換類型操作(Transformation)只記錄轉(zhuǎn)換的軌跡發(fā)生計算5.1.2RDD操作轉(zhuǎn)換類型操作動作類型操作進(jìn)行從頭到尾的計算5.1.2RDD操作filtermapflatmapgroupByKeyreduceByKeyRDD常用轉(zhuǎn)換操作操作含義filter(func)篩選出滿足函數(shù)func的元素,并返回一個新的數(shù)據(jù)集map(func)將每個元素傳遞到函數(shù)func中,并將結(jié)果返回為一個新的數(shù)據(jù)集flatMap(func)與map()相似,但每個輸入元素都可以映射到0或多個輸出結(jié)果groupByKey()應(yīng)用于(K,V)鍵值對的數(shù)據(jù)集時,返回一個新的(K,Iterable)形式的數(shù)據(jù)集reduceByKey(func)應(yīng)用于(K,V)鍵值對的數(shù)據(jù)集時,返回一個新的(K,V)形式的數(shù)據(jù)集,其中每個值是將每個key傳遞到函數(shù)func中進(jìn)行聚合后的結(jié)果5.1.2RDD操作5.1.2RDD操作distinct對RDD內(nèi)部的元素進(jìn)行去重,并把去重后的元素放到新的RDD中union對兩個RDD進(jìn)行并集運(yùn)算,并返回新的RDDintersection對兩個RDD進(jìn)行交集運(yùn)算,并返回新的RDDsubtract對兩個RDD進(jìn)行差集運(yùn)算,并返回新的RDDzip把兩個RDD中的元素以鍵值對的形式進(jìn)行合并5.1.2RDD操作轉(zhuǎn)換操作·filter(func)scala>vallines=sc.textFile(file:///usr/local/spark/mycode/rdd/word.txt)scala>vallinesWithSpark=lines.filter(line=>line.contains("Spark"))5.1.2RDD操作map(func)操作將每個元素傳遞到函數(shù)func中,并將結(jié)果返回為一個新的數(shù)據(jù)集scala>data=Array(1,2,3,4,5)scala>valrdd1=sc.parallelize(data)scala>valrdd2=rdd1.map(x=>x+10)轉(zhuǎn)換操作·map(func)5.1.2RDD操作圖map()操作實例執(zhí)行過程示意圖5.1.2RDD操作map(func)操作將每個元素傳遞到函數(shù)func中,并將結(jié)果返回為一個新的數(shù)據(jù)集scala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")scala>valwords=lines.map(line=>line.split(""))轉(zhuǎn)換操作·map(func)5.1.2RDD操作圖map()操作實例執(zhí)行過程示意圖5.1.2RDD操作scala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")scala>valwords=lines.flatMap(line=>line.split(""))轉(zhuǎn)換操作flatMap(func)
的具體實例如下5.1.2RDD操作圖flatMap()操作實例執(zhí)行過程示意圖5.1.2RDD操作groupByKey()應(yīng)用于(K,V)鍵值對的數(shù)據(jù)集時,返回一個新的(K,Iterable)形式的數(shù)據(jù)集groupByKey()操作實例執(zhí)行過程5.1.2RDD操作reduceByKey(func)應(yīng)用于(K,V)鍵值對的數(shù)據(jù)集時,返回一個新的(K,V)形式的數(shù)據(jù)集,其中的每個值是將每個key傳遞到函數(shù)func中進(jìn)行聚合后得到的結(jié)果reduceByKey()操作實例5.1.2RDD操作<“spark”,<1,1,1>>words.reduceByKey(a,b)=>a+b5.1.2RDD操作重復(fù)的元素FlinkSparkSparkrdd1.distinct
distinct操作會對RDD內(nèi)部的元素進(jìn)行去重,該方法是對map方法及reduceByKey方法的封裝5.1.2RDD操作會對兩個RDD進(jìn)行并集運(yùn)算,并返回新的RDD,整個過程不會對元素進(jìn)行去重。具體實例如下scala>valrdd1=sc.parallelize(Array(1,2,3))scala>valrdd2=sc.parallelize(Array(3,4,5))scala>valrdd3=rdd1.union(rdd2)union操作5.1.2RDD操作intersection操作會對兩個RDD進(jìn)行交集運(yùn)算,并返回新的RDD。具體實例如下scala>valrdd1=sc.parallelize(Array(1,2,3))scala>valrdd2=sc.parallelize(Array(3,4,5))scala>valrdd3=ersection(rdd2)union操作5.1.2RDD操作會對兩個RDD進(jìn)行差集運(yùn)算,并返回新的RDD,整個過程不會對元素去重。具體實例如下scala>valrdd1=sc.parallelize(Array(1,2,3))scala>valrdd2=sc.parallelize(Array(3,4,5))scala>valrdd3=rdd1.subtract(rdd2)subtract
操作5.1.2RDD操作會把兩個RDD中的元素以鍵值對的形式進(jìn)行合并。需要注意的是,在使用zip操作時,需要確保兩個RDD中的元素個數(shù)是相同的。具體實例如下scala>valrdd1=sc.parallelize(Array(1,2,3))scala>valrdd2=sc.parallelize(Array("Hadoop","Spark","Flink"))scala>valrdd3=rdd1.zip(rdd2)zip
操作5.1.2RDD操作行動操作Spark程序執(zhí)行執(zhí)行真正的計算5.1.2RDD操作操作含義count()返回數(shù)據(jù)集中的元素個數(shù)collect()以數(shù)組的形式返回數(shù)據(jù)集中的所有元素first()返回數(shù)據(jù)集中的第一個元素take(n)以數(shù)組的形式返回數(shù)據(jù)集中的前n個元素reduce(func)通過函數(shù)func(輸入兩個參數(shù)并返回一個值)聚合數(shù)據(jù)集中的元素foreach(func)將數(shù)據(jù)集中的每個元素傳遞到函數(shù)func中運(yùn)行5.1.2RDD操作scala>valrdd=sc.parallelize(Array(1,2,3,4,5))rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[1]atparallelizeat<console>:24scala>rdd.count()res0:Long=5scala>rdd.first()res1:Int=1scala>rdd.take(3)res2:Array[Int]=Array(1,2,3)scala>rdd.reduce((a,b)=>a+b)res3:Int=15scala>rdd.collect()res4:Array[Int]=Array(1,2,3,4,5)scala>rdd.foreach(elem=>println(elem))123455.1.3惰性機(jī)制轉(zhuǎn)換類型操作行動操作進(jìn)行從頭到尾的計算惰性機(jī)制scala>vallines=sc.textFile("data.txt")scala>vallineLengths=lines.map(s=>s.length)scala>valtotalLength=lineLengths.reduce((a,b)=>a+b)scala>vallist=List("Hadoop","Spark","Hive")list:List[String]=List(Hadoop,Spark,Hive)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[22]atparallelizeat<console>:29scala>println(rdd.count())//行動操作,觸發(fā)一次真正從頭到尾的計算3scala>println(rdd.collect().mkString(","))//行動操作,觸發(fā)一次真正從頭到尾的計算Hadoop,Spark,HiveRDD采用惰性求值的機(jī)制遇到行動操作,都會從頭開始執(zhí)行計算。多次計算同一個RDD實例如下5.1.4
持久化5.1.4
持久化數(shù)據(jù)集用戶持久化數(shù)據(jù)集數(shù)據(jù)集5.1.4
持久化persist()對一個RDD標(biāo)記為持久化并不會馬上計算生成RDD并把它持久化persist()真正持久化行動操作5.1.4
持久化persist()使用cache()方法時,會調(diào)用persist(MEMORY_ONLY)cache()可使用unpersist()方法手動地把持久化的RDD從緩存中移除unpersist()表示將RDD作為反序列化的對象存儲于JVM中,如果內(nèi)存不足,就要按照LRU原則替換緩存中的內(nèi)容persist(MEMORY_ONLY)表示將RDD作為反序列化的對象存儲在JVM中,如果內(nèi)存不足,超出的分區(qū)將會被存放在硬盤上persist(MEMORY_AND_DISK)5.1.4
持久化scala>vallist=List("Hadoop","Spark","Hive")list:List[String]=List(Hadoop,Spark,Hive)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[22]atparallelizeat<console>:29scala>rdd.cache()//會調(diào)用persist(MEMORY_ONLY),但是,語句執(zhí)行到這里,并不會緩存rdd,因為這時rdd還沒有被計算生成scala>println(rdd.count())//第一次行動操作,觸發(fā)一次真正從頭到尾的計算,這時上面的rdd.cache()才會被執(zhí)行,把這個rdd放到緩存中3scala>println(rdd.collect().mkString(","))//第二次行動操作,不需要觸發(fā)從頭到尾的計算,只需要重復(fù)使用上面緩存中的rddHadoop,Spark,Hive持久化—針對上面的實例,增加持久化語句以后的執(zhí)行過程如下RDDRDDRDD節(jié)點(diǎn)節(jié)點(diǎn)節(jié)點(diǎn)5.1.5
分區(qū)RDDRDDRDDRDD分區(qū)增加并行度RDD分區(qū)被保存到不同節(jié)點(diǎn)上5.1.5
分區(qū)減少通信開銷未分區(qū)時對UserData和Events兩個表進(jìn)行連接操作5.1.5
分區(qū)減少通信開銷
UserDataUserIdUserInfoUserIdLinkInfoEvents連接未分區(qū)時對UserData和Events兩個表進(jìn)行連接操作5.1.5
分區(qū)LinkInfoUserIDUserInfo未分區(qū)時對UserData和Events兩個表進(jìn)行連接操作做連接時不做分區(qū)
userData
um
u3
u2
u1……….幾千萬用戶表非常大一臺機(jī)器放不下放到多臺機(jī)器上5.1.5
分區(qū)減少通信開銷5.1.5
分區(qū)5.1.5
分區(qū)RDD分區(qū)原則分區(qū)個數(shù)集群中CPU核心數(shù)目Mesos模式Y(jié)ARN模式Standalone模式Local模式指
定設(shè)置spark.default.parallelism參數(shù)默認(rèn)的分區(qū)數(shù)目5.1.5
分區(qū)若設(shè)置了local[N]則默認(rèn)為NLocal模式spark.default.parallelism5.1.5
分區(qū)ApacheMesos模式?jīng)]有設(shè)置默認(rèn)分區(qū)數(shù)目為85.1.5
分區(qū)YARN模式Standalone模式集群中所有CPU核心數(shù)目總和“a”5.1.5
分區(qū)>“a”“2”默認(rèn)值5.1.5
分區(qū)>默認(rèn)值“a”“2”5.1.5
分區(qū)5.1.5
分區(qū)
設(shè)置分區(qū)的個數(shù)—創(chuàng)建RDD時手動指定分區(qū)個數(shù)在調(diào)用textFile()和parallelize()方法時手動指定分區(qū)個數(shù)即可,語法格式如下sc.textFile(path,partitionNum)指定要加載的文件的地址參數(shù)用于指定分區(qū)個數(shù)path參數(shù)partitionNum參數(shù)scala>valarray=Array(1,2,3,4,5)scala>valrdd=sc.parallelize(array,2)//設(shè)置兩個分區(qū)5.1.5
分區(qū)
設(shè)置分區(qū)的個數(shù)—使用reparititon方法重新設(shè)置分區(qū)個數(shù)通過轉(zhuǎn)換操作得到新RDD時,直接調(diào)用repartition方法即可。例如:scala>valdata=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt",2)data:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/mycode/rdd/word.txtMapPartitionsRDD[12]attextFileat<console>:24scala>data.partitions.size//顯示data這個RDD的分區(qū)數(shù)量res2:Int=2scala>valrdd=data.repartition(1)//對data這個RDD進(jìn)行重新分區(qū)rdd:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[11]atrepartitionat:26scala>rdd.partitions.sizeres4:Int=15.1.5
分區(qū)HashPartitioner(哈希分區(qū))RangePartitioner(區(qū)域分區(qū))自定義分區(qū)5.1.5
分區(qū)numPartitions:Int返回創(chuàng)建出來的分區(qū)數(shù)getPartition(key:Any):Int返回給定鍵的分區(qū)編號(0到numPartitions-1)equals()Java判斷相等性的標(biāo)準(zhǔn)方法繼承org.apache.spark.Partitioner自定義分區(qū)方法定義Partitioner類5.1.5
分區(qū)10寫入到part-0000011寫入到part-0000119寫入到part-00009……根據(jù)key值的最后一位數(shù)字,寫到不同的文件5.1.5
分區(qū)importorg.apache.spark.{Partitioner,SparkContext,SparkConf}//自定義分區(qū)類,需要繼承org.apache.spark.Partitioner類classMyPartitioner(numParts:Int)extendsPartitioner{//覆蓋分區(qū)數(shù)overridedefnumPartitions:Int=numParts//覆蓋分區(qū)號獲取函數(shù)overridedefgetPartition(key:Any):Int={key.toString.toInt%10}}5.1.5
分區(qū)objectTestPartitioner{defmain(args:Array[String]){valconf=newSparkConf()valsc=newSparkContext(conf)//模擬5個分區(qū)的數(shù)據(jù)valdata=sc.parallelize(1to10,5)//根據(jù)尾號轉(zhuǎn)變?yōu)?0個分區(qū),分別寫到10個文件data.map((_,1)).partitionBy(newMyPartitioner(10)).map(_._1).saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner")}}5.1.6
一個綜合實例多個單詞構(gòu)成word.txt文本文本文本文本如何進(jìn)行詞頻統(tǒng)計?5.1.6
一個綜合實例scala>vallines=sc.//代碼一行放不下,可以在圓點(diǎn)后回車,在下行繼續(xù)輸入|textFile("file:///usr/local/spark/mycode/wordcount/word.txt")scala>valwordCount=lines.flatMap(line=>line.split("")).|map(word=>(word,1)).reduceByKey((a,b)=>a+b)scala>wordCount.collect()scala>wordCount.foreach(println)5.1.6
一個綜合實例5.1.6
一個綜合實例圖
在一個集群中同時部署Hadoop和Spark5.1.6
一個綜合實例圖
在集群中執(zhí)行詞頻統(tǒng)計過程示意圖鍵值對RDD5.2鍵值對RDD提綱常用的鍵值對RDD轉(zhuǎn)換操作一個綜合實例鍵值對RDD的創(chuàng)建1235.2.1鍵值對RDD的創(chuàng)建
(1)第一種創(chuàng)建方式:從文件中加載,可采用map()函數(shù)創(chuàng)建PairRDDscala>vallines=sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")lines:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/mycode/pairrdd/word.txtMapPartitionsRDD[1]attextFileat<console>:27scala>valpairRDD=lines.flatMap(line=>line.split("")).map(word=>(word,1))pairRDD:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[3]atmapat<console>:29scala>pairRDD.foreach(println)(i,1)(love,1)(hadoop,1)……5.2.1鍵值對RDD的創(chuàng)建
(2)第二種創(chuàng)建方式:通過并行集合(數(shù)組)創(chuàng)建RDDscala>vallist=List("Hadoop","Spark","Hive","Spark")list:List[String]=List(Hadoop,Spark,Hive,Spark)
scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[11]atparallelizeat<console>:29
scala>valpairRDD=rdd.map(word=>(word,1))pairRDD:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[12]atmapat<console>:31
scala>pairRDD.foreach(println)(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)5.2.2常用的鍵值對RDD轉(zhuǎn)換操作
joinreduceByKey(func)groupByKey()
keysvaluessortByKey()mapValues(func)combineByKey常用的鍵值對RDD轉(zhuǎn)換操作reduceByKey(func)使用func函數(shù)合并具有相同鍵的值功能5.2.2常用的鍵值對RDD轉(zhuǎn)換操作scala>pairRDD.reduceByKey((a,b)=>a+b).foreach(println)(Spark,2)(Hadoop,1)(Hive,1)(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)
(reduceByKey(func)5.2.2常用的鍵值對RDD轉(zhuǎn)換操作5.2.2常用的鍵值對RDD轉(zhuǎn)換操作功能groupByKey()對具有相同鍵的值進(jìn)行分組5.2.2常用的鍵值對RDD轉(zhuǎn)換操作5.2.1鍵值對RDD的創(chuàng)建groupByKey()("spark",1)("spark",2)("hadoop",3)("hadoop",5)(“spark”,(1,2))(“hadoop",(3,5))groupByKey()
groupByKey()(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.groupByKey()res15:org.apache.spark.rdd.RDD[(String,Iterable[Int])]=ShuffledRDD[15]atgroupByKeyat<console>:345.2.2常用的鍵值對RDD轉(zhuǎn)換操作5.2.2常用的鍵值對RDD轉(zhuǎn)換操作只做分組更進(jìn)一步reduceByKey和groupByKey的區(qū)別groupByKeyreduceByKey(key,value-list)不會進(jìn)行匯總求和value-list進(jìn)行匯總求和5.2.2常用的鍵值對RDD轉(zhuǎn)換操作
reduceByKey和groupByKey的區(qū)別,具體實例如下scala>val
words
=
Array("one",
"two",
"two",
"three",
"three",
"three")
scala>val
wordPairsRDD
=
sc.parallelize(words).map(word
=>
(word,
1))
scala>val
wordCountsWithReduce
=
wordPairsRDD.reduceByKey(_
+
_)
scala>val
wordCountsWithGroup
=
wordPairsRDD.groupByKey().map(t
=>
(t._1,
t._2.sum))
得到的wordCountsWithReduce和wordCountsWithGroup是一樣的,但內(nèi)部運(yùn)算過程不同
keys只會把PairRDD中的key返回形成一個新的RDD(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.keysres17:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[17]atkeysat<console>:34scala>pairRDD.keys.foreach(println)HadoopSparkHiveSpark5.2.2常用的鍵值對RDD轉(zhuǎn)換操作
values只會把PairRDD中的value返回形成一個新的RDD(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.valuesres0:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[2]atvaluesat<console>:34
scala>pairRDD.values.foreach(println)11115.2.2常用的鍵值對RDD轉(zhuǎn)換操作
sortByKey()的功能是返回一個根據(jù)鍵排序的RDD(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.sortByKey()res0:org.apache.spark.rdd.RDD[(String,Int)]=ShuffledRDD[2]atsortByKeyat<console>:34scala>pairRDD.sortByKey().foreach(println)(Hadoop,1)(Hive,1)(Spark,1)(Spark,1)5.2.2常用的鍵值對RDD轉(zhuǎn)換操作5.2.2常用的鍵值對RDD轉(zhuǎn)換操作
sortByKey()
和
sortBy()具體實例
scala>vald1=sc.parallelize(Array((“c",8),(“b“,25),(“c“,17),(“a“,42),(“b“,4),(“d“,9),(“e“,17),(“c“,2),(“f“,29),(“g“,21),(“b“,9)))scala>d1.reduceByKey(_+_).sortByKey(false).collectres2:Array[(String,Int)]=Array((g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42))scala>vald1=sc.parallelize(Array((“c",8),(“b“,25),(“c“,17),(“a“,42),(“b“,4),(“d“,9),(“e“,17),(“c“,2),(“f“,29),(“g“,21),(“b“,9)))scala>d1.reduceByKey(_+_).sortByKey(false).collectres2:Array[(String,Int)]=Array((g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42))
mapValues(func)對鍵值對RDD中每個value都應(yīng)用一個函數(shù)(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.mapValues(x=>x+1)res2:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[4]atmapValuesat<console>:34scala>pairRDD.mapValues(x=>x+1).foreach(println)(Hadoop,2)(Spark,2)(Hive,2)(Spark,2)5.2.2常用的鍵值對RDD轉(zhuǎn)換操作5.2.2常用的鍵值對RDD轉(zhuǎn)換操作
join就表示內(nèi)連接,只有在兩個數(shù)據(jù)集中都存在的key才會被輸出scala>valpairRDD1=sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))pairRDD1:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[24]atparallelizeat<console>:27
scala>valpairRDD2=sc.parallelize(Array(("spark","fast")))pairRDD2:org.apache.spark.rdd.RDD[(String,String)]=ParallelCollectionRDD[25]atparallelizeat<console>:27
scala>pairRDD1.join(pairRDD2)res9:org.apache.spark.rdd.RDD[(String,(Int,String))]=MapPartitionsRDD[28]atjoinat<console>:32
scala>pairRDD1.join(pairRDD2).foreach(println)(spark,(1,fast))(spark,(2,fast))
5.2.2常用的鍵值對RDD轉(zhuǎn)換操作mergeValuecreateCombinermergeCombinerspartitionermapSideCombinecombineByKey5.2.2常用的鍵值對RDD轉(zhuǎn)換操作mergeValuecreateCombinermergeCombinerspartitionermapSideCombine第一次遇到key時創(chuàng)建組合器函數(shù),將RDD數(shù)據(jù)集中V類型值轉(zhuǎn)換成C類型值(V=>C)合并值函數(shù),再次遇到相同的Key時,將createCombiner的C類型值與這次傳入的V類型值合并成一個C類型值(C,V)=>C合并組合器函數(shù),將C類型值兩兩合并成一個C類型值使用已有的或自定義的分區(qū)函數(shù),默認(rèn)是HashPartitioner是否在map端進(jìn)行Combine操作,默認(rèn)為true5.2.2常用的鍵值對RDD轉(zhuǎn)換操作下面通過一個實例來解釋如何使用combineByKey操作。假設(shè)有一些銷售數(shù)據(jù),數(shù)據(jù)采用鍵值對的形式,即<公司,當(dāng)月收入>,要求使用combineByKey操作求出每個公司的總收入和每月平均收入,并保存在本地文件中5.2.2常用的鍵值對RDD轉(zhuǎn)換操作importorg.apache.spark.SparkContextimportorg.apache.spark.SparkConfobjectCombine{defmain(args:Array[String]){valconf=newSparkConf().setAppName("Combine").setMaster("local")valsc=newSparkContext(conf)valdata=sc.parallelize(Array(("company-1",88),("company-1",96),("company-1",85),("company-2",94),("company-2",86),("company-2",74),("company-3",86),("company-3",88),("company-3",92)),3)valres=bineByKey((income)=>(income,1),(acc:(Int,Int),income)=>(acc._1+income,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)).map({case(key,value)=>(key,value._1,value._1/value._2.toFloat)})res.repartition(1).saveAsTextFile("file:///usr/local/spark/mycode/rdd/result")}}5.2.2常用的鍵值對RDD轉(zhuǎn)換操作Combine.scala代碼中combineByKey()的參數(shù)值參數(shù)名稱參數(shù)值createCombiner(income)=>(income,1)mergeValue(acc:(Int,Int),income)=>(acc._1+income,acc._2+1)mergeCombiners(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)5.2.2常用的鍵值對RDD轉(zhuǎn)換操作01設(shè)置聚合時的初始值。需要注意的是,初始值并非總是數(shù)字,有時候可能是集合zeroValue03跨分區(qū)聚合,對數(shù)據(jù)進(jìn)行最終的匯總時調(diào)用此操作combOp02將值V聚合到類型為U的對象中seqOpaggregateByKey5.2.2常用的鍵值對RDD轉(zhuǎn)換操作
aggregateByKey
具體實例如下scala>valrdd1=sc.parallelize(Array(("USER1","URL1"),("USER2","URL1"),("USER1","URL1"),("USER1","URL2"),("USER2","URL3")))scala>valrdd2=rdd1.aggregateByKey(collection.mutable.Set[String]())(|(urlSet,url)=>urlSet+url,|(urlSet1,urlSet2)=>urlSet1++=urlSet2)scala>rdd2.collectres12:Array[(String,scala.collection.mutable.Set[String])]=Array((USER1,Set(URL1,URL2)),(USER2,Set(URL1,URL3)))5.2.2常用的鍵值對RDD轉(zhuǎn)換操作
下面給出一個實例來演示flatMapValues與mapValues的區(qū)別rdd1=sc.parallelize(Array(("file1","storm/hadoop/spark/flink"),("file1","hbase/hdfs/spark/flink"),("file2","zookeeper/flink/hadoop/hive"),("file2","flink/hive/flume")))scala>valrdd2=rdd1.flatMapValues(_.split("/"))scala>rdd2.collectres0:Array[(String,String)]=Array((file1,storm),(file1,hadoop),(file1,spark),(file1,flink),(file1,hbase),(file1,hdfs),(file1,spark),(file1,flink),(file2,zookeeper),(file2,flink),(file2,hadoop),(file2,hive),(file2,flink),(file2,hive),(file2,flume))scala>valrdd3=rdd1.mapValues(_.split("/"))scala>rdd3.collectres1:Array[(String,Array[String])]=Array((file1,Array(storm,hadoop,spark,flink)),(file1,Array(hbase,hdfs,spark,flink)),(file2,Array(zookeeper,flink,hadoop,hive)),(file2,Array(flink,hive,flume)))5.2.2常用的鍵值對RDD轉(zhuǎn)換操作flatMapValues(func)("spark",6)("hadoop",6)("hadoop",4)("spark",2)5.2.3一個綜合實例鍵值對
一個綜合實例—計算每種圖書的每天平均銷量scala>valrdd=sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))rdd:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[38]atparallelizeat<console>:27
scala>rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect()res22:Array[(String,Int)]=Array((spark,4),(hadoop,5))5.2.3一個綜合實例5.2.3一個綜合實例計算圖書平均銷量過程示意圖
一個綜合實例—計算每種圖書的每天平均銷量scala>valrdd=sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))rdd:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[38]atparallelizeat<console>:27
scala>rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect()res22:Array[(String,Int)]=Array((spark,4),(hadoop,5))5.2.3一個綜合實例數(shù)據(jù)讀寫5.3數(shù)據(jù)讀寫本地文件系統(tǒng)的數(shù)據(jù)讀寫分布式文件系統(tǒng)HDFS的數(shù)據(jù)讀寫JSON文件的數(shù)據(jù)讀寫5.3.1本地文件系統(tǒng)的數(shù)據(jù)讀寫020103從文件中讀取數(shù)據(jù)創(chuàng)建RDD把RDD寫入到文本文件中JSON文件的讀取(1)從文件中讀取數(shù)據(jù)創(chuàng)建RDD具體實例
scala>valtextFile=sc.|textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
因為Spark采用了惰性機(jī)制,即使輸入錯誤語句,spark-shell也不會馬上報錯
scala>valtextFile=sc.|textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")5.3.1本地文件系統(tǒng)的數(shù)據(jù)讀寫
(2)把RDD寫入到文本文件中具體實例
scala>valtextFile=sc.|textFile("file:///usr/local/spark/mycode/wordcount/word.txt")scala>textFile.|saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")$cd/usr/local/spark/mycode/wordcount/writeback/$lspart-00000__SUCCESS5.3.1本地文件系統(tǒng)的數(shù)據(jù)讀寫5.2.2常用的鍵值對RDD轉(zhuǎn)換操作
(3)JSON文件的讀取具體實例
scala>valtextFile=sc.textFile("file:///usr/local/spark/mycode/wordcount/writeback")
如果想再次把數(shù)據(jù)加載在RDD中,只要使用writeback這個目錄即可
{"name":"Michael"}{"name":"Andy","age":30}{"name":"Justin","age":19}JSON(JavaScriptObjectNotation)是一種輕量級的數(shù)據(jù)交換格式JSON樣例數(shù)據(jù)文件,存放在“/usr/local/spark/examples/src/main/resources/people.json”scala>valjsonStr=sc.|textFile("file:///usr/local/spark/examples/src/main/resources/people.json")scala>jsonStr.foreach(println){"name":"Michael"}{"name":"Andy","age":30}{"name":"Justin","age":19}
(3)JSON文件的讀取具體實例:把的people.json文件加載到RDD中
5.3.1本地文件系統(tǒng)的數(shù)據(jù)讀寫5.3.1本地文件系統(tǒng)的數(shù)據(jù)讀寫
任務(wù):編寫程序完成對JSON數(shù)據(jù)的解析工作
(1)Scala中有一個自帶的JSON庫——scala.util.parsing.json.JSON,可以實現(xiàn)對JSON數(shù)據(jù)的解析JSON.parseFull(jsonString:String)函數(shù),以一個JSON字符串作為輸入并進(jìn)行解析,如果解析成功則返回一個Some(map:Map[String,Any]),如果解析失敗則返回None(2)5.3.1本地文件系統(tǒng)的數(shù)據(jù)讀寫importorg.apache.spark.SparkContextimportorg.apache.spark.SparkContext._importorg.apache.spark.SparkConfimportscala.util.parsing.json.JSONobjectJSONRead{defmain(args:Array[String]){
val
inputFile="file:///usr/local/spark/examples/src/main/resources/people.json"
valconf=newSparkConf().setAppName("JSONRead")
valsc=newSparkContext(conf)
val
jsonStrs=sc.textFile(inputFile)
valresult=jsonStrs.map(s=>JSON.parseFull(s))
result.foreach({r=>rmatch{caseSome(map:Map[String,Any])=>println(map)caseNone=>println("Parsingfailed")caseother=>println("Unknowndatastructure:"+other)}})}}JSONRead.scalaname:="SimpleProject"version:="1.0"scalaVersion:="2.12.15"libraryDependencies+="org.apache.spark"%%"spark-core"%"3.2.0"libraryDependencies+="org.scala-lang.modules"%%"scala-parser-combinators"%"1.0.4"
把simple.sbt文件設(shè)置為如下內(nèi)容
5.3.1本地文件系統(tǒng)的數(shù)據(jù)讀寫5.3.1本地文件系統(tǒng)的數(shù)據(jù)讀寫將整個應(yīng)用程序打包成JAR包通過spark-submit運(yùn)行程序$/usr/local/spark/bin/spark-submit\>--class"JSONRead”\
>/usr/local/spark/mycode/json/target/scala-2.12/json-project_2.12-1.0.jar
執(zhí)行后可以在屏幕上的大量輸出信息中找到如下結(jié)果
Map(name->Michael)Map(name->Andy,age->30.0)Map(name->Justin,age->19.0)5.3.2分布式文件系統(tǒng)HDFS的數(shù)據(jù)讀寫
如下三條語句都是等價的
scala>valtextFile=sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")scala>textFile.first()
從分布式文件系統(tǒng)HDFS中讀取數(shù)據(jù),也是采用textFile()方法
scala>valtextFile=sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")scala>valtextFile=sc.textFile("/user/hadoop/word.txt")scala>valtextFile=sc.textFile("word.txt")可以使用saveAsTextFile()方法把RDD中的數(shù)據(jù)保存到HDFS文件中scala>textFile.saveAsTextFile("writeback")5.3.3讀寫MySQL數(shù)據(jù)庫
在MySQLShell環(huán)境中,輸入下面SQL語句完成數(shù)據(jù)庫和表的創(chuàng)建
$servicemysqlstart$mysql-uroot-p#屏幕會提示輸入密碼
安裝成功以后,在Linux中啟動MySQL數(shù)據(jù)庫,命令如下
mysql>createdatabasespark;mysql>usespark;mysql>createtablestudent(idint(4),namechar(20),genderchar(4),ageint(4));mysql>insertintostudentvalues(1,'Xueqian','F',23);mysql>insertintostudentvalues(2,'Weiliang','M',24);mysql>select*fromstudent;5.3.3讀寫MySQL數(shù)據(jù)庫
2.從MySQL數(shù)據(jù)庫讀取數(shù)據(jù):執(zhí)行如下命令新建一個代碼文件ReadMySQL.scala
$cd~/Downloads$sudotar-zxvfmysql-connector-java-5.1.40.tar.gz-C/usr/local/spark/jars
1.
準(zhǔn)備工作:把該驅(qū)動程序解壓縮到Spark安裝目錄“/usr/local/spark/jars”下$cd~/sparkapp/src/main/scala#假設(shè)該目錄已經(jīng)存在$vimReadMySQL.scala5.3.3讀寫MySQL數(shù)據(jù)庫importjava.sql.DriverManagerimportorg.apache.spark.rdd.JdbcRDDimportorg.apache.spark.{SparkConf,SparkContext}objectReadMySQL{defmain(args:Array[String]){valconf=newSparkConf().setAppName("ReadMySQL").setMaster("local[2]")valsc=newSparkContext(conf)sc.setLogLevel("ERROR")valinputMySQL=newJdbcRDD(sc,()=>{Class.forName("com.mysql.jdbc.Driver")DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?useUnicode=true&characterEncoding=utf8","root","123456")//root是數(shù)據(jù)庫用戶名,123456是密碼},"SELECT*FROMstudentwhereid>=?andid<=?;",1,//設(shè)置條件查詢中id的下界2,//設(shè)置條件查詢中id的上界1,//設(shè)置分區(qū)數(shù)r=>(r.getInt(1),r.getString(2),r.getString(3),r.getInt(4)))inputMySQL.foreach(println)sc.stop()}}5.3.3讀寫MySQL數(shù)據(jù)庫
執(zhí)行如下命令對代碼進(jìn)行編譯打包name:="SimpleProject"version:="1.0"scalaVersion:="2.12.15"libraryDependencies+="org.apache.spark"%%"spark-core"%"3.2.0"
在sparkapp目錄下新建simple.sbt文件并輸入以下內(nèi)容
$cd~/sparkapp$/usr/local/sbt/sbtpackage5.3.3讀寫MySQL數(shù)據(jù)庫
執(zhí)行如下命令對代碼進(jìn)行編譯打包$cd~/sparkapp$/usr/local/spark/bin/spark-submit\>--jars\>/usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar\>--class"ReadMySQL"\>./target/scala-2.12/simple-project_2.12-1.0.jar
然后執(zhí)行如下命令運(yùn)行程序
(1,Xueqian,F,23)(2,Weiliang,M,24)5.3.3讀寫MySQL數(shù)據(jù)庫importjava.sql.DriverManagerimportorg.apache.spark.rdd.JdbcRDDimportorg.apache.spark.{SparkConf,SparkContext}objectWriteMySQL{defmain(args:Array[String]){valconf=newSparkConf().setAppName("WriteMySQL").setMaster("local[2]")valsc=newSparkContext(conf)sc.setLogLevel("ERROR")Class.forName("com.mysql.jdbc.Driver")valrddData=sc.parallelize(List((3,"Rongcheng","M",26),(4,"Guanhua","M",27)))
3.
向MySQL數(shù)據(jù)庫寫入數(shù)據(jù)
5.3.3讀寫MySQL數(shù)據(jù)庫
rddData.foreachPartition((iter:Iterator[(Int,String,String,Int)])=>{valconn=DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?useUnicode=true&characterEncoding=utf8","root","123456")conn.setAutoCommit(false)valpreparedStatement=conn.prepareStatement("INSERTINTOstudent(id,name,gender,age)VALUES(?,?,?,?)")iter.foreach(t=>{preparedStatement.setInt(1,t._1)preparedStatement.setString(2,t._2)preparedStatement.setString(3,t._3)preparedStatement.setInt(4,t._4)preparedStatement.addBatch()})preparedStatement.executeBatch()mit()conn.close()})sc.stop()}}$/usr/local/spark/bin/spark-submit\>--jars\>/usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar\>--class"WriteMySQL"\>./target/scala-2.12/simple-project_2.12-1.0.jar
對代碼進(jìn)行編譯打包,然后執(zhí)行如下命令運(yùn)行程序
5.3.3讀寫MySQL數(shù)據(jù)庫mysql>select*fromstudent;+------+-----------+--------+------+|id|name|gender|age|+------+-----------+--------+------+|1|Xueqian|F|23||2|Weiliang|M|24||3|Rongcheng|M|26||4|Guanhua|M|27|+------+-----------+--------+------+4rowsinset(0.00sec)(2,Weiliang,M,24)
可以到MySQLShell環(huán)境中使用SQL語句查詢student表5.3.3讀寫MySQL數(shù)據(jù)庫5.4
綜合案例提綱12求最大最小值求TOP值5文件排序二次排序連接操作345.4.1案例1:求TOP值ordereiduseridpaymentproductid1,1768,50,1552,1218,600,2113,2239,788,2424,3101,28,5995,4899,290,1296,3110,54,12017,4436,259,8778,2369,7890,27100,4287,226,233101,6562,489,124102,1124,33,17103,3267,159,179104,4569,57,125105,1438,37,116file1.txt
file2.txt5.4.1案例1:求TOP值求TopN個payment值1,1768,50,1552,1218,600,2113,2239,788,2424,3101,28,5995,4899,290,1296,3110,54,12017,4436,259,8778,2369,7890,27100,4287,226,233101,6562,489,124102,1124,33,17103,3267,159,179104,4569,57,125105,1438,37,116file1.txt
file2.txt5.4.1案例1:求TOP值
importorg.apache.spark.{SparkConf,SparkContext}objectTopN{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("TopN").setMaster("local")valsc=newSparkContext(conf)sc.setLogLevel("ERROR")vallines=sc.textFile("hdfs://localhost:9000/user/hadoop/spark/mycode/rdd/examples",2)varnum=0;valresu
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年飲料及冷飲服務(wù)項目合作計劃書
- 門診護(hù)理禮儀與情緒管理
- VSD護(hù)理質(zhì)量控制標(biāo)準(zhǔn)
- 自考護(hù)理本科護(hù)理倫理與法律
- 帕金森病護(hù)理團(tuán)隊協(xié)作模式探討
- 告別外號煩惱課件
- 貼面護(hù)理的藝術(shù)之美
- 護(hù)理管理與團(tuán)隊協(xié)作
- 早產(chǎn)兒家庭護(hù)理環(huán)境布置
- 單器官血管炎的護(hù)理
- 數(shù)字化轉(zhuǎn)型賦能高校課程思政的實施進(jìn)路與評價創(chuàng)新
- 捷盟-03-京唐港組織設(shè)計與崗位管理方案0528-定稿
- 基于SystemView的數(shù)字通信仿真課程設(shè)計
- 物業(yè)二次裝修管理規(guī)定
- GB 10133-2014食品安全國家標(biāo)準(zhǔn)水產(chǎn)調(diào)味品
- FZ/T 92023-2017棉紡環(huán)錠細(xì)紗錠子
- 采氣工程課件
- 非洲豬瘟實驗室診斷電子教案課件
- 工時的記錄表
- 金屬材料與熱處理全套ppt課件完整版教程
- 熱拌瀝青混合料路面施工機(jī)械配置計算(含表格)
評論
0/150
提交評論