# -*- coding: utf-8 -*-
"""ETL job that builds ``tamp_data_dws.dws_user_browse_fund`` one day at a time.

For a given ``data_dt`` it reads the day's rows from
``tamp_data_dwd.dwd_user_browse_fund``. It attaches to each row the number of
``event_type = '4040'`` rows in ``tamp_data_dwd.dwd_user_share_event`` for the
same user and resource on that day. The rows are then handed to
``save_result`` for ``tamp_data_dws.dws_user_browse_fund``, and a ``'done'``
entry is recorded with ``save_etl_log``.

Notes carried over from the original header, translated from Chinese:

* Scheduled job that runs every 2 hours.
* Shipped first as-is. To make use of visitor clues, this logic must be
  adjusted, including ``dws_user_share_event`` and ``dws_user_visitor_clues``.

NOTE(review): the original header described this job as "user live-stream
viewing detail statistics", which does not match the fund-browse tables
queried here. It was probably copied from a sibling job — confirm the
description and the 2-hour schedule against the scheduler configuration.
"""
import datetime
import logging
import os
import sys

from common.mysql_uitl import fetch_all, save_result, insert_batch, insert, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT

logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
                    datefmt="%Y-%m-%d %H:%M:%S",
                    level=logging.INFO)

# Script path as invoked; passed through to save_result.
file_name = sys.argv[0]
# This module's file name truncated at its first '.'; recorded in the ETL log.
task_file = os.path.split(__file__)[-1].split(".")[0]

# Schema and table that both the write and the ETL-log entry refer to.
_TARGET_SCHEMA = 'tamp_data_dws'
_TARGET_TABLE = 'dws_user_browse_fund'

# Plain (non-f) string on purpose: the %s markers are DB-API placeholders
# bound by fetch_all, not Python interpolation.
# Placeholder order: 1st = share-event day, 2nd = browse day (same value).
# The join omits data_dt because both sides are already filtered to that day.
# NOTE(review): '4040' is an opaque event code, presumably "share fund" — confirm.
_BROWSE_FUND_SQL = '''
    select p.data_dt
          ,p.user_id
          ,p.real_name
          ,p.user_name
          ,p.nickname
          ,p.team_id
          ,p.level_grade
          ,p.res_id
          ,p.res_type
          ,p.res_name
          ,p.res_short_name
          ,p.browse_dur
          ,coalesce(p.browse_num, 0) as browse_num
          ,coalesce(t.share_num, 0) as share_num
          ,p.start_time
          ,p.end_time
      from tamp_data_dwd.dwd_user_browse_fund p
      left join (
            select data_dt
                  ,user_id
                  ,res_id
                  ,count(1) as share_num
              from tamp_data_dwd.dwd_user_share_event
             where event_type = '4040'
               and data_dt = %s
             group by data_dt, user_id, res_id
           ) t
        on p.user_id = t.user_id
       and p.res_id = t.res_id
     where p.data_dt = %s
'''


def dws_user_browse_fund(data_dt):
    """Query, store and log the ``dws_user_browse_fund`` rows for one day.

    :param data_dt: day to process. It is bound as the SQL ``data_dt``
        parameter and recorded in the ETL log. The ``__main__`` block passes
        ``datetime.date`` objects.
    """
    browse_fund_rows = query_dws_user_browse_fund(data_dt)
    row = save_result(_TARGET_SCHEMA, _TARGET_TABLE, browse_fund_rows, file_name)
    # Timestamp the log entry only after save_result has returned.
    now_time = now_str(YMDHMS_FORMAT)
    save_etl_log(_TARGET_SCHEMA, _TARGET_TABLE, data_dt, row, 'done', task_file, now_time)


def query_dws_user_browse_fund(data_dt):
    """Fetch one day's fund-browse rows together with a per-resource share count.

    Every ``dwd_user_browse_fund`` row for ``data_dt`` is kept (left join).
    ``share_num`` is the count of matching ``event_type = '4040'`` share events
    for the same ``user_id``/``res_id`` on that day, or 0 when there are none.
    ``browse_num`` is likewise coalesced to 0.

    :param data_dt: day to query; bound to both ``%s`` placeholders.
    :return: whatever ``fetch_all`` returns for the query (presumably a
        sequence of row dicts — confirm in ``common.mysql_uitl``).
    """
    function_name = query_dws_user_browse_fund.__name__
    logging.info('%s start', function_name)
    browse_fund_rows = fetch_all(_BROWSE_FUND_SQL, (data_dt, data_dt))
    logging.info('%s success', function_name)
    return browse_fund_rows


def _backfill(begin, end):
    """Run :func:`dws_user_browse_fund` for each day from ``begin`` to ``end`` inclusive."""
    one_day = datetime.timedelta(days=1)
    day = begin
    while day <= end:
        print(day.strftime("%Y-%m-%d"))
        dws_user_browse_fund(day)
        day += one_day


if __name__ == '__main__':
    # Manual backfill of a fixed date window; adjust the dates before re-running.
    _backfill(datetime.date(2021, 9, 15), datetime.date(2021, 9, 22))