"""Rank public funds by risk/return metrics computed from their adjusted NAV.

The module loads NAV series and fund metadata from MySQL, aligns every series
to the exchange trading calendar, computes per-fund metrics with the helpers
star-imported from ``week_evaluation`` and ranks the funds within each fund
type.
"""
import calendar
import datetime
import logging
import os

import pandas as pd
import pymysql
import tushare as ts

logging.basicConfig(level=logging.DEBUG)

from week_evaluation import *

# SECURITY NOTE: credentials are embedded in source. They can be overridden
# through the environment; the literals are kept only as backward-compatible
# fallbacks and should be rotated and removed from version control.
con = pymysql.connect(
    host=os.environ.get('TAMP_DB_HOST', 'tamper.mysql.polardb.rds.aliyuncs.com'),
    user=os.environ.get('TAMP_DB_USER', 'tamp_fund'),
    password=os.environ.get('TAMP_DB_PASSWORD', '@imeng408'),
    database=os.environ.get('TAMP_DB_NAME', 'tamp_fund'),
    charset='utf8',
    use_unicode=True,
)

# (low, high, code): a modal gap of low <= days < high between consecutive NAV
# dates maps to the frequency code returned by get_frequency().
_FREQUENCY_BANDS = (
    (0, 3, 250),
    (6, 9, 52),
    (13, 18, 24),
    (28, 33, 12),
    (110, 133, 3),
)

# Frequency code -> pandas resampling rule.
# NOTE(review): code 3 is detected from gaps of 110-132 days but resampled
# with the quarterly rule 'Q' (~91 days) -- confirm which one is intended.
_RESAMPLE_RULES = {250: 'B', 52: 'W-FRI', 12: 'M', 24: 'SM', 3: 'Q'}

# Frequency code -> exclusive upper bound (in days) of the backward search for
# a trading day when a resampled date is not one.
_MAX_OFFSET_DAYS = {250: 1, 52: 5, 12: 30, 24: 15, 3: 120}

# Column order of the metric table built by public_fund_rank().
_METRIC_COLUMNS = [
    'ts_code', 'range_return', 'annual_return', 'max_drawdown', 'sharp_ratio',
    'volatility', 'sortino_ratio', 'downside_risk', 'invest_type', 'manager',
    'management',
]


def get_dataframe(fund, start_date, rollback=False):
    """Load the adjusted NAV series of one fund from ``public_fund_nav``.

    Args:
        fund: ``ts_code`` of the fund.
        start_date: Earliest date to keep.
        rollback: When true and ``start_date`` itself has no NAV record, move
            ``start_date`` back to the latest earlier date that has one.

    Returns:
        DataFrame indexed by ``end_date`` (ascending, duplicates removed)
        with the ``ts_code`` and ``adj_nav`` columns, or ``None`` when the
        fund has no usable rows.
    """
    # Parameterised query: never interpolate values into the SQL text.
    sql = "SELECT ts_code, end_date, adj_nav FROM public_fund_nav " \
          "WHERE ts_code=%s"
    df = pd.read_sql(sql, con, params=(fund,)).dropna(how='any')
    if df.empty:
        logging.error("CAN NOT FIND %s", fund)
        return None

    df['end_date'] = pd.to_datetime(df['end_date'])
    if rollback:
        # Pick the latest record on or before start_date in one pass. The
        # previous day-by-day countdown never terminated normally when no
        # record matched exactly (e.g. start_date before the first record).
        earlier = df.loc[df['end_date'] <= start_date, 'end_date']
        if not earlier.empty:
            start_date = earlier.max()

    df = df[df['end_date'] >= start_date]
    df = df.drop_duplicates(subset='end_date', keep='first')
    df = df.set_index('end_date').sort_index(ascending=True)
    return df


