#!/usr/bin/python3.6 # -*- coding: utf-8 -*- # @Time : 2020/11/18 19:12 # @Author : Jie. Z # @Email : zhaojiestudy@163.com # @File : data_service.py # @Software: PyCharm import pandas as pd import numpy as np from sqlalchemy import and_ import tushare as ts import datetime import math from decimal import Decimal from app.api.engine import tamp_user_engine, tamp_product_engine, TAMP_SQL, tamp_fund_engine, tamp_diagnose_app_engine from app.service.portfolio_diagnose import get_fund_substrategy # from app.model.tamp_user_models import CustomerOrder, CustomerInfo # from app.model.tamp_product_models import FundInfo from app.utils.fund_rank import get_frequency, get_trade_cal class UserCustomerDataAdaptor: user_id = "" customer_id = "" customer_real_name = "" month_date = "" end_date = "" group_data = {} trade_cal_date = None all_fund_id_list = None all_fund_type_dict = None all_fund_distribution = {} all_fund_performance = {} def __init__(self, user_id, customer_id, end_date=str(datetime.datetime.now()), index_id="IN0000007M"): self.user_id = user_id self.customer_id = customer_id self.compare_index_id = index_id self.valueSex = '' p_end_date = pd.to_datetime(end_date).date() # p_end_date = datetime.date(year=p_end_date.year, month=p_end_date.month, day=1) - datetime.timedelta(days=1) self.end_date = pd.to_datetime(str(p_end_date)) # self.end_date = pd.to_datetime("2021-01-29") p_start_date = datetime.date(year=p_end_date.year, month=p_end_date.month, day=1) self.month_start_date = p_start_date # self.month_start_date = pd.to_datetime("2020-12-01") self.user_customer_order_df = self.get_user_customer_order_data() self.fund_nav_total, self.fund_cnav_total = self.get_customer_fund_nav_data() self.index_df = self.get_customer_index_nav_data() self.total_customer_order_cnav_df = self.total_combine_data() self.group_operate() @staticmethod def get_trade_cal(start_date, end_date): try: df = get_trade_cal() df = df[df["cal_date"] >= start_date] df.drop(['end_date'], axis=1, inplace=True) df.rename(columns={'cal_date': 'end_date'}, inplace=True) df["datetime"] = df["end_date"].apply(lambda x: datetime.datetime.strptime(x, "%Y%m%d")) return df except: pass ts.set_token('ac1f734f8a25651aa07319ca35b1b0c0854e361e306fe85d85e092bc') pro = ts.pro_api() if end_date is not None: df = pro.trade_cal(exchange='SSE', start_date=start_date, end_date=end_date, is_open='1') else: df = pro.trade_cal(exchange='SSE', start_date=start_date, is_open='1') df.drop(['exchange', 'is_open'], axis=1, inplace=True) df.rename(columns={'cal_date': 'end_date'}, inplace=True) df["datetime"] = df["end_date"].apply(lambda x: datetime.datetime.strptime(x, "%Y%m%d")) return df # 获取理财师下该用户所有订单列表 def get_user_customer_order_data(self): with TAMP_SQL(tamp_fund_engine) as tamp_fund, TAMP_SQL(tamp_product_engine) as tamp_product, TAMP_SQL(tamp_diagnose_app_engine) as tamp_diagnose_app: tamp_diagnose_app_session = tamp_diagnose_app.session tamp_product_session = tamp_product.session tamp_fund_session = tamp_fund.session sql_user = """select f1.fund_id, f2.realname,f3.customer_name,f3.valueSex,f1.type,f1.order_type,f1.pay_date,f1.subscription_fee,f1.confirm_share_date,f1.confirm_share,f1.confirm_amount,f1.nav,f1.folio_name, f1.comefrom from tamp_diagnose_app.customer_order_view f1, tamp_user.user_info f2,tamp_diagnose_app.customer_view f3 where f2.id=f1.user_id and f3.id=f1.customer_id and f1.delete_tag=0 and user_id='{}' and customer_id='{}'""".format(self.user_id, self.customer_id) cur = tamp_diagnose_app_session.execute(sql_user) data = cur.fetchall() order_df = pd.DataFrame(list(data), columns=['fund_id', 'username', 'customer_name', 'valueSex', 'type', 'order_type', 'pay_date', 'subscription_fee', 'confirm_share_date', 'confirm_share', 'confirm_amount', 'nav', 'folio_name', 'comefrom']) cur_fund_id = list(order_df["fund_id"].unique()) fund_list_str = str(cur_fund_id).replace("[", "(").replace("]", ")") sql_product = "select distinct `id`, `fund_short_name`, `nav_frequency`, `substrategy` from `fund_info`" cur = tamp_product_session.execute(sql_product) data = cur.fetchall() product_df = pd.DataFrame(list(data), columns=['fund_id', 'fund_name', 'freq', 'substrategy']) if len(cur_fund_id) > 0: sql_fund = "select distinct `id`, `fund_short_name`, `nav_frequency`, `substrategy` from `fund_info` where `id` in {}".format(fund_list_str) cur = tamp_fund_session.execute(sql_fund) data = cur.fetchall() fund_df = pd.DataFrame(list(data), columns=['fund_id', 'fund_name', 'freq', 'substrategy']) if len(fund_df) > 0: product_df = product_df.append(fund_df) product_df = product_df.drop_duplicates("fund_id") fund_type3 = order_df[order_df["type"] == 3] if len(fund_type3) > 0: type3_fund_id = list(fund_type3["fund_id"].unique()) fund_list_str_3 = str(type3_fund_id).replace("[", "(").replace("]", ")") sql_fund_3 = "select distinct `id`, `fund_name`, `substrategy` from `ifa_imported_fund_info` where `id` in {}".format( fund_list_str_3) cur = tamp_fund_session.execute(sql_fund_3) data = cur.fetchall() fund_df_3 = pd.DataFrame(list(data), columns=['fund_id', 'fund_name', 'substrategy']) fund_df_3["freq"] = 1 if len(fund_df_3) > 0: product_df = product_df.append(fund_df_3) product_df = product_df.drop_duplicates("fund_id") fund_type0 = order_df[order_df["type"] == 0] if len(fund_type0) > 0: type0_fund_id = list(fund_type0["fund_id"].unique()) fund_list_str_0 = str(type0_fund_id).replace("[", "(").replace("]", ")") sql_fund_0 = "select distinct `id`, `fund_short_name`, `substrategy` from `tx_fund_info` where `id` in {}".format( fund_list_str_0) cur = tamp_fund_session.execute(sql_fund_0) data = cur.fetchall() fund_df_0 = pd.DataFrame(list(data), columns=['fund_id', 'fund_name', 'substrategy']) fund_df_0["freq"] = 1 if len(fund_df_0) > 0: product_df = product_df.append(fund_df_0) product_df = product_df.drop_duplicates("fund_id") user_customer_order_df = order_df.set_index('fund_id').join(product_df.set_index('fund_id')).reset_index() user_customer_order_df["substrategy"] = user_customer_order_df.apply(lambda x: get_fund_substrategy(x["fund_id"], x["type"]), axis=1) user_customer_order_df["confirm_share_date"] = user_customer_order_df["confirm_share_date"].apply(lambda x: pd.to_datetime(x.date())) self.customer_real_name = user_customer_order_df["customer_name"].values[0] self.ifa_real_name = user_customer_order_df["username"].values[0] self.valueSex = user_customer_order_df["valueSex"].values[0] user_customer_order_df = user_customer_order_df[user_customer_order_df["confirm_share_date"] <= self.end_date] user_customer_order_df["confirm_amount"] = user_customer_order_df["confirm_amount"] - user_customer_order_df["subscription_fee"] user_customer_order_df.sort_values(by="confirm_share_date", inplace=True) user_customer_order_df.index = pd.Series(range(len(user_customer_order_df))) user_customer_order_df["original_confirm_share"] = user_customer_order_df["confirm_share"] user_customer_order_df["divident_share"] = user_customer_order_df["confirm_share"] user_customer_order_df["reduce_share"] = user_customer_order_df["confirm_share"] user_customer_order_df["coefficient"] = Decimal(1.0) # 内部订单 inter_df = user_customer_order_df[user_customer_order_df["comefrom"] == "inter"] inter_order_df = self.reduce_inter_order(inter_df) # 外部订单 outside_df = user_customer_order_df[user_customer_order_df["comefrom"] == "app"] outside_order_df = self.reduce_outside_order(outside_df) # outside_order_df["comefrom"] = "app" # outside_order_df = outside_order_df[outside_order_df["fund_id"] == "HF00005AFK"] user_customer_order_df = pd.concat([inter_order_df, outside_order_df]) user_customer_order_df = user_customer_order_df[((user_customer_order_df["order_type"] == 1)|(user_customer_order_df["order_type"] == 4))&(user_customer_order_df["confirm_share"] > 0)] user_customer_order_df.index = pd.Series(range(len(user_customer_order_df))) self.start_date = user_customer_order_df["confirm_share_date"].min() return user_customer_order_df # 核减内部订单 def reduce_inter_order(self, order_df): inter_order_df = order_df.copy() inter_order_df.sort_values(by="confirm_share_date", inplace=True) def reduce_order(fund_order, d_fund_id, d_date, d_nav): """ 再平衡分红后单子的份额数量,对分红前的单子进行拆分 :param fund_order: 订单dataframe :param d_date: 分红订单日期 :param d_nav: 分红后单位净值 :return: """ for _index, _row in fund_order.iterrows(): if _row["confirm_share_date"] > d_date: break if _row["order_type"] == 1 and _row["fund_id"] == d_fund_id and _row["nav"] > d_nav: coefficient = _row["nav"] / d_nav fund_order.loc[_index, "coefficient"] = coefficient * _row["coefficient"] fund_order.loc[_index, "divident_share"] = _row["confirm_share"] * coefficient return fund_order for index, row in inter_order_df.iterrows(): if row["order_type"] == 4: dividend_date = row["confirm_share_date"] dividend_nav = row["nav"] reduce_order(inter_order_df, row["fund_id"], dividend_date, dividend_nav) continue if row["order_type"] == 2: need_less_share = row["confirm_share"] for index_ori, row_ori in inter_order_df.iterrows(): if index_ori >= index: break if row["fund_id"] == row_ori["fund_id"] and (row_ori["order_type"] == 1 or row_ori["order_type"] == 4): ori_share = row_ori["original_confirm_share"] div_share = row_ori["divident_share"] if need_less_share >= div_share: need_less_share -= ori_share inter_order_df.loc[index_ori, "divident_share"] = 0 else: stay_share = row_ori["divident_share"] - need_less_share inter_order_df.loc[index_ori, "divident_share"] = stay_share if stay_share > 0.01 else 0 need_less_share = 0 if need_less_share <= 0: break inter_order_df["reduce_share"] = inter_order_df["divident_share"] / inter_order_df["coefficient"] inter_order_df["confirm_share"] = inter_order_df["reduce_share"] return inter_order_df # 核减外部订单 def reduce_outside_order(self, order_df): outside_order_df = order_df.copy() for index, row in outside_order_df.iterrows(): if row["order_type"] == 2: need_less_share = row["confirm_share"] for index_ori, row_ori in outside_order_df.iterrows(): if index_ori >= index: break if row["fund_id"] == row_ori["fund_id"] and row_ori["order_type"] == 1: ori_share = row_ori["reduce_share"] if need_less_share >= ori_share: need_less_share -= ori_share outside_order_df.loc[index_ori, "reduce_share"] = 0 else: outside_order_df.loc[index_ori, "reduce_share"] = row_ori["reduce_share"] - need_less_share need_less_share = 0 if need_less_share <= 0: break outside_order_df["confirm_share"] = outside_order_df["reduce_share"] return outside_order_df # 获取客户持有的基金净值数据 def get_customer_fund_nav_data(self): with TAMP_SQL(tamp_product_engine) as tamp_product, TAMP_SQL(tamp_fund_engine) as tamp_fund: tamp_product_session = tamp_product.session tamp_fund_session = tamp_fund.session now_date = datetime.datetime.now().strftime("%Y%m%d") trade_date_df = self.get_trade_cal("20000101", now_date) self.trade_cal_date = trade_date_df all_fund_nav = pd.DataFrame(index=trade_date_df["datetime"]) all_fund_cnav = pd.DataFrame(index=trade_date_df["datetime"]) temp_fund_id = [] # for cur_fund_id in self.user_customer_order_df["fund_id"].unique(): for index, row in self.user_customer_order_df.iterrows(): cur_fund_id = row["fund_id"] if cur_fund_id in temp_fund_id: continue else: temp_fund_id.append(cur_fund_id) fund_type = row["type"] # 对应基金净值 if fund_type == 0: sql = """select distinct `price_date`, `nav`,`cumulative_nav` from `tx_fund_nav` where `fund_id`='{}' and `delete_tag`=0 order by `price_date` ASC""".format(cur_fund_id) cur = tamp_fund_session.execute(sql) elif fund_type == 1: sql = """select distinct `price_date`, `nav`,`cumulative_nav` from `fund_nav` where `fund_id`='{}' and `delete_tag`=0 order by `price_date` ASC""".format( cur_fund_id) cur = tamp_fund_session.execute(sql) elif fund_type == 2: sql = """select distinct `price_date`, `nav`,`cumulative_nav` from `fund_nav` where `fund_id`='{}' and `delete_tag`=0 order by `price_date` ASC""".format( cur_fund_id) cur = tamp_product_session.execute(sql) elif fund_type == 3: sql = """select distinct `price_date`, `nav`,`cumulative_nav` from `ifa_imported_fund_nav` where `fund_id`='{}' and `delete_tag`=0 order by `price_date` ASC""".format( cur_fund_id) cur = tamp_fund_session.execute(sql) data = cur.fetchall() cur_fund_nav_df = pd.DataFrame(list(data), columns=['price_date', 'nav', 'cnav']) # # 对应基金分红 dis_sql = """select distinct `distribute_date`, `distribution` from `fund_distribution` where `fund_id`='{}' and `distribute_type`='1' order by `distribute_date` ASC""".format( cur_fund_id) if fund_type == 0: dis_sql = """select distinct `distribute_date`, `distribution` from `tx_fund_distribution` where `fund_id`='{}' and `distribute_type`='1' order by `distribute_date` ASC""".format( cur_fund_id) cur = tamp_fund_session.execute(dis_sql) else: cur = tamp_product_session.execute(dis_sql) data = cur.fetchall() cur_fund_distribution_df = pd.DataFrame(list(data), columns=['price_date', 'distribution']) self.all_fund_distribution[cur_fund_id] = cur_fund_distribution_df # 对应基金performance数据 per_sql = """select distinct `price_date`, `ret_1w`, `ret_cum_1m`, `ret_cum_6m`, `ret_cum_1y`, `ret_cum_ytd`, `ret_cum_incep` from `fund_performance` where `fund_id`='{}' order by `price_date` ASC""".format( cur_fund_id) if fund_type == 0: per_sql = """select distinct `price_date`, `ret_1w`, `ret_1m`, `ret_6m`, `ret_1y`, `ret_ytd`, `ret_incep` from `tx_fund_count` where `fund_id`='{}' order by `price_date` ASC""".format( cur_fund_id) cur = tamp_fund_session.execute(per_sql) elif fund_type == 3: per_sql = """select distinct `price_date`, `ret_1w`, `ret_1m`, `ret_6m`, `ret_1y`, `ret_ytd`, `ret_incep` from `ifa_imported_fund_count` where `fund_id`='{}' order by `price_date` ASC""".format( cur_fund_id) cur = tamp_fund_session.execute(per_sql) else: cur = tamp_product_session.execute(per_sql) data = cur.fetchall() cur_fund_performance_df = pd.DataFrame(list(data), columns=['price_date', 'ret_1w', 'ret_cum_1m', 'ret_cum_6m', 'ret_cum_1y', 'ret_cum_ytd', 'ret_cum_incep']) cur_fund_performance_df = cur_fund_performance_df[ cur_fund_performance_df["price_date"] <= self.end_date.date()] self.all_fund_performance[cur_fund_id] = cur_fund_performance_df cur_fund_nav_df["price_date"] = pd.to_datetime(cur_fund_nav_df["price_date"]) cur_fund_nav_df.drop_duplicates(subset="price_date", keep='first', inplace=True) cur_fund_nav_df.set_index("price_date", inplace=True) cur_fund_nav_df = cur_fund_nav_df[cur_fund_nav_df.index.isin(all_fund_nav.index)] if math.isnan(row["freq"]): try: freq = get_frequency(cur_fund_nav_df) except: freq = 250 timeoffset_dict = {250: 1, 52: 5, 12: 30, 24: 15, 3: 120} self.user_customer_order_df.loc[index, "freq"] = timeoffset_dict[freq] all_fund_nav[cur_fund_id] = cur_fund_nav_df["nav"] all_fund_cnav[cur_fund_id] = cur_fund_nav_df["cnav"] all_fund_nav = all_fund_nav[all_fund_nav.index <= self.end_date] all_fund_cnav = all_fund_cnav[all_fund_cnav.index <= self.end_date] # for cur_fund_id in self.user_customer_order_df["fund_id"].unique(): # all_fund_nav[cur_fund_id][all_fund_nav[cur_fund_id].apply(lambda x: math.isnan(x))]=np.nan # all_fund_cnav[cur_fund_id][all_fund_cnav[cur_fund_id].apply(lambda x: math.isnan(x))] = np.nan self.last_nav_date = str(all_fund_cnav.dropna(how="all").index.values[-1])[:10] return all_fund_nav, all_fund_cnav # 获取客户对比指数净值数据 def get_customer_index_nav_data(self, index_id="IN0000007M"): with TAMP_SQL(tamp_product_engine) as tamp_product: tamp_product_session = tamp_product.session sql = "select distinct price_date,close from fund_market_indexes where index_id='{}' order by price_date ASC".format(index_id) cur = tamp_product_session.execute(sql) data = cur.fetchall() index_df = pd.DataFrame(list(data), columns=['price_date', 'index']) index_df["price_date"] = pd.to_datetime(index_df["price_date"]) index_df.set_index("price_date", inplace=True) self.fund_cnav_total["index"] = index_df["index"] self.fund_nav_total["index"] = index_df["index"] self.index_df = index_df return index_df # 分组合计算 def group_operate(self): for folio in self.user_customer_order_df["folio_name"].unique(): cur_folio_order_df = self.user_customer_order_df[self.user_customer_order_df["folio_name"] == folio] fund_id_list = list(self.user_customer_order_df["fund_id"].unique()) cur_folio_nav_df = self.fund_nav_total[fund_id_list] # fund_id_list.append("index") cur_folio_cnav_df = self.fund_cnav_total[fund_id_list] self.signal_folio_operate(folio, cur_folio_order_df, cur_folio_nav_df, cur_folio_cnav_df) continue # 单个组合数据操作 def signal_folio_operate(self, p_folio, p_order_df, p_nav_df, p_cnav_df): start_date = pd.to_datetime(p_order_df["confirm_share_date"].min()) p_order_df = p_order_df[(p_order_df["order_type"] == 1) | (p_order_df["order_type"] == 4)] p_nav_df = self.fund_nav_total.copy() p_cnav_df = self.fund_cnav_total.copy() start_date = pd.to_datetime(p_order_df["confirm_share_date"].min()) nav_df = p_nav_df[p_nav_df.index >= start_date].copy() cnav_df = p_cnav_df[p_cnav_df.index >= start_date].copy() p_fund_id_list = list(p_order_df["fund_id"].unique()) self.all_fund_id_list = p_fund_id_list self.all_fund_type_dict = {values["fund_id"]: values["type"] for values in p_order_df[["fund_id", "type"]].drop_duplicates().to_dict(orient='records')} for p_fund_id in p_fund_id_list: order_min_date = p_order_df[p_order_df["fund_id"] == p_fund_id]["confirm_share_date"].min() if pd.to_datetime(order_min_date) > start_date: cnav_df.loc[:order_min_date - datetime.timedelta(days=1), p_fund_id] = np.nan # 处理累积净值 p_outside_order_df = p_order_df[p_order_df["comefrom"] == "app"] for index, row in p_order_df.iterrows(): cur_fund_id = str(row["fund_id"]) confirm_share_date = pd.to_datetime(row["confirm_share_date"]) # 根据确认净值日查看是否含有累积净值的数据,如果没有按照前后差值推算当天累积净值 if pd.isnull(p_nav_df.loc[confirm_share_date, cur_fund_id]): last_nav_data = p_nav_df[p_nav_df.index > confirm_share_date][cur_fund_id].dropna().head(1) last_cnav_data = p_cnav_df[p_cnav_df.index > confirm_share_date][cur_fund_id].dropna().head(1) # 判断上个净值日和当前确认日之中是否存在分红日 """need add judge""" if len(last_nav_data) < 1: cnav_df.loc[confirm_share_date, cur_fund_id] = row["nav"] else: diff_nav = Decimal(row["nav"]) - Decimal(last_nav_data.values[0]) cur_cnav = last_cnav_data.values[0] + diff_nav cnav_df.loc[confirm_share_date, cur_fund_id] = cur_cnav else: confirm_date_nav_data = p_nav_df[p_nav_df.index == confirm_share_date][cur_fund_id].tail(1) confirm_date_cnav_data = p_cnav_df[p_cnav_df.index == confirm_share_date][cur_fund_id].tail(1) diff_nav = Decimal(row["nav"]) - Decimal(confirm_date_nav_data.values[0]) cur_cnav = Decimal(confirm_date_cnav_data.values[0]) + diff_nav cnav_df.loc[confirm_share_date, cur_fund_id] = cur_cnav cnav_df = cnav_df.dropna(axis=0, how="all").fillna(method='ffill') for index, row in p_outside_order_df.iterrows(): cur_fund_id = str(row["fund_id"]) confirm_share_date = pd.to_datetime(row["confirm_share_date"]) if cur_fund_id + "_amount" not in cnav_df: price = cnav_df[cur_fund_id].dropna() profit = price.diff().fillna(Decimal(0)) cnav_df[cur_fund_id + "_profit"] = profit cnav_df[cur_fund_id + "_profit"] = cnav_df[cur_fund_id + "_profit"].fillna(Decimal(0)) profit_ratio = profit / cnav_df[cur_fund_id].dropna().shift(1) cnav_df[cur_fund_id + "_profit_ratio"] = profit_ratio cnav_df[cur_fund_id + "_profit_ratio"] = cnav_df[cur_fund_id + "_profit_ratio"].fillna(Decimal(0)) cnav_df[cur_fund_id + "_amount"] = 0 cnav_df[cur_fund_id + "_earn"] = 0 cnav_df[cur_fund_id + "_share"] = 0 # buy if row['order_type'] == 1 or row['order_type'] == 4: # if row['order_type'] == 1: cnav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] += row["nav"] * row["confirm_share"] cnav_df.loc[confirm_share_date:, cur_fund_id + "_share"] += row["confirm_share"] # 处理单位净值 nav_df = nav_df.dropna(axis=0, how="all").fillna(method='ffill') p_inter_order_df = p_order_df[p_order_df["comefrom"] == "inter"] for index, row in p_inter_order_df.iterrows(): cur_fund_id = str(row["fund_id"]) confirm_share_date = pd.to_datetime(row["confirm_share_date"]) confirm_share_nav = row["nav"] nav_df.loc[confirm_share_date, cur_fund_id] = confirm_share_nav # 内部订单是用单位净值算收益 nav_df = nav_df.fillna(method='ffill') actual_share_dict = {} for index, row in p_inter_order_df.iterrows(): cur_fund_id = str(row["fund_id"]) confirm_share_date = pd.to_datetime(row["confirm_share_date"]) if cur_fund_id + "_amount" not in nav_df: nav_df[cur_fund_id + "_profit"] = Decimal(0) nav_df[cur_fund_id + "_profit_ratio"] = Decimal(0) nav_df[cur_fund_id + "_amount"] = 0 nav_df[cur_fund_id + "_earn"] = 0 nav_df[cur_fund_id + "_share"] = 0 nav_df[cur_fund_id + "_reduce_share"] = 0 # buy if row['order_type'] == 1: cur_fund_share = actual_share_dict.get(cur_fund_id, 0) cur_fund_share += row["original_confirm_share"] - (row["original_confirm_share"] * row["coefficient"] - row["divident_share"]) nav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] += row["reduce_share"] * row["nav"] # nav_df.loc[confirm_share_date:, cur_fund_id + "_share"] = cur_fund_share nav_df.loc[confirm_share_date:, cur_fund_id + "_reduce_share"] += row["reduce_share"] actual_share_dict[cur_fund_id] = cur_fund_share if row['order_type'] == 4: cur_fund_share = actual_share_dict.get(cur_fund_id, 0) # cur_fund_share += row["reduce_share"] # nav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] = cur_fund_share * row["nav"] nav_df.loc[confirm_share_date:, cur_fund_id + "_share"] = cur_fund_share + row["reduce_share"] nav_df.loc[confirm_share_date:, cur_fund_id + "_reduce_share"] = cur_fund_share + row["reduce_share"] # actual_share_dict[cur_fund_id] = cur_fund_share for p_fund_id_ in p_outside_order_df["fund_id"].unique(): cnav_df[p_fund_id_ + "_earn"] = (cnav_df[p_fund_id_ + "_profit"] * cnav_df[p_fund_id_ + "_share"]).apply( lambda x: float(x)).fillna(0) cnav_df[p_fund_id_ + "_cum_earn"] = cnav_df[p_fund_id_ + "_earn"].cumsum().fillna(0) cnav_df[p_fund_id_ + "_net_amount"] = cnav_df[p_fund_id_ + "_cum_earn"].apply(lambda x: Decimal(x)) + \ cnav_df[p_fund_id_ + "_amount"] # 内部订单 for p_fund_id_ in p_inter_order_df["fund_id"].unique(): nav_df[p_fund_id_ + "_net_amount"] = (nav_df[p_fund_id_] * nav_df[p_fund_id_ + "_reduce_share"]).apply( lambda x: float(x)).fillna(0) nav_df[p_fund_id_ + "_cum_earn"] = nav_df[p_fund_id_ + "_net_amount"] - nav_df[ p_fund_id_ + "_amount"].apply(lambda x: float(x)) nav_df[p_fund_id_ + "_earn"] = nav_df[p_fund_id_ + "_cum_earn"].diff().fillna(0) nav_df[p_fund_id_ + "_profit_ratio"] = ( nav_df[p_fund_id_ + "_earn"] / nav_df[p_fund_id_ + "_amount"].apply(lambda x: float(x))).apply( lambda x: Decimal(x)) nav_df[p_fund_id_ + "_net_amount"] = nav_df[p_fund_id_ + "_net_amount"].apply(lambda x: Decimal(x)) nav_df[p_fund_id_ + "_profit_ratio"] = nav_df[p_fund_id_ + "_profit_ratio"].apply( lambda x: Decimal(0) if math.isnan(x) else x) nav_df[p_fund_id_ + "_share"] = nav_df[p_fund_id_ + "_reduce_share"] finall_cnav_df = cnav_df.copy() all_nav_df = p_nav_df[p_nav_df.index >= start_date].copy().dropna(axis=0, how="all").fillna(method='ffill') for p_fund_id_ in p_order_df["fund_id"].unique(): finall_cnav_df[p_fund_id_ + "_nav"] = all_nav_df[p_fund_id_] if p_fund_id_ + "_profit_ratio" not in finall_cnav_df.columns: finall_cnav_df[p_fund_id_ + "_profit_ratio"] = Decimal(0) finall_cnav_df[p_fund_id_ + "_amount"] = 0 finall_cnav_df[p_fund_id_ + "_earn"] = 0 finall_cnav_df[p_fund_id_ + "_share"] = 0 finall_cnav_df[p_fund_id_ + "_net_amount"] = Decimal(0) # try: # finall_cnav_df[p_fund_id_] = cnav_df[p_fund_id_] # finall_cnav_df[p_fund_id_ + "_net_amount"] += cnav_df[p_fund_id_ + "_net_amount"] # finall_cnav_df[p_fund_id_ + "_profit_ratio"] += cnav_df[p_fund_id_ + "_profit_ratio"] # finall_cnav_df[p_fund_id_ + "_amount"] += cnav_df[p_fund_id_ + "_amount"] # finall_cnav_df[p_fund_id_ + "_earn"] += cnav_df[p_fund_id_ + "_earn"] # finall_cnav_df[p_fund_id_ + "_share"] += cnav_df[p_fund_id_ + "_share"] # except: # pass try: finall_cnav_df[p_fund_id_ + "_nav"] = nav_df[p_fund_id_] finall_cnav_df[p_fund_id_ + "_net_amount"] += nav_df[p_fund_id_ + "_net_amount"] finall_cnav_df[p_fund_id_ + "_profit_ratio"] += nav_df[p_fund_id_ + "_profit_ratio"] finall_cnav_df[p_fund_id_ + "_amount"] += nav_df[p_fund_id_ + "_amount"] finall_cnav_df[p_fund_id_ + "_earn"] += nav_df[p_fund_id_ + "_earn"] finall_cnav_df[p_fund_id_ + "_share"] += nav_df[p_fund_id_ + "_share"] except: pass self.group_data[p_folio] = {"result_cnav_data": finall_cnav_df, "order_df": p_order_df} return cnav_df # 所有的数据操作 def total_combine_data(self): p_order_df = self.user_customer_order_df[(self.user_customer_order_df["order_type"] == 1) | (self.user_customer_order_df["order_type"] == 4)] p_nav_df = self.fund_nav_total.copy() p_cnav_df = self.fund_cnav_total.copy() start_date = pd.to_datetime(p_order_df["confirm_share_date"].min()) nav_df = p_nav_df[p_nav_df.index >= start_date].copy() cnav_df = p_cnav_df[p_cnav_df.index >= start_date].copy() p_fund_id_list = list(p_order_df["fund_id"].unique()) self.all_fund_id_list = p_fund_id_list self.all_fund_type_dict = {values["fund_id"]: values["type"] for values in p_order_df[["fund_id", "type"]].drop_duplicates().to_dict(orient='records')} for p_fund_id in p_fund_id_list: order_min_date = p_order_df[p_order_df["fund_id"] == p_fund_id]["confirm_share_date"].min() if pd.to_datetime(order_min_date) > start_date: cnav_df.loc[:order_min_date - datetime.timedelta(days=1), p_fund_id] = np.nan # 处理累积净值 p_outside_order_df = p_order_df[p_order_df["comefrom"] == "app"] for index, row in p_order_df.iterrows(): cur_fund_id = str(row["fund_id"]) confirm_share_date = pd.to_datetime(row["confirm_share_date"]) # 根据确认净值日查看是否含有累积净值的数据,如果没有按照前后差值推算当天累积净值 if pd.isnull(p_nav_df.loc[confirm_share_date, cur_fund_id]): last_nav_data = p_nav_df[p_nav_df.index > confirm_share_date][cur_fund_id].dropna().head(1) last_cnav_data = p_cnav_df[p_cnav_df.index > confirm_share_date][cur_fund_id].dropna().head(1) # 判断上个净值日和当前确认日之中是否存在分红日 """need add judge""" if len(last_nav_data) < 1: cnav_df.loc[confirm_share_date, cur_fund_id] = row["nav"] else: diff_nav = Decimal(row["nav"]) - Decimal(last_nav_data.values[0]) cur_cnav = last_cnav_data.values[0] + diff_nav cnav_df.loc[confirm_share_date, cur_fund_id] = cur_cnav else: confirm_date_nav_data = p_nav_df[p_nav_df.index == confirm_share_date][cur_fund_id].tail(1) confirm_date_cnav_data = p_cnav_df[p_cnav_df.index == confirm_share_date][cur_fund_id].tail(1) diff_nav = Decimal(row["nav"]) - Decimal(confirm_date_nav_data.values[0]) cur_cnav = Decimal(confirm_date_cnav_data.values[0]) + diff_nav cnav_df.loc[confirm_share_date, cur_fund_id] = cur_cnav cnav_df = cnav_df.dropna(axis=0, how="all").fillna(method='ffill') for index, row in p_outside_order_df.iterrows(): cur_fund_id = str(row["fund_id"]) confirm_share_date = pd.to_datetime(row["confirm_share_date"]) if cur_fund_id + "_amount" not in cnav_df: price = cnav_df[cur_fund_id].dropna() profit = price.diff().fillna(Decimal(0)) cnav_df[cur_fund_id + "_profit"] = profit cnav_df[cur_fund_id + "_profit"] = cnav_df[cur_fund_id + "_profit"].fillna(Decimal(0)) profit_ratio = profit / cnav_df[cur_fund_id].dropna().shift(1) cnav_df[cur_fund_id + "_profit_ratio"] = profit_ratio cnav_df[cur_fund_id + "_profit_ratio"] = cnav_df[cur_fund_id + "_profit_ratio"].fillna(Decimal(0)) cnav_df[cur_fund_id + "_amount"] = 0 cnav_df[cur_fund_id + "_earn"] = 0 cnav_df[cur_fund_id + "_share"] = 0 # buy if row['order_type'] == 1 or row['order_type'] == 4: # if row['order_type'] == 1: cnav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] += row["nav"] * row["confirm_share"] cnav_df.loc[confirm_share_date:, cur_fund_id + "_share"] += row["confirm_share"] # 处理单位净值 nav_df = nav_df.dropna(axis=0, how="all").fillna(method='ffill') p_inter_order_df = p_order_df[p_order_df["comefrom"] == "inter"] for index, row in p_inter_order_df.iterrows(): cur_fund_id = str(row["fund_id"]) confirm_share_date = pd.to_datetime(row["confirm_share_date"]) confirm_share_nav = row["nav"] nav_df.loc[confirm_share_date, cur_fund_id] = confirm_share_nav # 内部订单是用单位净值算收益 nav_df = nav_df.fillna(method='ffill') actual_share_dict = {} for index, row in p_inter_order_df.iterrows(): cur_fund_id = str(row["fund_id"]) confirm_share_date = pd.to_datetime(row["confirm_share_date"]) if cur_fund_id + "_amount" not in nav_df: nav_df[cur_fund_id + "_profit"] = Decimal(0) nav_df[cur_fund_id + "_profit_ratio"] = Decimal(0) nav_df[cur_fund_id + "_amount"] = 0 nav_df[cur_fund_id + "_earn"] = 0 nav_df[cur_fund_id + "_share"] = 0 nav_df[cur_fund_id + "_reduce_share"] = 0 # buy if row['order_type'] == 1: cur_fund_share = actual_share_dict.get(cur_fund_id, 0) cur_fund_share += row["original_confirm_share"] - (row["original_confirm_share"] * row["coefficient"] - row["divident_share"]) nav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] += row["reduce_share"] * row["nav"] nav_df.loc[confirm_share_date:, cur_fund_id + "_share"] = cur_fund_share nav_df.loc[confirm_share_date:, cur_fund_id + "_reduce_share"] += row["reduce_share"] actual_share_dict[cur_fund_id] = cur_fund_share if row['order_type'] == 4: cur_fund_share = actual_share_dict.get(cur_fund_id, 0) # cur_fund_share += row["reduce_share"] # nav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] = cur_fund_share * row["nav"] nav_df.loc[confirm_share_date:, cur_fund_id + "_share"] = cur_fund_share + row["reduce_share"] nav_df.loc[confirm_share_date:, cur_fund_id + "_reduce_share"] = cur_fund_share + row["reduce_share"] # actual_share_dict[cur_fund_id] = cur_fund_share for p_fund_id_ in p_outside_order_df["fund_id"].unique(): cnav_df[p_fund_id_ + "_earn"] = (cnav_df[p_fund_id_ + "_profit"] * cnav_df[p_fund_id_ + "_share"]).apply(lambda x: float(x)).fillna(0) cnav_df[p_fund_id_ + "_cum_earn"] = cnav_df[p_fund_id_ + "_earn"].cumsum().fillna(0) cnav_df[p_fund_id_ + "_net_amount"] = cnav_df[p_fund_id_ + "_cum_earn"].apply(lambda x: Decimal(x)) + cnav_df[p_fund_id_ + "_amount"] for p_fund_id_ in p_inter_order_df["fund_id"].unique(): nav_df[p_fund_id_ + "_net_amount"] = (nav_df[p_fund_id_] * nav_df[p_fund_id_ + "_reduce_share"]).apply(lambda x: float(x)).fillna(0) nav_df[p_fund_id_ + "_cum_earn"] = nav_df[p_fund_id_ + "_net_amount"] - nav_df[p_fund_id_ + "_amount"].apply(lambda x: float(x)) nav_df[p_fund_id_ + "_earn"] = nav_df[p_fund_id_ + "_cum_earn"].diff().fillna(0) nav_df[p_fund_id_ + "_profit_ratio"] = (nav_df[p_fund_id_ + "_earn"] / nav_df[p_fund_id_ + "_amount"].apply(lambda x: float(x))).apply(lambda x: Decimal(x)) nav_df[p_fund_id_ + "_net_amount"] = nav_df[p_fund_id_ + "_net_amount"].apply(lambda x: Decimal(x)) nav_df[p_fund_id_ + "_profit_ratio"] = nav_df[p_fund_id_ + "_profit_ratio"].apply(lambda x: Decimal(0) if math.isnan(x) else x) finall_cnav_df = cnav_df.copy() all_nav_df = p_nav_df[p_nav_df.index >= start_date].copy().dropna(axis=0, how="all").fillna(method='ffill') for p_fund_id_ in p_order_df["fund_id"].unique(): finall_cnav_df[p_fund_id_ + "_nav"] = all_nav_df[p_fund_id_] if p_fund_id_ + "_profit_ratio" not in finall_cnav_df.columns: finall_cnav_df[p_fund_id_ + "_profit_ratio"] = Decimal(0) finall_cnav_df[p_fund_id_ + "_amount"] = 0 finall_cnav_df[p_fund_id_ + "_earn"] = 0 finall_cnav_df[p_fund_id_ + "_share"] = 0 finall_cnav_df[p_fund_id_ + "_net_amount"] = Decimal(0) # try: # finall_cnav_df[p_fund_id_] = cnav_df[p_fund_id_] # finall_cnav_df[p_fund_id_ + "_net_amount"] += cnav_df[p_fund_id_ + "_net_amount"] # finall_cnav_df[p_fund_id_ + "_profit_ratio"] += cnav_df[p_fund_id_ + "_profit_ratio"] # finall_cnav_df[p_fund_id_ + "_amount"] += cnav_df[p_fund_id_ + "_amount"] # finall_cnav_df[p_fund_id_ + "_earn"] += cnav_df[p_fund_id_ + "_earn"] # finall_cnav_df[p_fund_id_ + "_share"] += cnav_df[p_fund_id_ + "_share"] # except: # pass try: finall_cnav_df[p_fund_id_ + "_nav"] = nav_df[p_fund_id_] finall_cnav_df[p_fund_id_ + "_net_amount"] += nav_df[p_fund_id_ + "_net_amount"] finall_cnav_df[p_fund_id_ + "_profit_ratio"] += nav_df[p_fund_id_ + "_profit_ratio"] finall_cnav_df[p_fund_id_ + "_amount"] += nav_df[p_fund_id_ + "_amount"] finall_cnav_df[p_fund_id_ + "_earn"] += nav_df[p_fund_id_ + "_earn"] finall_cnav_df[p_fund_id_ + "_share"] += nav_df[p_fund_id_ + "_share"] except: pass return finall_cnav_df