# data_service.py 20.6 KB
#!/usr/bin/python3.6
# -*- coding: utf-8 -*-
# @Time    : 2020/11/18 19:12
# @Author  : Jie. Z
# @Email   : zhaojiestudy@163.com
# @File    : data_service.py
# @Software: PyCharm

import pandas as pd
import numpy as np
from sqlalchemy import and_
import tushare as ts
import datetime
import math
from decimal import Decimal
from app.api.engine import tamp_user_engine, tamp_product_engine, TAMP_SQL
# from app.model.tamp_user_models import CustomerOrder, CustomerInfo
# from app.model.tamp_product_models import FundInfo
from app.utils.fund_rank import get_trade_cal


class UserCustomerDataAdaptor:
    """Load one customer's fund orders and NAV series and derive holdings data.

    Constructing an instance queries the orders placed for ``customer_id``
    under ``user_id``, the NAV / cumulative-NAV history of every fund in those
    orders and a comparison index, then builds one combined holdings frame and
    one holdings frame per ``folio_name``.
    """

    # NOTE(review): these are class-level defaults. The three dicts below are
    # written through ``self`` by the methods of this class, so their entries
    # are shared by every instance that does not rebind them on itself.
    user_id = ""  # matched against customer_order.user_id when loading orders
    customer_id = ""  # matched against customer_order.customer_id when loading orders
    customer_real_name = ""  # customer_info.customer_name taken from the first loaded order row
    month_date = ""  # not referenced by any method visible in this class
    end_date = ""  # cut-off date: orders and NAV rows after it are discarded
    group_data = {}  # folio_name -> {"result_cnav_data": DataFrame, "order_df": DataFrame}
    trade_cal_date = None  # trading-calendar DataFrame, set while loading fund NAV data
    all_fund_distribution = {}  # fund_id -> DataFrame with columns price_date, distribution
    all_fund_performance = {}  # fund_id -> DataFrame with the fund_performance return columns

    def __init__(self, user_id, customer_id, end_date=str(datetime.date.today()), index_id="IN0000007M"):
        self.user_id = user_id
        self.customer_id = customer_id
        self.compare_index_id = index_id
        p_end_date = pd.to_datetime(end_date).date()
        p_end_date = datetime.date(year=p_end_date.year, month=p_end_date.month, day=1) - datetime.timedelta(days=1)
        self.end_date = pd.to_datetime(str(p_end_date))
        self.end_date = pd.to_datetime("2020-12-11")
        p_start_date = datetime.date(year=p_end_date.year, month=p_end_date.month, day=1)
        self.month_start_date = p_start_date
        self.month_start_date = pd.to_datetime("2020-11-01")
        self.user_customer_order_df = self.get_user_customer_order_data()
        self.fund_nav_total, self.fund_cnav_total = self.get_customer_fund_nav_data()
        self.index_df = self.get_customer_index_nav_data()
        self.total_customer_order_cnav_df = self.total_combine_data()
        self.group_operate()

    @staticmethod
    def get_trade_cal(start_date, end_date):
        """Return the trading calendar starting at ``start_date``.

        The module-level ``get_trade_cal`` helper imported at the top of the
        file is tried first; if it fails for any ordinary reason the calendar
        is requested from tushare (exchange ``SSE``, open days only).

        Args:
            start_date: first calendar day as a ``YYYYMMDD`` string.
            end_date: last calendar day as a ``YYYYMMDD`` string or ``None``.
                Only the tushare fallback applies this bound.

        Returns:
            DataFrame with an ``end_date`` column (``YYYYMMDD`` strings) and a
            ``datetime`` column holding the parsed ``datetime.datetime``.
        """
        import logging

        def _with_datetime(frame):
            # Rename the calendar day to ``end_date`` and add its parsed form.
            frame = frame.rename(columns={'cal_date': 'end_date'})
            frame["datetime"] = frame["end_date"].apply(lambda x: datetime.datetime.strptime(x, "%Y%m%d"))
            return frame

        try:
            # Inside a function body this name resolves to the imported
            # module-level helper, not to this static method.
            df = get_trade_cal()
            df = df[df["cal_date"] >= start_date]
            # Drop the helper's own ``end_date`` column before the rename.
            return _with_datetime(df.drop(['end_date'], axis=1))
        except Exception:
            # Best-effort: fall through to tushare, but leave a trace and let
            # KeyboardInterrupt / SystemExit propagate.
            logging.getLogger(__name__).warning(
                "Local trade calendar unavailable, falling back to tushare", exc_info=True)

        # NOTE(review): API token is hard-coded in source; consider moving it
        # to configuration.
        ts.set_token('ac1f734f8a25651aa07319ca35b1b0c0854e361e306fe85d85e092bc')
        pro = ts.pro_api()
        if end_date is not None:
            df = pro.trade_cal(exchange='SSE', start_date=start_date, end_date=end_date, is_open='1')
        else:
            df = pro.trade_cal(exchange='SSE', start_date=start_date, is_open='1')

        return _with_datetime(df.drop(['exchange', 'is_open'], axis=1))

    # Fetch every order this customer has placed under the given financial advisor
    def get_user_customer_order_data(self):
        """Load the customer's orders joined with basic fund information.

        Reads ``customer_order`` / ``user_info`` / ``customer_info`` for
        ``self.user_id`` and ``self.customer_id`` and left-joins the result
        with ``fund_info`` on ``fund_id``.

        Side effects:
            Sets ``self.start_date`` (earliest ``confirm_share_date`` of all
            loaded orders, before the cut-off filter),
            ``self.customer_real_name`` and ``self.ifa_real_name`` (taken from
            the first order row).

        Returns:
            DataFrame of the orders whose ``confirm_share_date`` is on or
            before ``self.end_date``, re-indexed from 0.

        Raises:
            IndexError: if no order exists for this user/customer pair.
        """
        # sqlalchemy is already a dependency of this module.
        # NOTE(review): assumes TAMP_SQL(...).session is a SQLAlchemy session - confirm.
        from sqlalchemy import text

        order_columns = ['order_id', 'username', 'customer_name', 'fund_id', 'order_type', 'pay_date',
                         'subscription_fee', 'confirm_share_date', 'confirm_share',
                         'confirm_amount', 'nav', 'folio_name']
        # Ids are passed as bound parameters instead of being formatted into
        # the statement, so they cannot alter the SQL.
        sql_user = text(
            "select f1.id, f2.realname,f3.customer_name,fund_id,f1.order_type,f1.pay_date,"
            "f1.subscription_fee,f1.confirm_share_date,f1.confirm_share,f1.confirm_amount,"
            "f1.nav,f1.folio_name "
            "from customer_order f1, user_info f2,customer_info f3 "
            "where f2.id=f1.user_id and f3.id=f1.customer_id "
            "and f1.user_id=:user_id and f1.customer_id=:customer_id"
        )
        sql_product = "select distinct `id`, `fund_short_name`, `nav_frequency`, `substrategy` from `fund_info`"

        with TAMP_SQL(tamp_user_engine) as tamp_user, TAMP_SQL(tamp_product_engine) as tamp_product:
            order_rows = tamp_user.session.execute(
                sql_user, {"user_id": self.user_id, "customer_id": self.customer_id}).fetchall()
            order_df = pd.DataFrame(list(order_rows), columns=order_columns)

            product_rows = tamp_product.session.execute(sql_product).fetchall()
            product_df = pd.DataFrame(list(product_rows), columns=['fund_id', 'fund_name', 'freq', 'substrategy'])

            user_customer_order_df = order_df.set_index('fund_id').join(product_df.set_index('fund_id')).reset_index()
            self.start_date = user_customer_order_df["confirm_share_date"].min()
            self.customer_real_name = user_customer_order_df["customer_name"].values[0]
            self.ifa_real_name = user_customer_order_df["username"].values[0]

            # Compare as timestamps so the filter does not depend on how the
            # driver returns the date column.
            confirmed_on = pd.to_datetime(user_customer_order_df["confirm_share_date"])
            user_customer_order_df = user_customer_order_df[confirmed_on <= self.end_date]
            return user_customer_order_df.reset_index(drop=True)

    # Fetch the NAV data of the funds held by the customer
    def get_customer_fund_nav_data(self):
        """Load NAV and cumulative NAV for every fund found in the orders.

        For each distinct ``fund_id`` in ``self.user_customer_order_df`` the
        ``fund_nav``, ``fund_distribution`` and ``fund_performance`` tables are
        queried. NAV rows are aligned to the trading calendar and cut off at
        ``self.end_date``.

        Side effects:
            Sets ``self.trade_cal_date`` and ``self.last_nav_date`` and fills
            ``self.all_fund_distribution`` / ``self.all_fund_performance``
            keyed by fund id.

        Returns:
            Tuple ``(nav_frame, cnav_frame)``: both indexed by trading day with
            one column per fund id.
        """
        nav_sql = """select distinct `price_date`, `nav`,`cumulative_nav` from `fund_nav` where `fund_id`='{}'  order by `price_date` ASC"""
        distribution_sql = """select distinct `distribute_date`, `distribution` from `fund_distribution` where `fund_id`='{}' and `distribute_type`='1' order by `distribute_date` ASC"""
        performance_sql = """select distinct `price_date`, `ret_1w`, `ret_cum_1m`, `ret_cum_6m`, `ret_cum_1y`, `ret_cum_ytd`, `ret_cum_incep` from `fund_performance` where `fund_id`='{}' order by `price_date` ASC"""
        performance_columns = ['price_date', 'ret_1w', 'ret_cum_1m', 'ret_cum_6m',
                               'ret_cum_1y', 'ret_cum_ytd', 'ret_cum_incep']

        with TAMP_SQL(tamp_product_engine) as tamp_product:
            session = tamp_product.session

            def load_frame(template, fund_id, columns):
                # Run one per-fund query and wrap the rows in a DataFrame.
                rows = session.execute(template.format(fund_id)).fetchall()
                return pd.DataFrame(list(rows), columns=columns)

            today = datetime.datetime.now().strftime("%Y%m%d")
            calendar = self.get_trade_cal("20000101", today)
            self.trade_cal_date = calendar

            # Empty frames indexed by trading day; fund columns are added below.
            nav_frame = pd.DataFrame(index=calendar["datetime"])
            cnav_frame = pd.DataFrame(index=calendar["datetime"])

            for fund_id in self.user_customer_order_df["fund_id"].unique():
                # Fund NAV history
                fund_nav = load_frame(nav_sql, fund_id, ['price_date', 'nav', 'cnav'])
                # Fund distributions
                self.all_fund_distribution[fund_id] = load_frame(
                    distribution_sql, fund_id, ['price_date', 'distribution'])
                # Fund performance figures
                self.all_fund_performance[fund_id] = load_frame(
                    performance_sql, fund_id, performance_columns)

                # One row per date, restricted to trading days.
                fund_nav["price_date"] = pd.to_datetime(fund_nav["price_date"])
                fund_nav = fund_nav.drop_duplicates(subset="price_date", keep='first').set_index("price_date")
                fund_nav = fund_nav[fund_nav.index.isin(nav_frame.index)]
                nav_frame[fund_id] = fund_nav["nav"]
                cnav_frame[fund_id] = fund_nav["cnav"]

            nav_frame = nav_frame[nav_frame.index <= self.end_date]
            cnav_frame = cnav_frame[cnav_frame.index <= self.end_date]

            # Latest day on which at least one fund has a cumulative NAV.
            dated_rows = cnav_frame.dropna(how="all")
            self.last_nav_date = str(dated_rows.index.values[-1])[:10]
            return nav_frame, cnav_frame

    # Fetch the price series of the customer's comparison index
    def get_customer_index_nav_data(self, index_id="IN0000007M"):
        """Load the closing prices of a market index.

        Args:
            index_id: value matched against ``fund_market_indexes.index_id``.

        Side effects:
            Adds/overwrites the ``"index"`` column of ``self.fund_cnav_total``
            (aligned on its date index) and sets ``self.index_df``.

        Returns:
            DataFrame indexed by ``price_date`` with a single ``index`` column.
        """
        # sqlalchemy is already a dependency of this module.
        # NOTE(review): assumes TAMP_SQL(...).session is a SQLAlchemy session - confirm.
        from sqlalchemy import text

        # ``index_id`` is bound as a parameter so it cannot alter the SQL.
        sql = text("select distinct price_date,close from fund_market_indexes "
                   "where index_id=:index_id order by price_date ASC")

        with TAMP_SQL(tamp_product_engine) as tamp_product:
            rows = tamp_product.session.execute(sql, {"index_id": index_id}).fetchall()
            index_df = pd.DataFrame(list(rows), columns=['price_date', 'index'])
            index_df["price_date"] = pd.to_datetime(index_df["price_date"])
            # Keep one row per date (same rule as the fund NAV loader); a
            # duplicated date would make the aligned assignment below fail.
            index_df = index_df.drop_duplicates(subset="price_date", keep='first')
            index_df = index_df.set_index("price_date")
            self.fund_cnav_total["index"] = index_df["index"]
            self.index_df = index_df

            return index_df

    # Run the calculation separately for each portfolio (folio)
    def group_operate(self):
        for folio in self.user_customer_order_df["folio_name"].unique():
            cur_folio_order_df = self.user_customer_order_df[self.user_customer_order_df["folio_name"] == folio]
            fund_id_list = list(self.user_customer_order_df["fund_id"].unique())
            cur_folio_nav_df = self.fund_nav_total[fund_id_list]
            # fund_id_list.append("index")
            cur_folio_cnav_df = self.fund_cnav_total[fund_id_list]
            self.signal_folio_operate(folio, cur_folio_order_df, cur_folio_nav_df, cur_folio_cnav_df)
            continue

    # Process the data of a single portfolio (folio)
    def signal_folio_operate(self, p_folio, p_order_df, p_nav_df, p_cnav_df):
        """Build the per-day holdings and earnings frame for one folio.

        Args:
            p_folio: key under which the result is stored in ``self.group_data``.
            p_order_df: order rows; the columns ``fund_id``,
                ``confirm_share_date``, ``nav``, ``order_type``,
                ``confirm_amount`` and ``confirm_share`` are read.
            p_nav_df: NAV values, indexed by date with one column per fund id.
            p_cnav_df: cumulative NAV values in the same layout.

        Returns:
            The cumulative-NAV frame from the first confirmation date onwards,
            extended for every fund in ``p_order_df`` with the columns
            ``<fund>_profit``, ``<fund>_profit_ratio``, ``<fund>_amount``,
            ``<fund>_share``, ``<fund>_earn``, ``<fund>_cum_earn`` and
            ``<fund>_net_amount``. The same frame is stored, together with
            ``p_order_df``, in ``self.group_data[p_folio]``.
        """
        start_date = pd.to_datetime(p_order_df["confirm_share_date"].min())

        # Work on a copy that starts at the folio's first confirmation date.
        cnav_df = p_cnav_df[p_cnav_df.index >= start_date].copy()

        p_fund_id_list = list(p_order_df["fund_id"].unique())
        for p_fund_id in p_fund_id_list:
            order_min_date = p_order_df[p_order_df["fund_id"] == p_fund_id]["confirm_share_date"].min()
            if pd.to_datetime(order_min_date) > start_date:
                # Blank the fund's values up to the day before its first order.
                cnav_df.loc[:order_min_date - datetime.timedelta(days=1), p_fund_id] = np.nan

        for index, row in p_order_df.iterrows():
            cur_fund_id = str(row["fund_id"])
            confirm_share_date = pd.to_datetime(row["confirm_share_date"])

            # Check whether a NAV exists on the confirmation date; if not, derive that
            # day's cumulative NAV from the difference against the previous NAV.
            # NOTE(review): the .loc lookup raises KeyError when the confirmation
            # date is not present in p_nav_df's index.
            if pd.isnull(p_nav_df.loc[confirm_share_date, cur_fund_id]):
                last_nav_data = p_nav_df[p_nav_df.index < confirm_share_date][cur_fund_id].dropna().tail(1)
                last_cnav_data = p_cnav_df[p_cnav_df.index < confirm_share_date][cur_fund_id].dropna().tail(1)
                # TODO: determine whether a dividend date lies between the previous
                # NAV date and the current confirmation date (the string below is
                # the original placeholder and has no effect).
                """need add judge"""

                if len(last_nav_data) < 1:
                    # No earlier NAV at all: use the order's NAV as cumulative NAV.
                    cnav_df.loc[confirm_share_date, cur_fund_id] = row["nav"]
                else:
                    # Previous cumulative NAV shifted by (order NAV - previous NAV).
                    # NOTE(review): unlike the branch below, the cumulative NAV is
                    # not wrapped in Decimal here - presumably it already is one.
                    diff_nav = Decimal(row["nav"]) - Decimal(last_nav_data.values[0])
                    cur_cnav = last_cnav_data.values[0] + diff_nav
                    cnav_df.loc[confirm_share_date, cur_fund_id] = cur_cnav
            else:
                # A NAV exists that day: cumulative NAV shifted by (order NAV - that NAV).
                confirm_date_nav_data = p_nav_df[p_nav_df.index == confirm_share_date][cur_fund_id].tail(1)
                confirm_date_cnav_data = p_cnav_df[p_cnav_df.index == confirm_share_date][cur_fund_id].tail(1)
                diff_nav = Decimal(row["nav"]) - Decimal(confirm_date_nav_data.values[0])
                cur_cnav = Decimal(confirm_date_cnav_data.values[0]) + diff_nav
                cnav_df.loc[confirm_share_date, cur_fund_id] = cur_cnav

        # Drop dates on which every column is empty, then carry values forward.
        cnav_df = cnav_df.dropna(axis=0, how="all").fillna(method='ffill')
        for index, row in p_order_df.iterrows():
            cur_fund_id = str(row["fund_id"])
            confirm_share_date = pd.to_datetime(row["confirm_share_date"])

            # # Check whether a NAV exists on the confirmation date; if not, derive that
            # # day's cumulative NAV from the difference against the previous NAV.
            # if pd.isnull(cnav_df.loc[confirm_share_date, cur_fund_id]):
            #     last_nav_data = p_nav_df[p_nav_df.index < confirm_share_date][cur_fund_id].dropna().tail(1)
            #     last_cnav_data = p_cnav_df[p_cnav_df.index < confirm_share_date][cur_fund_id].dropna().tail(1)
            #     # Determine whether a dividend date lies between the previous NAV date and the confirmation date
            #     """need add judge"""
            #
            #     if len(last_nav_data) < 1:
            #         cnav_df.loc[confirm_share_date, cur_fund_id] = row["nav"]
            #     else:
            #         diff_nav = row["nav"] - last_nav_data.values[0]
            #         cur_cnav = last_cnav_data.values[0] + diff_nav
            #         cnav_df.loc[confirm_share_date, cur_fund_id] = cur_cnav

            # First order seen for this fund: create its derived columns.
            if cur_fund_id+"_amount" not in cnav_df:
                price = cnav_df[cur_fund_id].dropna()
                # Change of the cumulative NAV versus the previous row.
                profit = price.diff().fillna(Decimal(0))
                cnav_df[cur_fund_id + "_profit"] = profit
                cnav_df[cur_fund_id + "_profit"] = cnav_df[cur_fund_id + "_profit"].fillna(Decimal(0))
                # That change relative to the previous row's cumulative NAV.
                profit_ratio = profit / cnav_df[cur_fund_id].dropna().shift(1)
                cnav_df[cur_fund_id + "_profit_ratio"] = profit_ratio
                cnav_df[cur_fund_id + "_profit_ratio"] = cnav_df[cur_fund_id + "_profit_ratio"].fillna(Decimal(0))
                cnav_df[cur_fund_id+"_amount"] = 0
                cnav_df[cur_fund_id + "_earn"] = 0
                cnav_df[cur_fund_id + "_share"] = 0

            # buy: amount and share grow from the confirmation date onwards
            if row['order_type'] == 1:
                cnav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] += row["confirm_amount"]
                cnav_df.loc[confirm_share_date:, cur_fund_id + "_share"] += row["confirm_share"]
            # sell: amount and share shrink from the confirmation date onwards
            elif row['order_type'] == 2:
                cnav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] -= row["confirm_amount"]
                cnav_df.loc[confirm_share_date:, cur_fund_id + "_share"] -= row["confirm_share"]

            # cnav_df[cur_fund_id + "_earn"] = cnav_df[cur_fund_id + "_profit"] * cnav_df[cur_fund_id + "_share"]
            # cnav_df[cur_fund_id + "_earn"] = cnav_df[cur_fund_id + "_earn"].apply(lambda x: float(x))
            # cnav_df[cur_fund_id + "_cum_earn"] = cnav_df[cur_fund_id + "_earn"].cumsum()

        for p_fund_id_ in p_fund_id_list:
            # Per-row earning = per-unit change * shares held, converted to float.
            cnav_df[p_fund_id_ + "_earn"] = (cnav_df[p_fund_id_ + "_profit"] * cnav_df[p_fund_id_ + "_share"]).apply(lambda x: float(x)).fillna(0)
            # cnav_df[p_fund_id_ + "_earn"] = cnav_df[p_fund_id_ + "_earn"].apply(lambda x: float(x))
            cnav_df[p_fund_id_ + "_cum_earn"] = cnav_df[p_fund_id_ + "_earn"].cumsum().fillna(0)
            # Net amount = accumulated earnings (back to Decimal) + invested amount.
            cnav_df[p_fund_id_ + "_net_amount"] = cnav_df[p_fund_id_ + "_cum_earn"].apply(lambda x: Decimal(x)) + cnav_df[p_fund_id_ + "_amount"]
            # cnav_df[p_fund_id_ + "_net_amount"] = cnav_df[p_fund_id_ + "_share"] * cnav_df[p_fund_id_]
            # cnav_df[p_fund_id_ + "_profit_ratio"] = cnav_df[p_fund_id_ + "_earn"].apply(lambda x: Decimal(x)) / cnav_df[
            #     p_fund_id_ + "_net_amount"].shift()
        self.group_data[p_folio] = {"result_cnav_data": cnav_df, "order_df": p_order_df}
        return cnav_df

    # Process all of the customer's orders together, across every folio
    def total_combine_data(self):
        """Build the per-day holdings and earnings frame over all orders.

        Applies the same steps as ``signal_folio_operate`` to copies of
        ``self.user_customer_order_df``, ``self.fund_nav_total`` and
        ``self.fund_cnav_total``, without splitting by folio and without
        writing to ``self.group_data``.

        Returns:
            The cumulative-NAV frame from the first confirmation date onwards,
            extended for every fund in the orders with the columns
            ``<fund>_profit``, ``<fund>_profit_ratio``, ``<fund>_amount``,
            ``<fund>_share``, ``<fund>_earn``, ``<fund>_cum_earn`` and
            ``<fund>_net_amount``. Every column of ``self.fund_cnav_total`` is
            carried over, including ``"index"`` once
            ``get_customer_index_nav_data`` has added it.
        """

        p_order_df = self.user_customer_order_df.copy()
        p_nav_df = self.fund_nav_total.copy()
        p_cnav_df = self.fund_cnav_total.copy()

        # Work on a copy that starts at the first confirmation date.
        start_date = pd.to_datetime(p_order_df["confirm_share_date"].min())
        cnav_df = p_cnav_df[p_cnav_df.index >= start_date].copy()

        p_fund_id_list = list(p_order_df["fund_id"].unique())
        for p_fund_id in p_fund_id_list:
            order_min_date = p_order_df[p_order_df["fund_id"] == p_fund_id]["confirm_share_date"].min()
            if pd.to_datetime(order_min_date) > start_date:
                # Blank the fund's values up to the day before its first order.
                cnav_df.loc[:order_min_date - datetime.timedelta(days=1), p_fund_id] = np.nan

        for index, row in p_order_df.iterrows():
            cur_fund_id = str(row["fund_id"])
            confirm_share_date = pd.to_datetime(row["confirm_share_date"])

            # Check whether a NAV exists on the confirmation date; if not, derive that
            # day's cumulative NAV from the difference against the previous NAV.
            # NOTE(review): the .loc lookup raises KeyError when the confirmation
            # date is not present in p_nav_df's index.
            if pd.isnull(p_nav_df.loc[confirm_share_date, cur_fund_id]):
                last_nav_data = p_nav_df[p_nav_df.index < confirm_share_date][cur_fund_id].dropna().tail(1)
                last_cnav_data = p_cnav_df[p_cnav_df.index < confirm_share_date][cur_fund_id].dropna().tail(1)
                # TODO: determine whether a dividend date lies between the previous
                # NAV date and the current confirmation date (the string below is
                # the original placeholder and has no effect).
                """need add judge"""

                if len(last_nav_data) < 1:
                    # No earlier NAV at all: use the order's NAV as cumulative NAV.
                    cnav_df.loc[confirm_share_date, cur_fund_id] = row["nav"]
                else:
                    # Previous cumulative NAV shifted by (order NAV - previous NAV).
                    # NOTE(review): unlike the branch below, the cumulative NAV is
                    # not wrapped in Decimal here - presumably it already is one.
                    diff_nav = Decimal(row["nav"]) - Decimal(last_nav_data.values[0])
                    cur_cnav = last_cnav_data.values[0] + diff_nav
                    cnav_df.loc[confirm_share_date, cur_fund_id] = cur_cnav
            else:
                # A NAV exists that day: cumulative NAV shifted by (order NAV - that NAV).
                confirm_date_nav_data = p_nav_df[p_nav_df.index == confirm_share_date][cur_fund_id].tail(1)
                confirm_date_cnav_data = p_cnav_df[p_cnav_df.index == confirm_share_date][cur_fund_id].tail(1)
                diff_nav = Decimal(row["nav"]) - Decimal(confirm_date_nav_data.values[0])
                cur_cnav = Decimal(confirm_date_cnav_data.values[0]) + diff_nav
                cnav_df.loc[confirm_share_date, cur_fund_id] = cur_cnav

        # Drop dates on which every column is empty, then carry values forward.
        cnav_df = cnav_df.dropna(axis=0, how="all").fillna(method='ffill')
        for index, row in p_order_df.iterrows():
            cur_fund_id = str(row["fund_id"])
            confirm_share_date = pd.to_datetime(row["confirm_share_date"])
            # First order seen for this fund: create its derived columns.
            if cur_fund_id + "_amount" not in cnav_df:
                price = cnav_df[cur_fund_id].dropna()
                # Change of the cumulative NAV versus the previous row.
                profit = price.diff().fillna(Decimal(0))
                cnav_df[cur_fund_id + "_profit"] = profit
                cnav_df[cur_fund_id + "_profit"] = cnav_df[cur_fund_id + "_profit"].fillna(Decimal(0))
                # That change relative to the previous row's cumulative NAV.
                profit_ratio = profit / cnav_df[cur_fund_id].dropna().shift(1)
                cnav_df[cur_fund_id + "_profit_ratio"] = profit_ratio
                cnav_df[cur_fund_id + "_profit_ratio"] = cnav_df[cur_fund_id + "_profit_ratio"].fillna(Decimal(0))
                cnav_df[cur_fund_id + "_amount"] = 0
                cnav_df[cur_fund_id + "_earn"] = 0
                cnav_df[cur_fund_id + "_share"] = 0

                # profit = cnav_df[cur_fund_id].dropna() - cnav_df[cur_fund_id].dropna().shift(1)
                # cnav_df[cur_fund_id + "_profit"] = profit
                # cnav_df[cur_fund_id + "_profit"] = cnav_df[cur_fund_id + "_profit"].fillna(Decimal(0))
                # cnav_df[cur_fund_id + "_profit_ratio"] = profit / cnav_df[cur_fund_id].dropna().shift(1)
                # cnav_df[cur_fund_id + "_profit_ratio"] = cnav_df[cur_fund_id + "_profit_ratio"].fillna(Decimal(0))
                # cnav_df[cur_fund_id + "_amount"] = 0
                # cnav_df[cur_fund_id + "_earn"] = 0
                # cnav_df[cur_fund_id + "_cum_earn"] = 0
                # cnav_df[cur_fund_id + "_share"] = 0

            # buy: amount and share grow from the confirmation date onwards
            if row['order_type'] == 1:
                cnav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] += row["confirm_amount"]
                cnav_df.loc[confirm_share_date:, cur_fund_id + "_share"] += row["confirm_share"]
            # sell: amount and share shrink from the confirmation date onwards
            elif row['order_type'] == 2:
                cnav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] -= row["confirm_amount"]
                cnav_df.loc[confirm_share_date:, cur_fund_id + "_share"] -= row["confirm_share"]

        for p_fund_id_ in p_fund_id_list:
            # Per-row earning = per-unit change * shares held, converted to float.
            cnav_df[p_fund_id_ + "_earn"] = (cnav_df[p_fund_id_ + "_profit"] * cnav_df[p_fund_id_ + "_share"]).apply(lambda x: float(x)).fillna(0)
            # cnav_df[p_fund_id_ + "_earn"] = cnav_df[p_fund_id_ + "_earn"].apply(lambda x: float(x))
            cnav_df[p_fund_id_ + "_cum_earn"] = cnav_df[p_fund_id_ + "_earn"].cumsum().fillna(0)
            # Net amount = accumulated earnings (back to Decimal) + invested amount.
            cnav_df[p_fund_id_ + "_net_amount"] = cnav_df[p_fund_id_ + "_cum_earn"].apply(lambda x: Decimal(x)) + cnav_df[p_fund_id_ + "_amount"]

            # cnav_df[p_fund_id_ + "_profit_ratio"] = cnav_df[p_fund_id_ + "_earn"].apply(lambda x: Decimal(x)) / cnav_df[
            #     p_fund_id_ + "_net_amount"].shift()
            # cnav_df[p_fund_id_ + "_net_amount"] = cnav_df[p_fund_id_ + "_share"] * cnav_df[p_fund_id_]

        return cnav_df