Commit 86295666 authored by 侯双强

新上线CRM用户行为数据

parent baa4de36
# -*- coding: utf-8 -*-
import logging
import sys
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
'''
用户观看短视频明细数据统计,定时任务,每2个小时运行一次
先上线,如果要用访问线索,需要调整这里的逻辑,包括(dws_user_share_event, dws_user_visitor_clues)
一张明细表和一张汇总表
'''
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
def ads_user_watch_short_video(data_dt):
    """Build and persist the ADS-layer short-video watch tables.

    Runs the detail query for ``data_dt`` and the summary query (which has
    no date filter), then passes each result set to ``save_result`` for the
    matching table in ``tamp_data_ads``.

    :param data_dt: partition date forwarded to the detail query.
    """
    details_rows = query_user_watch_short_video_details(data_dt)
    summary_rows = query_user_watch_short_video_summary()

    # Both queries run before anything is saved, details are saved first.
    for table_name, rows in (
        ('ads_user_watch_short_video_details', details_rows),
        ('ads_user_watch_short_video_summary', summary_rows),
    ):
        save_result('tamp_data_ads', table_name, rows, file_name)
def query_user_watch_short_video_details(data_dt):
    """Fetch the per-day short-video watch details for ``data_dt``.

    Reads ``tamp_data_dws.dws_user_watch_short_video`` for the given date and
    left-joins two things onto it: the number of distinct users recorded in
    ``tamp_data_dwd.dwd_user_visit_clues`` for page ``p1037`` (exposed as
    ``invite_num``), and the profile columns from
    ``tamp_analysis.user_info_view``. Missing share/invite counts become 0.

    :param data_dt: date bound to both ``%s`` placeholders of the query.
    :return: whatever ``fetch_all`` returns for the query.
    """
    this_func = sys._getframe().f_code.co_name
    logging.info(f'{this_func} start')
    details_sql = '''
select p.data_dt
,p.user_id
,q.real_name
,q.user_name
,q.nickname
,q.team_id
,p.res_id
,p.res_name
,p.res_dur
,p.learn_dur
,p.play_rate
,p.online_time
,p.start_time
,p.end_time
,coalesce(p.share_num, 0) as share_num
,coalesce(t.invite_num, 0) as invite_num
from tamp_data_dws.dws_user_watch_short_video p
left join
(
select data_dt
,source_user_id
,res_id
,count(distinct user_id) as invite_num
from tamp_data_dwd.dwd_user_visit_clues
where data_dt = %s
and current_page = 'p1037'
group by data_dt,source_user_id,res_id
) t
on p.user_id = t.source_user_id
and p.res_id = t.res_id
left join tamp_analysis.user_info_view q
on p.user_id = q.user_id
where p.data_dt = %s
'''
    # The date is bound twice: once for the invite sub-query, once for the
    # outer filter on the watch table.
    query_args = (data_dt, data_dt)
    rows = fetch_all(details_sql, query_args)
    logging.info(f'{this_func} success')
    return rows
def query_user_watch_short_video_summary():
    """Fetch the all-dates summary of short-video watching per user/video.

    Aggregates ``tamp_data_dws.dws_user_watch_short_video`` by user and
    resource (latest ``data_dt``, summed watch duration and share count,
    play rate capped at 100.00), then left-joins the distinct-visitor count
    for page ``p1037`` as ``invite_num`` and the profile columns from
    ``tamp_analysis.user_info_view``.

    :return: whatever ``fetch_all`` returns for the query.
    """
    this_func = sys._getframe().f_code.co_name
    logging.info(f'{this_func} start')
    summary_sql = '''
select p.data_dt
,p.user_id
,q.real_name
,q.user_name
,q.nickname
,q.team_id
,p.res_id
,p.res_name
,p.res_dur
,p.total_dur
,if(p.play_rate >=100.00, 100.00, p.play_rate) as play_rate
,coalesce(p.share_num, 0) as share_num
,coalesce(t.invite_num, 0) as invite_num
from
(
select user_id
,max(data_dt) as data_dt
,res_id
,res_name
,res_dur
,sum(learn_dur) as total_dur
,round(sum(learn_dur) / res_dur * 100, 2) as play_rate
,sum(share_num) as share_num
from tamp_data_dws.dws_user_watch_short_video
group by user_id,res_id,res_name,res_dur
) p
left join
(
select source_user_id
,res_id
,count(distinct user_id) as invite_num
from tamp_data_dwd.dwd_user_visit_clues
where current_page = 'p1037'
group by source_user_id,res_id
) t
on p.user_id = t.source_user_id
and p.res_id = t.res_id
left join tamp_analysis.user_info_view q
on p.user_id = q.user_id
order by p.user_id, p.data_dt desc, p.res_id
'''
    # The query takes no parameters, so no arguments are bound.
    rows = fetch_all(summary_sql, None)
    logging.info(f'{this_func} success')
    return rows
