python3線程池ThreadPoolExecutor處理csv文件數(shù)據(jù)_第1頁
python3線程池ThreadPoolExecutor處理csv文件數(shù)據(jù)_第2頁
python3線程池ThreadPoolExecutor處理csv文件數(shù)據(jù)_第3頁
python3線程池ThreadPoolExecutor處理csv文件數(shù)據(jù)_第4頁
python3線程池ThreadPoolExecutor處理csv文件數(shù)據(jù)_第5頁
全文預(yù)覽已結(jié)束

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

第python3線程池ThreadPoolExecutor處理csv文件數(shù)據(jù)目錄背景知識點拓展庫流程實現(xiàn)代碼解釋

背景

由于不同乙方對服務(wù)商業(yè)務(wù)接口字段理解不一致,導(dǎo)致線上上千萬數(shù)據(jù)量數(shù)據(jù)存在問題,為了修復(fù)數(shù)據(jù),通過Python腳本進行修改

知識點

Python3、線程池、pymysql、CSV文件操作、requests

拓展

當我們程序在使用到線程、進程或協(xié)程的時候,以下三個知識點可以先做個基本認知

CPU密集型、IO密集型、GIL全局解釋器鎖

pip3installrequests

pip3installpymysql

流程

實現(xiàn)代碼

#-*-coding:utf-8-*-

#@FileName:grade_update.py

#@Desc:在一臺超級計算機上運行過的牛逼Python代碼

importtime

fromconcurrent.futuresimportThreadPoolExecutor,FIRST_COMPLETED,wait

importrequests

importpymysql

fromprojectPathimportpath

gradeId=[4303,4304,1000926,1000927]

defwrit_mysql():

數(shù)據(jù)庫連接

returnpymysql.connect(host="localhost",

port=3306,

user="admin",

password="admin",

database="test"

defoprationdb(grade_id,member_id):

操作數(shù)據(jù)庫

db=writ_mysql()

try:

cursor=db.cursor()

sql=f"UPDATE`t_m_member_grade`SET`current_grade_id`={grade_id},`modified`=now()WHERE`member_id`={member_id};"

cursor.execute(sql)

mit()

print(f"提交的SQL-{sql}")

exceptpymysql.Errorase:

db.rollback()

print("DB數(shù)據(jù)庫異常:",e)

db.close()

returnTrue

definterface(rows,thead):

調(diào)用第三方接口

print(f"處理數(shù)據(jù)行數(shù)---{thead}----數(shù)據(jù)---{rows}")

try:

url="http://xxxx/api/xxx-data/Tmall/bindQuery"

body={

"nickname":str(rows[0]),

"seller_name":"test",

"mobile":"111"

heade={"Content-Type":"application/x-www-form-urlencoded"}

res=requests.post(url=url,data=body,headers=heade)

result=res.json()

ifresult["data"]["status"]in[1,2]:

grade=result["data"]["member"]["level"]

grade_id=gradeId[grade]

oprationdb(grade_id=grade_id,member_id=rows[1])

returnTrue

returnTrue

exceptExceptionase:

print(f"調(diào)用異常:{e}")

defread_csv():

importcsv

#db=writ_mysql()

#線程數(shù)

MAX_WORKERS=5

withThreadPoolExecutor(MAX_WORKERS)aspool:

withopen(path+'/file/result2_colu.csv','r',newline='',encoding='utf-8')asf:

#set()函數(shù)創(chuàng)建無序不重復(fù)元素集

seq_notdone=set()

seq_done=set()

#使用csv的reader()方法,創(chuàng)建一個reader對象

reader=csv.reader(f)

n=0

forrowinreader:

n+=1

#遍歷reader對象的每一行

try:

seq_notdone.add(pool.submit(interface,rows=row,thead=n))

iflen(seq_notdone)=MAX_WORKERS:

#FIRST_COMPLETED文檔說明--Returnwhenanyfuturefinishesoriscancelled.

done,seq_notdone=wait(seq_notdone,return_when=FIRST_COMPLETED)

seq_done.update(done)

exceptExceptionase:

print(f"解析結(jié)果出錯:{e}")

#db.close()

return"完成"

if__name__=='__main__':

read_csv()

解釋

引入線程池庫

fromconcurrent.futuresimportThreadPoolExecutor,FIRST_COMPLETED,wait

pool.submit(interface,rows=row,thead=n)

提交任務(wù),interface調(diào)用的函數(shù),rows、thead為interface()函數(shù)的入?yún)?/p>

任務(wù)持續(xù)提交,線程池通過MAX_WORKERS定義的線程數(shù)持續(xù)消費

說明像這種I

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論