Commit 416fd0a4 authored by 李宗熹's avatar 李宗熹

客观参数

parent 15325da2
# -*- coding: UTF-8 -*-
"""
@author: Zongxi.Li
@file:portfolio_diagnose.py
@time:2020/12/07
"""
import warnings
warnings.filterwarnings("ignore")
from app.utils.fund_rank import *
from app.utils.risk_parity import *
from app.pypfopt import risk_models
from app.pypfopt import expected_returns
from app.pypfopt import EfficientFrontier
from app.api.engine import tamp_product_engine, tamp_fund_engine, TAMP_SQL
def cal_correlation(prod):
"""计算组合内基金相关性
Args:
prod: 组合净值表:索引为日期,列名为基金ID, 内容为净值
Returns:屏蔽基金与自身相关性的相关矩阵,因为基金与自身相关性为1,妨碍后续高相关性基金筛选的判断
"""
prod_return = prod.iloc[:, :].apply(lambda x: simple_return(x).astype(float))
correlation = prod_return.corr()
correlation = correlation.round(2)
return correlation.mask(np.eye(correlation.shape[0], dtype=np.bool_))
def rename_col(df, fund_id):
"""将列名由adj_nav改为基金ID
Args:
df: 原始净值表:索引为日期,列名分别为 ”fund_id“, "adj_nav", 内容为[基金ID,净值]
fund_id: 基金ID
Returns:删除 ”fund_id” 列, 重命名 “adj_nav” 列为基金ID的净值表
"""
df.rename(columns={'adj_nav': fund_id}, inplace=True)
df.drop('fund_id', axis=1, inplace=True)
return df
def replace_fund(manager, substrategy, fund_rank):
"""查找不足半年数据的基金的替代基金
Args:
manager: 基金经理ID
substrategy: 基金二级策略
fund_rank: 基金打分排名表
Returns: 满足相同基金经理ID下的同种二级策略的基金ID的第一个结果
"""
df = fund_rank[(fund_rank['manager'] == manager) &
(fund_rank['substrategy'] == substrategy)]
return df['fund_id'].values[0]
def search_rank(fund_rank, fund, metric):
"""查找基金在基金排名表中的指标
Args:
fund_rank: 基金排名表
fund: 输入基金ID
metric: 查找的指标名称
Returns: 基金指标的值
"""
return fund_rank[fund_rank['fund_id'] == fund][metric].values[0]
def translate_single(content, content_id, evaluation):
'''
content = [["优秀","良好","一般"],
["优秀","良好","合格","较差"],
["优秀","良好","合格","较差"],
["高","一般","较低"]]
evaluation = [0,1,1,2]
'''
ret = []
for i, v in enumerate(evaluation):
if isinstance(v, str):
ret.append(v)
continue
elif content[content_id][i][v] in ["优秀", "良好", "高", "高于", "较好"]:
ret.append("""<span class="self_description_red">{}</span>""".format(content[content_id][i][v]))
continue
elif content_id == 4 and v == 0:
ret.append("""<span class="self_description_red">{}</span>""".format(content[content_id][i][v]))
continue
else:
ret.append("""<span class="self_description_green">{}</span>""".format(content[content_id][i][v]))
return tuple(ret)
def choose_good_evaluation(evaluation):
"""抽取好的评价
Args:
evaluation: 个基的评价
Returns: 个基好的评价
"""
v1 = evaluation[1]
v2 = evaluation[2]
v3 = evaluation[3]
v4 = evaluation[4]
v5 = evaluation.get(5)
if v1[0] > 1:
del evaluation[1]
if v2[0] > 1 and float(v2[1].strip('%')) <= 60:
del evaluation[2]
if v3[0] > 1:
del evaluation[3]
if v4[0] != 0 or v4[1] != 0:
del evaluation[4]
# if v5[0] < 3 or v5[2] > 1: # 基金经理的基金管理年限小于三年或平均业绩处于中下水平
if v5:
del evaluation[5]
return evaluation
def choose_bad_evaluation(evaluation):
v1 = evaluation[1]
v2 = evaluation[2]
v3 = evaluation[3]
v4 = evaluation[4]
if v1[0] < 2:
del evaluation[1]
if v2[0] < 2:
del evaluation[2]
if v3[0] < 2:
del evaluation[3]
if v4[0] != 1 or v4[1] != 1:
del evaluation[4]
return evaluation
def get_fund_rank():
"""获取基金指标排名
:return: 基金指标排名表
"""
with TAMP_SQL(tamp_fund_engine) as tamp_fund:
tamp_fund_session = tamp_fund.session
sql = "SELECT * FROM new_fund_rank"
# df = pd.read_sql(sql, con)
# df = pd.read_csv('fund_rank.csv', encoding='gbk')
cur = tamp_fund_session.execute(sql)
data = cur.fetchall()
df = pd.DataFrame(list(data), columns=['index', 'fund_id', 'range_return', 'annual_return', 'max_drawdown',
'sharp_ratio', 'volatility', 'sortino_ratio', 'downside_risk',
'substrategy', 'manager', 'annual_return_rank', 'downside_risk_rank',
'max_drawdown_rank', 'sharp_ratio_rank', 'z_score'])
df.drop('index', axis=1, inplace=True)
return df
def get_index_daily(index_id, start_date):
"""获取指数日更数据
Args:
index_id: 指数ID
start_date: 数据开始时间
Returns:与组合净值形式相同的表
"""
with TAMP_SQL(tamp_fund_engine) as tamp_product:
tamp_product_session = tamp_product.session
sql = "SELECT ts_code, trade_date, close FROM index_daily " \
"WHERE ts_code='{}' AND trade_date>'{}'".format(index_id, start_date)
# df = pd.read_sql(sql, con).dropna(how='any')
cur = tamp_product_session.execute(sql)
data = cur.fetchall()
df = pd.DataFrame(list(data), columns=['ts_code', 'trade_date', ' close'])
df.rename({'ts_code': 'fund_id', 'trade_date': 'end_date', 'close': 'adj_nav'}, axis=1, inplace=True)
df['end_date'] = pd.to_datetime(df['end_date'])
df.set_index('end_date', drop=True, inplace=True)
df.sort_index(inplace=True, ascending=True)
df = rename_col(df, index_id)
return df
def get_index_monthly(index_id, start_date):
"""获取指数月度数据
Args:
index_id: 指数ID
start_date: 数据开始时间
Returns:与组合净值形式相同的表
"""
with TAMP_SQL(tamp_fund_engine) as tamp_fund:
tamp_fund_session = tamp_fund.session
sql = "SELECT ts_code, trade_date, pct_chg FROM index_monthly " \
"WHERE ts_code='{}' AND trade_date>'{}'".format(index_id, start_date)
# df = pd.read_sql(sql, con).dropna(how='any')
cur = tamp_fund_session.execute(sql)
data = cur.fetchall()
df = pd.DataFrame(list(data), columns=['fund_id', 'end_date', 'pct_chg'])
df['end_date'] = pd.to_datetime(df['end_date'])
df.set_index('end_date', drop=True, inplace=True)
df.sort_index(inplace=True, ascending=True)
df = rename_col(df, index_id)
return df
def get_tamp_fund():
"""获取探普产品池净值表
Returns:
"""
with TAMP_SQL(tamp_fund_engine) as tamp_fund:
tamp_fund_session = tamp_fund.session
sql = "SELECT id FROM tamp_fund_info WHERE id LIKE 'HF%'"
cur = tamp_fund_session.execute(sql)
data = cur.fetchall()
# df = pd.read_sql(sql, con)
df = pd.DataFrame(list(data), columns=['fund_id'])
# df.rename({'id': 'fund_id'}, axis=1, inplace=True)
return df
def get_tamp_nav(fund, start_date, rollback=False, invest_type='public'):
"""获取基金ID为fund, 起始日期为start_date, 终止日期为当前日期的基金净值表
Args:
fund[str]:基金ID
start_date[date]:起始日期
rollback[bool]:当起始日期不在净值公布日历中,是否往前取最近的净值公布日
public[bool]:是否为公募
Returns:df[DataFrame]: 索引为净值公布日, 列为复权净值的净值表; 查询失败则返回None
"""
with TAMP_SQL(tamp_product_engine) as tamp_product:
tamp_product_session = tamp_product.session
if invest_type == "private":
sql = "SELECT fund_id, price_date, cumulative_nav FROM fund_nav " \
"WHERE fund_id='{}'".format(fund)
# df = pd.read_sql(sql, con).dropna(how='any')
cur = tamp_product_session.execute(sql)
data = cur.fetchall()
df = pd.DataFrame(data, columns=['fund_id', 'price_date', 'cumulative_nav']).dropna(how='any')
df.rename({'price_date': 'end_date', 'cumulative_nav': 'adj_nav'}, axis=1, inplace=True)
# if df2['adj_nav'].count() == 0:
# logging.log(logging.ERROR, "CAN NOT FIND {}".format(fund))
# return None
df['end_date'] = pd.to_datetime(df['end_date'])
if rollback and df['end_date'].min() < start_date < df['end_date'].max():
while start_date not in list(df['end_date']):
start_date -= datetime.timedelta(days=1)
df = df[df['end_date'] >= start_date]
df.drop_duplicates(subset='end_date', inplace=True, keep='first')
df.set_index('end_date', inplace=True)
df.sort_index(inplace=True, ascending=True)
return df
def get_risk_level(substrategy):
"""获取风险类型
Args:
substrategy: 二级策略
Returns:
"""
substrategy2risk = {1: "H",
1010: "H", 1020: "H", 1030: "H",
2010: "H",
3010: "H", 3020: "L", 3030: "H", 3040: "L", 3050: "M",
4010: "M", 4020: "M", 4030: "M", 4040: "M",
5010: "M", 5020: "L", 5030: "M",
6010: "L", 6020: "M", 6030: "L",
7010: "H", 7020: "H",
8010: "H", 8020: "M"}
return substrategy2risk[substrategy]
def get_radar_data(fund):
df = fund_rank[fund_rank['fund_id'] == fund]
return_score = df['annual_return_rank'].values[0] * 100
downside_score = df['downside_risk_rank'].values[0] * 100
drawdown_score = df['max_drawdown_rank'].values[0] * 100
sharpe_score = df['sharp_ratio_rank'].values[0] * 100
total_score = df['z_score'].values[0]
fund_name = get_fund_name(fund).values[0][0]
return {'name': fund_name, 'data': [{'name': '绝对收益', 'data': '%.2f' % return_score},
{'name': '抗风险能力', 'data': '%.2f' % downside_score},
{'name': '极端风险', 'data': '%.2f' % drawdown_score},
{'name': '风险调整后收益', 'data': '%.2f' % sharpe_score},
{'name': '业绩持续性', 'data': '%.2f' % np.random.randint(70, 90)},
{'name': '综合评分', 'data': '%.2f' % total_score}]}
def get_fund_name(fund):
with TAMP_SQL(tamp_fund_engine) as tamp_fund:
tamp_fund_session = tamp_fund.session
sql = "SELECT fund_short_name FROM fund_info WHERE id='{}'".format(fund)
# df = pd.read_sql(sql, con)
cur = tamp_fund_session.execute(sql)
data = cur.fetchall()
df = pd.DataFrame(list(data), columns=['fund_short_name'])
return df
# 获取排名信息
fund_rank = get_fund_rank()
# 获取探普产品池
tamp_fund = get_tamp_fund()
class PortfolioDiagnose(object):
def __init__(self, client_type, portfolio, invest_amount, expect_return=0.15,
expect_drawdown=0.15, index_id='000905.SH', invest_type='private', start_date=None, end_date=None):
"""基金诊断
Args:
client_type: 客户类型:1:保守型, 2:稳健型, 3:平衡型, 4:成长型, 5:进取型
portfolio: 投资组合:[基金1, 基金2, 基金3...]
invest_amount: 投资金额:10000000元
expect_return: 期望收益
expect_drawdown: 期望回撤
index_id: 指数ID
invest_type: 投资类型:public, private, ...
start_date: 诊断所需净值的开始日期
end_date: 诊断所需净值的结束日期
"""
self.freq_list = []
self.client_type = client_type
self.portfolio = portfolio
self.expect_return = expect_return
self.expect_drawdown = expect_drawdown
self.index_id = index_id
self.invest_amount = invest_amount
self.invest_type = invest_type
self.start_date = start_date
self.end_date = end_date
if self.end_date is None:
self.end_date = datetime.datetime(datetime.date.today().year,
datetime.date.today().month, 1) - datetime.timedelta(1)
self.start_date = cal_date(self.end_date, 'Y', 1)
self.replace_pair = dict() # 由于数据不足半年而被替换为相同基金经理和策略的原基金和替换基金的映射
self.no_data_fund = [] # 未在数据库中找到基金净值或者基金经理记录的基金
self.abandon_fund_score = [] # 打分不满足要求的基金
self.abandon_fund_corr = [] # 相关性过高
self.proposal_fund = [] # 建议的基金
self.old_correlation = None
self.new_correlation = None
self.old_weights = None
self.new_weights = None
self.origin_portfolio = None
self.abandoned_portfolio = None
self.propose_portfolio = None
def get_portfolio(self, ):
"""获取组合净值表
Returns:
"""
# 获取原始投资组合的第一支基金的净值表
prod = get_tamp_nav(self.portfolio[0], self.start_date, invest_type=self.invest_type)
fund_info = get_fund_info(self.end_date, invest_type=self.invest_type)
# while prod is None or prod.index[-1] - prod.index[0] < 0.6 * (self.end_date - self.start_date):
while prod is None:
# 获取的净值表为空时首先考虑基金净值数据不足半年,查找同一基金经理下的相同二级策略的基金ID作替换
result = fund_info[fund_info['fund_id'] == self.portfolio[0]]
manager = str(result['manager'].values)
strategy = result['substrategy'].values
replaced_fund = replace_fund(manager, strategy, fund_rank)
if replaced_fund:
# 替换基金数据非空则记录替换的基金对
prod = get_nav(replaced_fund, self.start_date, invest_type=self.invest_type)
self.replace_pair[self.portfolio[0]] = replaced_fund
else:
# 替换基金数据为空则记录当前基金为找不到数据的基金, 继续尝试获取下一个基金ID的净值表
self.no_data_fund.append(self.portfolio[0])
self.portfolio.pop(0)
prod = get_tamp_nav(self.portfolio[0], self.start_date, invest_type=self.invest_type)
# 记录基金的公布频率
self.freq_list.append(get_frequency(prod))
prod = rename_col(prod, self.portfolio[0])
# 循环拼接基金净值表构建组合
for idx in range(len(self.portfolio) - 1):
prod1 = get_tamp_nav(self.portfolio[idx + 1], self.start_date, invest_type=self.invest_type)
# if prod1 is None or prod1.index[-1] - prod1.index[0] < 0.6 * (self.end_date - self.start_date):
if prod1 is None:
result = fund_info[fund_info['fund_id'] == self.portfolio[idx + 1]]
if result['fund_manager_id'].count() != 0:
manager = str(result['fund_manager_id'].values)
substrategy = result['substrategy'].values[0]
replaced_fund = replace_fund(manager, substrategy, fund_rank)
else:
self.no_data_fund.append(self.portfolio[idx + 1])
continue
if replaced_fund:
prod1 = get_nav(replaced_fund, self.start_date, invest_type=self.invest_type)
self.replace_pair[self.portfolio[idx + 1]] = replaced_fund
self.freq_list.append(get_frequency(prod1))
prod1 = rename_col(prod1, replaced_fund)
else:
self.no_data_fund.append(self.portfolio[idx + 1])
continue
else:
self.freq_list.append(get_frequency(prod1))
prod1 = rename_col(prod1, self.portfolio[idx + 1])
# 取prod表和prod1表的并集
prod = pd.merge(prod, prod1, on=['end_date'], how='outer')
# 对所有合并后的基金净值表按最大周期进行重采样
prod.sort_index(inplace=True)
prod.ffill(inplace=True)
prod = resample(prod, get_trade_cal(), min(self.freq_list))
prod.dropna(how='any', inplace=True)
return prod
def abandon(self, prod):
"""建议替换的基金
Args:
prod: 原始组合净值表
Returns: 剔除建议替换基金的组合净值表
"""
self.old_correlation = cal_correlation(prod)
for fund in prod.columns:
z_score = search_rank(fund_rank, fund, metric='z_score')
# 建议替换得分为60或与其他基金相关度大于0.8的基金
if z_score < 60:
self.abandon_fund_score.append(fund)
continue
elif np.any(self.old_correlation[fund] > 0.8):
self.abandon_fund_corr.append(fund)
prod = prod.drop(self.abandon_fund_score + self.abandon_fund_corr, axis=1)
self.old_correlation = self.old_correlation.fillna(1).round(2)
self.old_correlation.columns = self.old_correlation.columns.map(lambda x: get_fund_name(x).values[0][0])
self.old_correlation.index = self.old_correlation.index.map(lambda x: get_fund_name(x).values[0][0])
return prod
def proposal(self, prod):
"""建议申购基金
Args:
prod: 剔除建议替换基金的组合净值表
Returns: 增加建议申购基金的组合净值表
"""
# 组合内已包含的策略
# included_strategy = set()
# 按每种基金最少投资100w确定组合包含的最大基金数量
max_len = len(self.portfolio) - len(prod.columns)
# 排名表内包含的所有策略
# all_strategy = set(fund_rank['substrategy'].to_list())
all_risk = {"H", "M", "L"}
included_risk = {}
if prod is not None:
# included_strategy = set([search_rank(fund_rank, fund, metric='substrategy') for fund in prod.columns])
included_risk = set([get_risk_level(search_rank(fund_rank, fund, metric='substrategy'))
for fund in prod.columns])
# 待添加策略为所有策略-组合已包含策略
# add_strategy = all_strategy - included_strategy
add_risk = all_risk - included_risk
# 遍历产品池,推荐得分>80且与组合内其他基金相关度低于0.8的属于待添加策略的基金
for proposal in tamp_fund['fund_id']:
if proposal in fund_rank['fund_id'].to_list() and proposal not in prod.columns:
proposal_z_score = search_rank(fund_rank, proposal, metric='z_score')
proposal_strategy = fund_rank[fund_rank['fund_id'] == proposal]['substrategy'].values[0]
else:
continue
if proposal_z_score > 60 and (get_risk_level(proposal_strategy) in add_risk or not add_risk):
# if proposal_z_score > 80:
proposal_nav = get_tamp_nav(proposal, self.start_date, invest_type=self.invest_type)
# 忽略净值周期大于周更的产品
# if get_frequency(proposal_nav) <= 52:
# continue
self.freq_list.append(get_frequency(proposal_nav))
proposal_nav = rename_col(proposal_nav, proposal)
# 按最大周期进行重采样,计算新建组合的相关性
prod = pd.merge(prod, proposal_nav, how='outer', on='end_date').astype(float)
prod.sort_index(inplace=True)
prod.ffill(inplace=True)
prod = resample(prod, get_trade_cal(), min(self.freq_list))
self.new_correlation = cal_correlation(prod)
judge_correlation = self.new_correlation.fillna(0)
if np.all(judge_correlation < 0.8):
self.proposal_fund.append(proposal)
max_len -= 1
# add_strategy -= {proposal_strategy}
add_risk -= {get_risk_level(proposal_strategy)}
# if len(add_strategy) == 0 or max_len == 0:
if max_len == 0:
break
else:
prod.drop(columns=proposal, inplace=True)
prod.bfill(inplace=True)
self.new_correlation = self.new_correlation.fillna(1).round(2)
self.new_correlation.columns = self.new_correlation.columns.map(lambda x: get_fund_name(x).values[0][0])
self.new_correlation.index = self.new_correlation.index.map(lambda x: get_fund_name(x).values[0][0])
return prod
def optimize(self, ):
import time
start = time.time()
self.origin_portfolio = self.get_portfolio()
end1 = time.time()
print("原始组合数据获取时间:", end1 - start)
self.abandoned_portfolio = self.abandon(self.origin_portfolio)
end2 = time.time()
print("计算换仓基金时间:", end2 - end1)
self.propose_portfolio = self.proposal(self.abandoned_portfolio)
end3 = time.time()
print("遍历产品池获取候选推荐时间:", end3 - end2)
# propose_portfolio.to_csv('test_portfolio.csv', encoding='gbk')
mu = expected_returns.mean_historical_return(self.propose_portfolio, frequency=min(self.freq_list))
S = risk_models.sample_cov(self.propose_portfolio, frequency=min(self.freq_list))
dd = expected_returns.drawdown_from_prices(self.propose_portfolio)
# if self.client_type == 1:
# proposal_risk = [[x, get_risk_level(search_rank(fund_rank, x, metric='substrategy'))] for x in
# propose_portfolio.columns]
# self.proposal_fund = list(filter(lambda x: x[1] != 'H', proposal_risk))
# drop_fund_list = list(filter(lambda x: x[1] = 'H', proposal_risk))
# proposal_portfolio = list((set(self.portfolio) - set(self.no_data_fund) - set(self.replace_pair.keys())) | \
# (set(self.proposal_fund) | set(self.replace_pair.values())))
# propose_portfolio.drop()
propose_risk_mapper = dict()
for fund in self.propose_portfolio.columns:
propose_risk_mapper[fund] = str(get_risk_level(search_rank(fund_rank, fund, metric='substrategy')))
if self.client_type == 1:
risk_upper = {"L": 0.6, "M": 0.4, "H": 0.0}
risk_lower = {"L": 0.6, "M": 0.4, "H": 0.0}
elif self.client_type == 2:
risk_upper = {"L": 0.5, "M": 0.3, "H": 0.2}
risk_lower = {"L": 0.5, "M": 0.3, "H": 0.2}
elif self.client_type == 3:
risk_upper = {"L": 0.3, "M": 0.5, "H": 0.2}
risk_lower = {"L": 0.3, "M": 0.5, "H": 0.2}
elif self.client_type == 4:
risk_upper = {"L": 0.3, "M": 0.4, "H": 0.3}
risk_lower = {"L": 0.3, "M": 0.4, "H": 0.3}
elif self.client_type == 5:
risk_upper = {"L": 0.0, "M": 0.5, "H": 0.5}
risk_lower = {"L": 0.0, "M": 0.5, "H": 0.5}
else:
risk_upper = {"H": 1.0}
risk_lower = {"L": 0.0}
raise ValueError
w_low = 1000000 / self.invest_amount
try:
ef = EfficientFrontier(mu, S, weight_bounds=[w_low, 1], expected_drawdown=dd)
# ef = EfficientFrontier(mu, S, weight_bounds=[0, 1], expected_drawdown=dd)
# ef.add_sector_constraints(propose_risk_mapper, risk_lower, risk_upper)
ef.efficient_return(target_return=self.expect_return, target_drawdown=self.expect_drawdown)
clean_weights = ef.clean_weights()
ef.portfolio_performance(verbose=True)
self.new_weights = np.array(list(clean_weights.values()))
except Exception as e:
print(e)
self.new_weights = np.asarray([1/len(self.propose_portfolio.columns)] * len(self.propose_portfolio.columns))
print(self.new_weights)
end4 = time.time()
print("模型计算一次时间:", end4 - end3)
# S = np.asmatrix(S)
# w_origin = np.asarray([i for i in w_origin.values()])
# risk_target = np.asarray([1 / len(w_origin)] * len(w_origin))
# self.proposal_weights = calcu_w(w_origin, S, risk_target)
# elif self.client_type == 2:
# elif self.client_type == 3:
# elif self.client_type == 4:
# elif self.client_type == 5:
# print(len(propose_portfolio.columns))
# # 单支基金占投资额的下界为 100W/投资总额
# # w_low = 1e6 / self.invest_amount
# w_low = 0
# w_origin, S, mu = optim_drawdown(propose_portfolio, 0.5, [w_low, 1], min(self.freq_list))
# print(w_origin)
# S = np.asmatrix(S)
# w_origin = np.asarray([i for i in w_origin.values()])
# risk_target = np.asarray([1 / len(w_origin)] * len(w_origin))
# self.proposal_weights = calcu_w(w_origin, S, risk_target)
def return_compare(self):
index_data = get_index_daily(self.index_id, self.start_date)
index_data = pd.merge(index_data, self.propose_portfolio, how='inner', left_index=True, right_index=True)
index_return = index_data.iloc[:, :] / index_data.iloc[0, :] - 1
# origin_fund_return = origin_portfolio.iloc[:, :] / origin_portfolio.iloc[0, :] - 1
propose_fund_return = self.propose_portfolio.iloc[:, :] / self.propose_portfolio.iloc[0, :] - 1
propose_fund_return['return'] = propose_fund_return.T.iloc[:, :].apply(lambda x: np.dot(self.new_weights, x))
return index_return, propose_fund_return
def old_evaluation(self, group_name, group_result, data_adaptor):
start_year = data_adaptor.start_date.year
start_month = data_adaptor.start_date.month
current_year = data_adaptor.end_date.year
current_month = data_adaptor.end_date.month
current_day = data_adaptor.end_date.day
past_month = (current_year - start_year) * 12 + current_month - start_month
# 投入成本(万元)
input_cost = round(group_result[group_name]["total_cost"] / 10000, 2)
# 整体盈利(万元)
total_profit = round(group_result[group_name]["cumulative_profit"] / 10000, 2)
# 整体表现 回撤能力
fund_rank_data = fund_rank[fund_rank["fund_id"].isin(self.portfolio)]
z_score = fund_rank_data["z_score"].mean()
drawdown_rank = fund_rank_data["max_drawdown_rank"].mean()
return_rank_df = fund_rank_data["annual_return_rank"]
z_score_level = np.select([z_score >= 80,
70 <= z_score < 80,
z_score < 70], [0, 1, 2]).item()
drawdown_level = np.select([drawdown_rank >= 0.8,
0.7 <= drawdown_rank < 0.8,
0.6 <= drawdown_rank < 0.7,
drawdown_rank < 0.6], [0, 1, 2, 3]).item()
# 收益稳健
fund_rank_re = fund_rank_data[fund_rank_data["annual_return_rank"] > 0.8]
return_rank_evaluate = ""
if len(fund_rank_re) > 0:
num = len(fund_rank_re)
fund_id_rank_list = list(fund_rank_re["fund_id"])
for f_id in fund_id_rank_list:
name = data_adaptor.user_customer_order_df[data_adaptor.user_customer_order_df["fund_id"] == f_id][
"fund_name"].values[0]
return_rank_evaluate = return_rank_evaluate + name + "、"
return_rank_evaluate = return_rank_evaluate[:-1] + "等" + str(num) + "只产品稳健,对组合的收益率贡献明显,"
# 正收益基金数量
group_hold_data = pd.DataFrame(group_result[group_name]["group_hoding_info"])
profit_positive_num = group_hold_data[group_hold_data["profit"] > 0]["profit"].count()
if profit_positive_num > 0:
profit_positive_evaluate = str(profit_positive_num) + "只基金取的正收益,"
else:
profit_positive_evaluate = ""
# 综合得分较低数量
abandon_num = len(self.abandon_fund_score)
abandon_evaluate = str(abandon_num) + "只基金综合得分较低建议更换,"
# 成立时间短
if len(self.no_data_fund) > 0:
no_data_fund_evaluate = str(len(self.no_data_fund)) + "只基金因为成立时间较短,暂不做评价;"
else:
no_data_fund_evaluate = ";"
group_order_df = data_adaptor.user_customer_order_df[
data_adaptor.user_customer_order_df["folio_name"] == group_name]
strategy_list = group_order_df["substrategy"]
uniqe_strategy = list(strategy_list.unique())
uniqe_strategy_name = [dict_substrategy[int(x)] + "、" for x in uniqe_strategy]
# 覆盖的基金名称
strategy_name_evaluate = "".join(uniqe_strategy_name)[:-1]
if len(uniqe_strategy) / float(len(strategy_list)) > 0.6:
strategy_distribution_evaluate = "策略上有一定分散"
else:
strategy_distribution_evaluate = "策略分散程度不高"
# 相关性
if len(self.abandon_fund_corr) > 0:
fund_corr_name = [str(group_order_df[group_order_df["fund_id"] == f_id]["fund_name"].values[0]) + "和" for
f_id in self.abandon_fund_corr]
fund_corr_evaluate = "".join(fund_corr_name)[:-1] + "相关性较高,建议调整组合配比;"
else:
fund_corr_evaluate = ";"
num_fund = len(self.portfolio)
evaluate_enum = [["优秀", "良好", "一般"],
["优秀", "良好", "合格", "较差"]]
z_score_evaluate = evaluate_enum[0][z_score_level]
drawdown_evaluate = evaluate_enum[1][drawdown_level]
if z_score_evaluate in ["优秀", "良好"]:
z_score_evaluate = """<span class="self_description_red">{}</span>""".format(z_score_evaluate)
else:
z_score_evaluate = """<span class="self_description_green">{}</span>""".format(z_score_evaluate)
if drawdown_evaluate in ["优秀", "良好"]:
drawdown_evaluate = """<span class="self_description_red">{}</span>""".format(drawdown_evaluate)
else:
drawdown_evaluate = """<span class="self_description_green">{}</span>""".format(drawdown_evaluate)
sentence = {
1: "1、组合构建于{}年{}月,至今已运行{}个月。投入成本为{}万元,截止{}年{}月{}日,整体盈利{}万元,整体表现{},回撤控制能力{};\n",
2: "2、组合共持有{}只基金,{}{}{}{}\n",
3: "3、策略角度来看,组合涵盖了{}, {}{}\n"
}
data = {1: [start_year, start_month, past_month, input_cost, current_year, current_month, current_day,
total_profit, z_score_evaluate, drawdown_evaluate],
2: [num_fund, return_rank_evaluate, profit_positive_evaluate, abandon_evaluate, no_data_fund_evaluate],
3: [strategy_name_evaluate, strategy_distribution_evaluate, fund_corr_evaluate]
}
ret = []
for k, v in data.items():
ret.append(sentence[k].format(*data[k]).replace(",;", ";"))
return ret
def new_evaluation(self, group_name, group_result, data_adaptor):
try:
group_result_data = group_result[group_name]
hold_info = group_result_data["group_hoding_info"]
hold_info_df = pd.DataFrame(hold_info)
group_order_df = data_adaptor.user_customer_order_df[
data_adaptor.user_customer_order_df["folio_name"] == group_name]
group_order_start_date = pd.to_datetime(group_order_df["confirm_share_date"].min())
# 原组合总市值, 区间收益, 年化收益, 波动率, 最大回撤, 夏普比率
total_asset = round(hold_info_df["market_values"].sum(), 2)
old_return = group_result_data["cumulative_return"]
old_return_ratio_year = group_result_data["return_ratio_year"]
old_volatility = group_result_data["volatility"]
old_max_drawdown = group_result_data["max_drawdown"]
old_sharpe = group_result_data["sharpe"]
# 建议基金数据
index_return, propose_fund_return = self.return_compare()
propose_fund_id_list = list(propose_fund_return.columns)
propose_fund_id_list.remove("return")
with TAMP_SQL(tamp_product_engine) as tamp_product:
tamp_product_session = tamp_product.session
sql_product = "select distinct `id`, `fund_short_name`, `nav_frequency`, `substrategy` from `fund_info`"
cur = tamp_product_session.execute(sql_product)
data = cur.fetchall()
product_df = pd.DataFrame(list(data), columns=['fund_id', 'fund_name', 'freq', 'substrategy'])
propose_fund_df = product_df[product_df["fund_id"].isin(propose_fund_id_list)]
# 基金名称,策略分级
propose_fund_id_name_list = [propose_fund_df[propose_fund_df["fund_id"] == fund_id]["fund_name"].values[0] for
fund_id in propose_fund_id_list]
propose_fund_id_strategy_name_list = [dict_substrategy[int(propose_fund_df[propose_fund_df["fund_id"] == fund_id]["substrategy"].values[0])] for
fund_id in propose_fund_id_list]
propose_fund_asset = [round(self.new_weights[i] * total_asset, 2) for i in range(len(propose_fund_id_name_list))]
propose_info = {propose_fund_id_strategy_name_list[i]:
{"fund_name": propose_fund_id_name_list[i],
"substrategy": propose_fund_id_strategy_name_list[i],
"asset": propose_fund_asset[i]}
for i in range(len(propose_fund_id_list))}
# 调仓建议
suggestions_result = {}
old_hold_fund_name_list = list(hold_info_df["fund_name"])
for hold in hold_info:
suggestions = {}
if hold["fund_strategy_name"] not in suggestions_result.keys():
suggestions_result[hold["fund_strategy_name"]] = {}
suggestions["fund_strategy_name"] = hold["fund_strategy_name"]
suggestions["fund_name"] = hold["fund_name"]
suggestions["before_optimization"] = hold["market_values"]
suggestions["after_optimization"] = 0
if suggestions["fund_strategy_name"] in propose_fund_id_strategy_name_list:
suggestions["after_optimization"] = 0
suggestions_result[hold["fund_strategy_name"]][suggestions["fund_name"]] = suggestions
for key, value in propose_info.items():
if value["fund_name"] not in old_hold_fund_name_list:
suggestions = {}
if key not in suggestions_result.keys():
suggestions_result[key] = {}
suggestions["fund_strategy_name"] = value["substrategy"]
suggestions["fund_name"] = value["fund_name"]
suggestions["before_optimization"] = 0
suggestions["after_optimization"] = value["asset"]
suggestions_result[key][suggestions["fund_name"]] = suggestions
for key, value in suggestions_result.items():
suggestions_result[key] = list(value.values())
suggestions_result_asset = {"before": total_asset, "after": total_asset}
# 旧组合累积收益df
old_return_df = group_result_data["return_df"]
old_return_df["cum_return_ratio"] = old_return_df["cum_return_ratio"] - 1
# 新组合累积收益df
propose_fund_return_limit_data = propose_fund_return[propose_fund_return.index >= group_order_start_date]
start_return = propose_fund_return_limit_data['return'].values[0]
propose_fund_return_limit_data["new_return"] = (propose_fund_return_limit_data["return"] - start_return)/(1+start_return)
# 新组合累积收益
new_return_ratio = propose_fund_return_limit_data["new_return"].values[-1]
# 新组合区间年化收益率
freq_max = group_order_df["freq"].max()
n_freq = freq_days(int(freq_max))
new_return_ratio_year = annual_return(propose_fund_return_limit_data["new_return"].values[-1], propose_fund_return_limit_data, n_freq)
# 新组合波动率
new_volatility = volatility(propose_fund_return_limit_data["new_return"]+1, n_freq)
# 新组合最大回撤
new_drawdown = max_drawdown(propose_fund_return_limit_data["new_return"]+1)
# 新组合夏普比率
sim = simple_return(propose_fund_return_limit_data["new_return"]+1)
exc = excess_return(sim, BANK_RATE, n_freq)
new_sharpe = sharpe_ratio(exc, sim, n_freq)
# 指数收益
index_return = index_return[index_return.index >= group_order_start_date]
start_index_return = index_return[" close"].values[0]
index_return["new_index_return"] = (index_return[" close"] - start_index_return) / (1 + start_index_return)
index_return_ratio = index_return["new_index_return"].values[-1]
index_return_ratio_year = annual_return(index_return["new_index_return"].values[-1], index_return["new_index_return"], n_freq)
index_volatility = volatility(index_return["new_index_return"]+1, n_freq)
index_drawdown = max_drawdown(index_return["new_index_return"]+1)
index_sim = simple_return(propose_fund_return_limit_data["new_return"]+1)
index_exc = excess_return(index_sim, BANK_RATE, n_freq)
index_sharpe = sharpe_ratio(index_exc, index_sim, n_freq)
# 收益对比数据
return_compare_df = pd.merge(index_return[["new_index_return"]], old_return_df[["cum_return_ratio"]], right_index=True,
left_index=True)
return_compare_df = pd.merge(return_compare_df, propose_fund_return_limit_data["new_return"], right_index=True,
left_index=True)
return_compare_df["date"] = return_compare_df.index
return_compare_df["date"] = return_compare_df["date"].apply(lambda x: x.strftime("%Y-%m-%d"))
return_compare_df.iloc[1:-1,:]["date"] = ""
return_compare_result = {
"new_combination": {"name": "新组合", "data": return_compare_df["new_return"].values},
"index": {"name": "中证500", "data": return_compare_df["new_index_return"].values},
"origin_combination": {"name": "原组合", "data": return_compare_df["cum_return_ratio"].values},
"xlabels": return_compare_df["date"].values
}
# 指标对比
old_indicator = {"group_name": "现有持仓组合", "return_ratio": round((old_return-1)*100, 2), "return_ratio_year": round(old_return_ratio_year*100,2),
"volatility": round(old_volatility*100, 2), "max_drawdown": round(old_max_drawdown[0]*100, 2), "sharpe": round(old_sharpe, 2)}
new_indicator = {"group_name": "建议优化组合", "return_ratio": round(new_return_ratio*100, 2), "return_ratio_year": round(new_return_ratio_year*100, 2),
"volatility": round(new_volatility*100, 2), "max_drawdown": round(new_drawdown[0]*100, 2), "sharpe": round(new_sharpe, 2)}
index_indicator = {"group_name": "中证500", "return_ratio": round(index_return_ratio*100, 2), "return_ratio_year": round(index_return_ratio_year*100, 2),
"volatility": round(index_volatility*100, 2), "max_drawdown": round(index_drawdown[0]*100, 2), "sharpe": round(index_sharpe, 2)}
indicator_compare = [new_indicator, old_indicator, index_indicator]
# 在保留{}的基础上,建议赎回{},并增配{}后,整体组合波动率大幅降低,最大回撤从{}降到不足{},年化收益率提升{}个点
hold_fund = set(self.portfolio) - set(self.abandon_fund_score + self.abandon_fund_corr + self.no_data_fund)
hold_fund_name = [get_fund_name(x).values[0][0] for x in hold_fund]
abandon_fund = (self.abandon_fund_score + self.abandon_fund_corr)
abandon_fund_name = [get_fund_name(x).values[0][0] for x in abandon_fund]
proposal_fund = self.proposal_fund
proposal_fund_name = [get_fund_name(x).values[0][0] for x in proposal_fund]
sentence = []
if hold_fund is not None:
sentence.append("在保留" + "".join([i + "," for i in hold_fund_name]).rstrip(",") + "的基础上")
if abandon_fund is not None:
sentence.append("建议赎回" + "".join([i + "," for i in abandon_fund_name]).rstrip(","))
if proposal_fund is not None:
sentence.append("增配" + "".join([i + "," for i in proposal_fund_name]).rstrip(",") + "后")
if new_volatility < old_volatility * 0.9:
sentence.append("整体组合波动率大幅降低")
if new_drawdown < old_max_drawdown:
sentence.append("最大回撤从{:.2%}降到不足{:.2%}".format(old_max_drawdown[0], new_drawdown[0]))
if new_return_ratio_year > old_return_ratio_year:
sentence.append("年化收益率提升{:.2f}个点".format((new_return_ratio_year - old_return_ratio_year) * 100))
whole_sentence = ",".join(sentence).lstrip(",") + "。"
return suggestions_result, suggestions_result_asset, return_compare_result, indicator_compare, whole_sentence
except Exception as e:
repr(e)
return None, None, None, None, None
def single_evaluation(self, fund_id, objective):
"""
1、该基金整体表现优秀/良好/一般,收益能力优秀/良好/合格/较差,回撤控制能力优秀/良好/合格/较差,风险收益比例较高/一般/较低;
2、在收益方面,该基金年化收益能力高于/持平/低于同类基金平均水平,有x%区间跑赢大盘/指数,绝对收益能力优秀/一般;
3、在风险方面,该基金抵御风险能力优秀/良好/一般,在同类基金中处于高/中/低等水平,最大回撤为x%,高于/持平/低于同类基金平均水平;
4、该基金收益较好/较差的同时回撤较大/较小,也就是说,该基金在用较大/较小风险换取较大/较小收益,存在较高/较低风险;
5、基金经理,投资年限5.23年,经验丰富;投资能力较强,生涯中共管理过X只基金,历任的X只基金平均业绩在同类中处于上游水平,其中x只排名在前x%;生涯年化回报率x%,同期大盘只有x%
旧个基显示1-4,新个基显示1-5。
旧个基如果是要保留的,显示好的评价。
如果是要剔除的,显示坏的评价。
新个基只显示好的评价。
Args:
fund_id:
Returns:
"""
z_score = search_rank(fund_rank, fund_id, metric='z_score')
total_level = np.select([z_score >= 80,
70 <= z_score < 80,
z_score < 70], [0, 1, 2]).item()
index_return_monthly = get_index_monthly(self.index_id, self.start_date)
fund_nav = get_tamp_nav(fund_id, self.start_date, invest_type=self.invest_type)
fund_nav_monthly = fund_nav.groupby([fund_nav.index.year, fund_nav.index.month]).tail(1)
fund_nav_monthly = rename_col(fund_nav_monthly, fund_id)
fund_return_monthly = simple_return(fund_nav_monthly[fund_id].astype(float))
index_return_monthly.index = index_return_monthly.index.strftime('%Y-%m')
fund_return_monthly.index = fund_return_monthly.index.strftime('%Y-%m')
compare = pd.merge(index_return_monthly, fund_return_monthly, how='inner', left_index=True, right_index=True)
fund_win_rate = ((compare[fund_id] - compare['pct_chg']) > 0).sum() / compare[fund_id].count()
return_rank = search_rank(fund_rank, fund_id, metric='annual_return_rank')
return_level = np.select([return_rank >= 0.8,
0.7 <= return_rank < 0.8,
0.6 <= return_rank < 0.7,
return_rank < 0.6], [0, 1, 2, 3]).item()
return_bool = 1 if return_level > 2 else 0
return_triple = return_level - 1 if return_level >= 2 else return_level
drawdown_rank = search_rank(fund_rank, fund_id, metric='max_drawdown_rank')
drawdown_value = search_rank(fund_rank, fund_id, metric='max_drawdown')
drawdown_level = np.select([drawdown_rank >= 0.8,
0.7 <= drawdown_rank < 0.8,
0.6 <= drawdown_rank < 0.7,
drawdown_rank < 0.6], [0, 1, 2, 3]).item()
drawdown_bool = 1 if drawdown_level > 2 else 0
drawdown_triple = drawdown_level - 1 if drawdown_level >= 2 else drawdown_level
sharp_rank = search_rank(fund_rank, fund_id, metric='sharp_ratio_rank')
sharp_level = np.select([sharp_rank >= 0.8,
0.6 <= sharp_rank < 0.8,
sharp_rank < 0.6], [0, 1, 2]).item()
data = {1: [total_level, return_level, drawdown_level, sharp_level],
2: [return_triple, format(fund_win_rate, '.2%'), return_bool],
3: [drawdown_triple, drawdown_triple, format(drawdown_value, '.2%'), drawdown_triple],
4: [return_bool, drawdown_bool, drawdown_bool, return_bool, drawdown_bool]}
if fund_id in self.abandon_fund_score:
data['remove'] = True
elif fund_id in self.proposal_fund:
data[5] = [1] * 7
data['remove'] = False
else:
data['remove'] = False
x = '30%'
content = {
# 第一个评价
1: [["优秀", "良好", "一般"],
["优秀", "良好", "合格", "较差"],
["优秀", "良好", "合格", "较差"],
["高", "一般", "较低"]],
# 第二个评价
2: [["高于", "持平", "低于"],
x,
["优秀", "一般"]],
# 第三个评价
3: [["优秀", "良好", "一般"],
["高", "中", "低"], x,
["高于", "持平", "低于"]],
# 第四个评价
4: [["较好", "较差"],
["较小", "较大"],
["较小", "较小"],
["较大", "较小"],
["较低", "较高"]],
5: [["TO DO"]] * 7}
sentence = {
1: "该基金整体表现%s,收益能力%s,回撤控制能力%s,风险收益比例%s;\n",
2: "在收益方面,该基金年化收益能力%s同类基金平均水平,有%s区间跑赢指数,绝对收益能力%s;\n",
3: "在风险方面,该基金抵御风险能力%s,在同类基金中处于%s等水平,最大回撤为%s,%s同类基金平均水平;\n",
4: "该基金收益%s的同时回撤%s,也就是说,该基金在用%s风险换取%s收益,存在%s风险;\n",
5: "基金经理,投资年限%s年,经验丰富;投资能力较强,生涯中共管理过%s只基金,历任的%s只基金平均业绩在同类中处于上游水平,其中%s只排名在前%s;生涯年化回报率%s,同期大盘只有%s;"}
remove = data["remove"]
del data["remove"]
# 不剔除,选择好的话术
if not remove:
evaluation = choose_good_evaluation(data)
# 剔除,选择坏的话术
else:
evaluation = choose_bad_evaluation(data)
ret = []
i = 1
for k, v in evaluation.items():
single_sentence = str(i) + "、" + sentence[k] % translate_single(content, k, v)
ret.append(single_sentence)
i += 1
fund_name = get_fund_name(fund_id).values[0][0]
if not ret:
try:
default_evaluation = pd.read_csv("evaluation.csv", encoding='utf-8', names=['fund_id', 'eval'])
ret.append('1、' + default_evaluation[default_evaluation['fund_id'] == fund_id]['eval'].values[0])
except Exception:
pass
evaluation_dict = {'name': fund_name, 'data': ret}
if objective:
if fund_id in self.abandon_fund_score + self.abandon_fund_corr:
evaluation_dict['status'] = "换仓"
elif fund_id in self.portfolio:
evaluation_dict['status'] = "保留"
return evaluation_dict
def old_portfolio_evaluation(self, ):
try:
result = []
for fund in self.portfolio:
try:
result.append(self.single_evaluation(fund))
except IndexError:
continue
return result
except Exception as e:
repr(e)
return None
def propose_fund_evaluation(self, ):
try:
result = []
for fund in self.proposal_fund:
result.append(self.single_evaluation(fund))
return result
except Exception as e:
repr(e)
return None
def single_fund_radar(self):
radar_data = []
for fund in self.portfolio:
try:
radar_data.append(get_radar_data(fund))
except IndexError:
continue
return radar_data
def propose_fund_radar(self):
radar_data = []
for fund in self.proposal_fund:
radar_data.append(get_radar_data(fund))
return radar_data
portfolio = ['HF00002JJ2', 'HF00005DBQ', 'HF0000681Q', 'HF00006693', 'HF00006AZF', 'HF00006BGS']
portfolio_diagnose = PortfolioDiagnose(client_type=1, portfolio=portfolio, invest_amount=10000000)
portfolio_diagnose.optimize()
if __name__ == '__main__':
print(portfolio_diagnose.single_fund_radar())
print(portfolio_diagnose.propose_fund_radar())
print(portfolio_diagnose.old_portfolio_evaluation())
print('旧组合相关性:', portfolio_diagnose.old_correlation)
print('新组合相关性:', portfolio_diagnose.new_correlation)
print('旧组合个基评价:', portfolio_diagnose.old_portfolio_evaluation())
print('新组合个基评价:', portfolio_diagnose.propose_fund_evaluation())
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment