# dwd_user_learn_course.py 3.88 KB
# -*- coding: utf-8 -*-


'''
Detailed statistics of the courses users have studied. Scheduled job, runs once every 2 hours.
'''
import logging
import sys

from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch


# Configure the root logger: "<timestamp> <logger name>:<level>:<message>" lines, INFO and above.
logging.basicConfig(
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
)
# file_path = get_file_path()
# First command-line entry (how the script was invoked); quoted in the error log of the save step.
file_name = sys.argv[0]


def dwd_user_learn_course(data_dt):
    """Query one day of user course-learning detail rows and store them.

    The day is turned into the closed time window
    ``"<data_dt> 00:00:00"`` .. ``"<data_dt> 23:59:59"``, which is passed to
    ``query_dwd_user_learn_course``; the rows it returns are handed to
    ``save_dwd_user_learn_course``.

    :param data_dt: the day to process. Only its ``str()`` form is used; the
        script entry point passes a ``datetime.date`` (rendered as YYYY-MM-DD).
    """
    day_prefix = str(data_dt)
    window = (day_prefix + ' 00:00:00', day_prefix + ' 23:59:59')
    learned_rows = query_dwd_user_learn_course(*window)
    save_dwd_user_learn_course(learned_rows)
    # save_ads_user_learn_course(learn_course_dict)  # dual write of the detail data (disabled)


def query_dwd_user_learn_course(start_time, end_time):
    """Fetch per-user, per-resource, per-day learning details for a time window.

    The inner query reads ``tamp_analysis.access_log`` rows with
    ``event_type = '10301'``, ``dur >= 1`` and a non-empty ``uid`` and
    ``res_id`` whose ``server_time`` lies between the two bounds. It groups
    them by user, resource and calendar day, summing ``dur`` into
    ``learn_dur`` and taking the earliest ``start_time`` / latest ``end_time``.

    The outer query joins ``user_info_view`` and ``course_res_view`` and keeps
    only rows matched in both (left joins followed by ``is not null``
    filters). ``play_rate`` is ``learn_dur / dur * 100`` rounded to 2 places.
    Rows are ordered by user and start time.

    :param start_time: lower bound bound to the first ``%s`` placeholder.
    :param end_time: upper bound bound to the second ``%s`` placeholder.
    :return: whatever ``fetch_all`` returns for the query -- presumably a
        sequence of dict-like rows, since the save step calls ``.keys()`` and
        ``.values()`` on each element (TODO confirm against ``fetch_all``).
    """
    logging.info('query_dwd_user_learn_course start')
    # NOTE(review): the doubled '%%' in date_format presumably survives the
    # driver's %-style parameter substitution as a single '%' -- confirm.
    detail_sql = '''
        select	p.data_dt
                ,p.user_id
                ,t.real_name 
                ,t.user_name
                ,t.nickname
                ,t.team_id
                ,t.level_grade
                ,q.package_id as course_id
                ,q.main_title as course_name
                ,q.course_online_time
                ,p.res_id
                ,q.title as res_name
                ,q.res_online_time
                ,p.learn_dur
                ,q.dur as res_dur
                ,round(p.learn_dur / q.dur * 100, 2) as play_rate
                # ,round(q.res_dur, 2) as  res_dur
                # ,round(p.learn_dur / q.res_dur, 2) as play_rate  
                ,p.start_time
                ,p.end_time
        from 	
        (
            select  date_format(server_time,'%%Y-%%m-%%d') as data_dt
                    ,uid as user_id
                    ,res_id
                    # ,round(sum(dur) / 60) as learn_dur
                    ,sum(dur) as learn_dur
                    ,min(start_time) as start_time
                    ,max(end_time) as end_time
            from    tamp_analysis.access_log
            where   server_time between %s and %s
            and     event_type = '10301'
            and     dur >= 1
            and     uid <> ''
            and     uid is not null
            and     res_id <> ''
            and     res_id is not null
            group   by uid,res_id,date_format(server_time,'%%Y-%%m-%%d')
        )  p     
        left    join  tamp_analysis.user_info_view t
        on      p.user_id = t.user_id
        left    join    tamp_analysis.course_res_view q
        on      p.res_id = q.id
        where   t.user_id is not null 
        and     q.id is not null 
        order   by p.user_id,p.start_time 
    '''
    time_window = (start_time, end_time)
    fetched_rows = fetch_all(detail_sql, time_window)
    logging.info('query_dwd_user_learn_course success')
    return fetched_rows


def save_dwd_user_learn_course(ret):
    """Write the queried rows into ``tamp_data_dwd.dwd_user_learn_course``.

    All rows are sent in one ``insert_batch`` call using a single
    ``replace into ... values (...)`` statement. The column list is taken from
    the keys of the first row, so every row is assumed to carry the same keys
    in the same order (true when they all come from one query result).

    The outcome is only logged; nothing is raised or returned.

    :param ret: iterable of dict-like rows (column name -> value). Empty or
        ``None`` input is a no-op.
    :return: ``None``.
    """
    if not ret:
        return

    logging.info('save_dwd_user_learn_course start')
    rows = list(ret)
    row = len(rows)

    # Column list and placeholders are identical for every row: build them once.
    columns = list(rows[0].keys())
    fields = ','.join(f"`{k}`" for k in columns)
    place_holder = ','.join(["%s"] * len(columns))
    values = [tuple(i.values()) for i in rows]

    sql = f'''replace into tamp_data_dwd.dwd_user_learn_course ( {fields} ) values ( {place_holder} )'''
    rs = insert_batch(sql, values)

    # REPLACE reports 1 affected row for a fresh insert and 2 when an existing
    # row is deleted and re-inserted, so a batch mixing both lands anywhere
    # between `row` and `2 * row`. Checking only the two end points would flag
    # such a mixed batch as a failure.
    # NOTE(review): assumes insert_batch returns the affected-row count; any
    # non-integer result (e.g. None) is reported as an error, as before.
    if isinstance(rs, int) and row <= rs <= 2 * row:
        logging.info(f'save_dwd_user_learn_course success {row}')
    else:
        logging.error(f'save_dwd_user_learn_course error 数据为:{row}行,插入成功为:{rs} 行 执行程序为:{file_name}')


if __name__ == '__main__':
    import datetime

    # Backfill driver: process every day of the fixed range below, inclusive,
    # one day per call, printing each date before it is processed.
    first_day = datetime.date(2021, 4, 1)
    last_day = datetime.date(2021, 9, 14)
    total_days = (last_day - first_day).days + 1
    for day_offset in range(total_days):
        current_day = first_day + datetime.timedelta(days=day_offset)
        print(current_day.strftime("%Y-%m-%d"))
        dwd_user_learn_course(current_day)