#!/usr/bin/python3.6 # -*- coding: utf-8 -*- # @Time : 2020/11/18 19:12 # @Author : Jie. Z # @Email : zhaojiestudy@163.com # @File : data_service.py # @Software: PyCharm import pandas as pd import numpy as np from sqlalchemy import and_ import tushare as ts import datetime import math from decimal import Decimal from app.api.engine import tamp_user_engine, tamp_product_engine, TAMP_SQL # from app.model.tamp_user_models import CustomerOrder, CustomerInfo # from app.model.tamp_product_models import FundInfo class UserCustomerDataAdaptor: user_id = "" customer_id = "" customer_real_name = "" month_date = "" end_date = "" group_data = {} trade_cal_date = None all_fund_distribution = {} all_fund_performance = {} def __init__(self, user_id, customer_id, end_date=str(datetime.date.today()), index_id="IN0000007M"): self.user_id = user_id self.customer_id = customer_id self.compare_index_id = index_id p_end_date = pd.to_datetime(end_date).date() p_end_date = datetime.date(year=p_end_date.year, month=p_end_date.month, day=1) - datetime.timedelta(days=1) self.end_date = pd.to_datetime(str(p_end_date)) # self.end_date = pd.to_datetime("2020-12-04") p_start_date = datetime.date(year=p_end_date.year, month=p_end_date.month, day=1) self.month_start_date = p_start_date # self.month_start_date = pd.to_datetime("2020-11-01") self.user_customer_order_df = self.get_user_customer_order_data() self.fund_nav_total, self.fund_cnav_total = self.get_customer_fund_nav_data() self.index_df = self.get_customer_index_nav_data() self.total_customer_order_cnav_df = self.total_combine_data() self.group_operate() @staticmethod def get_trade_cal(start_date, end_date): ts.set_token('ac1f734f8a25651aa07319ca35b1b0c0854e361e306fe85d85e092bc') pro = ts.pro_api() if end_date is not None: df = pro.trade_cal(exchange='SSE', start_date=start_date, end_date=end_date, is_open='1') else: df = pro.trade_cal(exchange='SSE', start_date=start_date, is_open='1') df.drop(['exchange', 'is_open'], axis=1, inplace=True) df.rename(columns={'cal_date': 'end_date'}, inplace=True) df["datetime"] = df["end_date"].apply(lambda x: datetime.datetime.strptime(x, "%Y%m%d")) return df # 获取理财师下该用户所有订单列表 def get_user_customer_order_data(self): # data1 = tamp_user_session.query(CustomerOrder)\ # # .filter(user_id = self.user_id).all() # data2 = tamp_user_session.query(t_customer_info).all() # data3 = tamp_product_session.query(t_fund_info).all() with TAMP_SQL(tamp_user_engine) as tamp_user, TAMP_SQL(tamp_product_engine) as tamp_product: tamp_user_session = tamp_user.session tamp_product_session = tamp_product.session sql_user = """select f1.id, f2.realname,f3.customer_name,fund_id,f1.order_type,f1.pay_date,f1.subscription_fee,f1.confirm_share_date,f1.confirm_share,f1.confirm_amount,f1.nav,f1.folio_name from customer_order f1, user_info f2,customer_info f3 where f2.id=f1.user_id and f3.id=f1.customer_id and f1.user_id='{}' and f1.customer_id='{}'""".format(self.user_id, self.customer_id) cur = tamp_user_session.execute(sql_user) data = cur.fetchall() order_df = pd.DataFrame(list(data), columns=['order_id', 'username', 'customer_name', 'fund_id', 'order_type', 'pay_date', 'subscription_fee', 'confirm_share_date', 'confirm_share', 'confirm_amount', 'nav', 'folio_name']) sql_product = "select distinct `id`, `fund_short_name`, `nav_frequency`, `substrategy` from `fund_info`" cur = tamp_product_session.execute(sql_product) data = cur.fetchall() product_df = pd.DataFrame(list(data), columns=['fund_id', 'fund_name', 'freq', 'substrategy']) user_customer_order_df = order_df.set_index('fund_id').join(product_df.set_index('fund_id')).reset_index() self.start_date = user_customer_order_df["confirm_share_date"].min() self.customer_real_name = user_customer_order_df["customer_name"].values[0] return user_customer_order_df # 获取客户持有的基金净值数据 def get_customer_fund_nav_data(self): with TAMP_SQL(tamp_product_engine) as tamp_product: tamp_product_session = tamp_product.session now_date = datetime.datetime.now().strftime("%Y%m%d") trade_date_df = self.get_trade_cal("20000101", now_date) self.trade_cal_date = trade_date_df all_fund_nav = pd.DataFrame(index=trade_date_df["datetime"]) all_fund_cnav = pd.DataFrame(index=trade_date_df["datetime"]) for cur_fund_id in self.user_customer_order_df["fund_id"].unique(): # 对应基金净值 sql = """select distinct `price_date`, `nav`,`cumulative_nav` from `fund_nav` where `fund_id`='{}' order by `price_date` ASC""".format(cur_fund_id) cur = tamp_product_session.execute(sql) data = cur.fetchall() cur_fund_nav_df = pd.DataFrame(list(data), columns=['price_date', 'nav', 'cnav']) # # 对应基金分红 sql = """select distinct `distribute_date`, `distribution` from `fund_distribution` where `fund_id`='{}' and `distribute_type`='1' order by `distribute_date` ASC""".format( cur_fund_id) cur = tamp_product_session.execute(sql) data = cur.fetchall() cur_fund_distribution_df = pd.DataFrame(list(data), columns=['price_date', 'distribution']) self.all_fund_distribution[cur_fund_id] = cur_fund_distribution_df # 对应基金performance数据 sql = """select distinct `price_date`, `ret_1w`, `ret_cum_1m`, `ret_cum_6m`, `ret_cum_1y`, `ret_cum_ytd`, `ret_cum_incep` from `fund_performance` where `fund_id`='{}' order by `price_date` ASC""".format( cur_fund_id) cur = tamp_product_session.execute(sql) data = cur.fetchall() cur_fund_performance_df = pd.DataFrame(list(data), columns=['price_date', 'ret_1w', 'ret_cum_1m', 'ret_cum_6m', 'ret_cum_1y', 'ret_cum_ytd', 'ret_cum_incep']) self.all_fund_performance[cur_fund_id] = cur_fund_performance_df cur_fund_nav_df["price_date"] = pd.to_datetime(cur_fund_nav_df["price_date"]) cur_fund_nav_df.drop_duplicates(subset="price_date", keep='first', inplace=True) cur_fund_nav_df.set_index("price_date", inplace=True) cur_fund_nav_df = cur_fund_nav_df[cur_fund_nav_df.index.isin(all_fund_nav.index)] all_fund_nav[cur_fund_id] = cur_fund_nav_df["nav"] all_fund_cnav[cur_fund_id] = cur_fund_nav_df["cnav"] all_fund_nav = all_fund_nav[all_fund_nav.index <= self.end_date] all_fund_cnav = all_fund_cnav[all_fund_cnav.index <= self.end_date] # for cur_fund_id in self.user_customer_order_df["fund_id"].unique(): # all_fund_nav[cur_fund_id][all_fund_nav[cur_fund_id].apply(lambda x: math.isnan(x))]=np.nan # all_fund_cnav[cur_fund_id][all_fund_cnav[cur_fund_id].apply(lambda x: math.isnan(x))] = np.nan self.last_nav_date = str(all_fund_cnav.dropna(how="all").index.values[-1])[:10] return all_fund_nav, all_fund_cnav # 获取客户对比指数净值数据 def get_customer_index_nav_data(self, index_id="IN0000007M"): with TAMP_SQL(tamp_product_engine) as tamp_product: tamp_product_session = tamp_product.session sql = "select distinct price_date,close from fund_market_indexes where index_id='{}' order by price_date ASC".format(index_id) cur = tamp_product_session.execute(sql) data = cur.fetchall() index_df = pd.DataFrame(list(data), columns=['price_date', 'index']) index_df["price_date"] = pd.to_datetime(index_df["price_date"]) index_df.set_index("price_date", inplace=True) self.fund_cnav_total["index"] = index_df["index"] self.index_df = index_df return index_df # 分组合计算 def group_operate(self): for folio in self.user_customer_order_df["folio_name"].unique(): cur_folio_order_df = self.user_customer_order_df[self.user_customer_order_df["folio_name"] == folio] fund_id_list = list(self.user_customer_order_df["fund_id"].unique()) cur_folio_nav_df = self.fund_nav_total[fund_id_list] # fund_id_list.append("index") cur_folio_cnav_df = self.fund_cnav_total[fund_id_list] self.signal_folio_operate(folio, cur_folio_order_df, cur_folio_nav_df, cur_folio_cnav_df) continue # 单个组合数据操作 def signal_folio_operate(self, p_folio, p_order_df, p_nav_df, p_cnav_df): start_date = pd.to_datetime(p_order_df["confirm_share_date"].min()) cnav_df = p_cnav_df[p_cnav_df.index >= start_date].copy() p_fund_id_list = list(p_order_df["fund_id"].unique()) for p_fund_id in p_fund_id_list: order_min_date = p_order_df[p_order_df["fund_id"] == p_fund_id]["confirm_share_date"].min() if pd.to_datetime(order_min_date) > start_date: cnav_df.loc[:order_min_date - datetime.timedelta(days=1), p_fund_id] = np.nan for index, row in p_order_df.iterrows(): cur_fund_id = str(row["fund_id"]) confirm_share_date = pd.to_datetime(row["confirm_share_date"]) # 根据确认净值日查看是否含有累积净值的数据,如果没有按照前后差值推算当天累积净值 if pd.isnull(p_nav_df.loc[confirm_share_date, cur_fund_id]): last_nav_data = p_nav_df[p_nav_df.index < confirm_share_date][cur_fund_id].dropna().tail(1) last_cnav_data = p_cnav_df[p_cnav_df.index < confirm_share_date][cur_fund_id].dropna().tail(1) # 判断上个净值日和当前确认日之中是否存在分红日 """need add judge""" if len(last_nav_data) < 1: cnav_df.loc[confirm_share_date, cur_fund_id] = row["nav"] else: diff_nav = Decimal(row["nav"]) - Decimal(last_nav_data.values[0]) cur_cnav = last_cnav_data.values[0] + diff_nav cnav_df.loc[confirm_share_date, cur_fund_id] = cur_cnav else: confirm_date_nav_data = p_nav_df[p_nav_df.index == confirm_share_date][cur_fund_id].tail(1) confirm_date_cnav_data = p_cnav_df[p_cnav_df.index == confirm_share_date][cur_fund_id].tail(1) diff_nav = Decimal(row["nav"]) - Decimal(confirm_date_nav_data.values[0]) cur_cnav = Decimal(confirm_date_cnav_data.values[0]) + diff_nav cnav_df.loc[confirm_share_date, cur_fund_id] = cur_cnav cnav_df = cnav_df.dropna(axis=0, how="all").fillna(method='ffill') for index, row in p_order_df.iterrows(): cur_fund_id = str(row["fund_id"]) confirm_share_date = pd.to_datetime(row["confirm_share_date"]) # # 根据确认净值日查看是否含有累积净值的数据,如果没有按照前后差值推算当天累积净值 # if pd.isnull(cnav_df.loc[confirm_share_date, cur_fund_id]): # last_nav_data = p_nav_df[p_nav_df.index < confirm_share_date][cur_fund_id].dropna().tail(1) # last_cnav_data = p_cnav_df[p_cnav_df.index < confirm_share_date][cur_fund_id].dropna().tail(1) # # 判断上个净值日和当前确认日之中是否存在分红日 # """need add judge""" # # if len(last_nav_data) < 1: # cnav_df.loc[confirm_share_date, cur_fund_id] = row["nav"] # else: # diff_nav = row["nav"] - last_nav_data.values[0] # cur_cnav = last_cnav_data.values[0] + diff_nav # cnav_df.loc[confirm_share_date, cur_fund_id] = cur_cnav if cur_fund_id+"_amount" not in cnav_df: price = cnav_df[cur_fund_id].dropna() profit = price.diff().fillna(Decimal(0)) cnav_df[cur_fund_id + "_profit"] = profit cnav_df[cur_fund_id + "_profit"] = cnav_df[cur_fund_id + "_profit"].fillna(Decimal(0)) profit_ratio = profit / cnav_df[cur_fund_id].dropna().shift(1) cnav_df[cur_fund_id + "_profit_ratio"] = profit_ratio cnav_df[cur_fund_id + "_profit_ratio"] = cnav_df[cur_fund_id + "_profit_ratio"].fillna(Decimal(0)) cnav_df[cur_fund_id+"_amount"] = 0 cnav_df[cur_fund_id + "_earn"] = 0 cnav_df[cur_fund_id + "_share"] = 0 # buy if row['order_type'] == 1: cnav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] += row["confirm_amount"] cnav_df.loc[confirm_share_date:, cur_fund_id + "_share"] += row["confirm_share"] # sell elif row['order_type'] == 2: cnav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] -= row["confirm_amount"] cnav_df.loc[confirm_share_date:, cur_fund_id + "_share"] -= row["confirm_share"] # cnav_df[cur_fund_id + "_earn"] = cnav_df[cur_fund_id + "_profit"] * cnav_df[cur_fund_id + "_share"] # cnav_df[cur_fund_id + "_earn"] = cnav_df[cur_fund_id + "_earn"].apply(lambda x: float(x)) # cnav_df[cur_fund_id + "_cum_earn"] = cnav_df[cur_fund_id + "_earn"].cumsum() for p_fund_id_ in p_fund_id_list: cnav_df[p_fund_id_ + "_earn"] = (cnav_df[p_fund_id_ + "_profit"] * cnav_df[p_fund_id_ + "_share"]).apply(lambda x: float(x)).fillna(0) # cnav_df[p_fund_id_ + "_earn"] = cnav_df[p_fund_id_ + "_earn"].apply(lambda x: float(x)) cnav_df[p_fund_id_ + "_cum_earn"] = cnav_df[p_fund_id_ + "_earn"].cumsum().fillna(0) cnav_df[p_fund_id_ + "_net_amount"] = cnav_df[p_fund_id_ + "_cum_earn"].apply(lambda x: Decimal(x)) + cnav_df[p_fund_id_ + "_amount"] # cnav_df[p_fund_id_ + "_net_amount"] = cnav_df[p_fund_id_ + "_share"] * cnav_df[p_fund_id_] # cnav_df[p_fund_id_ + "_profit_ratio"] = cnav_df[p_fund_id_ + "_earn"].apply(lambda x: Decimal(x)) / cnav_df[ # p_fund_id_ + "_net_amount"].shift() self.group_data[p_folio] = {"result_cnav_data": cnav_df, "order_df": p_order_df} return cnav_df # 所有的数据操作 def total_combine_data(self): p_order_df = self.user_customer_order_df.copy() p_nav_df = self.fund_nav_total.copy() p_cnav_df = self.fund_cnav_total.copy() start_date = pd.to_datetime(p_order_df["confirm_share_date"].min()) cnav_df = p_cnav_df[p_cnav_df.index >= start_date].copy() p_fund_id_list = list(p_order_df["fund_id"].unique()) for p_fund_id in p_fund_id_list: order_min_date = p_order_df[p_order_df["fund_id"] == p_fund_id]["confirm_share_date"].min() if pd.to_datetime(order_min_date) > start_date: cnav_df.loc[:order_min_date - datetime.timedelta(days=1), p_fund_id] = np.nan for index, row in p_order_df.iterrows(): cur_fund_id = str(row["fund_id"]) confirm_share_date = pd.to_datetime(row["confirm_share_date"]) # 根据确认净值日查看是否含有累积净值的数据,如果没有按照前后差值推算当天累积净值 if pd.isnull(p_nav_df.loc[confirm_share_date, cur_fund_id]): last_nav_data = p_nav_df[p_nav_df.index < confirm_share_date][cur_fund_id].dropna().tail(1) last_cnav_data = p_cnav_df[p_cnav_df.index < confirm_share_date][cur_fund_id].dropna().tail(1) # 判断上个净值日和当前确认日之中是否存在分红日 """need add judge""" if len(last_nav_data) < 1: cnav_df.loc[confirm_share_date, cur_fund_id] = row["nav"] else: diff_nav = Decimal(row["nav"]) - Decimal(last_nav_data.values[0]) cur_cnav = last_cnav_data.values[0] + diff_nav cnav_df.loc[confirm_share_date, cur_fund_id] = cur_cnav else: confirm_date_nav_data = p_nav_df[p_nav_df.index == confirm_share_date][cur_fund_id].tail(1) confirm_date_cnav_data = p_cnav_df[p_cnav_df.index == confirm_share_date][cur_fund_id].tail(1) diff_nav = Decimal(row["nav"]) - Decimal(confirm_date_nav_data.values[0]) cur_cnav = Decimal(confirm_date_cnav_data.values[0]) + diff_nav cnav_df.loc[confirm_share_date, cur_fund_id] = cur_cnav cnav_df = cnav_df.dropna(axis=0, how="all").fillna(method='ffill') for index, row in p_order_df.iterrows(): cur_fund_id = str(row["fund_id"]) confirm_share_date = pd.to_datetime(row["confirm_share_date"]) if cur_fund_id + "_amount" not in cnav_df: price = cnav_df[cur_fund_id].dropna() profit = price.diff().fillna(Decimal(0)) cnav_df[cur_fund_id + "_profit"] = profit cnav_df[cur_fund_id + "_profit"] = cnav_df[cur_fund_id + "_profit"].fillna(Decimal(0)) profit_ratio = profit / cnav_df[cur_fund_id].dropna().shift(1) cnav_df[cur_fund_id + "_profit_ratio"] = profit_ratio cnav_df[cur_fund_id + "_profit_ratio"] = cnav_df[cur_fund_id + "_profit_ratio"].fillna(Decimal(0)) cnav_df[cur_fund_id + "_amount"] = 0 cnav_df[cur_fund_id + "_earn"] = 0 cnav_df[cur_fund_id + "_share"] = 0 # profit = cnav_df[cur_fund_id].dropna() - cnav_df[cur_fund_id].dropna().shift(1) # cnav_df[cur_fund_id + "_profit"] = profit # cnav_df[cur_fund_id + "_profit"] = cnav_df[cur_fund_id + "_profit"].fillna(Decimal(0)) # cnav_df[cur_fund_id + "_profit_ratio"] = profit / cnav_df[cur_fund_id].dropna().shift(1) # cnav_df[cur_fund_id + "_profit_ratio"] = cnav_df[cur_fund_id + "_profit_ratio"].fillna(Decimal(0)) # cnav_df[cur_fund_id + "_amount"] = 0 # cnav_df[cur_fund_id + "_earn"] = 0 # cnav_df[cur_fund_id + "_cum_earn"] = 0 # cnav_df[cur_fund_id + "_share"] = 0 # buy if row['order_type'] == 1: cnav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] += row["confirm_amount"] cnav_df.loc[confirm_share_date:, cur_fund_id + "_share"] += row["confirm_share"] # sell elif row['order_type'] == 2: cnav_df.loc[confirm_share_date:, cur_fund_id + "_amount"] -= row["confirm_amount"] cnav_df.loc[confirm_share_date:, cur_fund_id + "_share"] -= row["confirm_share"] for p_fund_id_ in p_fund_id_list: cnav_df[p_fund_id_ + "_earn"] = (cnav_df[p_fund_id_ + "_profit"] * cnav_df[p_fund_id_ + "_share"]).apply(lambda x: float(x)).fillna(0) # cnav_df[p_fund_id_ + "_earn"] = cnav_df[p_fund_id_ + "_earn"].apply(lambda x: float(x)) cnav_df[p_fund_id_ + "_cum_earn"] = cnav_df[p_fund_id_ + "_earn"].cumsum().fillna(0) cnav_df[p_fund_id_ + "_net_amount"] = cnav_df[p_fund_id_ + "_cum_earn"].apply(lambda x: Decimal(x)) + cnav_df[p_fund_id_ + "_amount"] # cnav_df[p_fund_id_ + "_profit_ratio"] = cnav_df[p_fund_id_ + "_earn"].apply(lambda x: Decimal(x)) / cnav_df[ # p_fund_id_ + "_net_amount"].shift() # cnav_df[p_fund_id_ + "_net_amount"] = cnav_df[p_fund_id_ + "_share"] * cnav_df[p_fund_id_] return cnav_df