# coding: utf-8 """ 计算各个指标的方法 """ import pandas as pd import numpy as np import datetime import calendar import math dict_substrategy = {1010: '主观多头', 1020: '股票多空', 1030: '量化多头', 2010: '宏观策略', 3010: '主观趋势', 3020: '主观套利', 3030: '量化趋势', 3040: '量化套利', 3050: 'CTA策略', 4010: '并购重组', 4020: '定向增发', 4030: '大宗交易', 4040: '事件驱动复合', 5010: '市场中性', 5020: '套利策略', 5030: '相对价值复合', 6010: '纯债策略', 6020: '强债策略', 6030: '债券策略', 7010: 'MOM', 7020: 'FOF', 8010: '主观多策略', 8020: '量化多策略', -1: '其他策略'} BANK_RATE = 0.015 def simple_return(net_worth): """ 简单收益率 net_worth:净值或指数数据 """ d = net_worth / net_worth.shift(1) - 1 d.iloc[0] = 0 return d def excess_return(returns, bank_rate, n): """ 超额收益率 returns:简单收益率 bank_rate: 银行收益率, 是已经除过的无风险收益。也可以是其他的基准收益 n: 数据类型, 周(52), 月(12), 日(250) """ d = returns.mean() - bank_rate / n # print(pd.Series(d*np.ones(len(returns)))) return d # pd.Series(d*np.ones(len(returns))) def sharpe_ratio(excess_return, simple_return, n): """ 夏普比率 excess_return: 超额收益率 simple_return: 简单收益率 n: 数据类型, 周(52), 月(12), 日(250) """ import math try: d = math.sqrt(n) * excess_return / simple_return.std(ddof=1) if d == float("inf") or d == float("-inf"): return 0.0 except: return 0.0 return d def volatility(simple_return, n): """ 波动率 :param simple_return: :param n:数据类型, 周(52), 月(12), 日(250) :return: """ d = math.sqrt(n) * simple_return.std(ddof=1) return d def IR(excess_return, n): """ excess_return: 收益减去基准收益率 """ d = math.sqrt(n) * excess_return.mean() / excess_return.std(ddof=1) return d def max_drawdown(return_list): """ 最大回撤 return_list:净值或指数数据的列表 返回最大回撤值,以及开始位置,和结束位置值 """ i = np.argmax((np.maximum.accumulate(return_list) - return_list) / np.maximum.accumulate(return_list)) # 结束位置 if i == 0: return 0, 0, 0 # 没有回撤 j = np.argmax(return_list[:i]) # 开始位置 return (return_list[j] - return_list[i]) / (return_list[j]), j, i def month_differ(x, y): """ 计算月份相差 只根据month,year计算相差月份, 没有考虑day :param x: datetime.datetime :param y: :return: """ m_d = abs((x.year - y.year) * 12 + (x.month - y.month) * 1) return m_d def downside_risk(r, bank_rate, n): """ 下行风险 r: 简单收益率 """ _r = r.map(lambda x: x / 100) # mean = _r.mean() r_adjust = -r.map(lambda x: min(x - bank_rate / n, 0)) risk = np.sqrt((r_adjust ** 2).mean() * len(r_adjust) / (len(r_adjust) - 1)) return risk def sortino_ratio(excess_return, downside, n): """ 索提诺比率 df: 净值或指数数据 """ import math sortino_ratio = math.sqrt(n) * excess_return.mean() / downside return sortino_ratio def month_minus(date, n): """ 计算对标的前几个月份,如2020,3的前三个月是2019.12 输入datetime格式 注意:二月份没有30,31号的,而且3月31号,的前几个月有的是没有31号的。 :return: """ # day = date.day if date.month > n: month = date.month - n year = date.year else: month = date.month + 12 - n year = date.year - 1 # print('month////',month) try: pre_date = datetime.datetime(year, month, date.day) except: pre_date = datetime.datetime(year, month, calendar.monthrange(year, month)[1]) return pre_date def is_exsits(a, b): """ 判断日期是否存在, 将日期与基金最开始的时间对比, 如果存在,返回日期, 不存在,返回None :param a: 基金初始时间 :param b: 需要计算收益的起始时间 :return: """ if a < b: return True else: return False def year_minus(date, n): """ 计算对标的前几个年份,如2020.3的前1年是2019.3 输入datetime格式 :return: """ day = date.day month = date.month year = date.year - n pre_date = datetime.datetime(year, month, day) return pre_date def range_return(end_df, begin_df): """ 区间收益 """ d = end_df / begin_df - 1 return d def annual_return(range_return, df, n): """ 年化收益 """ d = (1 + range_return) ** (n / len(df)) - 1 return d def gain_loss_ratio(simple_return): """ 盈亏比 """ pos = simple_return[simple_return >= 0].sum() neg = simple_return[simple_return < 0].sum() d = - pos / neg return d def alpha_beta(simple_return, b_simple_return, n): """ alpha, beta """ df = pd.DataFrame() from sklearn.linear_model import LinearRegression linreg = LinearRegression() l = len(simple_return) df['returns'] = simple_return df['b_returns'] = b_simple_return X = np.array(df[['b_returns']][:l - 1]) y = np.array(df[['returns']][:l - 1]) linreg.fit(X, y) beta = linreg.coef_[0][0] alpha = linreg.intercept_[0] * n return alpha, beta def win_rate(simple_return, b_simple_return): """ 胜率 """ df = pd.DataFrame() df['diff'] = simple_return - b_simple_return d = df[df['diff'] >= 0]['diff'].count() / df['diff'].count() return d def lpm(returns, threshold, order): """ 下偏距, 一阶和二阶 order: 是一阶和二阶的设定 threshold: 是期望收益率 """ # This method returns a lower partial moment of the returns # Create an array he same length as returns containing the minimum return threshold threshold_array = np.empty(len(returns)) threshold_array.fill(threshold) # Calculate the difference between the threshold and the returns diff = threshold_array - returns # Set the minimum of each to 0 diff = diff.clip(min=0) # Return the sum of the different to the power of order return np.sum(diff ** order) / len(returns) def var(returns, alpha): """ 计算var值,历史收益率方法, 将历史收益率由小到大排序,去置信区间的分位点, alpha是置信区间 """ # This method calculates the historical simulation var of the returns sorted_returns = np.sort(returns) # Calculate the index associated with alpha index = int(alpha * len(sorted_returns)) # VaR should be positive return abs(sorted_returns[index]) def cvar(returns, alpha): # This method calculates the condition VaR of the returns sorted_returns = np.sort(returns) # Calculate the index associated with alpha index = int(alpha * len(sorted_returns)) # Calculate the total VaR beyond alpha sum_var = sorted_returns[0] for i in range(1, index): sum_var += sorted_returns[i] # Return the average VaR # CVaR should be positive return abs(sum_var / index) def freq_days(fav_freq): if fav_freq == 1: N = 250 # 日更新 elif fav_freq == 2: N = 52 # 周更新 elif fav_freq == 3: N = 24 # 半周更新 elif fav_freq == 4: N = 12 # 月更新 elif fav_freq == 5: N = 3 # 季度更新 else: N = 250 # 没有设置freq默认是天更 return N def resample(df, trading_cal, freq): """对基金净值表进行粒度不同的重采样,并剔除不在交易日中的结果 Args: df ([DataFrame]): [原始基金净值表] trading_cal ([type]): [上交所交易日表] freq ([type]): [重采样频率: 1:工作日,2:周, 3:月, 4:半月, 5:季度] Returns: [DataFrame]: [重采样后剔除不在交易日历中的净值表和交易日历以净值日期为索引的合表] """ freq_dict = {1: 'B', 2: 'W-FRI', 3: 'M', 4: 'SM', 5: 'Q'} if math.isnan(freq): freq = 2 resample_freq = freq_dict[freq] # 按采样频率进行重采样并进行净值的前向填充 df = df.resample(rule=resample_freq).ffill() # 根据采样频率确定最大日期偏移量(保证偏移后的日期与重采样的日期在同一周,同一月,同一季度等) timeoffset_dict = {1: 1, 2: 5, 3: 30, 4: 15, 5: 120} timeoffetmax = timeoffset_dict[freq] # Dataframe不允许直接修改index,新建一份index的复制并转为list new_index = list(df.index) # 遍历重采样后的日期 for idx, date in enumerate(df.index): # 如果重采样后的日期不在交易日历中 if date not in trading_cal.index: # 对重采样后的日期进行偏移 for time_offset in range(1, timeoffetmax): # 如果偏移后的日期在交易日历中,保留偏移后的日期 if date - datetime.timedelta(days=time_offset) in trading_cal.index: new_index[idx] = date - datetime.timedelta(days=time_offset) # 任意一天满足立即退出循环 break # 更改净值表的日期索引为重采样后且在交易日内的日期 df.index = pd.Series(new_index) return df