from collections.abc import Iterable
from collections import namedtuple
from io import StringIO
# from textwrap import fill
import logging
import warnings
import matplotlib.pyplot as plt
from matplotlib import colormaps
import numpy as np
import pandas as pd
import scipy.stats as ss
from scipy.interpolate import interp1d
from scipy.spatial import ConvexHull
from scipy.stats import norm
logger = logging.getLogger(__name__)
phi = norm.cdf
phi_inv = norm.ppf
try:
import numba
njit = numba.njit
except ImportError:
logger.info("Numba not found. Falling back to pure Python.")
def njit(func=None, **kwargs):
if func is None:
return lambda f: f
return func
from .constants import *
from .random_agg import RANDOM
from .utilities import short_hash
# number super-fast functions for TVaR and BiTVaR computations
[docs]
@njit(parallel=False)
def tvar_gS(probs, p):
"""
Compute gS given individual probs. Equivlaent to computing
``tvar.g(1 - probs.cumsum())`` for a tvar distortion.
Runs about 4x speed of g.g(1-probs.cumsum()) for TVaR. Cannot
parallelize because of shared S variable.
:param probs: numpy array of probabilities.
:param p: float, distortion parameter.
"""
# shared S variable mean you cannot parallelize
S = 1.
ans = np.zeros_like(probs)
if p == 0:
# mean
for i in range(len(probs)):
S -= probs[i]
if S <= 0:
ans[i] = 0
elif S >= 1:
ans[i] = 1
else:
ans[i] = S
elif p == 1:
for i in range(len(probs)):
S -= probs[i]
if S > 0:
ans[i] = 1
else:
ans[i] = 0
else:
# proper tvar
s = 1 - p
m = 1 / s
for i in range(len(probs)):
S -= probs[i]
if S < s:
ans[i] = m * S
else:
ans[i] = 1
return ans
[docs]
@njit(parallel=False)
def bitvar_gS(probs, p0, p1, w):
"""
Compute gS given individual probs. Equivlaent to computing
``bitvar.g(1 - probs.cumsum())`` for a bitvar distortion.
As usual, p0 < p1, w in (0,1) input parameters.
See OneNote 2024/November Numba TVaR and BiTVaR.
:param probs: numpy array of probabilities.
:param p0: float, distortion parameter.
:param p1: float, distortion parameter.
:param w: float, weighting to p1.
"""
if p0 == p1:
# actually a tvar
return tvar_gS(probs, p0)
if w == 0:
return tvar_gS(probs, p0)
if w == 1:
return tvar_gS(probs, p1)
# now weight is 0 < w < 1
# now there are four cases (0,1), (>0, 1), (0, <1), (>0, <1)
s0 = 1 - p0
s1 = 1 - p1
S = 1.
ans = np.zeros_like(probs)
# height at kink
pt = w + (1 - w) * s1 / s0
if p0 == 0 and p1 == 1:
slope = (1 - w)
for i in range(len(probs)):
S -= probs[i]
# here and below say <= 0 to allow for
# numerical cumulation error which can result
# in S very small < 0
if S <= 0:
ans[i] = 0
else:
ans[i] = w + slope * S
elif p0 > 0 and p1 == 1:
slope = (1 - w) / s0
for i in range(len(probs)):
S -= probs[i]
if S <= 0:
ans[i] = 0
elif S < s0:
ans[i] = w + slope * S
else:
ans[i] = 1.
elif p0 == 0 and p1 < 1:
# two kinks
slope0 = pt / s1
slope1 = (1 - pt) / p1
for i in range(len(probs)):
S -= probs[i]
if S <= 0:
ans[i] = 0
elif S < s1:
ans[i] = slope0 * S
else:
ans[i] = pt + slope1 * (S - s1)
else:
# p0 > 0 and p1 < 1
# three kinks
slope0 = pt / s1
slope1 = (1 - pt) / (s0 - s1)
for i in range(len(probs)):
S -= probs[i]
if S <= 0:
ans[i] = 0
elif S < s1:
ans[i] = slope0 * S
elif S < s0:
ans[i] = pt + slope1 * (S - s1)
else:
ans[i] = 1.
return ans
[docs]
@njit(parallel=False)
def tvar_ra(probs, x, p):
"""
Computes risk adjusted value in one step with no intermediate
arrays or copies.
Equivalent to::
dx = np.diff(x)
(g.g(1 - probs.cumsum())[:-1] * dx).sum() + x[0]
or to::
den = pd.Series(probs, index=x)
tvar.price_ex(den, kind='both', calculation='dx', as_frame=True)
Detects if it is computing the mean (p==0) or the maximum (p==1)
and shortcuts accordingly.
:param probs: numpy array of probabilities.
:param x: numpy array loss outcomes, must be in ascending order.
:param p: float, distortion parameter.
"""
# shared S variable mean you cannot parallelize
S = 1.
lastx = x[0]
ans = lastx # !! remember the shift!
if p == 0:
# mean
ans = np.sum(probs * x)
# for i in range(len(probs) - 1):
# S -= probs[i]
# dx = x[i + 1] - lastx
# lastx = x[i + 1]
# if S <= 0:
# pass
# elif S >= 1:
# ans += dx
# else:
# ans += S * dx
elif p == 1:
# max
ans = x[-1]
# for i in range(len(probs) - 1):
# S -= probs[i]
# dx = x[i + 1] - lastx
# lastx = x[i + 1]
# if S > 0:
# ans += dx
else:
# proper tvar
s = 1 - p
m = 1 / s
for i in range(len(probs) - 1):
S -= probs[i]
dx = x[i + 1] - lastx
lastx = x[i + 1]
if S < s:
ans += m * S * dx
else:
ans += dx
return ans
[docs]
@njit(parallel=False)
def bitvar_ra(probs, x, p0, p1, w):
"""
Computes risk adjusted value in one step with no intermediate
arrays or copies.
Equivalent to::
dx = np.diff(x)
(g.g(1 - probs.cumsum())[:-1] * dx).sum() + x[0]
or to::
den = pd.Series(probs, index=x)
tvar.price_ex(den, kind='both', calculation='dx', as_frame=True)
:param probs: numpy array of probabilities.
:param x: numpy array of loss outcomes, must be in ascending order.
:param p0: float, distortion parameter.
:param p1: float, distortion parameter.
:param w: float, weighting to p1.
"""
if p0 == p1:
# actually a tvar
return tvar_ra(probs, x, p0)
if w == 0:
return tvar_ra(probs, x, p0)
if w == 1:
return tvar_ra(probs, x, p1)
# now weight is 0 < w < 1
# now there are four cases (0,1), (>0, 1), (0, <1), (>0, <1)
s0 = 1 - p0
s1 = 1 - p1
S = 1.
lastx = x[0]
ans = lastx
# height at kink
pt = w + (1 - w) * s1 / s0
if p0 == 0 and p1 == 1:
# weights mean and max
m = np.sum(probs * x)
x = x[-1]
ans = w * x + (1 - w) * m
# slope = (1 - w)
# for i in range(len(probs) - 1):
# S -= probs[i]
# dx = x[i + 1] - lastx
# lastx = x[i + 1]
# # here and below say <= 0 to allow for
# # numerical cumulation error which can result
# # in S very small < 0
# if S <= 0:
# pass
# else:
# ans += (w + slope * S) * dx
elif p0 > 0 and p1 == 1:
m0 = tvar_ra(probs, x, p0)
ans = w * x[-1] + (1 - w) * m0
# slope = (1 - w) / s0
# for i in range(len(probs) - 1):
# S -= probs[i]
# dx = x[i + 1] - lastx
# lastx = x[i + 1]
# if S <= 0:
# pass
# elif S < s0:
# ans += (w + slope * S) * dx
# else:
# ans += dx
elif p0 == 0 and p1 < 1:
m1 = tvar_ra(probs, x, p1)
ans = w * m1 + (1 - w) * np.sum(probs * x)
# two kinks
# slope0 = pt / s1
# slope1 = (1 - pt) / p1
# for i in range(len(probs) - 1):
# S -= probs[i]
# dx = x[i + 1] - lastx
# lastx = x[i + 1]
# if S <= 0:
# pass
# elif S < s1:
# ans += (slope0 * S) * dx
# else:
# ans += (pt + slope1 * (S - s1)) * dx
else:
# p0 > 0 and p1 < 1
# three kinks
slope0 = pt / s1
slope1 = (1 - pt) / (s0 - s1)
for i in range(len(probs) - 1):
S -= probs[i]
dx = x[i + 1] - lastx
lastx = x[i + 1]
if S <= 0:
pass
elif S < s1:
ans += (slope0 * S) * dx
elif S < s0:
ans += (pt + slope1 * (S - s1)) * dx
else:
ans += dx
return ans
[docs]
class Distortion(object):
"""
Creation and management of distortion functions.
0.9.4: renamed roe to ccoc, but kept creator with roe for backwards compatibility.
Oct 2022: renamed wtdtvar to bitvar, but kept ...
Sep 2024: added a proper implementation of wtdtvar and a random_distortion using it
Note, to create a fake Distortion use a type:
g = type('Distortion', (),
{'g': your function,
'g_inv': your function,
'g_dual': lambda x: 1 - g.g(1-x)}
)
"""
# make these (mostly) immutable...avoid changing by mistake
_available_distortions_ = ('ph', 'wang', 'cll', 'lep', 'ly', 'clin', 'dual', 'ccoc', 'tvar',
'bitvar', 'wtdtvar', 'convex', 'tt', 'beta')
_has_mass_ = ('ly', 'clin', 'lep', 'roe')
_med_names_ = ("Prop Hzrd", "Wang", 'Capd Loglin', "Lev Equiv", "Lin Yield", "Capped Linear", "Dual Mom",
'Const CoC', "Tail VaR", 'BiTVaR', 'WtdTVaR', "Convex Env", 'Wang-tt', 'Beta')
_long_names_ = ("Proportional Hazard", "Wang-normal", 'Capped Loglinear', "Leverage Equivalent Pricing",
"Linear Yield", "Capped Linear", "Dual Moment", "Constant CoC", "Tail VaR",
'BiTVaR', 'Weighted TVaR', "Convex Envelope", 'Wang-tt', 'Beta')
# TODO fix examples!
# _available_distortions_ = ('ph', 'wang', 'cll', 'lep', 'ly', 'clin', 'dual', 'ccoc', 'tvar', 'wtdtvar, 'convex')
_eg_param_1_ = (.9, .1, .9, 0.25, 0.8, 1.1, 1.5, .1, 0.15, .15) # noqa
_eg_param_2_ = (.5, .75, .5, 0.5, 1.5, 1.8, 3, .25, 0.50, .5) # noqa
# _distortion_names_ = dict(zip(_available_distortions_, _med_names_))
_distortion_names_ = dict(zip(_available_distortions_, _long_names_))
renamer = _distortion_names_
[docs]
@staticmethod
def tvar_terms(p_in):
"""
Evaluate tvar function min(s / (1-p), 1),
allowing for possibility p=1 and using vector input.
s = 1 - p in reverse order. This evaluates
the "knot" points needed to create a general weighted
TVaR.
"""
n = len(p_in)
p = p_in.reshape((n, 1))
s = (1 - p_in[::-1]).reshape((1, n))
return np.where(s==0,
np.zeros_like(p),
np.where(p==1,
np.ones_like(p),
np.minimum(s / (1 - p), 1)))
[docs]
@classmethod
def available_distortions(cls, pricing=True, strict=True):
"""
List of the available distortions.
:param pricing: only return list suitable for pricing, excludes tvar and convex
:param strict: only include those without mass at zero (pricing only)
:return:
"""
if pricing and strict:
return tuple((i for i in cls._available_distortions_[:-5] if i not in cls._has_mass_))
elif pricing:
return cls._available_distortions_[:-3]
else:
return cls._available_distortions_
# Custom __getstate__ to save named constructor arguments
def __getstate__(self):
# Save the constructor arguments in a state dictionary with names
# run through shape
if isinstance(self.shape, Iterable):
loc_shape = []
for x in self.shape:
if isinstance(x, Distortion):
loc_shape.append(x.__getstate__())
else:
loc_shape.append(x)
else:
loc_shape = self.shape
state = {
'name': self._name,
'shape': loc_shape,
'r0': self.r0,
'df': self.df,
'col_x': self.col_x,
'col_y': self.col_y,
'display_name': self.display_name
}
return state
def __setstate__(self, state):
# Unpack the state dictionary and pass it to the constructor
# Use __dict__.update() to restore the state...usual but doesn't work
# because of possible iteration in shape
# self.__dict__.update(state)
self._name = state['name']
self.r0 = state['r0']
self.df = state['df']
self.col_x = state['col_x']
self.col_y = state['col_y']
self.display_name = state['display_name']
# deal with complex shape
if isinstance(state['shape'], Iterable):
loc_shape = []
for x in state['shape']:
if isinstance(x, dict):
loc_shape.append(Distortion(**x))
else:
loc_shape.append(x)
self.shape = np.array(loc_shape)
else:
self.shape = state['shape']
# now complete the init
self._complete_init()
[docs]
def __init__(self, name, shape, r0=0.0, df=None, col_x='', col_y='', display_name=''):
"""
Create a new distortion.
For the beta distribution:
* A synthesis of risk measures for capital adequacy
* Wirch and Hardy, IME 1999
* 0<a<=1, b>= 1
* b=1 is a PH with rho = 1/a
* a=1 is a dual with rho = b
Tester: ::
ps = np.linspace(0, 1, 201)
for dn in agg.Distortion.available_distortions(True):
if dn=='clin':
# shape param must be > 1
g_dist = agg.Distortion(**{'name': dn, 'shape': 1.25, 'r0': 0.02, 'df': 5.5})
else:
g_dist = agg.Distortion(**{'name': dn, 'shape': 0.5, 'r0': 0.02, 'df': 5.5})
g_dist.plot()
g = g_dist.g
g_inv = g_dist.g_inv
df = pd.DataFrame({'p': ps, 'gg_inv': g(g_inv(ps)), 'g_invg': g_inv(g(ps)),
'g': g(ps), 'g_inv': g_inv(ps)})
print(dn)
print("errors")
display(df.query(' abs(gg_inv - g_invg) > 1e-5'))
:param name: name of an available distortion, call ``Distortion.available_distortions()`` for a list
:param shape: float or [float, float] for beta
:param shape: shape parameter
:param r0: risk free or rental rate of interest
:param df: for convex envelope, dataframe with col_x and col_y used to parameterize or df for t
:param col_x:
:param col_y:
:param display_name: over-ride name, useful for parameterized convex fix distributions
"""
# constructor arguments (e.g., needed for pickling)
self._name = name
self._kind_immutable = name
self.r0 = r0
self.df = df
self.col_x = col_x
self.col_y = col_y
self.display_name = display_name
self.shape = shape
self.min_index = None # used for minimum distortion, returns "active" distortion index
self._complete_init()
[docs]
def _complete_init(self):
"""
"""
# when created by calibrate distortions extra info put here
self.error = 0.0
self.premium_target = 0.0
self.assets = 0.0
self.mass = 0.0
g_prime = None
# for the usual suspects only
standard_shape = np.nan
# now make g and g_inv
if self._name == 'ph':
rho = self.shape
rhoinv = 1.0 / rho # noqa
self.has_mass = False
standard_shape = (1 - self.shape) / (1 + self.shape)
# @numba.vectorize(["float64(float64)"], nopython=True, target='parallel')
def g(x):
return x ** rho
def g_inv(x):
return x ** rhoinv
def g_prime(x):
return np.where(x > 0, rho * x ** (rho - 1.0), np.inf)
elif self._name == 'beta':
a, b = self.shape
assert 0 < a <= 1, f'a parameter must be in (0,1], not {a}'
assert b >= 1, f'b parameter must be >= 1, not {b}'
fz = ss.beta(a, b)
self.has_mass = False
def g(x):
return fz.cdf(x)
def g_inv(x):
return fz.ppf(x)
def g_prime(x):
return fz.pdf(x)
elif self._name == 'wang':
lam = self.shape
n = ss.norm()
self.has_mass = False
standard_shape = 2 * n.cdf(self.shape / 2 ** 0.5) - 1
def g(x):
return n.cdf(n.ppf(x) + lam)
def g_inv(x):
return n.cdf(n.ppf(x) - lam)
def g_prime(x):
return n.pdf(n.ppf(x) + lam) / n.pdf(n.ppf(x))
elif self._name == 'tt':
lam = self.shape
t = ss.t(self.df)
self.has_mass = False
def g(x):
return t.cdf(t.ppf(x) + lam)
def g_inv(x):
return t.cdf(t.ppf(x) - lam)
elif self._name == 'cll':
# capped log linear
b = self.shape
binv = 1 / b
ea = np.exp(self.r0)
a = self.r0
self.has_mass = False
def g(x):
return np.where(x==0, 0, np.minimum(1, ea * x ** b))
def g_inv(x):
return np.where(x < 1, np.minimum(1, (x / ea) ** binv), 1)
elif self._name == 'tvar':
p = self.shape
standard_shape = self.shape
if p == 1:
# aka max
alpha = np.nan
self.has_mass = True
self.mass = 1
def g(x):
# <= 0 handles rounding issues gracefully
return np.where(x<=0, 0, 1)
def g_inv(x):
return np.where(x < 1, x * (1 - p), 1)
def g_prime(x):
return np.where(x < 1 - p, alpha, 0)
else:
# proper tvar or mean
alpha = 1 / (1 - p)
self.has_mass = False
def g(x):
return np.minimum(alpha * x, 1)
def g_inv(x):
return np.where(x < 1, x * (1 - p), 1)
def g_prime(x):
return np.where(x <= 1 - p, alpha, 0)
elif self._name == 'ly':
# linear yield
# r0 = occupancy; rk = consumption specified in list shape parameter
rk = self.shape
self.has_mass = (self.r0 > 0)
self.mass = self.r0 / (1 + self.r0)
def g(x):
return np.where(x == 0, 0, (self.r0 + x * (1 + rk)) / (1 + self.r0 + rk * x))
def g_inv(x):
return np.maximum(0, (x * (1 + self.r0) - self.r0) / (1 + rk * (1 - x)))
elif self._name == 'clin':
# capped linear, needs shape > 1 to make sense...needs shape >= 1-r0 else
# problems at 1
sl = self.shape
self.has_mass = (self.r0 > 0)
self.mass = self.r0
def g(x):
return np.where(x == 0, 0, np.minimum(1, self.r0 + sl * x))
def g_inv(x):
return np.where(x <= self.r0, 0, (x - self.r0) / sl)
elif self._name == 'roe' or self._name == 'ccoc':
# constant roe = capped linear with shape = 1/(1+r), r0=r/(1+r)
# r = target roe
self._name = 'ccoc'
r = self.shape
v = 1 / (1 + r)
d = 1 - v
self.has_mass = (d > 0)
self.mass = d
standard_shape = self.shape / (1 + self.shape)
def g(x):
return np.where(x == 0, 0, np.minimum(1, d + v * x))
def g_inv(x):
return np.where(x <= d, 0, (x - d) / v)
def g_prime(x):
return v
elif self._name == 'lep':
# leverage equivalent pricing
# self.r0 = risk free/financing and r = risk charge (the solved parameter)
r = self.shape
delta = r / (1 + r)
d = self.r0 / (1 + self.r0)
spread = delta - d
self.has_mass = (d > 0)
self.mass = d
def g(x):
return np.where(x == 0, 0, np.minimum(1, d + (1 - d) * x + spread * np.sqrt(x * (1 - x))))
spread2 = spread ** 2
a = (1 - d) ** 2 + spread2
def g_inv(y):
mb = (2 * (y - d) * (1 - d) + spread2) # mb = -b
c = (y - d) ** 2
rad = np.sqrt(mb * mb - 4 * a * c)
# l = (mb + rad)/(2 * a)
u = (mb - rad) / (2 * a)
return np.where(y < d, 0, np.maximum(0, u))
elif self._name == 'dual':
# dual moment
p = self.shape
q = 1 / p
self.has_mass = False
standard_shape = (self.shape - 1) / (self.shape + 1)
def g(x):
return 1 - (1 - x)**p
def g_inv(y):
return 1 - (1 - y)**q
def g_prime(x):
return p * (1 - x)**(p - 1)
elif self._name == 'power':
# power distortion
# shape = alpha, df = [x0, x1]
# created from part of power function distribution
# compare Bernegger approach
# allows to create distortions with controlled slopes at 0 and 1.
alpha = float(self.shape) # numpy complains about powers of integers
x0, x1 = self.df
assert x0 < x1, 'Error: x0 must be less than x1'
self.has_mass = False
self.mass = 0
if alpha != 1:
bl = np.power(x1, -alpha + 1)
br = np.power(x0, -alpha + 1)
def g(s):
"""
f(x) = x^-alpha, F(x)=int_{x_0}^x f
g(s) = F(x0 + s(x1-x0)) / F(x1)
alpha \\ge 0 required
x0, x1 are other parameters to determine slopes at 0, 1
"""
tl = np.power(x0 + s * (x1 - x0), -alpha + 1)
return (tl - br) / (bl - br)
def g_prime(s):
"""
Derivative of g
"""
tl = np.power(x0 + s * (x1 - x0), -alpha)
return (1 - alpha) * (x1 - x0) * tl / (bl - br)
def g_inv(s):
"""
Inverse of g
"""
t1 = np.power(s * (bl - br) + br, 1 / (1 - alpha))
return (t1 - x0) / (x1 - x0)
else:
bl = np.log(x1)
br = np.log(x0)
def g(s):
t = np.log(x0 + s * (x1 - x0))
return (t - br) / (bl - br)
def g_prime(s):
return (x1 - x0) / (bl - br) / (x0 + s * (x1 - x0))
def g_inv(s):
return (np.exp(s * (bl - br) + br) - x0) / (x1 - x0)
elif self._name == 'bitvar':
# bitvar tvar, df = p0 <p1, shape = weight on p1
try:
p0, p1 = self.df
assert p0 < p1, 'Error: p0 must be less than p1'
w = self.shape
self.has_mass = (p1 == 1)
self.mass = w if p1 == 1 else 0
pt = (1 - p1) / (1 - p0) * (1 - w) + w
# if p1==1 there is a mass, so have to be careful to put that in
if p1 == 1:
# if p1 = 1 the result is a TVaR + max
alpha = 1 / (1 - p0)
def g(x):
# <= 0 handles rounding issues gracefully
return w * np.where(x<=0, 0, 1) + (1 - w) * np.minimum(alpha * x, 1)
# this stupid code with fixed number wasted two days of my life.
# but we don't really use g_inv so...
s = np.array([0., 1e-50, 1-p0, 1.])
gs = np.array([0., pt, 1., 1.])
g_inv = interp1d(gs, s, kind='linear', bounds_error=False, fill_value=(0,1))
else:
s = np.array([0., 1 - p1, 1-p0, 1.])
gs = np.array([0., pt, 1., 1.])
g = interp1d(s, gs, kind='linear', bounds_error=False, fill_value=(0,1))
g_inv = interp1d(gs, s, kind='linear', bounds_error=False, fill_value=(0,1))
if p1 < 1:
def g_prime(x):
return np.where(x > 1 - p0, 0,
np.where(x < 1 - p1,
w / (1 - p1) + (1 - w) / (1 - p0),
(1 - w) / (1 - p0)))
else:
# p1==1, don't want 1/(1-p1)
def g_prime(x):
return np.where(x > 1 - p0, 0, (1 - w) / (1 - p0))
except:
raise ValueError('Inadmissible parameters to Distortion for bitvar. '
'Pass shape=wt for p1 and df=[p0, p1]')
elif self._name == 'wtdtvar':
# Create the weighted tvar g and g_inv.
# ps = p values of TVaRs passed as shape
# wts = weights, must sum to 1, passed as df
# extract inputs
ps = np.array(self.shape)
wts = np.array(self.df)
if 1 in ps and wts[-1] > 0:
self.has_mass = True
self.mass = wts[-1]
else:
self.has_mass = False
self.mass = 0
if self.display_name == '':
try:
self.display_name = f'wtdTVaR on {len(ps):d} points'
except:
print(f'ERROR: {ps = }, {ps = }')
self.display_name = f'wtdTVaR on UNKNOWN points'
# data checks
# check sorted
assert np.all(ps[:-1] < ps[1:]), 'Error: ps must be sorted ascending.'
# ok, sum to 1 is a user requirement, this is too painful!
# swts = wts.sum()
# assert (swts==1
# or np.nextafter(swts, np.inf)==1
# or np.nextafter(swts, -np.inf)==1
# ), f'Error: weights must sum to 1. Entered weights sum to {wts.sum()}'
# must ensure 0 and 1 are in ps; add with zero weights
# if missing. This ensures resulting function has
# g(0)=0 and g(1)=1
if 0 not in ps:
ps = np.insert(ps, 0, 0)
wts = np.insert(wts, 0, 0)
if 1 not in ps:
ps = np.append(ps, 1)
wts = np.append(wts, 0)
else:
# if input with a mass at 1 need to adjust
# so the interpolation "sees" it
ps[-1] = np.nextafter(1, -1)
ps = np.append(ps, 1)
wts = np.append(wts, 0)
# evaluate at knot points and weighted tvar at knot points
s = 1 - ps[::-1]
gs = wts @ Distortion.tvar_terms(ps)
g = interp1d(s, gs, kind='linear', bounds_error=False, fill_value=(0,1))
g_inv = interp1d(gs, s, kind='linear', bounds_error=False, fill_value=(0,1))
# want to compute derivatives exactly
def g_prime(s):
# gemini 3 pro
s = 1 - np.atleast_1d(s)
# Calculate slopes for each TVaR component: w_i / (1 - p_i)
# Handle division by zero if p=1 (results in inf, handled by logic below)
component_slopes = np.divide(wts, 1.0 - ps, where=(ps < 1.0), out=np.full_like(ps, np.inf))
# Broadcast s against ps
# s_col shape: (..., 1), ps shape: (N,)
s_col = s[..., np.newaxis]
# TVaR_p(s) derivative is 1/(1-p) if s > p, else 0
# Sum contributions where s > p_i
active_mask = s_col > ps
grad = np.sum(np.where(active_mask, component_slopes, 0.0), axis=-1)
# Check for cusps (s == p_i) ??
is_cusp = np.any(np.isclose(s_col, ps, rtol=1e-14, atol=1e-14), axis=-1)
grad[is_cusp] = np.nan
return grad
elif self._name == 'minimum':
# min of several distortion; shape is list of Distortions
self.has_mass = np.all([d.has_mass for d in self.shape])
if self.has_mass:
self.mass = np.min([d.mass for d in self.shape])
else:
self.mass = 0
if self.display_name == '':
self.display_name = f'Minimum of {len(self.shape):d} distortions'
loc_shape = self.shape # #.copy() ## ? insulates from shape?
def min_index(x):
g_values = np.array([gi.g(x) for gi in loc_shape])
return np.argmin(g_values, axis=0)
self.min_index = min_index
def g(x):
g_values = np.array([gi.g(x) for gi in loc_shape])
return np.min(g_values, axis=0)
def g_prime(x):
"""
Note: you really need to check whether the point x is a
knot point for the minimum function.
"""
# need to see what happens just before 1.
# This is a hack.
x = np.where(x==1, 1 - 1e-15, x)
g_values = np.array([gi.g(x) for gi in loc_shape])
# Find the index of the minimum value
min_index = np.argmin(g_values, axis=0)
# Return the derivative (g_prime) of the object that achieved the minimum
if np.isscalar(min_index):
return loc_shape[min_index].g_prime(x)
elif np.isscalar(x):
return np.array([loc_shape[i].g_prime(x) for i in min_index])
else:
return np.array([loc_shape[i].g_prime(xi) for i, xi in zip(min_index, x)])
def g_inv(y):
"""
max inverse value is inverse of min: draw a picture to see why
"""
inv_values = np.array([gi.g_inv(y) for gi in loc_shape])
# Return the maximum of these inverse values
return np.max(inv_values, axis=0)
elif self._name == 'mixture':
# mixture of several distortion; shape is list of Distortions
# and weights are passed in df
# reconstuct if needed (pickle)
loc_shape = []
for x in self.shape:
if isinstance(x, Distortion):
loc_shape.append(x)
else:
loc_shape.append(Distortion(**x))
self.has_mass = np.any([d.has_mass for d in loc_shape])
if self.has_mass:
self.mass = np.sum([d.mass * w for (d, w) in zip(loc_shape, self.df)])
else:
self.mass = 0
if self.display_name == '':
self.display_name = f'Mixture of {len(loc_shape):d} distortions'
if self.df is None:
loc_wts = self.df = np.array([1 / len(loc_shape)] * len(loc_shape))
else:
loc_wts = np.array(self.df.copy())
def g(x):
g_values = np.array([gi.g(x) for gi in loc_shape])
# at this point, have to be a bit careful about the shape of the arrays
if g_values.ndim > 2:
# Flatten the higher dimensions (n1, n2) into a single dimension
vals_flat = g_values.reshape(len(loc_shape), -1) # shape (2, n1 * n2)
# Perform the matrix multiplication
result_flat = loc_wts @ vals_flat # shape (n1 * n2)
# Reshape the result back to the original shape (n1, n2)
result = result_flat.reshape(g_values.shape[1], g_values.shape[2])
else:
# vals is already 2D, just apply the @ operation directly
result = loc_wts @ g_values
return result
def g_prime(x):
g_values = np.array([gi.g_prime(x) for gi in loc_shape])
# at this point, have to be a bit careful about the shape of the arrays
if g_values.ndim > 2:
# Flatten the higher dimensions (n1, n2) into a single dimension
vals_flat = g_values.reshape(len(loc_shape), -1)
result_flat = loc_wts @ vals_flat # shape (n1 * n2)
result = result_flat.reshape(g_values.shape[1], g_values.shape[2])
else:
# vals is already 2D, just apply the @ operation directly
result = loc_wts @ g_values
return result
def g_inv(y):
"""
Inverse of mixture...Ö«
"""
raise NotImplementedError('Inverse of mixture not implemented')
elif self._name == 'convex':
# convex envelope and general interpolation
# NOT ALLOWED to have a mass...why not???
# evil - use shape to indicate if mass at zero
if self.shape > 0:
self.has_mass = True
self.mass = self.shape
# use shape for number of points in calibrating data set
if self.display_name == '':
self.display_name = f'Convex on {len(self.df):d} points'
if not (0 in self.df[self.col_x].values and 1 in self.df[self.col_x].values):
# painful...always want 0 and 1 there...but don't know what other columns in df
# logger.debug('df does not contain s=0/1...adding')
self.df = self.df[[self.col_x, self.col_y]].copy().reset_index(drop=True)
self.df.loc[len(self.df)] = (0,0)
self.df.loc[len(self.df)] = (1,1)
self.df = self.df.sort_values(self.col_x)
if len(self.df) > 2:
hull = ConvexHull(self.df[[self.col_x, self.col_y]])
knots = list(set(hull.simplices.flatten()))
g = interp1d(self.df.iloc[knots, self.df.columns.get_loc(self.col_x)],
self.df.iloc[knots, self.df.columns.get_loc(self.col_y)], kind='linear',
bounds_error=False, fill_value=(0,1))
g_inv = interp1d(self.df.iloc[knots, self.df.columns.get_loc(self.col_y)],
self.df.iloc[knots, self.df.columns.get_loc(self.col_x)], kind='linear',
bounds_error=False, fill_value=(0,1))
else:
self.df = self.df.sort_values(self.col_x)
g = interp1d(self.df[self.col_x], self.df[self.col_y], kind='linear',
bounds_error=False, fill_value=(0,1))
g_inv = interp1d(self.df[self.col_y], self.df[self.col_x], kind='linear',
bounds_error=False, fill_value=(0,1))
else:
print(self._name, self.shape, self.df)
raise ValueError(
f"Incorrect spec passed to Distortion; name={self._name}, shape={self.shape}, r0={self.r0}, df={self.df}")
self.standard_shape = standard_shape
self.g = g
self.g_inv = g_inv
if g_prime is None:
g_prime = lambda x: (g(x + 1e-6) - g(x - 1e-6)) / 2e-6
self.g_prime = g_prime
[docs]
def tvar_info_df(self):
"""
Return dataframe of information for wtdtvar, convex type distortions.
p, wts
knots = 1 - p (where g has kinks)
slopes, intercepts = for each affine part, starting at the knot point
TODO: implement for other types?
"""
# if self._kind_immutable not in {'tvar', 'wtdtvar', 'bitvar', 'ccoc'}:
if self._kind_immutable != 'wtdtvar':
return None
p = np.array(self.shape)
wts = np.array(self.df)
# knots = knots[::-1]
if 0 not in p:
p = np.hstack((0, p))
wts = np.hstack((0, wts))
if 1 not in p:
p = np.hstack((p, 1))
wts = np.hstack((wts, 0))
knots = 1 - p
gs = self.g(knots)
df = pd.DataFrame({
's': knots,
'gs': gs,
'p': p,
'wts': wts,
})
df = df.sort_values('s').reset_index(drop=True)
# slope and intercept
df['slope'] = (df.gs.shift(-1) - df.gs) / (df.s.shift(-1) - df.s)
df['intercept'] = df.gs - df.slope * df.s
# need to adjust if there is a mass
if p[-1] == 1 and wts[-1] > 0:
df.loc[0, 'slope'] = (df.gs[1] - df.wts[0]) / df.s[1]
df.loc[0, 'intercept'] = df.wts[0]
# add elasticities either side of cusps
def eta(s):
s = np.asarray(s)
return np.where(s == 0, 1,
np.where(s == 1, 0, s * self.g_prime(s) / self.g(s)))
# use mid points of regions to avoid an arb epsilon that may not
# be small enough
df['gprime-'] = self.g_prime((df.s + df.s.shift(1)) / 2)
df['gprime+'] = self.g_prime((df.s + df.s.shift(-1)) / 2)
df['eta-'] = eta((df.s + df.s.shift(1)) / 2)
df['eta+'] = eta((df.s + df.s.shift(-1)) / 2)
# if df.loc[0, 'wts'] == 0:
# df = df.drop(index=0)
return df
[docs]
def plot_affine(self, ax=None, n_pts=101,
cmap_name='viridis', alpha=1.,
marker='o', marker_size=4):
"""Render the upper affine envelope of g for wtdtvars."""
if self._kind_immutable != 'wtdtvar':
return None
ax = self.plot(both=False)
# if ax is None:
# fig, ax = plt.subplots(1, 1, figsize=(FIG_H, FIG_H))
ps = np.linspace(0, 1, n_pts)
gs = self.g(ps)
df = self.tvar_info_df()
# Generate colors from the colormap
# n / max(1, len - 1) handles the edge case of a single line
n_lines = len(df)
cmap = colormaps.get_cmap(cmap_name)
colors = [cmap(i / max(1, n_lines - 1)) for i in range(n_lines)]
for c, (n, r) in zip(colors, df.iterrows()):
if np.isnan(r.slope): continue
line = r.intercept + r.slope * ps
line = np.where((line>=0) & (line<=1), line, np.nan)
ax.plot(ps, line, lw=0.5, color=c, alpha=alpha)
if len(df) < 20:
ax.scatter(df.s, df.gs,
color=colors,
marker=marker,
s=marker_size,
zorder=3)
return ax
# utility methods to create usual suspects and other common distortions
[docs]
@staticmethod
def tvar(p):
"""
Utility method to create tvar.
"""
return Distortion('tvar', p, display_name=f'TVaR({p:.3g})')
[docs]
@staticmethod
def max():
"""
Utility method to create max = TVaR 1.
"""
p = 1.
return Distortion('tvar', p, display_name='max')
[docs]
@staticmethod
def mean():
"""
Utility method to create mean = TVaR 0.
"""
p = 0.
return Distortion('tvar', p, display_name='mean')
[docs]
@staticmethod
def wang(shape):
"""
Utility method to create wang.
"""
return Distortion('wang', shape, display_name=f'Wang({shape:.3g})')
[docs]
@staticmethod
def ph(shape):
"""
Utility method to create ph.
"""
return Distortion('ph', shape, display_name=f'PH({shape:.3g})')
[docs]
@staticmethod
def dual(shape):
"""
Utility method to create dual.
"""
return Distortion('dual', shape, display_name=f'dual({shape:.3g})')
[docs]
@staticmethod
def bitvar(p0, p1, w=0.5):
"""
Utility method to create bitvar with :math:`p_0 < p_1`. The
weight w is the weight on p1. Remember p=0 corresponds to
the mean and p=1 to the max.
"""
# first check if it is really a TVaR
if p0 == p1:
return Distortion.tvar(p0)
elif w == 0:
return Distortion.tvar(p0)
elif w == 1:
return Distortion.tvar(p1)
else:
return Distortion('bitvar', w, df=[p0, p1], display_name=f'bitvar({p0:.3g}, {p1:.3g}; {w:.3g})')
[docs]
@staticmethod
def ccoc(d):
"""
Utility method to create ccoc with given discount factor. The
default constructor inputs the return r instead of the discount factor d.
d = r / (1 + r).
"""
r = d / (1. - d)
return Distortion('ccoc', r, display_name=f'ccoc({r:.3g})')
[docs]
@staticmethod
def minimum(distortion_list):
"""
Utility method to create minimum of a list of distortions.
"""
return Distortion('minimum', distortion_list,
display_name=f'minimum({len(distortion_list)})')
[docs]
@staticmethod
def mixture(distortion_list, weights=None):
"""
Utility method to create mixture of a list of distortions.
"""
return Distortion('mixture', distortion_list, df=weights,
display_name=f'mixture({len(distortion_list)})')
[docs]
@staticmethod
def beta(a, b):
"""
Utility method to create mixture of a list of distortions.
"""
return Distortion('beta', [a, b], display_name=f'beta({a:.3f}, {b:.3f})')
[docs]
@staticmethod
def power(alpha, x0, x1):
"""
Utility method to create mixture of a list of distortions.
"""
return Distortion('power', alpha, df=[x0, x1],
display_name=f'power({alpha:.3f}, {x0:.3f}, {x1:.3f})')
[docs]
def g_dual(self, x):
"""
The dual of the distortion function g.
"""
return 1 - self.g(1 - x)
[docs]
def id(self):
"""
Unique ID as a short string
"""
# operational end of the hose
bit = {k: v for k, v in self.__dict__.items() if k in ('_name', 'r0', 'df', 'shape', 'col_x', 'col_y')}
return short_hash(str(bit))
def __str__(self):
return self.name
# if self.display_name != '':
# s = self.display_name
# return s
# else:
# return self.__repr__()
# was
# elif isinstance(self.shape, (list, tuple, str)):
# s = f'{self._distortion_names_.get(self._name, self._name)}, {self.shape}'
# else:
# s = f'{self._distortion_names_.get(self._name, self._name)}, {self.shape:.3f}'
# if self._name == 'tt':
# s += f', {self.df:.2f}'
# if self._name == 'wtdtvar':
# s += f', ({self.df[0]:.3f}/{self.df[1]:.3f})'
# elif self.has_mass:
# # don't show the mass for wtdtvar
# s += f', mass {self.mass:.3f}'
# return s
def __repr__(self):
return self.name
# Get the default __repr__ output
# this messes up plot etc. that uses repr as default title
# default_repr = super().__repr__()
# return f'{self.name}: {default_repr}'
# # originally
# if self.has_mass:
# s += f', {self.r0})'
# elif self._name == 'tt':
# s += f', {self.df:.2f})'
# else:
# s += ')'
# return s
@property
def name(self):
return self.display_name if self.display_name != '' else self._name
@name.setter
def name(self, value):
self._name = value
[docs]
def plot(self, xs=None, n=101, both=True, ax=None, plot_points=True, scale='linear',
c=None, c_dual=None, size='small', **kwargs):
"""
Quick plot of the distortion
:param xs:
:param n: length of vector is no xs
:param both: True: plot g and g_dual and add decorations, if False just g and no trimmings.
:param ax:
:param plot_points:
:param scale: linear as usual or return plots -log(gs) vs -logs and inverts both scales
:param size: 'small' or 'large' for size of plot, FIG_H or FIG_W. The default is 'small'.
:param kwargs: passed to matplotlib.plot
:return:
"""
assert scale in ['linear', 'return']
if scale == 'return':
# default not enough for return
xs = 10 ** np.linspace(-10, 0, n)
elif xs is None:
xs = np.hstack((0, np.linspace(1e-10, 1, n)))
y1 = self.g(xs)
y2 = None # keep linter happy
if both:
y2 = self.g_dual(xs)
if ax is None:
if size == 'small':
sz = FIG_H
elif isinstance(size, (float, int)):
sz = size
else:
sz = FIG_W
fig, ax = plt.subplots(1,1, figsize=(sz, sz), layout="constrained")
if c is None:
c = 'C0'
if c_dual is None:
c_dual = 'C1'
if scale == 'linear':
ax.plot(xs, y1, c=c, label=self.name, **kwargs)
if both:
ax.plot(xs, y2, c=c_dual, label='$g\\check$', **kwargs)
ax.plot(xs, xs, color='k', lw=0.5, alpha=0.5)
# ax.plot(xs, xs, lw=0.5, color='C0', alpha=0.5)
elif scale == 'return':
ax.plot(xs, y1, c=c, label=self.name, **kwargs)
if both:
ax.plot(xs, y2, c=c_dual, label=f'Dual {self.name}', **kwargs)
ax.set(xscale='log', yscale='log', xlim=[1/5_000, 1], ylim=[1/5_000, 1])
ax.plot(xs, xs, color='k', lw=0.5, alpha=0.5)
if self._name == 'convex' and plot_points:
if len(self.df) > 50:
alpha = .35
elif len(self.df) > 20:
alpha = 0.6
else:
alpha = 1
if c is None:
c = 'C4'
if scale == 'linear':
ax.scatter(x=self.df[self.col_x], y=self.df[self.col_y], marker='.', s=15, color=c, alpha=alpha)
elif scale == 'return':
ax.scatter(x=1/self.df[self.col_x], y=1/self.df[self.col_y], marker='.', s=15, color=c, alpha=alpha)
ax.set(title=self.name, aspect='equal')
if scale == 'linear':
ax.set(xticks=np.linspace(0, 1, 6), yticks=np.linspace(0, 1, 6))
if both:
ax.legend(loc='upper left', fontsize='x-small')
return ax
[docs]
@classmethod
def test(cls, r0=0.035, df=[0.0, .9]): # noqa default class is mutable
"""
Tester: make some nice plots of available distortions.
:return:
"""
f0, axs0 = plt.subplots(2, 11, figsize=(22, 4), constrained_layout=True, sharex=True, sharey=True)
f1, axs1 = plt.subplots(2, 11, figsize=(22, 4), constrained_layout=True, sharex=True, sharey=True)
axiter0 = iter(axs0.flat)
axiter1 = iter(axs1.flat)
xs = np.linspace(0, 1, 1001)
# zip stops at the shorter of the vectors, so this does not include convex (must be listed last)
# added df for the t; everyone else can ignore it
# rank by order on large lsoses...
for axiter, scale in zip([axiter0, axiter1], ['linear', 'return']):
for name, shape in zip(cls._available_distortions_, cls._eg_param_1_):
dist = Distortion(name, shape, r0, df=df)
dist.plot(xs, ax=next(axiter), scale=scale)
dist = Distortion.convex_example('bond')
dist.plot(xs, ax=next(axiter), scale=scale)
# order will look better like this
for name, shape in zip(cls._available_distortions_, cls._eg_param_2_):
dist = Distortion(name, shape, r0, df=df)
dist.plot(xs, ax=next(axiter), scale=scale)
dist = Distortion.convex_example('cat')
dist.plot(xs, ax=next(axiter), scale=scale)
# tidy up
for ax in axiter0:
f0.delaxes(ax)
for ax in axiter1:
f1.delaxes(ax)
f0.suptitle('Example Distortion Functions - Linear Scale')
f1.suptitle('Example Distortion Functions - Return Scale')
[docs]
@staticmethod
def distortions_from_params(params, index, r0=0.025, df=5.5, pricing=True, strict=True):
"""
Make set of dist funs and inverses from params, output of port.calibrate_distortions.
params must just have one row for each method and be in the output format of cal_dist.
Called by Portfolio.
:param index:
:param params: dataframe such that params[index, :] has a [lep, param] etc.
pricing=True, strict=True: which distortions to allow
df for t distribution
:param r0: min rol parameters
:param strict:
:param pricing:
:return:
"""
temp = params.loc[index, :]
dists = {}
for dn in Distortion.available_distortions(pricing=pricing, strict=strict):
param = float(temp.loc[dn, 'param'])
dists[dn] = Distortion(name=dn, shape=param, r0=r0, df=df)
return dists
[docs]
@staticmethod
def convex_example(source='bond'):
"""
Example convex distortion using data from https://www.bis.org/publ/qtrpdf/r_qt0312e.pdf.
:param source: bond gives a bond yield curve example, cat gives cat bond / cat reinsurance pricing based example
:return:
"""
if source == 'bond':
yield_curve = '''
AAAA 0.000000 0.000000
AAA 0.000018 0.006386
AA 0.000144 0.007122
A 0.000278 0.010291
BBB 0.002012 0.017089
BB 0.012674 0.036455
B 0.040052 0.069181
Z 1.000000 1.000000'''
df = pd.read_fwf(StringIO(yield_curve))
df.columns = ['Rating', 'EL', 'Yield']
return Distortion('convex', 'Yield Curve', df=df, col_x='EL', col_y='Yield')
elif source.lower() == 'cat':
cat_bond = '''EL,ROL
0.116196,0.32613
0.088113,0.2452
0.074811,0.22769
0.056385,0.17131
0.046923,0.15326
0.032961,0.12222
0.02807,0.11037
0.024205,0.1022
0.011564,0.07284
0.005813,0.06004
0,0
1,1'''
df = pd.read_csv(StringIO(cat_bond))
return Distortion('convex', 'Cat Bond', df=df, col_x='EL', col_y='ROL')
else:
raise ValueError(f'Inadmissible value {source} passed to convex_example, expected yield or cat')
[docs]
@staticmethod
def bagged_distortion(data, proportion, samples, display_name=""):
"""
Make a distortion by bootstrap aggregation (Bagging) resampling, taking the convex envelope,
and averaging from data.
Each sample uses proportion of the data.
Data must have two columns: EL and Spread
:param data:
:param proportion: proportion of data for each sample
:param samples: number of resamples
:param display_name: display_name of created distortion
:return:
"""
df = pd.DataFrame(index=np.linspace(0,1,10001), dtype=float)
for i in range(samples):
rebit = data.sample(frac=proportion, replace=False, random_state=RANDOM)
rebit.loc[-1] = [0, 0]
rebit.loc[max(rebit.index)+1] = [1, 1]
d = Distortion('convex', 0, df=rebit, col_x='EL', col_y='Spread')
df[i] = d.g(df.index)
df['avg'] = df.mean(axis=1)
df2 =df['avg'].copy()
df2.index.name = 's'
df2 = df2.reset_index(drop=False)
d = Distortion('convex', 0, df=df2, col_x='s', col_y='avg', display_name=display_name)
return d
[docs]
@staticmethod
def average_distortion(data, display_name, n=201, el_col='EL', spread_col='Spread'):
"""
Create average distortion from (s, g(s)) pairs. Each point defines a wtdTVaR with
p=s and p=1 points.
:param data:
:param display_name:
:param n: number of s values (between 0 and max(EL), 1 is added
:param el_col: column containing EL
:param spread_col: column containing Spread
:return:
"""
els = data[el_col]
spreads = data[spread_col]
max_el = els.max()
s = np.hstack((np.linspace(0, max_el, n), 1))
ans = np.zeros((len(s), len(data)))
for i, el, spread in zip(range(len(data)), els, spreads):
p = 1 - el
w = (spread - el) / (1 - el)
d = Distortion('wtdtvar', w, df=[0,p])
ans[:, i] = d.g(s)
df = pd.DataFrame({'s': s, 'gs': np.mean(ans, 1)})
dout = Distortion('convex', None, df=df, col_x='s', col_y='gs', display_name=display_name)
return dout
# replacedc with wtdtvar type Distortions
# @staticmethod
# def wtd_tvar(ps, wts, display_name='', details=False):
# """
# A careful version of wtd tvar with knots at ps and wts.
#
# :param ps:
# :param wts:
# :param display_name:
# :param details:
# :return:
# """
#
# # evaluate at 0, 1 and all the knot points
# ps0 = np.array(ps)
# s = np.array(sorted(set((0.,1.)).union(1-ps0)))
# s = s.reshape(len(s), 1)
#
# wts = np.array(wts).reshape(len(wts), 1)
# if np.sum(wts) != 1:
# wts = wts / np.sum(wts)
# ps = np.array(ps).reshape(1, len(ps))
#
# gs = np.where(ps == 1, 1, np.minimum(s / (1 - ps), 1)) @ wts
#
# d = Distortion.s_gs_distortion(s, gs, display_name)
# if details:
# return d, s, gs
# else:
# return d
[docs]
@staticmethod
def s_gs_distortion(s, gs, display_name=''):
"""
Make a convex envelope distortion from {s, g(s)} points.
TODO: allow mass at zero; currently shape=0 passes to no mass at zero
even if s,gs implies one. LAZY
:param s: iterable (can be converted into numpy.array
:param gs:
:param display_name:
:return:
"""
s = np.array(s)
gs = np.array(gs)
return Distortion('convex', 0, df=pd.DataFrame({'s': s.flat, 'gs': gs.flat}),
col_x='s', col_y='gs', display_name=display_name)
[docs]
@staticmethod
def random_distortion_ex(n=1, random_state=None):
"""
Random distortion over types.
Example usage and test::
ans = Distortion.random_distortion_ex(24)
f, axs = plt.subplots(6, 4, figsize=(4*1.5, 6*1.5), layout='constrained')
for (n, g), ax in zip(ans.items(), axs.flat):
g.plot(ax=ax, both=False)
Generalizes random_distortion which only returns a wtdtvar type.
"""
rng = np.random.default_rng(random_state)
def _one_distortion(rng):
method = rng.choice(['ph', 'wang', 'dual', 'wtdtvar', 'tvar', 'ccoc'])
match method:
case 'ph':
return Distortion.ph(rng.uniform(0.1 if rng.uniform() < 0.1 else 0.4, 1.0))
case 'wang':
return Distortion.wang(rng.uniform(0.5, 1.5))
case 'dual':
return Distortion.dual(rng.uniform(1.1, 2.5))
case 'tvar':
return Distortion.tvar(rng.uniform(0.01, 0.99))
case 'ccoc':
return Distortion.ccoc(rng.uniform(0.02, 0.35))
case 'wtdtvar':
knots = rng.choice([2, 2, 3, 3, 3, 4, 4, 5, 8, 12])
mean = 0 if (t:=rng.uniform() < 0.5) else t / 2
mass = 0 if (t:=rng.uniform() < 0.75) else t / 4
return Distortion.random_distortion(knots, mass, mean, random_state=rng)
if n == 1:
return _one_distortion(rng)
else:
return {i: _one_distortion(rng) for i in range(n)}
[docs]
@staticmethod
def random_distortion(n_knots, mass=0, mean=0, wt_rng=None, name="", random_state=None):
"""
Create a random distortion. if mass (mean)
add a mass (mean) term.
Knots include possible mean and mass (max) knots.
wt_rng to generate (spiky) weights, eg. wt_rng=ss.pareto(1.5).rvs.
"""
random_state = random_state or np.random.default_rng()
if mass: n_knots -= 1
if mean: n_knots -= 1
ps = random_state.random(n_knots)
ps.sort()
assert n_knots >= 0
if n_knots == 0:
assert mass + mean == 1, 'BiTVaR weighting mean and max, weights must sum to 1.'
if wt_rng is None:
wts = random_state.random(n_knots)
else:
wts = wt_rng(size=n_knots, random_state=random_state)
wts = wts / wts.sum(dtype=np.float64) * (1 - mass - mean)
mn = ''
ma = ''
if mass:
ps = np.append(ps, 1)
wts = np.append(wts, mass)
ma = f', mx={mass:.3f}'
if mean:
ps = np.insert(ps, 0, 0)
wts = np.insert(wts, 0, mean)
mn = f', mn={mean:.3f}'
return Distortion('wtdtvar', ps, df=wts,
display_name=name or f'Rnd {n_knots} knots{mn}{ma}')
[docs]
def price(self, ser, a=np.inf, kind='ask', S_calculation='forwards'):
r"""
Compute the bid and ask prices for the distribution determined by ``ser`` with
an asset limit ``a``. Index of ``ser`` need not be equally spaced, so it can
be applied to :math:`\kappa`. However, the index values must be distinct.
Index is sorted if needed.
To apply to do kappa for unit A in portfolio port::
ser = port.density_df[['exeqa_A', 'p_total']].\
set_index('exeqa_A').groupby('exeqa_A').\
sum()['p_total']
dist.price(ser, port.q(0.99), 'both')
Always use ``S_calculation='forwards`` method to compute S = 1 - cumsum(probs).
Computes the price as the integral of gS.
Returns a tuple of (premium, loss) for the ask or bid price. When 'both' requested
returns (bid_premium, loss, ask_premium).
Generally prefer to use newer price_ex.
Requires ser starts at x=0.
:param ser: pd.Series of is probabilities, indexed by outcomes. Outcomes need not
be spaced evenly. ``ser`` is usually a probability column from ``density_df``.
:param kind: is "ask", "bid", or "both", giving the pricing view.
:param a: asset level. ``ser`` is truncated at ``a``.
"""
if not isinstance(ser, pd.Series):
raise ValueError(f'ser must be a pandas Series, not {type(ser)}')
if kind in ['bid', 'ask']:
pass
elif kind == 'both':
return [*self.price(ser, a, 'bid', S_calculation),
self.price(ser, a, 'ask', S_calculation)[0]]
else:
raise ValueError(f'kind must be bid, ask, or both, not {kind}')
# unlikely to be an issue
if not ser.index.is_monotonic_increasing:
ser = ser.sort_index(ascending=True)
# apply limit
if a < np.inf:
# ser = ser.copy()
# tail = ser[a:].sum()
# fixed bug: need loc to include a
ser = ser.loc[:a]
# ser[a] = tail
if S_calculation == 'forwards':
S = 1 - ser.cumsum()
S = np.maximum(0, S)
else:
fill_value = max(0, 1 - ser.sum())
S = np.minimum(1, fill_value +
ser.shift(-1, fill_value=0)[::-1].cumsum()[::-1])
# not all distortions return numpy; force conversion
if kind == 'ask':
gS = np.array(self.g(S))
else:
gS = np.array(self.g_dual(S))
# trapz is not correct with our interpretation elsewhere; we use a step function
# loss = np.trapz(S, S.index)
# prem = np.trapz(gS, S.index)
dx = np.diff(S.index)
loss = (S.iloc[:-1].values * dx).sum()
prem = (gS[:-1] * dx).sum()
return prem, loss
[docs]
def price2(self, ser, a=None, S_calculation='forwards'):
r"""
Similar to price, and uses the exact same calculation. Returns bid and ask by
default in a DataFrame. Returns all cumulative sums if a is None (likely, not
what you want).
Generally prefer to use newer price_ex.
Requires ser starts at x=0.
"""
if not isinstance(ser, pd.Series):
raise ValueError(f'ser must be a pandas Series, not {type(ser)}')
# unlikely to be an issue
if not ser.index.is_monotonic_increasing:
ser = ser.sort_index(ascending=True)
if S_calculation == 'forwards':
S = 1 - ser.cumsum()
S = np.maximum(0, S)
else:
fill_value = max(0, 1 - ser.sum())
S = np.minimum(1, fill_value +
ser.shift(-1, fill_value=0)[::-1].cumsum()[::-1])
# not all distortions return numpy; force conversion
gS = np.array(self.g(S))
dgS = np.array(self.g_dual(S))
dx = np.diff(S.index)
loss = (S.iloc[:-1].values * dx).cumsum()
ask = (gS[:-1] * dx).cumsum()
bid = (dgS[:-1] * dx).cumsum()
# index is shifted by 1 because it is the right hand range of integration
ans = pd.DataFrame({'bid': bid, 'el': loss, 'ask': ask}, index=S.index[1:])
if a is None:
return ans
else:
# no longer guaranteed that a is in ser.index
return ans.iloc[ans.index.get_indexer([a], method='nearest')]
[docs]
def price_ex(self, ser, a=np.inf, kind='ask', method='dx', S_calculation='backwards',
as_frame=False):
"""
Updated version of price and price2 that should be used in the future.
Always uses S_calculation='backwards' method to compute S as
``ser[::-1].cumsum().shift(1, fill_value=0)[::-1]``.
Sorts ser if needed.
If calculation 'dx' computes the price as the integral of gS dx. (This method
has fewer diffs).
if calculation 'ds' computes the prices as the integralk of x d(gS).
Neither method requires the index to be unique.
kind = ask, bid or both.
Asset level a is required to be in the index of ser, there is no interpolation.
There is an ambiguity here about a. If you want the unlimited price of an
unbounded variable then you integrate to infinity. In the numerical approximation
you integrate over all ser values. In that canse there is no a P(X>a) term
added in the dgS method, because Pr(X>inf)=0. However, if X is actually bounded
and as a mass at its max value then you do need to add xPr(X>
Returns a namedtuple.
summarize=True automatically summarizes the index if it is not unique.
if as_frame is True, returns a DataFrame with bid, el, and ask columns,
otherwise returns a namedtuple with bid, el, and ask fields.
If S_calculation is 'forwards', S is computed as 1 - ser.cumsum(). If 'backwards',
it is computed as p_total[::-1].cumsum()[::-1].shift(-1, fill_value=0). This is computed
before p is truncated for a.
Because of potential underflow issues, the backwards method for computing the ask
is more reliable. Therefore it has become the default.
When there are thin tails, neither method is reliable for the bid
because g_dual(s) = 1 - g(1 - s).
"""
# input audits
assert kind in ['bid', 'ask', 'both'], "kind must be 'bid', 'ask', or 'both'"
assert method in ['dx', 'ds'], "method must be 'dx' or 'ds'"
assert S_calculation in ['forwards', 'backwards'], "S_calculation must be 'forwards' or 'backwards'"
if not isinstance(ser, pd.Series):
raise ValueError(f'ser must be a pandas Series, not {type(ser)}')
# assert ser.index.is_monotonic_increasing, 'ser index must be sorted ascending'
# need
if not ser.index.is_monotonic_increasing:
ser = ser.sort_index(ascending=True)
if S_calculation == 'forwards':
# truncate: note here you need ser a Series and you need to use
# .loc to truncate at a and include a. Otherwise, it may not be included.
if a < np.inf:
assert a in ser.index, f'a={a} must be in the index of ser'
# notice this includes a at the end, which is not needed
# to compute the integral Sdx integral (but is for the xdS)
ser = ser.loc[:a]
# calculation using forwards method
S = np.maximum(0, 1 - ser.cumsum())
if a == np.inf:
# if a = inf then you want the last prob to be zero come what may
S.iloc[-1] = 0
elif S_calculation == 'backwards':
# there is an implicit assumption here that ser.sum() == 1.
if not np.allclose((ser.sum()), (1)):
print(f'WARNING: ser.sum() = {ser.sum()} is not 1.')
S = ser[::-1].cumsum().shift(1, fill_value=0)[::-1]
S = np.minimum(1, S)
if a < np.inf:
assert a in ser.index, f'a={a} must be in the index of ser'
S = S.loc[:a]
# note that by construction the last element here is automatically zero
gS = dual_gS = None
el = bid = ask = np.nan # keep the linter happy
if kind == 'ask':
gS = np.array(self.g(S))
elif kind == 'both':
gS = np.array(self.g(S))
dual_gS = np.array(self.g_dual(S))
elif kind == 'bid':
dual_gS = np.array(self.g_dual(S))
# at this point S is a Series, and gS and dual_gS are numpy arrays
if method == 'dx':
# note: the last value of S is for prob > a, so it is not used.
dx = np.diff(S.index)
x0 = S.index[0]
el = (S.iloc[:-1].values * dx).sum() + x0
if gS is not None:
ask = (gS[:-1] * dx).sum() + x0
if dual_gS is not None:
bid = (dual_gS[:-1] * dx).sum() + x0
elif method == 'ds':
# the last adjustment to the xdgS integral is to add
# a P(X>a) provided a is finite. To simplify math:
if a == np.inf:
a = 0
dS = np.diff(S, prepend=1.)
x = np.array(S.index)
# S is a Series
# int gS dx = - x S + a S - 0 S0
el = -((x * dS).sum()) + a * S.iloc[-1]
if gS is not None:
dgS = np.diff(gS, prepend=1.)
ask = -((x * dgS).sum()) +a * gS[-1]
if dual_gS is not None:
ddual_gS = np.diff(dual_gS, prepend=1.)
bid = -((x * ddual_gS).sum()) + a * dual_gS[-1]
if as_frame:
return pd.DataFrame([[bid, el, ask, self.name, a]],
index=[0], # convenient to know it is 0
# index=pd.MultiIndex.from_arrays([[self.name], [a]],
# names=['distortion', 'assets']),
columns=pd.Index(
['bid', 'el', 'ask', 'distortion', 'assets']))
else:
# Define a namedtuple called 'Point' with fields 'x' and 'y'
Price = namedtuple('Price', 'bid,el,ask')
return Price(bid, el, ask)
[docs]
def make_q(self, ser, a=np.inf):
"""
Use logic from ``price_ex`` to return the vector of risk adjusted probabilities
for use in pricing. See ``price_ex`` for more.
Always uses backwards calculation, ask pricing and method='ds'.
The resulting series is used as::
q = a.make_q(x, a)
ask = -((x * q).sum()) + a * gS[-1]
In use, the last adjustment to the xdgS integral is to add a P(X>a) provided a is finite.
To simplify the math, just add::
if a == np.inf:
a = 0
"""
if not isinstance(ser, pd.Series):
raise ValueError(f'ser must be a pandas Series, not {type(ser)}')
# assert ser.index.is_monotonic_increasing, 'ser index must be sorted ascending'
# need
if not ser.index.is_monotonic_increasing:
ser = ser.sort_index(ascending=True)
# there is an implicit assumption here that ser.sum() == 1.
if ser.sum() < np.nextafter(1.0, 0.0): # float before 1, rounding...
raise ValueError('Sum of input probabilities must be 1. Try remove_fuzz=True if using a Portfolio')
# backwards calc of S
S = ser[::-1].cumsum().shift(1, fill_value=0)[::-1]
S = np.minimum(1, S)
if a < np.inf:
assert a in ser.index, f'a={a} must be in the index of ser'
S = S.loc[:a]
gS = np.array(self.g(S))
# at this point S is a Series, and gS is a numpy array
dS = -np.diff(S, prepend=1.)
dgS = -np.diff(gS, prepend=1.)
ans = pd.DataFrame({'p': dS, 'q': dgS, 'S': S, 'gS': gS}, index=S.index)
return ans
# add the numba functions to the class for easy access
[docs]
def quick_gS(self, den):
"""
Calls numba version to compute ``self.g(1-den.sumsum())``
for tvar or bitvar distortions.
"""
if self._name == 'tvar':
p = self.shape
if isinstance(den, pd.Series):
return tvar_gS(den.values, p)
else:
return tvar_gS(den, p)
elif self._name == 'bitvar':
p0, p1 = self.df
w = self.shape
if isinstance(den, pd.Series):
return bitvar_gS(den.values, p0, p1, w)
else:
return bitvar_gS(den, p0, p1, w)
else:
raise ValueError(f'quick_gS only implemented for TVaR and BiTVaR, not {self._name}')
[docs]
def quick_ra(self, den, x=None):
"""
Computes the risk adjusted expected value of ``den`` and ``x`` using the distortion.
Calls numba version of ra for tvar or bitvar distortion. If ``x is None``,
requires ``den`` to be a ``Series`` and uses the index. ``x`` values must be in
ascending order.
"""
if self._name == 'tvar':
p = self.shape
if isinstance(den, pd.Series):
return tvar_ra(den.values, np.array(den.index), p)
else:
assert x is not None
return tvar_ra(den, x, p)
elif self._name == 'bitvar':
p0, p1 = self.df
w = self.shape
if isinstance(den, pd.Series):
return bitvar_ra(den.values, np.array(den.index), p0, p1, w)
else:
return bitvar_ra(den, x, p0, p1, w)
else:
raise ValueError(f'quick_ra only implemented for TVaR and BiTVaR, not {self._name}')
[docs]
def approx_ccoc(roe, eps=1e-14, display_name=None):
"""
Create a continuous approximation to the CCoC distortion with return roe.
Helpful utility function for creating a distortion.
:param roe: return on equity
:param eps: small number to avoid mass at zero
"""
return Distortion('bitvar', roe/(1 + roe), df=[0, 1-eps],
display_name=f'aCCoC {roe:.2%}' if display_name is None
else display_name
)
[docs]
def tvar_weights(d):
"""
Return tvar weight function for a distortion d. Use np.gradient to differentiate g' but
adjust for certain distortions. The returned function expects a numpy array of p
values.
:param: d distortion
"""
shape = d.shape
r0 = d.r0
nm = d.name
if nm.lower().find('ccoc') >= 0:
nm = 'ccoc'
v = shape
def wf(p):
return np.where(p==0, 1 - v,
# use nan to try and distinguish mass from density
np.where(p == 1, v, np.nan))
elif nm == 'ph':
# this is easy, do by hand
def wf(p):
return np.where(
p==1, shape, # really a mass!
-shape * (shape - 1) * (1 - p) ** (shape - 1)
)
elif nm == 'tvar':
def wf(p):
# something that will plot reasonably
dp = p[1] - p[0]
return np.where(np.abs(p - shape) < dp, 1, 0)
else:
# numerical approximation
def wf(p):
gprime = d.g_prime(1 - p)
wt = (1 - p) * np.gradient(gprime, p)
return wt
# adjust for endpoints in certain situations (where is a mass at 0 or a kink
# s=1 (slope approaching s=1 is > 0)
if nm == 'wang':
pass
elif nm == 'dual':
pass
return wf #noqa
[docs]
def p_to_parameters(p):
"""Return standard parameters equivalent to TVaR p."""
ans = {}
# in terms of p = d, discount parameterization
ans['ccoc'] = p
# g(s) = s^a, a = 1 - p / 1 + p; a = 0 is p = 1, a = 1 is p = 0
ans['ph'] = (1 - p) / (1 + p)
ans['wang'] = 2 ** 0.5 * phi_inv((1 + p) / 2)
# p = 0 lambda = 0, p = 1 lambda = infty
# b param = 1 + p / 1 - p, p = 0 is b=1 and p = 1 is b = infty
ans['dual'] = (1 + p) / (1 - p)
ans['tvar'] = p
return ans
[docs]
def consistent_distortions(p: int):
"""Create five representative distortions using the TVaR-p parameterization."""
params = p_to_parameters(p)
ans = {}
for k, sh in params.items():
# Distortion.ccoc takes discount but, alas,
# Distortion('ccoc', shape) takes return
if k == 'ccoc':
sh = sh / (1 - sh)
ans[k] = Distortion(k, sh)
return ans