版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
Spark大數據技術與應用案例教程主講教師:目錄項目一Spark入門項目二SparkRDD——彈性分布式數據集項目三SparkSQL——結構化數據處理項目四SparkStreaming——實時計算框架目錄項目五SparkMLlib——機器學習庫項目六GraphFrames——圖計算框架項目七綜合案例——分析銀行個人信貸
業(yè)務數據項目二SparkRDD——彈性分布式數據集第5頁在實際應用中,已有的迭代算法和交互式數據挖掘工具存在兩個問題。SparkRDD的出現解決了它們,它提供了一個抽象的數據架構,實現了通用性;不同RDD之間的轉換操作形成依賴關系,可以實現管道化,避免了中間結果的存儲,降低了開銷。本項目結合分析學生成績數據實例,首先介紹創(chuàng)建RDD的方法,然后介紹RDD的轉換操作和行動操作,最后介紹存儲RDD的方法。第6頁掌握RDD的執(zhí)行過程和依賴關系。理解RDD持久化和分區(qū)。熟悉Spark中常見的文件格式。第7頁能讀取數據創(chuàng)建RDD。能使用RDD的不同操作處理數據。能對RDD進行持久化和分區(qū)操作。能將RDD存儲為不同類型的文件。第8頁掌握編程思路,培養(yǎng)邏輯思維能力。任務一讀取學生成績創(chuàng)建RDD任務二查詢考試成績排名前三的學生信息任務三計算學生的平均成績任務四存儲歸納后的學生成績數據任務一讀取學生成績創(chuàng)建RDD第11頁通過分析和統計學生的成績,學??梢粤私鈱W生的學習情況,并根據學習情況對學生進行評優(yōu)評先,從而激發(fā)學生的學習動力。使用Spark分析和統計學生成績時,需要讀取學生成績數據創(chuàng)建RDD。創(chuàng)建RDD之前我們先學習SparkRDD的執(zhí)行過程、RDD之間的依賴關系,以及創(chuàng)建RDD的不同方法。第12頁一、SparkRDD的執(zhí)行過程彈性分布式數據集(RDD)是Spark中最基本的抽象概念之一,它是一個不可變的、彈性的、可分區(qū)的分布式數據集合。每個RDD可以分成多個分區(qū),每個分區(qū)就是一個數據集片段。第13頁一、SparkRDD的執(zhí)行過程一個RDD的不同分區(qū)可以存儲在集群的不同節(jié)點上,從而實現分布式計算。RDD的特性如圖所示。第14頁一、SparkRDD的執(zhí)行過程RDD屏蔽了復雜的底層分布式計算,為用戶提供了方便的數據轉換和求值方法,其典型的執(zhí)行過程如圖所示。第15頁一、SparkRDD的執(zhí)行過程Spark會讀取數據源(如本地文件系統、HDFS、數據庫等)中的數據創(chuàng)建RDD。(1)創(chuàng)建RDDRDD典型的執(zhí)行過程第16頁一、SparkRDD的執(zhí)行過程對已有RDD進行各種轉換操作。這些操作都是惰性求值的,不會立即執(zhí)行,而是在遇到行動操作時才會執(zhí)行。每次轉換操作都會生成一個新的RDD,該RDD會記錄其與前一個RDD的依賴關系,并形成一條有向無環(huán)圖(DAG)。(2)轉換操作RDD典型的執(zhí)行過程第17頁一、SparkRDD的執(zhí)行過程執(zhí)行針對RDD的行動操作得到值(值也是RDD),可以將值返回給驅動程序或者輸出到外部存儲系統。(3)行動操作RDD典型的執(zhí)行過程第18頁一、SparkRDD的執(zhí)行過程例如,RDD執(zhí)行過程的一個實例如圖所示。第19頁一、SparkRDD的執(zhí)行過程其中,讀取輸入的數據,邏輯上創(chuàng)建A和C兩個RDD,經過一系列轉換操作,邏輯上生成F(也是RDD)。此時,計算并沒有真正發(fā)生,Spark中形成記錄了RDD之間依賴關系的DAG。當遇到針對F的行動操作時,Spark生成一個作業(yè),向DAG調度器提交作業(yè),觸發(fā)從起點開始的真正計算,輸出計算結果。這一系列處理稱為一個“血緣關系”,即DAG拓撲排序的結果。第20頁二、SparkRDD之間的依賴關系在Spark中,不同的操作使不同的RDD之間產生了不同的依賴關系,這些依賴關系可以分為窄依賴(narrowdependency)和寬依賴(widedependency)。第21頁二、SparkRDD之間的依賴關系(1)父RDD父RDD是指在Spark中生成當前RDD的原始數據集,每個RDD可以有一個或多個父RDD。第22頁二、SparkRDD之間的依賴關系(2)子RDD子RDD指的是通過對父RDD進行轉換操作派生出來的新RDD。第23頁二、SparkRDD之間的依賴關系(3)Shuffle操作Shuffle操作是指根據某個鍵(key)對元素進行重新分區(qū)和重新組合的過程。Shuffle操作將數據從源分區(qū)移動到目標分區(qū),并對數據進行排序或聚合等操作第24頁二、SparkRDD之間的依賴關系窄依賴是指父RDD的每個分區(qū)最多被子RDD的一個分區(qū)所使用,子RDD分區(qū)通常對應一個或多個父RDD分區(qū)。當RDD執(zhí)行map()、filter()、union()和對輸入做協同劃分的join()等操作時,就會產生窄依賴。1.窄依賴第25頁二、SparkRDD之間的依賴關系1.窄依賴窄依賴操作不涉及數據的重分區(qū),因此它們的執(zhí)行效率通常比較高,同時也可以避免大量的數據傳輸和網絡I/O。第26頁二、SparkRDD之間的依賴關系寬依賴是指父RDD的每個分區(qū)都可能被子RDD的多個分區(qū)所使用,子RDD分區(qū)通常對應所有的父RDD分區(qū)。當RDD執(zhí)行groupByKey()和對輸入做非協同劃分的join()等操作時,就會產生寬依賴。2.寬依賴第27頁二、SparkRDD之間的依賴關系2.寬依賴寬依賴操作需要通過Shuffle操作重新分配數據,從而滿足計算需求。Shuffle是一種非常耗費計算資源和網絡帶寬的操作,因此寬依賴的執(zhí)行效率通常比較低。第28頁二、SparkRDD之間的依賴關系RDD之間的依賴關系使得Spark天生具有容錯性。這主要得益于每個RDD都有一個與之對應的血緣關系,該關系記錄了RDD是如何從父RDD轉換得到的。第29頁二、SparkRDD之間的依賴關系如果某個中間RDD分區(qū)丟失或損壞,Spark可以使用血緣關系追溯所有依賴該分區(qū)的RDD和轉換操作,重新計算出受影響的RDD分區(qū),從而實現數據的自動恢復。第30頁二、SparkRDD之間的依賴關系窄依賴:一個分區(qū)的節(jié)點發(fā)生故障時,其他分區(qū)都不受影響,只需要重新計算該分區(qū)即可。寬依賴:當一個分區(qū)的節(jié)點發(fā)生故障時,所有依賴其輸出的節(jié)點所在的分區(qū)都必須重新計算這導致Spark在容錯時需要重新計算更多的分區(qū),從而增加了運行時間和資源消耗。比較而言,窄依賴的故障恢復更高效。第31頁三、SparkRDD的創(chuàng)建在Spark中創(chuàng)建RDD的方法(1)集合并行化創(chuàng)建RDD(2)讀取外部存儲系統的數據創(chuàng)建RDD(3)對已有的RDD進行轉換得到新的RDD第32頁三、SparkRDD的創(chuàng)建SparkContext是Spark與集群通信的主要入口點,它支持從不同的數據源加載數據創(chuàng)建RDD,還支持執(zhí)行不同的RDD操作。1.集合并行化創(chuàng)建RDD第33頁三、SparkRDD的創(chuàng)建在開發(fā)獨立應用程序時,首先使用SparkConf()方法創(chuàng)建SparkConf對象,然后使用setter方法配置Spark應用程序的相關屬性,最后基于上述配置創(chuàng)建一個SparkContext對象。參考示例如下。frompysparkimportSparkConf,SparkContext#創(chuàng)建SparkConf對象conf=SparkConf().setAppName("AppName").setMaster("URL")#創(chuàng)建SparkContext對象sc=SparkContext(conf=conf)1.集合并行化創(chuàng)建RDD第34頁三、SparkRDD的創(chuàng)建SparkConf對象常用的setter方法如表所示。1.集合并行化創(chuàng)建RDD方法說明setAppName(value)設置Spark應用程序的名稱。參數value為名稱setMaster(value)設置Spark應用程序的主節(jié)點URL第35頁三、SparkRDD的創(chuàng)建在Spark中,使用SparkContext類中的parallelize()方法可以將一個已經存在的Python列表、元組或集合轉換為RDD,從而實現并行化處理。該方法的基本格式如下。parallelize(c,numSlices)1.集合并行化創(chuàng)建RDD第36頁三、SparkRDD的創(chuàng)建其中,參數的含義如下。(1)c:要并行化的數據集合,該集合可以是Python列表、元組或集合。(2)numSlices(可選):分區(qū)數,若不設置分區(qū)數,則RDD的分區(qū)數默認為該程序分配到的資源的CPU核心數。1.集合并行化創(chuàng)建RDD第37頁三、SparkRDD的創(chuàng)建【例2-1】使用parallelize()方法創(chuàng)建RDD,然后輸出RDD的元素和默認分區(qū)數,如圖所示。1.集合并行化創(chuàng)建RDD#進入pyspark交互式執(zhí)行環(huán)境[hadoop@bogon~]$pyspark#使用parallelize()創(chuàng)建RDD>>>rdd=sc.parallelize([1,2,3,4,5])#輸出RDD的元素>>>rdd.collect()#查看RDD默認分區(qū)數>>>rdd.getNumPartitions()第38頁三、SparkRDD的創(chuàng)建外部存儲系統(1)本地文件系統(2)Hadoop分布式文件系統(Hadoopdistributedfilesystem,HDFS)(3)Hadoop支持的其他文件系統2.讀取外部存儲系統的數據創(chuàng)建RDD第39頁三、SparkRDD的創(chuàng)建在Spark中,使用SparkContext對象的textFile()方法可以從外部存儲系統中讀取不同格式文件中的數據創(chuàng)建RDD。該方法的基本格式如下。textFile(name,minPartitions,use_unicode)2.讀取外部存儲系統的數據創(chuàng)建RDD第40頁三、SparkRDD的創(chuàng)建其中,參數的含義如下。(1)name:要讀取的數據文件的路徑。(2)minPartitions(可選):建議生成的RDD的最小分區(qū)數。(3)use_unicode(可選):是否在讀取文件時使用Unicode編碼。默認值為True,表示使用Unicode編碼解析文本文件;若設置為False,則用其他編碼(如UTF-8)來處理文本數據。2.讀取外部存儲系統的數據創(chuàng)建RDD第41頁三、SparkRDD的創(chuàng)建
【例2-2】本地文件系統中存放一個“/usr/local/saprk/mycode/hello_spark.txt”文件,該文件包含兩句話,分別是“HelloSpark”和“IloveSpark”。使用textFile()方法讀取本地文件系統中文本文件的數據創(chuàng)建RDD,然后輸出RDD的元素,如圖所示。2.讀取外部存儲系統的數據創(chuàng)建RDD第42頁三、SparkRDD的創(chuàng)建2.讀取外部存儲系統的數據創(chuàng)建RDD[hadoop@bogon~]$pyspark#讀取本地文件創(chuàng)建RDD>>>words=sc.textFile("file:///usr/local/spark/mycode/hello_spark.txt")#輸出RDD的元素>>>words.foreach(print)第43頁三、SparkRDD的創(chuàng)建【例2-3】在HDFS中創(chuàng)建“/hellospark.txt”文件,該文件包含兩句話,分別是“HelloSpark”和“IloveSpark”。使用textFile()方法讀取HDFS中的文本文件創(chuàng)建RDD,然后輸出RDD中的元素,如圖所示。2.讀取外部存儲系統的數據創(chuàng)建RDD第44頁三、SparkRDD的創(chuàng)建#啟動Hadoop[hadoop@bogon~]cd/usr/local/hadoop/sbin[hadoop@bogonsbin]$./start-dfs.sh#在HDFS中創(chuàng)建hellospark.txt文件[hadoop@bogonsbin]$echo-e"HelloSpark\nIloveSpark"|hadoopfs-put-/hellospark.txt[hadoop@bogon~]$pyspark#讀取HDFS中的文本文件創(chuàng)建RDD>>>words_HDFS=sc.textFile("/hellospark.txt")#輸出RDD的元素>>>words_HDFS.foreach(print)2.讀取外部存儲系統的數據創(chuàng)建RDD第45頁任務分析讀取學生成績創(chuàng)建RDD學生成績由考試成績和操行成績組成,該數據存放在“/usr/local/spark/mycode/rdd”目錄下。其中,學生的考試成績存放在“Student_Exam_Scores.csv”文件中,操行成績存放在“Student_Conduct_Scores.csv”文件中。兩份文件的數據格式和字段均一致,字段的解釋說明如表所示。點擊此處播放微課第46頁注意:每個任務實施開始之前都需要確保Hadoop已經啟動成功,以保證Spark應用程序可以順利執(zhí)行。后面各任務實施不再重復強調。字段名稱說明字段名稱說明Stu_ID學號Age年齡Stu_name姓名Class班級Gerder性別Scores成績Birth出生日期——第47頁步驟1在終端中執(zhí)行以下命令,切換至“/usr/local/spark/mycode/rdd”目錄下,然后將該目錄下的“Student_Exam_Scores.csv”文件和“Student_Conduct_Scores.csv”文件上傳到HDFS的根目錄下。[hadoop@bogon~]$cd/usr/local/spark/mycode/rdd[hadoop@bogonmycode]$hdfsdfs-put/usr/local/spark/mycode/rdd/Student_Exam_Scores.csv/[hadoop@bogonmycode]$hdfsdfs-put/usr/local/spark/mycode/rdd/Student_Conduct_Scores.csv/第48頁步驟2在PyCharm中新建“rdd”目錄,然后在“rdd”目錄下新建“read_exam_information.py”文件。第49頁步驟3在“read_exam_information.py”文件中編寫應用程序。讀取“Student_Exam_Scores.csv”文件中的數據創(chuàng)建RDD,輸出學生信息(包括學號、姓名、性別、出生日期、年齡、班級和考試成績)。實現過程如下。第50頁步驟3首先,配置Spark應用程序并創(chuàng)建SparkContext對象。創(chuàng)建Spark的配置對象sparkConf,使用setAppName()方法設置Spark應用程序的名稱,使用setMaster()方法設置Spark應用程序的運行方式,并基于該配置創(chuàng)建SparkContext對象。第51頁步驟3然后,使用textFile()方法從HDFS的“Student_Exam_Scores.csv”文件中讀取數據創(chuàng)建RDD。接著,執(zhí)行foreach()行動操作(后面會詳細介紹)輸出RDD中的元素,即學生信息。最后,使用stop()方法停止SparkContext對象,釋放執(zhí)行資源。第52頁步驟3frompysparkimportSparkConf,SparkContext#配置Spark應用程序conf=SparkConf().setAppName("ReadExamInformation")\.setMaster(“l(fā)ocal”)(詳見教材)【參考代碼】第53頁步驟3在PyCharm中運行代碼,控制臺顯示輸出的學生信息(篇幅原因,只截取部分輸出信息),如圖所示?!具\行結果】第54頁SparkRDD的執(zhí)行過程SparkRDD之間的依賴關系SparkRDD的創(chuàng)建任務二查詢考試成績排名前三的學生信息第56頁為評選出優(yōu)秀學生獎學金獲得者,學校需要查詢考試成績排名前三的學生信息。本任務將使用RDD的轉換操作和行動操作對學生考試成績進行排名,并輸出考試成績排名前三的學生信息。開始本任務前我們先學習一下SparkRDD操作、持久化和分區(qū)的方法。第57頁一、SparkRDD操作SparkRDD提供了一系列的操作方法,用于操作分布式數據集。RDD操作可以分為轉換(transformation)操作和行動(action)操作。第58頁一、SparkRDD操作轉換操作是指將一個RDD轉換成另一個RDD的操作,它們主要用于處理和清洗數據。常用的轉換操作有map()、filter()、flatMap()、sortBy()、union()和distinct()等,詳細說明如表所示。1.SparkRDD的轉換操作第59頁一、SparkRDD操作轉換操作說明map(func)將一個RDD中的每個元素都應用一個指定的函數func,返回一個新的RDDfilter(func)將一個RDD中的每個元素都應用一個指定的函數func,篩選出符合條件的元素,返回一個新的RDDflatMap(func)與map()類似,但flatMap()可以將每個元素映射到多個輸出結果中1.SparkRDD的轉換操作第60頁一、SparkRDD操作轉換操作說明sortBy(keyfunc,
ascending,numPartitions)按照指定規(guī)則對RDD中的元素進行排序,并返回一個新的RDD。其中,參數keyfunc表示計算鍵(key)的函數;ascending(可選)用于指定鍵的排列順序,默認值為True,即升序排列;numPartitions(可選)表示分區(qū)數,默認排序后的分區(qū)個數和排序之前的分區(qū)個數相等union(otherRDD)將兩個RDD合并為一個新的RDD,使得新的RDD包含原來兩個RDD中的所有元素distinct()去除RDD中重復的元素,返回一個新的RDD1.SparkRDD的轉換操作第61頁一、SparkRDD操作【例2-4】創(chuàng)建一個RDD,執(zhí)行map()操作,將RDD中的每個元素都加2,輸出結果如圖(a)所示。然后,執(zhí)行sortBy()操作,對RDD的所有元素進行排序,輸出結果如圖(b)所示。最后,執(zhí)行filter()操作,過濾出RDD中的偶數,輸出結果如圖(c)所示。(a)(b)(c)1.SparkRDD的轉換操作1.SparkRDD的轉換操作第62頁一、SparkRDD操作>>>sorted_numbers=add_rdd.sortBy(lambdax:x)#輸出排序后的RDD元素>>>sorted_numbers.collect()#過濾出RDD中的偶數>>>filtered_rdd=sorted_numbers.filter(lambdax:x%2==0)#輸出RDD中的偶數元素>>>filtered_rdd.collect()[hadoop@bogon~]$pyspark#創(chuàng)建一個包含數字的RDD>>>rdd=sc.parallelize([6,11,1,8,2,9,4,5])#將RDD中的每個元素加2>>>add_rdd=rdd.map(lambdax:x+2)#輸出加2后的RDD元素>>>add_rdd.collect()#排序第63頁一、SparkRDD操作1.SparkRDD的轉換操作【例2-5】讀取本地文件“/usr/local/spark/mycode/hello_spark.txt”中的數據創(chuàng)建RDD。然后,執(zhí)行flatMap()操作,以空格為分隔符將每行字符串分割成一個個單詞,輸出結果如圖示。第64頁一、SparkRDD操作1.SparkRDD的轉換操作[hadoop@bogon~]$pyspark#讀取hello_spark.txt文件中的數據創(chuàng)建RDD>>>lines=sc.textFile("file:///usr/local/spark/mycode/hello_spark.txt")#將每行字符串分割成一個個單詞>>>words_rdd=lines.flatMap(lambdaline:line.split(""))#輸出分割成單詞后的RDD元素>>>words_rdd.foreach(print)1.SparkRDD的轉換操作第65頁一、SparkRDD操作【例2-6】創(chuàng)建兩個RDD(即rdd1和rdd2),執(zhí)行union()操作合并rdd1和rdd2,輸出結果如圖(a)所示。然后,執(zhí)行distinct()操作,去除RDD中重復的元素,輸出結果如圖(b)所示。(a)(b)第66頁一、SparkRDD操作[hadoop@bogon~]$pyspark>>>rdd1=sc.parallelize(["apple","banana","orange"])>>>rdd2=sc.parallelize(["pear","grape","apple"])#合并RDD>>>rdd3=rdd1.union(rdd2)#輸出合并后的RDD元素>>>rdd3.collect()#去重>>>rdd_distinct=rdd3.distinct()#輸出去重后的RDD元素>>>rdd_distinct.collect()1.SparkRDD的轉換操作第67頁一、SparkRDD操作行動操作是指對RDD數據集進行實際計算并返回結果的操作。常用的行動操作有count()、collect()、first()、take()、reduce()和foreach()等,詳細說明如表所示。2.SparkRDD的行動操作第68頁一、SparkRDD操作行動操作說明count()返回RDD中元素的數量collect()將RDD中的所有元素收集到一個數組中,并返回該數組first()返回RDD中的第一個元素take(n)返回RDD中的前n個元素reduce(func)使用指定的函數func對RDD中的元素進行聚合計算,返回最終結果foreach(func)對RDD中的每個元素應用指定的函數func2.SparkRDD的行動操作2.SparkRDD的行動操作第69頁一、SparkRDD操作【例2-7】創(chuàng)建RDD,執(zhí)行不同的行動操作,代碼和運行結果如圖所示。第70頁二、SparkRDD持久化在迭代計算中,通常需要多次使用同一組數據。如果需要多次使用同一個RDD,則每次調用都需要執(zhí)行與該RDD相關的一系列轉換操作,這可能導致計算機消耗較大的資源。為了避免重復計算的情況,可以在Spark中設置RDD持久化。第71頁二、SparkRDD持久化RDD持久化操作是指將一個RDD標記為持久化,首次遇到行動操作觸發(fā)計算時,該RDD將會緩存在計算節(jié)點的內存中以便后續(xù)的行動操作重用。第72頁二、SparkRDD持久化持久化RDD的主要方法persist():可以以不同的存儲級別存儲持久化的RDD,PySpark中可用的存儲級別如下表所示。cache():使用默認存儲級別的快捷方法,使用該方法會調用persist(MEMORY_ONLY)。第73頁二、SparkRDD持久化存儲級別所需內存空間程度所需CPU計算時間程度是否在內存中是否在磁盤上說明MEMORY_ONLY高低是否默認級別,將RDD作為反序列化的對象存儲在JVM內存中。若內存不足,則按照一定原則替換緩存中的內容MEMORY_AND_DISK高中等部分部分將RDD作為反序列化的對象存儲在JVM內存中。若內存不足,超出的分區(qū)會被存儲在磁盤上DISK_ONLY低高否是僅將RDD分區(qū)存儲在磁盤上MEMORY_ONLY_2
MEMORY_AND_DISK_2DISK_ONLY_2————與上面級別相同,但在兩個集群節(jié)點上復制RDD分區(qū)第74頁【例2-8】使用cache()方法對RDD進行持久化,輸出結果如圖所示。二、SparkRDD持久化第75頁[hadoop@bogon~]$pyspark>>>rdd=sc.parallelize(["Hadoop","Spark","RDD"])#調用persist(MEMORY_ONLY),由于RDD還沒有計算,此時并不會緩存RDD>>>rdd.cache()#首次執(zhí)行行動操作,觸發(fā)一次從頭到尾的計算,這時cache()被執(zhí)行,把RDD放入緩存中>>>print(rdd.count())#第二次執(zhí)行行動操作,不觸發(fā)從頭到尾的計算,只需要重復使用上面緩存中的RDD>>>print(','.join(rdd.collect()))二、SparkRDD持久化第76頁三、SparkRDD分區(qū)SparkRDD分區(qū)是指將RDD分成多個分區(qū),分別保存在不同的節(jié)點上,如圖所示。進行RDD分區(qū)的主要目的是提高作業(yè)的并行度和容錯性,減少網絡傳輸開銷,優(yōu)化內存利用率,從而充分利用集群中的計算資源,加快數據處理速度。第77頁三、SparkRDD分區(qū)RDD分區(qū)的常用方法(1)創(chuàng)建RDD時進行分區(qū)(2)創(chuàng)建RDD后重分區(qū)RDD分區(qū)的原則是使分區(qū)的個數盡可能等于集群中的CPU核心數。第78頁三、SparkRDD分區(qū)創(chuàng)建RDD時,可以通過parallelize(c,numSlices)方法中的numSlices參數或textFile(name,minPartitions)的minPartitions參數指定RDD的分區(qū)數。如果沒有指定分區(qū)數,則根據當前可用的CPU核心數設置默認分區(qū)數。1.創(chuàng)建RDD時進行分區(qū)1.創(chuàng)建RDD時進行分區(qū)第79頁三、SparkRDD分區(qū)【例2-9】使用parallelize()方法分別創(chuàng)建RDD,然后分別使用getNumPartitions()方法輸出分區(qū)數,對比默認分區(qū)與手動設置分區(qū)的分區(qū)數,如圖所示。1.創(chuàng)建RDD時進行分區(qū)第80頁三、SparkRDD分區(qū)【例2-10】使用textFile()方法分別創(chuàng)建RDD,然后分別使用getNumPartitions()方法輸出分區(qū)數,對比默認分區(qū)與手動設置分區(qū)的分區(qū)數,如圖所示。第81頁三、SparkRDD分區(qū)創(chuàng)建RDD后,也可以重新對RDD進行分區(qū)。Spark提供的coalesce()和repartition()方法可以重新對任何類型的RDD進行簡單分區(qū)。coalesce()方法使用哈希分區(qū)方式減少分區(qū)數量。該方法的基本格式如下。coalesce(numPartitions,shuffle)2.創(chuàng)建RDD后重分區(qū)2.創(chuàng)建RDD后重分區(qū)第82頁三、SparkRDD分區(qū)其中,參數的含義如下。(1)numPartitions:想要減少到的分區(qū)數。(2)shuffle:表示是否需要進行數據的洗牌。當shuffle為False(默認值)時,如果重新設置的分區(qū)數大于RDD現有的分區(qū)數,則RDD的分區(qū)數不變;當shuffle為True時,如果重新設置的分區(qū)數大于RDD現有的分區(qū)數,RDD新的分區(qū)數也能重設成功。2.創(chuàng)建RDD后重分區(qū)第83頁三、SparkRDD分區(qū)repartition()方法本質上是coalesce()方法的簡單實現。該方法的基本格式如下。repartition(numPartitions)2.創(chuàng)建RDD后重分區(qū)第84頁三、SparkRDD分區(qū)【例2-11】創(chuàng)建RDD并設置分區(qū)數為2,然后使用coalesce()方法并設置不同的shuffle參數值,重新對RDD進行分區(qū),輸出結果如圖所示。2.創(chuàng)建RDD后重分區(qū)第85頁三、SparkRDD分區(qū)【例2-12】創(chuàng)建RDD并設置分區(qū)數為2,使用repartition()方法對RDD進行重分區(qū),分區(qū)數為1,輸出結果如圖所示。第86頁任務分析(1)讀取學生的考試成績創(chuàng)建RDD(2)對學生的考試成績進行排序(3)輸出排名前三的學生信息。查詢考試成績排名前三的學生信息第87頁任務分析打開PyCharm,在“rdd”目錄下新建“top_three_information.py”文件,然后在該文件中編寫應用程序,輸出考試成績排名前三的學生信息(包括學號、姓名、班級和考試成績)。實現步驟如下。查詢考試成績排名前三的學生信息點擊此處播放微課第88頁1.搭建Spark偽分布式集群環(huán)境步驟1配置Spark應用程序并創(chuàng)建SparkContext對象。步驟2執(zhí)行first()操作提取出首行數據(表頭),然后執(zhí)行filter()操作過濾掉首行數據,只保留考試成績數據。第89頁1.搭建Spark偽分布式集群環(huán)境步驟3執(zhí)行map()操作按照分割符“,”分割RDD中的數據元素,返回新的RDD。步驟4使用textFile()方法讀取HDFS文件中的考試成績數據創(chuàng)建RDD。第90頁1.搭建Spark偽分布式集群環(huán)境步驟5執(zhí)行sortBy()操作按照降序對考試成績進行排序,然后執(zhí)行take()操作獲取前三名的成績。步驟6使用print()方法輸出學生的信息。首先輸出表頭,然后循環(huán)打印前三名學生的學號、姓名、班級和考試成績。第91頁1.搭建Spark偽分布式集群環(huán)境步驟7使用stop()方法停止SparkContext對象,釋放執(zhí)行資源。frompysparkimportSparkContext,SparkConf#配置Spark應用程序conf=SparkConf().setAppName("Top3ExamScores").setMaster("local")#創(chuàng)建SparkContext對象sc=SparkContext(conf=conf)(詳見教材)【參考代碼】第92頁1.搭建Spark偽分布式集群環(huán)境步驟7【運行結果】在PyCharm中運行代碼,控制臺輸出考試成績排名前三的學生信息,如圖。第93頁SparkRDD操作SparkRDD持久化SparkRDD分區(qū)任務三計算學生的平均成績第95頁為評選出德智體美勞全面發(fā)展的三好學生,學校需要篩選出平均成績(即考試成績和操行成績的平均值)排名前三的學生。本任務將使用鍵值對RDD的轉換操作和行動操作計算學生的平均成績,為后續(xù)三好學生的評選提供數據支持。開始本任務前,我們先學習一下鍵值對RDD的創(chuàng)建和轉換操作。第96頁一、鍵值對RDD的創(chuàng)建鍵值對RDD(pairRDD)是指一種特殊類型的RDD,其中每個元素都是一個鍵值對(key,value),由一個鍵(key)和一個相應的值(value)組成。讀取的數據形式不同,創(chuàng)建鍵值對RDD的方式也不同。第97頁一、鍵值對RDD的創(chuàng)建直接創(chuàng)建鍵值對RDD。若讀取的數據形式為鍵值對,則可以使用創(chuàng)建普通RDD的方法直接創(chuàng)建鍵值對RDD。使用map()方法創(chuàng)建鍵值對RDD。若讀取的數據形式不是鍵值對,則可以先創(chuàng)建普通RDD,再將其轉換為鍵值對RDD。第98頁一、鍵值對RDD的創(chuàng)建【例2-13】直接創(chuàng)建鍵值對RDD,輸出結果如圖所示。[hadoop@bogon~]$pyspark#創(chuàng)建鍵值對RDD>>>pair_rdd=sc.parallelize([("key1",1),("key2",2),("key3",3)])#輸出鍵值對RDD的元素>>>pair_rdd.foreach(print)第99頁一、鍵值對RDD的創(chuàng)建【例2-14】使用map()方法創(chuàng)建鍵值對RDD,輸出結果如圖所示。[hadoop@bogon~]$pyspark#讀取本地文件hello_spark.txt,并創(chuàng)建名為“words”的RDD>>>words=sc.textFile("file:///usr/local/spark/mycode/hello_spark.txt")#將每行文本按空格拆分為單詞,返回一個新的RDD>>>pair_rdd=words.flatMap(lambdaline:line.split(""))#將每個單詞映射為(單詞,1)鍵值對,創(chuàng)建一個鍵值對RDD>>>pair_rdd_map=pair_rdd.map(lambdaword:(word,1))#輸出鍵值對RDD的元素>>>pair_rdd_map.foreach(print)第100頁二、鍵值對RDD的轉換操作鍵值對RDD能夠使用RDD基本的轉換操作,同時它具有其獨有的轉換操作,如keys()、values()、groupByKey()、reduceByKey()、join()等。keys()values()reduceByKey()join()groupByKey()第101頁二、鍵值對RDD的轉換操作常用的鍵值對RDD轉換操作如表所示。轉換操作說明keys()返回一個由所有鍵組成的RDDvalues()返回一個由所有值組成的RDDgroupByKey(numPartitions)根據鍵(key)對RDD中的元素進行分組,并將每個唯一鍵(key)對應的值(value)放入一個迭代器中,返回一個(K,Iterable<V>)類型的數據集第102頁二、鍵值對RDD的轉換操作轉換操作說明reduceByKey(func,numPartitions)在(K,V)鍵值對的RDD上調用時,返回一個(K,V)類型的數據集,其中每個鍵對應的值都使用給定的func函數進行聚合??梢岳斫鉃樵趃roupByKey()的基礎上,再對相同鍵的元素進行聚合操作join(otherRDD)將兩個鍵值對RDD中鍵(key)相同數據的值(value)存放在一個元組中,只返回兩個鍵值對RDD中都存在的鍵(key)的連接結果常用的鍵值對RDD轉換操作如表所示。第103頁二、鍵值對RDD的轉換操作轉換操作說明combineByKey(createCombiner,mergeValue,mergeCombiners,numPartitions,partitionFunc)針對鍵值對RDD進行求和、求平均值、計數等聚合操作,返回一個新的鍵值對RDD,其中每個鍵關聯一個聚合結果。createCombiner代表一個函數,用于將每個鍵的第一個值轉換為累加器的初始值;mergeValue代表一個函數,用于將當前鍵的值合并到相應的累加器中;mergeCombiners代表一個函數,用于合并兩個累加器;numPartitions(可選)代表分區(qū)數;partitionFunc(可選)代表一個函數,用于自定義數據分區(qū)方式sortByKey(ascending)按照鍵(key)對RDD中的元素進行排序。參數ascending的默認值為True,即升序排列mapValues(func)對RDD中的每個值應用指定的函數func,并保持鍵不變常用的鍵值對RDD轉換操作如表所示。第104頁二、鍵值對RDD的轉換操作
【例2-15】創(chuàng)建一個包含4個鍵值對的RDD,執(zhí)行keys()和values()操作分別查看RDD鍵值對的鍵和值,如圖所示。第105頁二、鍵值對RDD的轉換操作[hadoop@bogon~]$pyspark#創(chuàng)建一個鍵值對RDD>>>pairs=sc.parallelize([(1,"apple"),(2,"orange"),(3,"banana"),(4,"pear")])#獲取所有key組成的RDD>>>keys=pairs.keys()#輸出所有key>>>print(keys.collect())#獲取所有value組成的RDD>>>values=pairs.values()#輸出所有value>>>print(values.collect())第106頁二、鍵值對RDD的轉換操作【例2-16】創(chuàng)建一個包含5個鍵值對的RDD,執(zhí)行groupByKey()操作按照城市對數據進行分組,輸出結果如圖(a)所示。執(zhí)行reduceByKey()操作按照城市對數據進行分組,并對同一個城市內的人數進行累加,輸出結果如圖(b)所示。(a)(b)第107頁二、鍵值對RDD的轉換操作[hadoop@bogon~]$pyspark>>>rdd=sc.parallelize([("北京","張三"),("上海","李四"),("北京","王五"),("廣州","趙六"),("上海","錢七")])#按照城市對數據進行分組>>>grouped_rdd=rdd.groupByKey()#輸出(城市,可迭代對象)>>>grouped_rdd.foreach(print)#按照城市對數據進行分組,并對同一個城市內的人數進行累加>>>reduced_rdd=rdd.mapValues(lambdax:1).reduceByKey(lambdaa,b:a+b)#輸出(城市,人數)>>>reduced_rdd.collect()第108頁二、鍵值對RDD的轉換操作【例2-17】創(chuàng)建兩個鍵值對RDD,執(zhí)行join()操作合并兩個RDD,輸出結果如圖(a)所示。同樣,執(zhí)行l(wèi)eftOuterJoin()和rightOuterJoin()操作合并兩個RDD,輸出結果如圖(b)和圖(c)所示。(a)(b)(c)第109頁二、鍵值對RDD的轉換操作[hadoop@bogon~]$pyspark#創(chuàng)建第一個鍵值對RDD>>>rdd1=sc.parallelize([("apple",1),("orange",2),("banana",3),("pear",4)])#創(chuàng)建第二個鍵值對RDD>>>rdd2=sc.parallelize([("apple",5),("orange",6),("peach",7),("pear",8)])#執(zhí)行join()操作合并rdd1和rdd2>>>joined_rdd=rdd1.join(rdd2)#輸出合并結果>>>joined_rdd.foreach(print)(詳見教材)第110頁二、鍵值對RDD的轉換操作【例2-18】創(chuàng)建一個鍵值對RDD,然后定義三個函數執(zhí)行combineByKey()操作,得到每個鍵對應的平均值,輸出結果如圖所示。第111頁二、鍵值對RDD的轉換操作[hadoop@bogon~]$pyspark#創(chuàng)建鍵值對RDD>>>rdd=sc.parallelize([("key1",1),("key1",2),("key2",3),("key2",4),("key2",5)])#初始值為一個元組,第一個元素為該鍵的總和,第二個元素為該鍵的數量>>>createCombiner=lambdax:(x,1)#對于每個新的值,將其累加到總和中,并增加鍵的數量>>>mergeValue=lambdaacc,x:(acc[0]+x,acc[1]+1)#合并兩個鍵的結果>>>mergeCombiners=lambdaacc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1])(詳見教材)第112頁任務分析計算每位學生的平均成績,首先需要篩選出考試成績和操行成績的學號和成績兩個字段,并創(chuàng)建RDD;然后將篩選后創(chuàng)建的兩個RDD進行合并,再根據學號對考試成績和操行成績進行求和;最后根據求和結果計算每位學生的平均成績。打開PyCharm,在“rdd”目錄下新建“average.py”文件,然后在該文件中編寫應用程序,輸出學號、姓名、考試成績、操行成績和平均成績。實現步驟如下。計算學生的平均成績點擊此處播放微課第113頁步驟1配置Spark應用程序并創(chuàng)建SparkContext對象。步驟2獲取考試成績RDD(exam_scores_rdd)。使用textFile()方法讀取考試成績文件中的數據創(chuàng)建RDD;然后執(zhí)行map()操作以“,”為分割符,分割每行數據;接著執(zhí)行filter()操作,過濾掉首行數據(表頭),保留其余有效的數據。第114頁步驟3創(chuàng)建考試成績鍵值對RDD(exam_scores_pairRdd)。執(zhí)行map()操作將考試成績RDD中的元素轉換為鍵值對形式,其中鍵為學號,值為考試成績,用于后續(xù)求考試成績和操行成績的平均值。第115頁步驟4創(chuàng)建學生信息鍵值對RDD(name_pairRdd)。執(zhí)行map()操作將考試成績RDD中的元素轉換為鍵值對形式,其中鍵為學號,值為姓名,用于后續(xù)輸出學生姓名。步驟5參照步驟2,對操行成績進行同樣的處理,獲取操行成績RDD(conduct_scores_rdd)。第116頁步驟6創(chuàng)建操行成績鍵值對RDD(conduct_scores_pairRdd)。執(zhí)行map()操作將操行成績RDD中的元素轉換為鍵值對形式,其中鍵為學號,值為操行成績,用于后續(xù)求考試成績和操行成績的平均值。第117頁步驟7獲取合并后的成績鍵值對RDD(merged_scores_pairRDD)。執(zhí)行union()操作將考試成績鍵值對RDD和操行成績鍵值對RDD合并為一個新的鍵值對RDD。步驟8定義combineByKey()操作所需的三個函數createCombiner、mergeValue、mergeCombiners,用于計算每位學生的考試成績和操行成績的和。第118頁步驟9獲取求和后的成績鍵值對RDD(sum_scores_pairRdd)。執(zhí)行combineByKey()操作,以學號作為鍵對合并后的成績鍵值對RDD執(zhí)行聚合操作,求學生考試成績和操行成績的和。步驟10獲取平均成績鍵值對RDD(average_scores_pairRDD)。執(zhí)行map()操作計算學生的平均成績。第119頁步驟11獲取考試成績和操行成績并轉換為字典格式。執(zhí)行collect()操作獲取考試成績和操行成績,然后使用dict()函數將它們轉換為字典格式,用于后續(xù)輸出考試成績和操行成績。第120頁步驟12獲取結果RDD(result_rdd)。執(zhí)行join()操作連接平均成績鍵值對RDD和學生信息鍵值對RDD,連接的鍵為學號;然后執(zhí)行map()操作將連接后的結果映射為最終輸出結果的格式,包括學號、姓名、考試成績、操行成績和平均成績。第121頁步驟13打印輸出學生平均成績。使用print()方法輸出表頭;然后執(zhí)行collect()操作獲取結果RDD中的數據,并通過循環(huán)遍歷輸出每條數據的學號、姓名、考試成績、操行成績和平均成績。第122頁步驟13frompysparkimportSparkConf,SparkContext#創(chuàng)建Spark應用程序conf=SparkConf().setAppName("averageCore").setMaster("local")#創(chuàng)建SparkContext對象sc=SparkContext(conf=conf)#獲取考試成績RDD(exam_scores_rdd)exam_scores_rdd=sc.textFile("/Student_Exam_Scores.csv")\.map(lambdaline:line.split(","))\(詳見教材)【參考代碼】第123頁步驟13在PyCharm中運行代碼,控制臺顯示輸出的學生平均成績(篇幅原因,只截取部分輸出信息),如圖所示?!具\行結果】第124頁鍵值對RDD的創(chuàng)建鍵值對RDD的轉換操作任務四存儲歸納后的學生成績數據第126頁學生成績是學校了解學生情況的重要依據,學校需要存儲歸納后的學生成績數據,以便后期查閱。本任務計劃將歸納后的學生成績RDD保存為CSV文件。開始本任務前,我們先學習一下Spark中常見的文件格式和將RDD保存為特定文件的方法。第127頁一、Spark中常見的文件格式Spark中常見的文件格式有普通的文本文件、JSON文件、CSV文件等。文本文件是指以txt為擴展名的文件,比較常見,此處不再贅述。第128頁一、Spark中常見的文件格式JSON(javascriptobjectnotation)是一種輕量級、文本格式的數據交換標準,它是基于JavaScript的一個子集,適用于多種不同語言。①JSON文件以“{}”形式開始和結束,表示一個對象。②在對象內部,每個鍵值對內部使用“:”分隔。鍵必須是字符串,值可以是任何有效的JSON類型。1.JSON文件第129頁一、Spark中常見的文件格式③鍵值對之間使用“,”分隔。④數組用“[]”表示,并且數組元素之間使用“,”分隔。⑤文本字符串必須使用雙引號包圍,不支持單引號。1.JSON文件第130頁一、Spark中常見的文件格式JSON文件采用鍵值對的方式描述數據,其中key為字符串類型,value可以是數字、字符串、布爾值、數組、對象等類型。JSON文件數據的書寫格式如下,示例如圖所示。1.JSON文件2.CSV文件第131頁一、Spark中常見的文件格式CSV是一種常用的電子表格文件格式。CSV文件以“,”為分隔符將數據值組織成行和列的形式,其中每行表示一條記錄,如圖所示。第132頁二、將RDD保存為特定文件使用Spark提供的方法可以讀取多種文件中的數據,以便進行必要的數據處理和分析。在完成數據分析后,可以將分析結果存儲至特定文件中,以便后續(xù)對現有分析結果進行進一步的處理與分析。第133頁二、將RDD保存為特定文件在Spark中,使用textFile()方法可以讀取多種文件創(chuàng)建RDD。該方法前面已經介紹過,此處不再贅述。使用saveAsTextFile()方法可以將RDD保存為特定文件,即將RDD中的數據存儲至特定文件中。該方法的基本格式如下。saveAsTextFile(path,compressionCodecClass)第134頁二、將RDD保存為特定文件其中,參數的含義如下。(1)path:文件保存的目錄地址。要求該目錄地址事先不存在;如果存在,運行代碼時Spark就會報錯。(2)compressionCodecClass(可選):壓縮編解碼器類的完全限定類名(如press.GzipCodec)。第135頁二、將RDD保存為特定文件【例2-19】文本文件的讀取與存儲?,F有“/usr/local/saprk/mycode/hello_spark.txt”文件,內容如圖所示。第136頁二、將RDD保存為特定文件使用textFile()方法讀取“hello_spark.txt”文件創(chuàng)建RDD,執(zhí)行foreach()操作輸出RDD的元素,如圖所示。第137頁二、將RDD保存為特定文件使用repartition()方法將分區(qū)設置為1。使用saveAsTextFile()方法將RDD保存為文本文件,生成一個“output.txt”目錄(見圖),目錄中包含“part-00000”和“_SUCCESS”文件,其中“part-00000”文件用于存儲RDD的元素。第138頁二、將RDD保存為特定文件[hadoop@bogon~]$pyspark#讀取本地文件hello_spark.txt創(chuàng)建名為“words”的RDD>>>words=sc.textFile("file:///usr/local/spark/mycode/hello_spark.txt")#輸出RDD的元素>>>words.foreach(print)#分區(qū)設置為1,使用saveAsTextFile()方法將RDD保存為文本文件>>>words.repartition(1)\.saveAsTextFile("file:///usr/local/spark/mycode/output.txt")第139頁二、將RDD保存為特定文件【例2-20】JSON文件的讀取與存儲。在“/usr/local/spark/mycode”目錄下,創(chuàng)建一個“example.json”文件,生成的文件內容如圖所示。第140頁二、將RDD保存為特定文件[hadoop@bogon~]$echo'{"name":"Alice","age":25}{"name":"Bob","age":30}{"name":"Charlie","age":35}'>/usr/local/spark/mycode/example.json第141頁二、將RDD保存為特定文件讀取“example.json”文件創(chuàng)建RDD。然后,執(zhí)行map()操作解析RDD中的JSON字符串元素并將其轉換為Python字典,輸出RDD的元素,如圖所示。第142頁二、將RDD保存為特定文件[hadoop@bogon~]$pyspark#導入json庫>>>importjson#讀取文件創(chuàng)建RDD>>>json_rdd=sc.textFile("file:///usr/local/spark/mycode/example.json")(詳見教材)第143頁二、將RDD保存為特定文件【例2-21】CSV文件的讀取與存儲。讀取“id_card.csv”文件中的數據創(chuàng)建RDD并執(zhí)行map()操作分割RDD元素,輸出RDD的元素,如圖所示。第144頁二、將RDD保存為特定文件執(zhí)行map()操作將RDD元素轉換為CSV格式的字符串。最后,使用saveAsTextFile()方法將RDD保存為CSV文件,與文本文件類似,生成一個“output.csv”目錄,該目錄中包含“part-00000”和“_SUCCESS”文件。第145頁二、將RDD保存為特定文件[hadoop@bogon~]$pyspark>>>id_card="file:///usr/local/spark/mycode/id_card.csv"#讀取文件創(chuàng)建RDD并分割RDD元素>>>id_card_rdd=sc.textFile(id_card)\.map(lambdaline:line.split(','))>>>id_card_rdd.foreach(print)(詳見教材)第146頁二、將RDD保存為特定文件[hadoop@bogon~]$pyspark>>>id_card="file:///usr/local/spark/mycode/id_card.csv"#讀取文件創(chuàng)建RDD并分割RDD元素>>>id_card_rdd=sc.textFile(id_card)\.map(lambdaline:line.split(','))>>>id_card_rdd.foreach(print)(詳見教材)第147頁任務分析將學號、姓名、考試成績、操行成績和平均成績保存到CSV文件中,需要先獲取任務三任務實施中的結果RDD(result_rdd),然后直接將該RDD保存為CSV文件即可。存儲歸納后的學生成績數據點擊此處播放微課第148頁步驟1導入任務三任務實施中的average.py文件,以便后續(xù)調用文件中的對象或RDD等。打開PyCharm,在“rdd”目錄下新建“output_result.py”文件,然后在該文件中編寫應用程序,將歸納后的學生成績數據保存為CSV文件。實現步驟如下。第149頁步驟2配置Spark應用程序。使用average.sc獲取已經存在的SparkContext對象。步驟3從average.py文件中獲取名為result_rdd的RDD,即待保存的數據。第150頁步驟4執(zhí)行map()操作將每個RDD元素格式化成CSV字符串形式,其中x[0]、x[1]、x[2]、x[3]和x[4]表示每個元素的具體字段;然后執(zhí)行coalesce()操作將RDD的分區(qū)數設置為1,確保結果保存為單個文件;最后使用saveAsTextFile()方法將轉換后的RDD保存為CSV文件。第151頁步驟5打印CSV文件保存成功的提示消息。frompysparkimportSparkConf#導入任務三任務實施中的average.py文件importaverage#配置Spark應用程序conf=SparkConf().setAppName("savescores").setMaster("local")(詳見教材)第152頁步驟5#使用average.sc獲取已經存在的SparkContext對象sc=average.sc#獲取average.py文件中的result_rddresult_rdd=average.result_rdd#將RDD保存為CSV文件result_rdd.map(lambdax:"{},{},{},{},{}".format(x[0],x[1],x[2],x[3],x[4]))\.coalesce(1)\.saveAsTextFile("file:///usr/local/spark/mycode/rdd/result.csv")print("CSV文件保存成功!")第153頁步驟5【運行結果】
在PyCharm中運行代碼,控制臺顯示提示信息,如圖a所示。在“/usr/local/spark/mycode”目錄下生成“result.csv”目錄,生成的目錄中包含“part-00000”和“_SUCCESS”文件圖b,其中“part-00000”文件用于存儲歸納后的學生成績數據,內容如圖c所示。(a)(b)(c)第154頁Spark中常見的文件格式將RDD保存為特定文件第155頁(1)熟練掌握RDD的創(chuàng)建。1.實訓目標(2)熟練掌握RDD的轉換操作和行動操作(3)熟練掌握鍵值對RDD的轉換操作(4)熟練掌握不同類型文件的讀取與存儲第156頁2.實訓內容現有3張表格,分別為違章記錄表(records.csv),用于記錄違章詳情,包含的字段如表所示。字段名稱說明字段名稱說明違章日期車主交通違章的日期車牌號違章車輛的車牌號違章代碼包括X01、X02等——字段名稱說明字段名稱說明車牌號車牌號信息電話車主電話姓名車主姓名——第157頁2.實訓內容車主信息表(owner.csv),用于記錄車主信息,包含的字段如表所示。字段名稱說明字段名稱說明違章代碼包括X01、X02等罰款金額違章對應的罰款金額扣分數違章對應的扣分數違章名稱包括超速、闖紅燈等第158頁2.實訓內容違章條目對照表(violation.csv),用于記錄違章明細,包含的字段如表所示。第159頁2.實訓內容請使用SparkRDD分析交通違章情況。(1)查詢違章對應的扣分數最多的違章條目,輸出違章代碼、扣分數、罰款金額和違章名稱。(2)查詢違章1次以上的車輛,輸出車牌號和違章次數。第160頁2.實訓內容請使用SparkRDD分析交通違章情況。(3)查詢違章車牌號為“粵A1**45”的違章信息,并將歸納后的違章信息保存為CSV文件,包括車牌號、違章日期、違章代碼、車主姓名、車主電話。第161頁3.實訓提示打開PyCharm,新建“car”目錄,并在該目錄下新建不同的Python文件分析交通違章情況。第162頁3.實訓提示(1)新建“violation.py”文件,然后在該文件中編寫應用程序,查詢扣分數最多的違章條目,輸出違章代碼、扣分數、罰款金額和違章名稱。①讀取違章條目對照
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025-2030淺析智能倉儲管理軟件精細化運營策略分析擴展檢測
- 2025-2030洗護用品母嬰消費群體細分與高端市場培育首選方案
- 2025-2030汽車零部件配套協同產業(yè)鏈技術升級研究投資適配方案指導
- 2025年滕州市招聘教師考試真題
- 公司節(jié)能減排措施實施方案
- 英語四六級考試報名流程試題及答案
- 高效團隊建設與管理實戰(zhàn)方案
- 叉車司機實操技能水平測驗試卷及答案
- 科技型中小企業(yè)融資方案匯編
- 醫(yī)學課程教學方案及案例分析
- TOC基本課程講義學員版-王仕斌
- T-GDWCA 0035-2018 HDMI 連接線標準規(guī)范
- 面板堆石壩面板滑模結構設計
- 初中語文新課程標準與解讀課件
- 無人機裝調檢修工培訓計劃及大綱
- 中建通風與空調施工方案
- 高考語言運用題型之長短句變換 學案(含答案)
- 春よ、來い(春天來了)高木綾子演奏長笛曲譜鋼琴伴奏
- ARJ21機型理論知識考試題庫(匯總版)
- 2023年婁底市建設系統事業(yè)單位招聘考試筆試模擬試題及答案解析
- GB/T 4623-2014環(huán)形混凝土電桿
評論
0/150
提交評論