# -*- coding: utf-8 -*-
"""Per-user live-stream watch detail statistics (DWS layer).

Scheduled job, runs every 2 hours.

Shipped as-is for now; if visitor clues are to be used later, the logic here
needs to be adjusted, including (dws_user_share_event, dws_user_visitor_clues).
"""
import datetime
import logging
import sys

from common.mysql_uitl import fetch_all, save_result, insert_batch, insert

logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
                    datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)

# Script path, passed to the persistence helpers and used in error logs.
file_name = sys.argv[0]


def dws_user_watch_live(data_dt):
    """Build and persist the watch/share detail rows for one day.

    Queries the share events and the watch records for ``data_dt``, merges
    them with :func:`merge_live_dict` and hands the result to ``save_result``
    for table ``tamp_data_dws.dws_user_watch_live``.

    :param data_dt: the day to process; bound to the ``data_dt = %s``
        placeholder of both queries.
    """
    # The share query used to be executed twice with the first result thrown
    # away; one round trip is enough.
    share_live_dict = query_dws_user_share_live_num(data_dt)
    watch_live_dict = query_dws_user_watch_live(data_dt)
    merge_live_result_dict = merge_live_dict(watch_live_dict, share_live_dict)
    logging.info(f'dws_user_watch_live merged rows: {len(merge_live_result_dict)}')
    save_result('tamp_data_dws', 'dws_user_watch_live', merge_live_result_dict, file_name)


def query_dws_user_share_live_num(data_dt):
    """Fetch live-stream share events (event_type '1008') for ``data_dt``.

    Each returned row is shaped like a "not watched" watch record (zero
    learn duration / play rate, event_type 1000) plus a ``share_num`` count,
    grouped by sharing user, resource and share time.

    :param data_dt: the day to query.
    :return: whatever ``fetch_all`` returns; the merge step treats it as an
        iterable of dict-like rows keyed by column alias.
    """
    function_name = sys._getframe().f_code.co_name
    # Previously logged "success" before the query had even run.
    logging.info(f'{function_name} start')
    sql = '''
        select p.data_dt
               ,p.source_user_id as user_id
               ,p.res_id
               ,t.zt_name as res_name
               # ,t.real_duration_second as res_dur
               ,coalesce(t.real_duration_second, 0) as res_dur
               ,0 as learn_dur
               ,0.0 as play_rate
               ,1000 as event_type    -- 未观看
               ,'未观看' as watch_type
               ,t.room_type
               ,if(t.room_type = 1, '母直播间', '子直播间') as live_type
               ,t.zt_starttime as live_start
               ,t.zt_endtime as live_end
               ,p.local_time as start_time
               ,p.local_time as end_time
               ,count(p.res_id) as share_num
        from tamp_data_dwd.dwd_user_share_event p
        left join tamp_zhibo.zhibo_theme t
        on p.res_id = t.id
        where p.data_dt = %s
        and p.event_type = '1008'
        # and p.user_id = '220351106097303552'
        # and p.res_id = 'ZHIBO_THEME16300638117514'
        # and p.user_id in ('213993773729746944'
        # ,'213993773729746944'
        # ,'240898270812655616'
        # ,'238092653945376768'
        # ,'235753274405261312'
        # ,'238092653945376768'
        # ,'238845342891331584'
        # )
        # and res_id in ('ZHIBO_THEME16161212225773'
        # ,'ZHIBO_THEME16161212225773'
        # ,'ZHIBO_THEME16183007936720'
        # ,'ZHIBO_THEME16183007936720'
        # ,'ZHIBO_THEME16183007936720'
        # ,'ZHIBO_THEME16183007936720'
        # ,'ZHIBO_THEME16073067863314'
        # )
        group by p.source_user_id, p.res_id, p.local_time
    '''
    share_live_dict = fetch_all(sql, data_dt)
    logging.info(f'{function_name} success')
    return share_live_dict


