import datetime
import logging

import pandas as pd
import pymysql
import tushare as ts

# week_evaluation provides the metric helpers used below (simple_return,
# excess_return, max_drawdown, sharpe_ratio, range_return, annual_return,
# volatility, downside_risk, sortino_ratio).
from week_evaluation import *

logging.basicConfig(level=logging.DEBUG)

con = pymysql.connect(host='tamper.mysql.polardb.rds.aliyuncs.com',
                      user='tamp_fund',
                      password='@imeng408',
                      database='tamp_fund',
                      charset='utf8',
                      use_unicode=True)

def get_dataframe(fund, start_date, rollback=False):
    """Load the adjusted NAV series of a single fund, indexed by NAV date."""
    sql = "SELECT ts_code, end_date, adj_nav FROM public_fund_nav " \
          "WHERE ts_code='{0}'".format(fund)
    df = pd.read_sql(sql, con).dropna(how='any')
    if df['adj_nav'].count() == 0:
        logging.log(logging.ERROR, "CAN NOT FIND {}".format(fund))
        return None
    df['end_date'] = pd.to_datetime(df['end_date'])
    if rollback:
        # Roll the start date back day by day until it lands on an actual NAV date.
        while start_date not in list(df['end_date']):
            start_date = start_date - datetime.timedelta(days=1)
    df = df[df['end_date'] >= start_date]
    df.drop_duplicates(subset='end_date', inplace=True, keep='first')
    df.set_index('end_date', inplace=True)
    df.sort_index(inplace=True, ascending=True)
    return df

def get_frequency(df):
    """Infer how many NAV observations per year the fund publishes."""
    index_series = df.index.to_series()
    freq_series = index_series - index_series.shift(1)
    logging.log(logging.INFO, freq_series.describe())
    # Use the mode of the day gaps between consecutive NAV dates.
    f = freq_series.mode()[0].days
    if f in range(0, 3):        # roughly daily
        return 250
    elif f in range(6, 9):      # roughly weekly
        return 52
    elif f in range(13, 18):    # roughly semi-monthly
        return 24
    elif f in range(28, 33):    # roughly monthly
        return 12
    elif f in range(110, 133):  # roughly every four months
        return 3
    else:
        raise ValueError('unrecognized NAV frequency: {} day gap'.format(f))

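# A minimal self-contained sketch (assumption: not part of the original
# script) of how the day-gap mode maps to an annualization factor. It is not
# called anywhere; run it manually if needed.
def _demo_get_frequency():
    daily = pd.DataFrame({'adj_nav': range(20)},
                         index=pd.date_range('2021-01-01', periods=20, freq='B'))
    weekly = pd.DataFrame({'adj_nav': range(20)},
                          index=pd.date_range('2021-01-01', periods=20, freq='W-FRI'))
    assert get_frequency(daily) == 250   # 1-day mode gap -> business-daily data
    assert get_frequency(weekly) == 52   # 7-day mode gap -> weekly data
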
def get_trade_cal(start_date, end_date, method):
    """Fetch the SSE trading calendar, either from MySQL or from tushare."""
    if method == 'mysql':
        # The MySQL table holds the full calendar of open days; start_date and
        # end_date are not applied here, the result is used for membership checks.
        sql = 'SELECT cal_date FROM stock_trade_cal WHERE is_open=1'
        df = pd.read_sql(sql, con)
        df['end_date'] = pd.to_datetime(df['cal_date'])
        df.set_index('end_date', drop=False, inplace=True)
    elif method == 'tushare':
        ts.set_token('ac1f734f8a25651aa07319ca35b1b0c0854e361e306fe85d85e092bc')
        pro = ts.pro_api()
        if end_date is not None:
            df = pro.trade_cal(exchange='SSE', start_date=start_date, end_date=end_date, is_open='1')
        else:
            df = pro.trade_cal(exchange='SSE', start_date=start_date, is_open='1')
        df.drop(['exchange', 'is_open'], axis=1, inplace=True)
        df.rename(columns={'cal_date': 'end_date'}, inplace=True)
    else:
        raise ValueError("method must be 'mysql' or 'tushare'")
    return df

def get_manager():
    """Current fund managers (rows whose end_date is NULL)."""
    sql = 'SELECT ts_code, name FROM public_fund_manager WHERE end_date IS NULL'
    df = pd.read_sql(sql, con)
    return df

def get_fund_info(end_date):
    """Basic info of funds that are still listed and not due before end_date."""
    sql = "SELECT ts_code, fund_type, management FROM public_fund_basic " \
          "WHERE delist_date IS NULL AND (due_date IS NULL OR due_date>'{}')".format(end_date.strftime('%Y%m%d'))
    df = pd.read_sql(sql, con).dropna(how='all')
    manager_info = get_manager()
    df = pd.merge(df, manager_info, how="left", on='ts_code')
    return df

def resample(df, trading_cal, freq):
    """Resample the fund NAV table and align the result with trading days.

    Args:
        df (DataFrame): raw fund NAV table indexed by NAV date.
        trading_cal (DataFrame): SSE trading calendar indexed by date.
        freq (int): observations per year as returned by get_frequency
            (250: business-daily, 52: weekly, 24: semi-monthly, 12: monthly, 3: every four months).

    Returns:
        DataFrame: the resampled NAV table inner-joined with the trading
        calendar on the (possibly shifted) NAV dates.
    """
    freq_dict = {250: 'B', 52: 'W-FRI', 12: 'M', 24: 'SM', 3: 'Q'}
    resample_freq = freq_dict[freq]
    # Resample at the chosen frequency and forward-fill the NAV.
    df = df.resample(rule=resample_freq).ffill()
    # Maximum backward shift allowed per frequency, so that a shifted date stays
    # within the same week / half-month / month / quarter as the resampled one.
    timeoffset_dict = {250: 1, 52: 5, 12: 30, 24: 15, 3: 120}
    timeoffsetmax = timeoffset_dict[freq]
    # The index cannot be modified in place, so work on a copy as a list.
    new_index = list(df.index)
    trading_days = set(trading_cal['end_date'])
    # Walk over the resampled dates.
    for idx, date in enumerate(df.index):
        # If a resampled date is not a trading day, shift it backwards...
        if date not in trading_days:
            for time_offset in range(1, timeoffsetmax):
                # ...and keep the first shifted date that is a trading day.
                if date - datetime.timedelta(days=time_offset) in trading_days:
                    new_index[idx] = date - datetime.timedelta(days=time_offset)
                    break
    # Replace the NAV index with the shifted, trading-day-aligned dates.
    df.index = pd.Series(new_index)
    return pd.merge(df, trading_cal, how='inner', left_index=True, right_index=True)

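# A minimal self-contained sketch (assumption: not part of the original
# script): weekly resampling against a toy calendar in which one Friday is
# treated as a holiday, showing how that date is shifted back onto the
# previous trading day.
def _demo_resample():
    cal_days = pd.date_range('2021-01-04', '2021-02-26', freq='B')
    cal_days = cal_days.drop(pd.Timestamp('2021-01-29'))  # pretend this Friday is a holiday
    toy_cal = pd.DataFrame({'end_date': cal_days}, index=cal_days)
    nav = pd.DataFrame({'adj_nav': 1.0},
                       index=pd.date_range('2021-01-04', '2021-02-26', freq='B'))
    out = resample(nav, toy_cal, 52)  # 52 -> 'W-FRI' weekly buckets
    # 2021-01-29 is not in the toy calendar, so it is shifted back to 2021-01-28.
    assert pd.Timestamp('2021-01-28') in out.index
    assert pd.Timestamp('2021-01-29') not in out.index
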
def z_score(annual_return_rank, downside_risk_rank, max_drawdown_rank, sharp_ratio_rank):
    """Composite score: equal 25-point weights on the four percentile ranks."""
    return 25 * annual_return_rank + 25 * downside_risk_rank + 25 * max_drawdown_rank + 25 * sharp_ratio_rank

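# Worked example (illustrative numbers only): percentile ranks of 0.9, 0.8,
# 0.7 and 0.6 give z_score = 25 * (0.9 + 0.8 + 0.7 + 0.6) = 75.0; a fund that
# ranks best on all four metrics scores 100.
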
def cal_date(date, period_type, period):
    """Shift `date` back by `period` years ('Y'), months ('m', up to 12) or days ('d')."""
    year, month, day = map(int, date.strftime('%Y-%m-%d').split('-'))
    if period_type == 'Y':
        cal_year = year - period
        return datetime.datetime(cal_year, month, day)
    elif period_type == 'm':
        cal_month = month - period
        if cal_month > 0:
            return datetime.datetime(year, cal_month, day)
        else:
            # Crossing a year boundary: borrow twelve months from the previous year.
            return datetime.datetime(year - 1, cal_month + 12, day)
    elif period_type == 'd':
        return date - datetime.timedelta(days=period)

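# Quick illustrative check (assumption: not part of the original script),
# including a month shift that crosses a year boundary.
def _demo_cal_date():
    assert cal_date(datetime.datetime(2021, 3, 15), 'Y', 1) == datetime.datetime(2020, 3, 15)
    assert cal_date(datetime.datetime(2021, 3, 15), 'm', 4) == datetime.datetime(2020, 11, 15)
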
def metric_rank(df):
    """Percentile-rank each metric within its investment type (higher rank = better)."""
    # sharp_ratio is ranked as well because z_score consumes sharp_ratio_rank below.
    for metric in ['annual_return', 'downside_risk', 'max_drawdown', 'sharp_ratio', 'sortino_ratio']:
        # Rows appended one by one may leave object dtype; cast before ranking.
        df[metric] = df[metric].astype(float)
        # For the risk metrics a smaller value is better, so rank them descending.
        if metric in ['downside_risk', 'max_drawdown']:
            ascending = False
        else:
            ascending = True
        df['{}_rank'.format(metric)] = df.groupby(['invest_type'])[metric].rank(ascending=ascending, pct=True)
    return df

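# A minimal self-contained sketch (assumption: not part of the original
# script) of the ranking direction: the fund with the smaller max_drawdown
# gets the higher max_drawdown_rank, while the higher annual_return keeps
# the higher annual_return_rank.
def _demo_metric_rank():
    toy = pd.DataFrame({
        'invest_type': ['equity', 'equity'],
        'annual_return': [0.20, 0.10],
        'downside_risk': [0.05, 0.02],
        'max_drawdown': [0.30, 0.10],
        'sharp_ratio': [1.2, 0.8],
        'sortino_ratio': [1.5, 1.0],
    }, index=['fund_a', 'fund_b'])
    ranked = metric_rank(toy)
    assert ranked.loc['fund_a', 'annual_return_rank'] > ranked.loc['fund_b', 'annual_return_rank']
    assert ranked.loc['fund_a', 'max_drawdown_rank'] < ranked.loc['fund_b', 'max_drawdown_rank']
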
def public_fund_rank(start_date, end_date):
    """Compute risk/return metrics for all public funds and rank them within each fund type."""
    fund_info = get_fund_info(end_date)
    group = fund_info.groupby('fund_type')
    grouped_fund = group['ts_code'].unique()
    trading_cal = get_trade_cal(start_date, end_date, method='mysql')
    metric_df = pd.DataFrame(columns=('ts_code', 'range_return', 'annual_return', 'max_drawdown', 'sharp_ratio',
                                      'volatility', 'sortino_ratio', 'downside_risk', 'invest_type'))
    skipped_funds = []
    for invest_type in grouped_fund.index:
        for fund in grouped_fund[invest_type]:
            # get_dataframe may return None; the except below also catches that case.
            df = get_dataframe(fund, start_date)
            try:
                # Skip funds whose NAV history covers less than 60% of the window.
                if df.index[-1] - df.index[0] < 0.6 * (end_date - start_date):
                    skipped_funds.append(fund)
                    logging.log(logging.INFO, 'Skipped {} (history too short)'.format(fund))
                    continue
                n = get_frequency(df)
            except Exception as e:
                logging.log(logging.ERROR, repr(e))
                logging.log(logging.INFO, 'Skipped {}'.format(fund))
                continue
            df = resample(df, trading_cal, n)
            _ = get_frequency(df)
            logging.log(logging.INFO, "Dealing with {}".format(fund))
            net_worth = df['adj_nav'].astype(float)
            end_df, begin_df = net_worth.values[-1], net_worth.values[0]
            sim_return = simple_return(net_worth)
            ex_return = excess_return(sim_return, bank_rate=0.015, n=n)
            drawdown = float(max_drawdown(net_worth)[0])
            shp_ratio = sharpe_ratio(ex_return, sim_return, n)
            rng_return = float(range_return(end_df, begin_df))
            ann_return = annual_return(rng_return, net_worth, n)
            vol = volatility(sim_return, n)
            down_risk = downside_risk(sim_return, bank_rate=0.015, n=n)
            sor_ratio = sortino_ratio(ex_return, down_risk, n)
            manager = fund_info[fund_info['ts_code'] == fund]['name'].values
            management = fund_info[fund_info['ts_code'] == fund]['management'].values
            row = pd.Series([fund, rng_return, ann_return, drawdown, shp_ratio,
                             vol, sor_ratio, down_risk, invest_type, manager, management],
                            index=['ts_code', 'range_return', 'annual_return', 'max_drawdown',
                                   'sharp_ratio', 'volatility', 'sortino_ratio', 'downside_risk',
                                   'invest_type', 'manager', 'management'])
            # DataFrame.append was removed in pandas 2.0, so build the row with concat.
            metric_df = pd.concat([metric_df, row.to_frame().T], ignore_index=True)
    metric_df.set_index('ts_code', inplace=True)
    df = metric_rank(metric_df)
    df['z_score'] = z_score(df['annual_return_rank'],
                            df['downside_risk_rank'],
                            df['max_drawdown_rank'],
                            df['sharp_ratio_rank'])
    return df

if __name__ == '__main__':
    end_date = datetime.datetime.now() - datetime.timedelta(days=1)
    start_date = cal_date(end_date, 'Y', 1)
    rank_df = public_fund_rank(start_date, end_date)
    rank_df.to_csv('public_fund_rank.csv', encoding='gbk')