if __name__ == '__main__':
    import datetime

    # Inclusive date window; the job is invoked once per calendar day in it.
    begin = datetime.date(2021, 9, 14)
    end = datetime.date(2021, 9, 14)
    for day_offset in range((end - begin).days + 1):
        data_dt = begin + datetime.timedelta(days=day_offset)
        print(data_dt.strftime("%Y-%m-%d"))
        ads_user_watch_short_video(data_dt)
# -*- coding: utf-8 -*-
'''
用户观看短视频明细统计,定时任务,每2个小时运行一次
'''
import logging
import os
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch, save_result
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
# file_path = get_file_path()
file_name = sys.argv[0]
def dwd_user_watch_short_video(data_dt):
    """Build and persist the DWD-layer short-video watch rows for one day.

    Expands ``data_dt`` into a ``00:00:00`` .. ``23:59:59`` time window,
    queries the watch rows for that window and passes them to
    ``save_result`` for ``tamp_data_dwd.dwd_user_watch_short_video``.

    :param data_dt: the day to process; its ``str()`` form is used as the
        date part of the window bounds.
    """
    day_start = f'{data_dt!s} 00:00:00'
    day_end = f'{data_dt!s} 23:59:59'
    rows = query_dwd_user_watch_short_video(day_start, day_end)
    save_result('tamp_data_dwd', 'dwd_user_watch_short_video', rows, file_name)
def query_dwd_user_watch_short_video(start_time, end_time):
    """Fetch per-user, per-video watch totals from the access log.

    Aggregates ``tamp_analysis.access_log`` rows with event type ``3017``,
    ``dur >= 1`` and non-empty ``uid``/``res_id`` whose ``server_time`` lies
    between the two bounds, grouped by day, user and resource. The result is
    joined to ``user_info_view`` and ``short_video_view``; rows without a
    match in either view are filtered out by the outer ``where`` clause.

    :param start_time: lower bound bound to the first ``%s`` placeholder.
    :param end_time: upper bound bound to the second ``%s`` placeholder.
    :return: whatever ``fetch_all`` returns for the query.
    """
    this_func = sys._getframe().f_code.co_name
    logging.info(f'{this_func} start')
    # '%%' is a literal percent sign so date_format survives parameter binding.
    watch_sql = '''
select p.data_dt
,p.user_id
,t.real_name
,t.user_name
,t.nickname
,t.team_id
,t.level_grade
,p.res_id
,q.res_name
,p.learn_dur
,q.dur as res_dur
,round(p.learn_dur / q.dur * 100, 2) as play_rate
,p.start_time
,p.end_time
,q.create_time as online_time
from
(
select date_format(server_time,'%%Y-%%m-%%d') as data_dt
,uid as user_id
,res_id
,sum(dur) as learn_dur
,min(start_time) as start_time
,max(end_time) as end_time
from tamp_analysis.access_log
where server_time between %s and %s
and event_type = '3017'
and dur >= 1
and uid <> ''
and uid is not null
and res_id <> ''
and res_id is not null
group by date_format(server_time,'%%Y-%%m-%%d'), uid, res_id
) p
left join tamp_analysis.user_info_view t
on p.user_id = t.user_id
left join tamp_analysis.short_video_view q
on p.res_id = q.res_id
where t.user_id is not null
and q.res_id is not null
order by p.user_id,p.start_time
'''
    time_window = (start_time, end_time)
    rows = fetch_all(watch_sql, time_window)
    logging.info(f'{this_func} success')
    return rows
if __name__ == '__main__':
    import datetime

    # Inclusive date window; the job is invoked once per calendar day in it.
    begin = datetime.date(2021, 4, 1)
    end = datetime.date(2021, 9, 14)
    for day_offset in range((end - begin).days + 1):
        data_dt = begin + datetime.timedelta(days=day_offset)
        print(data_dt.strftime("%Y-%m-%d"))
        dwd_user_watch_short_video(data_dt)
\ No newline at end of file
# -*- coding: utf-8 -*-
import logging
import sys
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
'''
用户观看短视频明细数据统计,定时任务,每2个小时运行一次
先上线,如果要用访问线索,需要调整这里的逻辑,包括(dws_user_share_event, dws_user_visitor_clues)
'''
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
def dws_user_watch_short_video(data_dt):
    """Build and persist the DWS-layer short-video watch rows for one day.

    Fetches the day's share counts and watch rows, merges them with
    ``merge_short_video_dict`` and passes the merged rows to ``save_result``
    for ``tamp_data_dws.dws_user_watch_short_video``.

    :param data_dt: partition date forwarded to both queries.
    """
    # The share query runs first, then the watch query.
    shared_rows = query_dws_user_share_short_video_num(data_dt)
    watched_rows = query_dws_user_watch_short_video(data_dt)
    save_result(
        'tamp_data_dws',
        'dws_user_watch_short_video',
        merge_short_video_dict(watched_rows, shared_rows),
        file_name,
    )
