ods_users_info.py 6.62 KB
Newer Older

# -*- coding: utf-8 -*-
import datetime
import logging

from phone import Phone

from common.file_uitil import get_file_name, get_file_path
from common.mysql_uitl import fetch_all, insert, insert_batch

'''
用户基础信息表,全删全插
'''

logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s",datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)

file_path = get_file_path()
file_name = get_file_name()


def ods_users_info():
    user_list = query_user()
    save_ods_user_info(user_list)


# 查询所有用户
def query_user():
    logging.info('query_user start')
    sql = '''
        select  user_id
                ,real_name
                ,user_name
                ,nickname
                ,team_id
                ,telephone
                ,company_mp 
                ,level_grade
                ,level_grade_name
                ,coalesce(department, '') as department
                ,is_securities                       -- 是否为券商
                ,sex
                ,channel_id
                ,create_time                          -- 创建时间
                ,register_time                        -- 注册时间
                ,investor_time                        -- 成为投资人的时间
                ,practitioner_time                    -- 成为探普理顾时间
                ,advisor_time                         -- 成为探普专家理顾时间
                ,senior_adviser_time                  -- 成为交易理财师
         from   tamp_analysis.user_info_view 
    '''
    rows = fetch_all(sql, None)
    if rows:
        user_list = []
        for row in rows:
            user_id = row[0]
            real_name = row[1]
            user_name = row[2]
            nickname = row[3]
            team_id = row[4]
            telephone = row[5]
            company_name = row[6]
            level_grade = row[7]
            level_grade_name = row[8]
            department = row[9]
            is_securities = row[10]
            sex = row[11]
            channel_id = row[12]
            tourist_time = row[13]  # 成为游客的时间
            register_time = row[14]
            investor_time = row[15]
            practitioner_time = row[16]
            advisor_time = row[17]
            senior_adviser_time = row[18]
            if level_grade >= 1 and (telephone[0: 8] != '17000000' or telephone[0: 8] != '18000000'
                                     or telephone[0: 8] != '19999999999'):
                area_dict = get_region(telephone)
                province = area_dict.get('province', '未知')
                city = area_dict.get('city', '未知')
                tel_operator = area_dict.get('tel_operator', '未知')
                user_list.append(dict(
                    user_id=user_id,
                    real_name=real_name,
                    user_name=user_name,
                    nickname=nickname,
                    team_id=team_id,
                    telephone=telephone,
                    province=province,
                    city=city,
                    tel_operator=tel_operator,
                    company_name=company_name,
                    level_grade=level_grade,
                    level_grade_name=level_grade_name,
                    department=department,
                    is_securities=is_securities,
                    sex=sex,
                    channel_id=channel_id,
                    tourist_time=tourist_time,
                    register_time=register_time,
                    investor_time=investor_time,
                    practitioner_time=practitioner_time,
                    advisor_time=advisor_time,
                    senior_adviser_time=senior_adviser_time
                ))
            else:
                user_list.append(dict(
                    user_id=user_id,
                    real_name=real_name,
                    user_name=user_name,
                    nickname=nickname,
                    team_id=team_id,
                    telephone=telephone,
                    province='',
                    city='',
                    tel_operator='',
                    company_name=company_name,
                    level_grade=level_grade,
                    level_grade_name=level_grade_name,
                    department=department,
                    is_securities=is_securities,
                    sex=sex,
                    channel_id=channel_id,
                    tourist_time=tourist_time,
                    register_time=register_time,
                    investor_time=investor_time,
                    practitioner_time=practitioner_time,
                    advisor_time=advisor_time,
                    senior_adviser_time=senior_adviser_time
                ))
        logging.info('query_user end')
        return user_list


# 获取手机号归属地
def get_region(telephone):
    if telephone:
        area_dict = {}
        try:
            info = Phone().find(telephone)
            area_dict = dict(province=info['province'], city=info['city'], tel_operator=info['phone_type'])

        except Exception as e:
            logging.warning(telephone + ' telephone is warning: ' + str(e))
        return area_dict


# 保存用户基本信息
def save_ods_user_info(user_list):
    if user_list:
        logging.info('save_ods_user_info start')
        row = 0
        fields = None
        place_holder = None
        values = []
        for i in user_list:
            row = row + 1
            fields = ','.join([f"`{k}`" for k in i.keys()])
            place_holder = ','.join(["%s" for _ in i.keys()])
            values.append((i['user_id'], i['real_name'], i['user_name'], i['nickname'], i['team_id'], i['telephone'],
                           i['province'], i['city'], i['tel_operator'], i['company_name'], i['level_grade'],
                           i['level_grade_name'], i['department'], i['is_securities'], i['sex'], i['channel_id'],
                           i['tourist_time'], i['register_time'], i['investor_time'], i['practitioner_time'],
                           i['advisor_time'], i['senior_adviser_time']))

        # 先删除数据,再插入数据
        del_sql = 'truncate tamp_data_ods.ods_user_info'
        fetch_all(del_sql, None)
        # 插入用戶信息数据
        sql = f'''insert into tamp_data_ods.ods_user_info ( {fields} ) values ( {place_holder} )'''
        ret = insert_batch(sql, values)
        if ret == row:
            logging.info(f'save_ods_user_info success {row}')
        else:
            logging.error(f'save_ods_user_info err 数据为:{row}行,插入成功为:{ret} 行 运行路径为:{file_path}, 文件为:{file_name}')


if __name__ == '__main__':
    ods_users_info()