1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# -*- coding: utf-8 -*-
'''
User fund-detail browsing; scheduled task that runs once every 2 hours.
'''
import logging
import sys
from common.mysql_uitl import fetch_all, insert_batch
# Configure the root logger at import time: timestamped records, INFO level and above.
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
# Path the script was invoked with; it is embedded in the error log written when a save fails.
file_name = sys.argv[0]
def dwd_user_browse_fund(data_dt):
    '''
    Build and store the fund-browsing records of a single day.

    The day is expanded to the window "<data_dt> 00:00:00" .. "<data_dt> 23:59:59",
    the aggregated rows for that window are queried, and the result is handed
    to save_dwd_user_browse_fund.

    :param data_dt: the day to process; str(data_dt) must yield the date part
        of a timestamp (the script entry point passes datetime.date objects).
    '''
    day = str(data_dt)
    window = (day + ' 00:00:00', day + ' 23:59:59')
    save_dwd_user_browse_fund(query_user_browse_fund(*window))
def query_user_browse_fund(start_time, end_time):
    '''
    Query per-day, per-user, per-fund browsing aggregates from the access log.

    Only access_log rows with event_type '1002' whose `to` page code is one of
    the fund pages below are counted:

        p2060  product detail (including whitelist and private funds) (in columns)
        p2107  product detail (public funds)
        p2108  product detail (added by a financial advisor)
        p6020  import fund (the page for adding an imported fund)
        p2109  detail page of a product without net value

    Each event contributes at most 180 to browse_dur (unit not visible here,
    presumably seconds). Rows whose user or fund has no match in
    user_info_view / fund_info_view are dropped, and the result is ordered by
    user_id and start_time.

    :param start_time: lower bound of server_time (inclusive, SQL BETWEEN)
    :param end_time: upper bound of server_time (inclusive, SQL BETWEEN)
    :return: whatever fetch_all returns for the query; the caller treats it as
        an iterable of column-name -> value mappings
    '''
    # The statement is passed through a %-style parameterised driver call,
    # hence the doubled %% inside date_format.
    statement = '''
select p.data_dt
,p.user_id
,t.real_name
,t.user_name
,t.nickname
,t.team_id
,t.level_grade
,p.res_id
,q.fund_type as res_type
,q.fund_name as res_name
,q.fund_short_name as res_short_name
,p.browse_dur
,p.browse_num
,p.start_time
,p.end_time
from
(
select date_format(server_time,'%%Y-%%m-%%d') as data_dt
,uid as user_id
,res_id
,sum(if(dur >= 180, 180, dur)) as browse_dur
,count(1) as browse_num
,min(start_time) as start_time
,max(end_time) as end_time
from tamp_analysis.access_log
where server_time between %s and %s
and event_type = '1002'
and `to` in ('p2060', 'p2107' ,'p2108', 'p6020', 'p2109')
and dur >= 1
and uid <> ''
and uid is not null
and res_id <> ''
and res_id is not null
group by date_format(server_time,'%%Y-%%m-%%d'), uid, res_id
) p
left join tamp_analysis.user_info_view t
on p.user_id = t.user_id
left join tamp_analysis.fund_info_view q
on p.res_id = q.fund_id
where t.user_id is not null
and q.fund_id is not null
order by p.user_id,p.start_time
'''
    time_window = (start_time, end_time)

    logging.info('query_user_browse_fund start')
    rows = fetch_all(statement, time_window)
    logging.info('query_user_browse_fund success')
    return rows
def save_dwd_user_browse_fund(ret):
    '''
    Write the queried browsing rows into tamp_data_dwd.dwd_user_browse_fund.

    The rows are written with one batched REPLACE statement whose column list
    is taken from the keys of the first row. Nothing is done (and nothing is
    logged) when there are no rows.

    :param ret: iterable of column-name -> value mappings that all share the
        same keys (as produced by query_user_browse_fund); may be empty or None
    :return: None; the outcome is reported through logging only
    '''
    records = list(ret) if ret else []
    if not records:
        return

    logging.info('save_dwd_user_browse_fund start')

    # The column list is identical for every row, so build it once from the
    # first row instead of rebuilding it on every iteration.
    columns = list(records[0].keys())
    fields = ','.join([f"`{name}`" for name in columns])
    place_holder = ','.join(['%s'] * len(columns))
    # Pick values by column name so each tuple lines up with `fields`.
    values = [tuple(record[name] for name in columns) for record in records]
    row = len(values)

    sql = f'''replace into tamp_data_dwd.dwd_user_browse_fund ( {fields} ) values ( {place_holder} )'''
    rs = insert_batch(sql, values)

    # REPLACE counts 1 for a newly inserted row and 2 (delete + insert) for a
    # row that replaced an existing one, so a batch mixing both kinds reports
    # anything from `row` to `2 * row`. The previous check accepted only the
    # two extremes and logged a false error for mixed batches.
    try:
        saved = row <= rs <= 2 * row
    except TypeError:
        # insert_batch returned something that is not a count (e.g. None).
        saved = False

    if saved:
        logging.info(f'save_dwd_user_browse_fund success {row}')
    else:
        logging.error(f'save_dwd_user_browse_fund error 数据为:{row}行,插入成功为:{rs} 行 执行程序为:{file_name}')
if __name__ == '__main__':
    import datetime

    # Backfill driver: process every day of a fixed window, both ends included,
    # printing each date before it is processed.
    first_day = datetime.date(2021, 4, 1)
    last_day = datetime.date(2021, 9, 14)
    for offset in range((last_day - first_day).days + 1):
        current_day = first_day + datetime.timedelta(days=offset)
        print(current_day.strftime("%Y-%m-%d"))
        dwd_user_browse_fund(current_day)