1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# -*- coding: utf-8 -*-
'''
Detail statistics of the courses each user has studied.

Scheduled job; runs once every 2 hours.
'''
import logging
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch
# Configure the root logger at import time: INFO and above, each record
# rendered as "<timestamp> <logger name>:<LEVEL>:<message>".
logging.basicConfig(
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
)
# file_path = get_file_path()
# Path this script was launched with (argv[0]); written into error log lines
# so a failed run can be traced back to the program that produced it.
file_name = sys.argv[0]
def dwd_user_learn_course(data_dt):
    """Query one day's user/course learning detail rows and store them.

    :param data_dt: the day to process. Only its ``str()`` form is used, as
        the date part of the query window (the script entry point passes a
        ``datetime.date``, whose ``str()`` is ``YYYY-MM-DD``).
    :return: None
    """
    day = str(data_dt)
    # The window spans the whole day, from the first to the last full second.
    detail_rows = query_dwd_user_learn_course(f'{day} 00:00:00', f'{day} 23:59:59')
    save_dwd_user_learn_course(detail_rows)
    # save_ads_user_learn_course(detail_rows)  # dual-write of the detail rows (disabled)
def query_dwd_user_learn_course(start_time, end_time):
    """Fetch per-user, per-resource learning detail rows for a time window.

    The inner query reads ``tamp_analysis.access_log`` rows whose
    ``server_time`` lies between ``start_time`` and ``end_time``, keeping only
    ``event_type = '10301'``, ``dur >= 1`` and non-empty ``uid`` / ``res_id``.
    It groups them by user, resource and calendar day, summing ``dur`` into
    ``learn_dur`` and taking the earliest ``start_time`` and latest
    ``end_time``.

    Each group is then joined to ``user_info_view`` and ``course_res_view``;
    groups without a matching user or resource are dropped. ``play_rate`` is
    ``learn_dur / dur * 100`` rounded to two decimals. Rows are ordered by
    ``user_id`` and ``start_time``.

    :param start_time: lower bound of the window, bound to the first ``%s``.
    :param end_time: upper bound of the window, bound to the second ``%s``.
    :return: whatever ``fetch_all`` returns for the statement (presumably a
        sequence of dict-like rows -- confirm against ``common.mysql_uitl``).
    """
    logging.info('query_dwd_user_learn_course start')
    # NOTE: the literal is sent to the database as-is, so the '#' lines inside
    # it are SQL comments. The doubled '%%' in date_format is presumably an
    # escape for the driver's %-style parameter substitution -- confirm.
    statement = '''
select p.data_dt
,p.user_id
,t.real_name
,t.user_name
,t.nickname
,t.team_id
,t.level_grade
,q.package_id as course_id
,q.main_title as course_name
,q.course_online_time
,p.res_id
,q.title as res_name
,q.res_online_time
,p.learn_dur
,q.dur as res_dur
,round(p.learn_dur / q.dur * 100, 2) as play_rate
# ,round(q.res_dur, 2) as res_dur
# ,round(p.learn_dur / q.res_dur, 2) as play_rate
,p.start_time
,p.end_time
from
(
select date_format(server_time,'%%Y-%%m-%%d') as data_dt
,uid as user_id
,res_id
# ,round(sum(dur) / 60) as learn_dur
,sum(dur) as learn_dur
,min(start_time) as start_time
,max(end_time) as end_time
from tamp_analysis.access_log
where server_time between %s and %s
and event_type = '10301'
and dur >= 1
and uid <> ''
and uid is not null
and res_id <> ''
and res_id is not null
group by uid,res_id,date_format(server_time,'%%Y-%%m-%%d')
) p
left join tamp_analysis.user_info_view t
on p.user_id = t.user_id
left join tamp_analysis.course_res_view q
on p.res_id = q.id
where t.user_id is not null
and q.id is not null
order by p.user_id,p.start_time
'''
    window = (start_time, end_time)
    detail_rows = fetch_all(statement, window)
    logging.info('query_dwd_user_learn_course success')
    return detail_rows
def save_dwd_user_learn_course(ret):
    """Bulk-write learning detail rows into ``tamp_data_dwd.dwd_user_learn_course``.

    The rows are written with a single ``REPLACE INTO`` statement whose column
    list is taken from the keys of the first row. The outcome is reported
    through logging only.

    :param ret: rows to store. Each row must provide ``keys()`` and
        ``values()`` (dict-like), and all rows are expected to share the same
        keys in the same order. ``None`` or an empty collection is a no-op.
    :return: None
    """
    rows = list(ret) if ret else []
    if not rows:
        return

    logging.info('save_dwd_user_learn_course start')
    row = len(rows)

    # The column list and placeholders are the same for every row, so build
    # them once instead of on every loop iteration.
    columns = list(rows[0].keys())
    fields = ','.join(f"`{k}`" for k in columns)
    place_holder = ','.join(["%s"] * len(columns))
    values = [tuple(r.values()) for r in rows]

    sql = f'''replace into tamp_data_dwd.dwd_user_learn_course ( {fields} ) values ( {place_holder} )'''
    rs = insert_batch(sql, values)

    # REPLACE reports deleted + inserted rows: 1 for a brand-new row, 2 or
    # more for a row that displaced existing ones. A batch mixing new and
    # replaced rows therefore yields a count strictly between `row` and
    # `2 * row`, so success means "at least one affected row per input row".
    # Assumes insert_batch returns the affected-row count -- confirm against
    # common.mysql_uitl. A non-integer result (e.g. None) counts as failure.
    if isinstance(rs, int) and rs >= row:
        logging.info(f'save_dwd_user_learn_course success {row}')
    else:
        logging.error(f'save_dwd_user_learn_course error 数据为:{row}行,插入成功为:{rs} 行 执行程序为:{file_name}')
if __name__ == '__main__':
    import datetime

    # Run the job once for every day of a fixed date range, both ends included.
    first_day = datetime.date(2021, 4, 1)
    last_day = datetime.date(2021, 9, 14)
    day_count = (last_day - first_day).days + 1
    for offset in range(day_count):
        current_day = first_day + datetime.timedelta(days=offset)
        print(current_day.strftime("%Y-%m-%d"))
        dwd_user_learn_course(current_day)