# -*- coding: utf-8 -*-
"""Statistics on users' live-stream viewing details.

Scheduled job, runs every 2 hours.

Shipped as a first version: if visit clues are to be used, the logic here
needs to be adjusted, including (dws_user_share_event,
dws_user_visitor_clues).

Produces one detail table and one summary table.
"""
import datetime
import logging
import sys

from common.mysql_uitl import fetch_all, save_result, insert_batch, insert

logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
                    datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)

# Script path as invoked; handed to save_result as its last argument.
file_name = sys.argv[0]


def ads_user_watch_live(data_dt):
    """Build and persist the watch-live detail and summary result sets.

    The detail rows are queried for ``data_dt`` only.  The summary rows are
    re-queried over the whole source table on every call, independent of
    ``data_dt`` (the summary SQL has no date filter).

    Both queries are executed before either result is saved.

    Args:
        data_dt: Partition date bound to the ``data_dt = %s`` placeholders of
            the detail query.  The script entry point passes a
            ``datetime.date``.
    """
    watch_live_details_dict = query_user_watch_live_details(data_dt)
    watch_live_summary_dict = query_user_watch_live_summary()
    # NOTE(review): the first two arguments look like target database and
    # table names -- confirm against common.mysql_uitl.save_result.
    save_result('tamp_data_ads', 'ads_user_watch_live_details',
                watch_live_details_dict, file_name)
    save_result('tamp_data_ads', 'ads_user_watch_live_summary',
                watch_live_summary_dict, file_name)


def _fetch_with_logging(function_name, sql, params):
    """Run ``fetch_all(sql, params)`` bracketed by start/success log lines.

    The "success" line is only emitted when ``fetch_all`` returns; an
    exception raised by it propagates unchanged.

    Args:
        function_name: Name of the calling query function, used as the prefix
            of both log messages.
        sql: SQL text passed to ``fetch_all``.
        params: Bind parameters passed to ``fetch_all`` (``None`` for none).

    Returns:
        Whatever ``fetch_all`` returns.
    """
    logging.info('%s start', function_name)
    rows = fetch_all(sql, params)
    logging.info('%s success', function_name)
    return rows


def query_user_watch_live_details(data_dt):
    """Fetch per-user watch-live detail rows for one ``data_dt``.

    Selects rows of ``tamp_data_dws.dws_user_watch_live`` for the given date,
    left-joined with:

    * a per (source_user_id, res_id) distinct-user count from
      ``tamp_data_dwd.dwd_user_visit_clues`` for the same date and pages
      'p1005'/'p1006', exposed as ``invite_num`` (0 when there is no match);
    * ``tamp_analysis.user_info_view`` for the user's name/team columns.

    Args:
        data_dt: Date bound to both ``%s`` placeholders.

    Returns:
        The result of ``fetch_all`` for this query.
    """
    sql = '''
        select p.data_dt
              ,p.user_id
              ,q.real_name
              ,q.user_name
              ,q.nickname
              ,q.team_id
              ,p.res_id
              ,p.res_name
              ,p.res_dur
              ,p.learn_dur
              ,p.play_rate
              ,p.event_type
              ,p.watch_type
              ,p.room_type
              ,p.live_type
              ,p.live_start
              ,p.live_end
              ,p.start_time
              ,p.end_time
              ,coalesce(p.share_num, 0) as share_num
              ,coalesce(t.invite_num, 0) as invite_num
        from tamp_data_dws.dws_user_watch_live p
        left join
        (
            select data_dt
                  ,source_user_id
                  ,res_id
                  ,count(distinct user_id) as invite_num
            from tamp_data_dwd.dwd_user_visit_clues
            where data_dt = %s
            and current_page in ('p1005', 'p1006')
            group by data_dt,source_user_id,res_id
        ) t
        on p.user_id = t.source_user_id
        and p.res_id = t.res_id
        left join tamp_analysis.user_info_view q
        on p.user_id = q.user_id
        where p.data_dt = %s
    '''
    # The function's own __name__ replaces sys._getframe().f_code.co_name,
    # which is a private, implementation-specific API; the logged text is
    # the same.
    return _fetch_with_logging(query_user_watch_live_details.__name__,
                               sql, (data_dt, data_dt))


def query_user_watch_live_summary():
    """Fetch the all-dates watch-live summary per user and resource.

    Aggregates ``tamp_data_dws.dws_user_watch_live`` over every date, grouped
    by user, resource and live window: latest ``data_dt``, total watched
    duration, summed shares, and ``play_rate`` computed as
    ``sum(learn_dur) / res_dur * 100`` rounded to 2 places and capped at
    100.00 in the outer select.  ``invite_num`` is the all-dates distinct-user
    count from ``tamp_data_dwd.dwd_user_visit_clues`` for pages
    'p1005'/'p1006' (0 when there is no match).  Rows are ordered by user,
    then latest date descending, then resource.

    Returns:
        The result of ``fetch_all`` for this query.
    """
    sql = '''
        select p.data_dt
              ,p.user_id
              ,q.real_name
              ,q.user_name
              ,q.nickname
              ,q.team_id
              ,p.res_id
              ,p.res_name
              ,p.res_dur
              ,p.total_dur
              ,if(p.play_rate >=100.00, 100.00, p.play_rate) as play_rate
              ,p.live_start
              ,p.live_end
              ,coalesce(p.share_num, 0) as share_num
              ,coalesce(t.invite_num, 0) as invite_num
        from
        (
            select user_id
                  ,max(data_dt) as data_dt
                  ,res_id
                  ,res_name
                  ,live_start
                  ,live_end
                  ,res_dur
                  ,sum(learn_dur) as total_dur
                  ,round(sum(learn_dur) / res_dur * 100, 2) as play_rate
                  ,sum(share_num) as share_num
            from tamp_data_dws.dws_user_watch_live
            group by user_id,res_id,res_name,live_start,live_end,res_dur
        ) p
        left join
        (
            select source_user_id
                  ,res_id
                  ,count(distinct user_id) as invite_num
            from tamp_data_dwd.dwd_user_visit_clues
            where current_page in ('p1005', 'p1006')
            group by source_user_id,res_id
        ) t
        on p.user_id = t.source_user_id
        and p.res_id = t.res_id
        left join tamp_analysis.user_info_view q
        on p.user_id = q.user_id
        order by p.user_id, p.data_dt desc, p.res_id
    '''
    # See query_user_watch_live_details: __name__ instead of sys._getframe().
    return _fetch_with_logging(query_user_watch_live_summary.__name__,
                               sql, None)


def _run_date_range(begin, end):
    """Run ``ads_user_watch_live`` once per day from ``begin`` to ``end``.

    Both bounds are inclusive ``datetime.date`` values.  Each processed date
    is printed as ``YYYY-MM-DD`` before the job runs for it.
    """
    one_day = datetime.timedelta(days=1)
    current = begin
    while current <= end:
        print(current.strftime("%Y-%m-%d"))
        ads_user_watch_live(current)
        current += one_day


if __name__ == '__main__':
    # NOTE(review): the range is pinned to a single fixed day, which looks
    # like a leftover from a manual backfill rather than what a job run every
    # 2 hours should use -- confirm with the scheduler configuration.
    _run_date_range(datetime.date(2021, 9, 14), datetime.date(2021, 9, 14))