# fund_rank.py 8.84 KB
import pymysql
import tushare as ts
import logging
logging.basicConfig(level=logging.DEBUG)

from week_evaluation import *


# Module-level MySQL connection used by the query helpers in this file.
# It is opened as a side effect of importing the module.
# NOTE(review): host, user and password are hard-coded in the source; they
# should move to environment variables or a secrets store (and the password
# be rotated).  They are left untouched here so existing deployments keep
# working.
con = pymysql.connect(host='tamper.mysql.polardb.rds.aliyuncs.com',
                      user='tamp_fund',
                      password='@imeng408',
                      database='tamp_fund',
                      charset='utf8',
                      # Boolean flag: the original passed the string 'True',
                      # which only worked because any non-empty string is truthy.
                      use_unicode=True)


def get_dataframe(fund, start_date, rollback=False):
    """Load the adjusted NAV history of one fund from ``public_fund_nav``.

    Args:
        fund: Fund code matched against the ``ts_code`` column.
        start_date: Earliest ``end_date`` to keep (datetime-like).
        rollback: When true and ``start_date`` itself has no NAV record, move
            ``start_date`` back to the closest earlier date that has one.
            If no earlier record exists ``start_date`` is left unchanged
            (the original day-by-day loop never terminated normally in that
            case).

    Returns:
        DataFrame indexed by ``end_date`` (ascending, one row per date) with
        the columns ``ts_code`` and ``adj_nav``, or ``None`` when the query
        yields no complete rows for the fund.
    """
    # Bind the fund code as a query parameter (pymysql uses the ``%s``
    # placeholder) instead of formatting it into the statement, so quoting
    # inside the code cannot alter the SQL.
    sql = "SELECT ts_code, end_date, adj_nav FROM public_fund_nav " \
          "WHERE ts_code=%s"
    df = pd.read_sql(sql, con, params=[fund]).dropna(how='any')
    if df.empty:
        logging.error("CAN NOT FIND {}".format(fund))
        return None

    df['end_date'] = pd.to_datetime(df['end_date'])

    if rollback:
        # Latest recorded date at or before the requested start; one
        # vectorised lookup replaces the per-day list scan.
        earlier = df.loc[df['end_date'] <= start_date, 'end_date']
        if not earlier.empty:
            start_date = earlier.max()

    # Non-inplace operations avoid mutating a filtered view of the frame.
    df = df[df['end_date'] >= start_date]
    df = df.drop_duplicates(subset='end_date', keep='first')
    df = df.set_index('end_date').sort_index(ascending=True)
    return df


def get_frequency(df):
    """Infer how many observations per year a date-indexed frame contains.

    The most common gap, in whole days, between consecutive index entries is
    looked up in a table of day ranges.

    Args:
        df: DataFrame whose index holds the observation dates in order.

    Returns:
        int: 250 for a gap of 0-2 days, 52 for 6-8 days, 24 for 13-17 days,
        12 for 28-32 days, or 3 for 110-132 days.

    Raises:
        ValueError: If the frame has fewer than two dates, or the most common
            gap falls outside every range above.
    """
    index_series = df.index.to_series()
    gaps = index_series - index_series.shift(1)

    modes = gaps.mode()
    if modes.empty:
        # Happens for zero or one observation: there is no gap to measure.
        raise ValueError('need at least two dates to infer a sampling frequency')

    logging.info(gaps.describe())
    typical_gap = modes.iloc[0].days

    # (first day, last day, periods per year); day bounds are inclusive.
    # NOTE(review): 110-132 days is roughly four months rather than a
    # calendar quarter (~90 days) - confirm the intended band for 3.
    bands = (
        (0, 2, 250),
        (6, 8, 52),
        (13, 17, 24),
        (28, 32, 12),
        (110, 132, 3),
    )
    for first_day, last_day, periods in bands:
        if first_day <= typical_gap <= last_day:
            return periods

    raise ValueError(
        'unsupported sampling gap of {} days'.format(typical_gap))


