# -*- coding: utf-8 -*-
"""
User course-learning statistics. Scheduled job, runs every 2 hours.

Shipped first in this form. If visit clues are to be used, the logic here must be
adjusted, including (dws_user_share_event, dws_user_visitor_clues).

Produces one detail table and one summary table:
    tamp_data_ads.ads_user_learn_course_details  (one data_dt per run)
    tamp_data_ads.ads_user_learn_course_summary  (recomputed over all history)

Page codes read from tamp_data_dwd.dwd_user_visit_clues.current_page:
    p10505  lesson (single-section) course playback page
    p10506  lesson (single-section) course playback page
    p10503  course package detail (not purchased; in H5 every visit to a course
            package detail page reports p10503, purchased or not)
    p10504  course package detail (purchased; free courses also report this page)
    p10507  trial-listening course page (course package)
"""
import logging
import os
import sys

from common.mysql_uitl import fetch_all, save_result, insert_batch, insert, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT

logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                    level=logging.INFO)

file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]

# Purchase type per (user_id, course_id) from successful orders; joined by both the
# detail and the summary query. Grouping collapses users who ordered the same course
# several times.
# NOTE(review): course_type is part of the GROUP BY, so a user who bought the same
# course with different pay modes still yields several rows and fans out the outer
# join - confirm whether that can happen.
_ORDER_COURSE_TYPE_SQL = '''
            select createby as user_id
                  ,ab_proid as course_id
                  ,case when ab_pay_mode = '1' then '免费'
                        when ab_pay_mode = '2' then '积分兑换'
                        when ab_pay_mode in ('3', '4', '5', '6', '7', '8') then '现金支付'
                        when ab_pay_mode = '99' and ab_order_type = 0 then '系统赠送'
                        when ab_pay_mode = '99' and ab_order_type = 2 then '好友赠送'
                   end course_type
                  ,count(1)
            from tamp_user.order_flow
            where deletetag = '0'
              and ab_type = '300'
              and ab_proid <> ''
              and ab_status = 'SUCCESS'
              and ab_proid is not null
              and group_buy_status in (1, 2)
            group by createby, ab_proid, course_type
'''


def ads_user_learn_course(data_dt):
    """Build and save the per-day detail table and the all-history summary table.

    Args:
        data_dt: partition date; bound to the ``data_dt = %s`` filters and written
            to the ETL log for both tables.
    """
    # Invite counts: lesson-level and package-level invites are summed. Invitees are
    # NOT strictly de-duplicated across the two sources - lesson invites are
    # de-duplicated among themselves and package invites among themselves.
    single_course_invite_dict = query_single_course_invite_people_record(data_dt)
    course_invite_dict = query_course_invite_people_record(data_dt)
    invite_people_record_result_dict = merge_course_invite_people(single_course_invite_dict, course_invite_dict)

    learn_course_details_dict = query_user_learn_course_details(data_dt)
    # Adds 'invite_num' to every row in place.
    merge_learn_course(learn_course_details_dict, invite_people_record_result_dict)

    # Save detail rows.
    detail_row = save_result('tamp_data_ads', 'ads_user_learn_course_details', learn_course_details_dict, file_name)
    now_time = now_str(YMDHMS_FORMAT)
    save_etl_log('tamp_data_ads', 'ads_user_learn_course_details', data_dt, detail_row, 'done', task_file, now_time)

    single_course_invite_summary_dict = query_single_course_invite_people_summary()
    course_invite_summary_dict = query_course_invite_people_summary()
    invite_people_summary_result_dict = merge_course_invite_people(single_course_invite_summary_dict,
                                                                   course_invite_summary_dict)

    learn_course_summary_dict = query_user_learn_course_summary()
    merge_learn_course(learn_course_summary_dict, invite_people_summary_result_dict)

    # Save summary rows; log the summary's own row count (previously the detail count was logged).
    summary_row = save_result('tamp_data_ads', 'ads_user_learn_course_summary', learn_course_summary_dict, file_name)
    now_time = now_str(YMDHMS_FORMAT)
    save_etl_log('tamp_data_ads', 'ads_user_learn_course_summary', data_dt, summary_row, 'done', task_file, now_time)


