# db = create_engine( # 'mysql+pymysql://tamp_fund:@imeng408@tamper.mysql.polardb.rds.aliyuncs.com:3306/tamp_fund?charset=utf8mb4', # pool_size=50, # pool_recycle=3600, # pool_pre_ping=True) # con = db.connect() import logging logging.basicConfig(level=logging.INFO) from app.api.engine import tamp_fund_engine, TAMP_SQL, tamp_product_engine from app.utils.week_evaluation import * # con = pymysql.connect(host='tamper.mysql.polardb.rds.aliyuncs.com', # user='tamp_fund', # password='@imeng408', # database='tamp_fund', # charset='utf8', # use_unicode='True') def get_nav(fund, start_date, rollback=False, invest_type='public'): """获取基金ID为fund, 起始日期为start_date, 终止日期为当前日期的基金净值表 Args: fund[str]:基金ID start_date[date]:起始日期 rollback[bool]:当起始日期不在净值公布日历中,是否往前取最近的净值公布日 public[bool]:是否为公募 Returns:df[DataFrame]: 索引为净值公布日, 列为复权净值的净值表; 查询失败则返回None """ with TAMP_SQL(tamp_fund_engine) as tamp_product: tamp_product_session = tamp_product.session if invest_type == 'public': sql = "SELECT ts_code, end_date, adj_nav FROM public_fund_nav " \ "WHERE ts_code='{}'".format(fund) cur = tamp_product_session.execute(sql) data = cur.fetchall() df = pd.DataFrame(list(data), columns=['fund_id', 'end_date', 'adj_nav']).dropna(how='any') df.rename({'ts_code': 'fund_id'}, axis=1, inplace=True) else: sql = "SELECT fund_id, price_date, cumulative_nav FROM fund_nav " \ "WHERE fund_id='{}'".format(fund) # df = pd.read_sql(sql, con).dropna(how='any') cur = tamp_product_session.execute(sql) data = cur.fetchall() df = pd.DataFrame(data, columns=['fund_id', 'price_date', 'cumulative_nav']).dropna(how='any') df.rename({'price_date': 'end_date', 'cumulative_nav': 'adj_nav'}, axis=1, inplace=True) if df['adj_nav'].count() == 0: logging.log(logging.ERROR, "CAN NOT FIND {}".format(fund)) return None df['end_date'] = pd.to_datetime(df['end_date']) if rollback and df['end_date'].min() < start_date < df['end_date'].max(): while start_date not in list(df['end_date']): start_date -= datetime.timedelta(days=1) df = df[df['end_date'] >= start_date] df.drop_duplicates(subset='end_date', inplace=True, keep='first') df.set_index('end_date', inplace=True) df.sort_index(inplace=True, ascending=True) return df def get_frequency(df): """获取基金净值一年当中公布的频率 Args: df[DataFrame]:以基金净值公布日期为索引的基金净值表 Returns:[int]: 年公布频率;查询失败则返回ValueError """ index_series = df.index.to_series() # freq_series = index_series - index_series.shift(1) freq_series = index_series.diff(1) logging.log(logging.DEBUG, freq_series.describe()) try: f = freq_series.mode()[0].days except: return 250 if f in range(0, 3): return 250 elif f in range(6, 9): return 52 elif f in range(13, 18): return 24 elif f in range(28, 33): return 12 elif f in range(110, 133): return 3 else: return 250 def get_trade_cal(): """获取上交所交易日历表 Returns:df[DataFrame]: 索引为交易日, 列为交易日的上交所交易日历表 """ with TAMP_SQL(tamp_fund_engine) as tamp_product: tamp_product_session = tamp_product.session sql = 'SELECT cal_date FROM stock_trade_cal WHERE is_open=1' cur = tamp_product_session.execute(sql) data = cur.fetchall() df = pd.DataFrame(list(data), columns=['cal_date']).dropna(how='all') # df = pd.read_sql(sql, con) df['end_date'] = pd.to_datetime(df['cal_date']) df.set_index('end_date', drop=False, inplace=True) return df def get_manager(invest_type): """获取基金对应基金经理表 Args: invest_type: 资产类型:公募, 私募等 Returns: """ with TAMP_SQL(tamp_fund_engine) as tamp_product: tamp_product_session = tamp_product.session if invest_type == 'public': sql = 'SELECT ts_code, name FROM public_fund_manager WHERE end_date IS NULL' # df = pd.read_sql(sql, con) cur = tamp_product_session.execute(sql) data = cur.fetchall() df = pd.DataFrame(list(data), columns=['ts_code', 'name']) else: sql = 'SELECT fund_id, fund_manager_id FROM fund_manager_mapping' # df = pd.read_sql(sql, con) cur = tamp_product_session.execute(sql) data = cur.fetchall() df = pd.DataFrame(list(data), columns=['fund_id', 'fund_manager_id']) return df def get_fund_info(end_date, invest_type): """[summary] Args: end_date ([type]): [description] invest_type ([type]): [description] Returns: [type]: [description] """ with TAMP_SQL(tamp_fund_engine) as tamp_product: tamp_product_session = tamp_product.session if invest_type == 'public': sql = "SELECT ts_code, fund_type, management FROM public_fund_basic " \ "WHERE delist_date IS NULL AND (due_date IS NULL OR due_date>'{}')".format(end_date.strftime('%Y%m%d')) # df = pd.read_sql(sql, con).dropna(how='all') cur = tamp_product_session.execute(sql) data = cur.fetchall() df = pd.DataFrame(list(data), columns=['ts_code', 'fund_type', 'management']) manager_info = get_manager(invest_type) df.rename({'ts_code': 'fund_id'}, axis=1, inplace=True) df = pd.merge(df, manager_info, how="left", on='fund_id') else: sql = "SELECT id, substrategy FROM fund_info WHERE delete_tag=0 " \ "AND substrategy!=-1" cur = tamp_product_session.execute(sql) data = cur.fetchall() df = pd.DataFrame(list(data), columns=['id', 'substrategy']) # df = pd.read_sql(sql, con).dropna(how='all') df.rename({'id': 'fund_id'}, axis=1, inplace=True) manager_info = get_manager(invest_type) df = pd.merge(df, manager_info, how="inner", on='fund_id') return df def resample(df, trading_cal, freq, simple_flag=True): """对基金净值表进行粒度不同的重采样,并剔除不在交易日中的结果 Args: df ([DataFrame]): [原始基金净值表] trading_cal ([DataFrame]): [上交所交易日表] freq ([int]): [重采样频率: 1:工作日,2:周, 3:月, 4:半月, 5:季度] Returns: [DataFrame]: [重采样后剔除不在交易日历中的净值表和交易日历以净值日期为索引的合表] """ freq_dict = {250: 'B', 52: 'W-FRI', 12: 'M', 24: 'SM', 3: 'Q'} resample_freq = freq_dict[freq] # 按采样频率进行重采样并进行净值的前向填充 df = df.resample(rule=resample_freq, closed='right').ffill() # 计算年化指标时简化重采样过程 if simple_flag and freq == 250: return pd.merge(df, trading_cal, how='inner', left_index=True, right_index=True) elif simple_flag and freq != 250: return df # 根据采样频率确定最大日期偏移量(保证偏移后的日期与重采样的日期在同一周,同一月,同一季度等) timeoffset_dict = {250: 1, 52: 5, 12: 30, 24: 15, 3: 120} timeoffsetmax = timeoffset_dict[freq] # Dataframe不允许直接修改index,新建一份index的复制并转为list new_index = list(df.index) # 遍历重采样后的日期 for idx, date in enumerate(df.index): # 如果重采样后的日期不在交易日历中 if date not in trading_cal['end_date']: # 对重采样后的日期进行偏移 for time_offset in range(1, timeoffsetmax): # 如果偏移后的日期在交易日历中,保留偏移后的日期 if date - datetime.timedelta(days=time_offset) in trading_cal['end_date']: new_index[idx] = date - datetime.timedelta(days=time_offset) # 任意一天满足立即退出循环 break # 更改净值表的日期索引为重采样后且在交易日内的日期 df.index = pd.Series(new_index) return pd.merge(df, trading_cal, how='inner', left_index=True, right_index=True) def z_score(annual_return_rank, downside_risk_rank, max_drawdown_rank, sharp_ratio_rank): return 25 * annual_return_rank + 25 * downside_risk_rank + 25 * max_drawdown_rank + 25 * sharp_ratio_rank def cal_date(date, period_type, period): year, month, day = map(int, date.strftime('%Y-%m-%d').split('-')) if period_type == 'Y': cal_year = year - period return datetime.datetime(cal_year, month, day) elif period_type == 'm': cal_month = month - period if cal_month > 0: return datetime.datetime(year, cal_month, day) else: return datetime.datetime(year - 1, cal_month + 12, day) elif period_type == 'd': return date - datetime.timedelta(days=period) def metric_rank(df): for metric in ['annual_return', 'downside_risk', 'max_drawdown', 'sharp_ratio']: if metric in ['downside_risk', 'max_drawdown']: ascending = False else: ascending = True df['{}_rank'.format(metric)] = df.groupby(['substrategy'])[metric].rank(ascending=ascending, pct=True) return df def fund_rank(start_date, end_date, invest_type='private'): fund_info = get_fund_info(end_date, invest_type=invest_type) group = fund_info.groupby('substrategy') grouped_fund = group['fund_id'].unique() trading_cal = get_trade_cal() metric_df = pd.DataFrame(columns=('fund_id', 'range_return', 'annual_return', 'max_drawdown', 'sharp_ratio', 'volatility', 'sortino_ratio', 'downside_risk', 'substrategy')) skipped_funds = [] for substrategy in grouped_fund.index: for fund in grouped_fund[substrategy]: df = get_nav(fund, start_date, rollback=False, invest_type=invest_type) try: if df.index[-1] - df.index[0] < 0.6 * (end_date - start_date): skipped_funds.append(fund) logging.log(logging.INFO, 'Skipped {}'.format(fund)) continue n = get_frequency(df) except Exception as e: # logging.log(logging.ERROR, repr(e)) logging.log(logging.INFO, 'Skipped {}'.format(fund)) continue df = resample(df, trading_cal, n) try: _ = get_frequency(df) except ValueError: continue logging.log(logging.INFO, "Dealing with {}".format(fund)) net_worth = df['adj_nav'].astype(float) end_df, begin_df = net_worth.values[-1], net_worth.values[0] sim_return = simple_return(net_worth) ex_return = excess_return(sim_return, bank_rate=0.015, n=n) drawdown = float(max_drawdown(net_worth)[0]) shp_ratio = sharpe_ratio(ex_return, sim_return, n) rng_return = float(range_return(end_df, begin_df)) ann_return = annual_return(rng_return, net_worth, n) vol = volatility(sim_return, n) down_risk = downside_risk(sim_return, bank_rate=0.015, n=n) sor_ratio = sortino_ratio(ex_return, down_risk, n) manager = fund_info[fund_info['fund_id'] == fund]['fund_manager_id'].values # management = fund_info[fund_info['fund_id'] == fund]['management'].values row = pd.Series([fund, rng_return, ann_return, drawdown, shp_ratio, vol, sor_ratio, down_risk, substrategy, manager], index=['fund_id', 'range_return', 'annual_return', 'max_drawdown', 'sharp_ratio', 'volatility', 'sortino_ratio', 'downside_risk', 'substrategy', 'manager']) metric_df = metric_df.append(row, ignore_index=True) metric_df.set_index('fund_id', inplace=True) df = metric_rank(metric_df) df['z_score'] = z_score(df['annual_return_rank'], df['downside_risk_rank'], df['max_drawdown_rank'], df['sharp_ratio_rank']) return df if __name__ == '__main__': end_date = datetime.datetime.now() - datetime.timedelta(days=1) start_date = cal_date(end_date, 'Y', 1) fund_rank = fund_rank(start_date, end_date, False) # fund_rank.to_csv("fund_rank.csv", encoding='gbk') # df = pd.read_csv('fund_rank.csv') # df.to_sql("fund_rank", con, if_exists='replace') # con.close()