def get_trade_cal(start_date, end_date, method):
    """Fetch the open trading days from MySQL or from tushare.

    Args:
        start_date: First calendar date; forwarded to tushare only.
        end_date: Last calendar date, or ``None`` for no upper bound;
            forwarded to tushare only.
        method: ``'mysql'`` or ``'tushare'``.

    Returns:
        DataFrame of open days.  The two sources differ in shape:

        * ``'mysql'`` reads every row of ``stock_trade_cal`` with
          ``is_open=1`` (``start_date``/``end_date`` are not applied) and
          returns ``cal_date`` plus a datetime ``end_date`` column that is
          also used as the index.
        * ``'tushare'`` returns the tushare result without the ``exchange``
          and ``is_open`` columns and with ``cal_date`` renamed to
          ``end_date``; the values are not converted and the index is left
          as tushare returned it.

    Raises:
        ValueError: If ``method`` is neither ``'mysql'`` nor ``'tushare'``
            (the original failed with ``UnboundLocalError`` instead).
    """
    if method == 'mysql':
        sql = 'SELECT cal_date FROM stock_trade_cal WHERE is_open=1'
        df = pd.read_sql(sql, con)
        df['end_date'] = pd.to_datetime(df['cal_date'])
        # Keep the column as well as the index: callers use both.
        df.set_index('end_date', drop=False, inplace=True)
        return df

    if method == 'tushare':
        # NOTE(review): the API token is hard-coded in the source; it should
        # be loaded from configuration and rotated.
        ts.set_token('ac1f734f8a25651aa07319ca35b1b0c0854e361e306fe85d85e092bc')
        pro = ts.pro_api()
        query = {'exchange': 'SSE', 'start_date': start_date, 'is_open': '1'}
        if end_date is not None:
            query['end_date'] = end_date
        df = pro.trade_cal(**query)
        df.drop(['exchange', 'is_open'], axis=1, inplace=True)
        df.rename(columns={'cal_date': 'end_date'}, inplace=True)
        return df

    raise ValueError(
        "unknown trade calendar method {!r}; expected 'mysql' or 'tushare'".format(method))


def get_manager():
    """Read the fund-code / manager-name pairs whose ``end_date`` is NULL.

    Returns:
        DataFrame with the columns ``ts_code`` and ``name`` selected from
        ``public_fund_manager`` over the module-level connection.
    """
    # NOTE(review): a NULL end_date presumably marks a manager who is still
    # in office - confirm against the table definition.
    query = 'SELECT ts_code, name FROM public_fund_manager WHERE end_date IS NULL'
    return pd.read_sql(query, con)


def get_fund_info(end_date):
    """Read the basic data of funds still listed at ``end_date``, with managers.

    Args:
        end_date: Date-like object with ``strftime``; funds whose ``due_date``
            is not later than it are excluded.

    Returns:
        DataFrame with ``ts_code``, ``fund_type`` and ``management`` from
        ``public_fund_basic`` (rows with no delist date), left-joined on
        ``ts_code`` with the result of ``get_manager()``.  A fund appears
        once per matching manager row.
    """
    cutoff = end_date.strftime('%Y%m%d')
    sql = ("SELECT ts_code, fund_type, management FROM public_fund_basic "
           "WHERE delist_date IS NULL AND (due_date IS NULL OR due_date>'{}')").format(cutoff)

    # Rows that are entirely empty carry no information and are discarded.
    funds = pd.read_sql(sql, con).dropna(how='all')
    return pd.merge(funds, get_manager(), how="left", on='ts_code')


def resample(df, trading_cal, freq):
    """Resample a NAV table to its reporting frequency and align it to trading days.

    The table is resampled with forward filling, every resampled date that is
    not a trading day is moved back to the nearest earlier trading day within
    the frequency's look-back window, and the result is inner-joined with the
    trading calendar so that only trading days remain.

    Args:
        df (DataFrame): Raw NAV table indexed by date.
        trading_cal (DataFrame): Trading calendar with an ``end_date`` column
            holding the open days (datetimes or parseable date strings).  The
            calendar's own index is not relied upon.
        freq (int): Observations per year as returned by ``get_frequency``:
            250 (business day), 52 (week ending Friday), 24 (semi-month),
            12 (month end) or 3 (resampled with the ``'Q'`` rule).

    Returns:
        DataFrame: The resampled NAV rows that fall on trading days, joined
        with the calendar columns and indexed by the aligned date.

    Raises:
        KeyError: If ``freq`` is not one of the supported values.
    """
    freq_dict = {250: 'B', 52: 'W-FRI', 12: 'M', 24: 'SM', 3: 'Q'}
    # Exclusive upper bound on how many days a date may be moved back, chosen
    # so the moved date stays in the same week / half-month / month / period.
    # For 250 the bound is 1, i.e. business-day dates are never moved.
    timeoffset_dict = {250: 1, 52: 5, 12: 30, 24: 15, 3: 120}
    rule = freq_dict[freq]
    max_offset = timeoffset_dict[freq]

    # Look trading days up by value.  ``date in trading_cal['end_date']``
    # would test the Series' index labels instead, which is only correct when
    # the calendar happens to be indexed by date.
    open_days = pd.DatetimeIndex(pd.to_datetime(trading_cal['end_date']))
    calendar = trading_cal.set_index(open_days)

    # Resample at the target frequency, carrying the last known NAV forward.
    df = df.resample(rule=rule).ffill()

    aligned = []
    for date in df.index:
        if date not in open_days:
            # Step back one day at a time and keep the first trading day found.
            for days_back in range(1, max_offset):
                candidate = date - datetime.timedelta(days=days_back)
                if candidate in open_days:
                    date = candidate
                    break
        aligned.append(date)

    # Dates that could not be moved onto a trading day are removed by the
    # inner join below.
    df.index = pd.DatetimeIndex(aligned)
    return pd.merge(df, calendar, how='inner', left_index=True, right_index=True)