def query_dws_user_watch_live(data_dt):
    """Fetch the watch-live detail rows for ``data_dt`` from the DWD layer.

    :param data_dt: the day to query.
    :return: whatever ``fetch_all`` returns; the merge step treats it as an
        iterable of dict-like rows keyed by column name.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    sql = '''
        select data_dt
               ,user_id
               ,res_id
               ,res_name
               ,coalesce(res_dur,0) as res_dur
               ,learn_dur
               ,coalesce(play_rate, 0) as play_rate
               ,event_type
               ,watch_type
               ,room_type
               ,live_type
               ,live_start
               ,live_end
               ,start_time
               ,end_time
        from tamp_data_dwd.dwd_user_watch_live
        where data_dt = %s
        # and user_id = '220351106097303552'
        # and res_id = 'ZHIBO_THEME16300638117514'
    '''
    watch_live_dict = fetch_all(sql, data_dt)
    logging.info(f'{function_name} success')
    return watch_live_dict


def _dedupe_rows(rows):
    """Return ``rows`` without exact duplicates, keeping first-seen order."""
    seen = set()
    unique = []
    for row in rows:
        try:
            key = frozenset(row.items())
        except TypeError:
            # A value is unhashable: fall back to a plain equality scan.
            if row not in unique:
                unique.append(row)
            continue
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique


def merge_live_dict(watch_live_dict, share_live_dict):
    """Merge watch rows and share rows into one de-duplicated list.

    * Every watch row is emitted once, with ``share_num`` set to the total
      ``share_num`` of the share rows having the same ``user_id`` and
      ``res_id`` (0 when the user never shared that live stream).
    * Share rows whose (``user_id``, ``res_id``) has no watch row are emitted
      unchanged (the user shared the live stream but did not watch it).

    The previous nested-loop version overwrote ``share_num`` on the same dict
    for every share row (so a match was usually reset to 0), emitted share
    rows as "not watched" even when the user had watched, and dropped all
    watch rows when there were no share rows at all.

    The input rows are not mutated.

    :param watch_live_dict: iterable of watch rows (may be empty or None).
    :param share_live_dict: iterable of share rows (may be empty or None).
    :return: list of row dicts, each carrying a ``share_num`` key.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    watch_rows = watch_live_dict or []
    share_rows = share_live_dict or []

    # Total shares per (user, resource); the share query yields one row per
    # share time, so several rows can belong to the same pair.
    share_totals = {}
    for share in share_rows:
        key = (share['user_id'], share['res_id'])
        share_totals[key] = share_totals.get(key, 0) + share['share_num']

    merged = []
    watched_keys = set()
    for watch in watch_rows:
        key = (watch['user_id'], watch['res_id'])
        watched_keys.add(key)
        row = dict(watch)  # copy so the caller's rows stay untouched
        row['share_num'] = share_totals.get(key, 0)
        merged.append(row)

    # Shared but never watched.
    merged.extend(
        share for share in share_rows
        if (share['user_id'], share['res_id']) not in watched_keys
    )

    merge_live_result = _dedupe_rows(merged)
    logging.info(f'{function_name} success')
    return merge_live_result


def save_dws_user_watch_live(ret):
    """Batch ``replace into`` the merged rows into dws_user_watch_live.

    Does nothing when ``ret`` is empty. Column names are taken from the first
    row; every row is expected to carry the same keys.

    :param ret: list of row dicts to persist.
    """
    function_name = sys._getframe().f_code.co_name
    if not ret:
        return
    logging.info(f'{function_name} start')
    columns = list(ret[0].keys())
    fields = ','.join([f"`{k}`" for k in columns])
    place_holder = ','.join(["%s" for _ in columns])
    values = [tuple(row[k] for k in columns) for row in ret]
    row = len(values)
    sql = f'''replace into tamp_data_dws.dws_user_watch_live ( {fields} ) values ( {place_holder} )'''
    # presumably insert_batch returns the affected-row count - confirm in common.mysql_uitl
    affected = insert_batch(sql, values)
    # REPLACE reports 1 affected row per fresh insert and 2 per replaced row,
    # so a mixed batch lands anywhere between row and 2 * row.
    if affected is not None and row <= affected <= 2 * row:
        logging.info(f'{function_name} success {row}')
    else:
        logging.error(f'{function_name} error 数据为:{row}行,插入成功为:{affected} 行 执行程序为:{file_name}')


if __name__ == '__main__':
    # Backfill loop: processes every day in [begin, end], one day at a time.
    begin = datetime.date(2021, 4, 18)
    end = datetime.date(2021, 4, 18)
    data_dt = begin
    delta = datetime.timedelta(days=1)
    while data_dt <= end:
        print(data_dt.strftime("%Y-%m-%d"))
        dws_user_watch_live(data_dt)
        data_dt += delta