def get_frequency(df):
    """Infer the NAV publication frequency from the index spacing.

    The most common gap (in days) between consecutive index entries is
    matched against ``_FREQUENCY_BANDS``.

    Args:
        df: DataFrame with a datetime index.

    Returns:
        One of the codes 250, 52, 24, 12 or 3.

    Raises:
        ValueError: If there are fewer than two dates or the modal gap does
            not fall in any known band.
    """
    index_series = df.index.to_series()
    freq_series = index_series - index_series.shift(1)
    logging.info("%s", freq_series.describe())

    modes = freq_series.mode()
    if modes.empty:
        raise ValueError('cannot infer frequency from fewer than two dates')

    gap_days = modes[0].days
    for low, high, code in _FREQUENCY_BANDS:
        if low <= gap_days < high:
            return code
    raise ValueError('unrecognised sampling gap of {} days'.format(gap_days))


def get_trade_cal(start_date, end_date, method):
    """Return the table of open trading days.

    Args:
        start_date: First calendar date; only used by the ``'tushare'``
            method.
        end_date: Last calendar date or ``None``; only used by the
            ``'tushare'`` method.
        method: ``'mysql'`` reads every open day from ``stock_trade_cal``;
            ``'tushare'`` queries the SSE calendar through the tushare API.

    Returns:
        For ``'mysql'``: DataFrame indexed by ``end_date`` (datetime) with
        the ``cal_date`` and ``end_date`` columns. For ``'tushare'``: the API
        result without ``exchange``/``is_open`` and with ``cal_date`` renamed
        to ``end_date``.

    Raises:
        ValueError: If ``method`` is not one of the supported values.
    """
    if method == 'mysql':
        sql = 'SELECT cal_date FROM stock_trade_cal WHERE is_open=1'
        df = pd.read_sql(sql, con)
        df['end_date'] = pd.to_datetime(df['cal_date'])
        df.set_index('end_date', drop=False, inplace=True)
    elif method == 'tushare':
        # SECURITY NOTE: API token embedded in source; prefer the environment.
        ts.set_token(os.environ.get(
            'TUSHARE_TOKEN',
            'ac1f734f8a25651aa07319ca35b1b0c0854e361e306fe85d85e092bc'))
        pro = ts.pro_api()
        if end_date is not None:
            df = pro.trade_cal(exchange='SSE', start_date=start_date,
                               end_date=end_date, is_open='1')
        else:
            df = pro.trade_cal(exchange='SSE', start_date=start_date,
                               is_open='1')
        df.drop(['exchange', 'is_open'], axis=1, inplace=True)
        df.rename(columns={'cal_date': 'end_date'}, inplace=True)
    else:
        raise ValueError('unknown trade calendar method: {!r}'.format(method))
    return df


def get_manager():
    """Return ``ts_code``/``name`` rows of managers whose ``end_date`` is NULL."""
    sql = 'SELECT ts_code, name FROM public_fund_manager WHERE end_date IS NULL'
    return pd.read_sql(sql, con)


def get_fund_info(end_date):
    """Return basic info of funds still listed at ``end_date``.

    Funds are read from ``public_fund_basic`` (not delisted, and not due on
    or before ``end_date``) and left-joined with the rows of ``get_manager``,
    so a fund with several managers appears once per manager.

    Args:
        end_date: Reference date; must provide ``strftime``.

    Returns:
        DataFrame with ``ts_code``, ``fund_type``, ``management`` and
        ``name`` columns.
    """
    sql = "SELECT ts_code, fund_type, management FROM public_fund_basic " \
          "WHERE delist_date IS NULL AND (due_date IS NULL OR due_date>%s)"
    params = (end_date.strftime('%Y%m%d'),)
    df = pd.read_sql(sql, con, params=params).dropna(how='all')
    manager_info = get_manager()
    return pd.merge(df, manager_info, how="left", on='ts_code')


