Commit f549e333 authored by 李宗熹

Add fund diagnosis module

parent 27d4cde4
from .black_litterman import (
market_implied_prior_returns,
market_implied_risk_aversion,
BlackLittermanModel,
)
from .cla import CLA
from .discrete_allocation import get_latest_prices, DiscreteAllocation
from .efficient_frontier import EfficientFrontier
from .hierarchical_portfolio import HRPOpt
from .risk_models import CovarianceShrinkage
__version__ = "1.2.6"
__all__ = [
"market_implied_prior_returns",
"market_implied_risk_aversion",
"BlackLittermanModel",
"CLA",
"get_latest_prices",
"DiscreteAllocation",
"EfficientFrontier",
"HRPOpt",
"CovarianceShrinkage",
]
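# Usage sketch (illustrative): the names in ``__all__`` can be imported from the
# package root. Upstream this library is published as ``pypfopt``; the local
# package name for this vendored copy may differ.
#
#   from pypfopt import EfficientFrontier, CLA, BlackLittermanModel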
"""
The ``base_optimizer`` module houses the parent class ``BaseOptimizer``, from which all
optimisers inherit, and ``BaseConvexOptimizer``, the base class for all ``cvxpy`` (and
``scipy``) optimisation.
Additionally, we define a general utility function ``portfolio_performance`` to
evaluate return and risk for a given set of portfolio weights.
"""
import collections
import json
import warnings
import numpy as np
import pandas as pd
import cvxpy as cp
import scipy.optimize as sco
from . import objective_functions
from . import exceptions
class BaseOptimizer:
"""
Instance variables:
- ``n_assets`` - int
- ``tickers`` - str list
- ``weights`` - np.ndarray
Public methods:
- ``set_weights()`` creates self.weights (np.ndarray) from a weights dict
- ``clean_weights()`` rounds the weights and clips near-zeros.
- ``save_weights_to_file()`` saves the weights to csv, json, or txt.
"""
def __init__(self, n_assets, tickers=None):
"""
:param n_assets: number of assets
:type n_assets: int
:param tickers: name of assets
:type tickers: list
"""
self.n_assets = n_assets
if tickers is None:
self.tickers = list(range(n_assets))
else:
self.tickers = tickers
# Outputs
self.weights = None
def _make_output_weights(self, weights=None):
"""
Utility function to make an output weight dict from the weight attribute (np.array). If no
argument is passed, use self.tickers and self.weights. If one argument is passed, assume
it is an alternative weight array, so use self.tickers and the argument.
"""
if weights is None:
weights = self.weights
return collections.OrderedDict(zip(self.tickers, weights))
def set_weights(self, input_weights):
"""
Utility function to set weights attribute (np.array) from user input
:param input_weights: {ticker: weight} dict
:type input_weights: dict
"""
self.weights = np.array([input_weights[ticker] for ticker in self.tickers])
def clean_weights(self, cutoff=1e-4, rounding=5):
"""
Helper method to clean the raw weights, setting any weights whose absolute
values are below the cutoff to zero, and rounding the rest.
:param cutoff: the lower bound, defaults to 1e-4
:type cutoff: float, optional
:param rounding: number of decimal places to round the weights, defaults to 5.
Set to None if rounding is not desired.
:type rounding: int, optional
:return: asset weights
:rtype: OrderedDict
"""
if self.weights is None:
raise AttributeError("Weights not yet computed")
clean_weights = self.weights.copy()
clean_weights[np.abs(clean_weights) < cutoff] = 0
if rounding is not None:
if not isinstance(rounding, int) or rounding < 1:
raise ValueError("rounding must be a positive integer")
clean_weights = np.round(clean_weights, rounding)
return self._make_output_weights(clean_weights)
def save_weights_to_file(self, filename="weights.csv"):
"""
Utility method to save weights to a text file.
:param filename: name of file. Should be csv, json, or txt.
:type filename: str
"""
clean_weights = self.clean_weights()
ext = filename.split(".")[-1]
if ext == "csv":
pd.Series(clean_weights).to_csv(filename, header=False)
elif ext == "json":
with open(filename, "w") as fp:
json.dump(clean_weights, fp)
elif ext == "txt":
with open(filename, "w") as f:
f.write(str(dict(clean_weights)))
else:
raise NotImplementedError("Only supports .txt .json .csv")
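# Usage sketch (illustrative): round-tripping weights through the helpers above,
# assuming a hypothetical three-asset universe.
#
#   opt = BaseOptimizer(3, tickers=["AAPL", "MSFT", "XOM"])
#   opt.set_weights({"AAPL": 0.62341, "MSFT": 0.37654, "XOM": 0.00005})
#   opt.clean_weights(cutoff=1e-4, rounding=4)
#   # -> OrderedDict([('AAPL', 0.6234), ('MSFT', 0.3765), ('XOM', 0.0)])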
class BaseConvexOptimizer(BaseOptimizer):
"""
The BaseConvexOptimizer contains many private variables for use by
``cvxpy``. For example, the immutable optimisation variable for weights
is stored as self._w. Interacting directly with these variables is highly
discouraged.
Instance variables:
- ``n_assets`` - int
- ``tickers`` - str list
- ``weights`` - np.ndarray
- ``solver`` - str
Public methods:
- ``add_objective()`` adds a (convex) objective to the optimisation problem
- ``add_constraint()`` adds a (linear) constraint to the optimisation problem
- ``convex_objective()`` solves for a generic convex objective with linear constraints
- ``nonconvex_objective()`` solves for a generic nonconvex objective using the scipy backend.
This is prone to getting stuck in local minima and is generally *not* recommended.
- ``set_weights()`` creates self.weights (np.ndarray) from a weights dict
- ``clean_weights()`` rounds the weights and clips near-zeros.
- ``save_weights_to_file()`` saves the weights to csv, json, or txt.
"""
def __init__(
self, n_assets, tickers=None, weight_bounds=(0, 1), solver=None, verbose=False
):
"""
:param weight_bounds: minimum and maximum weight of each asset OR single min/max pair
if all identical, defaults to (0, 1). Must be changed to (-1, 1)
for portfolios with shorting.
:type weight_bounds: tuple OR tuple list, optional
:param solver: name of solver. list available solvers with: ``cvxpy.installed_solvers()``
:type solver: str, optional. Defaults to "ECOS"
:param verbose: whether performance and debugging info should be printed, defaults to False
:type verbose: bool, optional
"""
super().__init__(n_assets, tickers)
# Optimisation variables
self._w = cp.Variable(n_assets)
self._objective = None
self._additional_objectives = []
self._constraints = []
self._lower_bounds = None
self._upper_bounds = None
self._map_bounds_to_constraints(weight_bounds)
self._solver = solver
self._verbose = verbose
def _map_bounds_to_constraints(self, test_bounds):
"""
Process input bounds into a form acceptable by cvxpy and add to the constraints list.
:param test_bounds: minimum and maximum weight of each asset OR single min/max pair
if all identical OR pair of arrays corresponding to lower/upper bounds. defaults to (0, 1).
:type test_bounds: tuple OR list/tuple of tuples OR pair of np arrays
:raises TypeError: if ``test_bounds`` is not of the right type
:return: bounds suitable for cvxpy
:rtype: tuple pair of np.ndarray
"""
# If it is a collection with the right length, assume they are all bounds.
if len(test_bounds) == self.n_assets and not isinstance(
test_bounds[0], (float, int)
):
bounds = np.array(test_bounds, dtype=float)
self._lower_bounds = np.nan_to_num(bounds[:, 0], nan=-np.inf)
self._upper_bounds = np.nan_to_num(bounds[:, 1], nan=np.inf)
else:
# Otherwise this must be a pair.
if len(test_bounds) != 2 or not isinstance(test_bounds, (tuple, list)):
raise TypeError(
"test_bounds must be a pair (lower bound, upper bound) "
"OR a collection of bounds for each asset"
)
lower, upper = test_bounds
# Replace None values with the appropriate +/- 1
if np.isscalar(lower) or lower is None:
lower = -1 if lower is None else lower
self._lower_bounds = np.array([lower] * self.n_assets)
upper = 1 if upper is None else upper
self._upper_bounds = np.array([upper] * self.n_assets)
else:
self._lower_bounds = np.nan_to_num(lower, nan=-1)
self._upper_bounds = np.nan_to_num(upper, nan=1)
self._constraints.append(self._w >= self._lower_bounds)
self._constraints.append(self._w <= self._upper_bounds)
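# Accepted forms for ``weight_bounds`` (illustrative, for a 3-asset universe):
#
#   (0, 1)                        # one (min, max) pair applied to every asset
#   [(0, 0.5), (0, 1), (0, 1)]    # a (lower, upper) pair per asset
#   (np.zeros(3), np.ones(3))     # a pair of lower/upper bound arrays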
def _solve_cvxpy_opt_problem(self):
"""
Helper method to solve the cvxpy problem and check output,
once objectives and constraints have been defined
:raises exceptions.OptimizationError: if problem is not solvable by cvxpy
"""
try:
opt = cp.Problem(cp.Minimize(self._objective), self._constraints)
if self._solver is not None:
opt.solve(solver=self._solver, verbose=self._verbose)
else:
opt.solve(verbose=self._verbose)
except (TypeError, cp.DCPError) as e:
raise exceptions.OptimizationError from e
if opt.status != "optimal":
raise exceptions.OptimizationError
self.weights = self._w.value.round(16) + 0.0 # +0.0 removes signed zero
return self._make_output_weights()
def add_objective(self, new_objective, **kwargs):
"""
Add a new term into the objective function. This term must be convex,
and built from cvxpy atomic functions.
Example::
def L1_norm(w, k=1):
return k * cp.norm(w, 1)
ef.add_objective(L1_norm, k=2)
:param new_objective: the objective to be added
:type new_objective: cp.Expression (i.e function of cp.Variable)
"""
self._additional_objectives.append(new_objective(self._w, **kwargs))
def add_constraint(self, new_constraint):
"""
Add a new constraint to the optimisation problem. This constraint must be linear and
must be either an equality or simple inequality.
Examples::
ef.add_constraint(lambda x : x[0] == 0.02)
ef.add_constraint(lambda x : x >= 0.01)
ef.add_constraint(lambda x: x <= np.array([0.01, 0.08, ..., 0.5]))
:param new_constraint: the constraint to be added
:type new_constraint: lambda function
"""
if not callable(new_constraint):
raise TypeError("New constraint must be provided as a lambda function")
self._constraints.append(new_constraint(self._w))
def add_sector_constraints(self, sector_mapper, sector_lower, sector_upper):
"""
Adds constraints on the sum of weights of different groups of assets.
Most commonly, these will be sector constraints, e.g. the portfolio's exposure to
tech must be less than x%::
sector_mapper = {
"GOOG": "tech",
"FB": "tech",,
"XOM": "Oil/Gas",
"RRC": "Oil/Gas",
"MA": "Financials",
"JPM": "Financials",
}
sector_lower = {"tech": 0.1} # at least 10% to tech
sector_upper = {
"tech": 0.4, # less than 40% tech
"Oil/Gas: 0.1 # less than 10% oil and gas
}
:param sector_mapper: dict that maps tickers to sectors
:type sector_mapper: {str: str} dict
:param sector_lower: lower bounds for each sector
:type sector_lower: {str: float} dict
:param sector_upper: upper bounds for each sector
:type sector_upper: {str:float} dict
"""
if np.any(self._lower_bounds < 0):
warnings.warn(
"Sector constraints may not produce reasonable results if shorts are allowed."
)
for sector in sector_upper:
is_sector = [sector_mapper[t] == sector for t in self.tickers]
self._constraints.append(cp.sum(self._w[is_sector]) <= sector_upper[sector])
for sector in sector_lower:
is_sector = [sector_mapper[t] == sector for t in self.tickers]
self._constraints.append(cp.sum(self._w[is_sector]) >= sector_lower[sector])
def convex_objective(self, custom_objective, weights_sum_to_one=True, **kwargs):
"""
Optimise a custom convex objective function. Constraints should be added with
``ef.add_constraint()``. Optimiser arguments must be passed as keyword-args. Example::
# Could define as a lambda function instead
def logarithmic_barrier(w, cov_matrix, k=0.1):
# 60 Years of Portfolio Optimisation, Kolm et al (2014)
return cp.quad_form(w, cov_matrix) - k * cp.sum(cp.log(w))
w = ef.convex_objective(logarithmic_barrier, cov_matrix=ef.cov_matrix)
:param custom_objective: an objective function to be MINIMISED. This should be written using
cvxpy atoms. Should map (w, `**kwargs`) -> cp.Expression.
:type custom_objective: function with signature (cp.Variable, `**kwargs`) -> cp.Expression
:param weights_sum_to_one: whether to add the default objective, defaults to True
:type weights_sum_to_one: bool, optional
:raises OptimizationError: if the objective is nonconvex or constraints nonlinear.
:return: asset weights for the efficient risk portfolio
:rtype: OrderedDict
"""
# custom_objective must have the right signature (w, **kwargs)
self._objective = custom_objective(self._w, **kwargs)
for obj in self._additional_objectives:
self._objective += obj
if weights_sum_to_one:
self._constraints.append(cp.sum(self._w) == 1)
return self._solve_cvxpy_opt_problem()
def nonconvex_objective(
self,
custom_objective,
objective_args=None,
weights_sum_to_one=True,
constraints=None,
solver="SLSQP",
initial_guess=None,
):
"""
Optimise some objective function using the scipy backend. This can
support nonconvex objectives and nonlinear constraints, but often gets stuck
at local minima. This method is not recommended – caveat emptor. Example::
# Market-neutral efficient risk
constraints = [
{"type": "eq", "fun": lambda w: np.sum(w)}, # weights sum to zero
{
"type": "eq",
"fun": lambda w: target_risk ** 2 - np.dot(w.T, np.dot(ef.cov_matrix, w)),
}, # risk = target_risk
]
ef.nonconvex_objective(
lambda w, mu: -w.T.dot(mu), # min negative return (i.e maximise return)
objective_args=(ef.expected_returns,),
weights_sum_to_one=False,
constraints=constraints,
)
:param custom_objective: an objective function to be MINIMISED. This function
should map (weight, args) -> cost
:type custom_objective: function with signature (np.ndarray, args) -> float
:param objective_args: arguments for the objective function (excluding weight)
:type objective_args: tuple of np.ndarrays
:param weights_sum_to_one: whether to add the default objective, defaults to True
:type weights_sum_to_one: bool, optional
:param constraints: list of constraints in the scipy format (i.e dicts)
:type constraints: dict list
:param solver: which SCIPY solver to use, e.g "SLSQP", "COBYLA", "BFGS".
User beware: different optimisers require different inputs.
:type solver: string
:param initial_guess: the initial guess for the weights, shape (n,) or (n, 1)
:type initial_guess: np.ndarray
:return: asset weights that optimise the custom objective
:rtype: OrderedDict
"""
# Sanitise inputs
if not isinstance(objective_args, tuple):
objective_args = (objective_args,)
# Make scipy bounds
bound_array = np.vstack((self._lower_bounds, self._upper_bounds)).T
bounds = list(map(tuple, bound_array))
if initial_guess is None:
initial_guess = np.array([1 / self.n_assets] * self.n_assets)
# Construct constraints
final_constraints = []
if weights_sum_to_one:
final_constraints.append({"type": "eq", "fun": lambda x: np.sum(x) - 1})
if constraints is not None:
final_constraints += constraints
result = sco.minimize(
custom_objective,
x0=initial_guess,
args=objective_args,
method=solver,
bounds=bounds,
constraints=final_constraints,
)
self.weights = result["x"]
return self._make_output_weights()
def portfolio_performance(
weights, expected_returns, cov_matrix, verbose=False, risk_free_rate=0.02
):
"""
After optimising, calculate (and optionally print) the performance of the optimal
portfolio. Currently calculates expected return, volatility, and the Sharpe ratio.
:param expected_returns: expected returns for each asset. Can be None if
optimising for volatility only (but not recommended).
:type expected_returns: np.ndarray or pd.Series
:param cov_matrix: covariance of returns for each asset
:type cov_matrix: np.array or pd.DataFrame
:param weights: weights of assets
:type weights: list, np.array or dict, optional
:param verbose: whether performance should be printed, defaults to False
:type verbose: bool, optional
:param risk_free_rate: risk-free rate of borrowing/lending, defaults to 0.02
:type risk_free_rate: float, optional
:raises ValueError: if weights have not been calculated yet
:return: expected return, volatility, Sharpe ratio.
:rtype: (float, float, float)
"""
if isinstance(weights, dict):
if isinstance(expected_returns, pd.Series):
tickers = list(expected_returns.index)
elif isinstance(cov_matrix, pd.DataFrame):
tickers = list(cov_matrix.columns)
else:
tickers = list(range(len(expected_returns)))
new_weights = np.zeros(len(tickers))
for i, k in enumerate(tickers):
if k in weights:
new_weights[i] = weights[k]
if new_weights.sum() == 0:
raise ValueError("Weights add to zero, or ticker names don't match")
elif weights is not None:
new_weights = np.asarray(weights)
else:
raise ValueError("Weights is None")
sigma = np.sqrt(objective_functions.portfolio_variance(new_weights, cov_matrix))
if expected_returns is not None:
mu = objective_functions.portfolio_return(
new_weights, expected_returns, negative=False
)
sharpe = objective_functions.sharpe_ratio(
new_weights,
expected_returns,
cov_matrix,
risk_free_rate=risk_free_rate,
negative=False,
)
if verbose:
print("Expected annual return: {:.1f}%".format(100 * mu))
print("Annual volatility: {:.1f}%".format(100 * sigma))
print("Sharpe Ratio: {:.2f}".format(sharpe))
return mu, sigma, sharpe
else:
if verbose:
print("Annual volatility: {:.1f}%".format(100 * sigma))
return None, sigma, None
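# Usage sketch (illustrative): evaluating a fixed weight vector against
# hypothetical annualised estimates.
#
#   mu = np.array([0.10, 0.08, 0.12])
#   S = np.diag([0.04, 0.02, 0.09])
#   w = np.array([0.4, 0.4, 0.2])
#   ret, vol, sharpe = portfolio_performance(w, mu, S, verbose=True)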
"""
The ``black_litterman`` module houses the BlackLittermanModel class, which
generates posterior estimates of expected returns given a prior estimate and user-supplied
views. In addition, two utility functions are defined, which calculate:
- market-implied prior estimate of returns
- market-implied risk-aversion parameter
"""
import sys
import warnings
import numpy as np
import pandas as pd
from . import base_optimizer
def market_implied_prior_returns(
market_caps, risk_aversion, cov_matrix, risk_free_rate=0.02
):
r"""
Compute the prior estimate of returns implied by the market weights.
In other words, given each asset's contribution to the risk of the market
portfolio, how much are we expecting to be compensated?
.. math::
\Pi = \delta \Sigma w_{mkt}
:param market_caps: market capitalisations of all assets
:type market_caps: {ticker: cap} dict or pd.Series
:param risk_aversion: risk aversion parameter
:type risk_aversion: positive float
:param cov_matrix: covariance matrix of asset returns
:type cov_matrix: pd.DataFrame
:param risk_free_rate: risk-free rate of borrowing/lending, defaults to 0.02.
You should use the appropriate time period, corresponding
to the covariance matrix.
:type risk_free_rate: float, optional
:return: prior estimate of returns as implied by the market caps
:rtype: pd.Series
"""
if not isinstance(cov_matrix, pd.DataFrame):
warnings.warn(
"If cov_matrix is not a dataframe, market cap index must be aligned to cov_matrix",
RuntimeWarning,
)
mcaps = pd.Series(market_caps)
mkt_weights = mcaps / mcaps.sum()
# Pi is excess returns so must add risk_free_rate to get return.
return risk_aversion * cov_matrix.dot(mkt_weights) + risk_free_rate
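# Usage sketch (illustrative): a two-asset market-implied prior, assuming
# hypothetical market caps and an annualised covariance matrix.
#
#   mcaps = {"AAPL": 2.0e12, "XOM": 2.5e11}
#   S = pd.DataFrame([[0.04, 0.01], [0.01, 0.09]],
#                    index=["AAPL", "XOM"], columns=["AAPL", "XOM"])
#   pi = market_implied_prior_returns(mcaps, risk_aversion=2.5, cov_matrix=S)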
def market_implied_risk_aversion(market_prices, frequency=252, risk_free_rate=0.02):
r"""
Calculate the market-implied risk-aversion parameter (i.e market price of risk)
based on market prices. For example, if the market has excess returns of 10% a year
with 5% variance, the risk-aversion parameter is 2, i.e you have to be compensated 2x
the variance.
.. math::
\delta = \frac{R - R_f}{\sigma^2}
:param market_prices: the (daily) prices of the market portfolio, e.g SPY.
:type market_prices: pd.Series with DatetimeIndex.
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year)
:type frequency: int, optional
:param risk_free_rate: risk-free rate of borrowing/lending, defaults to 0.02.
The period of the risk-free rate should correspond to the
frequency of expected returns.
:type risk_free_rate: float, optional
:raises TypeError: if market_prices cannot be parsed
:return: market-implied risk aversion
:rtype: float
"""
if not isinstance(market_prices, (pd.Series, pd.DataFrame)):
raise TypeError("Please format market_prices as a pd.Series")
rets = market_prices.pct_change().dropna()
r = rets.mean() * frequency
var = rets.var() * frequency
return (r - risk_free_rate) / var
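# Usage sketch (illustrative): estimating delta from a synthetic daily price series.
#
#   idx = pd.date_range("2019-01-01", periods=500, freq="B")
#   rets = np.random.normal(4e-4, 0.01, 500)
#   prices = pd.Series(100 * np.cumprod(1 + rets), index=idx)
#   delta = market_implied_risk_aversion(prices)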
class BlackLittermanModel(base_optimizer.BaseOptimizer):
"""
A BlackLittermanModel object (inheriting from BaseOptimizer) requires
a specific input format, specifying the prior, the views, the uncertainty in views,
and a picking matrix to map views to the asset universe. We can then compute
posterior estimates of returns and covariance. Helper methods have been provided
to supply defaults where possible.
Instance variables:
- Inputs:
- ``cov_matrix`` - np.ndarray
- ``n_assets`` - int
- ``tickers`` - str list
- ``Q`` - np.ndarray
- ``P`` - np.ndarray
- ``pi`` - np.ndarray
- ``omega`` - np.ndarray
- ``tau`` - float
- Output:
- ``posterior_rets`` - pd.Series
- ``posterior_cov`` - pd.DataFrame
- ``weights`` - np.ndarray
Public methods:
- ``default_omega()`` - view uncertainty proportional to asset variance
- ``idzorek_method()`` - convert views specified as percentages into BL uncertainties
- ``bl_returns()`` - posterior estimate of returns
- ``bl_cov()`` - posterior estimate of covariance
- ``bl_weights()`` - weights implied by posterior returns
- ``portfolio_performance()`` calculates the expected return, volatility
and Sharpe ratio for the allocated portfolio.
- ``set_weights()`` creates self.weights (np.ndarray) from a weights dict
- ``clean_weights()`` rounds the weights and clips near-zeros.
- ``save_weights_to_file()`` saves the weights to csv, json, or txt.
"""
def __init__(
self,
cov_matrix,
pi=None,
absolute_views=None,
Q=None,
P=None,
omega=None,
view_confidences=None,
tau=0.05,
risk_aversion=1,
**kwargs
):
"""
:param cov_matrix: NxN covariance matrix of returns
:type cov_matrix: pd.DataFrame or np.ndarray
:param pi: Nx1 prior estimate of returns, defaults to None.
If pi="market", calculate a market-implied prior (requires market_caps
to be passed).
If pi="equal", use an equal-weighted prior.
:type pi: np.ndarray, pd.Series, optional
:param absolute_views: a collection of K absolute views on a subset of assets,
defaults to None. If this is provided, we do not need P, Q.
:type absolute_views: pd.Series or dict, optional
:param Q: Kx1 views vector, defaults to None
:type Q: np.ndarray or pd.DataFrame, optional
:param P: KxN picking matrix, defaults to None
:type P: np.ndarray or pd.DataFrame, optional
:param omega: KxK view uncertainty matrix (diagonal), defaults to None
Can instead pass "idzorek" to use Idzorek's method (requires
you to pass view_confidences). If omega="default" or None,
we set the uncertainty proportional to the variance.
:type omega: np.ndarray or pd.DataFrame, or string, optional
:param view_confidences: Kx1 vector of percentage view confidences (between 0 and 1),
required to compute omega via Idzorek's method.
:type view_confidences: np.ndarray, pd.Series, list, optional
:param tau: the weight-on-views scalar (default is 0.05)
:type tau: float, optional
:param risk_aversion: risk aversion parameter, defaults to 1
:type risk_aversion: positive float, optional
:param market_caps: (kwarg) market caps for the assets, required if pi="market"
:type market_caps: np.ndarray, pd.Series, optional
:param risk_free_rate: (kwarg) risk_free_rate is needed in some methods
:type risk_free_rate: float, defaults to 0.02
"""
if sys.version_info[:2] == (3, 5):  # if python 3.5
warnings.warn(
"When using python 3.5 you must explicitly construct the Black-Litterman inputs"
)
# Keep raw dataframes
self._raw_cov_matrix = cov_matrix
#  Initialise base optimiser
if isinstance(cov_matrix, np.ndarray):
self.cov_matrix = cov_matrix
super().__init__(len(cov_matrix), list(range(len(cov_matrix))))
else:
self.cov_matrix = cov_matrix.values
super().__init__(len(cov_matrix), cov_matrix.columns)
#  Sanitise inputs
if absolute_views is not None:
self.Q, self.P = self._parse_views(absolute_views)
else:
self._set_Q_P(Q, P)
self._set_risk_aversion(risk_aversion)
self._set_pi(pi, **kwargs)
self._set_tau(tau)
# Make sure all dimensions work
self._check_attribute_dimensions()
self._set_omega(omega, view_confidences)
# Private intermediaries
self._tau_sigma_P = None
self._A = None
self.posterior_rets = None
self.posterior_cov = None
def _parse_views(self, absolute_views):
"""
Given a collection (dict or series) of absolute views, construct
the appropriate views vector and picking matrix. The views must
be a subset of the tickers in the covariance matrix. Example::
{"AAPL": 0.20, "GOOG": 0.12, "XOM": -0.30}
:param absolute_views: absolute views on asset performances
:type absolute_views: dict, pd.Series
"""
if not isinstance(absolute_views, (dict, pd.Series)):
raise TypeError("views should be a dict or pd.Series")
# Coerce to series
views = pd.Series(absolute_views)
k = len(views)
Q = np.zeros((k, 1))
P = np.zeros((k, self.n_assets))
for i, view_ticker in enumerate(views.keys()):
try:
Q[i] = views[view_ticker]
P[i, list(self.tickers).index(view_ticker)] = 1
except ValueError:
#  Could make this smarter by just skipping
raise ValueError("Providing a view on an asset not in the universe")
return Q, P
def _set_Q_P(self, Q, P):
if isinstance(Q, (pd.Series, pd.DataFrame)):
self.Q = Q.values.reshape(-1, 1)
elif isinstance(Q, np.ndarray):
self.Q = Q.reshape(-1, 1)
else:
raise TypeError("Q must be an array or dataframe")
if isinstance(P, pd.DataFrame):
self.P = P.values
elif isinstance(P, np.ndarray):
self.P = P
elif len(self.Q) == self.n_assets:
# If a view on every asset is provided, P defaults
# to the identity matrix.
self.P = np.eye(self.n_assets)
else:
raise TypeError("P must be an array or dataframe")
def _set_pi(self, pi, **kwargs):
if pi is None:
warnings.warn("Running Black-Litterman with no prior.")
self.pi = np.zeros((self.n_assets, 1))
elif isinstance(pi, (pd.Series, pd.DataFrame)):
self.pi = pi.values.reshape(-1, 1)
elif isinstance(pi, np.ndarray):
self.pi = pi.reshape(-1, 1)
elif pi == "market":
if "market_caps" not in kwargs:
raise ValueError(
"Please pass a series/array of market caps via the market_caps keyword argument"
)
market_caps = kwargs.get("market_caps")
risk_free_rate = kwargs.get("risk_free_rate", 0)
market_prior = market_implied_prior_returns(
market_caps, self.risk_aversion, self._raw_cov_matrix, risk_free_rate
)
self.pi = market_prior.values.reshape(-1, 1)
elif pi == "equal":
self.pi = np.ones((self.n_assets, 1)) / self.n_assets
else:
raise TypeError("pi must be an array or series")
def _set_tau(self, tau):
if tau <= 0 or tau > 1:
raise ValueError("tau should be between 0 and 1")
self.tau = tau
def _set_risk_aversion(self, risk_aversion):
if risk_aversion <= 0:
raise ValueError("risk_aversion should be a positive float")
self.risk_aversion = risk_aversion
def _set_omega(self, omega, view_confidences):
if isinstance(omega, pd.DataFrame):
self.omega = omega.values
elif isinstance(omega, np.ndarray):
self.omega = omega
elif omega == "idzorek":
if view_confidences is None:
raise ValueError(
"To use Idzorek's method, please supply a vector of percentage "
"confidence levels for each view."
)
if not isinstance(view_confidences, np.ndarray):
try:
view_confidences = np.array(view_confidences).reshape(-1, 1)
assert view_confidences.shape[0] == self.Q.shape[0]
assert np.issubdtype(view_confidences.dtype, np.number)
except AssertionError:
raise ValueError(
"view_confidences should be a numpy 1D array or vector with the same length "
"as the number of views."
)
self.omega = BlackLittermanModel.idzorek_method(
view_confidences,
self.cov_matrix,
self.pi,
self.Q,
self.P,
self.tau,
self.risk_aversion,
)
elif omega is None or omega == "default":
self.omega = BlackLittermanModel.default_omega(
self.cov_matrix, self.P, self.tau
)
else:
raise TypeError("self.omega must be a square array, dataframe, or string")
K = len(self.Q)
assert self.omega.shape == (K, K), "omega must have dimensions KxK"
def _check_attribute_dimensions(self):
"""
Helper method to ensure that all of the attributes created by the initialiser
have the correct dimensions, to avoid linear algebra errors later on.
:raises ValueError: if there are incorrect dimensions.
"""
N = self.n_assets
K = len(self.Q)
assert self.pi.shape == (N, 1), "pi must have dimensions Nx1"
assert self.P.shape == (K, N), "P must have dimensions KxN"
assert self.cov_matrix.shape == (N, N), "cov_matrix must have shape NxN"
@staticmethod
def default_omega(cov_matrix, P, tau):
"""
If the uncertainty matrix omega is not provided, we calculate using the method of
He and Litterman (1999), such that the ratio omega/tau is proportional to the
variance of the view portfolio.
:return: KxK diagonal uncertainty matrix
:rtype: np.ndarray
"""
return np.diag(np.diag(tau * P @ cov_matrix @ P.T))
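# Worked sketch (illustrative): a single view on asset 1, whose variance is 0.04,
# gets uncertainty tau * 0.04 = 0.002 under this default.
#
#   P = np.array([[1.0, 0.0]])
#   S = np.diag([0.04, 0.09])
#   BlackLittermanModel.default_omega(S, P, tau=0.05)
#   # -> array([[0.002]])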
@staticmethod
def idzorek_method(view_confidences, cov_matrix, pi, Q, P, tau, risk_aversion=1):
"""
Use Idzorek's method to create the uncertainty matrix given user-specified
percentage confidences. We use the closed-form solution described by
Jay Walters in The Black-Litterman Model in Detail (2014).
:param view_confidences: Kx1 vector of percentage view confidences (between 0 and 1),
required to compute omega via Idzorek's method.
:type view_confidences: np.ndarray, pd.Series, list, optional
:return: KxK diagonal uncertainty matrix
:rtype: np.ndarray
"""
view_omegas = []
for view_idx in range(len(Q)):
conf = view_confidences[view_idx]
if conf < 0 or conf > 1:
raise ValueError("View confidences must be between 0 and 1")
# Special handler to avoid dividing by zero.
# If zero conf, return very big number as uncertainty
if conf == 0:
view_omegas.append(1e6)
continue
P_view = P[view_idx].reshape(1, -1)
alpha = (1 - conf) / conf # formula (44)
omega = tau * alpha * P_view @ cov_matrix @ P_view.T # formula (41)
view_omegas.append(omega.item())
return np.diag(view_omegas)
def bl_returns(self):
"""
Calculate the posterior estimate of the returns vector,
given views on some assets.
:return: posterior returns vector
:rtype: pd.Series
"""
if self._tau_sigma_P is None:
self._tau_sigma_P = self.tau * self.cov_matrix @ self.P.T
# Solve the linear system Ax = b to avoid inversion
if self._A is None:
self._A = (self.P @ self._tau_sigma_P) + self.omega
b = self.Q - self.P @ self.pi
post_rets = self.pi + self._tau_sigma_P @ np.linalg.solve(self._A, b)
return pd.Series(post_rets.flatten(), index=self.tickers)
def bl_cov(self):
"""
Calculate the posterior estimate of the covariance matrix,
given views on some assets. Based on He and Litterman (1999).
It is assumed that omega is diagonal. If this is not the case,
please manually set omega_inv.
:return: posterior covariance matrix
:rtype: pd.DataFrame
"""
if self._tau_sigma_P is None:
self._tau_sigma_P = self.tau * self.cov_matrix @ self.P.T
if self._A is None:
self._A = (self.P @ self._tau_sigma_P) + self.omega
b = self._tau_sigma_P.T
M = self.tau * self.cov_matrix - self._tau_sigma_P @ np.linalg.solve(self._A, b)
posterior_cov = self.cov_matrix + M
return pd.DataFrame(posterior_cov, index=self.tickers, columns=self.tickers)
def bl_weights(self, risk_aversion=None):
r"""
Compute the weights implied by the posterior returns, given the
market price of risk. Technically this can be applied to any
estimate of the expected returns, and is in fact a special case
of efficient frontier optimisation.
.. math::
w = (\delta \Sigma)^{-1} E(R)
:param risk_aversion: risk aversion parameter, defaults to 1
:type risk_aversion: positive float, optional
:return: asset weights implied by returns
:rtype: OrderedDict
"""
if risk_aversion is None:
risk_aversion = self.risk_aversion
self.posterior_rets = self.bl_returns()
A = risk_aversion * self.cov_matrix
b = self.posterior_rets
raw_weights = np.linalg.solve(A, b)
self.weights = raw_weights / raw_weights.sum()
return self._make_output_weights()
def optimize(self, risk_aversion=None):
"""
Alias for bl_weights for consistency with other methods.
"""
return self.bl_weights(risk_aversion)
def portfolio_performance(self, verbose=False, risk_free_rate=0.02):
"""
After optimising, calculate (and optionally print) the performance of the optimal
portfolio. Currently calculates expected return, volatility, and the Sharpe ratio.
This method uses the BL posterior returns and covariance matrix.
:param verbose: whether performance should be printed, defaults to False
:type verbose: bool, optional
:param risk_free_rate: risk-free rate of borrowing/lending, defaults to 0.02.
The period of the risk-free rate should correspond to the
frequency of expected returns.
:type risk_free_rate: float, optional
:raises ValueError: if weights have not been calculated yet
:return: expected return, volatility, Sharpe ratio.
:rtype: (float, float, float)
"""
if self.posterior_cov is None:
self.posterior_cov = self.bl_cov()
return base_optimizer.portfolio_performance(
self.weights,
self.posterior_rets,
self.posterior_cov,
verbose,
risk_free_rate,
)
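# Usage sketch (illustrative): posterior returns and weights from two absolute
# views, assuming a hypothetical diagonal covariance matrix and an equal-weight prior.
#
#   tickers = ["AAPL", "GOOG", "XOM"]
#   S = pd.DataFrame(np.diag([0.04, 0.05, 0.09]), index=tickers, columns=tickers)
#   bl = BlackLittermanModel(S, pi="equal", absolute_views={"AAPL": 0.10, "XOM": -0.05})
#   rets = bl.bl_returns()   # posterior pd.Series
#   w = bl.bl_weights()      # weights implied by the posterior returns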
"""
The ``cla`` module houses the CLA class, which
generates optimal portfolios using the Critical Line Algorithm as implemented
by Marcos Lopez de Prado and David Bailey.
"""
import math
import numpy as np
import pandas as pd
from . import base_optimizer
class CLA(base_optimizer.BaseOptimizer):
"""
Instance variables:
- Inputs:
- ``n_assets`` - int
- ``tickers`` - str list
- ``mean`` - np.ndarray
- ``cov_matrix`` - np.ndarray
- ``expected_returns`` - np.ndarray
- ``lb`` - np.ndarray
- ``ub`` - np.ndarray
- Optimisation parameters:
- ``w`` - np.ndarray list
- ``ls`` - float list
- ``g`` - float list
- ``f`` - float list list
- Outputs:
- ``weights`` - np.ndarray
- ``frontier_values`` - (float list, float list, np.ndarray list)
Public methods:
- ``max_sharpe()`` optimises for maximal Sharpe ratio (a.k.a the tangency portfolio)
- ``min_volatility()`` optimises for minimum volatility
- ``efficient_frontier()`` computes the entire efficient frontier
- ``portfolio_performance()`` calculates the expected return, volatility and Sharpe ratio for
the optimised portfolio.
- ``clean_weights()`` rounds the weights and clips near-zeros.
- ``save_weights_to_file()`` saves the weights to csv, json, or txt.
"""
def __init__(self, expected_returns, cov_matrix, weight_bounds=(0, 1)):
"""
:param expected_returns: expected returns for each asset. Set to None if
optimising for volatility only.
:type expected_returns: pd.Series, list, np.ndarray
:param cov_matrix: covariance of returns for each asset
:type cov_matrix: pd.DataFrame or np.array
:param weight_bounds: minimum and maximum weight of an asset, defaults to (0, 1).
Must be changed to (-1, 1) for portfolios with shorting.
:type weight_bounds: tuple (float, float) or (list/ndarray, list/ndarray)
:raises TypeError: if ``expected_returns`` is not a series, list or array
:raises TypeError: if ``cov_matrix`` is not a dataframe or array
"""
# Initialize the class
self.mean = np.array(expected_returns).reshape((len(expected_returns), 1))
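# If every expected return is identical, perturb the last one slightly:
# the algorithm sorts assets by return and degenerates if they are all equal.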
if (self.mean == np.ones(self.mean.shape) * self.mean.mean()).all():
self.mean[-1, 0] += 1e-5
self.expected_returns = self.mean.reshape((len(self.mean),))
self.cov_matrix = np.asarray(cov_matrix)
# Bounds
if len(weight_bounds) == len(self.mean) and not isinstance(
weight_bounds[0], (float, int)
):
self.lB = np.array([b[0] for b in weight_bounds]).reshape(-1, 1)
self.uB = np.array([b[1] for b in weight_bounds]).reshape(-1, 1)
else:
if isinstance(weight_bounds[0], (float, int)):
self.lB = np.ones(self.mean.shape) * weight_bounds[0]
else:
self.lB = np.array(weight_bounds[0]).reshape(self.mean.shape)
if isinstance(weight_bounds[1], (float, int)):
self.uB = np.ones(self.mean.shape) * weight_bounds[1]
else:
self.uB = np.array(weight_bounds[1]).reshape(self.mean.shape)
self.w = [] # solution
self.ls = [] # lambdas
self.g = [] # gammas
self.f = [] # free weights
self.frontier_values = None # result of computing efficient frontier
if isinstance(expected_returns, pd.Series):
tickers = list(expected_returns.index)
else:
tickers = list(range(len(self.mean)))
super().__init__(len(tickers), tickers)
@staticmethod
def _infnone(x):
"""
Helper method to map None to negative float infinity.
:param x: argument
:type x: float
:return: negative infinity if the argument was None, otherwise x
:rtype: float
"""
return float("-inf") if x is None else x
def _init_algo(self):
# Initialize the algo
# 1) Form structured array
a = np.zeros((self.mean.shape[0]), dtype=[("id", int), ("mu", float)])
b = [self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list
# fill structured array
a[:] = list(zip(list(range(self.mean.shape[0])), b))
# 2) Sort structured array
b = np.sort(a, order="mu")
# 3) First free weight
i, w = b.shape[0], np.copy(self.lB)
while sum(w) < 1:
i -= 1
w[b[i][0]] = self.uB[b[i][0]]
w[b[i][0]] += 1 - sum(w)
return [b[i][0]], w
def _compute_bi(self, c, bi):
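# Choose the upper bound when C is positive, the lower bound when C is negative.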
if c > 0:
bi = bi[1][0]
if c < 0:
bi = bi[0][0]
return bi
def _compute_w(self, covarF_inv, covarFB, meanF, wB):
# 1) compute gamma
onesF = np.ones(meanF.shape)
g1 = np.dot(np.dot(onesF.T, covarF_inv), meanF)
g2 = np.dot(np.dot(onesF.T, covarF_inv), onesF)
if wB is None:
g, w1 = float(-self.ls[-1] * g1 / g2 + 1 / g2), 0
else:
onesB = np.ones(wB.shape)
g3 = np.dot(onesB.T, wB)
g4 = np.dot(covarF_inv, covarFB)
w1 = np.dot(g4, wB)
g4 = np.dot(onesF.T, w1)
g = float(-self.ls[-1] * g1 / g2 + (1 - g3 + g4) / g2)
# 2) compute weights
w2 = np.dot(covarF_inv, onesF)
w3 = np.dot(covarF_inv, meanF)
return -w1 + g * w2 + self.ls[-1] * w3, g
def _compute_lambda(self, covarF_inv, covarFB, meanF, wB, i, bi):
# 1) C
onesF = np.ones(meanF.shape)
c1 = np.dot(np.dot(onesF.T, covarF_inv), onesF)
c2 = np.dot(covarF_inv, meanF)
c3 = np.dot(np.dot(onesF.T, covarF_inv), meanF)
c4 = np.dot(covarF_inv, onesF)
c = -c1 * c2[i] + c3 * c4[i]
if c == 0:
return None, None
# 2) bi
if type(bi) == list:
bi = self._compute_bi(c, bi)
# 3) Lambda
if wB is None:
# All free assets
return float((c4[i] - c1 * bi) / c), bi
else:
onesB = np.ones(wB.shape)
l1 = np.dot(onesB.T, wB)
l2 = np.dot(covarF_inv, covarFB)
l3 = np.dot(l2, wB)
l2 = np.dot(onesF.T, l3)
return float(((1 - l1 + l2) * c4[i] - c1 * (bi + l3[i])) / c), bi
def _get_matrices(self, f):
# Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
covarF = self._reduce_matrix(self.cov_matrix, f, f)
meanF = self._reduce_matrix(self.mean, f, [0])
b = self._get_b(f)
covarFB = self._reduce_matrix(self.cov_matrix, f, b)
wB = self._reduce_matrix(self.w[-1], b, [0])
return covarF, covarFB, meanF, wB
def _get_b(self, f):
return self._diff_lists(list(range(self.mean.shape[0])), f)
@staticmethod
def _diff_lists(list1, list2):
return list(set(list1) - set(list2))
@staticmethod
def _reduce_matrix(matrix, listX, listY):
# Reduce a matrix to the provided list of rows and columns
if len(listX) == 0 or len(listY) == 0:
return
matrix_ = matrix[:, listY[0] : listY[0] + 1]
for i in listY[1:]:
a = matrix[:, i : i + 1]
matrix_ = np.append(matrix_, a, 1)
matrix__ = matrix_[listX[0] : listX[0] + 1, :]
for i in listX[1:]:
a = matrix_[i : i + 1, :]
matrix__ = np.append(matrix__, a, 0)
return matrix__
def _purge_num_err(self, tol):
# Purge violations of inequality constraints (associated with ill-conditioned cov matrix)
i = 0
while True:
flag = False
if i == len(self.w):
break
if abs(sum(self.w[i]) - 1) > tol:
flag = True
else:
for j in range(self.w[i].shape[0]):
if (
self.w[i][j] - self.lB[j] < -tol
or self.w[i][j] - self.uB[j] > tol
):
flag = True
break
if flag is True:
del self.w[i]
del self.ls[i]
del self.g[i]
del self.f[i]
else:
i += 1
def _purge_excess(self):
# Remove violations of the convex hull
i, repeat = 0, False
while True:
if repeat is False:
i += 1
if i == len(self.w) - 1:
break
w = self.w[i]
mu = np.dot(w.T, self.mean)[0, 0]
j, repeat = i + 1, False
while True:
if j == len(self.w):
break
w = self.w[j]
mu_ = np.dot(w.T, self.mean)[0, 0]
if mu < mu_:
del self.w[i]
del self.ls[i]
del self.g[i]
del self.f[i]
repeat = True
break
else:
j += 1
def _golden_section(self, obj, a, b, **kargs):
# Golden section method. Maximum if kargs['minimum']==False is passed
tol, sign, args = 1.0e-9, 1, None
if "minimum" in kargs and kargs["minimum"] is False:
sign = -1
if "args" in kargs:
args = kargs["args"]
numIter = int(math.ceil(-2.078087 * math.log(tol / abs(b - a))))
r = 0.618033989
c = 1.0 - r
# Initialize
x1 = r * a + c * b
x2 = c * a + r * b
f1 = sign * obj(x1, *args)
f2 = sign * obj(x2, *args)
# Loop
for i in range(numIter):
if f1 > f2:
a = x1
x1 = x2
f1 = f2
x2 = c * a + r * b
f2 = sign * obj(x2, *args)
else:
b = x2
x2 = x1
f2 = f1
x1 = r * a + c * b
f1 = sign * obj(x1, *args)
if f1 < f2:
return x1, sign * f1
else:
return x2, sign * f2
def _eval_sr(self, a, w0, w1):
# Evaluate SR of the portfolio within the convex combination
w = a * w0 + (1 - a) * w1
b = np.dot(w.T, self.mean)[0, 0]
c = np.dot(np.dot(w.T, self.cov_matrix), w)[0, 0] ** 0.5
return b / c
def _solve(self):
# Compute the turning points, free sets and weights
f, w = self._init_algo()
self.w.append(np.copy(w)) # store solution
self.ls.append(None)
self.g.append(None)
self.f.append(f[:])
while True:
# 1) case a): Bound one free weight
l_in = None
if len(f) > 1:
covarF, covarFB, meanF, wB = self._get_matrices(f)
covarF_inv = np.linalg.inv(covarF)
j = 0
for i in f:
l, bi = self._compute_lambda(
covarF_inv, covarFB, meanF, wB, j, [self.lB[i], self.uB[i]]
)
if CLA._infnone(l) > CLA._infnone(l_in):
l_in, i_in, bi_in = l, i, bi
j += 1
# 2) case b): Free one bounded weight
l_out = None
if len(f) < self.mean.shape[0]:
b = self._get_b(f)
for i in b:
covarF, covarFB, meanF, wB = self._get_matrices(f + [i])
covarF_inv = np.linalg.inv(covarF)
l, bi = self._compute_lambda(
covarF_inv,
covarFB,
meanF,
wB,
meanF.shape[0] - 1,
self.w[-1][i],
)
if (self.ls[-1] is None or l < self.ls[-1]) and l > CLA._infnone(
l_out
):
l_out, i_out = l, i
if (l_in is None or l_in < 0) and (l_out is None or l_out < 0):
# 3) compute minimum variance solution
self.ls.append(0)
covarF, covarFB, meanF, wB = self._get_matrices(f)
covarF_inv = np.linalg.inv(covarF)
meanF = np.zeros(meanF.shape)
else:
# 4) decide lambda
if CLA._infnone(l_in) > CLA._infnone(l_out):
self.ls.append(l_in)
f.remove(i_in)
w[i_in] = bi_in # set value at the correct boundary
else:
self.ls.append(l_out)
f.append(i_out)
covarF, covarFB, meanF, wB = self._get_matrices(f)
covarF_inv = np.linalg.inv(covarF)
# 5) compute solution vector
wF, g = self._compute_w(covarF_inv, covarFB, meanF, wB)
for i in range(len(f)):
w[f[i]] = wF[i]
self.w.append(np.copy(w)) # store solution
self.g.append(g)
self.f.append(f[:])
if self.ls[-1] == 0:
break
# 6) Purge turning points
self._purge_num_err(10e-10)
self._purge_excess()
def max_sharpe(self):
"""
Maximise the Sharpe ratio.
:return: asset weights for the max-sharpe portfolio
:rtype: OrderedDict
"""
if not self.w:
self._solve()
# 1) Compute the local max SR portfolio between any two neighbor turning points
w_sr, sr = [], []
for i in range(len(self.w) - 1):
w0 = np.copy(self.w[i])
w1 = np.copy(self.w[i + 1])
kargs = {"minimum": False, "args": (w0, w1)}
a, b = self._golden_section(self._eval_sr, 0, 1, **kargs)
w_sr.append(a * w0 + (1 - a) * w1)
sr.append(b)
self.weights = w_sr[sr.index(max(sr))].reshape((self.n_assets,))
return self._make_output_weights()
def min_volatility(self):
"""
Minimise volatility.
:return: asset weights for the volatility-minimising portfolio
:rtype: OrderedDict
"""
if not self.w:
self._solve()
var = []
for w in self.w:
a = np.dot(np.dot(w.T, self.cov_matrix), w)
var.append(a)
# return min(var)**.5, self.w[var.index(min(var))]
self.weights = self.w[var.index(min(var))].reshape((self.n_assets,))
return self._make_output_weights()
def efficient_frontier(self, points=100):
"""
Efficiently compute the entire efficient frontier
:param points: rough number of points to evaluate, defaults to 100
:type points: int, optional
:raises ValueError: if weights have not been computed
:return: return list, std list, weight list
:rtype: (float list, float list, np.ndarray list)
"""
if not self.w:
self._solve()
mu, sigma, weights = [], [], []
# remove the 1, to avoid duplications
a = np.linspace(0, 1, points // len(self.w))[:-1]
b = list(range(len(self.w) - 1))
for i in b:
w0, w1 = self.w[i], self.w[i + 1]
if i == b[-1]:
# include the 1 in the last iteration
a = np.linspace(0, 1, points // len(self.w))
for j in a:
w = w1 * j + (1 - j) * w0
weights.append(np.copy(w))
mu.append(np.dot(w.T, self.mean)[0, 0])
sigma.append(np.dot(np.dot(w.T, self.cov_matrix), w)[0, 0] ** 0.5)
self.frontier_values = (mu, sigma, weights)
return mu, sigma, weights
def set_weights(self, _):
# Overrides parent method since set_weights does nothing.
raise NotImplementedError("set_weights does nothing for CLA")
def portfolio_performance(self, verbose=False, risk_free_rate=0.02):
"""
After optimising, calculate (and optionally print) the performance of the optimal
portfolio. Currently calculates expected return, volatility, and the Sharpe ratio.
:param verbose: whether performance should be printed, defaults to False
:type verbose: bool, optional
:param risk_free_rate: risk-free rate of borrowing/lending, defaults to 0.02
:type risk_free_rate: float, optional
:raises ValueError: if weights have not been calculated yet
:return: expected return, volatility, Sharpe ratio.
:rtype: (float, float, float)
"""
return base_optimizer.portfolio_performance(
self.weights,
self.expected_returns,
self.cov_matrix,
verbose,
risk_free_rate,
)
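# Usage sketch (illustrative): running the critical line algorithm on
# hypothetical inputs.
#
#   mu = np.array([0.10, 0.08, 0.12])
#   S = np.diag([0.04, 0.02, 0.09])
#   cla = CLA(mu, S, weight_bounds=(0, 1))
#   w = cla.max_sharpe()
#   ret, sigma, sharpe = cla.portfolio_performance()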
"""
The ``discrete_allocation`` module contains the ``DiscreteAllocation`` class, which
offers multiple methods to generate a discrete portfolio allocation from continuous weights.
"""
import collections
import numpy as np
import pandas as pd
import cvxpy as cp
from . import exceptions
def get_latest_prices(prices):
"""
A helper tool which retrieves the most recent asset prices from a dataframe of
asset prices, required in order to generate a discrete allocation.
:param prices: historical asset prices
:type prices: pd.DataFrame
:raises TypeError: if prices are not in a dataframe
:return: the most recent price of each asset
:rtype: pd.Series
"""
if not isinstance(prices, pd.DataFrame):
raise TypeError("prices not in a dataframe")
return prices.ffill().iloc[-1]
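# Usage sketch (illustrative): the last row is forward-filled, so a trailing
# NaN falls back to the most recent observed price.
#
#   df = pd.DataFrame({"AAPL": [120.0, 121.5, np.nan], "XOM": [40.0, 41.0, 41.5]},
#                     index=pd.date_range("2020-01-01", periods=3))
#   get_latest_prices(df)   # -> AAPL 121.5, XOM 41.5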
class DiscreteAllocation:
"""
Generate a discrete portfolio allocation from continuous weights
Instance variables:
- Inputs:
- ``weights`` - dict
- ``latest_prices`` - pd.Series or dict
- ``total_portfolio_value`` - int/float
- ``short_ratio``- float
- Output: ``allocation`` - dict
Public methods:
- ``greedy_portfolio()`` - uses a greedy algorithm
- ``lp_portfolio()`` - uses linear programming
"""
def __init__(
self, weights, latest_prices, total_portfolio_value=10000, short_ratio=0.30
):
"""
:param weights: continuous weights generated from the ``efficient_frontier`` module
:type weights: dict
:param latest_prices: the most recent price for each asset
:type latest_prices: pd.Series
:param total_portfolio_value: the desired total value of the portfolio, defaults to 10000
:type total_portfolio_value: int/float, optional
:param short_ratio: the short ratio, e.g 0.3 corresponds to 130/30
:type short_ratio: float
:raises TypeError: if ``weights`` is not a dict
:raises TypeError: if ``latest_prices`` isn't a series
:raises ValueError: if ``short_ratio <= 0``
"""
if not isinstance(weights, dict):
raise TypeError("weights should be a dictionary of {ticker: weight}")
if not isinstance(latest_prices, pd.Series):
raise TypeError("latest_prices should be a pd.Series")
if total_portfolio_value <= 0:
raise ValueError("total_portfolio_value must be greater than zero")
if short_ratio <= 0:
raise ValueError("short_ratio must be positive")
# Store the weights as a list of (ticker, weight) tuples, because order matters
# for the greedy algorithm.
self.weights = list(weights.items())
self.latest_prices = latest_prices
self.total_portfolio_value = total_portfolio_value
self.short_ratio = short_ratio
@staticmethod
def _remove_zero_positions(allocation):
"""
Utility function to remove zero positions (i.e with no shares being bought)
:type allocation: dict
"""
return {k: v for k, v in allocation.items() if v != 0}
def _allocation_rmse_error(self, verbose=True):
"""
Utility function to calculate and print RMSE error between discretised
weights and continuous weights. RMSE was used instead of MAE because we
want to penalise large variations.
:param verbose: print weight discrepancies?
:type verbose: bool
:return: rmse error
:rtype: float
"""
portfolio_val = 0
for ticker, num in self.allocation.items():
portfolio_val += num * self.latest_prices[ticker]
sse = 0 # sum of square errors
for ticker, weight in self.weights:
if ticker in self.allocation:
allocation_weight = (
self.allocation[ticker] * self.latest_prices[ticker] / portfolio_val
)
else:
allocation_weight = 0
sse += (weight - allocation_weight) ** 2
if verbose:
print(
"{}: allocated {:.3f}, desired {:.3f}".format(
ticker, allocation_weight, weight
)
)
rmse = np.sqrt(sse / len(self.weights))
print("Allocation has RMSE: {:.3f}".format(rmse))
return rmse
def greedy_portfolio(self, verbose=False):
"""
Convert continuous weights into a discrete portfolio allocation
using a greedy iterative approach.
:param verbose: print error analysis?
:type verbose: bool
:return: the number of shares of each ticker that should be purchased,
along with the amount of funds leftover.
:rtype: (dict, float)
"""
# Sort in descending order of weight
self.weights.sort(key=lambda x: x[1], reverse=True)
# If portfolio contains shorts
if self.weights[-1][1] < 0:
longs = {t: w for t, w in self.weights if w >= 0}
shorts = {t: -w for t, w in self.weights if w < 0}
# Make them sum to one
long_total_weight = sum(longs.values())
short_total_weight = sum(shorts.values())
longs = {t: w / long_total_weight for t, w in longs.items()}
shorts = {t: w / short_total_weight for t, w in shorts.items()}
# Construct long-only discrete allocations for each
short_val = self.total_portfolio_value * self.short_ratio
if verbose:
print("\nAllocating long sub-portfolio...")
da1 = DiscreteAllocation(
longs,
self.latest_prices[longs.keys()],
total_portfolio_value=self.total_portfolio_value,
)
long_alloc, long_leftover = da1.greedy_portfolio()
if verbose:
print("\nAllocating short sub-portfolio...")
da2 = DiscreteAllocation(
shorts,
self.latest_prices[shorts.keys()],
total_portfolio_value=short_val,
)
short_alloc, short_leftover = da2.greedy_portfolio()
short_alloc = {t: -w for t, w in short_alloc.items()}
# Combine and return
self.allocation = long_alloc.copy()
self.allocation.update(short_alloc)
self.allocation = self._remove_zero_positions(self.allocation)
return self.allocation, long_leftover + short_leftover
# Otherwise, portfolio is long only and we proceed with greedy algo
available_funds = self.total_portfolio_value
shares_bought = []
buy_prices = []
# First round
for ticker, weight in self.weights:
price = self.latest_prices[ticker]
# Attempt to buy the lower integer number of shares
n_shares = int(weight * self.total_portfolio_value / price)
cost = n_shares * price
if cost > available_funds:
# Buy as many as possible
n_shares = available_funds // price
cost = n_shares * price  # recompute the cost with the reduced share count
if n_shares == 0:
print("Insufficient funds")
available_funds -= cost
shares_bought.append(n_shares)
buy_prices.append(price)
# Second round
while available_funds > 0:
# Calculate the equivalent continuous weights of the shares that
# have already been bought
current_weights = np.array(buy_prices) * np.array(shares_bought)
current_weights /= current_weights.sum()
ideal_weights = np.array([i[1] for i in self.weights])
deficit = ideal_weights - current_weights
# Attempt to buy the asset whose current weights deviate the most
idx = np.argmax(deficit)
ticker, weight = self.weights[idx]
price = self.latest_prices[ticker]
# If we can't afford this asset, search for the next highest deficit that we
# can purchase.
counter = 0
while price > available_funds:
deficit[idx] = 0 # we can no longer purchase the asset at idx
idx = np.argmax(deficit) # find the next most deviant asset
# If either of these conditions is met, we break out of both while loops
# hence the repeated statement below
if deficit[idx] < 0 or counter == 10:
break
ticker, weight = self.weights[idx]
price = self.latest_prices[ticker]
counter += 1
if deficit[idx] <= 0 or counter == 10:
# Dirty solution to break out of both loops
break
# Buy one share at a time
shares_bought[idx] += 1
available_funds -= price
self.allocation = self._remove_zero_positions(
collections.OrderedDict(zip([i[0] for i in self.weights], shares_bought))
)
if verbose:
print("Funds remaining: {:.2f}".format(available_funds))
self._allocation_rmse_error(verbose)
return self.allocation, available_funds
def lp_portfolio(self, verbose=False):
"""
Convert continuous weights into a discrete portfolio allocation
using integer programming.
:param verbose: print error analysis?
:type verbose: bool
:return: the number of shares of each ticker that should be purchased, along with the amount
of funds leftover.
:rtype: (dict, float)
"""
if any([w < 0 for _, w in self.weights]):
longs = {t: w for t, w in self.weights if w >= 0}
shorts = {t: -w for t, w in self.weights if w < 0}
# Make them sum to one
long_total_weight = sum(longs.values())
short_total_weight = sum(shorts.values())
longs = {t: w / long_total_weight for t, w in longs.items()}
shorts = {t: w / short_total_weight for t, w in shorts.items()}
# Construct long-only discrete allocations for each
short_val = self.total_portfolio_value * self.short_ratio
if verbose:
print("\nAllocating long sub-portfolio:")
da1 = DiscreteAllocation(
longs,
self.latest_prices[longs.keys()],
total_portfolio_value=self.total_portfolio_value,
)
long_alloc, long_leftover = da1.lp_portfolio()
if verbose:
print("\nAllocating short sub-portfolio:")
da2 = DiscreteAllocation(
shorts,
self.latest_prices[shorts.keys()],
total_portfolio_value=short_val,
)
short_alloc, short_leftover = da2.lp_portfolio()
short_alloc = {t: -w for t, w in short_alloc.items()}
# Combine and return
self.allocation = long_alloc.copy()
self.allocation.update(short_alloc)
self.allocation = self._remove_zero_positions(self.allocation)
return self.allocation, long_leftover + short_leftover
p = self.latest_prices.values
n = len(p)
w = np.fromiter([i[1] for i in self.weights], dtype=float)
# Integer allocation
x = cp.Variable(n, integer=True)
# Remaining dollars
r = self.total_portfolio_value - p.T @ x
# Set up linear program
eta = w * self.total_portfolio_value - cp.multiply(x, p)
u = cp.Variable(n)
constraints = [eta <= u, eta >= -u, x >= 0, r >= 0]
objective = cp.sum(u) + r
opt = cp.Problem(cp.Minimize(objective), constraints)
opt.solve(solver="GLPK_MI")
if opt.status not in {"optimal", "optimal_inaccurate"}:
raise exceptions.OptimizationError("Please try greedy_portfolio")
vals = np.rint(x.value).astype(int)
self.allocation = self._remove_zero_positions(
collections.OrderedDict(zip([i[0] for i in self.weights], vals))
)
if verbose:
print("Funds remaining: {:.2f}".format(r.value))
self._allocation_rmse_error(verbose)
return self.allocation, r.value
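# Usage sketch (illustrative): discretising hypothetical continuous weights
# into whole-share purchases; with these round numbers the allocation is exact.
#
#   weights = {"AAPL": 0.6, "XOM": 0.4}
#   prices = pd.Series({"AAPL": 120.0, "XOM": 40.0})
#   da = DiscreteAllocation(weights, prices, total_portfolio_value=10000)
#   alloc, leftover = da.greedy_portfolio()
#   # -> ({'AAPL': 50, 'XOM': 100}, 0.0)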
"""
The ``efficient_frontier`` module houses the EfficientFrontier class, which
generates optimal portfolios for various possible objective functions and parameters.
"""
import warnings
import numpy as np
import pandas as pd
import cvxpy as cp
from . import objective_functions, base_optimizer
class EfficientFrontier(base_optimizer.BaseConvexOptimizer):
"""
An EfficientFrontier object (inheriting from BaseConvexOptimizer) contains multiple
optimisation methods that can be called (corresponding to different objective
functions) with various parameters. Note: a new EfficientFrontier object should
be instantiated if you want to make any change to objectives/constraints/bounds/parameters.
Instance variables:
- Inputs:
- ``n_assets`` - int
- ``tickers`` - str list
- ``bounds`` - float tuple OR (float tuple) list
- ``cov_matrix`` - np.ndarray
- ``expected_returns`` - np.ndarray
- ``solver`` - str
- Output: ``weights`` - np.ndarray
Public methods:
- ``max_sharpe()`` optimises for maximal Sharpe ratio (a.k.a the tangency portfolio)
- ``min_volatility()`` optimises for minimum volatility
- ``max_quadratic_utility()`` maximises the quadratic utility, given some risk aversion.
- ``efficient_risk()`` maximises Sharpe for a given target risk
- ``efficient_return()`` minimises risk for a given target return
- ``add_objective()`` adds a (convex) objective to the optimisation problem
- ``add_constraint()`` adds a (linear) constraint to the optimisation problem
- ``convex_objective()`` solves for a generic convex objective with linear constraints
- ``nonconvex_objective()`` solves for a generic nonconvex objective using the scipy backend.
This is prone to getting stuck in local minima and is generally *not* recommended.
- ``portfolio_performance()`` calculates the expected return, volatility and Sharpe ratio for
the optimised portfolio.
- ``set_weights()`` creates self.weights (np.ndarray) from a weights dict
- ``clean_weights()`` rounds the weights and clips near-zeros.
- ``save_weights_to_file()`` saves the weights to csv, json, or txt.
"""
def __init__(
self,
expected_returns,
cov_matrix,
weight_bounds=(0, 1),
gamma=0,
expected_drawdown=None,
solver=None,
verbose=False,
):
"""
:param expected_returns: expected returns for each asset. Can be None if
optimising for volatility only (but not recommended).
:type expected_returns: pd.Series, list, np.ndarray
:param cov_matrix: covariance of returns for each asset. This **must** be
positive semidefinite, otherwise optimisation will fail.
:type cov_matrix: pd.DataFrame or np.array
:param weight_bounds: minimum and maximum weight of each asset OR single min/max pair
if all identical, defaults to (0, 1). Must be changed to (-1, 1)
for portfolios with shorting.
:type weight_bounds: tuple OR tuple list, optional
:param gamma: L2 regularisation parameter, defaults to 0. Increase if you want more
non-negligible weights
        :type gamma: float, optional
        :param expected_drawdown: expected maximum drawdown of each asset, required
                                  for ``efficient_drawdown()``, defaults to None
        :type expected_drawdown: pd.Series, list, np.ndarray, optional
:param solver: name of solver. list available solvers with: `cvxpy.installed_solvers()`
:type solver: str
:param verbose: whether performance and debugging info should be printed, defaults to False
:type verbose: bool, optional
:raises TypeError: if ``expected_returns`` is not a series, list or array
:raises TypeError: if ``cov_matrix`` is not a dataframe or array
"""
# Inputs
self.cov_matrix = EfficientFrontier._validate_cov_matrix(cov_matrix)
self.expected_returns = EfficientFrontier._validate_expected_returns(
expected_returns)
        if expected_drawdown is not None:
            self.expected_drawdown = EfficientFrontier._validate_expected_drawdown(
                expected_drawdown
            )
        else:
            self.expected_drawdown = None
# Labels
if isinstance(expected_returns, pd.Series):
tickers = list(expected_returns.index)
elif isinstance(cov_matrix, pd.DataFrame):
tickers = list(cov_matrix.columns)
else: # use integer labels
tickers = list(range(len(expected_returns)))
if expected_returns is not None:
if cov_matrix.shape != (len(expected_returns), len(expected_returns)):
raise ValueError("Covariance matrix does not match expected returns")
super().__init__(
len(tickers), tickers, weight_bounds, solver=solver, verbose=verbose
)
@staticmethod
def _validate_expected_returns(expected_returns):
if expected_returns is None:
warnings.warn(
"No expected returns provided. You may only use ef.min_volatility()"
)
return None
elif isinstance(expected_returns, pd.Series):
return expected_returns.values
elif isinstance(expected_returns, list):
return np.array(expected_returns)
elif isinstance(expected_returns, np.ndarray):
return expected_returns.ravel()
else:
raise TypeError("expected_returns is not a series, list or array")
@staticmethod
def _validate_expected_drawdown(expected_drawdown):
if expected_drawdown is None:
warnings.warn(
"No expected drawdown provided. You may only use ef.min_volatility()"
)
return None
elif isinstance(expected_drawdown, pd.Series):
return expected_drawdown.values
elif isinstance(expected_drawdown, list):
return np.array(expected_drawdown)
elif isinstance(expected_drawdown, np.ndarray):
return expected_drawdown.ravel()
else:
raise TypeError("expected_returns is not a series, list or array")
@staticmethod
def _validate_cov_matrix(cov_matrix):
if cov_matrix is None:
raise ValueError("cov_matrix must be provided")
elif isinstance(cov_matrix, pd.DataFrame):
return cov_matrix.values
elif isinstance(cov_matrix, np.ndarray):
return cov_matrix
else:
raise TypeError("cov_matrix is not a series, list or array")
def _market_neutral_bounds_check(self):
"""
Helper method to make sure bounds are suitable for a market neutral
optimisation.
"""
portfolio_possible = np.any(self._lower_bounds < 0)
if not portfolio_possible:
warnings.warn(
"Market neutrality requires shorting - bounds have been amended",
RuntimeWarning,
)
self._map_bounds_to_constraints((-1, 1))
# Delete original constraints
del self._constraints[0]
del self._constraints[0]
def min_volatility(self):
"""
Minimise volatility.
:return: asset weights for the volatility-minimising portfolio
:rtype: OrderedDict
"""
self._objective = objective_functions.portfolio_variance(
self._w, self.cov_matrix
)
for obj in self._additional_objectives:
self._objective += obj
self._constraints.append(cp.sum(self._w) == 1)
return self._solve_cvxpy_opt_problem()
def max_sharpe(self, risk_free_rate=0.02):
"""
Maximise the Sharpe Ratio. The result is also referred to as the tangency portfolio,
as it is the portfolio for which the capital market line is tangent to the efficient frontier.
This is a convex optimisation problem after making a certain variable substitution. See
`Cornuejols and Tutuncu (2006) <http://web.math.ku.dk/~rolf/CT_FinOpt.pdf>`_ for more.
:param risk_free_rate: risk-free rate of borrowing/lending, defaults to 0.02.
The period of the risk-free rate should correspond to the
frequency of expected returns.
:type risk_free_rate: float, optional
:raises ValueError: if ``risk_free_rate`` is non-numeric
:return: asset weights for the Sharpe-maximising portfolio
:rtype: OrderedDict
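        A minimal usage sketch (``mu`` and ``S`` are assumed to be precomputed
        expected returns and a covariance matrix)::
            ef = EfficientFrontier(mu, S)
            weights = ef.max_sharpe(risk_free_rate=0.02)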
"""
if not isinstance(risk_free_rate, (int, float)):
raise ValueError("risk_free_rate should be numeric")
# max_sharpe requires us to make a variable transformation.
# Here we treat w as the transformed variable.
self._objective = cp.quad_form(self._w, self.cov_matrix)
k = cp.Variable()
# Note: objectives are not scaled by k. Hence there are subtle differences
# between how these objectives work for max_sharpe vs min_volatility
if len(self._additional_objectives) > 0:
warnings.warn(
"max_sharpe transforms the optimisation problem so additional objectives may not work as expected."
)
for obj in self._additional_objectives:
self._objective += obj
new_constraints = []
# Must rebuild the constraints
for constr in self._constraints:
if isinstance(constr, cp.constraints.nonpos.Inequality):
# Either the first or second item is the expression
if isinstance(
constr.args[0], cp.expressions.constants.constant.Constant
):
new_constraints.append(constr.args[1] >= constr.args[0] * k)
else:
new_constraints.append(constr.args[0] <= constr.args[1] * k)
elif isinstance(constr, cp.constraints.zero.Equality):
new_constraints.append(constr.args[0] == constr.args[1] * k)
else:
raise TypeError(
"Please check that your constraints are in a suitable format"
)
# Transformed max_sharpe convex problem:
self._constraints = [
(self.expected_returns - risk_free_rate).T @ self._w == 1,
cp.sum(self._w) == k,
k >= 0,
] + new_constraints
self._solve_cvxpy_opt_problem()
# Inverse-transform
self.weights = (self._w.value / k.value).round(16) + 0.0
return self._make_output_weights()
def max_quadratic_utility(self, risk_aversion=1, market_neutral=False):
r"""
Maximise the given quadratic utility, i.e:
.. math::
\max_w w^T \mu - \frac \delta 2 w^T \Sigma w
:param risk_aversion: risk aversion parameter (must be greater than 0),
defaults to 1
:type risk_aversion: positive float
:param market_neutral: whether the portfolio should be market neutral (weights sum to zero),
defaults to False. Requires negative lower weight bound.
        :type market_neutral: bool, optional
:return: asset weights for the maximum-utility portfolio
:rtype: OrderedDict
"""
if risk_aversion <= 0:
raise ValueError("risk aversion coefficient must be greater than zero")
self._objective = objective_functions.quadratic_utility(
self._w, self.expected_returns, self.cov_matrix, risk_aversion=risk_aversion
)
for obj in self._additional_objectives:
self._objective += obj
if market_neutral:
self._market_neutral_bounds_check()
self._constraints.append(cp.sum(self._w) == 0)
else:
self._constraints.append(cp.sum(self._w) == 1)
return self._solve_cvxpy_opt_problem()
def efficient_risk(self, target_volatility, market_neutral=False):
"""
Maximise return for a target risk. The resulting portfolio will have a volatility
less than the target (but not guaranteed to be equal).
:param target_volatility: the desired maximum volatility of the resulting portfolio.
:type target_volatility: float
:param market_neutral: whether the portfolio should be market neutral (weights sum to zero),
defaults to False. Requires negative lower weight bound.
        :type market_neutral: bool, optional
        :raises ValueError: if ``target_volatility`` is not a positive float
        :raises ValueError: if no portfolio can be found with volatility equal to ``target_volatility``
:return: asset weights for the efficient risk portfolio
:rtype: OrderedDict
"""
if not isinstance(target_volatility, (float, int)) or target_volatility < 0:
raise ValueError("target_volatility should be a positive float")
global_min_volatility = np.sqrt(1 / np.sum(np.linalg.inv(self.cov_matrix)))
if target_volatility < global_min_volatility:
raise ValueError(
"The minimum volatility is {:.3f}. Please use a higher target_volatility".format(
global_min_volatility
)
)
self._objective = objective_functions.portfolio_return(
self._w, self.expected_returns
)
variance = objective_functions.portfolio_variance(self._w, self.cov_matrix)
for obj in self._additional_objectives:
self._objective += obj
self._constraints.append(variance <= target_volatility ** 2)
# The equality constraint is either "weights sum to 1" (default), or
# "weights sum to 0" (market neutral).
if market_neutral:
self._market_neutral_bounds_check()
self._constraints.append(cp.sum(self._w) == 0)
else:
self._constraints.append(cp.sum(self._w) == 1)
return self._solve_cvxpy_opt_problem()
def efficient_drawdown(self, drawdown_limit, market_neutral=False):
"""
Maximise return for a target risk. The resulting portfolio will have a volatility
less than the target (but not guaranteed to be equal).
:param target_volatility: the desired maximum volatility of the resulting portfolio.
:type target_volatility: float
:param market_neutral: whether the portfolio should be market neutral (weights sum to zero),
defaults to False. Requires negative lower weight bound.
:param market_neutral: bool, optional
:raises ValueError: if ``target_volatility`` is not a positive float
:raises ValueError: if no portfolio can be found with volatility equal to ``target_volatility``
:raises ValueError: if ``risk_free_rate`` is non-numeric
:return: asset weights for the efficient risk portfolio
:rtype: OrderedDict
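        A minimal sketch, assuming ``mu``, ``S`` and ``dd`` are precomputed expected
        returns, a covariance matrix, and per-asset expected maximum drawdowns::
            ef = EfficientFrontier(mu, S, expected_drawdown=dd)
            weights = ef.efficient_drawdown(drawdown_limit=0.15)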
"""
        if not isinstance(drawdown_limit, (float, int)) or drawdown_limit < 0:
            raise ValueError("drawdown_limit should be a positive float")
global_min_drawdown = self.expected_drawdown.min()
if drawdown_limit < global_min_drawdown:
raise ValueError(
"The minimum drawdown is {:.3f}. Please use a higher drawdown".format(
global_min_drawdown
)
)
self._objective = objective_functions.portfolio_return(
self._w, self.expected_returns)
# self._objective = objective_functions.portfolio_variance(
# self._w, self.cov_matrix)
drawdown = self.expected_drawdown.T @ self._w
for obj in self._additional_objectives:
self._objective += obj
self._constraints.append(drawdown <= drawdown_limit)
# The equality constraint is either "weights sum to 1" (default), or
# "weights sum to 0" (market neutral).
if market_neutral:
self._market_neutral_bounds_check()
self._constraints.append(cp.sum(self._w) == 0)
else:
self._constraints.append(cp.sum(self._w) == 1)
return self._solve_cvxpy_opt_problem()
def efficient_return(self, target_return, market_neutral=False):
"""
Calculate the 'Markowitz portfolio', minimising volatility for a given target return.
:param target_return: the desired return of the resulting portfolio.
:type target_return: float
:param market_neutral: whether the portfolio should be market neutral (weights sum to zero),
defaults to False. Requires negative lower weight bound.
:type market_neutral: bool, optional
:raises ValueError: if ``target_return`` is not a positive float
:raises ValueError: if no portfolio can be found with return equal to ``target_return``
:return: asset weights for the Markowitz portfolio
:rtype: OrderedDict
"""
if not isinstance(target_return, float) or target_return < 0:
raise ValueError("target_return should be a positive float")
if target_return > self.expected_returns.max():
raise ValueError(
"target_return must be lower than the largest expected return"
)
self._objective = objective_functions.portfolio_variance(
self._w, self.cov_matrix
)
ret = objective_functions.portfolio_return(
self._w, self.expected_returns, negative=False
)
for obj in self._additional_objectives:
self._objective += obj
self._constraints.append(ret >= target_return)
# The equality constraint is either "weights sum to 1" (default), or
# "weights sum to 0" (market neutral).
if market_neutral:
self._market_neutral_bounds_check()
self._constraints.append(cp.sum(self._w) == 0)
else:
self._constraints.append(cp.sum(self._w) == 1)
return self._solve_cvxpy_opt_problem()
def portfolio_performance(self, verbose=False, risk_free_rate=0.02):
"""
After optimising, calculate (and optionally print) the performance of the optimal
portfolio. Currently calculates expected return, volatility, and the Sharpe ratio.
:param verbose: whether performance should be printed, defaults to False
:type verbose: bool, optional
:param risk_free_rate: risk-free rate of borrowing/lending, defaults to 0.02.
The period of the risk-free rate should correspond to the
frequency of expected returns.
:type risk_free_rate: float, optional
        :raises ValueError: if weights have not been calculated yet
:return: expected return, volatility, Sharpe ratio.
:rtype: (float, float, float)
"""
return base_optimizer.portfolio_performance(
self.weights,
self.expected_returns,
self.cov_matrix,
verbose,
risk_free_rate,
)
"""
The ``exceptions`` module houses custom exceptions. Currently implemented:
- OptimizationError
"""
class OptimizationError(Exception):
"""
When an optimization routine fails – usually, this means
that cvxpy has not returned the "optimal" flag.
"""
def __init__(self, *args, **kwargs):
default_message = (
"Please check your objectives/constraints or use a different solver."
)
super().__init__(default_message, *args, **kwargs)
"""
The ``expected_returns`` module provides functions for estimating the expected returns of
the assets, which is a required input in mean-variance optimisation.
By convention, the output of these methods is expected *annual* returns. It is assumed that
*daily* prices are provided, though in reality the functions are agnostic
to the time period (just change the ``frequency`` parameter). Asset prices must be given as
a pandas dataframe, as per the format described in the :ref:`user-guide`.
All of the functions process the price data into percentage returns data, before
calculating their respective estimates of expected returns.
Currently implemented:
- general return model function, allowing you to run any return model from one function.
- mean historical return
- exponentially weighted mean historical return
- CAPM estimate of returns
Additionally, we provide utility functions to convert from returns to prices and vice-versa.
"""
import warnings
import pandas as pd
import numpy as np
def returns_from_prices(prices, log_returns=False):
"""
Calculate the returns given prices.
:param prices: adjusted (daily) closing prices of the asset, each row is a
date and each column is a ticker/id.
:type prices: pd.DataFrame
:param log_returns: whether to compute using log returns
:type log_returns: bool, defaults to False
:return: (daily) returns
:rtype: pd.DataFrame
"""
if log_returns:
return np.log(1 + prices.pct_change()).dropna(how="all")
else:
return prices.pct_change().dropna(how="all")
def drawdown_from_prices(prices):
    """
    Calculate the maximum historical drawdown of each asset, i.e the largest
    peak-to-trough decline as a fraction of the running peak price.
    :param prices: adjusted (daily) closing prices, one column per ticker/id.
    :type prices: pd.DataFrame
    :return: maximum drawdown for each asset
    :rtype: pd.Series
    """
    drawdown = pd.Series(dtype=float)
    for column in prices.columns:
        prices_series = prices[column].dropna(how="all")
        running_max = np.maximum.accumulate(prices_series)
        max_drawdown = ((running_max - prices_series) / running_max).max()
        drawdown[column] = max_drawdown
    return drawdown
def log_returns_from_prices(prices):
"""
Calculate the log returns given prices.
:param prices: adjusted (daily) closing prices of the asset, each row is a
date and each column is a ticker/id.
:type prices: pd.DataFrame
:return: (daily) returns
:rtype: pd.DataFrame
"""
warnings.warn(
"log_returns_from_prices is deprecated. Please use returns_from_prices(prices, log_returns=True)"
)
return np.log(1 + prices.pct_change()).dropna(how="all")
def prices_from_returns(returns, log_returns=False):
"""
Calculate the pseudo-prices given returns. These are not true prices because
the initial prices are all set to 1, but it behaves as intended when passed
to any PyPortfolioOpt method.
:param returns: (daily) percentage returns of the assets
:type returns: pd.DataFrame
:param log_returns: whether to compute using log returns
:type log_returns: bool, defaults to False
:return: (daily) pseudo-prices.
:rtype: pd.DataFrame
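    A round-trip sketch (``prices`` is an assumed dataframe of adjusted closes)::
        rets = returns_from_prices(prices)
        pseudo_prices = prices_from_returns(rets)  # first row is all 1s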
"""
if log_returns:
returns = np.exp(returns)
ret = 1 + returns
ret.iloc[0] = 1 # set first day pseudo-price
return ret.cumprod()
def return_model(prices, method="mean_historical_return", **kwargs):
"""
Compute an estimate of future returns, using the return model specified in ``method``.
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param method: the return model to use. Should be one of:
- ``mean_historical_return``
- ``ema_historical_return``
- ``capm_return``
:type method: str, optional
:raises NotImplementedError: if the supplied method is not recognised
    :return: annualised return estimate
    :rtype: pd.Series
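    A dispatch sketch (``prices`` is an assumed dataframe of adjusted closes)::
        mu = return_model(prices, method="capm_return", risk_free_rate=0.02)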
"""
if method == "mean_historical_return":
return mean_historical_return(prices, **kwargs)
elif method == "ema_historical_return":
return ema_historical_return(prices, **kwargs)
elif method == "capm_return":
return capm_return(prices, **kwargs)
else:
raise NotImplementedError("Return model {} not implemented".format(method))
def mean_historical_return(prices, returns_data=False, compounding=True, frequency=252):
"""
Calculate annualised mean (daily) historical return from input (daily) asset prices.
    Use ``compounding`` to toggle between the default geometric mean (CAGR)
    and the arithmetic mean of (daily) returns.
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param compounding: whether to properly compound the returns, optional.
:type compounding: bool, defaults to True
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year)
:type frequency: int, optional
:return: annualised mean (daily) return for each asset
:rtype: pd.Series
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("prices are not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
if returns_data:
returns = prices
else:
returns = returns_from_prices(prices)
if compounding:
return (1 + returns).prod() ** (frequency / returns.count()) - 1
else:
return returns.mean() * frequency
def cal_max_drawdown(prices, returns_data=False):
"""
    Calculate the maximum historical drawdown of each asset from input (daily) asset prices.
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:return: max drawdown for each asset
:rtype: pd.Series
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("prices are not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
    if returns_data:
        # Returns cannot be fed to drawdown_from_prices directly;
        # rebuild pseudo-prices first.
        return drawdown_from_prices(prices_from_returns(prices))
    else:
        return drawdown_from_prices(prices)
def ema_historical_return(
prices, returns_data=False, compounding=True, span=500, frequency=252
):
"""
Calculate the exponentially-weighted mean of (daily) historical returns, giving
higher weight to more recent data.
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param compounding: whether to properly compound the returns, optional.
:type compounding: bool, defaults to True
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year)
:type frequency: int, optional
:param span: the time-span for the EMA, defaults to 500-day EMA.
:type span: int, optional
:return: annualised exponentially-weighted mean (daily) return of each asset
:rtype: pd.Series
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("prices are not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
if returns_data:
returns = prices
else:
returns = returns_from_prices(prices)
if compounding:
return (1 + returns.ewm(span=span).mean().iloc[-1]) ** frequency - 1
else:
return returns.ewm(span=span).mean().iloc[-1] * frequency
def james_stein_shrinkage(prices, returns_data=False, compounding=True, frequency=252):
raise NotImplementedError(
"Deprecated because its implementation here was misguided."
)
def capm_return(
prices,
market_prices=None,
returns_data=False,
risk_free_rate=0.02,
compounding=True,
frequency=252,
):
"""
Compute a return estimate using the Capital Asset Pricing Model. Under the CAPM,
    asset returns are equal to market returns plus a :math:`\\beta` term encoding
the relative risk of the asset.
.. math::
R_i = R_f + \\beta_i (E(R_m) - R_f)
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param market_prices: adjusted closing prices of the benchmark, defaults to None
:type market_prices: pd.DataFrame, optional
:param returns_data: if true, the first arguments are returns instead of prices.
:type returns_data: bool, defaults to False.
:param risk_free_rate: risk-free rate of borrowing/lending, defaults to 0.02.
You should use the appropriate time period, corresponding
to the frequency parameter.
:type risk_free_rate: float, optional
:param compounding: whether to properly compound the returns, optional.
:type compounding: bool, defaults to True
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year)
:type frequency: int, optional
:return: annualised return estimate
:rtype: pd.Series
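    A usage sketch (``prices`` and ``benchmark_prices`` are assumed dataframes of
    adjusted closes for the assets and a market index)::
        mu = capm_return(prices, market_prices=benchmark_prices, risk_free_rate=0.02)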
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("prices are not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
if returns_data:
returns = prices
market_returns = market_prices
else:
returns = returns_from_prices(prices)
if market_prices is not None:
market_returns = returns_from_prices(market_prices)
else:
market_returns = None
# Use the equally-weighted dataset as a proxy for the market
if market_returns is None:
# Append market return to right and compute sample covariance matrix
returns["mkt"] = returns.mean(axis=1)
else:
market_returns.columns = ["mkt"]
returns = returns.join(market_returns, how="left")
# Compute covariance matrix for the new dataframe (including markets)
cov = returns.cov()
# The far-right column of the cov matrix is covariances to market
betas = cov["mkt"] / cov.loc["mkt", "mkt"]
betas = betas.drop("mkt")
# Find mean market return on a given time period
if compounding:
mkt_mean_ret = (1 + returns["mkt"]).prod() ** (
frequency / returns["mkt"].count()
) - 1
else:
mkt_mean_ret = returns["mkt"].mean() * frequency
# CAPM formula
return risk_free_rate + betas * (mkt_mean_ret - risk_free_rate)
"""
The ``hierarchical_portfolio`` module seeks to implement one of the recent advances in
portfolio optimisation – the application of hierarchical clustering models in allocation.
All of the hierarchical classes have a similar API to ``EfficientFrontier``, though since
many hierarchical models currently don't support different objectives, the actual allocation
happens with a call to `optimize()`.
Currently implemented:
- ``HRPOpt`` implements the Hierarchical Risk Parity (HRP) portfolio. Code reproduced with
permission from Marcos Lopez de Prado (2016).
"""
import collections
import numpy as np
import pandas as pd
import scipy.cluster.hierarchy as sch
import scipy.spatial.distance as ssd
from . import base_optimizer, risk_models
class HRPOpt(base_optimizer.BaseOptimizer):
"""
A HRPOpt object (inheriting from BaseOptimizer) constructs a hierarchical
risk parity portfolio.
Instance variables:
- Inputs
- ``n_assets`` - int
- ``tickers`` - str list
- ``returns`` - pd.Series
- Output:
- ``weights`` - np.ndarray
- ``clusters`` - linkage matrix corresponding to clustered assets.
Public methods:
- ``optimize()`` calculates weights using HRP
- ``portfolio_performance()`` calculates the expected return, volatility and Sharpe ratio for
the optimised portfolio.
- ``set_weights()`` creates self.weights (np.ndarray) from a weights dict
- ``clean_weights()`` rounds the weights and clips near-zeros.
- ``save_weights_to_file()`` saves the weights to csv, json, or txt.
"""
def __init__(self, returns=None, cov_matrix=None):
"""
:param returns: asset historical returns
:type returns: pd.DataFrame
:param cov_matrix: covariance of asset returns
:type cov_matrix: pd.DataFrame.
:raises TypeError: if ``returns`` is not a dataframe
"""
if returns is None and cov_matrix is None:
raise ValueError("Either returns or cov_matrix must be provided")
if returns is not None and not isinstance(returns, pd.DataFrame):
raise TypeError("returns are not a dataframe")
self.returns = returns
self.cov_matrix = cov_matrix
self.clusters = None
if returns is None:
tickers = list(cov_matrix.columns)
else:
tickers = list(returns.columns)
super().__init__(len(tickers), tickers)
@staticmethod
def _get_cluster_var(cov, cluster_items):
"""
Compute the variance per cluster
:param cov: covariance matrix
:type cov: np.ndarray
:param cluster_items: tickers in the cluster
:type cluster_items: list
:return: the variance per cluster
:rtype: float
"""
# Compute variance per cluster
cov_slice = cov.loc[cluster_items, cluster_items]
weights = 1 / np.diag(cov_slice) # Inverse variance weights
weights /= weights.sum()
return np.linalg.multi_dot((weights, cov_slice, weights))
@staticmethod
def _get_quasi_diag(link):
"""
Sort clustered items by distance
:param link: linkage matrix after clustering
:type link: np.ndarray
:return: sorted list of indices
:rtype: list
"""
return sch.to_tree(link, rd=False).pre_order()
@staticmethod
def _raw_hrp_allocation(cov, ordered_tickers):
"""
Given the clusters, compute the portfolio that minimises risk by
recursively traversing the hierarchical tree from the top.
:param cov: covariance matrix
:type cov: np.ndarray
:param ordered_tickers: list of tickers ordered by distance
:type ordered_tickers: str list
:return: raw portfolio weights
:rtype: pd.Series
"""
w = pd.Series(1, index=ordered_tickers)
cluster_items = [ordered_tickers] # initialize all items in one cluster
while len(cluster_items) > 0:
cluster_items = [
i[j:k]
for i in cluster_items
for j, k in ((0, len(i) // 2), (len(i) // 2, len(i)))
if len(i) > 1
] # bi-section
# For each pair, optimise locally.
for i in range(0, len(cluster_items), 2):
first_cluster = cluster_items[i]
second_cluster = cluster_items[i + 1]
# Form the inverse variance portfolio for this pair
first_variance = HRPOpt._get_cluster_var(cov, first_cluster)
second_variance = HRPOpt._get_cluster_var(cov, second_cluster)
alpha = 1 - first_variance / (first_variance + second_variance)
w[first_cluster] *= alpha # weight 1
w[second_cluster] *= 1 - alpha # weight 2
return w
def optimize(self, linkage_method="single"):
"""
Construct a hierarchical risk parity portfolio, using Scipy hierarchical clustering
(see `here <https://docs.scipy.org/doc/scipy/reference/generated/scipy.cluster.hierarchy.linkage.html>`_)
:param linkage_method: which scipy linkage method to use
:type linkage_method: str
:return: weights for the HRP portfolio
:rtype: OrderedDict
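        A usage sketch (``returns`` is an assumed dataframe of historical returns)::
            hrp = HRPOpt(returns)
            weights = hrp.optimize(linkage_method="single")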
"""
if linkage_method not in sch._LINKAGE_METHODS:
raise ValueError("linkage_method must be one recognised by scipy")
if self.returns is None:
cov = self.cov_matrix
corr = risk_models.cov_to_corr(self.cov_matrix).round(6)
else:
corr, cov = self.returns.corr(), self.returns.cov()
# Compute distance matrix, with ClusterWarning fix as
# per https://stackoverflow.com/questions/18952587/
# this can avoid some nasty floating point issues
matrix = np.sqrt(np.clip((1.0 - corr) / 2.0, a_min=0.0, a_max=1.0))
dist = ssd.squareform(matrix, checks=False)
self.clusters = sch.linkage(dist, linkage_method)
sort_ix = HRPOpt._get_quasi_diag(self.clusters)
ordered_tickers = corr.index[sort_ix].tolist()
hrp = HRPOpt._raw_hrp_allocation(cov, ordered_tickers)
weights = collections.OrderedDict(hrp.sort_index())
self.set_weights(weights)
return weights
def portfolio_performance(self, verbose=False, risk_free_rate=0.02, frequency=252):
"""
After optimising, calculate (and optionally print) the performance of the optimal
portfolio. Currently calculates expected return, volatility, and the Sharpe ratio
assuming returns are daily
:param verbose: whether performance should be printed, defaults to False
:type verbose: bool, optional
:param risk_free_rate: risk-free rate of borrowing/lending, defaults to 0.02.
The period of the risk-free rate should correspond to the
frequency of expected returns.
:type risk_free_rate: float, optional
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year)
:type frequency: int, optional
:raises ValueError: if weights have not been calculated yet
:return: expected return, volatility, Sharpe ratio.
:rtype: (float, float, float)
"""
if self.returns is None:
cov = self.cov_matrix
mu = None
else:
cov = self.returns.cov() * frequency
mu = self.returns.mean() * frequency
return base_optimizer.portfolio_performance(
self.weights, mu, cov, verbose, risk_free_rate
)
"""
The ``objective_functions`` module provides optimisation objectives, including the actual
objective functions called by the ``EfficientFrontier`` object's optimisation methods.
These methods are primarily designed for internal use during optimisation and each requires
a different signature (which is why they have not been factored into a class).
For obvious reasons, any objective function must accept ``weights``
as an argument, and must also have at least one of ``expected_returns`` or ``cov_matrix``.
The objective functions either compute the objective given a numpy array of weights, or they
return a cvxpy *expression* when weights are a ``cp.Variable``. In this way, the same objective
function can be used both internally for optimisation and externally for computing the objective
given weights. ``_objective_value()`` automatically chooses between the two behaviours.
``objective_functions`` defaults to objectives for minimisation. In the cases of objectives
that clearly should be maximised (e.g Sharpe Ratio, portfolio return), the objective function
actually returns the negative quantity, since minimising the negative is equivalent to maximising
the positive. This behaviour is controlled by the ``negative=True`` optional argument.
Currently implemented:
- Portfolio variance (i.e square of volatility)
- Portfolio return
- Sharpe ratio
- L2 regularisation (minimising this reduces the number of negligible weights)
- Quadratic utility
- Transaction cost model (a simple one)
"""
import numpy as np
import cvxpy as cp
def _objective_value(w, obj):
"""
Helper method to return either the value of the objective function
or the objective function as a cvxpy object depending on whether
w is a cvxpy variable or np array.
:param w: weights
:type w: np.ndarray OR cp.Variable
:param obj: objective function expression
:type obj: cp.Expression
:return: value of the objective function OR objective function expression
:rtype: float OR cp.Expression
"""
if isinstance(w, np.ndarray):
if np.isscalar(obj):
return obj
elif np.isscalar(obj.value):
return obj.value
else:
return obj.value.item()
else:
return obj
def portfolio_variance(w, cov_matrix):
"""
Calculate the total portfolio variance (i.e square volatility).
:param w: asset weights in the portfolio
:type w: np.ndarray OR cp.Variable
:param cov_matrix: covariance matrix
:type cov_matrix: np.ndarray
:return: value of the objective function OR objective function expression
:rtype: float OR cp.Expression
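    A sketch of the dual behaviour (``S`` is an assumed 2x2 covariance ndarray)::
        w_arr = np.array([0.5, 0.5])
        portfolio_variance(w_arr, S)  # float
        w_var = cp.Variable(2)
        portfolio_variance(w_var, S)  # cp.Expression, usable as an objective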
"""
variance = cp.quad_form(w, cov_matrix)
return _objective_value(w, variance)
def portfolio_return(w, expected_returns, negative=True):
"""
Calculate the (negative) mean return of a portfolio
:param w: asset weights in the portfolio
:type w: np.ndarray OR cp.Variable
:param expected_returns: expected return of each asset
:type expected_returns: np.ndarray
:param negative: whether quantity should be made negative (so we can minimise)
:type negative: boolean
:return: negative mean return
:rtype: float
"""
sign = -1 if negative else 1
mu = w @ expected_returns
return _objective_value(w, sign * mu)
def sharpe_ratio(w, expected_returns, cov_matrix, risk_free_rate=0.02, negative=True):
"""
Calculate the (negative) Sharpe ratio of a portfolio
:param w: asset weights in the portfolio
:type w: np.ndarray OR cp.Variable
:param expected_returns: expected return of each asset
:type expected_returns: np.ndarray
:param cov_matrix: covariance matrix
:type cov_matrix: np.ndarray
:param risk_free_rate: risk-free rate of borrowing/lending, defaults to 0.02.
The period of the risk-free rate should correspond to the
frequency of expected returns.
:type risk_free_rate: float, optional
:param negative: whether quantity should be made negative (so we can minimise)
:type negative: boolean
:return: (negative) Sharpe ratio
:rtype: float
"""
mu = w @ expected_returns
sigma = cp.sqrt(cp.quad_form(w, cov_matrix))
sign = -1 if negative else 1
sharpe = (mu - risk_free_rate) / sigma
return _objective_value(w, sign * sharpe)
def L2_reg(w, gamma=1):
r"""
L2 regularisation, i.e :math:`\gamma ||w||^2`, to increase the number of nonzero weights.
Example::
ef = EfficientFrontier(mu, S)
ef.add_objective(objective_functions.L2_reg, gamma=2)
ef.min_volatility()
:param w: asset weights in the portfolio
:type w: np.ndarray OR cp.Variable
:param gamma: L2 regularisation parameter, defaults to 1. Increase if you want more
non-negligible weights
:type gamma: float, optional
:return: value of the objective function OR objective function expression
:rtype: float OR cp.Expression
"""
L2_reg = gamma * cp.sum_squares(w)
return _objective_value(w, L2_reg)
def quadratic_utility(w, expected_returns, cov_matrix, risk_aversion, negative=True):
r"""
Quadratic utility function, i.e :math:`\mu - \frac 1 2 \delta w^T \Sigma w`.
:param w: asset weights in the portfolio
:type w: np.ndarray OR cp.Variable
:param expected_returns: expected return of each asset
:type expected_returns: np.ndarray
:param cov_matrix: covariance matrix
:type cov_matrix: np.ndarray
:param risk_aversion: risk aversion coefficient. Increase to reduce risk.
:type risk_aversion: float
:param negative: whether quantity should be made negative (so we can minimise).
:type negative: boolean
:return: value of the objective function OR objective function expression
:rtype: float OR cp.Expression
"""
sign = -1 if negative else 1
mu = w @ expected_returns
variance = cp.quad_form(w, cov_matrix)
utility = mu - 0.5 * risk_aversion * variance
return _objective_value(w, sign * utility)
def transaction_cost(w, w_prev, k=0.001):
"""
A very simple transaction cost model: sum all the weight changes
and multiply by a given fraction (default to 10bps). This simulates
a fixed percentage commission from your broker.
:param w: asset weights in the portfolio
:type w: np.ndarray OR cp.Variable
:param w_prev: previous weights
:type w_prev: np.ndarray
:param k: fractional cost per unit weight exchanged
:type k: float
:return: value of the objective function OR objective function expression
:rtype: float OR cp.Expression
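    Example (``mu``, ``S`` and ``w_old`` are assumed precomputed expected returns,
    a covariance matrix and the current portfolio weights)::
        ef = EfficientFrontier(mu, S)
        ef.add_objective(transaction_cost, w_prev=w_old, k=0.001)
        ef.min_volatility()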
"""
return _objective_value(w, k * cp.norm(w - w_prev, 1))
"""
The ``plotting`` module houses all the functions to generate various plots.
Currently implemented:
- ``plot_covariance`` - plot a correlation matrix
- ``plot_dendrogram`` - plot the hierarchical clusters in a portfolio
- ``plot_efficient_frontier`` – plot the efficient frontier, using the CLA algorithm.
- ``plot_weights`` - bar chart of weights
"""
import numpy as np
from . import risk_models
import scipy.cluster.hierarchy as sch
try:
import matplotlib.pyplot as plt
plt.style.use("seaborn-deep")
except (ModuleNotFoundError, ImportError):
raise ImportError("Please install matplotlib via pip or poetry")
def _plot_io(**kwargs):
"""
Helper method to optionally save the figure to file.
:param filename: name of the file to save to, defaults to None (doesn't save)
:type filename: str, optional
:param dpi: dpi of figure to save or plot, defaults to 300
:type dpi: int (between 50-500)
:param showfig: whether to plt.show() the figure, defaults to True
:type showfig: bool, optional
"""
filename = kwargs.get("filename", None)
showfig = kwargs.get("showfig", True)
dpi = kwargs.get("dpi", 300)
plt.tight_layout()
if filename:
plt.savefig(fname=filename, dpi=dpi)
if showfig:
plt.show()
def plot_covariance(cov_matrix, plot_correlation=False, show_tickers=True, **kwargs):
"""
Generate a basic plot of the covariance (or correlation) matrix, given a
covariance matrix.
:param cov_matrix: covariance matrix
:type cov_matrix: pd.DataFrame or np.ndarray
:param plot_correlation: whether to plot the correlation matrix instead, defaults to False.
:type plot_correlation: bool, optional
:param show_tickers: whether to use tickers as labels (not recommended for large portfolios),
defaults to True
:type show_tickers: bool, optional
:return: matplotlib axis
:rtype: matplotlib.axes object
"""
if plot_correlation:
matrix = risk_models.cov_to_corr(cov_matrix)
else:
matrix = cov_matrix
fig, ax = plt.subplots()
cax = ax.imshow(matrix)
fig.colorbar(cax)
if show_tickers:
ax.set_xticks(np.arange(0, matrix.shape[0], 1))
ax.set_xticklabels(matrix.index)
ax.set_yticks(np.arange(0, matrix.shape[0], 1))
ax.set_yticklabels(matrix.index)
plt.xticks(rotation=90)
_plot_io(**kwargs)
return ax
def plot_dendrogram(hrp, show_tickers=True, **kwargs):
"""
Plot the clusters in the form of a dendrogram.
    :param hrp: HRPOpt object that has already been optimized.
:type hrp: object
:param show_tickers: whether to use tickers as labels (not recommended for large portfolios),
defaults to True
:type show_tickers: bool, optional
:param filename: name of the file to save to, defaults to None (doesn't save)
:type filename: str, optional
:param showfig: whether to plt.show() the figure, defaults to True
:type showfig: bool, optional
:return: matplotlib axis
:rtype: matplotlib.axes object
"""
if hrp.clusters is None:
hrp.optimize()
fig, ax = plt.subplots()
if show_tickers:
sch.dendrogram(hrp.clusters, labels=hrp.tickers, ax=ax, orientation="top")
plt.xticks(rotation=90)
plt.tight_layout()
else:
sch.dendrogram(hrp.clusters, no_labels=True, ax=ax)
_plot_io(**kwargs)
return ax
def plot_efficient_frontier(cla, points=100, show_assets=True, **kwargs):
"""
    Plot the efficient frontier based on a CLA object
    :param cla: a fitted CLA optimiser
    :type cla: CLA object
    :param points: number of points to plot, defaults to 100
:type points: int, optional
:param show_assets: whether we should plot the asset risks/returns also, defaults to True
:type show_assets: bool, optional
:param filename: name of the file to save to, defaults to None (doesn't save)
:type filename: str, optional
:param showfig: whether to plt.show() the figure, defaults to True
:type showfig: bool, optional
:return: matplotlib axis
:rtype: matplotlib.axes object
"""
if cla.weights is None:
cla.max_sharpe()
optimal_ret, optimal_risk, _ = cla.portfolio_performance()
if cla.frontier_values is None:
cla.efficient_frontier(points=points)
mus, sigmas, _ = cla.frontier_values
fig, ax = plt.subplots()
ax.plot(sigmas, mus, label="Efficient frontier")
if show_assets:
ax.scatter(
np.sqrt(np.diag(cla.cov_matrix)),
cla.expected_returns,
s=30,
color="k",
label="assets",
)
ax.scatter(optimal_risk, optimal_ret, marker="x", s=100, color="r", label="optimal")
ax.legend()
ax.set_xlabel("Volatility")
ax.set_ylabel("Return")
_plot_io(**kwargs)
return ax
def plot_weights(weights, **kwargs):
"""
Plot the portfolio weights as a horizontal bar chart
:param weights: the weights outputted by any PyPortfolioOpt optimiser
:type weights: {ticker: weight} dict
:return: matplotlib axis
:rtype: matplotlib.axes object
"""
desc = sorted(weights.items(), key=lambda x: x[1], reverse=True)
labels = [i[0] for i in desc]
vals = [i[1] for i in desc]
y_pos = np.arange(len(labels))
fig, ax = plt.subplots()
ax.barh(y_pos, vals)
ax.set_xlabel("Weight")
ax.set_yticks(y_pos)
ax.set_yticklabels(labels)
ax.invert_yaxis()
_plot_io(**kwargs)
return ax
"""
The ``risk_models`` module provides functions for estimating the covariance matrix given
historical returns.
The format of the data input is the same as that in :ref:`expected-returns`.
**Currently implemented:**
- fix non-positive semidefinite matrices
- general risk matrix function, allowing you to run any risk model from one function.
- sample covariance
- semicovariance
- exponentially weighted covariance
- minimum covariance determinant
- shrunk covariance matrices:
- manual shrinkage
- Ledoit Wolf shrinkage
- Oracle Approximating shrinkage
- covariance to correlation matrix
"""
import warnings
import numpy as np
import pandas as pd
from .expected_returns import returns_from_prices
def _is_positive_semidefinite(matrix):
"""
Helper function to check if a given matrix is positive semidefinite.
Any method that requires inverting the covariance matrix will struggle
with a non-positive semidefinite matrix
:param matrix: (covariance) matrix to test
:type matrix: np.ndarray, pd.DataFrame
:return: whether matrix is positive semidefinite
:rtype: bool
"""
try:
# Significantly more efficient than checking eigenvalues (stackoverflow.com/questions/16266720)
np.linalg.cholesky(matrix + 1e-16 * np.eye(len(matrix)))
return True
except np.linalg.LinAlgError:
return False
def fix_nonpositive_semidefinite(matrix, fix_method="spectral"):
"""
Check if a covariance matrix is positive semidefinite, and if not, fix it
with the chosen method.
The ``spectral`` method sets negative eigenvalues to zero then rebuilds the matrix,
while the ``diag`` method adds a small positive value to the diagonal.
:param matrix: raw covariance matrix (may not be PSD)
:type matrix: pd.DataFrame
:param fix_method: {"spectral", "diag"}, defaults to "spectral"
:type fix_method: str, optional
:raises NotImplementedError: if a method is passed that isn't implemented
:return: positive semidefinite covariance matrix
:rtype: pd.DataFrame
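    A quick sketch on a deliberately non-PSD matrix (eigenvalues 3 and -1)::
        bad = pd.DataFrame([[1.0, 2.0], [2.0, 1.0]])
        fixed = fix_nonpositive_semidefinite(bad, fix_method="spectral")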
"""
if _is_positive_semidefinite(matrix):
return matrix
else:
warnings.warn(
"The covariance matrix is non positive semidefinite. Amending eigenvalues."
)
# Eigendecomposition
q, V = np.linalg.eigh(matrix)
if fix_method == "spectral":
# Remove negative eigenvalues
q = np.where(q > 0, q, 0)
# Reconstruct matrix
fixed_matrix = V @ np.diag(q) @ V.T
elif fix_method == "diag":
min_eig = np.min(q)
if min_eig < 0:
fixed_matrix = matrix - 1.1 * min_eig * np.eye(len(matrix))
else:
raise NotImplementedError("Method {} not implemented".format(fix_method))
if not _is_positive_semidefinite(fixed_matrix):
warnings.warn("Could not fix matrix. Please try a different risk model.")
# Rebuild labels if provided
if isinstance(matrix, pd.DataFrame):
tickers = matrix.index
return pd.DataFrame(fixed_matrix, index=tickers, columns=tickers)
else:
return fixed_matrix
def risk_matrix(prices, method="sample_cov", **kwargs):
"""
Compute a covariance matrix, using the risk model supplied in the ``method``
parameter.
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param method: the risk model to use. Should be one of:
- ``sample_cov``
- ``semicovariance``
- ``exp_cov``
- ``min_cov_determinant``
- ``ledoit_wolf``
- ``ledoit_wolf_constant_variance``
- ``ledoit_wolf_single_factor``
- ``ledoit_wolf_constant_correlation``
- ``oracle_approximating``
:type method: str, optional
:raises NotImplementedError: if the supplied method is not recognised
:return: annualised sample covariance matrix
:rtype: pd.DataFrame
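    A dispatch sketch (``prices`` is an assumed dataframe of adjusted closes)::
        S = risk_matrix(prices, method="ledoit_wolf")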
"""
if method == "sample_cov":
return sample_cov(prices, **kwargs)
elif method == "semicovariance":
return semicovariance(prices, **kwargs)
elif method == "exp_cov":
return exp_cov(prices, **kwargs)
elif method == "min_cov_determinant":
return min_cov_determinant(prices, **kwargs)
elif method == "ledoit_wolf" or method == "ledoit_wolf_constant_variance":
return CovarianceShrinkage(prices, **kwargs).ledoit_wolf()
elif method == "ledoit_wolf_single_factor":
return CovarianceShrinkage(prices, **kwargs).ledoit_wolf(
shrinkage_target="single_factor"
)
elif method == "ledoit_wolf_constant_correlation":
return CovarianceShrinkage(prices, **kwargs).ledoit_wolf(
shrinkage_target="constant_correlation"
)
elif method == "oracle_approximating":
return CovarianceShrinkage(prices, **kwargs).oracle_approximating()
else:
raise NotImplementedError("Risk model {} not implemented".format(method))
def sample_cov(prices, returns_data=False, frequency=252, **kwargs):
"""
Calculate the annualised sample covariance matrix of (daily) asset returns.
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year)
:type frequency: int, optional
:return: annualised sample covariance matrix
:rtype: pd.DataFrame
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
if returns_data:
returns = prices
else:
returns = returns_from_prices(prices)
return fix_nonpositive_semidefinite(
returns.cov() * frequency, kwargs.get("fix_method", "spectral")
)
def semicovariance(
prices, returns_data=False, benchmark=0.000079, frequency=252, **kwargs
):
"""
Estimate the semicovariance matrix, i.e the covariance given that
the returns are less than the benchmark.
    .. math::
        \\text{semicov} = E \\left[ \\min(r_i - B, 0) \\cdot \\min(r_j - B, 0) \\right]
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param benchmark: the benchmark return, defaults to the daily risk-free rate, i.e
:math:`1.02^{(1/252)} -1`.
:type benchmark: float
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year). Ensure that you use the appropriate
benchmark, e.g if ``frequency=12`` use the monthly risk-free rate.
:type frequency: int, optional
:return: semicovariance matrix
:rtype: pd.DataFrame
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
if returns_data:
returns = prices
else:
returns = returns_from_prices(prices)
drops = np.fmin(returns - benchmark, 0)
return fix_nonpositive_semidefinite(
drops.cov() * frequency, kwargs.get("fix_method", "spectral")
)
def _pair_exp_cov(X, Y, span=180):
"""
Calculate the exponential covariance between two timeseries of returns.
:param X: first time series of returns
:type X: pd.Series
:param Y: second time series of returns
:type Y: pd.Series
:param span: the span of the exponential weighting function, defaults to 180
:type span: int, optional
:return: the exponential covariance between X and Y
:rtype: float
"""
covariation = (X - X.mean()) * (Y - Y.mean())
# Exponentially weight the covariation and take the mean
if span < 10:
warnings.warn("it is recommended to use a higher span, e.g 30 days")
return covariation.ewm(span=span).mean().iloc[-1]
def exp_cov(prices, returns_data=False, span=180, frequency=252, **kwargs):
"""
Estimate the exponentially-weighted covariance matrix, which gives
greater weight to more recent data.
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param span: the span of the exponential weighting function, defaults to 180
:type span: int, optional
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year)
:type frequency: int, optional
:return: annualised estimate of exponential covariance matrix
:rtype: pd.DataFrame
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
assets = prices.columns
if returns_data:
returns = prices
else:
returns = returns_from_prices(prices)
N = len(assets)
# Loop over matrix, filling entries with the pairwise exp cov
S = np.zeros((N, N))
for i in range(N):
for j in range(i, N):
S[i, j] = S[j, i] = _pair_exp_cov(
returns.iloc[:, i], returns.iloc[:, j], span
)
cov = pd.DataFrame(S * frequency, columns=assets, index=assets)
return fix_nonpositive_semidefinite(cov, kwargs.get("fix_method", "spectral"))
def min_cov_determinant(
prices, returns_data=False, frequency=252, random_state=None, **kwargs
):
"""
Calculate the minimum covariance determinant, an estimator of the covariance matrix
that is more robust to noise.
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year)
:type frequency: int, optional
:param random_state: random seed to make results reproducible, defaults to None
:type random_state: int, optional
:return: annualised estimate of covariance matrix
:rtype: pd.DataFrame
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
# Extra dependency
try:
import sklearn.covariance
except (ModuleNotFoundError, ImportError):
raise ImportError("Please install scikit-learn via pip or poetry")
assets = prices.columns
if returns_data:
X = prices.dropna(how="all")
else:
X = prices.pct_change().dropna(how="all")
X = np.nan_to_num(X.values)
raw_cov_array = sklearn.covariance.fast_mcd(X, random_state=random_state)[1]
cov = pd.DataFrame(raw_cov_array, index=assets, columns=assets) * frequency
return fix_nonpositive_semidefinite(cov, kwargs.get("fix_method", "spectral"))
def cov_to_corr(cov_matrix):
"""
Convert a covariance matrix to a correlation matrix.
:param cov_matrix: covariance matrix
:type cov_matrix: pd.DataFrame
:return: correlation matrix
:rtype: pd.DataFrame
"""
if not isinstance(cov_matrix, pd.DataFrame):
warnings.warn("cov_matrix is not a dataframe", RuntimeWarning)
cov_matrix = pd.DataFrame(cov_matrix)
Dinv = np.diag(1 / np.sqrt(np.diag(cov_matrix)))
corr = np.dot(Dinv, np.dot(cov_matrix, Dinv))
return pd.DataFrame(corr, index=cov_matrix.index, columns=cov_matrix.index)
def corr_to_cov(corr_matrix, stdevs):
"""
Convert a correlation matrix to a covariance matrix
:param corr_matrix: correlation matrix
:type corr_matrix: pd.DataFrame
:param stdevs: vector of standard deviations
:type stdevs: array-like
:return: covariance matrix
:rtype: pd.DataFrame
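    A round-trip sketch (``S`` is an assumed covariance dataframe)::
        corr = cov_to_corr(S)
        S_again = corr_to_cov(corr, np.sqrt(np.diag(S)))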
"""
if not isinstance(corr_matrix, pd.DataFrame):
warnings.warn("cov_matrix is not a dataframe", RuntimeWarning)
corr_matrix = pd.DataFrame(corr_matrix)
return corr_matrix * np.outer(stdevs, stdevs)
class CovarianceShrinkage:
"""
Provide methods for computing shrinkage estimates of the covariance matrix, using the
sample covariance matrix and choosing the structured estimator to be an identity matrix
multiplied by the average sample variance. The shrinkage constant can be input manually,
though there exist methods (notably Ledoit Wolf) to estimate the optimal value.
Instance variables:
- ``X`` - pd.DataFrame (returns)
- ``S`` - np.ndarray (sample covariance matrix)
- ``delta`` - float (shrinkage constant)
- ``frequency`` - int
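    A usage sketch (``prices`` is an assumed dataframe of adjusted closes)::
        cs = CovarianceShrinkage(prices)
        S = cs.ledoit_wolf(shrinkage_target="constant_correlation")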
"""
def __init__(self, prices, returns_data=False, frequency=252):
"""
:param prices: adjusted closing prices of the asset, each row is a date and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param frequency: number of time periods in a year, defaults to 252 (the number of trading days in a year)
:type frequency: int, optional
"""
# Optional import
try:
from sklearn import covariance
self.covariance = covariance
except (ModuleNotFoundError, ImportError):
raise ImportError("Please install scikit-learn via pip or poetry")
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
self.frequency = frequency
if returns_data:
self.X = prices.dropna(how="all")
else:
self.X = prices.pct_change().dropna(how="all")
self.S = self.X.cov().values
self.delta = None # shrinkage constant
def _format_and_annualize(self, raw_cov_array):
"""
Helper method which annualises the output of shrinkage calculations,
and formats the result into a dataframe
:param raw_cov_array: raw covariance matrix of daily returns
:type raw_cov_array: np.ndarray
:return: annualised covariance matrix
:rtype: pd.DataFrame
"""
assets = self.X.columns
cov = pd.DataFrame(raw_cov_array, index=assets, columns=assets) * self.frequency
return fix_nonpositive_semidefinite(cov, fix_method="spectral")
def shrunk_covariance(self, delta=0.2):
"""
Shrink a sample covariance matrix to the identity matrix (scaled by the average
sample variance). This method does not estimate an optimal shrinkage parameter,
it requires manual input.
:param delta: shrinkage parameter, defaults to 0.2.
:type delta: float, optional
:return: shrunk sample covariance matrix
        :rtype: pd.DataFrame
"""
self.delta = delta
N = self.S.shape[1]
# Shrinkage target
mu = np.trace(self.S) / N
F = np.identity(N) * mu
# Shrinkage
shrunk_cov = delta * F + (1 - delta) * self.S
return self._format_and_annualize(shrunk_cov)
def ledoit_wolf(self, shrinkage_target="constant_variance"):
"""
Calculate the Ledoit-Wolf shrinkage estimate for a particular
shrinkage target.
:param shrinkage_target: choice of shrinkage target, either ``constant_variance``,
``single_factor`` or ``constant_correlation``. Defaults to
``constant_variance``.
:type shrinkage_target: str, optional
:raises NotImplementedError: if the shrinkage_target is unrecognised
:return: shrunk sample covariance matrix
        :rtype: pd.DataFrame
"""
if shrinkage_target == "constant_variance":
X = np.nan_to_num(self.X.values)
shrunk_cov, self.delta = self.covariance.ledoit_wolf(X)
elif shrinkage_target == "single_factor":
shrunk_cov, self.delta = self._ledoit_wolf_single_factor()
elif shrinkage_target == "constant_correlation":
shrunk_cov, self.delta = self._ledoit_wolf_constant_correlation()
else:
raise NotImplementedError(
"Shrinkage target {} not recognised".format(shrinkage_target)
)
return self._format_and_annualize(shrunk_cov)
def _ledoit_wolf_single_factor(self):
"""
Helper method to calculate the Ledoit-Wolf shrinkage estimate
with the Sharpe single-factor matrix as the shrinkage target.
See Ledoit and Wolf (2001).
:return: shrunk sample covariance matrix, shrinkage constant
:rtype: np.ndarray, float
"""
X = np.nan_to_num(self.X.values)
# De-mean returns
t, n = np.shape(X)
Xm = X - X.mean(axis=0)
xmkt = X.mean(axis=1).reshape(t, 1)
# compute sample covariance matrix
sample = np.cov(np.append(Xm, xmkt, axis=1), rowvar=False) * (t - 1) / t
betas = sample[0:n, n].reshape(n, 1)
varmkt = sample[n, n]
sample = sample[:n, :n]
F = np.dot(betas, betas.T) / varmkt
F[np.eye(n) == 1] = np.diag(sample)
# compute shrinkage parameters
c = np.linalg.norm(sample - F, "fro") ** 2
y = Xm ** 2
p = 1 / t * np.sum(np.dot(y.T, y)) - np.sum(sample ** 2)
# r is divided into diagonal
# and off-diagonal terms, and the off-diagonal term
# is itself divided into smaller terms
rdiag = 1 / t * np.sum(y ** 2) - sum(np.diag(sample) ** 2)
z = Xm * np.tile(xmkt, (n,))
v1 = 1 / t * np.dot(y.T, z) - np.tile(betas, (n,)) * sample
roff1 = (
np.sum(v1 * np.tile(betas, (n,)).T) / varmkt
- np.sum(np.diag(v1) * betas.T) / varmkt
)
v3 = 1 / t * np.dot(z.T, z) - varmkt * sample
roff3 = (
np.sum(v3 * np.dot(betas, betas.T)) / varmkt ** 2
- np.sum(np.diag(v3).reshape(-1, 1) * betas ** 2) / varmkt ** 2
)
roff = 2 * roff1 - roff3
r = rdiag + roff
# compute shrinkage constant
k = (p - r) / c
delta = max(0, min(1, k / t))
# compute the estimator
shrunk_cov = delta * F + (1 - delta) * sample
return shrunk_cov, delta
def _ledoit_wolf_constant_correlation(self):
"""
Helper method to calculate the Ledoit-Wolf shrinkage estimate
with the constant correlation matrix as the shrinkage target.
See Ledoit and Wolf (2003)
:return: shrunk sample covariance matrix, shrinkage constant
:rtype: np.ndarray, float
"""
X = np.nan_to_num(self.X.values)
t, n = np.shape(X)
S = self.S # sample cov matrix
# Constant correlation target
var = np.diag(S).reshape(-1, 1)
std = np.sqrt(var)
_var = np.tile(var, (n,))
_std = np.tile(std, (n,))
r_bar = (np.sum(S / (_std * _std.T)) - n) / (n * (n - 1))
F = r_bar * (_std * _std.T)
F[np.eye(n) == 1] = var.reshape(-1)
# Estimate pi
Xm = X - X.mean(axis=0)
y = Xm ** 2
pi_mat = np.dot(y.T, y) / t - 2 * np.dot(Xm.T, Xm) * S / t + S ** 2
pi_hat = np.sum(pi_mat)
# Theta matrix, expanded term by term
term1 = np.dot((Xm ** 3).T, Xm) / t
help_ = np.dot(Xm.T, Xm) / t
help_diag = np.diag(help_)
term2 = np.tile(help_diag, (n, 1)).T * S
term3 = help_ * _var
term4 = _var * S
theta_mat = term1 - term2 - term3 + term4
theta_mat[np.eye(n) == 1] = np.zeros(n)
rho_hat = sum(np.diag(pi_mat)) + r_bar * np.sum(
np.dot((1 / std), std.T) * theta_mat
)
# Estimate gamma
gamma_hat = np.linalg.norm(S - F, "fro") ** 2
# Compute shrinkage constant
kappa_hat = (pi_hat - rho_hat) / gamma_hat
delta = max(0.0, min(1.0, kappa_hat / t))
# Compute shrunk covariance matrix
shrunk_cov = delta * F + (1 - delta) * S
return shrunk_cov, delta
def oracle_approximating(self):
"""
Calculate the Oracle Approximating Shrinkage estimate
:return: shrunk sample covariance matrix
:rtype: np.ndarray
"""
X = np.nan_to_num(self.X.values)
shrunk_cov, self.delta = self.covariance.oas(X)
return self._format_and_annualize(shrunk_cov)
import datetime

import numpy as np
import pandas as pd

from app.utils.fund_rank import *
from app.utils.risk_parity import *
from app.pypfopt import risk_models
from app.pypfopt import expected_returns
from app.pypfopt import EfficientFrontier
def cal_correlation(prod):
    """Compute pairwise correlations between the funds in a portfolio.
    Args:
        prod: portfolio NAV table: indexed by date, one column of NAVs per fund ID
    Returns: the correlation matrix with each fund's self-correlation masked out,
        since the constant 1.0 on the diagonal would interfere with the later
        screening for highly correlated funds
    """
    prod_return = prod.apply(lambda x: simple_return(x))
    correlation = prod_return.corr()
    return correlation.mask(np.eye(correlation.shape[0], dtype=bool))
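# Example (editor's illustration): because the diagonal is masked to NaN,
# screening reacts only to correlation with *other* funds:
#
#     corr = cal_correlation(prod)
#     np.any(corr['HF00002JJ2'] > 0.8)   # self-correlation no longer trips this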
def rename_col(df, fund_id):
    """Rename the 'adj_nav' column to the fund ID.
    Args:
        df: raw NAV table: indexed by date, with columns 'fund_id' and 'adj_nav'
        fund_id: fund ID
    Returns: the NAV table with the 'fund_id' column dropped and 'adj_nav'
        renamed to the fund ID
    """
    df.rename(columns={'adj_nav': fund_id}, inplace=True)
    df.drop('fund_id', axis=1, inplace=True)
    return df
def replace_fund(manager, substrategy, fund_rank):
    """Find a substitute for a fund with less than half a year of data.
    Args:
        manager: fund manager ID
        substrategy: the fund's sub-strategy
        fund_rank: fund scoring/ranking table
    Returns: the first fund ID run by the same manager with the same
        sub-strategy, or None when no such fund exists
    """
    df = fund_rank[(fund_rank['manager'] == manager) &
                   (fund_rank['substrategy'] == substrategy)]
    if df.empty:
        # Callers test the result against None, so return it explicitly
        # instead of raising IndexError on an empty match.
        return None
    return df['fund_id'].values[0]
def search_rank(fund_rank, fund, metric):
    """Look up one metric for a fund in the ranking table.
    Args:
        fund_rank: fund ranking table
        fund: fund ID
        metric: name of the metric column
    Returns: the metric value
    """
    return fund_rank[fund_rank['fund_id'] == fund][metric].values[0]
def translate_single(content, evaluation):
    """Map evaluation codes onto their phrase lists, e.g.
    content = [["优秀", "良好", "一般"],
               ["优秀", "良好", "合格", "较差"],
               ["优秀", "良好", "合格", "较差"],
               ["高", "一般", "较低"]]
    evaluation = [0, 1, 1, 2]
    Integer entries index into the corresponding phrase list; anything else
    (e.g. a pre-formatted percentage string) passes through unchanged.
    """
    return tuple([content[i][v] if type(v) == int else v for i, v in enumerate(evaluation)])
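# Example (editor's illustration): with the lists in the docstring,
# translate_single(content, [0, 1, 1, 2]) -> ("优秀", "良好", "良好", "较低"),
# while translate_single(content, [0, '30%', 1, 2]) keeps '30%' as-is.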
def choose_good_evaluation(evaluation):
    """Keep only the favourable comments.
    Args:
        evaluation: the evaluation dict for a single fund
    Returns: the favourable subset of the evaluation
    """
    v1 = evaluation[1]
    v2 = evaluation[2]
    v3 = evaluation[3]
    v4 = evaluation[4]
    v5 = evaluation.get(5)  # only newly proposed funds carry comment 5
    if v1[0] > 1:
        del evaluation[1]
    if v2[0] > 1:
        del evaluation[2]
    if v3[0] > 1:
        del evaluation[3]
    if v4[0] != 0 or v4[1] != 0:
        del evaluation[4]
    # Manager has run funds for under three years, or the average track record
    # sits at or below the mid-tier of peers.
    if v5 is not None and (v5[0] < 3 or v5[2] > 1):
        del evaluation[5]
    return evaluation
def choose_bad_evaluation(evaluation):
    """Keep only the unfavourable comments (mirror of choose_good_evaluation)."""
    v1 = evaluation[1]
    v2 = evaluation[2]
    v3 = evaluation[3]
    v4 = evaluation[4]
    if v1[0] < 2:
        del evaluation[1]
    if v2[0] < 2:
        del evaluation[2]
    if v3[0] < 2:
        del evaluation[3]
    if v4[0] != 1 or v4[1] != 1:
        del evaluation[4]
    return evaluation
def get_fund_rank():
sql = "SELECT * FROM fund_rank"
df = pd.read_sql(sql, con)
# df = pd.read_csv('fund_rank.csv', encoding='gbk')
return df
def get_index_daily(index_id):
    """Fetch daily index quotes.
    Args:
        index_id: index ID
    Returns: a table shaped like the portfolio NAV table
    """
sql = "SELECT ts_code, trade_date, close FROM index_daily WHERE ts_code='{}'".format(index_id)
df = pd.read_sql(sql, con).dropna(how='any')
df.rename({'ts_code': 'fund_id', 'trade_date': 'end_date', 'close': 'adj_nav'}, axis=1, inplace=True)
df['end_date'] = pd.to_datetime(df['end_date'])
df.set_index('end_date', drop=True, inplace=True)
df.sort_index(inplace=True, ascending=True)
df = rename_col(df, index_id)
return df
def get_tamp_fund():
    """Fetch the Tamp product pool (fund IDs starting with 'HF').
    Returns: DataFrame with a single 'fund_id' column
    """
sql = "SELECT id FROM tamp_fund_info WHERE id LIKE 'HF%'"
df = pd.read_sql(sql, con)
df.rename({'id': 'fund_id'}, axis=1, inplace=True)
return df
def get_risk_level(substrategy):
    """Map a sub-strategy code to a risk level.
    Args:
        substrategy: sub-strategy code
    Returns: 'H' (high), 'M' (medium) or 'L' (low)
    """
substrategy2risk = {1: "H",
1010: "H", 1020: "H", 1030: "H",
2010: "H",
3010: "H", 3020: "L", 3030: "H", 3040: "L", 3050: "M",
4010: "M", 4020: "M", 4030: "M", 4040: "M",
5010: "M", 5020: "L", 5030: "M",
6010: "L", 6020: "M", 6030: "L",
7010: "H", 7020: "H",
8010: "H", 8020: "M"}
return substrategy2risk[substrategy]
fund_rank = get_fund_rank()
class PortfolioDiagnose(object):
def __init__(self, client_type, portfolio, invest_amount, expect_return=None,
expect_drawdown=None, index_id='000905.SH', invest_type='private', start_date=None, end_date=None):
"""基金诊断
Args:
client_type: 客户类型:1:保守型, 2:稳健型, 3:平衡型, 4:成长型, 5:进取型
portfolio: 投资组合:[基金1, 基金2, 基金3...]
invest_amount: 投资金额:10000000元
invest_type: 投资类型:public, private, ...
start_date: 诊断所需净值的开始日期
end_date: 诊断所需净值的结束日期
"""
self.freq_list = []
self.client_type = client_type
self.portfolio = portfolio
self.expect_return = expect_return
self.expect_drawdown = expect_drawdown
self.index_id = index_id
self.invest_amount = invest_amount
self.invest_type = invest_type
self.start_date = start_date
self.end_date = end_date
        if self.end_date is None:
            self.end_date = datetime.datetime.now() - datetime.timedelta(days=1)
        if self.start_date is None:
            # Default to a one-year window ending at end_date
            self.start_date = cal_date(self.end_date, 'Y', 1)
        self.replace_pair = dict()  # maps funds replaced for lacking half a year of data to their same-manager, same-strategy substitutes
        self.no_data_fund = []  # funds with no NAV or manager record in the database
        self.abandon_fund_score = []  # funds whose score fails the requirement
        self.abandon_fund_corr = []  # funds dropped for high correlation with the rest of the portfolio
        self.proposal_fund = []  # funds proposed for purchase
self.old_correlation = None
self.new_correlation = None
self.old_weights = None
self.new_weights = None
    def get_portfolio(self, ):
        """Build the portfolio NAV table.
        Returns: NAV table indexed by date with one column per fund
        """
        # NAV table of the first fund in the original portfolio
        prod = get_nav(self.portfolio[0], self.start_date, invest_type=self.invest_type)
        fund_info = get_fund_info(self.end_date, invest_type=self.invest_type)
        while prod is None:
            # An empty NAV table usually means under half a year of data; look for a
            # replacement fund run by the same manager with the same sub-strategy.
            result = fund_info[fund_info['fund_id'] == self.portfolio[0]]
            manager = str(result['manager'].values)
            strategy = result['substrategy'].values
            replaced_fund = replace_fund(manager, strategy, fund_rank)
            if replaced_fund is not None:
                # A replacement exists: record the (original, replacement) pair
                prod = get_nav(replaced_fund, self.start_date, invest_type=self.invest_type)
                self.replace_pair[self.portfolio[0]] = replaced_fund
            else:
                # No replacement either: record the fund as having no data and
                # move on to the next fund ID
                self.no_data_fund.append(self.portfolio[0])
                self.portfolio.pop(0)
                prod = get_nav(self.portfolio[0], self.start_date, invest_type=self.invest_type)
        # Record the fund's publication frequency
        self.freq_list.append(get_frequency(prod))
        prod = rename_col(prod, self.portfolio[0])
        # Join the remaining funds' NAV tables onto the portfolio table
        for idx in range(len(self.portfolio) - 1):
            prod1 = get_nav(self.portfolio[idx + 1], self.start_date, invest_type=self.invest_type)
            if prod1 is None or prod1.index[-1] - prod1.index[0] < 0.6 * (self.end_date - self.start_date):
                result = fund_info[fund_info['fund_id'] == self.portfolio[idx + 1]]
                if result['fund_manager_id'].count() != 0:
                    manager = str(result['fund_manager_id'].values)
                    substrategy = result['substrategy'].values[0]
                    replaced_fund = replace_fund(manager, substrategy, fund_rank)
                else:
                    self.no_data_fund.append(self.portfolio[idx + 1])
                    continue
                if replaced_fund is not None:
                    prod1 = get_nav(replaced_fund, self.start_date, invest_type=self.invest_type)
                    self.replace_pair[self.portfolio[idx + 1]] = replaced_fund
                    self.freq_list.append(get_frequency(prod1))
                    prod1 = rename_col(prod1, replaced_fund)
                else:
                    self.no_data_fund.append(self.portfolio[idx + 1])
                    continue
            else:
                self.freq_list.append(get_frequency(prod1))
                prod1 = rename_col(prod1, self.portfolio[idx + 1])
            # Outer-join prod and prod1 on the date index
            prod = pd.merge(prod, prod1, on=['end_date'], how='outer')
        # Resample the merged NAV table at the coarsest publication frequency
        prod.sort_index(inplace=True)
        prod.ffill(inplace=True)
        prod = resample(prod, get_trade_cal(), min(self.freq_list))
        return prod
    def abandon(self, prod):
        """Funds recommended for replacement.
        Args:
            prod: original portfolio NAV table
        Returns: the NAV table with the funds recommended for replacement removed
        """
        self.old_correlation = cal_correlation(prod)
        for fund in prod.columns:
            z_score = search_rank(fund_rank, fund, metric='z_score')
            # Recommend replacing funds scoring below 60, or correlated above
            # 0.8 with another fund in the portfolio
            if z_score < 60:
                self.abandon_fund_score.append(fund)
                prod = prod.drop(fund, axis=1)
            elif np.any(self.old_correlation[fund] > 0.8):
                self.abandon_fund_corr.append(fund)
                prod = prod.drop(fund, axis=1)
        return prod
    def proposal(self, prod):
        """Funds recommended for purchase.
        Args:
            prod: NAV table with the funds recommended for replacement removed
        Returns: the NAV table extended with the recommended funds
        """
        # Strategies already covered by the portfolio
        included_strategy = set()
        # Cap the number of funds, assuming at least 1,000,000 invested per fund
        max_len = self.invest_amount // 1e6 - len(prod.columns)
        # All strategies present in the ranking table
        all_strategy = set(fund_rank['substrategy'].to_list())
        if prod is not None:
            included_strategy = set([search_rank(fund_rank, fund, metric='substrategy') for fund in prod.columns])
        # Strategies still to add = all strategies - strategies already covered
        add_strategy = all_strategy - included_strategy
        # Tamp product pool (see get_tamp_fund above)
        tamp_fund = get_tamp_fund()
        # Walk the pool and propose funds scoring above 80 that belong to a missing
        # strategy and stay below 0.8 correlation with the rest of the portfolio
        for proposal in tamp_fund['fund_id']:
            if proposal in fund_rank['fund_id'].to_list():
                proposal_z_score = search_rank(fund_rank, proposal, metric='z_score')
                proposal_strategy = fund_rank[fund_rank['fund_id'] == proposal]['substrategy'].values[0]
            else:
                continue
            if proposal_z_score > 80 and proposal_strategy in add_strategy:
            # if proposal_z_score > 80:
                proposal_nav = get_nav(proposal, self.start_date, invest_type=self.invest_type)
                if proposal_nav is None:
                    continue
                # Skip products published weekly or less often
                if get_frequency(proposal_nav) <= 52:
                    continue
                self.freq_list.append(get_frequency(proposal_nav))
                proposal_nav = rename_col(proposal_nav, proposal)
                # Resample at the coarsest frequency and recompute the
                # correlation matrix of the candidate portfolio
                prod = pd.merge(prod, proposal_nav, how='outer', on='end_date')
                prod.sort_index(inplace=True)
                prod.ffill(inplace=True)
                prod = resample(prod, get_trade_cal(), min(self.freq_list))
                _correlation = cal_correlation(prod)
                _correlation = _correlation.fillna(0)
                if np.all(_correlation < 0.8):
                    self.proposal_fund.append(proposal)
                    max_len -= 1
                    add_strategy -= {proposal_strategy}
                    if len(add_strategy) == 0 or max_len == 0:
                    # if max_len == 0:
                        break
                else:
                    prod.drop(columns=proposal, inplace=True)
        return prod
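    # Worked example (editor's illustration): with invest_amount=10,000,000 and
    # four funds surviving abandon(), max_len = 10 - 4 = 6, so at most six new
    # funds can be proposed before the loop stops.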
def optimize(self, ):
origin_portfolio = self.get_portfolio()
abandoned_portfolio = self.abandon(origin_portfolio)
propose_portfolio = self.proposal(abandoned_portfolio)
# propose_portfolio.to_csv('test_portfolio.csv', encoding='gbk')
returns = propose_portfolio.pct_change().dropna()
mu = expected_returns.mean_historical_return(propose_portfolio)
S = risk_models.sample_cov(propose_portfolio)
# if self.client_type == 1:
# proposal_risk = [[x, get_risk_level(search_rank(fund_rank, x, metric='substrategy'))] for x in
# self.proposal_fund]
# self.proposal_fund = list(filter(lambda x: x[1] != 3, proposal_risk))
# proposal_portfolio = list((set(self.portfolio) - set(self.no_data_fund) - set(self.replace_pair.keys())) | \
# (set(self.proposal_fund) | set(self.replace_pair.values())))
propose_risk_mapper = dict()
for fund in propose_portfolio.columns:
propose_risk_mapper[fund] = str(get_risk_level(search_rank(fund_rank, fund, metric='substrategy')))
risk_upper = {"H": 0.0}
risk_lower = {"L": 0.6, "M": 0.4}
ef = EfficientFrontier(mu, S)
ef.add_sector_constraints(propose_risk_mapper, risk_lower, risk_upper)
# weights = ef.nonconvex_objective(deviation_risk_parity, ef.cov_matrix)
ef.efficient_return(0.2)
clean_weights = ef.clean_weights()
# ef.portfolio_performance(verbose=True)
self.new_weights = np.array(list(clean_weights.values()))
# S = np.asmatrix(S)
# w_origin = np.asarray([i for i in w_origin.values()])
# risk_target = np.asarray([1 / len(w_origin)] * len(w_origin))
# self.proposal_weights = calcu_w(w_origin, S, risk_target)
# elif self.client_type == 2:
# elif self.client_type == 3:
# elif self.client_type == 4:
# elif self.client_type == 5:
# print(len(propose_portfolio.columns))
        # # Lower bound of a single fund's weight = 1,000,000 / total invest_amount
# # w_low = 1e6 / self.invest_amount
# w_low = 0
# w_origin, S, mu = optim_drawdown(propose_portfolio, 0.5, [w_low, 1], min(self.freq_list))
# print(w_origin)
# S = np.asmatrix(S)
# w_origin = np.asarray([i for i in w_origin.values()])
# risk_target = np.asarray([1 / len(w_origin)] * len(w_origin))
# self.proposal_weights = calcu_w(w_origin, S, risk_target)
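    # Minimal sketch (editor's illustration, toy tickers assumed) of the convex
    # step above using the vendored pypfopt API:
    #
    #     mu = expected_returns.mean_historical_return(prices)
    #     S = risk_models.sample_cov(prices)
    #     ef = EfficientFrontier(mu, S)
    #     ef.add_sector_constraints({"F1": "H", "F2": "L", "F3": "M"},
    #                               {"L": 0.6, "M": 0.4}, {"H": 0.0})
    #     ef.efficient_return(0.2)        # target a 20% annualised return
    #     weights = ef.clean_weights()    # OrderedDict of rounded weights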
def return_compare(self):
index_data = get_index_daily(self.index_id)
origin_portfolio = self.get_portfolio()
abandoned_portfolio = self.abandon(origin_portfolio)
propose_portfolio = self.proposal(abandoned_portfolio)
index_data = pd.merge(index_data, propose_portfolio, how='inner', left_index=True, right_index=True)
index_return = index_data.iloc[:, :] / index_data.iloc[0, :] - 1
# origin_fund_return = origin_portfolio.iloc[:, :] / origin_portfolio.iloc[0, :] - 1
propose_fund_return = propose_portfolio.iloc[:, :] / propose_portfolio.iloc[0, :] - 1
print(self.new_weights)
propose_fund_return['return'] = propose_fund_return.T.iloc[:, :].apply(lambda x: np.dot(self.new_weights, x))
propose_fund_return.to_csv('new_port.csv', encoding='gbk')
return index_return, propose_fund_return
def old_evaluation(self):
start_year = self.start_date.year
start_month = self.start_date.month
current_year = datetime.datetime.now().year
current_month = datetime.datetime.now().month
current_day = datetime.datetime.now().day
past_month = (current_year - start_year) * 12 + current_month - start_month
num_fund = len(self.portfolio)
        # Funds flagged for removal, for either a low score or high correlation
        abandon_fund = [[x, self.invest_type] for x in self.abandon_fund_score + self.abandon_fund_corr]
old_strategy = set([search_rank(fund_rank, x, metric='substrategy') for x in self.portfolio])
data = [start_year, start_month, past_month, self.invest_amount,
current_year, current_month, current_day]
return data
def new_evaluation(self):
        abandon_fund = self.abandon_fund_score + self.abandon_fund_corr
        hold_fund = set(self.portfolio) - set(abandon_fund)
proposal_fund = self.proposal_fund
data = [hold_fund, abandon_fund, proposal_fund]
return data
    def single_evaluation(self, fund_id):
        """Generate the commentary for a single fund.
        1. Overall the fund performs excellently/well/averagely; return ability is
           excellent/good/adequate/poor, drawdown control is excellent/good/adequate/poor,
           and the risk-return ratio is high/average/low.
        2. On returns: annualised return ability is above/level with/below the peer
           average, beating the benchmark index in x% of intervals; absolute return
           ability is excellent/average.
        3. On risk: resilience is excellent/good/average, ranking high/middle/low among
           peers; the max drawdown is x%, above/level with/below the peer average.
        4. The fund pairs good/poor returns with a large/small drawdown, i.e. it takes
           large/small risk in exchange for large/small returns, so its risk is high/low.
        5. The manager has 5.23 years of investment experience and strong skill, has
           managed X funds whose average performance sits in the upper tier of peers,
           with x of them ranked in the top x%; career annualised return is x%, versus
           only x% for the index over the same period.
        Old holdings show comments 1-4; new funds show 1-5.
        An old holding that is kept shows the favourable comments;
        one that is dropped shows the unfavourable ones.
        New funds show only favourable comments.
        Args:
            fund_id: fund ID
        Returns: {fund_id: commentary string}
        """
z_score = search_rank(fund_rank, fund_id, metric='z_score')
total_level = np.select([z_score >= 80,
70 <= z_score < 80,
z_score < 70], [0, 1, 2]).item()
return_rank = search_rank(fund_rank, fund_id, metric='annual_return_rank')
return_level = np.select([return_rank >= 0.8,
0.7 <= return_rank < 0.8,
0.6 <= return_rank < 0.7,
return_rank < 0.6], [0, 1, 2, 3]).item()
return_bool = 1 if return_level > 2 else 0
return_triple = return_level - 1 if return_level >= 2 else return_level
drawdown_rank = search_rank(fund_rank, fund_id, metric='max_drawdown_rank')
drawdown_value = search_rank(fund_rank, fund_id, metric='max_drawdown')
drawdown_level = np.select([drawdown_rank >= 0.8,
0.7 <= drawdown_rank < 0.8,
0.6 <= drawdown_rank < 0.7,
drawdown_rank < 0.6], [0, 1, 2, 3]).item()
drawdown_bool = 1 if drawdown_level > 2 else 0
drawdown_triple = drawdown_level - 1 if drawdown_level >= 2 else drawdown_level
sharp_rank = search_rank(fund_rank, fund_id, metric='sharp_ratio_rank')
sharp_level = np.select([sharp_rank >= 0.8,
0.6 <= sharp_rank < 0.8,
sharp_rank < 0.6], [0, 1, 2]).item()
data = {1: [total_level, return_level, drawdown_level, sharp_level],
2: [return_triple, "TO DO", return_triple],
3: [drawdown_triple, drawdown_triple, drawdown_value, drawdown_triple],
4: [return_bool, drawdown_bool, drawdown_bool, return_bool, drawdown_bool]}
        if fund_id in self.abandon_fund_score + self.abandon_fund_corr:
data['remove'] = True
elif fund_id in self.proposal_fund:
data[5] = [1] * 7
data['remove'] = False
else:
data['remove'] = False
        x = '30%'
        # Comment 1
        content = {1: [["优秀", "良好", "一般"],
                       ["优秀", "良好", "合格", "较差"],
                       ["优秀", "良好", "合格", "较差"],
                       ["高", "一般", "较低"]],
                   # Comment 2
                   2: [["高于", "持平", "低于"],
                       x,
                       ["优秀", "一般"]],
                   # Comment 3
                   3: [["优秀", "良好", "一般"],
                       ["高", "中", "低"], x,
                       ["高于", "持平", "低于"]],
                   # Comment 4
                   4: [["较好", "较差"],
                       ["较小", "较大"],
                       ["较小", "较大"],
                       ["较大", "较小"],
                       ["较低", "较高"]],
                   # Placeholder phrase pairs until comment 5 is fleshed out
                   5: [["TO DO"] * 2] * 7}
sentence = {
1: "1、该基金整体表现%s,收益能力%s,回撤控制能力%s,风险收益比例%s;\n",
2: "2、在收益方面,该基金年化收益能力%s同类基金平均水平,有%s区间跑赢指数,绝对收益能力%s;\n",
3: "3、在风险方面,该基金抵御风险能力%s,在同类基金中处于%s等水平,最大回撤为%s,%s同类基金平均水平;\n",
4: "4、该基金收益%s的同时回撤%s,也就是说,该基金在用%s风险换取%s收益,存在%s风险;\n",
5: "5、基金经理,投资年限%s年,经验丰富;投资能力较强,生涯中共管理过%s只基金,历任的%s只基金平均业绩在同类中处于上游水平,其中%s只排名在前%s;生涯年化回报率%s,同期大盘只有%s;"}
remove = data["remove"]
del data["remove"]
        # Fund kept: choose the favourable wording
if not remove:
evaluation = choose_good_evaluation(data)
        # Fund dropped: choose the unfavourable wording
else:
evaluation = choose_bad_evaluation(data)
print(evaluation)
ret = ""
for k, v in evaluation.items():
# print(translate_single(content[k], v))
ret = ret + sentence[k] % translate_single(content[k], v)
return {fund_id: ret}
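    # Example (editor's illustration): for a fund that is kept, the method
    # returns {fund_id: "1、该基金整体表现...\n3、..."} -- only the favourable
    # sentences that survive choose_good_evaluation.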
def old_portfolio_evaluation(self, ):
result = []
for fund in self.portfolio:
result.append(self.single_evaluation(fund))
return result
def propose_fund_evaluation(self, ):
result = []
for fund in self.proposal_fund:
result.append(self.single_evaluation(fund))
return result
if __name__ == '__main__':
portfolio = ['HF00002JJ2', 'HF00005DBQ', 'HF0000681Q', 'HF00006693', 'HF00006AZF', 'HF00006BGS']
portfolio_diagnose = PortfolioDiagnose(client_type=1, portfolio=portfolio, invest_amount=10000000)
portfolio_diagnose.optimize()
print(portfolio_diagnose.old_correlation)
# print(portfolio_diagnose.propose_fund_evaluation())
import datetime
import logging

import pandas as pd
import pymysql
import tushare as ts
from sqlalchemy import create_engine

from app.utils.week_evaluation import *

db = create_engine(
    'mysql+pymysql://tamp_fund:@imeng408@tamper.mysql.polardb.rds.aliyuncs.com:3306/tamp_fund?charset=utf8mb4')
con = db.connect()

logging.basicConfig(level=logging.DEBUG)

# Rebind con to a raw pymysql connection (replaces the SQLAlchemy connection above)
con = pymysql.connect(host='tamper.mysql.polardb.rds.aliyuncs.com',
user='tamp_fund',
password='@imeng408',
database='tamp_fund',
charset='utf8',
use_unicode='True')
def get_nav(fund, start_date, rollback=False, invest_type='public'):
    """Fetch the NAV table for `fund` from start_date through today.
    Args:
        fund[str]: fund ID
        start_date[date]: first date of the window
        rollback[bool]: when start_date is not a NAV publication date, whether
            to roll back to the nearest earlier publication date
        invest_type[str]: 'public' for mutual funds, anything else for private funds
    Returns: df[DataFrame]: NAV table indexed by publication date with one
        adjusted-NAV column; None when the query returns nothing
    """
    if invest_type == 'public':
        sql = "SELECT ts_code, end_date, adj_nav FROM public_fund_nav " \
              "WHERE ts_code='{}'".format(fund)
        df = pd.read_sql(sql, con).dropna(how='any')
        df.rename({'ts_code': 'fund_id'}, axis=1, inplace=True)
    else:
        sql = "SELECT fund_id, price_date, cumulative_nav FROM fund_nav " \
              "WHERE fund_id='{}'".format(fund)
        df = pd.read_sql(sql, con).dropna(how='any')
        df.rename({'price_date': 'end_date', 'cumulative_nav': 'adj_nav'}, axis=1, inplace=True)
    if df['adj_nav'].count() == 0:
        logging.log(logging.ERROR, "CAN NOT FIND {}".format(fund))
        return None
    df['end_date'] = pd.to_datetime(df['end_date'])
    if rollback and df['end_date'].min() < start_date < df['end_date'].max():
        while start_date not in list(df['end_date']):
            start_date -= datetime.timedelta(days=1)
    df = df[df['end_date'] >= start_date]
    df.drop_duplicates(subset='end_date', inplace=True, keep='first')
def get_frequency(df):
    """Infer how many times per year the fund publishes its NAV.
    Args:
        df[DataFrame]: NAV table indexed by publication date
    Returns: [int]: publications per year; raises ValueError when it cannot be inferred
    """
    index_series = df.index.to_series()
    freq_series = index_series.diff(1)
    logging.log(logging.DEBUG, freq_series.describe())
    f = freq_series.mode()[0].days
    if f in range(0, 3):
        return 250
raise ValueError
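# Editor's note: the remaining gap-to-frequency branches are truncated in this
# view; the full set of frequencies handled downstream matches the freq_dict of
# resample() below, i.e. 250 (business-daily), 52, 24, 12 and 3.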
def get_trade_cal():
    """Fetch the SSE trading calendar.
    Returns: df[DataFrame]: trading days, indexed by trading day
    """
    sql = 'SELECT cal_date FROM stock_trade_cal WHERE is_open=1'
    df = pd.read_sql(sql, con)
    df['end_date'] = pd.to_datetime(df['cal_date'])
    df.set_index('end_date', drop=False, inplace=True)
    return df
def get_manager(invest_type):
    """Fetch the fund-to-manager mapping.
    Args:
        invest_type: asset type: public, private, ...
    Returns: DataFrame mapping funds to their managers
    """
    if invest_type == 'public':
        sql = 'SELECT ts_code, name FROM public_fund_manager WHERE end_date IS NULL'
        df = pd.read_sql(sql, con)
    else:
        sql = 'SELECT fund_id, fund_manager_id FROM fund_manager_mapping'
        df = pd.read_sql(sql, con)
    return df
def get_fund_info(end_date, invest_type):
    """Fetch basic info for funds still alive at end_date, with manager info merged in.
    Args:
        end_date: cut-off date used to filter delisted or matured funds
        invest_type: asset type: public, private, ...
    Returns: DataFrame of fund info joined with the manager mapping
    """
    if invest_type == 'public':
        sql = "SELECT ts_code, fund_type, management FROM public_fund_basic " \
              "WHERE delist_date IS NULL AND (due_date IS NULL OR due_date>'{}')".format(end_date.strftime('%Y%m%d'))
        df = pd.read_sql(sql, con).dropna(how='all')
        manager_info = get_manager(invest_type)
        # Align keys before merging
        manager_info.rename({'ts_code': 'fund_id'}, axis=1, inplace=True)
        df.rename({'ts_code': 'fund_id'}, axis=1, inplace=True)
        df = pd.merge(df, manager_info, how="left", on='fund_id')
    else:
        sql = "SELECT id, substrategy FROM fund_info WHERE delete_tag=0 " \
              "AND substrategy!=-1"
        df = pd.read_sql(sql, con).dropna(how='all')
        df.rename({'id': 'fund_id'}, axis=1, inplace=True)
        manager_info = get_manager(invest_type)
        df = pd.merge(df, manager_info, how="inner", on='fund_id')
    return df
def resample(df, trading_cal, freq, simple_flag=True):
    """Resample the NAV table at the given granularity and drop dates outside the trading calendar.
    Args:
        df: NAV table indexed by publication date
        trading_cal: trading calendar from get_trade_cal()
        freq: publications per year (see get_frequency)
        simple_flag: shortcut flag used when computing annualised metrics
    """
    freq_dict = {250: 'B', 52: 'W-FRI', 12: 'M', 24: 'SM', 3: 'Q'}
    resample_freq = freq_dict[freq]
    # Resample at the target frequency and forward-fill the NAVs
    df = df.resample(rule=resample_freq, closed='right').ffill()
    # Shortcut used when computing annualised metrics
    if simple_flag and freq == 250:
        return pd.merge(df, trading_cal, how='inner', left_index=True, right_index=True)
    elif simple_flag and freq != 250:
        return df
    # Choose the maximum date offset for the sampling frequency (so the shifted
    # date stays within the same week/month/quarter as the resampled date)
timeoffset_dict = {250: 1, 52: 5, 12: 30, 24: 15, 3: 120}
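# Example (editor's illustration): with freq=250 the table is resampled to
# business days ('B') and inner-joined with the trading calendar, so NAVs that
# land on holidays are forward-filled and then dropped; with freq=52 and
# simple_flag=True the 'W-FRI' resample is returned directly.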
def metric_rank(df):
    """Rank each metric within its sub-strategy as a percentile (higher = better)."""
    for metric in ['annual_return', 'downside_risk', 'max_drawdown', 'sharp_ratio']:
        if metric in ['downside_risk', 'max_drawdown']:
            ascending = False
        else:
            ascending = True
        df['{}_rank'.format(metric)] = df.groupby(['substrategy'])[metric].rank(ascending=ascending, pct=True)
    return df
def fund_rank(start_date, end_date, invest_type='private'):
    fund_info = get_fund_info(end_date, invest_type=invest_type)
    group = fund_info.groupby('substrategy')
    grouped_fund = group['fund_id'].unique()
    trading_cal = get_trade_cal()
    metric_df = pd.DataFrame(columns=('fund_id', 'range_return', 'annual_return', 'max_drawdown', 'sharp_ratio',
                                      'volatility', 'sortino_ratio', 'downside_risk', 'substrategy'))
    skipped_funds = []
    for substrategy in grouped_fund.index:
        for fund in grouped_fund[substrategy]:
            df = get_nav(fund, start_date, rollback=False, invest_type=invest_type)
            try:
                # Skip funds covering less than 60% of the requested window
                if df.index[-1] - df.index[0] < 0.6 * (end_date - start_date):
                    skipped_funds.append(fund)
                    logging.log(logging.INFO, 'Skipped {}'.format(fund))
                    continue
                n = get_frequency(df)
            except Exception as e:
                # logging.log(logging.ERROR, repr(e))
                logging.log(logging.INFO, 'Skipped {}'.format(fund))
                continue
            df = resample(df, trading_cal, n)
            try:
                _ = get_frequency(df)
            except ValueError:
                continue
            logging.log(logging.INFO, "Dealing with {}".format(fund))
            net_worth = df['adj_nav'].astype(float)
            down_risk = downside_risk(sim_return, bank_rate=0.015, n=n)
            sor_ratio = sortino_ratio(ex_return, down_risk, n)
            manager = fund_info[fund_info['fund_id'] == fund]['fund_manager_id'].values
            # management = fund_info[fund_info['fund_id'] == fund]['management'].values
            row = pd.Series([fund, rng_return, ann_return, drawdown, shp_ratio,
                             vol, sor_ratio, down_risk, substrategy, manager],
                            index=['fund_id', 'range_return', 'annual_return', 'max_drawdown',
                                   'sharp_ratio', 'volatility', 'sortino_ratio', 'downside_risk',
                                   'substrategy', 'manager'])
            metric_df = metric_df.append(row, ignore_index=True)
    metric_df.set_index('fund_id', inplace=True)
    df = metric_rank(metric_df)
    df['z_score'] = z_score(df['annual_return_rank'],
if __name__ == '__main__':
    end_date = datetime.datetime.now() - datetime.timedelta(days=1)
    start_date = cal_date(end_date, 'Y', 1)
    rank_df = fund_rank(start_date, end_date, invest_type='private')
    # rank_df.to_csv("fund_rank.csv", encoding='gbk')
    # df = pd.read_csv('fund_rank.csv')
    # df.to_sql("fund_rank", con, if_exists='replace')
    con.close()
import numpy as np
import pandas as pd
from scipy.optimize import minimize
# Risk-budget (risk-parity) optimisation
def calculate_portfolio_var(w, S):
    # Portfolio variance: w S w^T
    w = np.mat(w)
    return w * S * w.T
def calculate_risk_contribution(w, S):
    # Each asset's contribution to total portfolio risk
    w = np.mat(w)
    sigma = np.sqrt(calculate_portfolio_var(w, S))
    # Marginal risk contribution
    MRC = S * w.T / sigma
    # Risk contribution
    RC = np.multiply(MRC, w.T)
    return RC
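# Editor's note: with sigma_p = sqrt(w S w^T), MRC_i = (S w^T)_i / sigma_p and
# RC_i = w_i * MRC_i, so the RC_i sum to sigma_p; the objective below matches
# RC to a target split of sigma_p.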
def risk_budget_objective(w, pars):
    # Squared error between realised and targeted risk contributions
    S = pars[0]  # covariance matrix
    risk_target = pars[1]  # target share of total risk for each asset
    sig_p = np.sqrt(calculate_portfolio_var(w, S))  # portfolio sigma
    risk_target = np.asmatrix(np.multiply(sig_p, risk_target))
    asset_RC = calculate_risk_contribution(w, S)
    J = sum(np.square(asset_RC - risk_target.T))[0, 0]  # sum of squared error
    return J
def total_weight_constraint(x):
return np.sum(x) - 1.0
def long_only_constraint(x):
return x
# Solve for asset weights given each asset's target share of total risk
def calcu_w(w_origin, S, risk_target):
    # w0 = [0.2, 0.2, 0.2, 0.6]
    # x_t = [0.25, 0.25, 0.25, 0.25]  # target: all four assets contribute 25% of the risk
    cons = ({'type': 'eq', 'fun': total_weight_constraint},)
    # {'type': 'ineq', 'fun': long_only_constraint})
    res = minimize(risk_budget_objective, w_origin, args=[S, risk_target], method='SLSQP',
                   bounds=[[0, 1]] * len(w_origin),
                   constraints=cons, options={'disp': True})
    w_rb = np.asmatrix(res.x)
    return w_rb
    # return res.x
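# Self-contained sketch (editor's illustration, toy numbers):
#
#     S = np.asmatrix([[0.04, 0.01], [0.01, 0.09]])
#     w0 = np.array([0.5, 0.5])
#     target = np.array([0.5, 0.5])   # equal risk contribution
#     w_rb = calcu_w(w0, S, target)   # long-only weights summing to 1
#     calculate_risk_contribution(w_rb, S)  # entries come out roughly equal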
# Print each asset's risk contribution (bar-chart plotting is still a TODO)
def plot_rc(w, S):
    rc = calculate_risk_contribution(w, S)
    print(rc)
if __name__ == '__main__':
    P = pd.read_csv("tests/resources/stock_prices.csv", parse_dates=True, index_col="date")
    # NOTE: optim_drawdown is not defined in this module; it is assumed to come
    # from the week_evaluation utilities.
    w_origin, S, mu = optim_drawdown(P, 0.6, [0.001, 1])
    S = np.asmatrix(S)
    w_origin = np.asarray([i for i in w_origin.values()])
    risk_target = np.asarray([1 / len(w_origin)] * len(w_origin))
    print(risk_target)
    w_rb = calcu_w(w_origin, S, risk_target)
    print('Asset weights:', w_rb)
    print(w_rb @ mu)
    plot_rc(w_rb, S)