def z_score(annual_return_rank, downside_risk_rank, max_drawdown_rank, sharp_ratio_rank):
    """Combine four rank values into one score, each weighted by 25.

    Args:
        annual_return_rank: Rank value(s) for annual return.
        downside_risk_rank: Rank value(s) for downside risk.
        max_drawdown_rank: Rank value(s) for maximum drawdown.
        sharp_ratio_rank: Rank value(s) for the Sharpe ratio.

    Returns:
        The weighted sum, of the same kind as the inputs (scalar or Series).
    """
    weight = 25
    # Accumulate left to right so the arithmetic order matches a plain sum.
    score = weight * annual_return_rank
    score = score + weight * downside_risk_rank
    score = score + weight * max_drawdown_rank
    score = score + weight * sharp_ratio_rank
    return score


def cal_date(date, period_type, period):
    """Return the date lying ``period`` years, months or days before ``date``.

    Args:
        date: Reference ``datetime``.
        period_type: ``'Y'`` for years, ``'m'`` for months, ``'d'`` for days.
        period: Number of units to go back.

    Returns:
        datetime.datetime: For ``'Y'`` and ``'m'`` the result is at midnight
        and its day is clamped to the last day of the target month (so
        29 Feb minus one year gives 28 Feb, and 31 Mar minus one month gives
        the last day of February).  For ``'d'`` the time of day of ``date``
        is kept.

    Raises:
        ValueError: If ``period_type`` is not one of ``'Y'``, ``'m'``, ``'d'``
            (the original silently returned ``None``).
    """
    import calendar  # local import: only needed for month lengths

    if period_type == 'd':
        return date - datetime.timedelta(days=period)

    if period_type == 'Y':
        months_back = 12 * period
    elif period_type == 'm':
        months_back = period
    else:
        raise ValueError(
            "period_type must be 'Y', 'm' or 'd', got {!r}".format(period_type))

    # Work in a zero-based month count so any number of months, including
    # twelve or more, rolls over the year boundary correctly.
    month_index = date.year * 12 + (date.month - 1) - months_back
    year, zero_based_month = divmod(month_index, 12)
    month = zero_based_month + 1
    day = min(date.day, calendar.monthrange(year, month)[1])
    return datetime.datetime(year, month, day)


def metric_rank(df):
    """Add percentile-rank columns, computed within each ``invest_type`` group.

    For each of ``annual_return``, ``downside_risk``, ``max_drawdown`` and
    ``sortino_ratio`` a ``<metric>_rank`` column is written to ``df``.

    Args:
        df: DataFrame holding the four metric columns and ``invest_type``.
            It is modified in place.

    Returns:
        The same DataFrame object, with the rank columns added.
    """
    # These two are ranked in descending order (the largest value receives
    # the smallest percentile); the remaining metrics are ranked ascending.
    ranked_descending = ('downside_risk', 'max_drawdown')

    for metric in ('annual_return', 'downside_risk', 'max_drawdown', 'sortino_ratio'):
        rank_column = '{}_rank'.format(metric)
        ascending = metric not in ranked_descending
        per_type = df.groupby(['invest_type'])[metric]
        df[rank_column] = per_type.rank(ascending=ascending, pct=True)
    return df