def resample(df, trading_cal, freq):
    """Resample a NAV table and keep only rows that fall on trading days.

    Args:
        df: NAV table indexed by date.
        trading_cal: Trading calendar with an ``end_date`` column (datetime
            or date strings).
        freq: Frequency code as returned by ``get_frequency``:
            250 -> business day, 52 -> weekly (Friday), 24 -> semi-month,
            12 -> month end, 3 -> quarter end.

    Returns:
        The forward-filled, resampled NAV table inner-joined on its date
        index with the trading calendar.
    """
    rule = _RESAMPLE_RULES[freq]
    max_offset = _MAX_OFFSET_DAYS[freq]

    # Work on a normalised copy of the calendar (datetime values that are
    # also the index) so both the membership test and the index merge below
    # behave the same for every calendar source.
    cal = trading_cal.copy()
    cal['end_date'] = pd.to_datetime(cal['end_date'])
    cal.set_index('end_date', drop=False, inplace=True)
    trading_days = set(cal.index)

    # Resample at the requested rule and forward-fill the NAV values.
    df = df.resample(rule=rule).ffill()

    # A resampled date that is not a trading day is moved back to the closest
    # earlier trading day within the allowed offset; dates with no such day
    # are left untouched and removed by the inner merge.
    new_index = []
    for date in df.index:
        if date not in trading_days:
            for offset in range(1, max_offset):
                candidate = date - datetime.timedelta(days=offset)
                if candidate in trading_days:
                    date = candidate
                    break
        new_index.append(date)

    df.index = pd.DatetimeIndex(new_index)
    return pd.merge(df, cal, how='inner', left_index=True, right_index=True)


def z_score(annual_return_rank, downside_risk_rank, max_drawdown_rank,
            sharp_ratio_rank):
    """Return the equally weighted (25 each) sum of the four rank inputs."""
    return (25 * annual_return_rank + 25 * downside_risk_rank
            + 25 * max_drawdown_rank + 25 * sharp_ratio_rank)


def cal_date(date, period_type, period):
    """Return the date ``period`` units before ``date``.

    Args:
        date: Reference ``datetime``.
        period_type: ``'Y'`` for years, ``'m'`` for months, ``'d'`` for days.
        period: Number of units to go back.

    Returns:
        For ``'Y'``/``'m'``: a ``datetime`` at midnight; when the target
        month is shorter than the reference day (e.g. Feb 29 or the 31st),
        the last day of the target month is used. For ``'d'``: ``date`` minus
        ``period`` days, time of day preserved. ``None`` for any other
        ``period_type``.
    """
    if period_type == 'd':
        return date - datetime.timedelta(days=period)

    if period_type == 'Y':
        months_back = 12 * period
    elif period_type == 'm':
        months_back = period
    else:
        return None

    # Count months from year 0 so offsets of any size roll the year correctly.
    year, month_index = divmod(date.year * 12 + date.month - 1 - months_back, 12)
    month = month_index + 1
    day = min(date.day, calendar.monthrange(year, month)[1])
    return datetime.datetime(year, month, day)


def metric_rank(df):
    """Add percentile-rank columns computed within each ``invest_type``.

    A ``<metric>_rank`` column is added for annual return, downside risk,
    max drawdown, Sortino ratio and Sharpe ratio. Downside risk and max
    drawdown are ranked descending, the others ascending.

    Args:
        df: Metric table with an ``invest_type`` column and the metric
            columns listed above.

    Returns:
        The same DataFrame, modified in place.
    """
    descending_metrics = ('downside_risk', 'max_drawdown')
    # 'sharp_ratio' is ranked as well because public_fund_rank() feeds
    # 'sharp_ratio_rank' into z_score().
    for metric in ('annual_return', 'downside_risk', 'max_drawdown',
                   'sortino_ratio', 'sharp_ratio'):
        values = df[metric].astype(float)
        df['{}_rank'.format(metric)] = values.groupby(df['invest_type']).rank(
            ascending=metric not in descending_metrics, pct=True)
    return df


