dwd_user_share_event.py 6.27 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
# -*- coding: utf-8 -*-


'''
用户分享事件,定时任务,每2个小时运行一次
10001	分享调仓建议 	基金行家
10002	分享持仓报告 	基金行家
10004	分享附件	产品
10005	分享专栏	工作室
10006	分享调仓报告	基金行家
10007	分享栏目	栏目
10009	分享持仓(客户详情页)	基金行家
10010	新版分享单节课程	课程
10011	新版分享课程包	课程
10012	分享无净值产品附件	产品
10014	分享拼团课程列表	课程
10015	拼单-分享邀请好友(生成海报)	课程
10016	分享拼单详情页	课程
10017	分享邀请好友领优惠券	优惠券
1006	我要转播	直播
1008	分享直播	直播
2027	分享文章/早报	工作室
2055	分享海报	工作室
2100	分享工作室	工作室
3009	分享课程	课程
3021	分享短视频	短视频
4040	分享产品	产品
4043	分享产品首页	产品
'''
import logging
侯双强's avatar
侯双强 committed
31
import os
32
import sys
侯双强's avatar
侯双强 committed
33 34
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
35 36 37

logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
# Script path as invoked; passed to save_result and used in
# save_dwd_user_share_event's error message.
file_name = sys.argv[0]
# This module's file name up to its first '.', passed to save_etl_log as the
# task name.
task_file = os.path.split(__file__)[-1].split(".")[0]


def dwd_user_share_event(data_dt):
    """Rebuild the ``dwd_user_share_event`` rows for one calendar day.

    Loads the active share event types, extracts the matching share events
    from the access log for ``data_dt``, writes them to
    ``tamp_data_dwd.dwd_user_share_event`` via ``save_result`` and records the
    run with ``save_etl_log``.

    :param data_dt: the day to process; anything whose ``str()`` is
        ``YYYY-MM-DD`` (e.g. ``datetime.date``).
    """
    day = str(data_dt)
    # Inclusive bounds for the query's ``server_time between %s and %s`` filter.
    # NOTE(review): if server_time carries fractional seconds, events in
    # (23:59:59, 24:00:00) fall outside the window — confirm column precision.
    start_time = day + ' 00:00:00'
    end_time = day + ' 23:59:59'
    share_dict = query_share_event()
    user_share_event_dict = query_dwd_user_share_event(share_dict, start_time, end_time)
    # presumably save_result returns the number of rows written; it is logged as such.
    row = save_result('tamp_data_dwd', 'dwd_user_share_event', user_share_event_dict, file_name)
    now_time = now_str(YMDHMS_FORMAT)
    save_etl_log('tamp_data_dwd', 'dwd_user_share_event', data_dt, row, 'done', task_file, now_time)
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156


def query_share_event():
    """Load the share event types that are not soft-deleted.

    Selects ``event_type`` from ``tamp_analysis.share_event_map`` where
    ``delete_tag = 0``. Returns whatever ``fetch_all`` yields; callers index
    each row as ``item['event_type']``, so presumably a sequence of dict rows.
    """
    current = sys._getframe().f_code.co_name
    logging.info(f'{current} start')
    rows = fetch_all(
        'select event_type from tamp_analysis.share_event_map where delete_tag = 0 ',
        None,
    )
    logging.info(f'{current} success')
    return rows


