#!/usr/bin/python3.6
# -*- coding: utf-8 -*-
# @Time    : 2020/11/18 19:12
# @Author  : Jie. Z
# @Email   : zhaojiestudy@163.com
# @File    : data_service.py
# @Software: PyCharm
import datetime
import logging
import math
import os
from decimal import Decimal

import numpy as np
import pandas as pd
import tushare as ts
from sqlalchemy import and_, text

from app.api.engine import tamp_user_engine, tamp_product_engine, TAMP_SQL, tamp_fund_engine, tamp_diagnose_app_engine
# from app.model.tamp_user_models import CustomerOrder, CustomerInfo
# from app.model.tamp_product_models import FundInfo
from app.utils.fund_rank import get_frequency, get_trade_cal

logger = logging.getLogger(__name__)

# Fallback tushare token, used only when TUSHARE_TOKEN is not set in the environment.
# NOTE(review): this credential is committed to source control; rotate it and
# remove the literal once TUSHARE_TOKEN is configured in every deployment.
_DEFAULT_TUSHARE_TOKEN = 'ac1f734f8a25651aa07319ca35b1b0c0854e361e306fe85d85e092bc'

# Maps the "NAV points per year" value returned by get_frequency() to the day
# offset stored in the order frame's ``freq`` column.
_FREQ_TO_DAY_OFFSET = {250: 1, 52: 5, 12: 30, 24: 15, 3: 120}


def _in_clause(prefix, values):
    """Return ``("(:p0, :p1, ...)", {"p0": v0, ...})`` for a bound SQL ``IN`` list."""
    params = {"{}{}".format(prefix, i): value for i, value in enumerate(values)}
    placeholders = ", ".join(":" + name for name in params)
    return "(" + placeholders + ")", params


