ads_user_learn_course.py 14.6 KB
Newer Older
1 2
# -*- coding: utf-8 -*-
import logging
侯双强's avatar
侯双强 committed
3
import os
4 5
import sys

侯双强's avatar
侯双强 committed
6 7
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
8 9 10 11 12 13 14 15 16 17 18 19 20

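'''
Detail statistics of users watching live streams; scheduled task that runs once every 2 hours.
Shipped first; if visit clues are to be used, the logic here must be adjusted, including (dws_user_share_event, dws_user_visitor_clues).
One detail table and one summary table.
p10505  single-lesson playback page
p10506  single-lesson playback page
p10503  course-package detail (not purchased; when the course-package detail is opened in H5, always report p10503, without distinguishing purchased from not purchased)
p10504  course-package detail (purchased; free courses also report this page)
p10507  trial-lesson page (course package)
'''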
# Configure the root logger: timestamp, logger name, level, message; INFO and above.
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
# Path of the script as invoked; handed to save_result() when results are persisted.
file_name = sys.argv[0]
# Base name of this module up to its first dot; handed to save_etl_log() as the task name.
task_file = os.path.split(__file__)[-1].split(".")[0]
22 23 24 25 26 27 28 29


def ads_user_learn_course(data_dt):
    """Build and persist the per-day detail rows and the all-time summary rows.

    Args:
        data_dt: Data date to process. It is forwarded as the SQL parameter of
            the per-day queries and recorded in both ETL log entries.
    """
    # ---- detail table (rows for ``data_dt`` only) ----
    single_course_invite_dict = query_single_course_invite_people_record(data_dt)
    course_invite_dict = query_course_invite_people_record(data_dt)
    # Single-lesson and course-package invitee counts are combined; invitees are
    # only de-duplicated within each source, not across the two.
    invite_people_record_result_dict = merge_course_invite_people(single_course_invite_dict, course_invite_dict)

    learn_course_details_dict = query_user_learn_course_details(data_dt)
    # merge_learn_course() writes invite_num onto the rows in place; the rows
    # themselves (not the return value) are what gets saved below.
    merge_learn_course(learn_course_details_dict, invite_people_record_result_dict)
    # save_result()'s return value is forwarded to the ETL log
    # (presumably the number of rows written - confirm against common.mysql_uitl).
    row = save_result('tamp_data_ads', 'ads_user_learn_course_details', learn_course_details_dict, file_name)
    now_time = now_str(YMDHMS_FORMAT)
    save_etl_log('tamp_data_ads', 'ads_user_learn_course_details', data_dt, row, 'done', task_file, now_time)

    # ---- summary table (all dates) ----
    single_course_invite_summary_dict = query_single_course_invite_people_summary()
    course_invite_summary_dict = query_course_invite_people_summary()
    invite_people_summary_result_dict = merge_course_invite_people(single_course_invite_summary_dict, course_invite_summary_dict)
    learn_course_summary_dict = query_user_learn_course_summary()
    merge_learn_course(learn_course_summary_dict, invite_people_summary_result_dict)
    # Capture the summary save result so the summary ETL log entry reports the
    # summary table's own figure instead of re-using the detail table's.
    row = save_result('tamp_data_ads', 'ads_user_learn_course_summary', learn_course_summary_dict, file_name)
    now_time = now_str(YMDHMS_FORMAT)
    save_etl_log('tamp_data_ads', 'ads_user_learn_course_summary', data_dt, row, 'done', task_file, now_time)
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102


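# Invitee counts for single lessons and for course packages are not strictly de-duplicated against each other;
# invitees are only de-duplicated within single lessons and, separately, within course packages.
# Invitee count for single lessons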
def query_single_course_invite_people_record(data_dt):
    """Fetch, for one data date, the distinct-invitee count per sharer and course package.

    Visit clues on the single-lesson pages (p10505, p10506) are joined to
    course_res_view to map each lesson to its package, then grouped by date,
    sharing user and package.

    Args:
        data_dt: Data date bound to the query's ``%s`` placeholder.

    Returns:
        Whatever ``fetch_all`` returns for the query; each row carries
        data_dt, user_id, course_id and invite_num.
    """
    sql = '''
        select   p.data_dt       
                ,p.source_user_id as user_id         
                ,t.package_id as course_id
                ,count(distinct p.user_id) as invite_num
                # ,t.invite_num    
        from  
        (
            select  data_dt
                    ,source_user_id
                    ,res_id
                    ,user_id
                    # ,count(distinct user_id) as invite_num
            from    tamp_data_dwd.dwd_user_visit_clues
            where   data_dt = %s
            and     current_page in ('p10505', 'p10506')
        ) p
        left    join tamp_analysis.course_res_view t
        on      p.res_id = t.id
        group   by p.data_dt, p.source_user_id, t.package_id
    '''
    # Name of this function, used to tag the start/success log lines.
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    invite_rows = fetch_all(sql, data_dt)
    logging.info(f'{function_name} success')
    return invite_rows


# 课程包邀请人数
def query_course_invite_people_record(data_dt):
    """Fetch, for one data date, the distinct-invitee count per sharer and course package.

    Only visit clues on the course-package pages (p10503, p10504, p10507) are
    counted; ``res_id`` is used directly as the course id.

    Args:
        data_dt: Data date bound to the query's ``%s`` placeholder.

    Returns:
        Whatever ``fetch_all`` returns for the query; each row carries
        data_dt, user_id, course_id and invite_num.
    """
    sql = '''
        select  data_dt
                ,source_user_id as user_id
                ,res_id as course_id
                ,count(distinct user_id) as invite_num
        from    tamp_data_dwd.dwd_user_visit_clues
        where   data_dt = %s
        and     current_page in ('p10503', 'p10504', 'p10507')
        group   by data_dt,source_user_id,res_id
    '''
    # Name of this function, used to tag the start/success log lines.
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    invite_rows = fetch_all(sql, data_dt)
    logging.info(f'{function_name} success')
    return invite_rows


# 单节课程邀请人数
def query_single_course_invite_people_summary():
    """Fetch the all-time distinct-invitee count per sharer and course package.

    Visit clues on the single-lesson pages (p10505, p10506) are joined to
    course_res_view to map each lesson to its package. Rows are keyed by the
    sharing user (``source_user_id``), matching the per-day query and the
    course-package queries; grouping by the invitee's own ``user_id`` would
    make ``count(distinct p.user_id)`` at most 1 for every group.

    Returns:
        Whatever ``fetch_all`` returns for the query; each row carries
        data_dt (latest date seen), user_id, course_id and invite_num.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    sql = '''
        select   max(p.data_dt) as data_dt
                ,p.source_user_id as user_id
                ,t.package_id as course_id
                ,count(distinct p.user_id) as invite_num
                # ,t.invite_num
        from
        (
            select  data_dt
                    ,source_user_id
                    ,res_id
                    ,user_id
                    # ,count(distinct user_id) as invite_num
            from    tamp_data_dwd.dwd_user_visit_clues
            where    current_page in ('p10505', 'p10506')
        ) p
        left    join tamp_analysis.course_res_view t
        on      p.res_id = t.id
        group   by p.source_user_id, t.package_id
    '''
    single_course_invite_dict = fetch_all(sql, None)
    logging.info(f'{function_name} success')
    return single_course_invite_dict


# 课程包邀请人数
def query_course_invite_people_summary():
    """Fetch the all-time distinct-invitee count per sharer and course package.

    Only visit clues on the course-package pages (p10503, p10504, p10507) are
    counted; ``res_id`` is used directly as the course id.

    Returns:
        Whatever ``fetch_all`` returns for the query; each row carries
        data_dt (latest date seen), user_id, course_id and invite_num.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    sql = '''
        select  max(data_dt) as data_dt
                ,source_user_id as user_id
                ,res_id as course_id
                ,count(distinct user_id) as invite_num
        from    tamp_data_dwd.dwd_user_visit_clues
        where   current_page in ('p10503', 'p10504', 'p10507')
        group   by source_user_id,res_id
    '''
    course_invite_dict = fetch_all(sql, None)
    logging.info(f'{function_name} success')
    return course_invite_dict


def merge_course_invite_people(dict1_list, dict2_list):
    """Combine two invite-count row lists into one row per (user_id, course_id).

    Rows sharing the same (user_id, course_id) have their ``invite_num``
    summed; every other row is carried over unchanged. Either input may be
    empty or None, in which case the other input's rows are returned as-is.
    The input rows are not modified: the result holds shallow copies.

    NOTE: this module defines merge_course_invite_people twice; the later
    definition is the one bound at import time, so keep both in sync.

    Args:
        dict1_list: Rows with user_id, course_id and invite_num keys
            (single-lesson invite counts at the visible call sites).
        dict2_list: Rows with the same keys (course-package invite counts at
            the visible call sites).

    Returns:
        A list of row dicts, unique by (user_id, course_id), in first-seen
        order. For a merged key the remaining columns come from the first row.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    merged_by_key = {}
    for rows in (dict1_list, dict2_list):
        for row in rows or ():
            key = (row['user_id'], row['course_id'])
            existing = merged_by_key.get(key)
            if existing is None:
                # Copy so that summing below never mutates the caller's rows.
                merged_by_key[key] = dict(row)
            else:
                existing['invite_num'] = existing['invite_num'] + row['invite_num']
    logging.info(f'{function_name} success')
    return list(merged_by_key.values())


def query_user_learn_course_details(data_dt):
    """Fetch one data date's per-user course-learning rows with user info and course type.

    Reads dws_user_learn_course for ``data_dt``, left-joins user_info_view for
    the user's names and team, and left-joins a subquery over order_flow that
    maps the order's pay mode to a course-type label. Rows without a matching
    order fall back to the '免费' label via coalesce.

    NOTE(review): the order subquery groups by (createby, ab_proid,
    course_type), so a user holding orders with different pay modes for the
    same course yields several joined rows for that course - confirm whether
    that is intended.

    Args:
        data_dt: Data date bound to the query's ``%s`` placeholder.

    Returns:
        Whatever ``fetch_all`` returns for the query.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    sql = '''
        select  p.data_dt
                ,p.user_id
                ,t.real_name
                ,t.user_name
                ,t.nickname
                ,t.team_id
                ,p.course_id
                ,p.course_name
                ,coalesce(q.course_type, '免费') as course_type
                ,p.total_dur
                ,p.learn_dur
                ,p.play_rate
                ,p.learn_classes
                ,p.total_classes
                ,p.online_time
                ,p.start_time
                ,p.end_time
                ,p.share_num
        from    tamp_data_dws.dws_user_learn_course p
        left    join tamp_analysis.user_info_view t
        on      p.user_id = t.user_id
        left    join
        (
            select  createby as user_id
                    ,ab_proid as course_id
                    ,case when ab_pay_mode = '1' then '免费'
                          when ab_pay_mode = '2' then '积分兑换'
                          when ab_pay_mode in ('3', '4', '5', '6', '7', '8') then '现金支付'
                          when ab_pay_mode = '99' and ab_order_type = 0 then '系统赠送'
                          when ab_pay_mode = '99' and ab_order_type = 2 then '好友赠送'
                    end course_type
                    ,count(1)
            from    tamp_user.order_flow
            where   deletetag = '0'
            and     ab_type = '300'
            and     ab_proid <> ''
            and     ab_status = 'SUCCESS'
            and     ab_proid is not null
            and     group_buy_status in (1, 2)
            group   by createby, ab_proid, course_type # 排除用户多次下单(同一个课程,存在多次下单的情况)
        ) q
        on      p.user_id = q.user_id
        and     p.course_id = q.course_id
        where   p.data_dt = %s
    '''
    learn_course_details_dict = fetch_all(sql, data_dt)
    logging.info(f'{function_name} success')
    return learn_course_details_dict


def query_user_learn_course_summary():
    """Fetch all-time per-user, per-course learning totals with user info and course type.

    dws_user_learn_course is aggregated per (user_id, course_id, course_name,
    online_time): latest data_dt, max total_dur, summed learn_dur and
    share_num, and play_rate = learn_dur / total_dur as a percentage. The
    outer query caps play_rate at 100.00, defaults share_num to 0, and joins
    the same order-derived course-type label and user info as the detail query.

    NOTE(review): the order subquery groups by (createby, ab_proid,
    course_type), so a user holding orders with different pay modes for the
    same course yields several joined rows for that course - confirm whether
    that is intended.

    Returns:
        Whatever ``fetch_all`` returns for the query, ordered by user_id,
        data_dt descending, then course_id.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    sql = '''
        select   p.data_dt
                ,p.user_id
                ,q.real_name
                ,q.user_name
                ,q.nickname
                ,q.team_id
                ,p.course_id
                ,p.course_name
                ,coalesce(t.course_type, '免费') as course_type
                ,p.learn_dur
                ,p.total_dur
                ,if(p.play_rate >=100.00, 100.00,  p.play_rate) as play_rate
                ,coalesce(p.share_num, 0) as share_num
                ,p.online_time
        from
        (
            select  max(data_dt) as data_dt
                    ,user_id
                    ,course_id
                    ,course_name
                    ,max(total_dur) as total_dur  # 后面可能有新课程增加
                    ,sum(learn_dur) as learn_dur
                    ,round(sum(learn_dur) / max(total_dur) * 100, 2) as play_rate
                    ,online_time
                    ,sum(share_num) as share_num
            from    tamp_data_dws.dws_user_learn_course
            group   by user_id,course_id,course_name,online_time
        ) p
        left    join
        (
            select  createby as user_id
                    ,ab_proid as course_id
                    ,case when ab_pay_mode = '1' then '免费'
                          when ab_pay_mode = '2' then '积分兑换'
                          when ab_pay_mode in ('3', '4', '5', '6', '7', '8') then '现金支付'
                          when ab_pay_mode = '99' and ab_order_type = 0 then '系统赠送'
                          when ab_pay_mode = '99' and ab_order_type = 2 then '好友赠送'
                    end course_type
                    ,count(1)
            from    tamp_user.order_flow
            where   deletetag = '0'
            and     ab_type = '300'
            and     ab_proid <> ''
            and     ab_status = 'SUCCESS'
            and     ab_proid is not null
            and     group_buy_status in (1, 2)
            group   by createby, ab_proid, course_type # 排除用户多次下单(同一个课程,存在多次下单的情况)
        ) t
        on      p.user_id = t.user_id
        and     p.course_id = t.course_id
        left    join tamp_analysis.user_info_view q
        on      p.user_id = q.user_id
        order   by p.user_id, p.data_dt desc, p.course_id
    '''
    learn_course_summary_dict = fetch_all(sql, None)
    logging.info(f'{function_name} success')
    return learn_course_summary_dict
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363


def merge_course_invite_people(dict1_list, dict2_list):
    """Combine two invite-count row lists into one row per (user_id, course_id).

    Rows sharing the same (user_id, course_id) have their ``invite_num``
    summed; every other row is carried over unchanged. Either input may be
    empty or None, in which case the other input's rows are returned as-is.
    The input rows are not modified: the result holds shallow copies.

    NOTE: an earlier definition with the same name exists in this module;
    this later one is the definition bound at import time, so keep both in sync.

    Args:
        dict1_list: Rows with user_id, course_id and invite_num keys
            (single-lesson invite counts at the visible call sites).
        dict2_list: Rows with the same keys (course-package invite counts at
            the visible call sites).

    Returns:
        A list of row dicts, unique by (user_id, course_id), in first-seen
        order. For a merged key the remaining columns come from the first row.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    merged_by_key = {}
    for rows in (dict1_list, dict2_list):
        for row in rows or ():
            key = (row['user_id'], row['course_id'])
            existing = merged_by_key.get(key)
            if existing is None:
                # Copy so that summing below never mutates the caller's rows.
                merged_by_key[key] = dict(row)
            else:
                existing['invite_num'] = existing['invite_num'] + row['invite_num']
    logging.info(f'{function_name} success')
    return list(merged_by_key.values())


# The learning-course data includes share events, so whenever someone was invited there is always a matching learning-course row.
def merge_learn_course(learn_course_dict, invite_dict):
    """Set ``invite_num`` on every learning row, in place, from the invite rows.

    A learning row receives the ``invite_num`` of the invite row with the same
    (user_id, course_id), or 0 when there is none - including when the invite
    list is empty. The rows are updated in place, so callers holding
    ``learn_course_dict`` see the new column without using the return value.

    Args:
        learn_course_dict: Learning rows with user_id and course_id keys; may
            be empty or None.
        invite_dict: Invite rows with user_id, course_id and invite_num keys;
            may be empty or None.

    Returns:
        The updated learning rows with equal rows collapsed, in input order.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')

    # Index invite counts by key once instead of rescanning the invite list
    # for every learning row; the first row wins if a key repeats.
    invite_num_by_key = {}
    for invite in invite_dict or ():
        invite_num_by_key.setdefault((invite['user_id'], invite['course_id']), invite['invite_num'])

    result_dict = list()
    for row in learn_course_dict or ():
        row['invite_num'] = invite_num_by_key.get((row['user_id'], row['course_id']), 0)
        # De-duplicate by row equality.
        if row not in result_dict:
            result_dict.append(row)
    logging.info(f'{function_name} success')
    return result_dict


if __name__ == '__main__':
    import datetime

    # Backfill: run the job once per calendar day over a fixed, inclusive date range.
    begin = datetime.date(2021, 9, 17)
    end = datetime.date(2021, 9, 22)
    data_dt = begin
    delta = datetime.timedelta(days=1)
    while data_dt <= end:
        print(data_dt.strftime("%Y-%m-%d"))
        ads_user_learn_course(data_dt)
        data_dt += delta