# -*- coding: utf-8 -*-
"""ODS user base-info table job: delete everything, then re-insert everything."""
import datetime
import logging

from phone import Phone

from common.file_uitil import get_file_name, get_file_path
from common.mysql_uitl import fetch_all, insert, insert_batch

logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
                    datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_path = get_file_path()
file_name = get_file_name()

# Phone numbers reserved for internal/test accounts; no region lookup is done
# for them.  The first two are 8-digit prefixes, the last one is a full number.
_TEST_PHONE_PREFIXES = ('17000000', '18000000', '19999999999')

# Names given to the columns of the select in query_user(), in select order.
# Note that `company_mp` is exposed as `company_name` and `create_time` as
# `tourist_time` (the time the user became a visitor).
_ROW_COLUMNS = (
    'user_id', 'real_name', 'user_name', 'nickname', 'team_id', 'telephone',
    'company_name', 'level_grade', 'level_grade_name', 'department',
    'is_securities', 'sex', 'channel_id', 'tourist_time', 'register_time',
    'investor_time', 'practitioner_time', 'advisor_time', 'senior_adviser_time',
)

# Columns written to tamp_data_ods.ods_user_info, in insert order.  This single
# tuple drives both the column list and the value tuples so they cannot drift.
_USER_FIELDS = (
    'user_id', 'real_name', 'user_name', 'nickname', 'team_id', 'telephone',
    'province', 'city', 'tel_operator', 'company_name', 'level_grade',
    'level_grade_name', 'department', 'is_securities', 'sex', 'channel_id',
    'tourist_time', 'register_time', 'investor_time', 'practitioner_time',
    'advisor_time', 'senior_adviser_time',
)

# Lazily created Phone instance, shared by every get_region() call.
_phone_finder = None


def ods_users_info():
    """Run the job: load every user and rewrite the ODS user-info table."""
    user_list = query_user()
    save_ods_user_info(user_list)


def _get_phone_finder():
    """Return the shared Phone instance, creating it on first use."""
    global _phone_finder
    if _phone_finder is None:
        _phone_finder = Phone()
    return _phone_finder


def _is_test_phone(telephone):
    """Return True when *telephone* is one of the reserved test numbers."""
    return bool(telephone) and telephone.startswith(_TEST_PHONE_PREFIXES)


def _resolve_region(level_grade, telephone):
    """Return (province, city, tel_operator) for one user.

    Users below grade 1 (or with no grade) and test numbers get empty strings;
    everyone else gets the looked-up values, with '未知' for anything the
    lookup could not provide.
    """
    if level_grade is None or level_grade < 1 or _is_test_phone(telephone):
        return '', '', ''
    area_dict = get_region(telephone)
    return (
        area_dict.get('province', '未知'),
        area_dict.get('city', '未知'),
        area_dict.get('tel_operator', '未知'),
    )


# Query all users
def query_user():
    """Read every row of tamp_analysis.user_info_view as a list of dicts.

    Returns:
        A list with one dict per user, keyed by the names in ``_USER_FIELDS``.
        The list is empty when the query returns no rows.
    """
    logging.info('query_user start')
    sql = '''
    select user_id
          ,real_name
          ,user_name
          ,nickname
          ,team_id
          ,telephone
          ,company_mp
          ,level_grade
          ,level_grade_name
          ,coalesce(department, '') as department
          ,is_securities          -- 是否为券商
          ,sex
          ,channel_id
          ,create_time            -- 创建时间
          ,register_time          -- 注册时间
          ,investor_time          -- 成为投资人的时间
          ,practitioner_time      -- 成为探普理顾时间
          ,advisor_time           -- 成为探普专家理顾时间
          ,senior_adviser_time    -- 成为交易理财师
    from tamp_analysis.user_info_view
    '''
    rows = fetch_all(sql, None)
    user_list = []
    for row in rows or ():
        record = dict(zip(_ROW_COLUMNS, row))
        province, city, tel_operator = _resolve_region(
            record['level_grade'], record['telephone'])
        record.update(province=province, city=city, tel_operator=tel_operator)
        # Re-key in insert order so the dict layout matches the target table.
        user_list.append({field: record[field] for field in _USER_FIELDS})
    logging.info('query_user end')
    return user_list


# Look up the home region of a phone number
def get_region(telephone):
    """Look up province, city and carrier for *telephone*.

    The lookup is best-effort: any failure is logged as a warning and an
    empty dict is returned instead of raising.

    Args:
        telephone: Phone number string; may be empty or None.

    Returns:
        A dict with ``province``, ``city`` and ``tel_operator`` keys when the
        lookup succeeds, otherwise an empty dict.
    """
    area_dict = {}
    if not telephone:
        return area_dict
    try:
        info = _get_phone_finder().find(telephone)
        area_dict = dict(province=info['province'], city=info['city'],
                         tel_operator=info['phone_type'])
    except Exception as e:
        # Deliberately broad: one bad number must not abort the whole job.
        logging.warning('%s telephone is warning: %s', telephone, e)
    return area_dict


# Save user base info
def save_ods_user_info(user_list):
    """Replace the contents of tamp_data_ods.ods_user_info with *user_list*.

    Nothing is done (and the table is left untouched) when *user_list* is
    empty or None.

    Args:
        user_list: Dicts as produced by ``query_user``; each must contain
            every key in ``_USER_FIELDS``.
    """
    if not user_list:
        return
    logging.info('save_ods_user_info start')
    row = len(user_list)
    fields = ','.join([f"`{k}`" for k in _USER_FIELDS])
    place_holder = ','.join(["%s" for _ in _USER_FIELDS])
    values = [tuple(user[k] for k in _USER_FIELDS) for user in user_list]

    # Delete the existing data first, then insert the fresh data.
    # NOTE(review): the two steps are separate statements, so a failed insert
    # leaves the table empty -- confirm this is acceptable for consumers.
    del_sql = 'truncate tamp_data_ods.ods_user_info'
    fetch_all(del_sql, None)

    # Insert the user info rows
    sql = f'''insert into tamp_data_ods.ods_user_info ( {fields} ) values ( {place_holder} )'''
    ret = insert_batch(sql, values)
    if ret == row:
        logging.info(f'save_ods_user_info success {row}')
    else:
        logging.error(f'save_ods_user_info err 数据为:{row}行,插入成功为:{ret} 行  运行路径为:{file_path}, 文件为:{file_name}')


if __name__ == '__main__':
    ods_users_info()