版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
第如何使用Python讀取Hive數(shù)據(jù)庫(kù)?port=21051,
database=ur_AI_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
logger:logging.Logger=None
self.host=host
self.port=port
self.database=database
self.auth_mechanism=auth_mechanism
self.user=user
self.password=password
self.logger=logger
self.impala_conn=None
self.conn=None
self.cursor=None
self.engine=None
self.session=None
defcreate_table_code(self,file_name):
創(chuàng)建表類(lèi)代碼
os.system(fsqlacodegen{self.connection_str}{file_name})
returnself.conn
defget_conn(self):
創(chuàng)建連接或獲取連接
ifself.connisNone:
engine=self.get_engine()
self.conn=engine.connect()
returnself.conn
defget_impala_conn(self):
創(chuàng)建連接或獲取連接
ifself.impala_connisNone:
self.impala_conn=connect(
host=self.host,
port=self.port,
database=self.database,
auth_mechanism=self.auth_mechanism,
user=self.user,
password=self.password
returnself.impala_conn
defget_engine(self):
創(chuàng)建連接或獲取連接
ifself.engineisNone:
self.engine=sqlalchemy.create_engine(impala://,creator=self.get_impala_conn)
returnself.engine
defget_cursor(self):
創(chuàng)建連接或獲取連接
ifself.cursorisNone:
self.cursor=self.conn.cursor()
returnself.cursor
defget_session(self)-sessionmaker:
創(chuàng)建連接或獲取連接
ifself.sessionisNone:
engine=self.get_engine()
Session=sessionmaker(bind=engine)
self.session=Session()
returnself.session
defclose_conn(self):
關(guān)閉連接
ifself.connisnotNone:
self.conn.close()
self.conn=None
self.dispose_engine()
self.close_impala_conn()
defclose_impala_conn(self):
關(guān)閉impala連接
ifself.impala_connisnotNone:
self.impala_conn.close()
self.impala_conn=None
defclose_session(self):
關(guān)閉連接
ifself.sessionisnotNone:
self.session.close()
self.session=None
self.dispose_engine()
defdispose_engine(self):
釋放engine
ifself.engineisnotNone:
#self.engine.dispose(close=False)
self.engine.dispose()
self.engine=None
defclose_cursor(self):
關(guān)閉cursor
ifself.cursorisnotNone:
self.cursor.close()
self.cursor=None
defget_data(self,sql,auto_close=True)-pd.DataFrame:
查詢(xún)數(shù)據(jù)
conn=self.get_conn()
data=None
try:
#異常重試3次
foriinrange(3):
try:
data=pd.read_sql(sql,conn)
break
exceptExceptionasex:
ifi==2:
raiseex#往外拋出異常
time.sleep(60)#一分鐘后重試
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
ifauto_close:
self.close_conn()
returndata
classVarsHelper():
def__init__(self,save_dir,auto_save=True):
self.save_dir=save_dir
self.auto_save=auto_save
self.values={}
ifnotos.path.exists(os.path.dirname(self.save_dir)):
os.makedirs(os.path.dirname(self.save_dir))
ifos.path.exists(self.save_dir):
withopen(self.save_dir,rb)asf:
self.values=pickle.load(f)
f.close()
defset_value(self,key,value):
self.values[key]=value
ifself.auto_save:
self.save_file()
defget_value(self,key):
returnself.values[key]
defhas_key(self,key):
returnkeyinself.values.keys()
defsave_file(self):
withopen(self.save_dir,wb)asf:
pickle.dump(self.values,f)
f.close()
classGlobalShareArgs():
args={
debug:False
defget_args():
returnGlobalShareArgs.args
defset_args(args):
GlobalShareArgs.args=args
defset_args_value(key,value):
GlobalShareArgs.args[key]=value
defget_args_value(key,default_value=None):
returnGlobalShareArgs.args.get(key,default_value)
defcontain_key(key):
returnkeyinGlobalShareArgs.args.keys()
defupdate(args):
GlobalShareArgs.args.update(args)
classShareArgs():
args={
labels_dir:./hjx/shop_group/month_w_amt/data/labels,#標(biāo)簽?zāi)夸?/p>
labels_output_dir:./hjx/shop_group/month_w_amt/data/labels_output,#聚類(lèi)導(dǎo)出標(biāo)簽?zāi)夸?/p>
common_datas_dir:./hjx/data,#共用數(shù)據(jù)目錄。ur_bi_dw的公共
only_predict:False,#只識(shí)別,不訓(xùn)練
delete_model:True,#先刪除模型,僅在訓(xùn)練時(shí)使用
export_excel:False,#導(dǎo)出excel
classes:12,#聚類(lèi)數(shù)
batch_size:16,
hidden_size:32,
max_nrof_epochs:100,
learning_rate:0.0005,
loss_type:categorical_crossentropy,
avg_model_num:10,
steps_per_epoch:4.0,#4.0
lr_callback_patience:4,
lr_callback_cooldown:1,
early_stopping_callback_patience:6,
get_data:True,
defget_args():
returnShareArgs.args
defset_args(args):
ShareArgs.args=args
defset_args_value(key,value):
ShareArgs.args[key]=value
defget_args_value(key,default_value=None):
returnShareArgs.args.get(key,default_value)
defcontain_key(key):
returnkeyinShareArgs.args.keys()
defupdate(args):
ShareArgs.args.update(args)
classUrBiGetDatasBase():
#線程鎖列表,同保存路徑共用鎖
lock_dict:Dict[str,threading.Lock]={}
#時(shí)間列表,用于判斷是否超時(shí)
time_dict:Dict[str,datetime.datetime]={}
#用于記錄是否需要更新超時(shí)時(shí)間
get_data_timeout_dict:Dict[str,bool]={}
def__init__(
self,
host=2,
port=21051,
database=ur_ai_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
save_dir=None,
logger:logging.Logger=None,
self.save_dir=save_dir
self.logger=logger
self.db_helper=HiveHelper(
host=host,
port=port,
database=database,
auth_mechanism=auth_mechanism,
user=user,
password=password,
logger=logger
#創(chuàng)建子目錄
ifself.save_dirisnotNoneandnotos.path.exists(self.save_dir):
os.makedirs(self.save_dir)
self.vars_helper=None
ifGlobalShareArgs.get_args_value(debug):
self.vars_helper=VarsHelper(./hjx/data/vars/UrBiGetDatas)
defclose(self):
關(guān)閉連接
self.db_helper.close_conn()
defget_last_time(self,key_name)-bool:
獲取是否超時(shí)
#轉(zhuǎn)靜態(tài)路徑,確保唯一性
key_name=os.path.abspath(key_name)
ifself.vars_helperisnotNoneandself.vars_helper.has_key(UrBiGetDatasBase.time_list):
UrBiGetDatasBase.time_dict=self.vars_helper.get_value(UrBiGetDatasBase.time_list)
timeout=12#12小時(shí)
ifGlobalShareArgs.get_args_value(debug):
timeout=24#24小時(shí)
get_data_timeout=False
ifkey_namenotinUrBiGetDatasBase.time_dict.keys()or(datetime.datetime.today()-UrBiGetDatasBase.time_dict[key_name]).total_seconds()(timeout*60*60):
(超時(shí)%d小時(shí),重新查數(shù)據(jù):%s,timeout,key_name)
#UrBiGetDatasBase.time_list[key_name]=datetime.datetime.today()
get_data_timeout=True
else:
(未超時(shí)%d小時(shí),跳過(guò)查數(shù)據(jù):%s,timeout,key_name)
#ifself.vars_helperisnotNone:
#self.vars_helper.set_value(UrBiGetDatasBase.time_list,UrBiGetDatasBase.time_list)
UrBiGetDatasBase.get_data_timeout_dict[key_name]=get_data_timeout
returnget_data_timeout
defsave_last_time(self,key_name):
更新?tīng)顟B(tài)超時(shí)
#轉(zhuǎn)靜態(tài)路徑,確保唯一性
key_name=os.path.abspath(key_name)
ifUrBiGetDatasBase.get_data_timeout_dict[key_name]:
UrBiGetDatasBase.time_dict[key_name]=datetime.datetime.today()
ifself.vars_helperisnotNone:
UrBiGetDatasBase.time_dict[key_name]=datetime.datetime.today()
self.vars_helper.set_value(UrBiGetDatasBase.time_list,UrBiGetDatasBase.time_dict)
defget_lock(self,key_name)-threading.Lock:
獲取鎖
#轉(zhuǎn)靜態(tài)路徑,確保唯一性
key_name=os.path.abspath(key_name)
ifkey_namenotinUrBiGetDatasBase.lock_dict.keys():
UrBiGetDatasBase.lock_dict[key_name]=threading.Lock()
returnUrBiGetDatasBase.lock_dict[key_name]
defget_data_of_date(
self,
save_dir,
sql,
sort_columns:List[str],
del_index_list=[-1],#刪除最后下標(biāo)
start_date=datetime.datetime(2017,1,1),#開(kāi)始時(shí)間
offset=relativedelta(months=3),#時(shí)間間隔
date_format_fun=lambdad:%04d%02d01%(d.year,d.month),#查詢(xún)語(yǔ)句中替代時(shí)間參數(shù)的格式化
filename_format_fun=lambdad:%04d%02d.csv%(d.year,d.month),#查詢(xún)語(yǔ)句中替代時(shí)間參數(shù)的格式化
stop_date=20700101,#超過(guò)時(shí)間則停止
data_format_fun=None,#格式化數(shù)據(jù)
分時(shí)間增量讀取數(shù)據(jù)
#創(chuàng)建文件夾
ifnotos.path.exists(save_dir):
os.makedirs(save_dir)
else:
#刪除最后一個(gè)文件
file_list=os.listdir(save_dir)
iflen(file_list)0:
file_list.sort()
fordel_indexindel_index_list:
os.remove(os.path.join(save_dir,file_list[del_index]))
print(刪除最后一個(gè)文件:,file_list[del_index])
select_index=-1
#start_date=datetime.datetime(2017,1,1)
whileTrue:
end_date=start_date+offset
start_date_str=date_format_fun(start_date)
end_date_str=date_format_fun(end_date)
(date:%s-%s,start_date_str,end_date_str)
file_path=os.path.join(save_dir,filename_format_fun(start_date))
#(file_path:%s,file_path)
ifnotos.path.exists(file_path):
data:pd.DataFrame=self.db_helper.get_data(sql%(start_date_str,end_date_str))
ifdataisNone:
break
(data:%d,len(data))
#(data:%d,data.columns)
iflen(data)0:
select_index+=1
ifdata_format_funisnotNone:
data=data_format_fun(data)
#排序
data=data.sort_values(sort_columns)
data.to_csv(file_path)
elifselect_index!=-1:
break
elifstop_datestart_date_str:
raiseException(讀取數(shù)據(jù)異常,時(shí)間超出最大值!)
start_date=end_date
classUrBiGetDatas(UrBiGetDatasBase):
def__init__(
self,
host=2,
port=21051,
database=ur_ai_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
save_dir=./hjx/data/ur_bi_dw_data,
logger:logging.Logger=None
self.save_dir=save_dir
self.logger=logger
super().__init__(
host=host,
port=port,
database=database,
auth_mechanism=auth_mechanism,
user=user,
password=password,
save_dir=save_dir,
logger=logger
defget_dim_date(self):
日期數(shù)據(jù)
file_path=os.path.join(self.save_dir,ur_bi_dw.dim_date.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(file_path):
return
sql=SELECT*FROMur_bi_dw.dim_date
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:dim_date.+cforcincolumns}
data=data.rename(columns=columns)
data=data.sort_values([dim_date.date_key])
data.to_csv(file_path)
#更新超時(shí)時(shí)間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dim_shop(self):
店鋪數(shù)據(jù)
file_path=os.path.join(self.save_dir,ur_bi_dw.dim_shop.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(file_path):
return
sql=SELECT*FROMur_bi_dw.dim_shop
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:dim_shop.+cforcincolumns}
data=data.rename(columns=columns)
data=data.sort_values([dim_shop.shop_no])
data.to_csv(file_path)
#更新超時(shí)時(shí)間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dim_vip(self):
會(huì)員數(shù)據(jù)
sub_dir=os.path.join(self.save_dir,vip_no)
now_lock=self.get_lock(sub_dir)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(sub_dir):
return
sql=SELECTdv.*,dd.date_key,dd.date_name2
FROMur_bi_dw.dim_vipasdv
INNERJOINur_bi_dw.dim_dateasdd
ONdv.card_create_date=dd.date_name2
wheredd.date_key=%s
anddd.date_key%s
#data:pd.DataFrame=self.db_helper.get_data(sql)
sort_columns=[dv.vip_no]
#TODO:
self.get_data_of_date(
save_dir=sub_dir,
sql=sql,
sort_columns=sort_columns,
start_date=datetime.datetime(2017,1,1),#開(kāi)始時(shí)間
offset=relativedelta(years=1)
#更新超時(shí)時(shí)間
self.save_last_time(sub_dir)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_weather(self):
天氣數(shù)據(jù)
sub_dir=os.path.join(self.save_dir,weather)
now_lock=self.get_lock(sub_dir)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(sub_dir):
return
sql=
selectweather.*fromur_bi_ods.ods_base_weather_data_1200asweather
whereweather.date_key=%sandweather.date_key%s
sort_columns=[weather.date_key,weather.areaid]
defdata_format_fun(data):
columns=list(data.columns)
columns={c:weather.+cforcincolumns}
data=data.rename(columns=columns)
returndata
self.get_data_of_date(
save_dir=sub_dir,
sql=sql,
sort_columns=sort_columns,
del_index_list=[-2,-1],#刪除最后下標(biāo)
data_format_fun=data_format_fun,
#更新超時(shí)時(shí)間
self.save_last_time(sub_dir)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_weather_city(self):
天氣城市數(shù)據(jù)
file_path=os.path.join(self.save_dir,ur_bi_dw.weather_city.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(file_path):
return
sql=SELECT*FROMur_bi_dw.dim_weather_cityasweather_city
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:weather_city.+cforcincolumns}
data=data.rename(columns=columns)
data.to_csv(file_path)
#更新超時(shí)時(shí)間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dim_goods(self):
貨品數(shù)據(jù)
file_path=os.path.join(self.save_dir,ur_bi_dw.dim_goods.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(file_path):
return
sql=SELECT*FROMur_bi_dw.dim_goods
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:dim_goods.+cforcincolumns}
data=data.rename(columns=columns)
data.to_csv(file_path)
#更新超時(shí)時(shí)間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dim_goods_market_shop_date(self):
店鋪商品生命周期數(shù)據(jù)
file_path=os.path.join(self.save_dir,ur_bi_dw.dim_goods_market_shop_date.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(file_path):
return
#sql=SELECT*FROMur_bi_dw.dim_goods_market_shop_dateasgoods_shop_date
sql=
selectshop_no,sku_no,shop_market_date,lifecycle_end_date,lifecycle_days
FROMur_bi_dw.dim_goods_market_shop_date
wherelifecycle_end_dateisnotnull
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:c.replace(lifecycle_end_date.,)forcincolumns}
data=data.rename(columns=columns)
data=data.sort_values([shop_market_date])
data.to_csv(file_path,index=False)
#更新超時(shí)時(shí)間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dim_goods_market_date(self):
全國(guó)商品生命周期數(shù)據(jù)
file_path=os.path.join(self.save_dir,ur_bi_dw.dim_goods_market_date.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(file_path):
return
sql=
select*FROMur_bi_dw.dim_goods_market_date
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:dim_goods_market_date.+cforcincolumns}
data=data.rename(columns=columns)
data=data.sort_values([dim_goods_market_date.sku_no])
data.to_csv(file_path,index=False)
#更新超時(shí)時(shí)間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dim_goods_color_dev_sizes(self):
商品開(kāi)發(fā)碼數(shù)數(shù)據(jù)
file_path=os.path.join(self.save_dir,dim_goods_color_dev_sizes.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(file_path):
return
#sql=SELECT*FROMur_bi_dw.dim_goods_market_shop_dateasgoods_shop_date
sql=SELECT*FROMur_bi_dm.dim_goods_color_dev_sizes
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:c.replace(dim_goods_color_dev_sizes.,)forcincolumns}
data=data.rename(columns=columns)
data.to_csv(file_path,index=False)
#更新超時(shí)時(shí)間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dwd_daily_sales_size(self):
實(shí)際銷(xiāo)售金額
sub_dir=os.path.join(self.save_dir,dwd_daily_sales_size_all)
now_lock=self.get_lock(sub_dir)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(sub_dir):
return
sql=
selectshop_no,sku_no,date_key,`size`,
sum(tag_price)as`tag_price`,
sum(sales_qty)as`sales_qty`,
sum(sales_tag_amt)as`sales_tag_amt`,
sum(sales_amt)as`sales_amt`,
count(0)as`sales_count`
fromur_bi_dw.dwd_daily_sales_sizeassales
wheresales.date_key=%sandsales.date_key%s
andsales.currency_code=CNY
groupbyshop_no,sku_no,date_key,`size`
sort_columns=[date_key,shop_no,sku_no]
self.get_data_of_date(
save_dir=sub_dir,
sql=sql,
sort_columns=sort_columns,
start_date=datetime.datetime(2017,1,1),#開(kāi)始時(shí)間
#更新超時(shí)時(shí)間
self.save_last_time(sub_dir)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dwd_daily_delivery_size(self):
實(shí)際配貨金額
sub_dir=os.path.join(self.save_dir,dwd_daily_delivery_size_all)
now_lock=self.get_lock(sub_dir)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(sub_dir):
return
sql=
selectshop_no,sku_no,date_key,`size`,
sum(delivery.shop_distr_received_qty)as`shop_distr_received_qty`,
sum(delivery.shop_distr_received_amt)as`shop_distr_received_amt`,
sum(delivery.online_distr_received_qty)as`online_distr_received_qty`,
sum(delivery.online_distr_received_amt)as`online_distr_received_amt`,
sum(delivery.pr_received_qty)as`pr_received_qty`,
count(0)as`delivery_count`
fromur_bi_dw.dwd_daily_delivery_sizeasdelivery
wheredelivery.date_key=%sanddelivery.date_key%s
anddelivery.currency_code=CNY
groupbyshop_no,sku_no,date_key,`size`
sort_columns=[date_key,shop_no,sku_no]
self.get_data_of_date(
save_dir=sub_dir,
sql=sql,
sort_columns=sort_columns,
start_date=datetime.datetime(2017,1,1),#開(kāi)始時(shí)間
#更新超時(shí)時(shí)間
self.save_last_time(sub_dir)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_v_last_nation_sales_status(self):
商品暢滯銷(xiāo)數(shù)據(jù)
file_path=os.path.join(self.save_dir,v_last_nation_sales_status.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(file_path):
return
sql=SELECT*FROMur_bi_dw.v_last_nation_sales_status
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:c.replace(v_last_nation_sales_status.,)forcincolumns}
data=data.rename(columns=columns)
data.to_csv(file_path,index=False)
#更新超時(shí)時(shí)間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dwd_daily_finacial_goods(self):
商品成本價(jià)數(shù)據(jù)
file_path=os.path.join(self.save_dir,dwd_daily_finacial_goods.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(file_path):
return
sql=
selectt1.sku_no,t1.`size`,t1.cost_tax_inclfromur_bi_dw.dwd_daily_finacial_goodsast1
innerjoin(
selectsku_no,`size`,max(date_key)asdate_key
fromur_bi_dw.dwd_daily_finacial_goods
wherecurrency_code=CNYandcountry_code=CN
groupbysku_no,`size`
)ast2
ont2.sku_no=t1.sku_no
andt2.`size`=t1.`size`
andt2.date_key=t1.date_key
wheret1.currency_code=CNYandt1.country_code=CN
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:c.replace(t1.,)forcincolumns}
data=data.rename(columns=columns)
data.to_csv(file_path,index=False)
#更新超時(shí)時(shí)間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_dim_size_group(self):
尺碼映射數(shù)據(jù)
file_path=os.path.join(self.save_dir,dim_size_group.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(file_path):
return
sql=select*fromur_bi_dw.dim_size_group
data:pd.DataFrame=self.db_helper.get_data(sql)
columns=list(data.columns)
columns={c:c.replace(dim_size_group.,)forcincolumns}
data=data.rename(columns=columns)
data.to_csv(file_path,index=False)
#更新超時(shí)時(shí)間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_common_datas(
host=2,
port=21051,
database=ur_ai_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
logger:logging.Logger=None):
#共用文件
common_datas_dir=ShareArgs.get_args_value(common_datas_dir)
common_ur_bi_dir=os.path.join(common_datas_dir,ur_bi_data)
ur_bi_get_datas=UrBiGetDatas(
host=host,
port=port,
database=database,
auth_mechanism=auth_mechanism,
user=user,
password=password,
save_dir=common_ur_bi_dir,
logger=logger
try:
(正在查詢(xún)?nèi)掌跀?shù)據(jù)...)
ur_bi_get_datas.get_dim_date()
(查詢(xún)?nèi)掌跀?shù)據(jù)完成!)
(正在查詢(xún)店鋪數(shù)據(jù)...)
ur_bi_get_datas.get_dim_shop()
(查詢(xún)店鋪數(shù)據(jù)完成!)
(正在查詢(xún)天氣數(shù)據(jù)...)
ur_bi_get_datas.get_weather()
(查詢(xún)天氣數(shù)據(jù)完成!)
(正在查詢(xún)天氣城市數(shù)據(jù)...)
ur_bi_get_datas.get_weather_city()
(查詢(xún)天氣城市數(shù)據(jù)完成!)
(正在查詢(xún)貨品數(shù)據(jù)...)
ur_bi_get_datas.get_dim_goods()
(查詢(xún)貨品數(shù)據(jù)完成!)
(正在查詢(xún)實(shí)際銷(xiāo)量數(shù)據(jù)...)
ur_bi_get_datas.get_dwd_daily_sales_size()
(查詢(xún)實(shí)際銷(xiāo)量數(shù)據(jù)完成!)
exceptExceptionasex:
logger.exception(ex)
raiseex#往外拋出異常
finally:
ur_bi_get_datas.close()
classCustomUrBiGetDatas(UrBiGetDatasBase):
def__init__(
self,
host=2,
port=21051,
database=ur_ai_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
save_dir=./hjx/data/ur_bi_data,
logger:logging.Logger=None
self.save_dir=save_dir
self.logger=logger
super().__init__(
host=host,
port=port,
database=database,
auth_mechanism=auth_mechanism,
user=user,
password=password,
save_dir=save_dir,
logger=logger
defget_sales_goal_amt(self):
銷(xiāo)售目標(biāo)金額
file_path=os.path.join(self.save_dir,month_of_year_sales_goal_amt.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(file_path):
return
sql=
selectsales_goal.shop_no,
if(sales_goal.serial=Y,W,sales_goal.serial)as`sales_goal.serial`,
dates.month_of_year,
sum(sales_goal.sales_goal_amt)assales_goal_amt
fromur_bi_dw.dwd_sales_goal_westassales_goal
innerjoinur_bi_dw.dim_dateasdates
onsales_goal.date_key=dates.date_key
groupbysales_goal.shop_no,
if(sales_goal.serial=Y,W,sales_goal.serial),
dates.month_of_year
data:pd.DataFrame=self.db_helper.get_data(sql)
data=data.rename(columns={
shop_no:sales_goal.shop_no,
serial:sales_goal.serial,
month_of_year:dates.month_of_year,
#排序
data=data.sort_values([sales_goal.shop_no,sales_goal.serial,dates.month_of_year])
data.to_csv(file_path)
#更新超時(shí)時(shí)間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_shop_serial_area(self):
店-系列面積
file_path=os.path.join(self.save_dir,shop_serial_area.csv)
now_lock=self.get_lock(file_path)
now_lock.acquire()#加鎖
try:
#設(shè)置超時(shí)4小時(shí)才重新查數(shù)據(jù)
ifnotself.get_last_time(file_path):
return
sql=
selectshop_serial_area.shop_no,
if(shop_serial_area.serial=Y,W,shop_serial_area.serial)as`shop_serial_area.serial`,
shop_serial_area.month_of_year,
sum(shop_serial_area.area)as`shop_serial_area.area`
fromur_bi_dw.dwd_shop_serial_areaasshop_serial_area
whereshop_serial_area.areaisnotnull
groupbyshop_serial_area.shop_no,if(shop_serial_area.serial=Y,W,shop_serial_area.serial),shop_serial_area.month_of_year
data:pd.DataFrame=self.db_helper.get_data(sql)
data=data.rename(columns={
shop_no:shop_serial_area.shop_no,
serial:shop_serial_area.serial,
month_of_year:shop_serial_area.month_of_year,
area:shop_serial_area.area,
#排序
data=data.sort_values([shop_serial_area.shop_no,shop_serial_area.serial,shop_serial_area.month_of_year])
data.to_csv(file_path)
#更新超時(shí)時(shí)間
self.save_last_time(file_path)
exceptExceptionasex:
self.logger.exception(ex)
raiseex#往外拋出異常
finally:
now_lock.release()#釋放鎖
defget_datas(
host=2,
port=21051,
database=ur_ai_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
save_dir=./data/sales_forecast/ur_bi_dw_data,
logger:logging.Logger=None):
ur_bi_get_datas=CustomUrBiGetDatas(
host=host,
port=port,
database=database,
auth_mechanism=auth_mechanism,
user=user,
password=password,
save_dir=save_dir,
logger=logger
try:
#店,系列,品類(lèi),年月,銷(xiāo)售目標(biāo)金額
(正在查詢(xún)年月銷(xiāo)售目標(biāo)金額數(shù)據(jù)...)
ur_bi_get_datas.get_sales_goal_amt()
(查詢(xún)年月銷(xiāo)售目標(biāo)金額數(shù)據(jù)完成!)
exceptExceptionasex:
logger.exception(ex)
raiseex#往外拋出異常
finally:
ur_bi_get_datas.close()
defgetdata_ur_bi_dw(
host=2,
port=21051,
database=ur_ai_dw,
auth_mechanism=LDAP,
user=urbi,
password=Ur#730xd,
save_dir=./data/sales_forecast/ur_bi_dw_data,
logger=None
get_common_datas(
host=host,
port=port,
database=database,
auth_mechanism=auth_mechanism,
user=user,
password=password,
logger=logger
get_datas(
host=host,
port=port,
database=database,
auth_mechanism=auth_mechanism,
user=user,
password=password,
save_dir=save_dir,
logger=logger
#代碼入口
#getdata_ur_bi_dw(
#host=ur_bi_dw_host,
#port=ur_bi_dw_port,
#database=ur_bi_dw_database,
#auth_mechanism=ur_bi_dw_auth_mechanism,
#user=ur_bi_dw_user,
#password=ur_bi_dw_password,
#save_dir=ur_bi_dw_save_dir,
#logger=logger
#)
代碼說(shuō)明和領(lǐng)悟
每個(gè)類(lèi)的具體作用說(shuō)明,代碼
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2026年太湖創(chuàng)意職業(yè)技術(shù)學(xué)院?jiǎn)握新殬I(yè)傾向性測(cè)試題庫(kù)及參考答案詳解1套
- 2026年吐魯番職業(yè)技術(shù)學(xué)院?jiǎn)握新殬I(yè)適應(yīng)性測(cè)試題庫(kù)及參考答案詳解
- 2026年長(zhǎng)沙南方職業(yè)學(xué)院?jiǎn)握新殬I(yè)適應(yīng)性考試題庫(kù)及答案詳解一套
- 2026年江蘇省泰州市單招職業(yè)傾向性測(cè)試題庫(kù)及完整答案詳解1套
- 2026年西安電力機(jī)械制造公司機(jī)電學(xué)院?jiǎn)握新殬I(yè)傾向性考試題庫(kù)及答案詳解一套
- 2026年江西工業(yè)職業(yè)技術(shù)學(xué)院?jiǎn)握新殬I(yè)技能考試題庫(kù)及答案詳解一套
- 2026年錦州師范高等專(zhuān)科學(xué)校單招職業(yè)技能考試題庫(kù)及參考答案詳解1套
- 2026年黑龍江藝術(shù)職業(yè)學(xué)院?jiǎn)握新殬I(yè)傾向性測(cè)試題庫(kù)及參考答案詳解
- 2026年遼寧建筑職業(yè)學(xué)院?jiǎn)握新殬I(yè)技能測(cè)試題庫(kù)及答案詳解1套
- 2026年吉林電子信息職業(yè)技術(shù)學(xué)院?jiǎn)握新殬I(yè)技能測(cè)試題庫(kù)及參考答案詳解1套
- 2023年建筑涂料研發(fā)工程師年終總結(jié)及年后展望
- 新能源汽車(chē)充電樁專(zhuān)屬安裝竣工驗(yàn)收單模板
- 華文慕課計(jì)算機(jī)網(wǎng)絡(luò)原理和因特網(wǎng)(北京大學(xué))章節(jié)測(cè)驗(yàn)答案
- 員工激勵(lì)管理方案模板
- GB/T 5008.2-2005起動(dòng)用鉛酸蓄電池產(chǎn)品品種和規(guī)格
- GB/T 27696-2011一般起重用4級(jí)鍛造吊環(huán)螺栓
- GB/T 25000.10-2016系統(tǒng)與軟件工程系統(tǒng)與軟件質(zhì)量要求和評(píng)價(jià)(SQuaRE)第10部分:系統(tǒng)與軟件質(zhì)量模型
- GB/T 21470-2008錘上鋼質(zhì)自由鍛件機(jī)械加工余量與公差盤(pán)、柱、環(huán)、筒類(lèi)
- GB/T 14260-2010散裝重有色金屬浮選精礦取樣、制樣通則
- GB/T 1048-2019管道元件公稱(chēng)壓力的定義和選用
- 凱石量化對(duì)沖2號(hào)基金合同
評(píng)論
0/150
提交評(píng)論