def query_single_course_invite_people_record(data_dt):
    """Per-day distinct visitors brought in by each sharer via lesson pages, rolled up to the package.

    Returns rows with keys data_dt, user_id (the sharer), course_id (the lesson's
    package_id from course_res_view) and invite_num.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    sql = '''
        select p.data_dt
              ,p.source_user_id as user_id
              ,t.package_id as course_id
              ,count(distinct p.user_id) as invite_num
        from (
            select data_dt
                  ,source_user_id
                  ,res_id
                  ,user_id
            from tamp_data_dwd.dwd_user_visit_clues
            where data_dt = %s
              and current_page in ('p10505', 'p10506')
        ) p
        left join tamp_analysis.course_res_view t on p.res_id = t.id
        group by p.data_dt, p.source_user_id, t.package_id
    '''
    single_course_invite_dict = fetch_all(sql, data_dt)
    logging.info(f'{function_name} success')
    return single_course_invite_dict


def query_course_invite_people_record(data_dt):
    """Per-day distinct visitors brought in by each sharer via course-package pages.

    Returns rows with keys data_dt, user_id (the sharer), course_id and invite_num.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    sql = '''
        select data_dt
              ,source_user_id as user_id
              ,res_id as course_id
              ,count(distinct user_id) as invite_num
        from tamp_data_dwd.dwd_user_visit_clues
        where data_dt = %s
          and current_page in ('p10503', 'p10504', 'p10507')
        group by data_dt,source_user_id,res_id
    '''
    course_invite_dict = fetch_all(sql, data_dt)
    logging.info(f'{function_name} success')
    return course_invite_dict


def query_single_course_invite_people_summary():
    """All-history distinct visitors per sharer via lesson pages, rolled up to the package.

    Fix: the sharer is ``source_user_id`` (as in the per-day query). The previous
    version selected/grouped by the visitor's ``user_id``, which made invite_num
    always 1 and attributed it to the wrong user.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    sql = '''
        select max(p.data_dt) as data_dt
              ,p.source_user_id as user_id
              ,t.package_id as course_id
              ,count(distinct p.user_id) as invite_num
        from (
            select data_dt
                  ,source_user_id
                  ,res_id
                  ,user_id
            from tamp_data_dwd.dwd_user_visit_clues
            where current_page in ('p10505', 'p10506')
        ) p
        left join tamp_analysis.course_res_view t on p.res_id = t.id
        group by p.source_user_id, t.package_id
    '''
    single_course_invite_dict = fetch_all(sql, None)
    logging.info(f'{function_name} success')
    return single_course_invite_dict


def query_course_invite_people_summary():
    """All-history distinct visitors per sharer via course-package pages."""
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    sql = '''
        select max(data_dt) as data_dt
              ,source_user_id as user_id
              ,res_id as course_id
              ,count(distinct user_id) as invite_num
        from tamp_data_dwd.dwd_user_visit_clues
        where current_page in ('p10503', 'p10504', 'p10507')
        group by source_user_id,res_id
    '''
    course_invite_dict = fetch_all(sql, None)
    logging.info(f'{function_name} success')
    return course_invite_dict


def merge_course_invite_people(dict1_list, dict2_list):
    """Full-outer merge of two invite-count row lists on (user_id, course_id).

    A key present in both inputs yields one row whose ``invite_num`` is the sum of
    both; a key present in only one input is passed through. Either input may be
    empty or None (previously an empty counterpart list dropped every row of the
    other list, and partial matches produced duplicate/inflated rows).

    Args:
        dict1_list: rows with keys user_id, course_id, invite_num (lesson invites).
        dict2_list: rows with the same keys (course-package invites).

    Returns:
        list of shallow-copied row dicts (inputs are not mutated): keys first seen in
        ``dict1_list`` in its order, then keys only seen in ``dict2_list``. Other
        columns (e.g. data_dt) come from the first row seen for the key.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    merged = {}
    for rows in (dict1_list, dict2_list):
        for row in rows or ():
            key = (row['user_id'], row['course_id'])
            if key in merged:
                merged[key]['invite_num'] = merged[key]['invite_num'] + row['invite_num']
            else:
                merged[key] = dict(row)
    result_dict = list(merged.values())
    logging.info(f'{function_name} success')
    return result_dict