# Short-video share counts
def query_dws_user_share_short_video_num(data_dt):
    """Fetch how many times each user shared each short video on ``data_dt``.

    Reads ``tamp_data_dwd.dwd_user_share_event`` rows with event type
    ``3021`` for the given date, grouped by sharing user and resource, and
    left-joins ``tamp_analysis.short_video_view`` for the video name,
    duration and creation time. ``learn_dur`` and ``play_rate`` are emitted
    as constant zeros.

    :param data_dt: date bound to the query's single ``%s`` placeholder.
    :return: whatever ``fetch_all`` returns for the query.
    """
    this_func = sys._getframe().f_code.co_name
    logging.info(f'{this_func} start')
    share_sql = '''
select p.data_dt
,p.source_user_id as user_id
,p.res_id
,t.res_name
,t.dur as res_dur
,0 as learn_dur
,0.0 as play_rate
,t.create_time as online_time
,min(p.local_time) as start_time
,max(p.local_time) as end_time -- 为了避免,因为分享多次,就是多条数据
,count(p.res_id) as share_num
from tamp_data_dwd.dwd_user_share_event p
left join tamp_analysis.short_video_view t
on p.res_id = t.res_id
where p.data_dt = %s
and p.event_type = '3021'
group by p.source_user_id, p.res_id
'''
    rows = fetch_all(share_sql, data_dt)
    logging.info(f'{this_func} success')
    return rows
def query_dws_user_watch_short_video(data_dt):
    """Fetch the DWD-layer short-video watch rows for ``data_dt``.

    Selects the watch columns (no user profile columns) from
    ``tamp_data_dwd.dwd_user_watch_short_video`` for the given date.

    :param data_dt: date bound to the query's single ``%s`` placeholder.
    :return: whatever ``fetch_all`` returns for the query.
    """
    this_func = sys._getframe().f_code.co_name
    logging.info(f'{this_func} start')
    # Plain string: the text has no replacement fields to interpolate.
    watch_sql = '''
select data_dt
,user_id
,res_id
,res_name
,res_dur
,learn_dur
,play_rate
,online_time
,start_time
,end_time
from tamp_data_dwd.dwd_user_watch_short_video
where data_dt = %s
'''
    rows = fetch_all(watch_sql, data_dt)
    logging.info(f'{this_func} success')
    return rows
def merge_short_video_dict(watch_short_video_dict, share_short_video_dict):
    """Attach share counts to watch rows and keep share-only rows.

    Rows are matched on the ``(user_id, res_id)`` pair.

    * Every watch row is emitted once with a ``share_num`` key: the matching
      share row's count, or ``0`` when the user did not share that video.
    * Share rows whose ``(user_id, res_id)`` has no watch row are emitted
      unchanged, so "shared but did not watch" is not lost.
    * Exact duplicate rows are emitted only once.

    The previous nested-loop version reassigned ``share_num`` on the same
    watch row for every share row, so the last share row decided the value
    (usually resetting a real match to 0). It also dropped all watch rows
    when there were no shares, and dropped share-only rows whenever any
    watch row existed.

    Input rows are not mutated; merged rows are shallow copies.

    :param watch_short_video_dict: iterable of watch rows (mappings with at
        least ``user_id`` and ``res_id``); ``None`` or empty is allowed.
    :param share_short_video_dict: iterable of share rows (mappings with at
        least ``user_id``, ``res_id`` and ``share_num``); ``None`` or empty
        is allowed.
    :return: list of merged row dicts, watch rows first, then share-only
        rows, each group in input order.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')

    watch_rows = watch_short_video_dict or ()
    share_rows = share_short_video_dict or ()

    # Index share counts once so each watch row needs a single lookup
    # instead of a scan over every share row.
    share_num_by_key = {
        (row['user_id'], row['res_id']): row['share_num'] for row in share_rows
    }

    merged = []
    emitted_by_key = {}

    def _append_unique(key, row):
        # Exact-duplicate check is limited to rows sharing the same key,
        # which keeps de-duplication roughly linear overall.
        bucket = emitted_by_key.setdefault(key, [])
        if row not in bucket:
            bucket.append(row)
            merged.append(row)

    # Users who watched: carry their share count, defaulting to 0.
    for row in watch_rows:
        key = (row['user_id'], row['res_id'])
        _append_unique(key, dict(row, share_num=share_num_by_key.get(key, 0)))

    # Users who shared without watching: keep the share row as-is.
    watched_keys = set(emitted_by_key)
    for row in share_rows:
        key = (row['user_id'], row['res_id'])
        if key not in watched_keys:
            _append_unique(key, dict(row))

    logging.info(f'{function_name} success')
    return merged
if __name__ == '__main__':
    import datetime

    # Inclusive date window; the job is invoked once per calendar day in it.
    begin = datetime.date(2021, 4, 1)
    end = datetime.date(2021, 9, 14)
    for day_offset in range((end - begin).days + 1):
        data_dt = begin + datetime.timedelta(days=day_offset)
        print(data_dt.strftime("%Y-%m-%d"))
        dws_user_watch_short_video(data_dt)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment