# -*- coding: utf-8 -*-
'''
User live-stream watch detail statistics (DWD layer).
Scheduled job, runs every 2 hours.

Reads live watch/replay events from tamp_analysis.access_log, aggregates them
per day / user / resource / event type, enriches them with user attributes
(tamp_analysis.user_info_view) and live-room attributes
(tamp_zhibo.zhibo_theme), and writes the result into
tamp_data_dwd.dwd_user_watch_live with REPLACE INTO.
'''
import logging
import os
import sys

from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch

logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
                    datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)

# Path of the running script; included in error logs to identify the job.
file_name = sys.argv[0]


def dwd_user_watch_live(data_dt):
    """Rebuild the watch-live detail rows for one calendar day.

    :param data_dt: the day to process; anything whose ``str()`` is
        ``YYYY-MM-DD`` (e.g. ``datetime.date``).
    """
    start_time = str(data_dt) + ' 00:00:00'
    # NOTE(review): the query uses an inclusive BETWEEN; if server_time has
    # fractional seconds, events after 23:59:59.000 are not captured — confirm.
    end_time = str(data_dt) + ' 23:59:59'
    user_watch_dict = query_dwd_user_watch_live(start_time, end_time)
    save_dwd_user_watch_live(user_watch_dict)


def query_dwd_user_watch_live(start_time, end_time):
    """Query aggregated live watch (1014) / replay (1015) records.

    Events in ``[start_time, end_time]`` with ``dur >= 1`` and non-empty
    ``uid``/``res_id`` are summed per day, user, resource and event type.
    Only rows whose user exists in ``user_info_view`` and whose live theme
    exists in ``zhibo_theme`` with ``isPrivacy = 0`` are kept.
    ``play_rate`` is ``learn_dur / real_duration_second * 100`` rounded to
    2 decimals (MySQL yields NULL when the divisor is 0).

    :param start_time: inclusive lower bound, ``'YYYY-MM-DD HH:MM:SS'``.
    :param end_time: inclusive upper bound, ``'YYYY-MM-DD HH:MM:SS'``.
    :return: whatever ``fetch_all`` returns — presumably a sequence of
        column-name -> value mappings (``save_dwd_user_watch_live`` relies
        on that); verify against ``common.mysql_uitl``.
    """
    logging.info('query_dwd_user_watch_live start')
    # '%%' is needed because the statement is executed with bound parameters,
    # so literal '%' in date_format must be escaped.
    sql = '''
        select p.data_dt
              ,p.user_id
              ,t.real_name
              ,t.user_name
              ,t.nickname
              ,t.team_id
              ,t.level_grade
              ,p.res_id
              ,q.zt_name as res_name
              ,p.event_type
              ,if(p.event_type = '1014','观看','回看') as watch_type
              ,q.room_type
              ,if(q.room_type = 1, '母直播间', '子直播间') as live_type
              ,p.learn_dur
              ,q.real_duration_second as res_dur
              ,round(p.learn_dur / q.real_duration_second * 100, 2) as play_rate
              ,p.start_time
              ,p.end_time
              ,q.zt_starttime as live_start
              ,q.zt_endtime as live_end
        from (
            select date_format(server_time,'%%Y-%%m-%%d') as data_dt
                  ,uid as user_id
                  ,res_id
                  ,event_type
                  ,sum(dur) as learn_dur
                  ,min(start_time) as start_time
                  ,max(end_time) as end_time
            from tamp_analysis.access_log
            where server_time between %s and %s
              and event_type in ('1014', '1015')
              and dur >= 1
              and uid <> '' and uid is not null
              and res_id <> '' and res_id is not null
            group by date_format(server_time,'%%Y-%%m-%%d'), uid, res_id, event_type
        ) p
        left join tamp_analysis.user_info_view t
            on p.user_id = t.user_id
        left join tamp_zhibo.zhibo_theme q
            on p.res_id = q.id
        where t.user_id is not null
          and q.id is not null
          and q.isPrivacy = 0
        order by p.user_id,p.start_time
    '''
    watch_live_dict = fetch_all(sql, (start_time, end_time))
    logging.info('query_dwd_user_watch_live success')
    return watch_live_dict


def save_dwd_user_watch_live(ret):
    """Write rows into tamp_data_dwd.dwd_user_watch_live via REPLACE INTO.

    :param ret: rows as returned by ``query_dwd_user_watch_live``; each row
        is a mapping of column name -> value, and the keys are used as the
        target column names. ``None`` or empty input is a no-op.
    """
    rows = list(ret) if ret else []
    if not rows:
        return
    logging.info('save_dwd_user_watch_live start')

    # Every row comes from the same SELECT, so the first row's keys define
    # the column list. Values are taken by key so they always line up with it.
    columns = list(rows[0].keys())
    fields = ','.join(f"`{k}`" for k in columns)
    place_holder = ','.join(['%s'] * len(columns))
    values = [tuple(r[k] for k in columns) for r in rows]
    row = len(rows)

    sql = f'''replace into tamp_data_dwd.dwd_user_watch_live ( {fields} ) values ( {place_holder} )'''
    rs = insert_batch(sql, values)
    # MySQL reports 1 affected row for a fresh REPLACE insert and 2 when an
    # existing row is overwritten (delete + insert). A mixed batch therefore
    # lands anywhere in [row, 2 * row], not only at the two endpoints.
    # (A table with several unique keys could exceed 2 * row; not handled.)
    if isinstance(rs, int) and row <= rs <= 2 * row:
        logging.info(f'save_dwd_user_watch_live success {row}')
    else:
        logging.error(f'save_dwd_user_watch_live error 数据为:{row}行,插入成功为:{rs} 执行程序为:{file_name}')


if __name__ == '__main__':
    import datetime

    # Backfill over a hard-coded inclusive date range, one day at a time.
    # NOTE(review): the module docstring describes a 2-hourly job, but this
    # entry point only backfills the fixed range below — confirm intended use.
    begin = datetime.date(2021, 4, 1)
    end = datetime.date(2021, 9, 14)
    data_dt = begin
    delta = datetime.timedelta(days=1)
    while data_dt <= end:
        print(data_dt.strftime("%Y-%m-%d"))
        dwd_user_watch_live(data_dt)
        data_dt += delta