1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
"""
The ``base_optimizer`` module houses the parent classes ``BaseOptimizer`` from which all
optimisers will inherit. ``BaseConvexOptimizer`` is the base class for all ``cvxpy`` (and ``scipy``)
optimisation.
Additionally, we define a general utility function ``portfolio_performance`` to
evaluate return and risk for a given set of portfolio weights.
"""
import collections
import json
import warnings
import numpy as np
import pandas as pd
import cvxpy as cp
import scipy.optimize as sco
from . import objective_functions
from . import exceptions
class BaseOptimizer:
"""
Instance variables:
- ``n_assets`` - int
- ``tickers`` - str list
- ``weights`` - np.ndarray
Public methods:
- ``set_weights()`` creates self.weights (np.ndarray) from a weights dict
- ``clean_weights()`` rounds the weights and clips near-zeros.
- ``save_weights_to_file()`` saves the weights to csv, json, or txt.
"""
def __init__(self, n_assets, tickers=None):
"""
:param n_assets: number of assets
:type n_assets: int
:param tickers: name of assets
:type tickers: list
"""
self.n_assets = n_assets
if tickers is None:
self.tickers = list(range(n_assets))
else:
self.tickers = tickers
# Outputs
self.weights = None
def _make_output_weights(self, weights=None):
"""
Utility function to make output weight dict from weight attribute (np.array). If no
arguments passed, use self.tickers and self.weights. If one argument is passed, assume
it is an alternative weight array so use self.tickers and the argument.
"""
if weights is None:
weights = self.weights
return collections.OrderedDict(zip(self.tickers, weights))
def set_weights(self, input_weights):
"""
Utility function to set weights attribute (np.array) from user input
:param input_weights: {ticker: weight} dict
:type input_weights: dict
"""
self.weights = np.array([input_weights[ticker] for ticker in self.tickers])
def clean_weights(self, cutoff=1e-4, rounding=5):
"""
Helper method to clean the raw weights, setting any weights whose absolute
values are below the cutoff to zero, and rounding the rest.
:param cutoff: the lower bound, defaults to 1e-4
:type cutoff: float, optional
:param rounding: number of decimal places to round the weights, defaults to 5.
Set to None if rounding is not desired.
:type rounding: int, optional
:return: asset weights
:rtype: OrderedDict
"""
if self.weights is None:
raise AttributeError("Weights not yet computed")
clean_weights = self.weights.copy()
clean_weights[np.abs(clean_weights) < cutoff] = 0
if rounding is not None:
if not isinstance(rounding, int) or rounding < 1:
raise ValueError("rounding must be a positive integer")
clean_weights = np.round(clean_weights, rounding)
return self._make_output_weights(clean_weights)
def save_weights_to_file(self, filename="weights.csv"):
"""
Utility method to save weights to a text file.
:param filename: name of file. Should be csv, json, or txt.
:type filename: str
"""
clean_weights = self.clean_weights()
ext = filename.split(".")[1]
if ext == "csv":
pd.Series(clean_weights).to_csv(filename, header=False)
elif ext == "json":
with open(filename, "w") as fp:
json.dump(clean_weights, fp)
elif ext == "txt":
with open(filename, "w") as f:
f.write(str(dict(clean_weights)))
else:
raise NotImplementedError("Only supports .txt .json .csv")
class BaseConvexOptimizer(BaseOptimizer):
"""
The BaseConvexOptimizer contains many private variables for use by
``cvxpy``. For example, the immutable optimisation variable for weights
is stored as self._w. Interacting directly with these variables is highly
discouraged.
Instance variables:
- ``n_assets`` - int
- ``tickers`` - str list
- ``weights`` - np.ndarray
- ``solver`` - str
Public methods:
- ``add_objective()`` adds a (convex) objective to the optimisation problem
- ``add_constraint()`` adds a (linear) constraint to the optimisation problem
- ``convex_objective()`` solves for a generic convex objective with linear constraints
- ``nonconvex_objective()`` solves for a generic nonconvex objective using the scipy backend.
This is prone to getting stuck in local minima and is generally *not* recommended.
- ``set_weights()`` creates self.weights (np.ndarray) from a weights dict
- ``clean_weights()`` rounds the weights and clips near-zeros.
- ``save_weights_to_file()`` saves the weights to csv, json, or txt.
"""
def __init__(
self, n_assets, tickers=None, weight_bounds=(0, 1), solver=None, verbose=False
):
"""
:param weight_bounds: minimum and maximum weight of each asset OR single min/max pair
if all identical, defaults to (0, 1). Must be changed to (-1, 1)
for portfolios with shorting.
:type weight_bounds: tuple OR tuple list, optional
:param solver: name of solver. list available solvers with: ``cvxpy.installed_solvers()``
:type solver: str, optional. Defaults to "ECOS"
:param verbose: whether performance and debugging info should be printed, defaults to False
:type verbose: bool, optional
"""
super().__init__(n_assets, tickers)
# Optimisation variables
self._w = cp.Variable(n_assets)
self._objective = None
self._additional_objectives = []
self._constraints = []
self._lower_bounds = None
self._upper_bounds = None
self._map_bounds_to_constraints(weight_bounds)
self._solver = solver
self._verbose = verbose
def _map_bounds_to_constraints(self, test_bounds):
"""
Process input bounds into a form acceptable by cvxpy and add to the constraints list.
:param test_bounds: minimum and maximum weight of each asset OR single min/max pair
if all identical OR pair of arrays corresponding to lower/upper bounds. defaults to (0, 1).
:type test_bounds: tuple OR list/tuple of tuples OR pair of np arrays
:raises TypeError: if ``test_bounds`` is not of the right type
:return: bounds suitable for cvxpy
:rtype: tuple pair of np.ndarray
"""
# If it is a collection with the right length, assume they are all bounds.
if len(test_bounds) == self.n_assets and not isinstance(
test_bounds[0], (float, int)
):
bounds = np.array(test_bounds, dtype=np.float)
self._lower_bounds = np.nan_to_num(bounds[:, 0], nan=-np.inf)
self._upper_bounds = np.nan_to_num(bounds[:, 1], nan=np.inf)
else:
# Otherwise this must be a pair.
if len(test_bounds) != 2 or not isinstance(test_bounds, (tuple, list)):
raise TypeError(
"test_bounds must be a pair (lower bound, upper bound) "
"OR a collection of bounds for each asset"
)
lower, upper = test_bounds
# Replace None values with the appropriate +/- 1
if np.isscalar(lower) or lower is None:
lower = -1 if lower is None else lower
self._lower_bounds = np.array([lower] * self.n_assets)
upper = 1 if upper is None else upper
self._upper_bounds = np.array([upper] * self.n_assets)
else:
self._lower_bounds = np.nan_to_num(lower, nan=-1)
self._upper_bounds = np.nan_to_num(upper, nan=1)
self._constraints.append(self._w >= self._lower_bounds)
self._constraints.append(self._w <= self._upper_bounds)
# self._constraints.append()
def _solve_cvxpy_opt_problem(self):
"""
Helper method to solve the cvxpy problem and check output,
once objectives and constraints have been defined
:raises exceptions.OptimizationError: if problem is not solvable by cvxpy
"""
try:
opt = cp.Problem(cp.Minimize(self._objective), self._constraints)
if self._solver is not None:
opt.solve(solver=self._solver, verbose=self._verbose)
else:
opt.solve(verbose=self._verbose)
except (TypeError, cp.DCPError) as e:
raise exceptions.OptimizationError from e
if opt.status != "optimal":
raise exceptions.OptimizationError
self.weights = self._w.value.round(16) + 0.0 # +0.0 removes signed zero
return self._make_output_weights()
def add_objective(self, new_objective, **kwargs):
"""
Add a new term into the objective function. This term must be convex,
and built from cvxpy atomic functions.
Example::
def L1_norm(w, k=1):
return k * cp.norm(w, 1)
ef.add_objective(L1_norm, k=2)
:param new_objective: the objective to be added
:type new_objective: cp.Expression (i.e function of cp.Variable)
"""
self._additional_objectives.append(new_objective(self._w, **kwargs))
def add_constraint(self, new_constraint):
"""
Add a new constraint to the optimisation problem. This constraint must be linear and
must be either an equality or simple inequality.
Examples::
ef.add_constraint(lambda x : x[0] == 0.02)
ef.add_constraint(lambda x : x >= 0.01)
ef.add_constraint(lambda x: x <= np.array([0.01, 0.08, ..., 0.5]))
:param new_constraint: the constraint to be added
:type constraintfunc: lambda function
"""
if not callable(new_constraint):
raise TypeError("New constraint must be provided as a lambda function")
self._constraints.append(new_constraint(self._w))
def add_sector_constraints(self, sector_mapper, sector_lower, sector_upper):
"""
Adds constraints on the sum of weights of different groups of assets.
Most commonly, these will be sector constraints e.g portfolio's exposure to
tech must be less than x%::
sector_mapper = {
"GOOG": "tech",
"FB": "tech",,
"XOM": "Oil/Gas",
"RRC": "Oil/Gas",
"MA": "Financials",
"JPM": "Financials",
}
sector_lower = {"tech": 0.1} # at least 10% to tech
sector_upper = {
"tech": 0.4, # less than 40% tech
"Oil/Gas: 0.1 # less than 10% oil and gas
}
:param sector_mapper: dict that maps tickers to sectors
:type sector_mapper: {str: str} dict
:param sector_lower: lower bounds for each sector
:type sector_lower: {str: float} dict
:param sector_upper: upper bounds for each sector
:type sector_upper: {str:float} dict
"""
if np.any(self._lower_bounds < 0):
warnings.warn(
"Sector constraints may not produce reasonable results if shorts are allowed."
)
for sector in sector_upper:
is_sector = [sector_mapper[t] == sector for t in self.tickers]
self._constraints.append(cp.sum(self._w[is_sector]) <= sector_upper[sector])
for sector in sector_lower:
is_sector = [sector_mapper[t] == sector for t in self.tickers]
self._constraints.append(cp.sum(self._w[is_sector]) >= sector_lower[sector])
def convex_objective(self, custom_objective, weights_sum_to_one=True, **kwargs):
"""
Optimise a custom convex objective function. Constraints should be added with
``ef.add_constraint()``. Optimiser arguments must be passed as keyword-args. Example::
# Could define as a lambda function instead
def logarithmic_barrier(w, cov_matrix, k=0.1):
# 60 Years of Portfolio Optimisation, Kolm et al (2014)
return cp.quad_form(w, cov_matrix) - k * cp.sum(cp.log(w))
w = ef.convex_objective(logarithmic_barrier, cov_matrix=ef.cov_matrix)
:param custom_objective: an objective function to be MINIMISED. This should be written using
cvxpy atoms Should map (w, `**kwargs`) -> float.
:type custom_objective: function with signature (cp.Variable, `**kwargs`) -> cp.Expression
:param weights_sum_to_one: whether to add the default objective, defaults to True
:type weights_sum_to_one: bool, optional
:raises OptimizationError: if the objective is nonconvex or constraints nonlinear.
:return: asset weights for the efficient risk portfolio
:rtype: OrderedDict
"""
# custom_objective must have the right signature (w, **kwargs)
self._objective = custom_objective(self._w, **kwargs)
for obj in self._additional_objectives:
self._objective += obj
if weights_sum_to_one:
self._constraints.append(cp.sum(self._w) == 1)
return self._solve_cvxpy_opt_problem()
def nonconvex_objective(
self,
custom_objective,
objective_args=None,
weights_sum_to_one=True,
constraints=None,
solver="SLSQP",
initial_guess=None,
):
"""
Optimise some objective function using the scipy backend. This can
support nonconvex objectives and nonlinear constraints, but often gets stuck
at local minima. This method is not recommended – caveat emptor. Example::
# Market-neutral efficient risk
constraints = [
{"type": "eq", "fun": lambda w: np.sum(w)}, # weights sum to zero
{
"type": "eq",
"fun": lambda w: target_risk ** 2 - np.dot(w.T, np.dot(ef.cov_matrix, w)),
}, # risk = target_risk
]
ef.nonconvex_objective(
lambda w, mu: -w.T.dot(mu), # min negative return (i.e maximise return)
objective_args=(ef.expected_returns,),
weights_sum_to_one=False,
constraints=constraints,
)
:param objective_function: an objective function to be MINIMISED. This function
should map (weight, args) -> cost
:type objective_function: function with signature (np.ndarray, args) -> float
:param objective_args: arguments for the objective function (excluding weight)
:type objective_args: tuple of np.ndarrays
:param weights_sum_to_one: whether to add the default objective, defaults to True
:type weights_sum_to_one: bool, optional
:param constraints: list of constraints in the scipy format (i.e dicts)
:type constraints: dict list
:param solver: which SCIPY solver to use, e.g "SLSQP", "COBYLA", "BFGS".
User beware: different optimisers require different inputs.
:type solver: string
:param initial_guess: the initial guess for the weights, shape (n,) or (n, 1)
:type initial_guess: np.ndarray
:return: asset weights that optimise the custom objective
:rtype: OrderedDict
"""
# Sanitise inputs
if not isinstance(objective_args, tuple):
objective_args = (objective_args,)
# Make scipy bounds
bound_array = np.vstack((self._lower_bounds, self._upper_bounds)).T
bounds = list(map(tuple, bound_array))
if initial_guess is None:
initial_guess = np.array([1 / self.n_assets] * self.n_assets)
# Construct constraints
final_constraints = []
if weights_sum_to_one:
final_constraints.append({"type": "eq", "fun": lambda x: np.sum(x) - 1})
if constraints is not None:
final_constraints += constraints
result = sco.minimize(
custom_objective,
x0=initial_guess,
args=objective_args,
method=solver,
bounds=bounds,
constraints=final_constraints,
)
self.weights = result["x"]
return self._make_output_weights()
def portfolio_performance(
weights, expected_returns, cov_matrix, verbose=False, risk_free_rate=0.02
):
"""
After optimising, calculate (and optionally print) the performance of the optimal
portfolio. Currently calculates expected return, volatility, and the Sharpe ratio.
:param expected_returns: expected returns for each asset. Can be None if
optimising for volatility only (but not recommended).
:type expected_returns: np.ndarray or pd.Series
:param cov_matrix: covariance of returns for each asset
:type cov_matrix: np.array or pd.DataFrame
:param weights: weights or assets
:type weights: list, np.array or dict, optional
:param verbose: whether performance should be printed, defaults to False
:type verbose: bool, optional
:param risk_free_rate: risk-free rate of borrowing/lending, defaults to 0.02
:type risk_free_rate: float, optional
:raises ValueError: if weights have not been calcualted yet
:return: expected return, volatility, Sharpe ratio.
:rtype: (float, float, float)
"""
if isinstance(weights, dict):
if isinstance(expected_returns, pd.Series):
tickers = list(expected_returns.index)
elif isinstance(cov_matrix, pd.DataFrame):
tickers = list(cov_matrix.columns)
else:
tickers = list(range(len(expected_returns)))
new_weights = np.zeros(len(tickers))
for i, k in enumerate(tickers):
if k in weights:
new_weights[i] = weights[k]
if new_weights.sum() == 0:
raise ValueError("Weights add to zero, or ticker names don't match")
elif weights is not None:
new_weights = np.asarray(weights)
else:
raise ValueError("Weights is None")
sigma = np.sqrt(objective_functions.portfolio_variance(new_weights, cov_matrix))
if expected_returns is not None:
mu = objective_functions.portfolio_return(
new_weights, expected_returns, negative=False
)
sharpe = objective_functions.sharpe_ratio(
new_weights,
expected_returns,
cov_matrix,
risk_free_rate=risk_free_rate,
negative=False,
)
if verbose:
print("Expected annual return: {:.1f}%".format(100 * mu))
print("Annual volatility: {:.1f}%".format(100 * sigma))
print("Sharpe Ratio: {:.2f}".format(sharpe))
return mu, sigma, sharpe
else:
if verbose:
print("Annual volatility: {:.1f}%".format(100 * sigma))
return None, sigma, None