1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
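# SECURITY: a plaintext database password used to be embedded in this
# commented-out snippet; it is redacted here and should be rotated.
# db = create_engine(
# 'mysql+pymysql://tamp_fund:<REDACTED>@tamper.mysql.polardb.rds.aliyuncs.com:3306/tamp_fund?charset=utf8mb4',
# pool_size=50,
# pool_recycle=3600,
# pool_pre_ping=True)
# con = db.connect()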
import logging
logging.basicConfig(level=logging.INFO)
from app.api.engine import tamp_fund_engine, TAMP_SQL
from app.utils.week_evaluation import *
# SECURITY: plaintext password redacted from this commented-out snippet.
# con = pymysql.connect(host='tamper.mysql.polardb.rds.aliyuncs.com',
# user='tamp_fund',
# password='<REDACTED>',
# database='tamp_fund',
# charset='utf8',
# use_unicode='True')
def get_nav(fund, start_date, rollback=False, invest_type='public'):
    """Load the NAV history of one fund from ``start_date`` onwards.

    Args:
        fund (str): Fund identifier; matched against ``ts_code`` in
            ``public_fund_nav`` for public funds and against ``fund_id`` in
            ``fund_nav`` otherwise.
        start_date (datetime): Earliest NAV date to keep.
        rollback (bool): When True and ``start_date`` lies strictly between the
            first and the last NAV date, ``start_date`` is moved back to the
            latest NAV date that is not after it.
        invest_type (str): ``'public'`` selects the public-fund table; any
            other value selects the ``fund_nav`` table.

    Returns:
        DataFrame or None: Frame indexed by ``end_date`` (unique, ascending)
        with the columns ``fund_id`` and ``adj_nav``. ``None`` when the query
        yields no complete row.
    """
    # NOTE(review): the statement is assembled with str.format; if `fund` can
    # ever come from user input this should become a bound parameter.
    if invest_type == 'public':
        sql = "SELECT ts_code, end_date, adj_nav FROM public_fund_nav " \
              "WHERE ts_code='{}'".format(fund)
    else:
        sql = "SELECT fund_id, price_date, cumulative_nav FROM fund_nav " \
              "WHERE fund_id='{}'".format(fund)

    with TAMP_SQL(tamp_fund_engine) as tamp_product:
        rows = tamp_product.session.execute(sql).fetchall()
        # Both queries return (id, date, nav) in that order, so the columns
        # can be named uniformly right away instead of renaming afterwards.
        df = pd.DataFrame(list(rows), columns=['fund_id', 'end_date', 'adj_nav']).dropna(how='any')

    if df['adj_nav'].count() == 0:
        logging.log(logging.ERROR, "CAN NOT FIND {}".format(fund))
        return None

    df['end_date'] = pd.to_datetime(df['end_date'])
    if rollback and df['end_date'].min() < start_date < df['end_date'].max():
        # Pick the latest NAV date <= start_date directly. Stepping back one
        # day at a time never terminates when start_date carries a time of day
        # that no NAV date matches.
        start_date = df.loc[df['end_date'] <= start_date, 'end_date'].max()

    # Non in-place operations avoid chained-assignment warnings on the slice.
    df = df[df['end_date'] >= start_date]
    df = df.drop_duplicates(subset='end_date', keep='first')
    df = df.set_index('end_date').sort_index(ascending=True)
    return df
def get_frequency(df):
    """Infer how many times per year a fund publishes its NAV.

    The most common gap (in days) between consecutive index dates is mapped to
    a yearly frequency:

        0-2 days     -> 250
        6-8 days     -> 52
        13-17 days   -> 24
        28-32 days   -> 12
        110-132 days -> 3

    Args:
        df (DataFrame): NAV table indexed by publication date.

    Returns:
        int: Publications per year.

    Raises:
        ValueError: If the index holds fewer than two dates, or the most
            common gap falls outside every range listed above.
    """
    gaps = df.index.to_series().diff(1)
    modes = gaps.mode()
    if modes.empty:
        # With fewer than two dates every gap is NaT; mode() then returns an
        # empty Series and indexing it would raise KeyError instead of the
        # ValueError callers are prepared for.
        raise ValueError('at least two NAV dates are required to infer a frequency')

    # describe() is comparatively expensive; only compute it when it is logged.
    if logging.getLogger().isEnabledFor(logging.DEBUG):
        logging.log(logging.DEBUG, gaps.describe())

    gap_days = modes.iloc[0].days
    frequency_table = (
        (range(0, 3), 250),
        (range(6, 9), 52),
        (range(13, 18), 24),
        (range(28, 33), 12),
        (range(110, 133), 3),
    )
    for day_range, per_year in frequency_table:
        if gap_days in day_range:
            return per_year
    raise ValueError('unsupported publication gap of {} days'.format(gap_days))
def get_trade_cal():
    """Load the open trading days from the ``stock_trade_cal`` table.

    The original author describes this as the Shanghai Stock Exchange
    calendar; the query itself only filters on ``is_open=1``.

    Returns:
        DataFrame: Columns ``cal_date`` (as stored) and ``end_date`` (parsed
        with ``pd.to_datetime``); ``end_date`` is also the index.
    """
    query = 'SELECT cal_date FROM stock_trade_cal WHERE is_open=1'
    with TAMP_SQL(tamp_fund_engine) as tamp_product:
        session = tamp_product.session
        fetched = session.execute(query).fetchall()
        trade_days = pd.DataFrame(list(fetched), columns=['cal_date'])
    trade_days = trade_days.dropna(how='all')
    trade_days['end_date'] = pd.to_datetime(trade_days['cal_date'])
    # drop=False keeps end_date available as a regular column as well.
    trade_days.set_index('end_date', drop=False, inplace=True)
    return trade_days
def get_manager(invest_type):
    """Load the fund-to-manager mapping for one asset class.

    Args:
        invest_type (str): ``'public'`` reads the rows of
            ``public_fund_manager`` whose ``end_date`` is NULL; any other
            value reads ``fund_manager_mapping``.

    Returns:
        DataFrame: Columns ``ts_code`` and ``name`` for public funds,
        ``fund_id`` and ``fund_manager_id`` otherwise.
    """
    if invest_type == 'public':
        query = 'SELECT ts_code, name FROM public_fund_manager WHERE end_date IS NULL'
        column_names = ['ts_code', 'name']
    else:
        query = 'SELECT fund_id, fund_manager_id FROM fund_manager_mapping'
        column_names = ['fund_id', 'fund_manager_id']

    with TAMP_SQL(tamp_fund_engine) as tamp_product:
        session = tamp_product.session
        fetched = session.execute(query).fetchall()
        return pd.DataFrame(list(fetched), columns=column_names)
def get_fund_info(end_date, invest_type):
    """Load the funds of one asset class joined with their manager data.

    Args:
        end_date (datetime): Only used for public funds: funds whose
            ``due_date`` is not after this date are excluded.
        invest_type (str): ``'public'`` reads ``public_fund_basic`` (funds with
            no ``delist_date``); any other value reads ``fund_info`` (rows with
            ``delete_tag=0`` and ``substrategy!=-1``).

    Returns:
        DataFrame: For public funds the columns ``fund_id``, ``fund_type``,
        ``management`` left-joined with the manager table; otherwise
        ``fund_id``, ``substrategy`` inner-joined with the manager table.
    """
    if invest_type == 'public':
        sql = "SELECT ts_code, fund_type, management FROM public_fund_basic " \
              "WHERE delist_date IS NULL AND (due_date IS NULL OR due_date>'{}')".format(end_date.strftime('%Y%m%d'))
        columns = ['ts_code', 'fund_type', 'management']
        id_column, join_how = 'ts_code', 'left'
    else:
        sql = "SELECT id, substrategy FROM fund_info WHERE delete_tag=0 " \
              "AND substrategy!=-1"
        columns = ['id', 'substrategy']
        id_column, join_how = 'id', 'inner'

    with TAMP_SQL(tamp_fund_engine) as tamp_product:
        rows = tamp_product.session.execute(sql).fetchall()
        df = pd.DataFrame(list(rows), columns=columns)

    df = df.rename(columns={id_column: 'fund_id'})
    # The public manager table is keyed by ts_code; without renaming it the
    # merge on 'fund_id' raises KeyError. The rename is a no-op for the
    # private mapping, which is already keyed by fund_id.
    manager_info = get_manager(invest_type).rename(columns={'ts_code': 'fund_id'})
    return pd.merge(df, manager_info, how=join_how, on='fund_id')
def resample(df, trading_cal, freq, simple_flag=True):
    """Resample a NAV table to its publication frequency and align it to trading days.

    Args:
        df (DataFrame): NAV table with a datetime index.
        trading_cal (DataFrame): Trading calendar indexed by trading day and
            containing an ``end_date`` column.
        freq (int): Publications per year; one of 250 (business day),
            52 (week ending Friday), 24 (semi-month), 12 (month), 3 (quarter).
        simple_flag (bool): When True, skip the search for a nearby trading
            day: daily data is inner-joined with the calendar, every other
            frequency is returned right after resampling.

    Returns:
        DataFrame: The forward-filled resampled NAV table. Except for the
        simple non-daily case, it is inner-joined with ``trading_cal`` on the
        index.
    """
    rule_by_freq = {250: 'B', 52: 'W-FRI', 12: 'M', 24: 'SM', 3: 'Q'}
    rule = rule_by_freq[freq]
    # Resample at the publication frequency and forward-fill the NAV.
    resampled = df.resample(rule=rule, closed='right').ffill()

    if simple_flag:
        if freq != 250:
            return resampled
        return pd.merge(resampled, trading_cal, how='inner', left_index=True, right_index=True)

    # Upper bound (exclusive) on how many days a date may be moved back, so
    # the moved date stays in the same week / half-month / month / quarter.
    max_shift_by_freq = {250: 1, 52: 5, 12: 30, 24: 15, 3: 120}
    max_shift = max_shift_by_freq[freq]

    def _nearest_trading_day(day):
        """Return `day`, or the closest earlier trading day within the shift window."""
        # `in` on a Series tests its index labels, i.e. the calendar's index.
        if day in trading_cal['end_date']:
            return day
        for days_back in range(1, max_shift):
            candidate = day - datetime.timedelta(days=days_back)
            if candidate in trading_cal['end_date']:
                return candidate
        # No trading day in the window: keep the date; the join drops it.
        return day

    aligned_dates = [_nearest_trading_day(day) for day in resampled.index]
    resampled.index = pd.Series(aligned_dates)
    return pd.merge(resampled, trading_cal, how='inner', left_index=True, right_index=True)
def z_score(annual_return_rank, downside_risk_rank, max_drawdown_rank, sharp_ratio_rank):
    """Combine four rank values into one score, weighting each rank by 25.

    Args:
        annual_return_rank: Rank of the annualised return.
        downside_risk_rank: Rank of the downside risk.
        max_drawdown_rank: Rank of the maximum drawdown.
        sharp_ratio_rank: Rank of the Sharpe ratio.

    Returns:
        The weighted sum ``25 * each rank``; the type follows the inputs
        (numbers or pandas Series).
    """
    weight = 25
    score = weight * annual_return_rank
    score = score + weight * downside_risk_rank
    score = score + weight * max_drawdown_rank
    score = score + weight * sharp_ratio_rank
    return score
def cal_date(date, period_type, period):
    """Return the date lying ``period`` years, months or days before ``date``.

    Args:
        date (datetime): Reference date.
        period_type (str): ``'Y'`` for years, ``'m'`` for months, ``'d'`` for days.
        period (int): Number of units to go back.

    Returns:
        datetime or None: For ``'Y'`` and ``'m'`` a ``datetime`` at midnight
        (the time of day of ``date`` is discarded); when the target month is
        shorter than ``date.day`` the last day of that month is used. For
        ``'d'`` the result is ``date - timedelta(days=period)`` and keeps the
        time of day. ``None`` for any other ``period_type``.
    """
    import calendar  # local import: only this function needs month lengths

    if period_type == 'd':
        return date - datetime.timedelta(days=period)

    if period_type == 'Y':
        months_back = 12 * period
    elif period_type == 'm':
        months_back = period
    else:
        # Unknown unit: keep the historical behaviour of returning None.
        return None

    # Work on a zero-based absolute month count so any number of year
    # boundaries can be crossed.
    absolute_month = date.year * 12 + (date.month - 1) - months_back
    year, month_index = divmod(absolute_month, 12)
    month = month_index + 1
    # Clamp e.g. Feb 29 -> Feb 28 or Mar 31 -> Feb 28 instead of raising.
    day = min(date.day, calendar.monthrange(year, month)[1])
    return datetime.datetime(year, month, day)
def metric_rank(df):
    """Add percentile-rank columns for four metrics, ranked within each substrategy.

    For ``annual_return`` and ``sharp_ratio`` the rank is ascending (larger
    values get a larger percentile); for ``downside_risk`` and
    ``max_drawdown`` it is descending.

    Args:
        df (DataFrame): Must contain ``substrategy`` and the four metric
            columns. It is modified in place.

    Returns:
        DataFrame: The same ``df`` object with ``<metric>_rank`` columns added.
    """
    rank_directions = (
        ('annual_return', True),
        ('downside_risk', False),
        ('max_drawdown', False),
        ('sharp_ratio', True),
    )
    for metric_name, rank_ascending in rank_directions:
        per_group = df.groupby(['substrategy'])[metric_name]
        df['{}_rank'.format(metric_name)] = per_group.rank(ascending=rank_ascending, pct=True)
    return df
def fund_rank(start_date, end_date, invest_type='private'):
    """Compute performance metrics for every fund and rank them per substrategy.

    For each fund returned by ``get_fund_info`` the NAV history since
    ``start_date`` is loaded. Funds with no usable NAV data, with a history
    shorter than 60% of ``end_date - start_date``, or with an unrecognised
    publication frequency are skipped. The remaining funds are resampled and
    evaluated with the metric helpers (``simple_return``, ``max_drawdown``,
    ...), which are not defined in this part of the file; presumably they come
    from ``app.utils.week_evaluation``. A risk-free rate of 0.015 is passed
    to ``excess_return`` and ``downside_risk``.

    Args:
        start_date (datetime): Start of the evaluation window.
        end_date (datetime): End of the evaluation window.
        invest_type (str): Asset class forwarded to the loaders. The fund info
            must provide ``substrategy`` and ``fund_manager_id`` columns.

    Returns:
        DataFrame: Indexed by ``fund_id`` with the metric columns, the
        ``*_rank`` columns added by ``metric_rank`` and a ``z_score`` column.
    """
    fund_info = get_fund_info(end_date, invest_type=invest_type)
    grouped_fund = fund_info.groupby('substrategy')['fund_id'].unique()
    trading_cal = get_trade_cal()
    columns = ['fund_id', 'range_return', 'annual_return', 'max_drawdown', 'sharp_ratio',
               'volatility', 'sortino_ratio', 'downside_risk', 'substrategy', 'manager']
    # Loop-invariant: shortest NAV history that is still evaluated.
    min_span = 0.6 * (end_date - start_date)

    # Rows are collected and turned into a frame once; DataFrame.append in a
    # loop copies the whole frame every time and no longer exists in pandas 2.
    rows = []
    for substrategy in grouped_fund.index:
        for fund in grouped_fund[substrategy]:
            df = get_nav(fund, start_date, rollback=False, invest_type=invest_type)
            if df is None or df.empty:
                logging.log(logging.INFO, 'Skipped {}'.format(fund))
                continue

            try:
                if df.index[-1] - df.index[0] < min_span:
                    logging.log(logging.INFO, 'Skipped {}'.format(fund))
                    continue
                n = get_frequency(df)
            except Exception as exc:
                # Deliberately best-effort: one bad fund must not abort the
                # whole run, but the reason is kept for debugging.
                logging.log(logging.INFO, 'Skipped {}'.format(fund))
                logging.log(logging.DEBUG, 'Skip reason for {}: {!r}'.format(fund, exc))
                continue

            df = resample(df, trading_cal, n)
            try:
                # Re-check after resampling; a frame with fewer than two rows
                # may surface as KeyError/IndexError rather than ValueError.
                get_frequency(df)
            except (ValueError, KeyError, IndexError):
                continue

            logging.log(logging.INFO, "Dealing with {}".format(fund))
            net_worth = df['adj_nav'].astype(float)
            end_df, begin_df = net_worth.values[-1], net_worth.values[0]
            sim_return = simple_return(net_worth)
            ex_return = excess_return(sim_return, bank_rate=0.015, n=n)
            drawdown = float(max_drawdown(net_worth)[0])
            shp_ratio = sharpe_ratio(ex_return, sim_return, n)
            rng_return = float(range_return(end_df, begin_df))
            ann_return = annual_return(rng_return, net_worth, n)
            vol = volatility(sim_return, n)
            down_risk = downside_risk(sim_return, bank_rate=0.015, n=n)
            sor_ratio = sortino_ratio(ex_return, down_risk, n)
            manager = fund_info[fund_info['fund_id'] == fund]['fund_manager_id'].values

            rows.append({
                'fund_id': fund,
                'range_return': rng_return,
                'annual_return': ann_return,
                'max_drawdown': drawdown,
                'sharp_ratio': shp_ratio,
                'volatility': vol,
                'sortino_ratio': sor_ratio,
                'downside_risk': down_risk,
                'substrategy': substrategy,
                'manager': manager,
            })

    metric_df = pd.DataFrame(rows, columns=columns)
    metric_df.set_index('fund_id', inplace=True)
    df = metric_rank(metric_df)
    df['z_score'] = z_score(df['annual_return_rank'],
                            df['downside_risk_rank'],
                            df['max_drawdown_rank'],
                            df['sharp_ratio_rank'])
    return df
if __name__ == '__main__':
    # Rank funds over the one-year window that ends yesterday.
    end_date = datetime.datetime.now() - datetime.timedelta(days=1)
    start_date = cal_date(end_date, 'Y', 1)
    # Bind the result to its own name so the fund_rank function is not
    # shadowed, and pass invest_type by keyword: the former positional False
    # only selected the non-public code path because it is != 'public'.
    ranking = fund_rank(start_date, end_date, invest_type='private')
    # ranking.to_csv("fund_rank.csv", encoding='gbk')
    # df = pd.read_csv('fund_rank.csv')
    # df.to_sql("fund_rank", con, if_exists='replace')
    # con.close()