def _fund_metrics(net_worth, n, bank_rate=0.015):
    """Compute the metric values of one fund from its NAV series."""
    end_nav, begin_nav = net_worth.values[-1], net_worth.values[0]
    sim_return = simple_return(net_worth)
    ex_return = excess_return(sim_return, bank_rate=bank_rate, n=n)
    drawdown = float(max_drawdown(net_worth)[0])
    shp_ratio = sharpe_ratio(ex_return, sim_return, n)
    rng_return = float(range_return(end_nav, begin_nav))
    ann_return = annual_return(rng_return, net_worth, n)
    vol = volatility(sim_return, n)
    down_risk = downside_risk(sim_return, bank_rate=bank_rate, n=n)
    sor_ratio = sortino_ratio(ex_return, down_risk, n)
    return {
        'range_return': rng_return,
        'annual_return': ann_return,
        'max_drawdown': drawdown,
        'sharp_ratio': shp_ratio,
        'volatility': vol,
        'sortino_ratio': sor_ratio,
        'downside_risk': down_risk,
    }


def public_fund_rank(start_date, end_date):
    """Compute metrics for every listed fund and rank them per fund type.

    Args:
        start_date: Start of the evaluation window.
        end_date: End of the evaluation window.

    Returns:
        DataFrame indexed by ``ts_code`` holding the metric columns, the
        ``*_rank`` columns added by ``metric_rank`` and a ``z_score`` column.
    """
    fund_info = get_fund_info(end_date)
    grouped_fund = fund_info.groupby('fund_type')['ts_code'].unique()
    trading_cal = get_trade_cal(start_date, end_date, method='mysql')

    rows = []
    skipped_funds = []
    for invest_type in grouped_fund.index:
        for fund in grouped_fund[invest_type]:
            df = get_dataframe(fund, start_date)
            if df is None or df.empty:
                logging.info('Skipped %s', fund)
                continue

            try:
                # Funds whose history covers less than 60% of the window are
                # left out of the ranking.
                if df.index[-1] - df.index[0] < 0.6 * (end_date - start_date):
                    skipped_funds.append(fund)
                    logging.info('Skipped %s', fund)
                    continue
                n = get_frequency(df)
            except Exception as e:
                logging.error(repr(e))
                logging.info('Skipped %s', fund)
                continue

            df = resample(df, trading_cal, n)
            if df.empty:
                logging.info('Skipped %s', fund)
                continue
            try:
                # Result unused: called only to log the spacing after
                # resampling; a failure here must not abort the whole run.
                get_frequency(df)
            except ValueError as e:
                logging.warning(repr(e))

            logging.info("Dealing with %s", fund)
            row = {'ts_code': fund, 'invest_type': invest_type}
            row.update(_fund_metrics(df['adj_nav'].astype(float), n))
            fund_rows = fund_info[fund_info['ts_code'] == fund]
            row['manager'] = fund_rows['name'].values
            row['management'] = fund_rows['management'].values
            rows.append(row)

    # Build the table once: DataFrame.append was removed in pandas 2.0 and
    # copied the whole frame on every call.
    metric_df = pd.DataFrame(rows, columns=_METRIC_COLUMNS)
    metric_df.set_index('ts_code', inplace=True)

    ranked = metric_rank(metric_df)
    ranked['z_score'] = z_score(ranked['annual_return_rank'],
                                ranked['downside_risk_rank'],
                                ranked['max_drawdown_rank'],
                                ranked['sharp_ratio_rank'])
    return ranked


if __name__ == '__main__':
    end_date = datetime.datetime.now() - datetime.timedelta(days=1)
    start_date = cal_date(end_date, 'Y', 1)
    # Use a distinct name so the function is not shadowed by its result.
    rank_df = public_fund_rank(start_date, end_date)
    rank_df.to_csv('public_fund_rank.csv', encoding='gbk')