def query_dwd_user_share_event(share_dict, start_time, end_time):
    """Fetch de-duplicated share events, joined with user info, for a time window.

    Events come from ``tamp_analysis.access_log``. Only rows with a non-empty
    ``uid`` and ``res_id`` are kept, and only when the user exists in
    ``tamp_analysis.user_info_view``. The first event (by ``local_time``) per
    (uid, event_type, res_id, ubr_uuid, pid_uuid) is kept. Rows with no
    resolvable source user / source uuid are dropped.

    :param share_dict: rows from ``query_share_event``; each must have an
        ``event_type`` key. Only events of these types are selected.
    :param start_time: inclusive lower bound for ``access_log.server_time``.
    :param end_time: inclusive upper bound for ``access_log.server_time``.
    :return: the rows from ``fetch_all``, ordered by user_id then local_time;
        an empty list when ``share_dict`` is empty or ``None``.
    """
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    event_types = [item['event_type'] for item in (share_dict or ())]
    if not event_types:
        # ``event_type in ()`` is invalid SQL, and with no configured share
        # event types there is nothing to select.
        logging.info(f'{function_name} success (no share event types)')
        return []
    # Bind each event type as a parameter instead of splicing it into the SQL
    # text: avoids injection and breakage on values containing quotes or '%'.
    event_placeholders = ','.join(['%s'] * len(event_types))
    # NOTE(review): the original partition list was
    # "uid,event_type,res_id,ubr_uuid,pid_uuid,ubr_uuid,ubr_uuid"; the repeated
    # ubr_uuid entries had no effect and were dropped. They may have been meant
    # to be ubr_source_ref_id / pid_user_id — confirm.
    # The inner join is equivalent to the former
    # "left join ... where t.user_id is not null".
    sql = f'''
        select  p.data_dt
                ,p.user_id
                ,t.real_name 
                ,t.user_name
                ,t.nickname
                ,t.team_id
                ,t.level_grade
                ,p.env
                ,p.code_env
                ,p.res_id
                ,p.event_type
                ,p.extra_id
                ,p.share_mode
                ,p.share_type
                ,p.pid_uuid
                ,p.pid_user_id 
                ,p.ubr_uuid  
                ,p.ubr_source_ref_id 
                ,p.source_user_id
                ,p.source_uuid_id
                ,p.local_time
        from 
        (
            select  date_format(server_time,'%%Y-%%m-%%d') as data_dt
                    ,coalesce(env, '') as env
                    ,coalesce(code_env, '') as code_env
                    ,uid as user_id
                    ,res_id
                    ,event_type
                    ,coalesce(extra_id, '') as extra_id
                    ,coalesce(share_mode, '') as share_mode
                    ,coalesce(type, '') as share_type
                    ,coalesce(pid_uuid, '') as pid_uuid
                    ,coalesce(pid_user_id, '') as pid_user_id
                    ,coalesce(ubr_uuid, '') as ubr_uuid
                    ,coalesce(ubr_source_ref_id, '') as ubr_source_ref_id
                    ,case when coalesce(ubr_source_ref_id, '') <> '' then ubr_source_ref_id
                          when coalesce(ubr_source_ref_id, '') = '' and coalesce(pid_user_id, '') <> '' then pid_user_id
                          else  '' 
                    end   as source_user_id
                    ,case when coalesce(ubr_uuid, '') <> '' then ubr_uuid
                          when coalesce(ubr_uuid, '') = '' and coalesce(pid_uuid, '') <> '' then pid_uuid
                          else  '' 
                    end   as source_uuid_id
                    ,local_time
                    ,row_number() over(partition by uid, event_type, res_id, ubr_uuid, pid_uuid
                                       order by local_time) as row_rank
            from    tamp_analysis.access_log
            where   server_time between %s and %s
            and     event_type in ({event_placeholders})
            and     uid <> ''
            and     uid is not null
            and     res_id <> ''
            and     res_id is not null
        ) p
        join    tamp_analysis.user_info_view t
        on      p.user_id = t.user_id
        where   p.row_rank = 1
        and     p.source_uuid_id <> ''
        and     p.source_user_id <> ''
        order   by p.user_id, p.local_time 
    '''
    # Placeholder order in the SQL text: the two time bounds, then the event types.
    user_share_event_dict = fetch_all(sql, (start_time, end_time, *event_types))
    logging.info(f'{function_name} success')
    return user_share_event_dict


def save_dwd_user_share_event(ret):
    """Upsert rows into ``tamp_data_dwd.dwd_user_share_event`` with ``REPLACE INTO``.

    Column names are taken from the first row; every row must contain those
    keys. Falsy or empty input is a no-op. Success or a count mismatch is
    logged; nothing is returned.

    :param ret: iterable of dict rows (e.g. the result of
        ``query_dwd_user_share_event``).
    """
    rows = list(ret) if ret else []
    if not rows:
        return
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    columns = list(rows[0].keys())
    fields = ','.join(f"`{k}`" for k in columns)
    place_holder = ','.join(['%s'] * len(columns))
    # Extract values by column name so rows whose dicts have a different key
    # order still line up with the column list.
    values = [tuple(r[k] for k in columns) for r in rows]
    sql = f'''replace into tamp_data_dwd.dwd_user_share_event ( {fields} ) values ( {place_holder} )'''
    row = len(rows)
    rs = insert_batch(sql, values)
    print('row=' + str(row) + ' rs=' + str(rs))
    # REPLACE counts 1 affected row for a fresh insert and 2 for a replaced
    # row (delete + insert), so a mixed batch lands anywhere in [row, 2*row].
    if rs is not None and row <= rs <= 2 * row:
        logging.info(f'{function_name} success {row}')
    else:
        logging.error(f'{function_name} error 数据为:{row}行,插入成功为:{rs} 行 执行程序为:{file_name}')


if __name__ == '__main__':
    import datetime

    # Optional CLI: ``python <script> [BEGIN [END]]`` with YYYY-MM-DD dates,
    # both inclusive; END defaults to BEGIN. Without arguments the original
    # hard-coded back-fill window is used.
    cli_args = sys.argv[1:]
    if cli_args:
        begin = datetime.datetime.strptime(cli_args[0], '%Y-%m-%d').date()
        end = datetime.datetime.strptime(cli_args[1], '%Y-%m-%d').date() if len(cli_args) > 1 else begin
    else:
        begin = datetime.date(2021, 9, 15)
        end = datetime.date(2021, 9, 22)
    data_dt = begin
    delta = datetime.timedelta(days=1)
    # Process one day at a time, from begin through end inclusive.
    while data_dt <= end:
        print(data_dt.strftime("%Y-%m-%d"))
        dwd_user_share_event(data_dt)
        data_dt += delta