# dwd_user_browse_fund.py 3.81 KB
# -*- coding: utf-8 -*-


'''
User fund-detail browsing: scheduled job that runs once every 2 hours.
'''
import logging
import sys
from common.mysql_uitl import fetch_all, insert_batch


# Configure the root logger once at import time:
# "<timestamp> <logger name>:<level>:<message>", INFO and above.
logging.basicConfig(
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
)
# Path the interpreter was started with; used in error logs to identify this job.
file_name = sys.argv[0]


def dwd_user_browse_fund(data_dt):
    """
    Query and store the fund-browse records for a single day.

    Args:
        data_dt: The day to process. It is converted with ``str()`` and
            suffixed with a time of day to form the query window, so a
            ``datetime.date`` (what the ``__main__`` block passes) or an
            equivalent ``YYYY-MM-DD`` string is expected.

    Returns:
        None. The queried rows are handed to ``save_dwd_user_browse_fund``.
    """
    day = str(data_dt)
    # The window covers the whole day, from the first to the last second.
    day_start, day_end = f'{day} 00:00:00', f'{day} 23:59:59'
    save_dwd_user_browse_fund(query_user_browse_fund(day_start, day_end))



def query_user_browse_fund(start_time, end_time):
    """
    Fetch per-day, per-user, per-fund browse statistics from the access log.

    Only page-view events (``event_type = '1002'``) whose target page is one
    of the fund-detail pages below are counted:

        p2060   product detail (including whitelist and private funds) (in a column)
        p2107   product detail (public funds)
        p2108   product detail (added by a financial advisor)
        p6020   import fund (the page for adding an imported fund)
        p2109   detail page for products without a net value

    Records need ``dur >= 1`` and a non-empty ``uid`` and ``res_id``. Each
    record contributes at most 180 to ``browse_dur``. Aggregated rows that
    have no match in ``user_info_view`` or ``fund_info_view`` are dropped.

    Args:
        start_time: Lower bound (inclusive) of ``server_time``.
        end_time: Upper bound (inclusive) of ``server_time``.

    Returns:
        Whatever ``fetch_all`` returns for the query, ordered by user and
        first browse time. Presumably a sequence of dict rows, since
        ``save_dwd_user_browse_fund`` calls ``.keys()``/``.values()`` on each
        row -- confirm in ``common.mysql_uitl``.
    """
    logging.info('query_user_browse_fund start')
    window = (start_time, end_time)
    # NOTE(review): '%%' is doubled in date_format, presumably because the
    # driver %-formats the statement when parameters are supplied -- confirm.
    statement = '''
        select	p.data_dt
                ,p.user_id
                ,t.real_name 
                ,t.user_name
                ,t.nickname
                ,t.team_id
                ,t.level_grade
                ,p.res_id
                ,q.fund_type as res_type
                ,q.fund_name as res_name
                ,q.fund_short_name as res_short_name
                ,p.browse_dur
                ,p.browse_num
                ,p.start_time
                ,p.end_time
        from    
        (
            select  date_format(server_time,'%%Y-%%m-%%d') as data_dt
                    ,uid as user_id
                    ,res_id
                    ,sum(if(dur >= 180, 180, dur)) as browse_dur
                    ,count(1) as browse_num
                    ,min(start_time) as start_time
                    ,max(end_time) as end_time
            from    tamp_analysis.access_log
            where   server_time between %s and %s
            and     event_type = '1002'
            and     `to` in ('p2060', 'p2107' ,'p2108', 'p6020', 'p2109')
            and     dur >= 1
            and     uid <> ''
            and     uid is not null
            and     res_id <> ''
            and     res_id is not null
            group   by date_format(server_time,'%%Y-%%m-%%d'), uid, res_id
        ) p 
        left    join  tamp_analysis.user_info_view t
        on      p.user_id = t.user_id
        left    join    tamp_analysis.fund_info_view q
        on      p.res_id = q.fund_id
        where   t.user_id is not null 
        and     q.fund_id is not null 
        order   by p.user_id,p.start_time 
        '''
    rows = fetch_all(statement, window)
    logging.info('query_user_browse_fund success')
    return rows


def save_dwd_user_browse_fund(ret):
    """
    Bulk-write browse rows into ``tamp_data_dwd.dwd_user_browse_fund``.

    The rows are written with a single ``REPLACE INTO`` statement whose column
    list is taken from the keys of the first row.

    Args:
        ret: Rows to persist. Each row must be a mapping of column name to
            value, and all rows are expected to share the same keys in the
            same order (as rows produced by one query do). Empty or falsy
            input is a silent no-op.

    Returns:
        None. The outcome is only reported through the log.
    """
    if not ret:
        return
    rows = list(ret)
    if not rows:
        return

    logging.info('save_dwd_user_browse_fund start')
    row = len(rows)

    # Column list and placeholders are identical for every row: build them once.
    columns = list(rows[0].keys())
    fields = ','.join(f"`{k}`" for k in columns)
    place_holder = ','.join(["%s"] * len(columns))
    values = [tuple(r.values()) for r in rows]

    sql = f'''replace into tamp_data_dwd.dwd_user_browse_fund ( {fields} ) values ( {place_holder} )'''
    rs = insert_batch(sql, values)

    # REPLACE reports 1 for a freshly inserted row and 2 (delete + insert) for
    # a row that overwrote an existing one, so a batch that mixes new and
    # existing keys lands anywhere between row and 2 * row.
    # NOTE(review): assumes insert_batch returns the affected-row count and
    # that a new row replaces at most one old row -- confirm against
    # common.mysql_uitl and the table's unique indexes.
    if isinstance(rs, int) and row <= rs <= 2 * row:
        logging.info(f'save_dwd_user_browse_fund success {row}')
    else:
        logging.error(f'save_dwd_user_browse_fund error 数据为:{row}行,插入成功为:{rs} 行 执行程序为:{file_name}')


if __name__ == '__main__':
    # Manual backfill: process every calendar day of a fixed, inclusive date
    # range, one day per call, printing each day before it is processed.
    import datetime

    first_day = datetime.date(2021, 4, 1)
    last_day = datetime.date(2021, 9, 14)
    total_days = (last_day - first_day).days + 1
    for offset in range(total_days):
        current_day = first_day + datetime.timedelta(days=offset)
        print(current_day.strftime("%Y-%m-%d"))
        dwd_user_browse_fund(current_day)