# -*- coding: utf-8 -*-


'''
用户学习课程明细统计,定时任务,每2个小时运行一次
'''
import logging
import os
import sys

from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch, save_etl_log, save_result
from common.time_util import YMDHMS_FORMAT, now_str

# Configure the root logger once at import time: timestamped records at INFO level.
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
# Path of the script as it was invoked; handed to save_result and quoted in error logs.
file_name = sys.argv[0]
# Base name of this module up to the first dot; recorded as the task name in the ETL log.
task_file = os.path.split(__file__)[-1].split(".")[0]


def dwd_user_learn_course(data_dt):
    """Build and store one day's per-user course-learning detail rows.

    Queries the learning detail for the whole of ``data_dt``, hands the rows
    to ``save_result`` for target ``tamp_data_dwd`` / ``dwd_user_learn_course``
    and then records an ETL log entry with status ``'done'``.

    Args:
        data_dt: The day to process. Only ``str(data_dt)`` is used to build
            the time window, so a ``datetime.date`` (as passed by the
            ``__main__`` driver) or a ``YYYY-MM-DD`` string both work.
    """
    day = str(data_dt)
    # Inclusive bounds for the query's BETWEEN filter, at second precision.
    # NOTE(review): if server_time stores fractional seconds, rows after
    # 23:59:59.000 would be excluded — confirm the column type.
    start_time = day + ' 00:00:00'
    end_time = day + ' 23:59:59'
    learn_course_dict = query_dwd_user_learn_course(start_time, end_time)
    # presumably save_result returns the number of rows written — confirm in common.mysql_uitl
    row = save_result('tamp_data_dwd', 'dwd_user_learn_course', learn_course_dict, file_name)
    now_time = now_str(YMDHMS_FORMAT)
    save_etl_log('tamp_data_dwd', 'dwd_user_learn_course', data_dt, row, 'done', task_file, now_time)


def query_dwd_user_learn_course(start_time, end_time):
    """Fetch per-user, per-resource learning totals for a time window.

    The inner query aggregates ``tamp_analysis.access_log`` rows with
    ``event_type = '10301'`` and ``dur >= 1`` by user, resource and day.
    The result is joined to the user and course-resource views, and rows
    without a matching user or resource are dropped.

    Args:
        start_time: Lower bound for ``server_time`` (inclusive, via BETWEEN).
        end_time: Upper bound for ``server_time`` (inclusive, via BETWEEN).

    Returns:
        Whatever ``fetch_all`` returns for the query
        (presumably a list of row dicts — confirm in common.mysql_uitl).
    """
    logging.info('query_dwd_user_learn_course start')
    # '%%' is the escaped form of '%' so that only the two '%s' markers are
    # treated as bind parameters.
    query = '''
        select	p.data_dt
                ,p.user_id
                ,t.real_name 
                ,t.user_name
                ,t.nickname
                ,t.team_id
                ,t.level_grade
                ,q.package_id as course_id
                ,q.main_title as course_name
                ,q.course_online_time
                ,p.res_id
                ,q.title as res_name
                ,q.res_online_time
                ,p.learn_dur
                ,q.dur as res_dur
                ,round(p.learn_dur / q.dur * 100, 2) as play_rate
                # ,round(q.res_dur, 2) as  res_dur
                # ,round(p.learn_dur / q.res_dur, 2) as play_rate  
                ,p.start_time
                ,p.end_time
        from 	
        (
            select  date_format(server_time,'%%Y-%%m-%%d') as data_dt
                    ,uid as user_id
                    ,res_id
                    # ,round(sum(dur) / 60) as learn_dur
                    ,sum(dur) as learn_dur
                    ,min(start_time) as start_time
                    ,max(end_time) as end_time
            from    tamp_analysis.access_log
            where   server_time between %s and %s
            and     event_type = '10301'
            and     dur >= 1
            and     uid <> ''
            and     uid is not null
            and     res_id <> ''
            and     res_id is not null
            group   by uid,res_id,date_format(server_time,'%%Y-%%m-%%d')
        )  p     
        left    join  tamp_analysis.user_info_view t
        on      p.user_id = t.user_id
        left    join    tamp_analysis.course_res_view q
        on      p.res_id = q.id
        where   t.user_id is not null 
        and     q.id is not null 
        order   by p.user_id,p.start_time 
    '''
    window = (start_time, end_time)
    rows = fetch_all(query, window)
    logging.info('query_dwd_user_learn_course success')
    return rows


def save_dwd_user_learn_course(ret):
    """Write learning-detail rows to ``tamp_data_dwd.dwd_user_learn_course``.

    Rows are written with a single batched ``REPLACE INTO`` statement. The
    column list is taken from the first row, and every row's values are
    picked by column name, so values always line up with the columns.

    Args:
        ret: Iterable of row mappings (column name -> value). Nothing is
            done when it is empty or ``None``.

    Returns:
        None. The outcome is reported through the log only.

    Raises:
        KeyError: If a row lacks a column that the first row has.
    """
    if not ret:
        return
    records = list(ret)
    if not records:
        # A truthy but empty iterable (e.g. a generator): nothing to write.
        return

    logging.info('save_dwd_user_learn_course start')
    row = len(records)
    # Column list and placeholders are the same for every row; build them once.
    columns = list(records[0].keys())
    fields = ','.join([f"`{name}`" for name in columns])
    place_holder = ','.join(["%s"] * len(columns))
    values = [tuple(record[name] for name in columns) for record in records]

    sql = f'''replace into tamp_data_dwd.dwd_user_learn_course ( {fields} ) values ( {place_holder} )'''
    rs = insert_batch(sql, values)
    # REPLACE counts a fresh insert as 1 affected row and a replaced row as 2
    # (delete + insert), so a batch mixing both lands anywhere in [row, 2 * row].
    # presumably insert_batch returns the affected-row count — confirm in common.mysql_uitl
    if row <= rs <= 2 * row:
        logging.info(f'save_dwd_user_learn_course success {row}')
    else:
        logging.error(f'save_dwd_user_learn_course error 数据为:{row}行,插入成功为:{rs} 行 执行程序为:{file_name}')

if __name__ == '__main__':
    import datetime

    # Backfill driver: process every day from `begin` to `end`, both inclusive.
    begin = datetime.date(2021, 4, 15)
    end = datetime.date(2021, 9, 22)
    data_dt = begin
    delta = datetime.timedelta(days=1)
    while data_dt <= end:
        # Echo the day being processed so progress is visible on stdout.
        print(data_dt.strftime("%Y-%m-%d"))
        dwd_user_learn_course(data_dt)
        data_dt += delta