from app.utils.fund_rank import * from app.utils.risk_parity import * from app.pypfopt import risk_models from app.pypfopt import expected_returns from app.pypfopt import EfficientFrontier def cal_correlation(prod): """计算组合内基金相关性 Args: prod: 组合净值表:索引为日期,列名为基金ID, 内容为净值 Returns:屏蔽基金与自身相关性的相关矩阵,因为基金与自身相关性为1,妨碍后续高相关性基金筛选的判断 """ prod_return = prod.iloc[:, :].apply(lambda x: simple_return(x)) correlation = prod_return.corr() return correlation.mask(np.eye(correlation.shape[0], dtype=np.bool)) def rename_col(df, fund_id): """将列名由adj_nav改为基金ID Args: df: 原始净值表:索引为日期,列名分别为 ”fund_id“, "adj_nav", 内容为[基金ID,净值] fund_id: 基金ID Returns:删除 ”fund_id” 列, 重命名 “adj_nav” 列为基金ID的净值表 """ df.rename(columns={'adj_nav': fund_id}, inplace=True) df.drop('fund_id', axis=1, inplace=True) return df def replace_fund(manager, substrategy, fund_rank): """查找不足半年数据的基金的替代基金 Args: manager: 基金经理ID substrategy: 基金二级策略 fund_rank: 基金打分排名表 Returns: 满足相同基金经理ID下的同种二级策略的基金ID的第一个结果 """ df = fund_rank[(fund_rank['manager'] == manager) & (fund_rank['substrategy'] == substrategy)] return df['fund_id'].values[0] def search_rank(fund_rank, fund, metric): """查找基金在基金排名表中的指标 Args: fund_rank: 基金排名表 fund: 输入基金ID metric: 查找的指标名称 Returns: 基金指标的值 """ return fund_rank[fund_rank['fund_id'] == fund][metric].values[0] def translate_single(content, evaluation): ''' content = [["优秀","良好","一般"], ["优秀","良好","合格","较差"], ["优秀","良好","合格","较差"], ["高","一般","较低"]] evaluation = [0,1,1,2] ''' return tuple([content[i][v] if type(v) == int else v for i, v in enumerate(evaluation)]) def choose_good_evaluation(evaluation): """抽取好的评价 Args: evaluation: 个基的评价 Returns: 个基好的评价 """ v1 = evaluation[1] v2 = evaluation[2] v3 = evaluation[3] v4 = evaluation[4] v5 = evaluation[5] if v1[0] > 1: del evaluation[1] if v2[0] > 1: del evaluation[2] if v3[0] > 1: del evaluation[3] if v4[0] != 0 or v4[1] != 0: del evaluation[4] if v5[0] < 3 or v5[2] > 1: # 基金经理的基金管理年限小于三年或平均业绩处于中下水平 del evaluation[5] return evaluation def choose_bad_evaluation(evaluation): v1 = evaluation[1] v2 = evaluation[2] v3 = evaluation[3] v4 = evaluation[4] if v1[0] < 2: del evaluation[1] if v2[0] < 2: del evaluation[2] if v3[0] < 2: del evaluation[3] if v4[0] != 1 or v4[1] != 1: del evaluation[4] return evaluation def get_fund_rank(): sql = "SELECT * FROM fund_rank" df = pd.read_sql(sql, con) # df = pd.read_csv('fund_rank.csv', encoding='gbk') return df def get_index_daily(index_id): """获取指数数据 Args: index_id: 指数ID Returns:与组合净值形式相同的表 """ sql = "SELECT ts_code, trade_date, close FROM index_daily WHERE ts_code='{}'".format(index_id) df = pd.read_sql(sql, con).dropna(how='any') df.rename({'ts_code': 'fund_id', 'trade_date': 'end_date', 'close': 'adj_nav'}, axis=1, inplace=True) df['end_date'] = pd.to_datetime(df['end_date']) df.set_index('end_date', drop=True, inplace=True) df.sort_index(inplace=True, ascending=True) df = rename_col(df, index_id) return df def get_tamp_fund(): """获取探普产品池净值表 Returns: """ sql = "SELECT id FROM tamp_fund_info WHERE id LIKE 'HF%'" df = pd.read_sql(sql, con) df.rename({'id': 'fund_id'}, axis=1, inplace=True) return df def get_risk_level(substrategy): """获取风险类型 Args: substrategy: 二级策略 Returns: """ substrategy2risk = {1: "H", 1010: "H", 1020: "H", 1030: "H", 2010: "H", 3010: "H", 3020: "L", 3030: "H", 3040: "L", 3050: "M", 4010: "M", 4020: "M", 4030: "M", 4040: "M", 5010: "M", 5020: "L", 5030: "M", 6010: "L", 6020: "M", 6030: "L", 7010: "H", 7020: "H", 8010: "H", 8020: "M"} return substrategy2risk[substrategy] def get_radar_data(fund): df = fund_rank[fund_rank['fund_id'] == fund] return_score = df['annual_return_rank'].values[0] * 100 downside_score = df['downside_risk_rank'].values[0] * 100 drawdown_score = df['max_drawdown_rank'].values[0] * 100 sharpe_score = df['sharp_ratio_rank'].values[0] * 100 total_score = df['z_score'].values[0] fund_name = get_fund_name(fund).values[0][0] # print(fund_name) return {'name': fund_name, 'data': [{'name': '绝对收益', 'data': '%.2f' % return_score}, {'name': '抗风险能力', 'data': '%.2f' % downside_score}, {'name': '极端风险', 'data': '%.2f' % drawdown_score}, {'name': '风险调整后收益', 'data': '%.2f' % sharpe_score}, {'name': '业绩持续性', 'data': '%.2f' % np.random.randint(70, 90)}, {'name': '综合评分', 'data': '%.2f' % total_score}]} def get_fund_name(fund): sql = "SELECT fund_short_name FROM fund_info WHERE id='{}'".format(fund) df = pd.read_sql(sql, con) return df # 获取排名信息 fund_rank = get_fund_rank() # 获取探普产品池 tamp_fund = get_fund_rank() class PortfolioDiagnose(object): def __init__(self, client_type, portfolio, invest_amount, expect_return=None, expect_drawdown=None, index_id='000905.SH', invest_type='private', start_date=None, end_date=None): """基金诊断 Args: client_type: 客户类型:1:保守型, 2:稳健型, 3:平衡型, 4:成长型, 5:进取型 portfolio: 投资组合:[基金1, 基金2, 基金3...] invest_amount: 投资金额:10000000元 invest_type: 投资类型:public, private, ... start_date: 诊断所需净值的开始日期 end_date: 诊断所需净值的结束日期 """ self.freq_list = [] self.client_type = client_type self.portfolio = portfolio self.expect_return = expect_return self.expect_drawdown = expect_drawdown self.index_id = index_id self.invest_amount = invest_amount self.invest_type = invest_type self.start_date = start_date self.end_date = end_date if self.end_date is None: self.end_date = datetime.datetime.now() - datetime.timedelta(days=1) self.start_date = cal_date(self.end_date, 'Y', 1) self.replace_pair = dict() # 由于数据不足半年而被替换为相同基金经理和策略的原基金和替换基金的映射 self.no_data_fund = [] # 未在数据库中找到基金净值或者基金经理记录的基金 self.abandon_fund_score = [] # 打分不满足要求的基金 self.abandon_fund_corr = [] # 相关性过高 self.proposal_fund = [] # 建议的基金 self.old_correlation = None self.new_correlation = None self.old_weights = None self.new_weights = None def get_portfolio(self, ): """获取组合净值表 Returns: """ # 获取原始投资组合的第一支基金的净值表 prod = get_nav(self.portfolio[0], self.start_date, invest_type=self.invest_type) fund_info = get_fund_info(self.end_date, invest_type=self.invest_type) while prod is None: # 获取的净值表为空时首先考虑基金净值数据不足半年,查找同一基金经理下的相同二级策略的基金ID作替换 result = fund_info[fund_info['fund_id'] == portfolio[0]] manager = str(result['manager'].values) strategy = result['substrategy'].values replaced_fund = replace_fund(manager, strategy, fund_rank) if replaced_fund is not None: # 替换基金数据非空则记录替换的基金对 prod = get_nav(replaced_fund, self.start_date, invest_type=self.invest_type) self.replace_pair[portfolio[0]] = replaced_fund else: # 替换基金数据为空则记录当前基金为找不到数据的基金, 继续尝试获取下一个基金ID的净值表 self.no_data_fund.append(portfolio[0]) self.portfolio.pop(0) prod = get_nav(self.portfolio[0], self.start_date, invest_type=self.invest_type) # 记录基金的公布频率 self.freq_list.append(get_frequency(prod)) prod = rename_col(prod, portfolio[0]) # 循环拼接基金净值表构建组合 for idx in range(len(portfolio) - 1): prod1 = get_nav(portfolio[idx + 1], self.start_date, invest_type=self.invest_type) if prod1 is None or prod1.index[-1] - prod1.index[0] < 0.6 * (self.end_date - self.start_date): result = fund_info[fund_info['fund_id'] == portfolio[idx + 1]] if result['fund_manager_id'].count() != 0: manager = str(result['fund_manager_id'].values) substrategy = result['substrategy'].values[0] replaced_fund = replace_fund(manager, substrategy, fund_rank) else: self.no_data_fund.append(portfolio[idx + 1]) continue if replaced_fund is not None: prod1 = get_nav(replaced_fund, self.start_date, invest_type=self.invest_type) self.replace_pair[portfolio[0]] = replaced_fund self.freq_list.append(get_frequency(prod1)) prod1 = rename_col(prod1, replaced_fund) else: self.no_data_fund.append(portfolio[idx + 1]) continue else: self.freq_list.append(get_frequency(prod1)) prod1 = rename_col(prod1, portfolio[idx + 1]) # 取prod表和prod1表的并集 prod = pd.merge(prod, prod1, on=['end_date'], how='outer') # 对所有合并后的基金净值表按最大周期进行重采样 prod.sort_index(inplace=True) prod.ffill(inplace=True) prod = resample(prod, get_trade_cal(), min(self.freq_list)) return prod def abandon(self, prod): """建议替换的基金 Args: prod: 原始组合净值表 Returns: 剔除建议替换基金的组合净值表 """ self.old_correlation = cal_correlation(prod) for fund in prod.columns: z_score = search_rank(fund_rank, fund, metric='z_score') # 建议替换得分为60或与其他基金相关度大于0.8的基金 if z_score < 60: self.abandon_fund_score.append(fund) prod = prod.drop(fund, axis=1) if np.any(self.old_correlation[fund] > 0.8): self.abandon_fund_corr.append(fund) prod = prod.drop(fund, axis=1) return prod def proposal(self, prod): """建议申购基金 Args: prod: 剔除建议替换基金的组合净值表 Returns: 增加建议申购基金的组合净值表 """ # 组合内已包含的策略 included_strategy = set() # 按每种基金最少投资100w确定组合包含的最大基金数量 max_len = self.invest_amount // 1e6 - len(prod.columns) # 排名表内包含的所有策略 all_strategy = set(fund_rank['substrategy'].to_list()) if prod is not None: included_strategy = set([search_rank(fund_rank, fund, metric='substrategy') for fund in prod.columns]) # 待添加策略为所有策略-组合已包含策略 add_strategy = all_strategy - included_strategy # 遍历产品池,推荐得分>80且与组合内其他基金相关度低于0.8的属于待添加策略的基金 for proposal in tamp_fund['fund_id']: if proposal in fund_rank['fund_id'].to_list(): proposal_z_score = search_rank(fund_rank, proposal, metric='z_score') proposal_strategy = fund_rank[fund_rank['fund_id'] == proposal]['substrategy'].values[0] else: continue if proposal_z_score > 80 and proposal_strategy in add_strategy: # if proposal_z_score > 80: proposal_nav = get_nav(proposal, self.start_date, invest_type=self.invest_type) # 忽略净值周期大于周更的产品 if get_frequency(proposal_nav) <= 52: continue self.freq_list.append(get_frequency(proposal_nav)) proposal_nav = rename_col(proposal_nav, proposal) # 按最大周期进行重采样,计算新建组合的相关性 prod = pd.merge(prod, proposal_nav, how='outer', on='end_date') prod.sort_index(inplace=True) prod.ffill(inplace=True) prod = resample(prod, get_trade_cal(), min(self.freq_list)) _correlation = cal_correlation(prod) _correlation = _correlation.fillna(0) if np.all(_correlation < 0.8): self.proposal_fund.append(proposal) max_len -= 1 add_strategy -= {proposal_strategy} if len(add_strategy) == 0 or max_len == 0: # if max_len == 0: break else: prod.drop(columns=proposal, inplace=True) return prod def optimize(self, ): origin_portfolio = self.get_portfolio() abandoned_portfolio = self.abandon(origin_portfolio) propose_portfolio = self.proposal(abandoned_portfolio) # propose_portfolio.to_csv('test_portfolio.csv', encoding='gbk') returns = propose_portfolio.pct_change().dropna() mu = expected_returns.mean_historical_return(propose_portfolio) S = risk_models.sample_cov(propose_portfolio) # if self.client_type == 1: # proposal_risk = [[x, get_risk_level(search_rank(fund_rank, x, metric='substrategy'))] for x in # self.proposal_fund] # self.proposal_fund = list(filter(lambda x: x[1] != 3, proposal_risk)) # proposal_portfolio = list((set(self.portfolio) - set(self.no_data_fund) - set(self.replace_pair.keys())) | \ # (set(self.proposal_fund) | set(self.replace_pair.values()))) propose_risk_mapper = dict() for fund in propose_portfolio.columns: propose_risk_mapper[fund] = str(get_risk_level(search_rank(fund_rank, fund, metric='substrategy'))) risk_upper = {"H": 0.0} risk_lower = {"L": 0.6, "M": 0.4} ef = EfficientFrontier(mu, S) ef.add_sector_constraints(propose_risk_mapper, risk_lower, risk_upper) # weights = ef.nonconvex_objective(deviation_risk_parity, ef.cov_matrix) ef.efficient_return(0.2) clean_weights = ef.clean_weights() # ef.portfolio_performance(verbose=True) self.new_weights = np.array(list(clean_weights.values())) # S = np.asmatrix(S) # w_origin = np.asarray([i for i in w_origin.values()]) # risk_target = np.asarray([1 / len(w_origin)] * len(w_origin)) # self.proposal_weights = calcu_w(w_origin, S, risk_target) # elif self.client_type == 2: # elif self.client_type == 3: # elif self.client_type == 4: # elif self.client_type == 5: # print(len(propose_portfolio.columns)) # # 单支基金占投资额的下界为 100W/投资总额 # # w_low = 1e6 / self.invest_amount # w_low = 0 # w_origin, S, mu = optim_drawdown(propose_portfolio, 0.5, [w_low, 1], min(self.freq_list)) # print(w_origin) # S = np.asmatrix(S) # w_origin = np.asarray([i for i in w_origin.values()]) # risk_target = np.asarray([1 / len(w_origin)] * len(w_origin)) # self.proposal_weights = calcu_w(w_origin, S, risk_target) def return_compare(self): index_data = get_index_daily(self.index_id) origin_portfolio = self.get_portfolio() abandoned_portfolio = self.abandon(origin_portfolio) propose_portfolio = self.proposal(abandoned_portfolio) index_data = pd.merge(index_data, propose_portfolio, how='inner', left_index=True, right_index=True) index_return = index_data.iloc[:, :] / index_data.iloc[0, :] - 1 # origin_fund_return = origin_portfolio.iloc[:, :] / origin_portfolio.iloc[0, :] - 1 propose_fund_return = propose_portfolio.iloc[:, :] / propose_portfolio.iloc[0, :] - 1 print(self.new_weights) propose_fund_return['return'] = propose_fund_return.T.iloc[:, :].apply(lambda x: np.dot(self.new_weights, x)) propose_fund_return.to_csv('new_port.csv', encoding='gbk') return index_return, propose_fund_return def old_evaluation(self): start_year = self.start_date.year start_month = self.start_date.month current_year = datetime.datetime.now().year current_month = datetime.datetime.now().month current_day = datetime.datetime.now().day past_month = (current_year - start_year) * 12 + current_month - start_month num_fund = len(self.portfolio) abandon_fund = [[x, self.invest_type] for x in self.abandon_fund] old_strategy = set([search_rank(fund_rank, x, metric='substrategy') for x in self.portfolio]) data = [start_year, start_month, past_month, self.invest_amount, current_year, current_month, current_day] return data def new_evaluation(self): hold_fund = set(self.portfolio) - set(self.abandon_fund) abandon_fund = self.abandon_fund proposal_fund = self.proposal_fund data = [hold_fund, abandon_fund, proposal_fund] return data def single_evaluation(self, fund_id): """ 1、该基金整体表现优秀/良好/一般,收益能力优秀/良好/合格/较差,回撤控制能力优秀/良好/合格/较差,风险收益比例较高/一般/较低; 2、在收益方面,该基金年化收益能力高于/持平/低于同类基金平均水平,有x%区间跑赢大盘/指数,绝对收益能力优秀/一般; 3、在风险方面,该基金抵御风险能力优秀/良好/一般,在同类基金中处于高/中/低等水平,最大回撤为x%,高于/持平/低于同类基金平均水平; 4、该基金收益较好/较差的同时回撤较大/较小,也就是说,该基金在用较大/较小风险换取较大/较小收益,存在较高/较低风险; 5、基金经理,投资年限5.23年,经验丰富;投资能力较强,生涯中共管理过X只基金,历任的X只基金平均业绩在同类中处于上游水平,其中x只排名在前x%;生涯年化回报率x%,同期大盘只有x%; 旧个基显示1-4,新个基显示1-5。 旧个基如果是要保留的,显示好的评价。 如果是要剔除的,显示坏的评价。 新个基只显示好的评价。 Args: fund_id: Returns: """ z_score = search_rank(fund_rank, fund_id, metric='z_score') total_level = np.select([z_score >= 80, 70 <= z_score < 80, z_score < 70], [0, 1, 2]).item() return_rank = search_rank(fund_rank, fund_id, metric='annual_return_rank') return_level = np.select([return_rank >= 0.8, 0.7 <= return_rank < 0.8, 0.6 <= return_rank < 0.7, return_rank < 0.6], [0, 1, 2, 3]).item() return_bool = 1 if return_level > 2 else 0 return_triple = return_level - 1 if return_level >= 2 else return_level drawdown_rank = search_rank(fund_rank, fund_id, metric='max_drawdown_rank') drawdown_value = search_rank(fund_rank, fund_id, metric='max_drawdown') drawdown_level = np.select([drawdown_rank >= 0.8, 0.7 <= drawdown_rank < 0.8, 0.6 <= drawdown_rank < 0.7, drawdown_rank < 0.6], [0, 1, 2, 3]).item() drawdown_bool = 1 if drawdown_level > 2 else 0 drawdown_triple = drawdown_level - 1 if drawdown_level >= 2 else drawdown_level sharp_rank = search_rank(fund_rank, fund_id, metric='sharp_ratio_rank') sharp_level = np.select([sharp_rank >= 0.8, 0.6 <= sharp_rank < 0.8, sharp_rank < 0.6], [0, 1, 2]).item() data = {1: [total_level, return_level, drawdown_level, sharp_level], 2: [return_triple, "TO DO", return_triple], 3: [drawdown_triple, drawdown_triple, drawdown_value, drawdown_triple], 4: [return_bool, drawdown_bool, drawdown_bool, return_bool, drawdown_bool]} if fund_id in self.abandon_fund: data['remove'] = True elif fund_id in self.proposal_fund: data[5] = [1] * 7 data['remove'] = False else: data['remove'] = False x = '30%' # 第一个评价 content = {1: [["优秀", "良好", "一般"], ["优秀", "良好", "合格", "较差"], ["优秀", "良好", "合格", "较差"], ["高", "一般", "较低"]], # 第二个评价 2: [["高于", "持平", "低于"], x, ["优秀", "一般"]], # 第三个评价 3: [["优秀", "良好", "一般"], ["高", "中", "低"], x, ["高于", "持平", "低于"]], # 第四个评价 4: [["较好", "较差"], ["较小", "较大"], ["较小", "较小"], ["较大", "较小"], ["较低", "较高"]], 5: [["TO DO"] * 7]} sentence = { 1: "1、该基金整体表现%s,收益能力%s,回撤控制能力%s,风险收益比例%s;\n", 2: "2、在收益方面,该基金年化收益能力%s同类基金平均水平,有%s区间跑赢指数,绝对收益能力%s;\n", 3: "3、在风险方面,该基金抵御风险能力%s,在同类基金中处于%s等水平,最大回撤为%s,%s同类基金平均水平;\n", 4: "4、该基金收益%s的同时回撤%s,也就是说,该基金在用%s风险换取%s收益,存在%s风险;\n", 5: "5、基金经理,投资年限%s年,经验丰富;投资能力较强,生涯中共管理过%s只基金,历任的%s只基金平均业绩在同类中处于上游水平,其中%s只排名在前%s;生涯年化回报率%s,同期大盘只有%s;"} remove = data["remove"] del data["remove"] # 不剔除,选择好的话术 if not remove: evaluation = choose_good_evaluation(data) # 剔除,选择坏的话术 else: evaluation = choose_bad_evaluation(data) print(evaluation) ret = "" for k, v in evaluation.items(): # print(translate_single(content[k], v)) ret = ret + sentence[k] % translate_single(content[k], v) return {fund_id: ret} def old_portfolio_evaluation(self, ): result = [] for fund in self.portfolio: result.append(self.single_evaluation(fund)) return result def propose_fund_evaluation(self, ): result = [] for fund in self.proposal_fund: result.append(self.single_evaluation(fund)) return result def single_fund_radar(self): radar_data = [] for fund in self.portfolio: try: radar_data.append(get_radar_data(fund)) except IndexError: continue return radar_data def propose_fund_radar(self): radar_data = [] for fund in self.proposal_fund: radar_data.append(get_radar_data(fund)) return radar_data portfolio = ['HF00002JJ2', 'HF00005DBQ', 'HF0000681Q', 'HF00006693', 'HF00006AZF', 'HF00006BGS'] portfolio_diagnose = PortfolioDiagnose(client_type=1, portfolio=portfolio, invest_amount=10000000) portfolio_diagnose.optimize() if __name__ == '__main__': print(portfolio_diagnose.old_correlation) # print(portfolio_diagnose.propose_fund_evaluation())