# ads_user_basic_behavior_bak.py
# -*- coding: utf-8 -*-


'''
User login behavior summary. Scheduled job; runs once every 2 hours.
'''

from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch

import logging
import sys

from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch


# Configure the root logger once at import time: INFO level, with a
# "timestamp logger-name:LEVEL:message" line layout.
logging.basicConfig(
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
)

# Path this script was invoked with; the save functions include it in their
# error logs so a failing run can be traced back to the program that ran.
file_name = sys.argv[0]


def ads_user_basic_behavior():
    """Run the two-stage refresh of the user basic-behavior tables.

    Stage 1 queries the per-user aggregates and stores them in the ``_tmp``
    table. Stage 2 reads that ``_tmp`` table back (joined with user info) and
    stores the result in the final table, so stage 1 must complete first.
    """
    # Stage 1: aggregate rows -> ads_user_basic_behavior_tmp.
    save_ads_user_basic_behavior_tmp(query_ads_user_basic_behavior_tmp())

    # Stage 2: tmp rows enriched with user info -> ads_user_basic_behavior.
    save_ads_user_basic_behavior(query_ads_user_basic_behavior())


# NOTE (original author): there are some details here that have not been
# handled strictly.
def query_ads_user_basic_behavior_tmp():
    """Fetch the per-user rows destined for ``ads_user_basic_behavior_tmp``.

    The query starts from ``dws_user_login_environment`` and left-joins the
    login-area, learning-duration and phone-mode tables on ``user_id``.
    Columns taken from the left-joined tables are coalesced to ``''`` (text)
    or ``0`` (durations) so users without a match still yield a full row.

    Returns:
        The result of ``fetch_all(sql, None)``; presumably a sequence of
        dict-like rows keyed by column name (the save functions call
        ``.keys()`` on each row) -- confirm against ``common.mysql_uitl``.
    """
    me = sys._getframe().f_code.co_name
    query = '''
            select  env.user_id
                    ,env.top_env_name
                    ,env.recent_env_name
                    ,env.recent_time
                    ,coalesce(area.login_area, '') as login_area
                    ,coalesce(area.top_area, '') as top_area
                    ,coalesce(dur.live_total_dur, 0) as live_total_dur
                    ,coalesce(dur.course_total_dur, 0) as course_total_dur
                    ,coalesce(tel.phone_mode, '') as phone_mode
                    ,coalesce(tel.phone_name, '') as phone_name 
                    ,coalesce(tel.phone_brand, '') as phone_brand
            from    tamp_data_dws.dws_user_login_environment env
            left    join tamp_data_dws.dws_user_login_area area
            on      env.user_id = area.user_id 
            left    join  tamp_data_dws.dws_user_learn_total_dur dur
            on      env.user_id = dur.user_id 
            left    join   tamp_data_dws.dws_user_login_phone_mode tel
            on      env.user_id = tel.user_id 
        '''
    logging.info(f'{me} start')
    rows = fetch_all(query, None)
    logging.info(f'{me} success')
    return rows


def query_ads_user_basic_behavior():
    """Fetch the final per-user rows destined for ``ads_user_basic_behavior``.

    Inner-joins ``tamp_analysis.user_info_view`` with
    ``tamp_data_ads.ads_user_basic_behavior_tmp`` on ``user_id``, so only
    users present in both are returned. The tmp table must already have been
    refreshed for the result to be current.

    Returns:
        The result of ``fetch_all(sql, None)``; presumably a sequence of
        dict-like rows keyed by column name -- confirm against
        ``common.mysql_uitl``.
    """
    function_name = sys._getframe().f_code.co_name
    # Log the start as well, matching the other query/save functions in this
    # file, so a slow or hanging query can be located in the job log.
    logging.info(f'{function_name} start')
    sql = '''
        select   p.user_id
                ,p.real_name 
                ,p.user_name
                ,p.nickname
                ,p.team_id
                ,p.level_grade
                ,t.top_env_name
                ,t.recent_env_name
                ,t.recent_time
                ,t.login_area
                ,t.top_area
                ,t.live_total_dur
                ,t.course_total_dur
                ,t.phone_mode
                ,t.phone_name
                ,t.phone_brand
        from     tamp_analysis.user_info_view p
        inner   join tamp_data_ads.ads_user_basic_behavior_tmp t 
        on      p.user_id = t.user_id 
    '''
    user_behavior_ret_dict = fetch_all(sql, None)
    logging.info(f'{function_name} success')
    return user_behavior_ret_dict


def _replace_into_table(table, ret, caller_name):
    """Bulk ``REPLACE`` the rows in ``ret`` into ``table`` and log the outcome.

    Args:
        table: Fully qualified target table name (trusted, hard-coded by the
            callers; it is interpolated into the SQL text).
        ret: Iterable of dict-like rows that all share the same keys; the
            keys are used as column names. Falsy/empty input is a no-op.
        caller_name: Name of the public function, used as the log prefix.
    """
    records = list(ret) if ret else []
    if not records:
        return

    logging.info(f'{caller_name} start')

    # The column list is the same for every row, so derive it once (from the
    # first row) and read each row's values in that exact order so columns
    # and values can never drift apart.
    columns = list(records[0].keys())
    fields = ','.join(f"`{col}`" for col in columns)
    place_holder = ','.join(['%s'] * len(columns))
    values = [tuple(record[col] for col in columns) for record in records]
    row = len(values)

    sql = f'''replace into {table} ( {fields} ) values ( {place_holder} )'''
    rs = insert_batch(sql, values)

    # REPLACE counts 1 for a freshly inserted row and 2 (delete + insert) for
    # a row that replaced an existing one, so a batch mixing both yields any
    # total from `row` to `2 * row`; checking only the two endpoints reported
    # false errors. The isinstance guard keeps a non-numeric result (e.g.
    # None) on the error path instead of raising.
    # NOTE(review): assumes insert_batch returns the affected-row count --
    # confirm against common.mysql_uitl.
    if isinstance(rs, int) and row <= rs <= 2 * row:
        logging.info(f'{caller_name} success {row}')
    else:
        logging.error(f'{caller_name} error 数据为:{row}行,插入成功为:{rs} 行 执行程序为:{file_name}')


def save_ads_user_basic_behavior_tmp(ret):
    """Write ``ret`` into ``tamp_data_ads.ads_user_basic_behavior_tmp``.

    Args:
        ret: Rows as returned by ``query_ads_user_basic_behavior_tmp``;
            nothing is written or logged when it is empty or ``None``.
    """
    _replace_into_table(
        'tamp_data_ads.ads_user_basic_behavior_tmp',
        ret,
        sys._getframe().f_code.co_name,
    )


def save_ads_user_basic_behavior(ret):
    """Write ``ret`` into ``tamp_data_ads.ads_user_basic_behavior``.

    Args:
        ret: Rows as returned by ``query_ads_user_basic_behavior``; nothing
            is written or logged when it is empty or ``None``.
    """
    _replace_into_table(
        'tamp_data_ads.ads_user_basic_behavior',
        ret,
        sys._getframe().f_code.co_name,
    )


if __name__ == '__main__':
    # Script entry point: perform one full refresh of the tmp and final
    # user basic-behavior tables.
    ads_user_basic_behavior()