class UserCustomerDataAdaptor:
    """Build holding / earnings frames for one advisor's customer.

    On construction the adaptor loads the customer's orders, the NAV series of
    every held fund and a comparison index, then computes:

    * ``total_customer_order_cnav_df`` -- the combined frame over all orders;
    * ``group_data[folio_name]`` -- the same frame for each folio, together
      with that folio's orders.
    """
    user_id = ""
    customer_id = ""
    customer_real_name = ""
    month_date = ""
    end_date = ""
    # Class-level defaults kept for backward compatibility only; __init__ gives
    # every instance its own dicts so results never leak between customers.
    group_data = {}
    trade_cal_date = None
    all_fund_distribution = {}
    all_fund_performance = {}

    def __init__(self, user_id, customer_id, end_date=None, index_id="IN0000007M"):
        """
        :param user_id: advisor id whose orders are loaded.
        :param customer_id: customer id whose orders are loaded.
        :param end_date: report end date (anything ``pd.to_datetime`` accepts);
            ``None`` means today, evaluated at call time.
        :param index_id: ``fund_market_indexes.index_id`` of the comparison index.
        """
        # Per-instance containers (the class attributes above would be shared).
        self.group_data = {}
        self.all_fund_distribution = {}
        self.all_fund_performance = {}

        self.user_id = user_id
        self.customer_id = customer_id
        self.compare_index_id = index_id
        if end_date is None:
            end_date = str(datetime.date.today())
        p_end_date = pd.to_datetime(end_date).date()
        self.end_date = pd.to_datetime(str(p_end_date))
        # First day of the end date's month.
        self.month_start_date = datetime.date(year=p_end_date.year, month=p_end_date.month, day=1)

        self.user_customer_order_df = self.get_user_customer_order_data()
        self.fund_nav_total, self.fund_cnav_total = self.get_customer_fund_nav_data()
        self.index_df = self.get_customer_index_nav_data(self.compare_index_id)
        self.total_customer_order_cnav_df = self.total_combine_data()
        self.group_operate()

    @staticmethod
    def get_trade_cal(start_date, end_date):
        """Return SSE trading days between ``start_date`` and ``end_date``.

        Both bounds are ``YYYYMMDD`` strings (``end_date`` may be ``None``).
        The result has an ``end_date`` column (``YYYYMMDD`` string) and a
        ``datetime`` column. The project calendar
        (``app.utils.fund_rank.get_trade_cal``) is tried first; if it fails,
        the tushare API is used instead.
        """
        try:
            df = get_trade_cal()
            mask = df["cal_date"] >= start_date
            if end_date is not None:
                mask &= df["cal_date"] <= end_date
            df = df.loc[mask].drop(['end_date'], axis=1).rename(columns={'cal_date': 'end_date'}).copy()
            df["datetime"] = df["end_date"].apply(lambda x: datetime.datetime.strptime(x, "%Y%m%d"))
            return df
        except Exception:
            # Deliberate best-effort: fall back to tushare, but leave a trace.
            logger.warning("local trade calendar unavailable, falling back to tushare", exc_info=True)

        ts.set_token(os.environ.get("TUSHARE_TOKEN", _DEFAULT_TUSHARE_TOKEN))
        pro = ts.pro_api()
        if end_date is not None:
            df = pro.trade_cal(exchange='SSE', start_date=start_date, end_date=end_date, is_open='1')
        else:
            df = pro.trade_cal(exchange='SSE', start_date=start_date, is_open='1')
        df = df.drop(['exchange', 'is_open'], axis=1).rename(columns={'cal_date': 'end_date'})
        df["datetime"] = df["end_date"].apply(lambda x: datetime.datetime.strptime(x, "%Y%m%d"))
        return df

    # Load every order of this customer under this advisor.
    def get_user_customer_order_data(self):
        """Load the customer's subscriptions still held as of ``self.end_date``.

        Orders are joined with fund metadata (``fund_name``, ``freq``,
        ``substrategy``). Redemptions (order_type 2) are netted FIFO against
        earlier subscriptions (order_type 1) of the same fund. Only
        subscription rows that still hold shares are returned.

        Side effects: sets ``customer_real_name``, ``ifa_real_name`` and
        ``start_date`` (earliest confirm date of the returned rows).
        """
        order_columns = ['fund_id', 'username', 'customer_name', 'type', 'order_type', 'pay_date',
                         'subscription_fee', 'confirm_share_date', 'confirm_share', 'confirm_amount', 'nav',
                         'folio_name']
        product_columns = ['fund_id', 'fund_name', 'freq', 'substrategy']
        with TAMP_SQL(tamp_fund_engine) as tamp_fund, TAMP_SQL(tamp_product_engine) as tamp_product, \
                TAMP_SQL(tamp_diagnose_app_engine) as tamp_diagnose_app:
            tamp_diagnose_app_session = tamp_diagnose_app.session
            tamp_product_session = tamp_product.session
            tamp_fund_session = tamp_fund.session

            # Bound parameters instead of string formatting: the ids come from the caller.
            sql_user = text(
                "select f1.fund_id, f2.realname,f3.customer_name,f1.type,f1.order_type,f1.pay_date,"
                "f1.subscription_fee,f1.confirm_share_date,f1.confirm_share,f1.confirm_amount,f1.nav,"
                "f1.folio_name from tamp_diagnose_app.customer_order_view f1, tamp_user.user_info f2,"
                "tamp_diagnose_app.customer_view f3 where f2.id=f1.user_id and f3.id=f1.customer_id "
                "and f1.delete_tag=0 and user_id=:user_id and customer_id=:customer_id")
            data = tamp_diagnose_app_session.execute(
                sql_user, {"user_id": str(self.user_id), "customer_id": str(self.customer_id)}).fetchall()
            order_df = pd.DataFrame(list(data), columns=order_columns)

            # Fund metadata: tamp_product entries take precedence over tamp_fund ones.
            cur_fund_id = [str(fund_id) for fund_id in order_df["fund_id"].unique()]
            product_df = pd.DataFrame(columns=product_columns)
            if cur_fund_id:
                in_sql, in_params = _in_clause("fund_id_", cur_fund_id)
                sql_fund_info = text("select distinct `id`, `fund_short_name`, `nav_frequency`, `substrategy` "
                                     "from `fund_info` where `id` in " + in_sql)
                frames = [pd.DataFrame(list(session.execute(sql_fund_info, in_params).fetchall()),
                                       columns=product_columns)
                          for session in (tamp_product_session, tamp_fund_session)]
                product_df = pd.concat(frames, ignore_index=True).drop_duplicates("fund_id")

        user_customer_order_df = order_df.set_index('fund_id').join(product_df.set_index('fund_id')).reset_index()
        user_customer_order_df["confirm_share_date"] = user_customer_order_df["confirm_share_date"].apply(
            lambda x: pd.to_datetime(x.date()))
        self.customer_real_name = user_customer_order_df["customer_name"].values[0]
        self.ifa_real_name = user_customer_order_df["username"].values[0]
        user_customer_order_df = user_customer_order_df[
            user_customer_order_df["confirm_share_date"] <= self.end_date].copy()
        user_customer_order_df["confirm_amount"] = (user_customer_order_df["confirm_amount"]
                                                    - user_customer_order_df["subscription_fee"])
        # FIFO netting needs chronological rows (the SQL has no ORDER BY); the
        # stable sort keeps the database order for orders on the same date.
        user_customer_order_df = user_customer_order_df.sort_values(
            "confirm_share_date", kind="mergesort").reset_index(drop=True)
        self._net_redemptions(user_customer_order_df)

        held_df = user_customer_order_df[(user_customer_order_df["order_type"] == 1)
                                         & (user_customer_order_df["confirm_share"] > 0)]
        held_df = held_df.reset_index(drop=True)
        self.start_date = held_df["confirm_share_date"].min()
        return held_df

    @staticmethod
    def _net_redemptions(order_df):
        """Deduct each redemption's shares FIFO from earlier subscriptions, in place.

        ``order_df`` must have a 0..n-1 index in chronological order. Only
        subscription rows (order_type 1) of the same fund that come before the
        redemption row are reduced. Redemption rows are left untouched.
        """
        # NOTE(review): confirm_amount of a partially redeemed lot is not
        # scaled down with its shares -- confirm whether that is intended.
        for sell_idx in order_df.index[order_df["order_type"] == 2]:
            fund_id = order_df.at[sell_idx, "fund_id"]
            remaining = order_df.at[sell_idx, "confirm_share"]
            for buy_idx in range(sell_idx):
                if remaining <= 0:
                    break
                if order_df.at[buy_idx, "fund_id"] != fund_id or order_df.at[buy_idx, "order_type"] != 1:
                    continue
                lot = order_df.at[buy_idx, "confirm_share"]
                if remaining >= lot:
                    remaining -= lot
                    order_df.at[buy_idx, "confirm_share"] = 0
                else:
                    # Subtract first, then clear the remainder.
                    order_df.at[buy_idx, "confirm_share"] = lot - remaining
                    remaining = 0

    # Load NAV data for the funds the customer holds.
    def get_customer_fund_nav_data(self):
        """Load NAV and cumulative NAV of every held fund on the trading calendar.

        Also stores each fund's cash distributions (``all_fund_distribution``)
        and performance rows up to ``end_date`` (``all_fund_performance``).
        Fills missing ``freq`` values in ``user_customer_order_df`` and sets
        ``trade_cal_date`` and ``last_nav_date``.

        :return: ``(all_fund_nav, all_fund_cnav)`` -- frames indexed by trading
            day (<= ``end_date``), one column per fund id.
        """
        now_date = datetime.datetime.now().strftime("%Y%m%d")
        trade_date_df = self.get_trade_cal("20000101", now_date)
        self.trade_cal_date = trade_date_df
        # The rest of the class assumes an ascending date index.
        trade_days = trade_date_df["datetime"].sort_values()
        all_fund_nav = pd.DataFrame(index=trade_days)
        all_fund_cnav = pd.DataFrame(index=trade_days)
        order_df = self.user_customer_order_df

        with TAMP_SQL(tamp_product_engine) as tamp_product, TAMP_SQL(tamp_fund_engine) as tamp_fund:
            tamp_product_session = tamp_product.session
            tamp_fund_session = tamp_fund.session
            seen_fund_ids = set()
            for _, row in order_df.iterrows():
                cur_fund_id = row["fund_id"]
                if cur_fund_id in seen_fund_ids:
                    continue
                seen_fund_ids.add(cur_fund_id)
                params = {"fund_id": str(cur_fund_id)}

                # NAV of the fund: type 0 = public fund table, 1/2 = fund_nav in the fund / product DB.
                fund_type = row["type"]
                if fund_type == 0:
                    nav_sql = ("select distinct `end_date`, `unit_nav`,`accum_nav` from `public_fund_nav` "
                               "where `id`=:fund_id order by `end_date` ASC")
                    nav_session = tamp_fund_session
                elif fund_type in (1, 2):
                    nav_sql = ("select distinct `price_date`, `nav`,`cumulative_nav` from `fund_nav` "
                               "where `fund_id`=:fund_id order by `price_date` ASC")
                    nav_session = tamp_fund_session if fund_type == 1 else tamp_product_session
                else:
                    nav_sql, nav_session = None, None
                # Unknown fund types get no NAV rows instead of reusing a stale cursor.
                data = nav_session.execute(text(nav_sql), params).fetchall() if nav_session is not None else []
                cur_fund_nav_df = pd.DataFrame(list(data), columns=['price_date', 'nav', 'cnav'])

                # Cash distributions of the fund.
                sql = text("select distinct `distribute_date`, `distribution` from `fund_distribution` "
                           "where `fund_id`=:fund_id and `distribute_type`='1' order by `distribute_date` ASC")
                data = tamp_product_session.execute(sql, params).fetchall()
                self.all_fund_distribution[cur_fund_id] = pd.DataFrame(
                    list(data), columns=['price_date', 'distribution'])

                # Performance rows of the fund, up to the report end date.
                sql = text("select distinct `price_date`, `ret_1w`, `ret_cum_1m`, `ret_cum_6m`, `ret_cum_1y`, "
                           "`ret_cum_ytd`, `ret_cum_incep` from `fund_performance` "
                           "where `fund_id`=:fund_id order by `price_date` ASC")
                data = tamp_product_session.execute(sql, params).fetchall()
                cur_fund_performance_df = pd.DataFrame(
                    list(data), columns=['price_date', 'ret_1w', 'ret_cum_1m', 'ret_cum_6m', 'ret_cum_1y',
                                         'ret_cum_ytd', 'ret_cum_incep'])
                self.all_fund_performance[cur_fund_id] = cur_fund_performance_df[
                    cur_fund_performance_df["price_date"] <= self.end_date.date()]

                cur_fund_nav_df["price_date"] = pd.to_datetime(cur_fund_nav_df["price_date"])
                cur_fund_nav_df.drop_duplicates(subset="price_date", keep='first', inplace=True)
                cur_fund_nav_df.set_index("price_date", inplace=True)
                cur_fund_nav_df = cur_fund_nav_df[cur_fund_nav_df.index.isin(all_fund_nav.index)]

                # Missing NAV frequency: infer it from the series, default to daily.
                if pd.isnull(row["freq"]):
                    try:
                        freq = get_frequency(cur_fund_nav_df)
                    except Exception:
                        freq = 250
                    order_df.loc[order_df["fund_id"] == cur_fund_id, "freq"] = _FREQ_TO_DAY_OFFSET.get(
                        freq, _FREQ_TO_DAY_OFFSET[250])

                all_fund_nav[cur_fund_id] = cur_fund_nav_df["nav"]
                all_fund_cnav[cur_fund_id] = cur_fund_nav_df["cnav"]

        all_fund_nav = all_fund_nav[all_fund_nav.index <= self.end_date].copy()
        all_fund_cnav = all_fund_cnav[all_fund_cnav.index <= self.end_date].copy()
        self.last_nav_date = str(all_fund_cnav.dropna(how="all").index.values[-1])[:10]
        return all_fund_nav, all_fund_cnav

    # Load the comparison index data.
    def get_customer_index_nav_data(self, index_id="IN0000007M"):
        """Load close prices of comparison index ``index_id``, indexed by date.

        Side effects: adds the series as column ``"index"`` of
        ``fund_cnav_total`` and stores the frame in ``self.index_df``.
        """
        with TAMP_SQL(tamp_product_engine) as tamp_product:
            sql = text("select distinct price_date,close from fund_market_indexes "
                       "where index_id=:index_id order by price_date ASC")
            data = tamp_product.session.execute(sql, {"index_id": str(index_id)}).fetchall()
        index_df = pd.DataFrame(list(data), columns=['price_date', 'index'])
        index_df["price_date"] = pd.to_datetime(index_df["price_date"])
        index_df.set_index("price_date", inplace=True)
        self.fund_cnav_total["index"] = index_df["index"]
        self.index_df = index_df
        return index_df

    # Per-folio computation.
    def group_operate(self):
        """Compute each folio's frame into ``self.group_data[folio_name]``."""
        order_df = self.user_customer_order_df
        for folio in order_df["folio_name"].unique():
            cur_folio_order_df = order_df[order_df["folio_name"] == folio]
            # Only this folio's funds, so other folios' NAV dates add no rows.
            fund_id_list = list(cur_folio_order_df["fund_id"].unique())
            self.signal_folio_operate(folio, cur_folio_order_df,
                                      self.fund_nav_total[fund_id_list],
                                      self.fund_cnav_total[fund_id_list])

    # Single-folio computation.
    def signal_folio_operate(self, p_folio, p_order_df, p_nav_df, p_cnav_df):
        """Build the frame for one folio and store it in ``group_data[p_folio]``.

        :return: the frame built by ``_build_holding_frame``.
        """
        cnav_df = self._build_holding_frame(p_order_df, p_nav_df, p_cnav_df)
        self.group_data[p_folio] = {"result_cnav_data": cnav_df, "order_df": p_order_df}
        return cnav_df

    # Computation over all orders.
    def total_combine_data(self):
        """Build the frame over all of the customer's orders (see ``_build_holding_frame``)."""
        return self._build_holding_frame(self.user_customer_order_df.copy(),
                                         self.fund_nav_total.copy(),
                                         self.fund_cnav_total.copy())

    @staticmethod
    def _build_holding_frame(p_order_df, p_nav_df, p_cnav_df):
        """Combine orders with cumulative NAV into a daily holdings/earnings frame.

        Starting from the earliest confirm date, the frame keeps the columns
        of ``p_cnav_df``. For each ordered fund id it adds ``<id>_profit``,
        ``_profit_ratio``, ``_amount``, ``_share``, ``_earn``, ``_cum_earn``
        and ``_net_amount``. Inputs are not modified.
        """
        start_date = pd.to_datetime(p_order_df["confirm_share_date"].min())
        cnav_df = p_cnav_df[p_cnav_df.index >= start_date].sort_index().copy()
        p_fund_id_list = list(p_order_df["fund_id"].unique())

        # Hide each fund's cumulative NAV before its first confirmed order.
        for p_fund_id in p_fund_id_list:
            order_min_date = p_order_df.loc[p_order_df["fund_id"] == p_fund_id, "confirm_share_date"].min()
            if pd.to_datetime(order_min_date) > start_date:
                cnav_df.loc[:order_min_date - datetime.timedelta(days=1), p_fund_id] = np.nan

        # Pin the cumulative NAV on each confirm date to the order's own NAV.
        # If that day has no NAV (or is not a trading day), extrapolate from
        # the last known NAV/cumulative NAV pair.
        for _, row in p_order_df.iterrows():
            cur_fund_id = str(row["fund_id"])
            confirm_share_date = pd.to_datetime(row["confirm_share_date"])
            nav_on_confirm = p_nav_df[cur_fund_id].get(confirm_share_date)
            if pd.isnull(nav_on_confirm):
                before = p_nav_df.index < confirm_share_date
                last_nav_data = p_nav_df.loc[before, cur_fund_id].dropna().tail(1)
                last_cnav_data = p_cnav_df.loc[p_cnav_df.index < confirm_share_date, cur_fund_id].dropna().tail(1)
                # TODO: check whether a distribution falls between the last NAV date and the confirm date.
                if len(last_nav_data) < 1:
                    cnav_df.loc[confirm_share_date, cur_fund_id] = row["nav"]
                else:
                    diff_nav = Decimal(row["nav"]) - Decimal(last_nav_data.values[0])
                    cnav_df.loc[confirm_share_date, cur_fund_id] = Decimal(last_cnav_data.values[0]) + diff_nav
            else:
                diff_nav = Decimal(row["nav"]) - Decimal(nav_on_confirm)
                cnav_on_confirm = p_cnav_df.loc[confirm_share_date, cur_fund_id]
                cnav_df.loc[confirm_share_date, cur_fund_id] = Decimal(cnav_on_confirm) + diff_nav

        # Rows injected for non-trading confirm dates are appended at the end; re-sort first.
        cnav_df = cnav_df.sort_index().dropna(axis=0, how="all").ffill()

        # Accumulate invested amount and shares from each confirm date onwards.
        for _, row in p_order_df.iterrows():
            cur_fund_id = str(row["fund_id"])
            confirm_share_date = pd.to_datetime(row["confirm_share_date"])
            if cur_fund_id + "_amount" not in cnav_df:
                price = cnav_df[cur_fund_id].dropna()
                profit = price.diff().fillna(Decimal(0))
                cnav_df[cur_fund_id + "_profit"] = profit
                cnav_df[cur_fund_id + "_profit"] = cnav_df[cur_fund_id + "_profit"].fillna(Decimal(0))
                cnav_df[cur_fund_id + "_profit_ratio"] = profit / price.shift(1)
                cnav_df[cur_fund_id + "_profit_ratio"] = cnav_df[cur_fund_id + "_profit_ratio"].fillna(Decimal(0))
                cnav_df[cur_fund_id + "_amount"] = 0
                cnav_df[cur_fund_id + "_earn"] = 0
                cnav_df[cur_fund_id + "_share"] = 0
            # buy
            if row['order_type'] == 1:
                cnav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] += row["confirm_amount"]
                cnav_df.loc[confirm_share_date:, cur_fund_id + "_share"] += row["confirm_share"]
            # sell
            elif row['order_type'] == 2:
                cnav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] -= row["confirm_amount"]
                cnav_df.loc[confirm_share_date:, cur_fund_id + "_share"] -= row["confirm_share"]

        for p_fund_id_ in p_fund_id_list:
            # A row's move is earned by the shares held at the previous row.
            # Shares confirmed on a given day were bought at that day's NAV,
            # so they must not also earn that day's move.
            held_share = cnav_df[p_fund_id_ + "_share"].shift(1).fillna(0)
            cnav_df[p_fund_id_ + "_earn"] = (cnav_df[p_fund_id_ + "_profit"] * held_share).apply(
                lambda x: float(x)).fillna(0)
            cnav_df[p_fund_id_ + "_cum_earn"] = cnav_df[p_fund_id_ + "_earn"].cumsum().fillna(0)
            cnav_df[p_fund_id_ + "_net_amount"] = (cnav_df[p_fund_id_ + "_cum_earn"].apply(lambda x: Decimal(x))
                                                   + cnav_df[p_fund_id_ + "_amount"])
        return cnav_df