pyspark自定義UDAF函數(shù)調(diào)用報(bào)錯問題解決_第1頁
pyspark自定義UDAF函數(shù)調(diào)用報(bào)錯問題解決_第2頁
pyspark自定義UDAF函數(shù)調(diào)用報(bào)錯問題解決_第3頁
pyspark自定義UDAF函數(shù)調(diào)用報(bào)錯問題解決_第4頁
pyspark自定義UDAF函數(shù)調(diào)用報(bào)錯問題解決_第5頁
已閱讀5頁,還剩1頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

第pyspark自定義UDAF函數(shù)調(diào)用報(bào)錯問題解決目錄問題場景:問題描述原因分析及解決方案:

問題場景:

在SparkSQL中,因?yàn)樾枰玫阶远x的UDAF函數(shù),所以用pyspark自定義了一個(gè),但是遇到了一個(gè)問題,就是自定義的UDAF函數(shù)一直報(bào)

AttributeError:'NoneType'objecthasnoattribute'_jvm'

在此將解決過程記錄下來

問題描述

在新建的py文件中,先自定義了一個(gè)UDAF函數(shù),然后在if__name__==__main__:中調(diào)用,死活跑不起來,一遍又一遍的對源碼,看起來自定義的函數(shù)也沒錯:過程如下:

importdecimal

importos

importpandasaspd

frompyspark.sqlimportSparkSession

frompyspark.sqlimportfunctionsasF

os.environ['SPARK_HOME']='/export/server/spark'

os.environ["PYSPARK_PYTHON"]="/root/anaconda3/bin/python"

os.environ["PYSPARK_DRIVER_PYTHON"]="/root/anaconda3/bin/python"

@F.pandas_udf('decimal(17,12)')

defudaf_lx(qx:pd.Series,lx:pd.Series)-decimal:

#初始值也一定是decimal類型

tmp_qx=decimal.Decimal(0)

tmp_lx=decimal.Decimal(0)

forindexinrange(0,qx.size):

ifindex==0:

tmp_qx=decimal.Decimal(qx[index])

tmp_lx=decimal.Decimal(lx[index])

else:

#計(jì)算lx:計(jì)算后,保證數(shù)據(jù)小數(shù)位為12位,與返回類型的設(shè)置小數(shù)位保持一致

tmp_lx=(tmp_lx*(1-tmp_qx)).quantize(decimal.Decimal('0.000000000000'))

tmp_qx=decimal.Decimal(qx[index])

returntmp_lx

if__name__=='__main__':

#1)創(chuàng)建SparkSession對象,此對象連接hive

spark=SparkSession.builder.master('local[*]')\

.appName('insurance_main')\

.config('spark.sql.shuffle.partitions',4)\

.config('spark.sql.warehouse.dir','hdfs://node1:8020/user/hive/warehouse')\

.config('hive.metastore.uris','thrift://node1:9083')\

.enableHiveSupport()\

.getOrCreate()

#注冊UDAF支持在SQL中使用

spark.udf.register('udaf_lx',udaf_lx)

#2)編寫SQL執(zhí)行

excuteSQLFile(spark,'_04_insurance_dw_prem_std.sql')

然后跑起來就報(bào)了以下錯誤:

Traceback(mostrecentcalllast):

File"/root/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py",line835,in_parse_datatype_string

returnfrom_ddl_datatype(s)

File"/root/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py",line827,infrom_ddl_datatype

sc._.apache.spark.sql.api.python.PythonSQLUtils.parseDataType(type_str).json())

AttributeError:'NoneType'objecthasnoattribute'_jvm'

Duringhandlingoftheaboveexception,anotherexceptionoccurred:

Traceback(mostrecentcalllast):

File"/root/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py",line839,in_parse_datatype_string

returnfrom_ddl_datatype("struct%s"%s.strip())

File"/root/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py",line827,infrom_ddl_datatype

sc._.apache.spark.sql.api.python.PythonSQLUtils.parseDataType(type_str).json())

AttributeError:'NoneType'objecthasnoattribute'_jvm'

Duringhandlingoftheaboveexception,anotherexceptionoccurred:

Traceback(mostrecentcalllast):

File"/root/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py",line841,in_parse_datatype_string

raisee

File"/root/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py",line831,in_parse_datatype_string

returnfrom_ddl_schema(s)

File"/root/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py",line823,infrom_ddl_schema

sc._.apache.spark.sql.types.StructType.fromDDL(type_str).json())

AttributeError:'NoneType'objecthasnoattribute'_jvm'

我左思右想,百思不得騎姐,嗐,跑去看types.py里面的type類型,以為我的udaf_lx函數(shù)的裝飾器里面的decimal(17,12)類型錯了,但是一看,好家伙,types.py里面的774行

_FIXED_DECIMAL=pile(r"decimal\(\s*(\d+)\s*,\s*(-\d+)\s*\)")

這是能匹配上的,沒道理??!

原因分析及解決方案:

然后再往回看報(bào)錯的信息的最后一行:

AttributeError:'NoneType'objecthasnoattribute'_jvm'

竟然是空對象沒有_jvm這個(gè)屬性!

一拍腦瓜子,得了,pyspark的SQL在執(zhí)行的時(shí)候,需要用到JVM,而運(yùn)行pyspark的時(shí)候,需要先要為spark提供環(huán)境,也就說,內(nèi)存中要有SparkSession對象,而python在執(zhí)行的時(shí)候,是從上往下,將方法加載到內(nèi)存中,在加載自定義的UDAF函數(shù)時(shí),由于有裝飾器@F.pandas_udf的存在,F則是pyspark.sql.functions,此時(shí)加載自定義的UDAF到內(nèi)存中,需要有SparkSession的環(huán)境提供JVM,而此時(shí)的內(nèi)存中尚未有SparkSession環(huán)境!因此,將自定義的UDAF函數(shù)挪到if__name__==__main__:創(chuàng)建完SparkSession的后面,如下:

importdecimal

importos

importpandasaspd

frompyspark.sqlimportSparkSession

frompyspark.sqlimportfunctionsasF

os.environ['SPARK_HOME']='/export/server/spark'

os.environ["PYSPARK_PYTHON"]="/root/anaconda3/bin/python"

os.environ["PYSPARK_DRIVER_PYTHON"]="/root/anaconda3/bin/python"

if__name__=='__main__':

#1)創(chuàng)建SparkSession對象,此對象連接hive

spark=SparkSession.builder.master('local[*]')\

.appName('insurance_main')\

.config('spark.sql.shuffle.partitions',4)\

.config('spark.sql.warehouse.dir','hdfs://node1:8020/user/hive/warehouse')\

.config('hive.metastore.uris','thrift://node1:9083')\

.enableHiveSupport()\

.getOrCreate()

@F.pandas_udf('decimal(17,12)')

defudaf_lx(qx:pd.Series,lx:pd.Series)-decimal:

#初始值也一定是decimal類型

tmp_qx=decimal.Decimal(0)

tmp_lx=decimal.Decimal(0)

forindexinrange(0,qx.size):

ifindex==0:

tmp_qx=decimal.Decimal(qx[index])

tmp_lx=decimal.Decimal(lx[index])

else:

#計(jì)算lx:計(jì)算后,保證數(shù)據(jù)小數(shù)位為12位,與返回類型的設(shè)置小數(shù)位保持一致

tmp_lx=(tmp_lx*(1-tmp_qx)).quantize(decimal.Decimal('0.000000000000'))

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論