def query_user_learn_course_details(data_dt):
    """Per-day learning rows enriched with user info and purchase type ('免费' when no order)."""
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    sql = '''
        select p.data_dt
              ,p.user_id
              ,t.real_name
              ,t.user_name
              ,t.nickname
              ,t.team_id
              ,p.course_id
              ,p.course_name
              ,coalesce(q.course_type, '免费') as course_type
              ,p.total_dur
              ,p.learn_dur
              ,p.play_rate
              ,p.learn_classes
              ,p.total_classes
              ,p.online_time
              ,p.start_time
              ,p.end_time
              ,p.share_num
        from tamp_data_dws.dws_user_learn_course p
        left join tamp_analysis.user_info_view t on p.user_id = t.user_id
        left join (''' + _ORDER_COURSE_TYPE_SQL + '''
        ) q on p.user_id = q.user_id and p.course_id = q.course_id
        where p.data_dt = %s
    '''
    learn_course_details_dict = fetch_all(sql, data_dt)
    logging.info(f'{function_name} success')
    return learn_course_details_dict


def query_user_learn_course_summary():
    """All-history learning totals per (user, course), with play_rate capped at 100."""
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    # max(total_dur): new lessons may be added to a course later, so the latest
    # (largest) total duration is used.
    sql = '''
        select p.data_dt
              ,p.user_id
              ,q.real_name
              ,q.user_name
              ,q.nickname
              ,q.team_id
              ,p.course_id
              ,p.course_name
              ,coalesce(t.course_type, '免费') as course_type
              ,p.learn_dur
              ,p.total_dur
              ,if(p.play_rate >=100.00, 100.00, p.play_rate) as play_rate
              ,coalesce(p.share_num, 0) as share_num
              ,p.online_time
        from (
            select max(data_dt) as data_dt
                  ,user_id
                  ,course_id
                  ,course_name
                  ,max(total_dur) as total_dur
                  ,sum(learn_dur) as learn_dur
                  ,round(sum(learn_dur) / max(total_dur) * 100, 2) as play_rate
                  ,online_time
                  ,sum(share_num) as share_num
            from tamp_data_dws.dws_user_learn_course
            group by user_id,course_id,course_name,online_time
        ) p
        left join (''' + _ORDER_COURSE_TYPE_SQL + '''
        ) t on p.user_id = t.user_id and p.course_id = t.course_id
        left join tamp_analysis.user_info_view q on p.user_id = q.user_id
        order by p.user_id, p.data_dt desc, p.course_id
    '''
    learn_course_summary_dict = fetch_all(sql, None)
    logging.info(f'{function_name} success')
    return learn_course_summary_dict


def merge_learn_course(learn_course_dict, invite_dict):
    """Set ``invite_num`` on every learning row, in place, from the merged invite rows.

    The original author's assumption: learning-course data includes share events, so
    anyone who invited people also has learning-course rows.

    Each row gets the invite_num of the invite row with the same (user_id, course_id),
    or 0 when there is none. Previously the value was overwritten on every
    non-matching invite row, so it ended up 0 unless the last invite row matched, and
    the key was missing entirely when there were no invite rows.

    Args:
        learn_course_dict: learning rows (dicts with user_id, course_id); mutated.
        invite_dict: rows with user_id, course_id, invite_num.

    Returns:
        list of the same (mutated) row objects, in input order.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    invite_by_key = {(row['user_id'], row['course_id']): row['invite_num'] for row in invite_dict or ()}
    result_dict = list(learn_course_dict or ())
    for row in result_dict:
        row['invite_num'] = invite_by_key.get((row['user_id'], row['course_id']), 0)
    logging.info(f'{function_name} success')
    return result_dict


if __name__ == '__main__':
    import datetime

    # Backfill over an inclusive date range. With no arguments the original
    # hard-coded range is used; optionally pass `begin [end]` as YYYY-MM-DD.
    if len(sys.argv) > 1:
        begin = datetime.datetime.strptime(sys.argv[1], "%Y-%m-%d").date()
        end = datetime.datetime.strptime(sys.argv[2], "%Y-%m-%d").date() if len(sys.argv) > 2 else begin
    else:
        begin = datetime.date(2021, 9, 17)
        end = datetime.date(2021, 9, 22)
    data_dt = begin
    delta = datetime.timedelta(days=1)
    while data_dt <= end:
        print(data_dt.strftime("%Y-%m-%d"))
        ads_user_learn_course(data_dt)
        data_dt += delta