def public_fund_rank(start_date, end_date):
    """Compute performance metrics, ranks and a combined score for every fund.

    For each fund returned by ``get_fund_info`` the NAV history since
    ``start_date`` is loaded, resampled to its own reporting frequency on
    trading days, and turned into return / risk metrics.  The metrics are
    ranked within each fund type and combined with ``z_score``.

    A fund is skipped (and logged) when:

    * it has no NAV rows in the window;
    * its NAV history spans less than 60% of the window (the original code
      recorded such funds as skipped but still processed them);
    * its sampling frequency cannot be determined before or after
      resampling (the second check used to abort the whole run).

    Args:
        start_date: Start of the evaluation window (datetime).
        end_date: End of the evaluation window (datetime).

    Returns:
        DataFrame indexed by ``ts_code`` with the metric columns,
        ``invest_type``, ``manager``, ``management``, one ``*_rank`` column
        per ranked metric (including ``sharp_ratio_rank``) and ``z_score``.
    """
    fund_info = get_fund_info(end_date)
    grouped_fund = fund_info.groupby('fund_type')['ts_code'].unique()

    trading_cal = get_trade_cal(start_date, end_date, method='mysql')

    columns = ['ts_code', 'range_return', 'annual_return', 'max_drawdown',
               'sharp_ratio', 'volatility', 'sortino_ratio', 'downside_risk',
               'invest_type', 'manager', 'management']
    # Rows are collected in a list and turned into a frame once: repeated
    # DataFrame.append is quadratic, was removed in pandas 2 and leaves every
    # column with object dtype.
    rows = []
    skipped_funds = []
    min_span = 0.6 * (end_date - start_date)

    for invest_type in grouped_fund.index:
        for fund in grouped_fund[invest_type]:

            df = get_dataframe(fund, start_date)
            if df is None or df.empty:
                logging.info('Skipped {}'.format(fund))
                skipped_funds.append(fund)
                continue

            if df.index[-1] - df.index[0] < min_span:
                logging.info('Skipped {}'.format(fund))
                skipped_funds.append(fund)
                continue

            try:
                n = get_frequency(df)
                df = resample(df, trading_cal, n)
                # Validate the resampled series as well; this also rejects a
                # frame left with fewer than two rows.
                get_frequency(df)
            except Exception as e:
                # Best effort per fund: one bad history must not stop the run.
                logging.error(repr(e))
                logging.info('Skipped {}'.format(fund))
                skipped_funds.append(fund)
                continue

            logging.info("Dealing with {}".format(fund))
            net_worth = df['adj_nav'].astype(float)

            end_df, begin_df = net_worth.values[-1], net_worth.values[0]

            # NOTE(review): bank_rate=0.015 is presumably an annual risk-free
            # rate - confirm against week_evaluation.
            sim_return = simple_return(net_worth)
            ex_return = excess_return(sim_return, bank_rate=0.015, n=n)
            drawdown = float(max_drawdown(net_worth)[0])
            shp_ratio = sharpe_ratio(ex_return, sim_return, n)
            rng_return = float(range_return(end_df, begin_df))
            ann_return = annual_return(rng_return, net_worth, n)
            vol = volatility(sim_return, n)
            down_risk = downside_risk(sim_return, bank_rate=0.015, n=n)
            sor_ratio = sortino_ratio(ex_return, down_risk, n)

            # One row per manager of the fund; both cells hold value arrays.
            fund_rows = fund_info[fund_info['ts_code'] == fund]

            rows.append({
                'ts_code': fund,
                'range_return': rng_return,
                'annual_return': ann_return,
                'max_drawdown': drawdown,
                'sharp_ratio': shp_ratio,
                'volatility': vol,
                'sortino_ratio': sor_ratio,
                'downside_risk': down_risk,
                'invest_type': invest_type,
                'manager': fund_rows['name'].values,
                'management': fund_rows['management'].values,
            })

    logging.info('Skipped {} funds in total'.format(len(skipped_funds)))

    metric_df = pd.DataFrame(rows, columns=columns)
    metric_df.set_index('ts_code', inplace=True)

    df = metric_rank(metric_df)
    # z_score needs a Sharpe-ratio rank; compute it here when metric_rank did
    # not provide one, instead of failing with KeyError.
    if 'sharp_ratio_rank' not in df.columns:
        df['sharp_ratio_rank'] = df.groupby(['invest_type'])['sharp_ratio'].rank(
            ascending=True, pct=True)

    df['z_score'] = z_score(df['annual_return_rank'],
                            df['downside_risk_rank'],
                            df['max_drawdown_rank'],
                            df['sharp_ratio_rank'])
    return df


if __name__ == '__main__':
    # Rank funds over the one-year window that ends yesterday and write the
    # table to a GBK-encoded CSV file in the working directory.
    end_date = datetime.datetime.now() - datetime.timedelta(days=1)
    start_date = cal_date(end_date, 'Y', 1)
    # Store the result under its own name; the original rebound
    # ``public_fund_rank`` to the DataFrame and so shadowed the function.
    rank_table = public_fund_rank(start_date, end_date)
    rank_table.to_csv('public_fund_rank.csv', encoding='gbk')