1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# coding: utf-8
"""
计算各个指标的方法
"""
import pandas as pd
import numpy as np
import datetime
import calendar
import math
dict_substrategy = {1010: '主观多头', 1020: '股票多空', 1030: '量化多头', 2010: '宏观策略', 3010: '主观趋势', 3020: '主观套利',
3030: '量化趋势', 3040: '量化套利', 3050: 'CTA策略', 4010: '并购重组', 4020: '定向增发', 4030: '大宗交易',
4040: '事件驱动复合', 5010: '市场中性', 5020: '套利策略', 5030: '相对价值复合', 6010: '纯债策略', 6020: '强债策略',
6030: '债券策略', 7010: 'MOM', 7020: 'FOF', 8010: '主观多策略', 8020: '量化多策略', -1: '其他策略'}
BANK_RATE = 0.015
def simple_return(net_worth):
"""
简单收益率
net_worth:净值或指数数据
"""
try:
d = net_worth / net_worth.shift(1) - 1
except:
net_worth.iloc[0] = 0
return net_worth
d.iloc[0] = 0
return d
def excess_return(returns, bank_rate, n):
"""
超额收益率
returns:简单收益率
bank_rate: 银行收益率, 是已经除过的无风险收益。也可以是其他的基准收益
n: 数据类型, 周(52), 月(12), 日(250)
"""
d = returns.mean() - bank_rate / n
# print(pd.Series(d*np.ones(len(returns))))
return d
# pd.Series(d*np.ones(len(returns)))
def sharpe_ratio(excess_return, simple_return, n):
"""
夏普比率
excess_return: 超额收益率
simple_return: 简单收益率
n: 数据类型, 周(52), 月(12), 日(250)
"""
import math
try:
d = math.sqrt(n) * excess_return / simple_return.std(ddof=1)
if d == float("inf") or d == float("-inf"):
return 0.0
except:
return 0.0
return d
def volatility(simple_return, n):
"""
波动率
:param simple_return:
:param n:数据类型, 周(52), 月(12), 日(250)
:return:
"""
d = math.sqrt(n) * simple_return.std(ddof=1)
return d
def IR(excess_return, n):
"""
excess_return: 收益减去基准收益率
"""
d = math.sqrt(n) * excess_return.mean() / excess_return.std(ddof=1)
return d
def max_drawdown(return_list):
"""
最大回撤
return_list:净值或指数数据的列表
返回最大回撤值,以及开始位置,和结束位置值
"""
i = np.argmax((np.maximum.accumulate(return_list) - return_list) / np.maximum.accumulate(return_list)) # 结束位置
if i == 0:
return 0, 0, 0 # 没有回撤
j = np.argmax(return_list[:i]) # 开始位置
return (return_list[j] - return_list[i]) / (return_list[j]), j, i
def month_differ(x, y):
"""
计算月份相差
只根据month,year计算相差月份, 没有考虑day
:param x: datetime.datetime
:param y:
:return:
"""
m_d = abs((x.year - y.year) * 12 + (x.month - y.month) * 1)
return m_d
def downside_risk(r, bank_rate, n):
"""
下行风险
r: 简单收益率
"""
_r = r.map(lambda x: x / 100)
# mean = _r.mean()
r_adjust = -r.map(lambda x: min(x - bank_rate / n, 0))
risk = np.sqrt((r_adjust ** 2).mean() * len(r_adjust) / (len(r_adjust) - 1))
return risk
def sortino_ratio(excess_return, downside, n):
"""
索提诺比率
df: 净值或指数数据
"""
import math
sortino_ratio = math.sqrt(n) * excess_return.mean() / downside
return sortino_ratio
def month_minus(date, n):
"""
计算对标的前几个月份,如2020,3的前三个月是2019.12
输入datetime格式
注意:二月份没有30,31号的,而且3月31号,的前几个月有的是没有31号的。
:return:
"""
# day = date.day
if date.month > n:
month = date.month - n
year = date.year
else:
month = date.month + 12 - n
year = date.year - 1
# print('month////',month)
try:
pre_date = datetime.datetime(year, month, date.day)
except:
pre_date = datetime.datetime(year, month, calendar.monthrange(year, month)[1])
return pre_date
def is_exsits(a, b):
"""
判断日期是否存在, 将日期与基金最开始的时间对比, 如果存在,返回日期, 不存在,返回None
:param a: 基金初始时间
:param b: 需要计算收益的起始时间
:return:
"""
if a < b:
return True
else:
return False
def year_minus(date, n):
"""
计算对标的前几个年份,如2020.3的前1年是2019.3
输入datetime格式
:return:
"""
day = date.day
month = date.month
year = date.year - n
pre_date = datetime.datetime(year, month, day)
return pre_date
def range_return(end_df, begin_df):
"""
区间收益
"""
d = end_df / begin_df - 1
return d
def annual_return(range_return, df, n):
"""
年化收益
"""
d = (1 + range_return) ** (n / len(df)) - 1
return d
def gain_loss_ratio(simple_return):
"""
盈亏比
"""
pos = simple_return[simple_return >= 0].sum()
neg = simple_return[simple_return < 0].sum()
d = - pos / neg
return d
def alpha_beta(simple_return, b_simple_return, n):
"""
alpha, beta
"""
df = pd.DataFrame()
from sklearn.linear_model import LinearRegression
linreg = LinearRegression()
l = len(simple_return)
df['returns'] = simple_return
df['b_returns'] = b_simple_return
X = np.array(df[['b_returns']][:l - 1])
y = np.array(df[['returns']][:l - 1])
linreg.fit(X, y)
beta = linreg.coef_[0][0]
alpha = linreg.intercept_[0] * n
return alpha, beta
def win_rate(simple_return, b_simple_return):
"""
胜率
"""
df = pd.DataFrame()
df['diff'] = simple_return - b_simple_return
d = df[df['diff'] >= 0]['diff'].count() / df['diff'].count()
return d
def lpm(returns, threshold, order):
"""
下偏距, 一阶和二阶
order: 是一阶和二阶的设定
threshold: 是期望收益率
"""
# This method returns a lower partial moment of the returns
# Create an array he same length as returns containing the minimum return threshold
threshold_array = np.empty(len(returns))
threshold_array.fill(threshold)
# Calculate the difference between the threshold and the returns
diff = threshold_array - returns
# Set the minimum of each to 0
diff = diff.clip(min=0)
# Return the sum of the different to the power of order
return np.sum(diff ** order) / len(returns)
def var(returns, alpha):
"""
计算var值,历史收益率方法, 将历史收益率由小到大排序,去置信区间的分位点, alpha是置信区间
"""
# This method calculates the historical simulation var of the returns
sorted_returns = np.sort(returns)
# Calculate the index associated with alpha
index = int(alpha * len(sorted_returns))
# VaR should be positive
return abs(sorted_returns[index])
def cvar(returns, alpha):
# This method calculates the condition VaR of the returns
sorted_returns = np.sort(returns)
# Calculate the index associated with alpha
index = int(alpha * len(sorted_returns))
# Calculate the total VaR beyond alpha
sum_var = sorted_returns[0]
for i in range(1, index):
sum_var += sorted_returns[i]
# Return the average VaR
# CVaR should be positive
return abs(sum_var / index)
def freq_days(fav_freq):
if fav_freq == 1:
N = 250 # 日更新
elif fav_freq == 2:
N = 52 # 周更新
elif fav_freq == 3:
N = 24 # 半周更新
elif fav_freq == 4:
N = 12 # 月更新
elif fav_freq == 5:
N = 3 # 季度更新
else:
N = 250 # 没有设置freq默认是天更
return N
def resample(df, trading_cal, freq):
"""对基金净值表进行粒度不同的重采样,并剔除不在交易日中的结果
Args:
df ([DataFrame]): [原始基金净值表]
trading_cal ([type]): [上交所交易日表]
freq ([type]): [重采样频率: 1:工作日,2:周, 3:月, 4:半月, 5:季度]
Returns:
[DataFrame]: [重采样后剔除不在交易日历中的净值表和交易日历以净值日期为索引的合表]
"""
freq_dict = {1: 'B', 2: 'W-FRI', 3: 'M', 4: 'SM', 5: 'Q'}
if math.isnan(freq):
freq = 2
resample_freq = freq_dict[freq]
# 按采样频率进行重采样并进行净值的前向填充
df = df.resample(rule=resample_freq).ffill()
# 根据采样频率确定最大日期偏移量(保证偏移后的日期与重采样的日期在同一周,同一月,同一季度等)
timeoffset_dict = {1: 1, 2: 5, 3: 30, 4: 15, 5: 120}
timeoffetmax = timeoffset_dict[freq]
# Dataframe不允许直接修改index,新建一份index的复制并转为list
new_index = list(df.index)
# 遍历重采样后的日期
for idx, date in enumerate(df.index):
# 如果重采样后的日期不在交易日历中
if date not in trading_cal.index:
# 对重采样后的日期进行偏移
for time_offset in range(1, timeoffetmax):
# 如果偏移后的日期在交易日历中,保留偏移后的日期
if date - datetime.timedelta(days=time_offset) in trading_cal.index:
new_index[idx] = date - datetime.timedelta(days=time_offset)
# 任意一天满足立即退出循环
break
# 更改净值表的日期索引为重采样后且在交易日内的日期
df.index = pd.Series(new_index)
return df