ods_users_info.py 6.62 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
# -*- coding: utf-8 -*-
import datetime
import logging

from phone import Phone

from common.file_uitil import get_file_name, get_file_path
from common.mysql_uitl import fetch_all, insert, insert_batch

'''
用户基础信息表,全删全插
'''

# Configure the root logger at import time: timestamped records, INFO level and above.
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s",datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)

# Values supplied by the project's file helpers; in this file they are only
# used to enrich the error log emitted by save_ods_user_info.
file_path = get_file_path()
file_name = get_file_name()


def ods_users_info():
    """Run the job: read all users, then store them in the ODS user table."""
    save_ods_user_info(query_user())


# Telephone prefixes that are skipped by the region lookup (presumably
# internal test/placeholder numbers -- confirm with the data owners).
_REGION_LOOKUP_EXCLUDED_PREFIXES = ('17000000', '18000000', '19999999999')


def _needs_region_lookup(level_grade, telephone):
    """Return True when the phone region should be resolved for this user."""
    if level_grade is None or level_grade < 1:
        return False
    if not telephone:
        # Nothing to look up; also avoids slicing/indexing a None value.
        return False
    # A number is skipped when it matches ANY excluded prefix.  Chaining
    # "!=" comparisons with "or" is always true and would exclude nothing.
    return not telephone.startswith(_REGION_LOOKUP_EXCLUDED_PREFIXES)


# Query all users
def query_user():
    """Read every row of ``tamp_analysis.user_info_view`` as a list of dicts.

    Each dict carries the selected columns plus ``province``, ``city`` and
    ``tel_operator``.  The region fields are resolved through ``get_region``
    only for users with ``level_grade >= 1`` whose telephone is present and
    does not start with an excluded prefix; for everyone else they are empty
    strings.  When a lookup is attempted but yields no data, the fields fall
    back to '未知'.

    The key order of the dicts is significant: ``save_ods_user_info`` derives
    its column list from it, so keep it stable when editing.

    Returns:
        list[dict]: one dict per user; an empty list when the query returns
        no rows.
    """
    logging.info('query_user start')
    sql = '''
        select  user_id
                ,real_name
                ,user_name
                ,nickname
                ,team_id
                ,telephone
                ,company_mp 
                ,level_grade
                ,level_grade_name
                ,coalesce(department, '') as department
                ,is_securities                       -- 是否为券商
                ,sex
                ,channel_id
                ,create_time                          -- 创建时间
                ,register_time                        -- 注册时间
                ,investor_time                        -- 成为投资人的时间
                ,practitioner_time                    -- 成为探普理顾时间
                ,advisor_time                         -- 成为探普专家理顾时间
                ,senior_adviser_time                  -- 成为交易理财师
         from   tamp_analysis.user_info_view 
    '''
    rows = fetch_all(sql, None)
    user_list = []
    for row in rows or ():
        # Positional unpacking follows the column order of the SELECT above.
        # company_mp is stored as company_name, and create_time as
        # tourist_time (the time the user became a visitor).
        (user_id, real_name, user_name, nickname, team_id, telephone,
         company_name, level_grade, level_grade_name, department,
         is_securities, sex, channel_id, tourist_time, register_time,
         investor_time, practitioner_time, advisor_time,
         senior_adviser_time) = row[:19]

        if _needs_region_lookup(level_grade, telephone):
            # "or {}" keeps this safe even if get_region returns None.
            area_dict = get_region(telephone) or {}
            province = area_dict.get('province', '未知')
            city = area_dict.get('city', '未知')
            tel_operator = area_dict.get('tel_operator', '未知')
        else:
            province = city = tel_operator = ''

        user_list.append(dict(
            user_id=user_id,
            real_name=real_name,
            user_name=user_name,
            nickname=nickname,
            team_id=team_id,
            telephone=telephone,
            province=province,
            city=city,
            tel_operator=tel_operator,
            company_name=company_name,
            level_grade=level_grade,
            level_grade_name=level_grade_name,
            department=department,
            is_securities=is_securities,
            sex=sex,
            channel_id=channel_id,
            tourist_time=tourist_time,
            register_time=register_time,
            investor_time=investor_time,
            practitioner_time=practitioner_time,
            advisor_time=advisor_time,
            senior_adviser_time=senior_adviser_time,
        ))
    logging.info('query_user end')
    return user_list


# Look up the region a phone number belongs to
def get_region(telephone):
    """Resolve province, city and operator for a phone number.

    The lookup is best-effort: any failure is logged as a warning and an
    empty dict is returned, so callers can always use ``.get`` on the result.

    Args:
        telephone: the phone number to look up; may be empty or None.

    Returns:
        dict: ``province``, ``city`` and ``tel_operator`` on success;
        an empty dict when ``telephone`` is empty or the lookup fails.
    """
    if not telephone:
        return {}
    try:
        # NOTE(review): a new Phone() is built for every call; if its
        # construction is costly, consider reusing one instance -- confirm
        # against the phone package.
        info = Phone().find(telephone)
        return dict(province=info['province'], city=info['city'], tel_operator=info['phone_type'])
    except Exception as e:
        # Lazy %-formatting also works when telephone is not a str.
        logging.warning('%s telephone is warning: %s', telephone, e)
        return {}


# Save the users' basic information
def save_ods_user_info(user_list):
    """Replace the contents of ``tamp_data_ods.ods_user_info`` with ``user_list``.

    The table is truncated and then refilled with one batch insert.  Column
    names and row values are both derived from the key order of the first
    record, so they cannot drift apart.  All rows are built before the
    truncate, so a malformed record (missing key) fails before any data is
    removed.

    NOTE(review): truncate and insert are two separate statements; if the
    insert fails the table is left empty.  Whether the mysql helpers can run
    them atomically is not visible from here -- confirm.

    Args:
        user_list: list of dicts sharing the same keys (as produced by
            query_user); nothing happens when it is empty or None.
    """
    if not user_list:
        return
    logging.info('save_ods_user_info start')

    # Loop-invariant pieces of the statement, computed once.
    columns = list(user_list[0].keys())
    fields = ','.join(f"`{k}`" for k in columns)
    place_holder = ','.join(['%s'] * len(columns))
    values = [tuple(user[k] for k in columns) for user in user_list]
    row = len(values)

    # Delete the existing data first, then insert the new data.
    del_sql = 'truncate tamp_data_ods.ods_user_info'
    fetch_all(del_sql, None)
    # Insert the user records.
    sql = f'''insert into tamp_data_ods.ods_user_info ( {fields} ) values ( {place_holder} )'''
    ret = insert_batch(sql, values)
    if ret == row:
        logging.info(f'save_ods_user_info success {row}')
    else:
        logging.error(f'save_ods_user_info err 数据为:{row}行,插入成功为:{ret} 行 运行路径为:{file_path}, 文件为:{file_name}')


# Script entry point: run the job only when executed directly, not on import.
if __name__ == '__main__':
    ods_users_info()