from collections import namedtuple
from cycler import cycler
import decimal
from functools import lru_cache
from io import BytesIO
import itertools
from itertools import product
import logging.handlers
import logging
import math
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from numbers import Number
import numpy as np
import pandas as pd
from pygments import highlight
from pygments.formatters import HtmlFormatter
from pygments.lexers import get_lexer_by_name
import re
import scipy.stats as ss
import scipy.fft as sft
from scipy.integrate import quad
from scipy.interpolate import interp1d
from scipy.optimize import broyden2, newton_krylov, brentq
from scipy.optimize.nonlin import NoConvergence
from scipy.special import kv, binom, loggamma
from scipy.stats import multivariate_t
from IPython.core.display import HTML, Markdown, display, Image as ipImage, SVG as ipSVG
from .constants import *
import aggregate.random_agg as ar
logger = logging.getLogger(__name__)
GCN = namedtuple('GCN', ['gross', 'net', 'ceded'])
# TODO take out timer stuff
last_time = first_time = 0
timer_active = False
def get_fmts(df):
"""
reasonable formats for a styler
:param df:
:return:
"""
fmts = {}
def guess_fmt(nm, sz):
named_cols = {'Err': '{:6.3e}', 'CV': '{:6.3f}', 'Skew': '{:6.3f}'}
for n, f in named_cols.items():
if nm.find(n) >= 0: # note -1 means not found
return f
if abs(sz) < 1:
return '{:6.3e}'
elif abs(sz) < 10:
return '{:6.3f}'
elif abs(sz) < 1000:
return '{:6.0f}'
elif abs(sz) < 1e10:
return '{:,.0f}'
else:
return '{:5.3e}'
for k, v in df.mean().items():
fmts[k] = guess_fmt(k, v)
return fmts
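# Illustrative sketch (not part of the library's API): apply the guessed formats to a
# pandas Styler for a numeric DataFrame df.
# >>> fmts = get_fmts(df)
# >>> df.style.format(fmts)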
def pprint(txt):
"""
Simple text version of pprint with line breaking
"""
return pprint_ex(txt, split=60)
def pprint_ex(txt, split=0, html=False):
"""
Try to format an agg program. This is difficult because of dfreq and dsev, optional
reinsurance, etc. Go for a simple approach of removing unnecessary spacing
and removing notes. Notes can be accessed from the spec that is always to hand.
For long programs use split=60 or so; lines are split at appropriate points.
Best to use html = True to get colorization.
:param txt: program text input
:param split: if > 0 split lines at this length
:param html: if True return html (via pygments), else return text
"""
ans = []
# programs come in as multiline
txt = txt.replace('\n\tagg', ' agg')
for t in txt.split('\n'):
clean = re.sub(r'[ \t]+', ' ', t.strip())
clean = re.sub(r' note\{[^}]*\}', '', clean)
if split > 0 and len(clean) > split:
clean = re.sub(
r' ((dfreq )([0-9]+ )|([0-9]+ )(claims?|premium|loss|exposure)'
r'|d?sev|dfreq|occurrence|agg|aggregate|wts?|mixed|poisson|fixed)',
r'\n \1', clean)
if clean[:4] == 'port':
# put in extra tabs at agg for portfolios
sc = clean.split('\n')
clean = sc[0] + '\n' + '\n'.join([i if i[:5] == ' agg' else ' ' + i for i in sc[1:]])
ans.append(clean)
ans = '\n'.join(ans)
if html is True:
# ans = f'<p><code>{ans}\n</code></p>'
# notes = re.findall('note\{([^}]*)\}', txt)
# for i, n in enumerate(notes):
# ans += f'<p><small>Note {i+1}. {n}</small><p>'
# use pygments to colorize
agg_lex = get_lexer_by_name('agg')
# remove extra spaces
txt = re.sub(r'[ \t\n]+', ' ', txt.strip())
ans = HTML(highlight(txt, agg_lex, HtmlFormatter(style='friendly', full=False)))
return ans
def ft(z, padding, tilt):
"""
fft with padding and tilt
padding = n makes vector 2^n as long
n=1 doubles (default)
n=2 quadruples
tilt is passed in as the tilting vector or None: easier for the caller to have a single instance
:param z:
:param padding: = 1 doubles
:param tilt: vector of tilt values
:return:
"""
locft = sft.rfft
if z.shape != (len(z),):
raise ValueError('ERROR wrong shape passed into ft: ' + str(z.shape))
# tilt
# values per https://stackoverflow.com/questions/71706387/finding-fft-gives-keyerror-aligned-pandas
if tilt is not None:
zt = z * tilt
else:
zt = z
if type(zt) != np.ndarray:
zt = zt.to_numpy()
# padding handled by the ft routine
# temp = np.hstack((z, np.zeros_like(z)))
return locft(zt, len(z) << padding)
def ift(z, padding, tilt):
"""
ift that strips out padding and adjusts for tilt
:param z:
:param padding:
:param tilt:
:return:
"""
locift = sft.irfft
if z.shape != (len(z),):
raise ValueError('ERROR wrong shape passed into ift: ' + str(z.shape))
if type(z) != np.ndarray:
temp = locift(z.to_numpy())
else:
temp = locift(z)
# unpad
if padding != 0:
temp = temp[0:len(temp) >> padding]
# untilt
if tilt is not None:
temp /= tilt
return temp
# return temp[0:int(len(temp) / 2)]
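# Illustrative round-trip sketch (not part of the library): ift undoes ft, including the
# padding, for a short numpy density vector z.
# >>> z = np.array([0.5, 0.3, 0.2, 0.0])
# >>> np.allclose(ift(ft(z, 1, None), 1, None), z)
# True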
def mu_sigma_from_mean_cv(m, cv):
"""
lognormal parameters
"""
cv = np.array(cv)
m = np.array(m)
sigma = np.sqrt(np.log(cv*cv + 1))
mu = np.log(m) - sigma**2 / 2
return mu, sigma
ln_fit = mu_sigma_from_mean_cv
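# Illustrative sketch: the fitted lognormal reproduces the requested mean and cv exactly.
# >>> mu, sigma = mu_sigma_from_mean_cv(100, 0.5)
# >>> fz = ss.lognorm(sigma, scale=np.exp(mu))
# >>> np.allclose([fz.mean(), fz.std() / fz.mean()], [100, 0.5])
# True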
def sln_fit(m, cv, skew):
"""
method of moments shifted lognormal fit matching given mean, cv and skewness
:param m:
:param cv:
:param skew:
:return:
"""
if skew == 0:
return -np.inf, np.inf, 0
else:
eta = (((np.sqrt(skew ** 2 + 4)) / 2) + (skew / 2)) ** (1 / 3) - (
1 / (((np.sqrt(skew ** 2 + 4)) / 2) + (skew / 2)) ** (1 / 3))
sigma = np.sqrt(np.log(1 + eta ** 2))
shift = m - cv * m / eta
if shift > m:
logger.log(WL, f'utils sln_fit | shift > m, {shift} > {m}, too extreme skew {skew}')
shift = m - 1e-6
mu = np.log(m - shift) - sigma ** 2 / 2
return shift, mu, sigma
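# Illustrative sketch: a shifted lognormal built from sln_fit matches the requested
# mean, cv and skewness (method of moments is exact here).
# >>> shift, mu, sigma = sln_fit(100, 0.5, 1.5)
# >>> fz = ss.lognorm(sigma, scale=np.exp(mu), loc=shift)
# >>> np.allclose([fz.mean(), fz.std() / fz.mean(), float(fz.stats('s'))], [100, 0.5, 1.5])
# True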
def sgamma_fit(m, cv, skew):
"""
method of moments shifted gamma fit matching given mean, cv and skewness
:param m:
:param cv:
:param skew:
:return:
"""
if skew == 0:
return np.nan, np.inf, 0
else:
alpha = 4 / (skew * skew)
theta = cv * m * skew / 2
shift = m - alpha * theta
return shift, alpha, theta
def gamma_fit(m, cv):
"""
"""
alpha = cv**-2
beta = m / alpha
return alpha, beta
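# Illustrative sketch: the returned shape and scale plug straight into scipy.stats.gamma.
# >>> a, scale = gamma_fit(100, 0.5)
# >>> fz = ss.gamma(a, scale=scale)
# >>> np.allclose([fz.mean(), fz.std() / fz.mean()], [100, 0.5])
# True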
def approximate_work(m, cv, skew, name, agg_str, note, approx_type, output):
"""
Does the work for Portfolio.approximate and Aggregate.approximate. See their documentation.
:param output: determines the return value:
scipy - frozen scipy.stats continuous rv object
sev_decl - DecL program for severity (to substitute into an agg; no name)
sev_kwargs - dictionary of parameters to create Severity
agg_decl - DecL program agg T 1 claim sev_decl fixed
any other string - created Aggregate object
"""
if approx_type == 'norm':
sd = m*cv
if output == 'scipy':
return ss.norm(loc=m, scale=sd)
sev = {'sev_name': 'norm', 'sev_scale': sd, 'sev_loc': m}
decl = f'{sd} @ norm 1 # {m} '
elif approx_type == 'lognorm':
mu, sigma = mu_sigma_from_mean_cv(m, cv)
sev = {'sev_name': 'lognorm', 'sev_a': sigma, 'sev_scale': np.exp(mu)}
if output == 'scipy':
return ss.lognorm(sigma, scale=np.exp(mu))
decl = f'{np.exp(mu)} * lognorm {sigma} '
elif approx_type == 'gamma':
shape = cv ** -2
scale = m / shape
if output == 'scipy':
return ss.gamma(shape, scale=scale)
sev = {'sev_name': 'gamma', 'sev_a': shape, 'sev_scale': scale}
decl = f'{scale} * gamma {shape} '
elif approx_type == 'slognorm':
shift, mu, sigma = sln_fit(m, cv, skew)
if output == 'scipy':
return ss.lognorm(sigma, scale=np.exp(mu), loc=shift)
sev = {'sev_name': 'lognorm', 'sev_a': sigma, 'sev_scale': np.exp(mu), 'sev_loc': shift}
decl = f'{np.exp(mu)} * lognorm {sigma} + {shift} '
elif approx_type == 'sgamma':
shift, alpha, theta = sgamma_fit(m, cv, skew)
if output == 'scipy':
return ss.gamma(alpha, loc=shift, scale=theta)
sev = {'sev_name': 'gamma', 'sev_a': alpha, 'sev_scale': theta, 'sev_loc': shift}
decl = f'{theta} * gamma {alpha} + {shift} '
else:
raise ValueError(f'Inadmissible approx_type {approx_type} passed to fit')
if output == 'agg_decl':
agg_str += decl
agg_str += ' fixed'
return agg_str
elif output == 'sev_kwargs':
return sev
elif output == 'sev_decl':
return decl
else:
from . distributions import Aggregate
return Aggregate(**{'name': name, 'note': note,
'exp_en': 1, **sev, 'freq_name': 'fixed'})
def estimate_agg_percentile(m, cv, skew, p=0.999):
"""
Come up with an estimate of the tail of the distribution based on the three parameter fits, ln and gamma
Updated Nov 2022 with a way to estimate p based on lognormal results. How far in the
tail you need to go to get an accurate estimate of the mean. See 2_x_approximation_error
in the help.
Retain p param for backwards compatibility.
:param m:
:param cv:
:param skew:
:param p: if > 1 converted to 1 - 10**-p
:return:
"""
# p_estimator = interp1d([0.53294, 0.86894, 1.9418, 7.3211, 22.738, 90.012, 457.14, 2981],
# [3, 4, 5, 7, 8, 9, 11, 12],
# assume_sorted=True, bounds_error=False, fill_value=(3, 13))
# p = 1 - 10**-p_estimator(cv)
if np.isinf(cv):
raise ValueError('Infinite variance passed to estimate_agg_percentile')
if p > 1:
p = 1 - 10 ** -p
pn = pl = pg = 0
if skew <= 0:
# neither sln nor sgamma works, use a normal
# for negative skewness the right tail will be thin anyway so normal not outrageous
fzn = ss.norm(scale=m * cv, loc=m)
pn = fzn.isf(1 - p)
elif not np.isinf(skew):
shift, mu, sigma = sln_fit(m, cv, skew)
fzl = ss.lognorm(sigma, scale=np.exp(mu), loc=shift)
shift, alpha, theta = sgamma_fit(m, cv, skew)
fzg = ss.gamma(alpha, scale=theta, loc=shift)
pl = fzl.isf(1 - p)
pg = fzg.isf(1 - p)
else:
mu, sigma = ln_fit(m, cv)
fzl = ss.lognorm(sigma, scale=np.exp(mu))
alpha, theta = gamma_fit(m, cv)
fzg = ss.gamma(alpha, scale=theta)
pl = fzl.isf(1 - p)
pg = fzg.isf(1 - p)
# throw in a mean + 3 sd approx too...
return max(pn, pl, pg, m * (1 + ss.norm.isf(1 - p) * cv))
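# Illustrative sketch: estimate a tail percentile for a distribution with mean 100,
# cv 0.5 and skewness 1.5; p=4 is shorthand for p = 1 - 10**-4.
# >>> estimate_agg_percentile(100, 0.5, 1.5, p=4)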
def round_bucket(bs):
"""
Compute a decent rounded bucket from an input float ``bs``. ::
if bs > 1 round to 2, 5, 10, ...
elif bs < 1 find the smallest power of two greater than bs
Test cases: ::
test_cases = [1, 1.1, 2, 2.5, 4, 5, 5.5, 8.7, 9.9, 10, 13,
15, 20, 50, 100, 99, 101, 200, 250, 400, 457,
500, 750, 1000, 2412, 12323, 57000, 119000,
1e6, 1e9, 1e12, 1e15, 1e18, 1e21]
for i in test_cases:
print(i, round_bucket(i))
for i in test_cases:
print(1/i, round_bucket(1/i))
"""
if bs == 0 or np.isinf(bs):
raise ValueError(f'Inadmissible value passed to round_bucket, {bs}')
if bs == 1:
return bs
if bs > 1:
# rounded bs, to an integer
rbs = np.round(bs, 0)
if rbs == 1:
return 2.0
elif rbs == 2:
return 2
elif rbs <= 5:
return 5.0
elif rbs <= 10:
return 10.0
else:
rbs = np.round(bs, -int(np.log10(bs)))
if rbs < bs:
rbs *= 2
return rbs
if bs < 1:
# inverse bs
# originally
# bsi = 1 / bs
# nbs = 1
# while nbs < bsi:
# nbs <<= 1
# nbs >>= 1
# return 1. / nbs
# same answer, but clearer and slightly quicker
x = 1. / bs
x = bin(int(x))
x = '0b1' + "0" * (len(x) - 3)
x = int(x[2:], 2)
return 1. / x
def make_ceder_netter(reins_list, debug=False):
"""
Build the netter and ceder functions. It is applied to occ_reins and agg_reins,
so should be stand-alone.
The reinsurance functions are piecewise linear functions from 0 to inf which
kinks as needed to express the ceded loss as a function of subject (gross) loss.
For example, if ``reins_list = [(1, 10, 0), (0.5, 30, 20)]`` the program is 10 x 10 and
15 part of 30 x 20 (share=0.5). This requires nodes at 0, 10, 20, 50, and inf.
It is easiest to make the ceder function. Ceded loss at subject loss x equals
the sum of the limits below x plus the cession to the layer in which x lies. The
variable ``base`` keeps track of the layer, ``h`` of the sum (height) of lower layers.
``xs`` tracks the knot points, ``ys`` the values.
::
Break (xs) Ceded (ys)
0 0
10 0
20 10
50 25
inf 25
For example:
::
%%sf 1 2
c, n, x, y = make_ceder_netter([(1, 10, 10), (0.5, 30, 20), (.25, np.inf, 50)], debug=True)
xs = np.linspace(0,250, 251)
ys = c(xs)
ax0.plot(xs, ys)
ax0.plot(xs, xs, ':C7')
ax0.set(title='ceded')
ax1.plot(xs, xs-ys)
ax1.plot(xs, xs, 'C7:')
ax1.set(title='net')
:param reins_list: a list of (share of, limit, attach), e.g., (0.5, 3, 2) means 50% share of 3x2
or, equivalently, 1.5 part of 3 x 2. It is better to store share rather than part
because it still works if limit == inf.
:param debug: if True, return layer function xs and ys in addition to the interpolation functions.
:return: netter and ceder functions; optionally debug information.
"""
# poor man's inf
INF = 1e99
h = 0
base = 0
xs = [0]
ys = [0]
for (share, y, a) in reins_list:
# part of = share of times limit
if np.isinf(y):
y = INF
p = share * y
if a > base:
# moved to new layer, write out left-hand knot point
xs.append(a)
ys.append(h)
# increment height
h += p
# write out right-hand knot points
xs.append(a + y)
ys.append(h)
# update left-hand end
base += (a + y)
# if not at infinity, stay flat from base to end
if base < INF:
xs.append(np.inf)
ys.append(h)
ceder = interp1d(xs, ys)
netter = lambda x: x - ceder(x)
if debug:
return ceder, netter, xs, ys
else:
return ceder, netter
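# Illustrative sketch, reproducing the knot table in the docstring for the program
# 10 x 10 and 15 part of 30 x 20:
# >>> c, n = make_ceder_netter([(1, 10, 10), (0.5, 30, 20)])
# >>> float(c(50)), float(n(50))
# (25.0, 25.0)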
# axis management OLD
def axiter_factory(axiter, n, figsize=None, height=2, aspect=1, nr=5):
"""
axiter = axiter_factory(axiter, ...) to allow chaining
TODO can this be done in the class somehow?
:param axiter:
:param n:
:param figsize:
:param height:
:param aspect:
:param nr:
:return:
"""
if axiter is None:
return AxisManager(n, figsize, height, aspect, nr)
else:
return axiter
class AxisManager(object):
"""
Manages creation of a grid of axes for plotting. Allows pandas plot and matplotlib to
plot to same set of axes.
Always created and managed through axiter_factory function
:param n: number of plots in grid
:param figsize:
:param height: height of individual plot
:param aspect: aspect ratio of individual plot
:param nr: number of plots per row
"""
__slots__ = ['n', 'nr', 'r', 'ax', 'axs', 'c', 'f', 'faxs', 'it']
def __init__(self, n, figsize=None, height=2, aspect=1, nr=5):
self.n = n
self.nr = nr
self.r, self.c = self.grid_size(n)
if figsize is None:
h = self.r * height
w = self.c * height * aspect
# almost all the time a suptitle will be added, which scales down by 0.96
figsize = (w, h / 0.96)
self.f, self.axs = plt.subplots(self.r, self.c, figsize=figsize)
if n == 1:
self.ax = self.axs
self.it = None
else:
# faxs = flattened axes
self.faxs = self.axs.flatten()
self.it = iter(self.faxs)
self.ax = None
def __next__(self):
if self.n > 1:
self.ax = next(self.it)
return self.ax
def grid_size(self, n, subgrid=False):
"""
appropriate grid size given class parameters
:param n:
:param subgrid: call is for a subgrid, no special treatment for 6 and 8
:return:
"""
r = (self.nr - 1 + n) // self.nr
c = min(n, self.nr)
if not subgrid:
if self.nr > 3 and n == 6 and self.nr != 6:
r = 2
c = 3
elif self.nr > 4 and n == 8 and self.nr != 8:
r = 2
c = 4
return r, c
def dimensions(self):
"""
return dimensions (width and height) of current layout
:return:
"""
return self.r, self.c
def grid(self, size=0):
"""
return a block of axes suitable for Pandas
if size=0 return all the axes
:param size:
:return:
"""
if size == 0:
return self.faxs
elif size == 1:
return self.__next__()
else:
# need local sizing
assert self.n >= size
# r, c = self.grid_size(size, subgrid=True)
return [self.__next__() for _ in range(size)] # range(c) for _ in range(r)]
def tidy(self):
"""
delete unused axes to tidy up a plot
:return:
"""
if self.it is not None:
for ax in self.it:
self.f.delaxes(ax)
# methods for a more hands on approach
@staticmethod
def good_grid(n, c=4):
"""
Good layout for n plots
:param n:
:return:
"""
basic = {1: (1, 1), 2: (1, 2), 3: (1, 3), 4: (2, 2), 5: (2, 3), 6: (2, 3), 7: (2, 4),
8: (2, 4), 9: (3, 3), 10: (4, 3), 11: (4, 3), 12: (4, 3), 13: (5, 3), 14: (5, 3),
15: (5, 3), 16: (4, 4), 17: (5, 4), 18: (5, 4), 19: (5, 4), 20: (5, 4)}
if n <= 20:
r, c = basic[n]
else:
r = n // c
if r * c < n:
r += 1
return r, c
@staticmethod
def print_fig(n, aspect=1.5):
"""
printout code...to insert (TODO copy to clipboard!)
:param n:
:param aspect:
:return:
"""
r, c = AxisManager.good_grid(n)
w, h = AxisManager.size_figure(r, c, aspect)
l1 = f"f, axs = plt.subplots({r}, {c}, figsize=({w}, {h}), constrained_layout=True, squeeze=False)"
l2 = "axi = iter(axs.flatten())"
return '\n'.join([l1, l2])
@staticmethod
def tidy_up(f, ax):
"""
delete unused frames out of a figure
:param ax:
:return:
"""
for a in ax:
f.delaxes(a)
def lognorm_lev(mu, sigma, n, limit):
"""
return E(min(X, limit)^n) for lognormal using exact calculation
currently only for n=1, 2
:param mu:
:param sigma:
:param n:
:param limit:
:return:
"""
if limit == -1:
return np.exp(n * mu + n * n * sigma * sigma / 2)
else:
phi = ss.norm.cdf
ll = np.log(limit)
sigma2 = sigma * sigma
phi_l = phi((ll - mu) / sigma)
phi_l2 = phi((ll - mu - n * sigma2) / sigma)
unlimited = np.exp(n * mu + n * n * sigma2 / 2)
return unlimited * phi_l2 + limit ** n * (1 - phi_l)
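# Illustrative sketch: with limit == -1 the unlimited moment is returned, so n=1
# recovers the mean; a finite limit gives the limited expected value, below the mean.
# >>> mu, sigma = mu_sigma_from_mean_cv(10, 0.3)
# >>> np.allclose(lognorm_lev(mu, sigma, 1, -1), 10)
# True
# >>> lognorm_lev(mu, sigma, 1, 20) < 10
# True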
def lognorm_approx(ser):
"""
Lognormal approximation to series, index = loss values, values = density.
"""
m, cv = xsden_to_meancv(ser.index, ser.values)
mu, sigma = mu_sigma_from_mean_cv(m, cv)
fz = ss.lognorm(sigma, scale=np.exp(mu))
return fz
# display related
def html_title(txt, n=1, title_case=True):
"""
:param txt:
:param n:
:param title_case:
:return:
"""
if title_case:
display(HTML('<h{:}> {:}'.format(n, txt.replace("_", " ").title())))
else:
display(HTML('<h{:}> {:}'.format(n, txt.replace("_", " "))))
def sensible_jump(n, desired_rows=20):
"""
return a sensible jump size to output desired_rows given input of n
:param n:
:param desired_rows:
:return:
"""
if n < desired_rows:
return 1
j = int(n / desired_rows)
return round(j, -len(str(j)) + 1)
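# Illustrative sketch: pick a display stride for a 1000-row table, targeting 20 rows.
# >>> sensible_jump(1000, 20)
# 50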
def suptitle_and_tight(title, **kwargs):
"""
deal with tight layout when there is a suptitle
:param title:
:return:
"""
plt.suptitle(title, **kwargs)
# plt.tight_layout(rect=[0, 0, 1, 0.96])
class MomentAggregator(object):
"""
Purely accumulates moments
Used by Portfolio
Not frequency aware
makes report_ser df and statistics_df
Internal variables agg, sev, freq, tot = running total, 1, 2, 3 = noncentral moments, E(X^k)
:param freq_moms: function of one variable returning first three noncentral moments of the underlying
frequency distribution
"""
__slots__ = ['freq_1', 'freq_2', 'freq_3', 'sev_1', 'sev_2', 'sev_3', 'agg_1', 'agg_2', 'agg_3',
'tot_freq_1', 'tot_freq_2', 'tot_freq_3',
'tot_sev_1', 'tot_sev_2', 'tot_sev_3',
'tot_agg_1', 'tot_agg_2', 'tot_agg_3', 'freq_moms'
]
def __init__(self, freq_moms=None):
# accumulators
self.agg_1 = self.agg_2 = self.agg_3 = 0
self.tot_agg_1 = self.tot_agg_2 = self.tot_agg_3 = 0
self.freq_1 = self.freq_2 = self.freq_3 = 0
self.tot_freq_1 = self.tot_freq_2 = self.tot_freq_3 = 0
self.sev_1 = self.sev_2 = self.sev_3 = 0
self.tot_sev_1 = self.tot_sev_2 = self.tot_sev_3 = 0
# function to compute frequency moments, hence can call add_f1s(...)
self.freq_moms = freq_moms
def add_fs(self, f1, f2, f3, s1, s2, s3):
"""
accumulate new moments defined by f and s
used by Portfolio
compute agg for the latest values
:param f1:
:param f2:
:param f3:
:param s1:
:param s2:
:param s3:
:return:
"""
self.freq_1 = f1
self.freq_2 = f2
self.freq_3 = f3
# load current sev statistics_df
self.sev_1 = s1
self.sev_2 = s2
self.sev_3 = s3
# accumulate frequency
self.tot_freq_1, self.tot_freq_2, self.tot_freq_3 = \
self.cumulate_moments(self.tot_freq_1, self.tot_freq_2, self.tot_freq_3, f1, f2, f3)
# severity
self.tot_sev_1 = self.tot_sev_1 + f1 * s1
self.tot_sev_2 = self.tot_sev_2 + f1 * s2
self.tot_sev_3 = self.tot_sev_3 + f1 * s3
# aggregate_project
self.agg_1, self.agg_2, self.agg_3 = self.agg_from_fs(f1, f2, f3, s1, s2, s3)
# finally accumulate the aggregate_project
self.tot_agg_1, self.tot_agg_2, self.tot_agg_3 = \
self.cumulate_moments(self.tot_agg_1, self.tot_agg_2, self.tot_agg_3, self.agg_1, self.agg_2, self.agg_3)
def add_fs2(self, f1, vf, s1, vs):
"""
accumulate based on first two moments entered as mean and variance - this
is how questions are generally written.
"""
f2 = vf + f1 * f1
s2 = vs + s1 * s1
self.add_fs(f1, f2, 0., s1, s2, 0.)
def add_f1s(self, f1, s1, s2, s3):
"""
accumulate new moments defined by f1 and s - fills in f2, f3 based on
stored frequency distribution
used by Aggregate
compute agg for the latest values
:param f1:
:param s1:
:param s2:
:param s3:
:return:
"""
# fill in the frequency moments and store away
f1, f2, f3 = self.freq_moms(f1)
self.add_fs(f1, f2, f3, s1, s2, s3)
def get_fsa_stats(self, total, remix=False):
"""
get the current f x s = agg statistics_df and moments
total = True: use totals; else use current values
remix = True (total only): re-compute freq statistics_df based on total freq 1
:param total: binary
:param remix: combine all sevs and recompute the freq moments from total freq
:return:
"""
if total:
if remix:
# recompute the frequency moments; all local variables
f1 = self.tot_freq_1
f1, f2, f3 = self.freq_moms(f1)
s1, s2, s3 = self.tot_sev_1 / f1, self.tot_sev_2 / f1, self.tot_sev_3 / f1
a1, a2, a3 = self.agg_from_fs(f1, f2, f3, s1, s2, s3)
return [f1, f2, f3, *self.static_moments_to_mcvsk(f1, f2, f3),
s1, s2, s3, *self.static_moments_to_mcvsk(s1, s2, s3),
a1, a2, a3, *self.static_moments_to_mcvsk(a1, a2, a3)]
else:
# running total, not adjusting freq cv
return [self.tot_freq_1, self.tot_freq_2, self.tot_freq_3, *self.moments_to_mcvsk('freq', True),
self.tot_sev_1 / self.tot_freq_1, self.tot_sev_2 / self.tot_freq_1,
self.tot_sev_3 / self.tot_freq_1, *self.moments_to_mcvsk('sev', True),
self.tot_agg_1, self.tot_agg_2, self.tot_agg_3, *self.moments_to_mcvsk('agg', True)]
else:
# not total
return [self.freq_1, self.freq_2, self.freq_3, *self.moments_to_mcvsk('freq', False),
self.sev_1, self.sev_2, self.sev_3, *self.moments_to_mcvsk('sev', False),
self.agg_1, self.agg_2, self.agg_3, *self.moments_to_mcvsk('agg', False)]
@staticmethod
def factorial_to_noncentral(f1, f2, f3):
# eg. Panjer Willmot p 29, 2.3.13
nc2 = f2 + f1
nc3 = f3 + 3 * f2 + f1
return nc2, nc3
@staticmethod
def agg_from_fs(f1, f2, f3, s1, s2, s3):
"""
aggregate_project moments from freq and sev components
:param f1:
:param f2:
:param f3:
:param s1:
:param s2:
:param s3:
:return:
"""
return f1 * s1, \
f1 * s2 + (f2 - f1) * s1 ** 2, \
f1 * s3 + f3 * s1 ** 3 + 3 * (f2 - f1) * s1 * s2 + (- 3 * f2 + 2 * f1) * s1 ** 3
@staticmethod
def agg_from_fs2(f1, vf, s1, vs):
"""
aggregate_project moments from freq and sev ex and var x
:param f1:
:param vf:
:param s1:
:param vs:
:return:
"""
f2 = vf + f1 * f1
s2 = vs + s1 * s1
a1, a2, a3 = MomentAggregator.agg_from_fs(f1, f2, 0., s1, s2, 0.)
mw = MomentWrangler()
mw.noncentral = a1, a2, a3
# drop skewness
return mw.stats[:-1]
def moments_to_mcvsk(self, mom_type, total=True):
"""
convert noncentral moments into mean, cv and skewness
type = agg | freq | sev | mix
delegates work
:param mom_type:
:param total:
:return:
"""
if mom_type == 'agg':
if total:
return MomentAggregator.static_moments_to_mcvsk(self.tot_agg_1, self.tot_agg_2, self.tot_agg_3)
else:
return MomentAggregator.static_moments_to_mcvsk(self.agg_1, self.agg_2, self.agg_3)
elif mom_type == 'freq':
if total:
return MomentAggregator.static_moments_to_mcvsk(self.tot_freq_1, self.tot_freq_2, self.tot_freq_3)
else:
return MomentAggregator.static_moments_to_mcvsk(self.freq_1, self.freq_2, self.freq_3)
elif mom_type == 'sev':
if total:
return MomentAggregator.static_moments_to_mcvsk(self.tot_sev_1 / self.tot_freq_1,
self.tot_sev_2 / self.tot_freq_1,
self.tot_sev_3 / self.tot_freq_1)
else:
return MomentAggregator.static_moments_to_mcvsk(self.sev_1, self.sev_2, self.sev_3)
def moments(self, mom_type, total=True):
"""
vector of the moments; convenience function
:param mom_type:
:param total:
:return:
"""
if mom_type == 'agg':
if total:
return self.tot_agg_1, self.tot_agg_2, self.tot_agg_3
else:
return self.agg_1, self.agg_2, self.agg_3
elif mom_type == 'freq':
if total:
return self.tot_freq_1, self.tot_freq_2, self.tot_freq_3
else:
return self.freq_1, self.freq_2, self.freq_3
elif mom_type == 'sev':
if total:
return self.tot_sev_1 / self.tot_freq_1, self.tot_sev_2 / self.tot_freq_1, \
self.tot_sev_3 / self.tot_freq_1
else:
return self.sev_1, self.sev_2, self.sev_3
@staticmethod
def cumulate_moments(m1, m2, m3, n1, n2, n3):
"""
Moments of the sum of independent variables
:param m1: 1st moment, E(X)
:param m2: 2nd moment, E(X^2)
:param m3: 3rd moment, E(X^3)
:param n1:
:param n2:
:param n3:
:return:
"""
# figure out moments of the sum
t1 = m1 + n1
t2 = m2 + 2 * m1 * n1 + n2
t3 = m3 + 3 * m2 * n1 + 3 * m1 * n2 + n3
return t1, t2, t3
@staticmethod
def static_moments_to_mcvsk(ex1, ex2, ex3):
"""
returns mean, cv and skewness from non-central moments
:param ex1:
:param ex2:
:param ex3:
:return:
"""
m = ex1
var = ex2 - ex1 ** 2
# rounding errors...
if np.allclose(var, 0):
var = 0
if var < 0:
logger.error(f'MomentAggregator.static_moments_to_mcvsk | weird var < 0 = {var}; ex={ex1}, ex2={ex2}')
sd = np.sqrt(var)
if m == 0:
cv = np.nan
logger.info('MomentAggregator.static_moments_to_mcvsk | encountered zero mean, called with '
f'{ex1}, {ex2}, {ex3}')
else:
cv = sd / m
if sd == 0:
skew = np.nan
else:
skew = (ex3 - 3 * ex1 * ex2 + 2 * ex1 ** 3) / sd ** 3
return m, cv, skew
@staticmethod
def column_names():
"""
list of the moment and statistics_df names for f x s = a
:return:
"""
return [i + j for i, j in itertools.product(['freq', 'sev', 'agg'], [f'_{i}' for i in range(1, 4)] +
['_m', '_cv', '_skew'])]
def stats_series(self, name, limit, pvalue, remix):
"""
combine elements into a reporting series
handles order, index names etc. in one place
:param name: series name
:param limit:
:param pvalue:
:param remix: called from Aggregate want remix=True to collect mix terms; from Portfolio remix=False
:return:
"""
# TODO: needs to be closer link to column_names()
idx = pd.MultiIndex.from_arrays([['freq'] * 6 + ['sev'] * 6 + ['agg'] * 8,
(['ex1', 'ex2', 'ex3'] + ['mean', 'cv', 'skew']) * 3 + ['limit', 'P99.9e']],
names=['component', 'measure'])
all_stats = self.get_fsa_stats(total=True, remix=remix)
try:
p999e = estimate_agg_percentile(*all_stats[15:18], pvalue)
except ValueError:
# if no cv this is a value error
p999e = np.inf
return pd.Series([*all_stats, limit, p999e], name=name, index=idx)
class MomentWrangler(object):
"""
Conversion between central, noncentral and factorial moments
Input any one and ask for any translation.
Stores moments as noncentral internally
"""
__slots__ = ['_central', '_noncentral', '_factorial']
def __init__(self):
self._central = None
self._noncentral = None
self._factorial = None
@property
def central(self):
return self._central
@central.setter
def central(self, value):
self._central = value
ex1, ex2, ex3 = value
# p 43, 1.241
self._noncentral = (ex1, ex2 + ex1 ** 2, ex3 + 3 * ex2 * ex1 + ex1 ** 3)
self._make_factorial()
@property
def noncentral(self):
return self._noncentral
@noncentral.setter
def noncentral(self, value):
self._noncentral = value
self._make_factorial()
self._make_central()
@property
def factorial(self):
return self._factorial
@factorial.setter
def factorial(self, value):
self._factorial = value
ex1, ex2, ex3 = value
# p 44, 1.248
self._noncentral = (ex1, ex2 + ex1, ex3 + 3 * ex2 + ex1)
self._make_central()
@property
def stats(self):
m, v, c3 = self._central
sd = np.sqrt(v)
if m == 0:
cv = np.nan
else:
cv = sd / m
if sd == 0:
skew = np.nan
else:
skew = c3 / sd ** 3
# return pd.Series((m, v, sd, cv, skew), index=('EX', 'Var(X)', 'SD(X)', 'CV(X)', 'Skew(X)'))
# shorter names are better
return pd.Series((m, v, sd, cv, skew), index=('ex', 'var', 'sd', 'cv', 'skew'))
@property
def mcvsk(self):
m, v, c3 = self._central
sd = np.sqrt(v)
if m == 0:
cv = np.nan
else:
cv = sd / m
if sd == 0:
skew = np.nan
else:
skew = c3 / sd ** 3
return m, cv, skew
def _make_central(self):
ex1, ex2, ex3 = self._noncentral
# p 42, 1.240
self._central = (ex1, ex2 - ex1 ** 2, ex3 - 3 * ex2 * ex1 + 2 * ex1 ** 3)
def _make_factorial(self):
""" compute factorial moments from the stored noncentral moments """
ex1, ex2, ex3 = self._noncentral
# p. 43, 1.245 (below)
self._factorial = (ex1, ex2 - ex1, ex3 - 3 * ex2 + 2 * ex1)
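# Illustrative sketch: set any one flavor of moments and read off the others.
# >>> mw = MomentWrangler()
# >>> mw.central = (2.0, 4.0, 0.0)   # mean 2, variance 4, zero third central moment
# >>> mw.noncentral
# (2.0, 8.0, 32.0)
# >>> mw.mcvsk
# (2.0, 1.0, 0.0)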
def xsden_to_meancv(xs, den):
"""
Compute mean and cv from xs and density.
Consider adding: np.nan_to_num(den)
Note: cannot rely on pd.Series[-1] to work... it depends on the index.
xs could be an index
:param xs:
:param den:
:return:
"""
pg = 1 - den.sum()
xd = xs * den
if isinstance(xs, np.ndarray):
xsm = xs[-1]
elif isinstance(xs, pd.Series):
xsm = xs.iloc[-1]
else:
xsm = np.array(xs)[-1]
ex1 = np.sum(xd) + pg * xsm
# logger.log(WL, f'tail mass mean adjustment {pg * xsm}')
ex2 = np.sum(xd * xs) + pg * xsm ** 2
sd = np.sqrt(ex2 - ex1 ** 2)
if ex1 != 0:
cv = sd / ex1
else:
cv = np.nan
return ex1, cv
def xsden_to_meancvskew(xs, den):
"""
Compute mean, cv and skewness from xs and density
Consider adding: np.nan_to_num(den)
:param xs:
:param den:
:return:
"""
pg = 1 - den.sum()
xd = xs * den
if isinstance(xs, np.ndarray):
xsm = xs[-1]
bs = xs[1] - xs[0]
elif isinstance(xs, pd.Series):
xsm = xs.iloc[-1]
bs = xs.iloc[1] - xs.iloc[0]
else:
_ = np.array(xs)
xsm = _[-1]
bs = _[1] - _[0]
xsm = xsm + bs
ex1 = np.sum(xd) + pg * xsm
# logger.log(WL, f'tail mass mean adjustment {pg * xsm}')
xd *= xs
ex2 = np.sum(xd) + pg * xsm ** 2
ex3 = np.sum(xd * xs) + pg * xsm ** 3
mw = MomentWrangler()
mw.noncentral = ex1, ex2, ex3
return mw.mcvsk
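# Illustrative sketch: moments of a discretized gamma(2, scale=50) density, which has
# mean 100, cv 1/sqrt(2) and skewness sqrt(2); these are recovered approximately.
# >>> xs = np.arange(0, 2000, dtype=float)
# >>> den = ss.gamma(2, scale=50).pdf(xs)
# >>> den = den / den.sum()
# >>> xsden_to_meancvskew(xs, den)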
def tweedie_convert(*, p=None, μ=None, σ2=None, λ=None, α=None, β=None, m=None, cv=None):
"""
Translate between Tweedie parameters. Input p, μ, σ2 or λ, α, β or λ, m, cv. Remaining
parameters are computed and returned in pandas Series.
p, μ, σ2 are the reproductive parameters, μ is the mean and the variance equals σ2 μ^p
λ, α, β are the additive parameters; λαβ is the mean, λα(α + 1) β^2 is the variance
(α is the gamma shape and β is the scale).
λ, m, cv specify the compound Poisson with expected claim count λ and gamma with mean m and cv
In addition, returns p0, the probability mass at 0.
"""
if μ is None:
if α is None:
# λ, m, cv spec directly as compound Poisson
assert λ is not None and m is not None and cv is not None
α = 1 / cv ** 2
β = m / α
else:
# λ, α, β in additive form
assert λ is not None and α is not None and β is not None
m = α * β
cv = α ** -0.5
p = (2 + α) / (1 + α)
μ = λ * m
σ2 = λ * α * (α + 1) * β ** 2 / μ ** p
else:
# p, μ, σ2 in reproductive form
assert p is not None and μ is not None and σ2 is not None
α = (2 - p) / (p - 1)
λ = μ**(2-p) / ((2-p) * σ2)
β = μ / (λ * α)
m = α * β
cv = α ** -0.5
p0 = np.exp(-λ)
twcv = np.sqrt(σ2 * μ ** p) / μ
ans = pd.Series([μ, p, σ2, λ, α, β, twcv, m, cv, p0],
index=['μ', 'p', 'σ^2', 'λ', 'α', 'β', 'tw_cv', 'sev_m', 'sev_cv', 'p0'])
return ans
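# Illustrative sketch: specify a Tweedie as a compound Poisson with λ=10 claims and
# gamma severity mean 2, cv 0.5, then read off the reproductive parameters; here
# p = (2 + α) / (1 + α) = 1.2 and μ = λ m = 20.
# >>> s = tweedie_convert(λ=10, m=2, cv=0.5)
# >>> s['p'], s['μ']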
def tweedie_density(x, *, p=None, μ=None, σ2=None, λ=None, α=None, β=None, m=None, cv=None):
"""
Exact density of Tweedie distribution from series expansion.
Use any parameterization and convert between them with Tweedie convert.
Coded for clarity and flexibility not speed. See ``tweedie_convert``
for parameterization.
"""
pars = tweedie_convert(p=p, μ=μ, σ2=σ2, λ=λ, α=α, β=β, m=m, cv=cv)
λ = pars['λ']
α = pars['α']
β = pars['β']
if x == 0:
return np.exp(-λ)
# reasonable max n from normal approx to Poisson
maxn = λ + 4 * λ ** 0.5
logl = np.log(λ)
logx = np.log(x)
logb = np.log(β)
const = -λ - x / β
ans = 0.0
for n in range(1, 2000):
log_term = (const +
+ n * logl +
+ (n * α - 1) * logx +
- loggamma(n+1) +
- loggamma(n * α) +
- n * α * logb)
ans += np.exp(log_term)
if n > maxn or (n > λ and log_term < -227):
break
return ans
def frequency_examples(n, ν, f, κ, sichel_case, log2, xmax=500):
"""
Illustrate different frequency distributions and frequency moment
calculations.
sichel_case = gamma | ig | ''
Sample call: ::
df, ans = frequency_examples(n=100, ν=0.45, f=0.5, κ=1.25,
sichel_case='', log2=16, xmax=2500)
:param n: E(N) = expected claim count
:param ν: CV(mixing) = asymptotic CV of any compound aggregate whose severity has a second moment
:param f: proportion of certain claims, 0 <= f < 1, higher f corresponds to greater skewness
:param κ: (kappa) claims per occurrence
:param sichel_case: gamma, ig or ''
:param xmax:
"""
def ft(x):
return np.fft.fft(x)
def ift(x):
return np.fft.ifft(x)
def defuzz(x):
x[np.abs(x) < 5e-16] = 0
return x
def ma(x):
return list(MomentAggregator.static_moments_to_mcvsk(*x))
def row(ps):
moms = [(x ** k * ps).sum() for k in (1, 2, 3)]
stats = ma(moms)
return moms + stats + [np.nan]
def noncentral_n_moms_from_mixing_moms(n, c, g):
"""
c=Var(G), g=E(G^3) return EN, EN2, EN3
"""
return [n, n * (1 + (1 + c) * n), n * (1 + n * (3 * (1 + c) + n * g))]
def asy_skew(g, c):
"""
asymptotic skewnewss
"""
return [(g - 3 * c - 1) / c ** 1.5]
ans = pd.DataFrame(columns=['X', 'type', 'EX', 'EX2', 'EX3', 'mean', 'CV', 'Skew', 'Asym Skew'])
ans = ans.set_index(['X', 'type'])
N = 1 << log2
x = np.arange(N, dtype=float)
z = np.zeros(N)
z[1] = 1
fz = ft(z)
# build poisson for comparison
dist = 'poisson'
kernel = n * (fz - 1)
p = np.real(ift(np.exp(kernel)))
p = defuzz(p)
# for poisson c=0 and g=1 (the "mixing" distribution is G identically equal to 1)
ans.loc[(dist, 'empirical'), :] = row(p)
temp = noncentral_n_moms_from_mixing_moms(n, 0, 1)
ans.loc[(dist, 'theoretical'), :] = temp + ma(temp) + [np.inf]
ans.loc[(dist, 'diff'), :] = ans.loc[(dist, 'empirical'), :] / ans.loc[(dist, 'theoretical'), :] - 1
# negative binomial
# Var(G) = c, E(G)=1 so Var(G) = ν^2 = c
# wikipedia / scipy stats use a and θ for shape and scale
dist = 'neg bin'
c = ν * ν # contagion
a = 1 / c
θ = c
# E(G^3): skew(G) = skew(G') = γ, so E(G-EG)^3 = γν^3, so EG^3 = γν^3 + 3(1+c) - 2 = γν^3 + 3c + 1
# for Gamma skew = 2 / sqrt(a) = 2ν
g = 1 + 3 * c + 2 * c * c
nb = np.real(ift((1 - θ * kernel) ** -a))
nb = defuzz(nb)
ans.loc[(dist, 'empirical'), :] = row(nb)
# this row is generic: it applies to all distributions
temp = noncentral_n_moms_from_mixing_moms(n, c, g)
ans.loc[(dist, 'theoretical'), :] = temp + ma(temp) + asy_skew(g, c)
ans.loc[(dist, 'diff'), :] = ans.loc[(dist, 'empirical'), :] / ans.loc[(dist, 'theoretical'), :] - 1
# delaporte G = f + G'
dist = 'delaporte'
c = ν * ν # contagion
a = ((1 - f) / ν) ** 2
θ = (1 - f) / a
g = 2 * ν ** 4 / (1 - f) + 3 * c + 1
delaporte = np.real(ift(np.exp(f * kernel) * (1 - θ * kernel) ** -a))
delaporte = defuzz(delaporte)
ans.loc[(dist, 'empirical'), :] = row(delaporte)
temp = noncentral_n_moms_from_mixing_moms(n, c, g)
ans.loc[(dist, 'theoretical'), :] = temp + ma(temp) + asy_skew(g, c)
ans.loc[(dist, 'diff'), :] = ans.loc[(dist, 'empirical'), :] / ans.loc[(dist, 'theoretical'), :] - 1
# pig
dist = 'pig'
c = ν * ν # contagion
μ = c
λ = 1 / μ
# our param (λ, μ) --> (λ, λμ) in Mathematica and hence skew = γ = 3 * sqrt(μ) in scipy parameterization
γ = 3 * np.sqrt(μ)
g = γ * ν ** 3 + 3 * c + 1
pig = np.real(ift(np.exp(1 / μ * (1 - np.sqrt(1 - 2 * μ ** 2 * λ * kernel)))))
pig = defuzz(pig)
# print(f'PIG parameters μ={μ} and λ={λ}, g={g}, γ={γ}')
ans.loc[(dist, 'empirical'), :] = row(pig)
temp = noncentral_n_moms_from_mixing_moms(n, c, g)
ans.loc[(dist, 'theoretical'), :] = temp + ma(temp) + asy_skew(g, c)
ans.loc[(dist, 'diff'), :] = ans.loc[(dist, 'empirical'), :] / ans.loc[(dist, 'theoretical'), :] - 1
# shifted pig
dist = 'shifted pig'
c = ν * ν # contagion
μ = c / (1 - f) ** 2
λ = (1 - f) / μ
γ = 3 * np.sqrt(μ)
g = γ * ν ** 3 + 3 * c + 1
shifted_pig = np.real(ift(np.exp(f * kernel) * np.exp(1 / μ * (1 - np.sqrt(1 - 2 * μ ** 2 * λ * kernel)))))
shifted_pig = defuzz(shifted_pig)
ans.loc[(dist, 'empirical'), :] = row(shifted_pig)
temp = noncentral_n_moms_from_mixing_moms(n, c, g)
ans.loc[(dist, 'theoretical'), :] = temp + ma(temp) + asy_skew(g, c)
ans.loc[(dist, 'diff'), :] = ans.loc[(dist, 'empirical'), :] / ans.loc[(dist, 'theoretical'), :] - 1
# poisson pascal
# parameters
dist = 'poisson pascal'
# solve for local c to hit overall c=ν^2 value input
c = (n * ν ** 2 - 1 - κ) / κ
a = 1 / c
θ = κ * c
λ = n / κ # poisson parameter for number of claims
pois_pascal = np.real(ift(np.exp(λ * ((1 - θ * (fz - 1)) ** -a - 1))))
pois_pascal = defuzz(pois_pascal)
ans.loc[(dist, 'empirical'), :] = row(pois_pascal)
# moments for the PP are different; can't use noncentral_n_moms_from_mixing_moms
g = κ * λ * (
2 * c ** 2 * κ ** 2 + 3 * c * κ ** 2 * λ + 3 * c * κ ** 2 + 3 * c * κ + κ ** 2 * λ ** 2 + 3 * κ ** 2 * λ + κ ** 2 + 3 * κ * λ + 3 * κ + 1)
g2 = n * (
2 * c ** 2 * κ ** 2 + 3 * c * n * κ + 3 * c * κ ** 2 + 3 * c * κ + n ** 2 + 3 * n * κ + 3 * n + κ ** 2 + 3 * κ + 1)
assert np.allclose(g, g2)
temp = [λ * κ, n * (κ * (1 + c + λ) + 1), g]
ans.loc[(dist, 'theoretical'), :] = temp + ma(temp) + [
np.nan] # note: hard to interpret this, but for FIXED cv of NB it tends to zero
ans.loc[(dist, 'diff'), :] = ans.loc[(dist, 'empirical'), :] / ans.loc[(dist, 'theoretical'), :] - 1
# sichel: find sichel to increase mixing skewness indicated amount from IG skewness
# ======
# three flavors
# 1. sichel cv, lambda solve for beta and mu to hit mean and cv given lambda
# lambda = -0.5 = ig; lambda = 0.5 inverse IG
# 2. sichel.gamma cv, certain match moments to negative binomial
# 3. sichel.ig cv, certain match moments to shifted ig distribution
dist = 'sichel'
c = ν * ν # contagion
if sichel_case == 'gamma':
# sichel_case 2: match delaporte moments
# G = f + G'; E(G') = 1 - f, SD(G) = SD(G') = ν, skew(G') = skew(G)
# a = ((1 - f) / ν) ** 2
# FWIW θ = ν / (1 - f) # (1 - f) / a
target = np.array([1, ν, 2 * ν / (1 - f)]) # / np.sqrt(a)])
elif sichel_case == 'ig':
# match shifted IG moments
# μ = (ν / (1 - f)) ** 2
target = np.array([1, ν, 3.0 * ν / (1 - f)]) # np.sqrt(μ)])
elif sichel_case == '':
# input lambda, match mean = 1 and cv (input)
pass
else:
raise ValueError("Idiot")
add_sichel = True
if sichel_case in ('gamma', 'ig'):
# need starting parameters (Panjer and Willmot format)
if sichel_case == 'gamma':
λ = -0.5
else:
λ = -0.5
μ = 1
β = ν ** 2
def f(arrIn):
"""
calibration function to match target mean, cv and skewness (keeps the scale about the same)
:param arrIn:
:return:
"""
μ, β, λ = arrIn
# mu and beta are positive...
μ = np.exp(μ)
β = np.exp(β)
ex1, ex2, ex3 = np.array([μ ** r * kv(λ + r, μ / β) / kv(λ, μ / β) for r in (1, 2, 3)])
sd = np.sqrt(ex2 - ex1 * ex1)
skew = (ex3 - 3 * ex2 * ex1 + 2 * ex1 ** 3) / (sd ** 3)
return np.array([ex1, sd, skew]) - target
try:
params1 = broyden2(f, (np.log(μ), np.log(β), λ), verbose=False, iter=10000,
f_rtol=1e-11) # , f_rtol=1e-9) , line_search='wolfe'
params2 = newton_krylov(f, (np.log(μ), np.log(β), λ), verbose=False, iter=10000,
f_rtol=1e-11) # , f_rtol=1e-9) , line_search='wolfe'
if np.sum((params1 - params2) ** 2) > 0.05:
print(f'Broyden {params1}\nNewton K {params2}')
m1 = np.sum(np.abs(params1))
m2 = np.sum(np.abs(params2))
if m1 < m2:
print(f'selecting Broyden {params1}')
params = params1
else:
print(f'selecting Newton K {params2}')
params = params2
else:
print(f'Two estimates similar, selecting Bry {params1}, {params2}')
params = params1
except NoConvergence as e:
print('ERROR: broyden did not converge')
print(e)
add_sichel = False
raise e
elif sichel_case == '':
# input lambda match mean = 1 and cv
λ = κ # will actually be freq_b
# need starting parameters (for lambda = -0.5) Panjer Willmot format
μ = 1
β = ν ** 2
target = np.array([1, ν])
def f(arrIn):
"""
calibration function to match target mean = 1 and cv
:param arrIn:
:return:
"""
μ, β = arrIn
# mu and beta are positive...
μ = np.exp(μ)
β = np.exp(β)
ex1, ex2 = np.array([μ ** r * kv(λ + r, μ / β) / kv(λ, μ / β) for r in (1, 2)])
sd = np.sqrt(ex2 - ex1 * ex1)
return np.array([ex1, sd]) - target
try:
params = broyden2(f, (np.log(μ), np.log(β)), verbose=False, iter=10000,
f_rtol=1e-11) # , f_rtol=1e-9) , line_search='wolfe'
except NoConvergence as e:
print('ERROR: broyden did not converge')
print(e)
add_sichel = False
raise e
else:
raise ValueError("Idiot ")
# if parameters found...
if add_sichel:
if sichel_case == '':
μ, β = params
else:
μ, β, λ = params
μ, β = np.exp(μ), np.exp(β)
print(f'Sichel params {(μ, β, λ)}')
# theoretic noncentral moments of the **mixing** distribution = Factorial of N
# have calibrated to EG=1 so use same kernel n * (z - 1)
# compute density
inner = np.sqrt(1 - 2 * β * kernel)
sichel = np.real(ift(inner ** (-λ) * kv(λ, μ * inner / β) / kv(λ, μ / β)))
sichel = defuzz(sichel)
ans.loc[(dist, 'empirical'), :] = row(sichel)
mw = MomentWrangler()
junk = [μ ** r * kv(λ + r, μ / β) / kv(λ, μ / β) for r in (1, 2, 3)]
g = junk[2] # non central G
temp = noncentral_n_moms_from_mixing_moms(n, c, g)
print('Noncentral N from mixing moms ', temp)
mw.factorial = junk
print('Non central N moms ', mw.noncentral)
print('Empirical central N moms ', row(sichel))
ans.loc[(dist, 'theoretical'), :] = temp + ma(temp) + asy_skew(g, c)
ans.loc[(dist, 'diff'), :] = ans.loc[(dist, 'empirical'), :] / ans.loc[(dist, 'theoretical'), :] - 1
# ---------------------------------------------------------------------
# sum of all errors is small
print(ans.loc[(slice(None), 'diff'), :].abs().sum().sum())
# assert ans.loc[(slice(None), 'diff'), :].abs().sum().sum() < 1e-6
# graphics
df = pd.DataFrame(dict(x=x, poisson=p, pois_pascal=pois_pascal, negbin=nb, delaporte=delaporte, pig=pig,
shifted_pig=shifted_pig, sichel=sichel))
df = df.query(f' x < {xmax} ')
df = df.set_index('x')
axiter = axiter_factory(None, 12, aspect=1.414, nr=4)
all_dist = ['poisson', 'negbin', 'delaporte', 'pig', 'shifted_pig', 'pois_pascal', 'sichel']
for vars in [all_dist,
['poisson', 'negbin', 'pig', ],
['poisson', 'negbin', 'delaporte', ],
['poisson', 'pig', 'shifted_pig', 'sichel'],
['poisson', 'negbin', 'pois_pascal'],
['poisson', 'delaporte', 'shifted_pig', 'sichel'],
]:
# keep the same colors
# FIX TODO
print('FIX palette')
# pal = [sns.color_palette("Paired", 7)[i] for i in [all_dist.index(j) for j in vars]]
df[vars].plot(kind='line', ax=next(axiter)) #, color=pal)
axiter.ax.set_xlim(0, 4 * n)
df[vars].plot(kind='line', logy=True, ax=next(axiter), legend=None) # , color='virdis')
axiter.tidy()
display(ans.unstack())
return df, ans
class Answer(dict):
# TODO replace with collections.namedtuple? Or at least, stop using it!
# Maybe not, namedtuples are immutable
def __init__(self, **kwargs):
"""
Generic answer wrapping class with plotting
:param kwargs: key=value to wrap
"""
super().__init__(kwargs)
def __getattr__(self, item):
return self[item]
def __repr__(self):
return str(self.list())
def __str__(self):
return self.list()
def list(self):
""" List elements """
return pd.DataFrame(zip(self.keys(),
[self.nice(v) for v in self.values()]),
columns=['Item', 'Type']).set_index('Item')
_repr_html_ = list
def __str__(self):
return '\n'.join([f'{i[0]:<20s}\t{i[1]}'
for i in zip(self.keys(), [self.nice(v) for v in self.values()])
])
@staticmethod
def nice(x):
""" return a nice rep of x """
if type(x) in [str, float, int]:
return x
else:
return type(x)
def summary(self):
"""
just print out the dataframes: horz or vertical as appropriate
reasonable styling
:return:
"""
for k, v in self.items():
if isinstance(v, pd.core.frame.DataFrame):
print(f'\n{k}\n{"=" * len(k)}\n')
if v.shape[1] > 12:
display(v.head(5).T)
else:
display(v.head(10))
def log_test():
"""
Issue logs at each level
"""
print('Issuing five messages...')
for l, n in zip([logger], ['logger']):
print(n)
l.debug('A debug message')
l.info('A info message')
l.warning('A warning message')
l.error('A error message')
l.critical('A critical message')
print(f'...done with {n}')
print('...done')
class LoggerManager():
def __init__(self, level, name='aggregate'):
"""
Manage all the aggregate loggers: toggle levels
Put lm = LoggerManager(10) at the start of a function.
When it goes out of scope it puts the level back
where it was.
TODO: make work on a per logger basis!
"""
self.level = level
self.loggers = [v for k, v in logging.root.manager.loggerDict.items()
if isinstance(v, logging.Logger) and k.find(name) >= 0 and v.getEffectiveLevel() != level]
if len(self.loggers) > 0:
self.base_level = self.loggers[0].getEffectiveLevel()
for l in self.loggers:
l.setLevel(level)
def __del__(self):
for l in self.loggers:
logger.info(f'Putting logger level back to {self.base_level} from {self.level}')
l.setLevel(self.base_level)
def logger_level(level=30, name='aggregate', verbose=False):
"""
Code from common.py
Change the logger level for all loggers whose name contains ``name``.
Changing EVERY logger is a really bad idea;
you get endless debug info out of matplotlib
find_font, for example.
FWIW, to list all loggers:
::
loggers = [logging.getLogger()] # get the root logger
loggers = loggers + [logging.getLogger(name) for name in logging.root.manager.loggerDict]
loggers
:param level:
:return:
"""
try:
# logging.basicConfig(format='%(asctime)s.%(msecs)03d|%(lineno)4d|%(levelname)-10s| %(name)s.%(funcName)s| %(message)-s',
logging.basicConfig(format='line %(lineno)4d|%(levelname)-10s| %(name)s.%(funcName)s| %(message)-s',
datefmt='%M:%S')
loggers = [logging.getLogger()] # get the root logger
loggers = loggers + \
[logging.getLogger(name)
for name in logging.root.manager.loggerDict]
# set the level selectively
for logger in loggers:
if logger.name.find(name) >= 0:
logger.setLevel(level)
if verbose:
for logger in loggers:
print(logger.name, logger.getEffectiveLevel())
except ValueError as e:
raise e
def subsets(x):
"""
all non-empty subsets of x, an iterable
"""
return list(itertools.chain.from_iterable(
itertools.combinations(x, n) for n in range(len(x) + 1)))[1:]
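# Illustrative sketch:
# >>> subsets([1, 2, 3])
# [(1,), (2,), (3,), (1, 2), (1, 3), (2, 3), (1, 2, 3)]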
# new graphics methods
def nice_multiple(mx):
"""
Suggest a nice multiple for an axis with scale 0 to mx. Used by the MultipleLocator in discrete plots,
where you want an integer multiple. Return 0 to let matplotlib figure the answer. Real issue is stopping
multiples like 2.5.
:param mx:
:return:
"""
m = mx / 6
if m < 0:
return 0
m = mx // 6
m = {3: 2, 4: 5, 6: 5, 7: 5, 8: 10, 9: 10}.get(m, m)
if m < 10:
return m
# punt back to mpl for larger values
return 0
def knobble_fonts(color=False):
"""
Not sure we should get into this...
See FigureManager in Great or common.py
https://matplotlib.org/3.1.1/tutorials/intermediate/color_cycle.html
https://matplotlib.org/3.1.1/users/dflt_style_changes.html#colors-in-default-property-cycle
https://matplotlib.org/2.0.2/examples/color/colormaps_reference.html
https://matplotlib.org/3.1.0/gallery/lines_bars_and_markers/linestyles.html
https://stackoverflow.com/questions/22408237/named-colors-in-matplotlib
"""
# reset everything
plt.rcdefaults()
# this sets a much smaller base fontsize
# everything scales off font size
plt.rcParams['font.size'] = FONT_SIZE
# mpl default is medium
plt.rcParams['legend.fontsize'] = LEGEND_FONT
# color set up
if color:
plt.rcParams["axes.facecolor"] = PLOT_FACE_COLOR
# note plt.rc lets you set multiple related properties at once:
plt.rc('legend', fc=PLOT_FACE_COLOR, ec=PLOT_FACE_COLOR)
plt.rcParams['figure.facecolor'] = FIGURE_BG_COLOR
else:
# graphics defaults - better res graphics
plt.rcParams['figure.dpi'] = 300
plt.rc('legend', fc="white", ec="white")
default_colors = [(0,0,0)]
default_ls = ['solid', 'dashed', 'dotted', 'dashdot']
props = []
cc = [i[1] for i in product(default_ls, default_colors)]
lsc = [i[0] for i in product(default_ls, default_colors)]
props.append(cycler('color', cc))
props.append(cycler('linestyle', lsc))
# combine all cyclers
cprops = props[0]
for c in props[1:]:
cprops += c
mpl.rcParams['axes.prop_cycle'] = cycler(cprops)
# fonts: add some better fonts as earlier defaults
mpl.rcParams['font.serif'] = ['STIX Two Text', 'Times New Roman', 'DejaVu Serif']
# 'Nirmala UI' has poor glyph coverage, removed as an option
mpl.rcParams['font.sans-serif'] = ['Myriad Pro', 'Segoe UI', 'DejaVu Sans']
mpl.rcParams['font.monospace'] = ['Ubuntu Mono', 'QuickType II Mono', 'Cascadia Mono', 'DejaVu Sans Mono']
# this matches html output better
mpl.rcParams['font.family'] = 'sans-serif'
mpl.rcParams['mathtext.fontset'] = 'stixsans'
pd.options.display.width = 120
# styling - greys versions
def style_df(df):
"""
Style a df similar to pricinginsurancerisk.com styles.
graph background color is B4C3DC and figure (paler) background is F1F8F#
Dropped row lines; bold level0, caption
:param df:
:return: styled dataframe
"""
cell_hover = {
'selector': 'td:hover',
'props': [('background-color', '#ffffb3')]
}
index_names = {
'selector': '.index_name',
'props': 'font-style: italic; color: white; background-color: #777777; '
'font-weight:bold; border: 1px solid white; text-transform: capitalize; '
'text-align:left;'
}
headers = {
'selector': 'th:not(.index_name)',
'props': 'background-color: #DDDDDD; color: black; border: 1px solid #ffffff;'
}
center_heading = {
'selector': 'th.col_heading',
'props': 'text-align: center;'
}
left_index = {
'selector': '.row_heading',
'props': 'text-align: left;'
}
td = {
'selector': 'td',
'props': f'text-align: right;'
}
all_styles = [cell_hover, index_names, headers, center_heading, left_index, td]
return df.style.set_table_styles(all_styles)
def friendly(df):
"""
Attempt to format df "nicely", in a user-friendly manner. Not designed for big dataframes!
:param df:
:return:
"""
def ur(x):
# simple ultimate renamer
if type(x) == str:
sx = x.split("_")
if len(sx) > 1:
x = ' '.join([rn(i) for i in sx])
else:
x = rn(x).title()
return x
def rn(x):
# specific renamings...
return {
'freq': 'Frequency',
'sev': 'Severity',
'agg': 'Aggregate',
'el': 'Expected Loss',
'm': 'Mean',
'cv': 'CV', 'skew': 'Skewness', 'kurt': 'Kurtosis',
'L': 'Exp Loss', 'P': 'Premium', 'PQ': 'Leverage',
'LR': 'Loss Ratio',
'M': 'Margin', 'Q': 'Capital', 'a': 'Assets',
'Roe': 'ROE'
}.get(x,x)
bit = df.rename(index=ur).rename(columns=ur)
# style like pricinginsurancerisk.com
# return style_df(bit).format(lambda x: x if type(x)==str else f'{x:,.3f}')
return bit
class FigureManager():
def __init__(self, cycle='c', lw=1.5, color_mode='mono', k=0.8, font_size=12,
legend_font='small', default_figsize=(FIG_W, FIG_H)):
"""
Another figure/plotter manager: manages cycles for color/black and white
from Great utilities.py, edited and stripped down
combined with lessons from MetaReddit on matplotlib options for fonts, background
colors etc.
Font size was 9 and legend was x-small
Create figure with common defaults
cycle = cws
c - cycle colors
w - cycle widths
s - cycle styles
o - styles x colors, implies csw and w=single number (produces 8 series)
lw = default line width or [lws] of length 4
smaller k overall darker lines; colors are equally spaced between 0 and k
k=0.8 is a reasonable range for four colors (0, k/3, 2k/3, k)
https://matplotlib.org/3.1.1/tutorials/intermediate/color_cycle.html
https://matplotlib.org/3.1.1/users/dflt_style_changes.html#colors-in-default-property-cycle
https://matplotlib.org/2.0.2/examples/color/colormaps_reference.html
https://matplotlib.org/3.1.0/gallery/lines_bars_and_markers/linestyles.html
https://stackoverflow.com/questions/22408237/named-colors-in-matplotlib
"""
assert len(cycle) > 0
# this sets a much smaller base fontsize
# plt.rcParams.update({'axes.titlesize': 'large'})
# plt.rcParams.update({'axes.labelsize': 'small'})
# list(map(plt.rcParams.get, ('axes.titlesize', 'font.size')))
# everything scales off font size
plt.rcParams['font.size'] = font_size
# mpl default is medium
plt.rcParams['legend.fontsize'] = legend_font
# see https://matplotlib.org/stable/gallery/color/named_colors.html
self.plot_face_color = PLOT_FACE_COLOR
self.figure_bg_color = FIGURE_BG_COLOR
# graphics set up
plt.rcParams["axes.facecolor"] = self.plot_face_color
# note plt.rc lets you set multiple related properties at once:
plt.rc('legend', fc=self.plot_face_color, ec=self.plot_face_color)
# is equivalent to two calls:
# plt.rcParams["legend.facecolor"] = self.plot_face_color
# plt.rcParams["legend.edgecolor"] = self.plot_face_color
plt.rcParams['figure.facecolor'] = self.figure_bg_color
self.default_figsize = default_figsize
self.plot_colormap_name = 'cividis'
# fonts: add some better fonts as earlier defaults
mpl.rcParams['font.serif'] = ['STIX Two Text', 'Times New Roman', 'DejaVu Serif', 'Bitstream Vera Serif',
'Computer Modern Roman', 'New Century Schoolbook', 'Century Schoolbook L',
'Utopia', 'ITC Bookman',
'Bookman', 'Nimbus Roman No9 L', 'Times', 'Palatino', 'Charter', 'serif']
mpl.rcParams['font.sans-serif'] = ['Nirmala UI', 'Myriad Pro', 'Segoe UI', 'DejaVu Sans', 'Bitstream Vera Sans',
'Computer Modern Sans Serif', 'Lucida Grande', 'Verdana', 'Geneva', 'Lucid',
'Arial',
'sans-serif']
mpl.rcParams['font.monospace'] = ['Ubuntu Mono', 'QuickType II Mono', 'Cascadia Mono', 'DejaVu Sans Mono',
'Bitstream Vera Sans Mono', 'Computer Modern Typewriter', 'Andale Mono',
'Nimbus Mono L', 'Courier New',
'Courier', 'Fixed', 'Terminal', 'monospace']
mpl.rcParams['font.family'] = 'serif'
# or
# plt.rc('font', family='serif')
# much nicer math font, default is dejavusans
mpl.rcParams['mathtext.fontset'] = 'stixsans'
if color_mode == 'mono':
# https://stackoverflow.com/questions/20118258/matplotlib-coloring-line-plots-by-iteration-dependent-gray-scale
# default_colors = ['black', 'grey', 'darkgrey', 'lightgrey']
default_colors = [(i * k, i * k, i * k) for i in [0, 1 / 3, 2 / 3, 1]]
default_ls = ['solid', 'dashed', 'dotted', 'dashdot']
elif color_mode == 'cmap':
# print(plt.rcParams['axes.prop_cycle'].by_key()['color'])
norm = mpl.colors.Normalize(0, 1, clip=True)
cmappable = mpl.cm.ScalarMappable(
norm=norm, cmap=self.plot_colormap_name)
mapper = cmappable.to_rgba
default_colors = list(map(mapper, np.linspace(0, 1, 10)))
default_ls = ['solid', 'dashed',
'dotted', 'dashdot', (0, (5, 1))] * 2
else:
default_colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b', '#e377c2',
'#7f7f7f', '#bcbd22', '#17becf']
default_ls = ['solid', 'dashed',
'dotted', 'dashdot', (0, (5, 1))] * 2
props = []
if 'o' in cycle:
n = len(default_colors) // 2
if color_mode == 'mono':
cc = [i[1] for i in product(default_ls, default_colors[::2])]
else:
cc = [i[1] for i in product(default_ls, default_colors[:n])]
lsc = [i[0] for i in product(default_ls, default_colors[:n])]
props.append(cycler('color', cc))
props.append(
cycler('linewidth', [lw] * (len(default_colors) * len(default_ls) // 2)))
props.append(cycler('linestyle', lsc))
else:
if 'c' in cycle:
props.append(cycler('color', default_colors))
else:
props.append(
cycler('color', [default_colors[0]] * len(default_ls)))
if 'w' in cycle:
if type(lw) == int:
props.append(
cycler('linewidth', [lw] * len(default_colors)))
else:
props.append(cycler('linewidth', lw))
if 's' in cycle:
props.append(cycler('linestyle', default_ls))
# combine all cyclers
cprops = props[0]
for c in props[1:]:
cprops += c
mpl.rcParams['axes.prop_cycle'] = cycler(cprops)
def make_fig(self, nr=1, nc=1, figsize=None, xfmt='great', yfmt='great',
places=None, power_range=(-3, 3), sep='', unit='', sci=True,
mathText=False, offset=True, **kwargs):
"""
make grid of axes
apply format to xy axes
xfmt='d' for default axis formatting, n=nice, e=engineering, s=scientific, g=great
great = engineering with power of three exponents
"""
if figsize is None:
figsize = self.default_figsize
f, axs = plt.subplots(nr, nc, figsize=figsize,
constrained_layout=True, squeeze=False, **kwargs)
for ax in axs.flat:
if xfmt[0] != 'd':
FigureManager.easy_formatter(ax, which='x', kind=xfmt, places=places,
power_range=power_range, sep=sep, unit=unit, sci=sci, mathText=mathText,
offset=offset)
if yfmt[0] != 'd':
FigureManager.easy_formatter(ax, which='y', kind=yfmt, places=places,
power_range=power_range, sep=sep, unit=unit, sci=sci, mathText=mathText,
offset=offset)
if nr * nc == 1:
axs = axs[0, 0]
self.last_fig = f
return f, axs
__call__ = make_fig
@staticmethod
def easy_formatter(ax, which, kind, places=None, power_range=(-3, 3), sep='', unit='', sci=True,
mathText=False, offset=True):
"""
set which (x, y, b, both) to kind = sci, eng, nice
nice = engineering but uses e-3, e-6 etc.
see docs for ScalarFormatter and EngFormatter
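Usage sketch on an ordinary matplotlib axis (values illustrative)::

    f, ax = plt.subplots()
    ax.plot([0, 2.5e6], [0, 1])
    FigureManager.easy_formatter(ax, which='x', kind='eng', places=1)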
"""
def make_fmt(kind, places, power_range, sep, unit):
if kind == 'sci' or kind[0] == 's':
fm = ticker.ScalarFormatter()
fm.set_powerlimits(power_range)
fm.set_scientific(True)
elif kind == 'eng' or kind[0] == 'e':
fm = ticker.EngFormatter(unit=unit, places=places, sep=sep)
elif kind == 'great' or kind[0] == 'g':
fm = GreatFormatter(
sci=sci, power_range=power_range, offset=offset, mathText=mathText)
elif kind == 'nice' or kind[0] == 'n':
fm = ticker.EngFormatter(unit=unit, places=places, sep=sep)
fm.ENG_PREFIXES = {
i: f'e{i}' if i else '' for i in range(-24, 25, 3)}
else:
raise ValueError(f'Passed {kind}, expected sci, eng, nice, or great')
return fm
# what to set
if which == 'b' or which == 'both':
which = ['xaxis', 'yaxis']
elif which == 'x':
which = ['xaxis']
else:
which = ['yaxis']
for w in which:
fm = make_fmt(kind, places, power_range, sep, unit)
getattr(ax, w).set_major_formatter(fm)
[docs]@lru_cache()
def ic_noise(n, d):
"""
Implements steps 1 through 6 of the Iman-Conover method (see :func:`iman_conover`).
This is the bottleneck function, therefore it is cached.
It trues up the random sample to ensure it is exactly uncorrelated.
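Quick check that the output is exactly uncorrelated (sizes illustrative)::

    s = ic_noise(1000, 3)
    np.allclose(s.T @ s / 1000, np.eye(3))   # True, up to floating point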
:param n: number of rows
:param d: number of columns
:return:
"""
# step 1: make a reference n x d random uncorrelated normal sample
p = [ss.norm.ppf( x / (n + 1)) for x in range(1, n+1)]
# mean is zero...but belt and braces
p = (p - np.mean(p)) / np.std(p)
# space for answer
score = np.zeros((n, d))
# steps 2 and 3
for j in range(0, score.shape[1]):
# shuffle each column
score[:, j] = ar.RANDOM.permutation(p)
# actual correlation of reference (this will be close to, but not equal to, the identity)
# @ denotes matrix multiplication
# step 4 and 5
E = np.linalg.cholesky((score.T @ score) / n)
# sample with exact desired correlation
# step 6
return score @ np.linalg.inv(E.T)
[docs]@lru_cache()
def ic_t_noise(n, d, dof):
"""
as above using multivariate t distribution noise
"""
mvt = multivariate_t([0.]*d, 1, df=dof)
score = mvt.rvs(n)
# actual correlation of reference (this will be close to, but not equal to, the identity)
# @ denotes matrix multiplication
# step 4 and 5
E = np.linalg.cholesky((score.T @ score) / n)
# sample with exact desired correlation
# step 6
return score @ np.linalg.inv(E.T)
[docs]def ic_rank(N):
"""
rankdata function: assign ranks to data, breaking ties by order of appearance (method='ordinal')
works column by column and returns zero-based integer ranks
N is a numpy array
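Example (toy array, output shown as a comment)::

    ic_rank(np.array([[3., 1.], [2., 5.], [10., 4.]]))
    # array([[1, 0],
    #        [0, 2],
    #        [2, 1]])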
"""
rank = np.zeros((N.shape[0], N.shape[1]))
for j in range(0, N.shape[1]):
rank[:, j] = ss.rankdata(N[:, j], method='ordinal')
return rank.astype(int) - 1
[docs]def ic_reorder(ranks, samples):
"""
Put each column of ``samples`` into the order determined by the corresponding column of ``ranks``,
so the output is calibrated to the rank ordering of the reference sample.
"""
rank_samples = np.zeros((samples.shape[0], samples.shape[1]))
for j in range(0, samples.shape[1]):
s = np.sort(samples[:, j])
rank_samples[:, j] = s[ranks[:,j]]
return rank_samples
[docs]def iman_conover(marginals, desired_correlation, dof=0, add_total=True):
"""
Perform Iman Conover shuffling on input marginals to achieve desired_correlation
Desired_correlation must be positive definite and of the correct size.
The result has the same rank correlation as a reference sample with the
desired linear correlation. Thus, the process relies on linear and rank
correlation (for the reference and the input sample) being close.
if ``dof==0`` use normal scores; otherwise use multivariate t scores with ``dof`` degrees of freedom
Sample code:
::
n = 100
df = pd.DataFrame({ f'line_{i}': ss.lognorm(.1 + .2*np.random.rand(),
scale=10000).rvs(n) for i in range(3)})
desired = np.matrix([[1, -.3, 0], [-.3, 1, .8], [0, .8, 1]])
print(desired)
# check it is a corr matrix
np.linalg.cholesky(desired)
df2 = iman_conover(df, desired)
df2.corr()
df_scatter(df2)
Iman Conover Method
**Make rank order the same as a reference sample with desired correlation structure.**
Reference sample usually chosen as multivariate normal because it is easy and flexible, but you can use **any** reference, e.g. copula based.
The old @Risk software used Iman Conover.
Input: matrix $\\mathbf X$ of marginals and desired correlation matrix $\\mathbf S$
1. Make one column of scores $a_i=\\Phi^{-1}(i/(n+1))$ for $i=1,\\dots,n$ and rescale to have standard deviation one.
1. Copy the scores once for each column of $\\mathbf X$ to make the score matrix $\\mathbf M$.
1. Randomly permute the entries in each column of $\\mathbf M$.
1. Compute the correlation matrix $n^{-1}\\mathbf M^t\\mathbf M$ of the sample scores $\\mathbf M$.
1. Compute the Cholesky decomposition $n^{-1}\\mathbf M^t\\mathbf M=\\mathbf E\\mathbf E^t$ of the score correlation matrix.
1. Compute $\\mathbf M' = \\mathbf M(\\mathbf E^t)^{-1}$, which is exactly uncorrelated.
1. Compute the Cholesky decomposition $\\mathbf S=\\mathbf C\\mathbf C^t$ of the desired correlation matrix $\\mathbf S$.
1. Compute $\\mathbf T=\\mathbf M'\\mathbf C^t$. The matrix $\\mathbf T$ has exactly the desired correlation structure.
1. Let $\\mathbf Y$ be the input matrix $\\mathbf X$ with each column reordered to have exactly the same **rank ordering** as the corresponding column of $\\mathbf T$.
Relies on the fact that rank (Spearman) and linear (Pearson) correlation are approximately the same.
"""
n, d = marginals.shape
# "square root" of "variance"
# step 7
C = np.linalg.cholesky(desired_correlation)
# make a perfectly uncorrelated reference: noise function = steps 1-6; product is step 8 (transposed)
if dof == 0:
N = ic_noise(n, d) @ C.T
else:
N = ic_t_noise(n, d, dof) @ C.T
# required ordering of marginals determined by reference sample, step 9
R = ic_rank(N)
# re order
if isinstance(marginals, np.ndarray):
shuffled_marginals = ic_reorder(R, marginals)
df = pd.DataFrame(shuffled_marginals)
else:
shuffled_marginals = ic_reorder(R, marginals.to_numpy())
df = pd.DataFrame(shuffled_marginals, columns=marginals.columns)
# add total if requested
if add_total:
df['total'] = df.sum(axis=1)
df = df.set_index('total')
df = df.sort_index(ascending=False)
return df
[docs]def block_iman_conover(unit_losses, intra_unit_corrs, inter_unit_corr, as_frame=False):
"""
Apply Iman Conover to the unit loss blocks in ``unit_losses`` with correlation matrices in ``intra``.
Then determine the ordering for the unit totals with correlation ``inter``.
Re-order each unit, row by row, so that the totals have the desired correlation structure, but
leaving the intra unit correlation unchanged.
``unit_losses = [np.arrays or pd.Series]`` of losses by subunit within units, without totals
``len(unit_losses) == len(intra_unit corrs)``
For simplicity all normal copula; can add other later if required.
No totals input or output anywhere.
``if as_frame`` then a dataframe version returned, for auditing.
Here is some tester code, using great.test_df to make random unit losses. Vary num_units and
num_sims as required.
::
def bic_tester(num_units=3, num_sims=10000):
from aggregate import random_corr_matrix
# from great import test_df
# create samples
R = range(num_units)
unit_losses = [test_df(num_sims, 3 + i) for i in R]
totals = [u.sum(1) for u in unit_losses]
# manual dataframe to check against
manual = pd.concat(unit_losses + totals, keys=[f'Unit_{i}' for i in R] + ['Total' for i in R], axis=1)
# for input to method
unit_losses = [i.to_numpy() for i in unit_losses]
totals = [i.to_numpy() for i in totals]
# make corrs
intra_unit_corrs = [random_corr_matrix(i.shape[1], p=.5, positive=True) for i in unit_losses]
inter_unit_corr = random_corr_matrix(len(totals), p=1, positive=True)
# apply method
bic = block_iman_conover(unit_losses, intra_unit_corrs, inter_unit_corr, True)
# extract frame answer, put col names back
bic.frame.columns = manual.columns
dm = bic.frame
# achieved corr
for i, target in zip(dm.columns.levels[0], intra_unit_corrs + [inter_unit_corr]):
print(i)
print((dm[i].corr() - target).abs().max().max())
# print(dm[i].corr() - target)
# total corr across subunits
display(dm.drop(columns=['Total']).corr())
# total corr across subunits
display(dm.drop(columns=['Total']).corr())
return manual, bic, intra_unit_corrs, inter_unit_corr
manual, bic, intra, inter = bic_tester(3, 10000)
"""
if isinstance(unit_losses, dict):
unit_losses = unit_losses.values()
if isinstance(intra_unit_corrs, dict):
intra_unit_corrs = intra_unit_corrs.values()
# shuffle unit losses
# IC returns a dataframe
unit_losses = [iman_conover(l, c, dof=0, add_total=False).to_numpy() for l, c in zip(unit_losses, intra_unit_corrs)]
# extract totals
totals = [l.sum(1) for l in unit_losses]
totals = np.vstack(totals)
# apply the interunit correlation to totals: this code copies iman_conover because we want
# to keep the same ordering matrices
# block shuffle units; this code is IC by hand to keep track of R and apply to the units
d, n = totals.shape
# "square root" of "variance"
# step 7
C = np.linalg.cholesky(inter_unit_corr)
# make a perfectly uncorrelated reference: noise function = steps 1-6; product is step 8 (transposed)
N = ic_noise(n, d) @ C.T
# required ordering of marginals determined by reference sample, step 9
R = ic_rank(N)
# re-order totals and the corresponding unit losses
for i, (u, t) in enumerate(zip(unit_losses, totals)):
r = np.argsort(t)
unit_losses[i] = u[r]
totals[i] = t[r]
# put into the desired IC ordering,
for i, (u, t, r) in enumerate(zip(unit_losses, totals, R.T)):
totals[i] = t[r]
unit_losses[i] = u[r]
# assembled sample
combined = np.hstack(unit_losses)
if as_frame:
fr = pd.concat((pd.DataFrame(combined), pd.DataFrame(totals.T)), axis=1, keys=['units', 'totals'])
else:
fr = None
BlockImanConover = namedtuple('BlockImanConover', 'totals combined frame')
ans = BlockImanConover(totals, combined, fr)
return ans
[docs]def rearrangement_algorithm_max_VaR(df, p=0, tau=1e-3, max_n_iter=100):
"""
Implementation of the Rearrangement Algorithm (RA). Determines the worst p-VaR
rearrangement of the input variables.
For loss random variables p is usually close to 1.
Embrechts, Paul, Giovanni Puccetti, and Ludger Ruschendorf, 2013, *Model uncertainty and
VaR aggregation*, Journal of Banking and Finance 37, 2750–2764.
**Worst-Case VaR**
Worst value at risk arrangement of marginals.
See `Actuarial Review article <https://ar.casact.org/the-re-arrangement-algorithm>`_.
Worst TVaR / Variance arrangement of bivariate data = pair best with worst, second best with
second worst, etc., called the **countermonotonic** arrangement.
More than 2 marginals: can’t *make everything negatively correlated with
everything else*. If :math:`X` and :math:`Y` are negatively correlated
and :math:`Y` and :math:`Z` are negatively correlated then :math:`X` and
:math:`Z` will be positively correlated.
Next best attempt: make :math:`X` countermonotonic to :math:`Y+Z`,
:math:`Y` to :math:`X+Z` and :math:`Z` to :math:`X+Y`. Basis of
**rearrangement algorithm**.
*The Rearrangement Algorithm*
1. Randomly permute each column of :math:`X`, the :math:`N\\times d`
matrix of top :math:`1-p` observations
2. Loop
- Create a new matrix :math:`Y` as follows. For column
:math:`j=1,\\dots,d`
- Create a temporary matrix :math:`V_j` by deleting the
:math:`j`\ th column of :math:`X`
- Create a column vector :math:`v` whose :math:`i`\ th element
equals the sum of the elements in the :math:`i`\ th row of
:math:`V_j`
- Set the :math:`j`\ th column of :math:`Y` equal to the
:math:`j`\ th column of :math:`X` arranged to have the opposite
order to :math:`v`, i.e. the largest element in the
:math:`j`\ th column of :math:`X` is placed in the row of
:math:`Y` corresponding to the smallest element in :math:`v`,
the second largest with second smallest, etc.
- Compute :math:`y`, the :math:`N\\times 1` vector with
:math:`i`\ th element equal to the sum of the elements in the
:math:`i`\ th row of :math:`Y` and let :math:`y^*=\\min(y)` be the
smallest element of :math:`y` and compute :math:`x^*` from
:math:`X` similarly
- If :math:`y^*-x^* \\ge \\epsilon` then set :math:`X=Y` and repeat
the loop
- If :math:`y^*-x^* < \\epsilon` then break from the loop
3. The arrangement :math:`Y` is an approximation to the worst
:math:`\\text{VaR}_p` arrangement of :math:`X`.
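A minimal usage sketch; the marginals and ``p`` are purely illustrative::

    df = pd.DataFrame({'a': ss.lognorm(1.2).rvs(1000),
                       'b': ss.gamma(2., scale=5.).rvs(1000),
                       'c': ss.pareto(2.5, scale=10., loc=-10.).rvs(1000)})
    worst = rearrangement_algorithm_max_VaR(df, p=0.99)
    worst.total.min()   # approximates the worst-case 0.99-VaR of the sum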
:param df: Input DataFrame containing samples from each marginal. RA will only combine the
top 1-p proportion of values from each marginal.
:param p: If ``p==0`` assume ``df`` has already been truncated to the top values for each marginal.
Otherwise truncate each marginal to its largest ``int((1 - p) * len(df))`` values.
:param tau: simulation tolerance
:param max_n_iter: maximum number of iterations to attempt
:return: the top 1-p values of the rearranged DataFrame
"""
sorted_marginals = {}
# worst N shuffled
if p:
N = int(np.round((1 - p) * len(df), 0))
else:
N = len(df)
# container for answer
df_out = pd.DataFrame(columns=df.columns, dtype=float)
# iterate over each column, sort, truncate (if p>0)
for m in df:
sorted_marginals[m] = df[m].sort_values(ascending=False).reset_index(drop=True).iloc[:N]
df_out[m] = ar.RANDOM.permutation(sorted_marginals[m])
# change in VaR and last VaR computed, to control looping
chg_var = max(100, 2 * tau)
last_var = 2 * chg_var
# iteration counter for reporting
n_iter = 0
while abs(chg_var) > tau:
for m in df_out:
# sum all the other columns
E = df_out.loc[:, df_out.columns != m].sum(axis=1)
# ranks of sums
rks = np.array(E.rank(method='first') - 1, dtype=int)
# make current column counter-monotonic to sum (sorted marginals are in descending order)
df_out[m] = sorted_marginals[m].loc[rks].values
# achieved VaR is minimum value
v = df_out.sum(axis=1).sort_values(ascending=False).iloc[-1]
chg_var = last_var - v
last_var = v
# reporting and loop control
n_iter += 1
if n_iter >= 2:
logger.info(f'Iteration {n_iter:d}\t{v:5.3e}\tChg\t{chg_var:5.3e}')
if n_iter > max_n_iter:
logger.error("ERROR: not converging...breaking")
break
df_out['total'] = df_out.sum(axis=1)
logger.info(f'Ending VaR\t{v:7.5e}\nLowest total\t{df_out.total.min():7.5e}')
return df_out.sort_values('total')
[docs]def make_corr_matrix(vine_spec):
r"""
Make a correlation matrix from a vine specification, https://en.wikipedia.org/wiki/Vine_copula.
A vine specification is::
row 0: correl of X0...Xn-1 with X0
row 1: correl of X1....Xn-1 with X1 given X0
row 2: correl of X2....Xn-1 with X2 given X0, X1
etc.
For example ::
vs = np.array([[1,.2,.2,.2,.2],
[0,1,.3,.3,.3],
[0,0,1,.4, .4],
[0,0,0,1,.5],
[0,0,0,0,1]])
make_corr_matrix(vs)
The key fact is the partial correlation formula
.. math::
\rho(X,Y|Z) = \frac{(\rho(X,Y) - \rho(X,Z)\rho(Y,Z))}{\sqrt{(1-\rho(X,Z)^2)(1-\rho(Y,Z)^2)}}
and therefore
.. math::
\rho(X,Y) = \rho(X,Z)\rho(Y,Z) + \rho(X,Y|Z)\sqrt{(1-\rho(X,Z)^2)(1-\rho(Y,Z)^2)}
see https://en.wikipedia.org/wiki/Partial_correlation#Using_recursive_formula.
"""
A = np.matrix(vine_spec)
n, m = A.shape
assert n==m
for i in range(n - 2, 0, -1):
for j in range(i + 1, n):
for k in range(1, i+1):
# recursive formula
A[i, j] = A[i - k, i] * A[i - k, j] + A[i, j] * np.sqrt((1 - A[i - k, i] ** 2) * (1 - A[i - k, j] ** 2))
# fill in (unnecessary but simpler)
for i in range(n):
for j in range(i + 1, n):
A[j, i] = A[i, j]
return A
[docs]def random_corr_matrix(n, p=1, positive=False):
"""
make a random correlation matrix
smaller p results in more extreme correlation
0 < p <= 1
Eg ::
rcm = random_corr_matrix(5, .8)
rcm
np.linalg.cholesky(rcm)
positive=True for all entries to be positive
"""
if positive is True:
A = ar.RANDOM.random((n, n))**p
else:
A = 1 - 2 * ar.RANDOM.random((n, n))**p
np.fill_diagonal(A, 1)
return make_corr_matrix(A)
[docs]def show_fig(f, format='svg', **kwargs):
"""
Save a figure so it can be placed precisely in output. Used by Underwriter.show to
interleave tables and plots.
:param f: a plt.Figure
:param format: svg or png
:param kwargs: passed to savefig
"""
bio = BytesIO()
f.savefig(bio, format=format, **kwargs)
bio.seek(0)
if format == 'png':
display(ipImage(bio.read()))
elif format == 'svg':
display(ipSVG(bio.read()))
else:
raise ValueError(f'Unknown type {format}')
plt.close(f)
[docs]def partial_e(sev_name, fz, a, n):
"""
Compute the partial expected value of fz. Computing moments is a bottleneck, so you
want analytic computation for the most commonly used types.
The exponential (needed for mixed exponentials) is implemented separately from the gamma
even though it is a special case.
.. math::
\int_0^a x^k fz.pdf(x)dx
for k=0,...,n as a np.array
To do: beta? weibull? Burr? invgamma, etc.
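Example, comparing against the numerical integral (parameters illustrative)::

    fz = ss.lognorm(1.25, scale=100)
    partial_e('lognorm', fz, 500, 2)
    partial_e_numeric(fz, 500, 2)   # should agree closely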
:param sev_name: scipy.stats name for distribution
:param fz: frozen scipy.stats instance
:param a: double, limit for integral
:param n: int, power
:return: partial expected value
"""
if sev_name not in ['lognorm', 'gamma', 'pareto', 'expon']:
raise NotImplementedError(f'{sev_name} NYI for analytic moments')
if a == 0:
return [0] * (n + 1)
if sev_name == 'lognorm':
m = fz.stats('m')
sigma = fz.args[0]
mu = np.log(m) - sigma**2 / 2
ans = [np.exp(k * mu + (k * sigma)**2 / 2) *
(ss.norm.cdf((np.log(a) - mu - k * sigma**2)/sigma) if a < np.inf else 1.0)
for k in range(n+1)]
return ans
elif sev_name == 'expon':
# really needed for MEDs
# expon is gamma with shape = 1
scale = fz.stats('m')
shape = 1.
lgs = loggamma(shape)
ans = [scale ** k * np.exp(loggamma(shape + k) - lgs) *
(ss.gamma(shape + k, scale=scale).cdf(a) if a < np.inf else 1.0)
for k in range(n + 1)]
return ans
elif sev_name == 'gamma':
shape = fz.args[0]
scale = fz.stats('m') / shape
# magic ingredient is the norming constant
# c = lambda sh: 1 / (scale ** sh * gamma(sh))
# therefore c(shape)/c(shape+k) = scale**k * gamma(shape + k) / gamma(shape)
# = scale ** k * exp(loggamma(shape + k) - loggamma(shape)) to avoid errors
ans = [scale ** k * np.exp(loggamma(shape + k) - loggamma(shape)) *
(ss.gamma(shape + k, scale=scale).cdf(a) if a < np.inf else 1.0)
for k in range(n + 1)]
return ans
elif sev_name == 'pareto':
# integrate x^k f(x) directly, even though integrating k x^(k-1) S(x) (by parts) may be more obvious;
# the former fits into the overall scheme
# a Pareto defined by agg is like so: ss.pareto(2.5, scale=1000, loc=-1000)
α = fz.args[0]
λ = fz.kwds['scale']
loc = fz.kwds.get('loc', 0.0)
# regular Pareto is scale=lambda, loc=-lambda, so this has no effect
# single parameter Pareto is scale=lambda, loc=0
# these formulae for regular pareto, hence
if λ + loc != 0:
logger.log(WL, 'Pareto not shifted to x>0 range...using numeric moments.')
return partial_e_numeric(fz, a, n)
ans = []
# will return inf if the Pareto does not have the relevant moments
# TODO: formula for shape=1,2,3
for k in range(n + 1):
b = [α * (-1) ** (k - i) * binom(k, i) * λ ** (k + α - i) *
((λ + a) ** (i - α) - λ ** (i - α)) / (i - α)
for i in range(k + 1)]
ans.append(sum(b))
return ans
[docs]def partial_e_numeric(fz, a, n):
"""
Simple numerical integration version of partial_e for auditing purposes.
"""
ans = []
for k in range(n+1):
temp = quad(lambda x: x ** k * fz.pdf(x), 0, a)
if temp[1] > 1e-4:
logger.warning('Potential convergence issues with numerical integral')
ans.append(temp[0])
return ans
[docs]def moms_analytic(fz, limit, attachment, n, analytic=True):
"""
Return the moments :math:`E[((X-attachment)^+ \\wedge limit)^m]`
for m = 1, 2, ..., n.
To check:
::
# fz = ss.lognorm(1.24)
fz = ss.gamma(6.234, scale=100)
# fz = ss.pareto(3.4234, scale=100, loc=-100)
a1 = moms_analytic(fz, 50, 1234, 3)
a2 = moms_analytic(fz, 50, 1234, 3, False)
a1, a2, a1-a2, (a1-a2) / a1
:param fz: frozen scipy.stats instance
:param limit: double, limit (layer width)
:param attachment: double, attachment point
:param n: int, power
:param analytic: if True use analytic formula, else numerical integrals
"""
# easy
if limit == 0:
return np.array([0.] * (n + 1))
# don't know how robust this will be...
sev_name = str(fz.__dict__['dist']).split('.')[-1].split('_')[0]
# compute and store the partial_e
detachment = attachment + limit
if analytic is True:
pe_attach = partial_e(sev_name, fz, attachment, n)
pe_detach = partial_e(sev_name, fz, detachment, n)
else:
pe_attach = partial_e_numeric(fz, attachment, n)
pe_detach = partial_e_numeric(fz, detachment, n)
ans1 = np.array([sum([(-1) ** (m - k) * binom(m, k) * attachment ** (m - k) * (pe_detach[k] - pe_attach[k])
for k in range(m + 1)])
for m in range(n + 1)])
if np.isinf(limit):
ans2 = np.zeros_like(ans1)
else:
ans2 = np.array([limit ** m * fz.sf(detachment) for m in range(n+1)])
ans = ans1 + ans2
return ans
[docs]def qd(*argv, accuracy=3, align=True, trim=True, ff=None, **kwargs):
"""
Endless quest for a robust display format!
Quick display (qd) a list of objects.
Dataframes handled in text with reasonable defaults.
For use in documentation.
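Example (values illustrative)::

    qd(pd.DataFrame({'loss': [1234.5678, 2.5e6]}), 3.14159)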
:param: argv: list of objects to print
:param: accuracy: number of decimal places to display
:param: align: if True, align columns at decimal point (sEngFormatter)
:param: trim: if True, trim trailing zeros (sEngFormatter)
:param: ff: if not None, use this function to format floats, or 'basic', or 'int_ratio'
:kwargs: passed to pd.DataFrame.to_string for dataframes only. e.g., pass dict of formatters by column.
"""
from .distributions import Aggregate
from .portfolio import Portfolio
# ff = sEngFormatter(accuracy=accuracy - (2 if align else 0), min_prefix=0, max_prefix=12, align=align, trim=trim)
if ff is None:
ff = lambda x: f'{x:.5g}'
elif ff == 'basic':
ff = lambda x: f'{x:.1%}' if x < 1 else f'{x:12,.0f}'
elif ff == 'int_ratio':
def format_function(x):
ir = np.round(x, 13).as_integer_ratio()
return f'{int(x)}' if x in [0, 1] else f' {ir[0]}/{ir[1]}'
ff = format_function
# split output
for x in argv:
if isinstance(x, (Aggregate, Portfolio)):
if 'Err CV(X)' in x.describe.columns:
qd(x.describe.drop(columns=['Err CV(X)']).fillna(''), accuracy=accuracy, **kwargs)
else:
# object not updated
qd(x.describe.fillna(''), accuracy=accuracy, **kwargs)
bss = 'na' if x.bs == 0 else (f'{x.bs:.0f}' if x.bs >= 1 else f'1/{1/x.bs:.0f}')
vr = x.explain_validation()
print(f'log2 = {x.log2}, bandwidth = {bss}, validation: {vr}.')
elif isinstance(x, pd.DataFrame):
# 100 line width matches rtd html format
args = {'line_width': 100,
'max_cols': 35,
'max_rows': 25,
'float_format': ff,
# needs to be larger for text output
# 'max_colwidth': 10,
'sparsify': True,
'justify': None
}
args.update(kwargs)
print()
print(x.to_string(**args))
# print(x.to_string(formatters={c: f for c in x.columns}))
elif isinstance(x, pd.Series):
args = {'max_rows': 25,
'float_format': ff,
'name': True
}
args.update(kwargs)
print()
print(x.to_string(**args))
elif isinstance(x, int):
print(x)
elif isinstance(x, Number):
print(ff(x))
else:
print(x)
[docs]def qdp(df):
"""
Quick describe with nice percentiles and cv for a dataframe.
"""
d = df.describe()
# replace with non-sample sd
d.loc['std'] = df.std(ddof=0)
d.loc['cv'] = d.loc['std'] / d.loc['mean']
return d
[docs]def mv(x, y=None):
"""
Nice display of mean and variance for Aggregate or Portfolios or
entered values.
R style function, no return value.
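Example with entered values (prints mean 10, variance 4, std dev 2)::

    mv(10, 4)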
:param x: Aggregate or Portfolio or float
:param y: float, if x is a float
:return: None
"""
from .distributions import Aggregate
from .portfolio import Portfolio
if y is None and isinstance(x, (Aggregate, Portfolio)):
print(f'mean = {x.agg_m:.6g}')
print(f'variance = {x.agg_var:.7g}')
print(f'std dev = {x.agg_sd:.6g}')
else:
print(f'mean = {x:.6g}')
print(f'variance = {y:.7g}')
print(f'std dev = {y**.5:.6g}')
[docs]def picks_work(attachments, layer_loss_picks, xs, sev_density, n=1, sf=None, debug=False):
"""
Adjust the layer unconditional expected losses to target. You need int xf(x)dx, but
that is fraught when f is a mixed distribution. So we only use the int S version.
``fz`` was initially a frozen continuous distribution; it has been replaced by the ``sf``
function, dropping the need for a pdf function.
See notes for how the parts are defined. Notice that::
np.allclose(p.layers.v - p.layers.f, p.layers.l - p.layers.e)
is true.
:param attachments: array of layer attachment points, in ascending order (bottom to top). a[0]>0
:param layer_loss_picks: Target means. If ``len(layer_loss_picks)==len(attachments)`` then the bottom layer, 0 to a[0],
is added. Can be input as unconditional layer severity (i.e., :math:`\\mathbb{E}[(X-a)^+\wedge y]`) or as the
layer loss pick (i.e., :math:`\\mathbb{E}[(X-a)^+\\wedge y]\\times n` where *n* is the number of ground-up (to the
insurer) claims. Multiplying and dividing by :math:`S(a)` shows this equals conditional severity in the layer
times the number of claims in the layer.) Actuaries usually estimate the loss pick to the layer in pricing. When
called from :class:`Aggregate` the number of ground up claims is known.
:param n: ground-up expected claim count. The target is divided by ``n``.
:param xs: x values for discretization
:param sev_density: Series of existing severity density from Aggregate.
:param sf: cdf function for the severity distribution.
:param debug: if True, return debug information (layers, density with adjusted probs, audit
of layer expected values).
"""
# want xs, attachments, and sev_density to be numpy arrays
xs = np.array(xs)
attachments = np.array(attachments)
# target is the unconditional layer expected loss, E[(X-a)^+ ^ y]
target = np.array(layer_loss_picks) / n
# print(n, layer_loss_picks, target)
sev_density = np.array(sev_density)
# figure bucket size
bs = xs[1] - xs[0]
# dataframe of adjusted probabilities, starts here
density = pd.DataFrame({'x': xs, 'p': sev_density}).set_index('x', drop=False)
fill_value = max(0, 1. - density.p.sum())
density['S'] = density.p.shift(-1, fill_value=fill_value)[::-1].cumsum()
# numerical integrals - these match
layers = pd.DataFrame(columns=['a', 'lev', 'int_fdx', 'aS', 'S'], index=range(1, 1+len(attachments)),
dtype=float)
for i, x in enumerate(attachments):
ix = density.loc[0:x-bs, 'S'].sum() * bs
ix2 = density.loc[0:x-bs, ['x', 'p']].prod(axis=1).sum()
layers.loc[i+1, :] = [x, ix, ix2, x * density.loc[x, 'S'] if x < np.inf else 0.0, density.loc[x, 'S']]
# prob of loss in layer
layers['p'] = layers.S.shift(1, fill_value=1) - layers.S
# unconditional expected loss in layer
layers['l'] = layers.lev - layers.lev.shift(1, fill_value=0)
layers.index.name = 'layer'
# bottom of layer
layers['a_bottom'] = layers.a.shift(1, fill_value=0)
# width of layer
layers['y'] = layers.a - layers.a_bottom
# e = rectangle to right in int S computation
layers['e'] = layers.S * layers.y
# f = rectangle below attachment in int xf computation
layers['f'] = layers.p * layers.a_bottom
# these are two versions of m (unconditional)
# m-bit: int S - e == int xf - f
layers['m'] = layers.l - layers.e
# int f dx in layer
layers['v'] = layers.f + layers.m
# and conditional vertical loss in layer
layers['v_c'] = layers.v / layers.p
layers = layers[['a_bottom', 'a', 'y', 'lev', 'S', 'p', 'l', 'v', 'v_c', 'm', 'e', 'f']]
# add weights w and offsets=omega, computed from the top layer down
layers['t'] = target
layers['w'] = 0.0
layers['ω'] = 0.0
# this computation leaves the tail unchanged and uses the same "adjust the curve" method
# in all layers
ω = layers.loc[len(layers), 'S']
for i in layers.index[::-1]:
layers.loc[i, 'w'] = (layers.loc[i, 't'] - ω * layers.loc[i, 'y']) / layers.loc[i, 'm']
layers.loc[i, 'ω'] = ω
ω += layers.loc[i, 'p'] * layers.loc[i, 'w']
# adjusted S: bins -> layer number; add in offsets
density['bin'] = pd.cut(density.x, np.hstack((0, layers.a.values)), include_lowest=True, right=True)
# layer description returned by cut to layer number in layers
mapper = {i:j+1 for j, i in enumerate(density.bin.unique())}
density['layer'] = density.bin.map(mapper.get)
density['ω'] = density.layer.map(layers.ω).astype(float)
# S(a_n-1)
density['Sa'] = density.layer.map(layers.S).astype(float)
density['w'] = density.layer.map(layers.w).astype(float)
density['S_adj'] = np.minimum(1, density.ω + (density.S - density.Sa) * density.w)
# no change in the tail
density.loc[attachments[-1]:, 'S_adj'] = density.loc[attachments[-1]:, 'S']
# adj probs as difference of S
density['p_adj'] = density['S_adj'].shift(1, fill_value=1) - density['S_adj']
achieved = density.groupby(density.layer.shift(-1)).apply(lambda g: g['S_adj'].sum() * bs)
# display(achieved)
if abs(achieved.iloc[0] - target[0]) > 1e-3:
# issues with hitting 1
logger.log(WL, f'achieved[0] = {achieved.iloc[0]} != target[0] = {target[0]}')
# take top right corner off
if target[0] > attachments[0]:
raise ValueError(f'target[0] = {target[0]} > first attachment[0] = {attachments[0]} which is impossible.')
s0 = 2 * (attachments[0] - target[0]) / (1 - layers.loc[1, 'ω'])
# snap to index
s0 = bs * np.round(s0 / bs, 0)
# convert to probability
s = attachments[0] - s0
density.loc[0:s, 'S_adj'] = 1.0
temp = np.array(density.loc[s+bs:attachments[0]].index)
wts = (temp - s) / s0
density.loc[s+bs:attachments[0], 'S_adj'] = 1 - wts + layers.loc[1, 'ω'] * wts
# update
density['p_adj'] = density['S_adj'].shift(1, fill_value=1) - density['S_adj']
achieved = density.groupby(density.layer.shift(-1)).apply(lambda g: g['S_adj'].sum() * bs)
logger.log(WL, f'Revised layer 1 achieved = {achieved.iloc[0]}')
density['diff S'] = density['S'] - density['Sa']
if debug is False:
return density['p_adj'].values
# data frame of layer statistics from input density
exact = None
if sf is not None:
logger.log(WL, 'sf passed in; computing exact layer statistics')
exact = pd.DataFrame(columns=['a', 'lev', 'aS', 'S'],
index=range(1, 1+len(attachments)), dtype=float)
for i, x in enumerate(attachments):
ix = quad(sf, 0, x)
# check error is small
assert ix[1] < 1e-6
sf_ = sf(x)
exact.loc[i+1, :] = [x, ix[0], x * sf_ if x < np.inf else 0.0, sf_]
if exact is None:
l = layers.l
ln = 'layers'
else:
l = exact.lev - exact.lev.shift(1, fill_value=0)
ln = 'exact'
t = pd.concat((l,
density.groupby(density.layer.shift(-1)).apply(lambda g: g['S'].sum() * bs),
achieved,
), keys=[ln, 'computed', 'adj'], axis=1)
t.loc['sum'] = t.sum()
Picks = namedtuple('picks', ['layers', 'exact', 'density', 'audit'])
return Picks(layers=layers, exact=exact, density=density, audit=t)
[docs]def integral_by_doubling(func, x0, err=1e-8):
r"""
Compute :math:`\int_{x_0}^\infty f` as the sum
.. math::
\int_{x_0}^\infty f = \sum_{n \ge 0} \int_{2^nx_0}^{2^{n+1}x_0} f
Caller should check the integral actually converges.
:param func: function to be integrated.
:param x0: starting x value
:param err: desired accuracy: stop when incremental integral is <= err.
"""
ans = 0.
counter = 0
# from to
f, t = x0, 2 * x0
last_int = 10
while last_int > err:
s = quad(func, f, t)
if s[1] > err:
raise ValueError(
f'Questionable integral numeric convergence, err {s[1]:.4g}\n'
f'f={f}, t={t}, x0={x0}, counter={counter}')
last_int = s[0]
ans += s[0]
f, t = t, 2 * t
counter += 1
if counter > 96:
raise ValueError(f'No convergence after {counter} doublings; tolerance err = {err}')
return -ans
[docs]@lru_cache(maxsize=128)
def logarithmic_theta(mean):
"""
Solve for theta parameter given mean, see JKK p. 288
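Example (mean value illustrative)::

    theta = logarithmic_theta(3.0)
    # check: theta / (-np.log(1 - theta) * (1 - theta)) is close to 3.0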
"""
f = lambda x: x / (-np.log(1 - x) * (1 - x)) - mean
theta = brentq(f, 1e-10, 1-1e-10)
if not np.allclose(mean, theta / (-np.log(1 - theta) * (1 - theta))):
logger.error('logarithmic_theta: numerical solution failed to reproduce mean')
else:
return theta
[docs]def make_var_tvar(ser):
"""
Make var (lower quantile), upper quantile, and tvar functions from a ``pd.Series`` ``ser``, which
has index given by losses and p_total values.
``ser`` must have a unique monotonic increasing index and all p_totals > 0.
Such a series comes from ``a.density_df.query('p_total > 0').p_total``, for example.
Tested using numpy vs pd.Series lookup functions, and this version is much
faster. See ``var_tvar_test_suite`` function below for testers (obviously
run before this code was integrated).
Changed in v. 0.13.0
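A small worked sketch with a three-point toy distribution (values illustrative)::

    ser = pd.Series([0.25, 0.5, 0.25], index=[0., 1., 2.])
    qf = make_var_tvar(ser)
    qf.q(0.25), qf.q_upper(0.25), qf.tvar(0.25)
    # (0.0, 1.0, 1.333...): lower and upper quantiles differ at the jump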
"""
# audits
assert ser.index.is_unique, 'index values must be unique'
assert ser.index.is_monotonic_increasing, 'index values must be increasing'
# detach from the outside scope
ser = ser.copy()
# create needed arrays
x_np = np.array(ser.index)
# better not to cumulate array when all elements are equal (because of
# floating point issues). This does make some difference.
if np.all(np.isclose(ser, ser.iloc[0], atol=2**-53)):
d = 1 / len(ser)
cser = pd.Series(np.linspace(d, 1, len(ser)), index=ser.index)
else:
cser = ser.cumsum()
cser_F_np = cser.to_numpy()
# detach the index values
# cser_idx = pd.Index(cser.values)
tvar_unconditional = ((ser * ser.index)[::-1].cumsum()[::-1]).to_numpy()
# these last three are annoying because np.where does not short circuit
tvar_unconditional = np.hstack((tvar_unconditional, np.inf, np.inf))
cser_F_np2 = np.hstack((cser_F_np, 1))
x_np2l = np.hstack((x_np, x_np[-1]))
x_np2u = np.hstack((x_np, np.inf))
# x_max = cser_F_np[-2]
# tests show this is about 6 times faster than
# q = interp1d(cser, ser.index, kind='next', bounds_error=False, fill_value=(ser.index.min(), ser.index.max()))
def q_lower(p):
nonlocal x_np2l, cser_F_np
return x_np2l[np.searchsorted(cser_F_np, p, side='left')]
def q_upper(p):
nonlocal x_np2u, cser_F_np
return x_np2u[np.searchsorted(cser_F_np, p, side='right')]
def tvar(p):
"""
Vectorized TVaR computation.
"""
nonlocal cser_F_np, x_np, tvar_unconditional
if isinstance(p, (float, int)):
# easy
if p >= cser_F_np[-2]:
return x_np[-1]
else:
idx = np.searchsorted(cser_F_np, p, side='right')
return ((cser_F_np[idx] - p) * x_np[idx] + tvar_unconditional[idx + 1]) / (1 - p)
else:
# vectorized
p = np.array(p)
idx = np.searchsorted(cser_F_np, p, side='right')
return np.where(idx >= len(cser_F_np) - 1,
x_np[-1],
((cser_F_np2[idx] - p) * x_np2u[idx] + tvar_unconditional[idx + 1]) / (1 - p))
QuantileFunctions = namedtuple("QuantileFunctions", 'q q_lower var q_upper tvar')
return QuantileFunctions(q_lower, q_lower, q_lower, q_upper, tvar)
[docs]def test_var_tvar(program, bs=0, n_ps=1025, normalize=False, speed_test=False, log2=16):
"""
Run a test suite of programs against new var and tvar functions compared to
old aggregate.Aggregate versions
Suggestion::
args = [
('agg T dfreq [1,2,4,8] dsev [1]', 0, 1025, False),
('agg T dfreq [1:8] dsev [2:3]', 0, 1025, False),
('agg D dfreq [1:6] dsev [1]', 0, 7, False),
('agg T2 10 claims 100 x 0 sev lognorm 10 cv 1 poisson ', 1/64, 1025, False),
('agg T2 10 claims sev lognorm 10 cv 4 poisson ', 1/16, 1025, False),
('agg T2 10 claims sev lognorm 10 cv 4 mixed gamma 1.2 ', 1/8, 1025, False),
('agg Dice dfreq [1] dsev [1:6]', 0, 7, False)
]
from IPython.display import display, Markdown
for t in args:
display(Markdown(f'## {t[0]}'))
bs = t[1]
test_var_tvar(*t, speed_test=False, log2=16 if bs != 1/8 else 20)
The expected output shows that the new functions are 3+ orders of magnitude faster and
agree with the old functions. q and q_upper agree everywhere except at the jumps.
"""
from aggregate import build, qd
a = build(program, bs=bs, normalize=normalize, log2=log2)
qd(a)
ser = a.density_df.query('p_total > 0').p_total
print(f'\ntotal probability = {ser.sum():.16f}')
print()
qf = make_var_tvar(ser)
ps = np.linspace(0, 1, n_ps)
if speed_test:
pass
# print('Timeit Tests\n============\n')
# print('new var')
# %timeit qf.var(ps)
# print('agg.var')
# %timeit [a.q(i) for i in ps]
# %timeit qf.q_upper(ps)
# print('new tvar')
# %timeit qf.tvar(ps)
# print('agg.tvar')
# %timeit [a.tvar(i, kind='tail') for i in ps]
# print()
# report results
test_df = pd.DataFrame({
'q': qf.var(ps),
'a.q': [a.q(i) for i in ps],
'q_upper': qf.q_upper(ps),
'a.q upper': [a.q(i, kind='upper') for i in ps],
'tvar': qf.tvar(ps),
'a.tvar tail': [a.tvar(i, kind='tail') for i in ps]
}, index=ps)
for tq in ['q != `a.q`', '`a.q upper` != q_upper', 'q != q_upper', 'abs(tvar - `a.tvar tail`) > 1e-12']:
df = test_df.query(tq)
display(df.head(10).style.format(precision=5).set_caption(f'{tq} with {len(df)} rows'))
display(test_df.sample(min(25, len(test_df))).sort_index().style.format(precision=5).set_caption('Whole Dataframe Sample of 25 Values'))
fig, ax = plt.subplots(1, 1, figsize=(6,4))
test_df.plot(lw=1, ax=ax, drawstyle='steps-pre')
for l, ls in zip(ax.lines, ['-', '--', ':', '-.', '-', ':']):
l.set_linestyle(ls)
ax.legend()
ax.set(title=program)
[docs]def kaplan_meier(df, loss='loss', closed='closed'):
"""
Compute Kaplan Meier Product limit estimator based on a sample
of losses in the dataframe df. For each loss you know the current
evaluation in column ``loss`` and a 0/1 indicator for open/closed
in ``closed``.
The output dataframe has columns
* index x_i, size of loss
* open - the number of open events of size x_i (open claim with this size)
* closed - the number closed at size x_i
* events - total number of events of size x_i
* n - number at risk at x_i
* s - probability of surviving past x_i = 1 - closed / n
* pl - cumulative probability of surviving past x_i
See the ipython workbook kaplan_meier.ipynb for a check against lifelines
and some Kaggle data (telco customer churn,
https://www.kaggle.com/datasets/blastchar/telco-customer-churn?resource=download) and
https://towardsdatascience.com/introduction-to-survival-analysis-the-kaplan-meier-estimator-94ec5812a97a
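A minimal usage sketch (toy data; 1 = closed claim)::

    df = pd.DataFrame({'loss':   [100, 100, 250, 500, 500, 1000],
                       'closed': [  1,   0,   1,   1,   0,    1]})
    kaplan_meier(df)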
:param df: dataframe of data
:param loss: column containing loss amount data
:param closed: column indicating if the observation is a closed claim (1) or open (0)
:return: dataframe as described above
"""
df = df[[loss, closed]].rename(columns={loss: 'loss', closed: 'closed'}).copy()
df['open'] = 1 - df.closed
df = df.sort_values(['loss', 'closed'], ascending=[False, True]).reset_index(drop=True)
df = df.groupby(['loss', 'closed']).count()
# c has index loss amount and closed indicator, and column number of observations
c = df.unstack(1)
# total number of observables at each loss event size
c['t'] = c.sum(1)
# total number at risk at each event size
c['n'] = c.t[::-1].cumsum()
# better column names
c.columns = ['open', 'closed', 'events', 'n']
#
c = c.fillna(0)
# prob of surviving past each observed event size
c['s'] = 1 - c.closed / c.n
# KM product estimator
c['pl'] = c.s.cumprod()
return c
[docs]def kaplan_meier_np(loss, closed):
"""
Feeder to kaplan_meier where loss is np array of loss amounts and
closed a same sized array of 0=open, 1=closed indicators.
"""
df = pd.DataFrame({'loss': loss, 'closed': closed})
return kaplan_meier(df)
[docs]def more(self, regex):
"""
Investigate self for matches to the regex. If callable, try calling with no args, else display.
"""
for i in dir(self):
if re.search(regex, i):
ob = getattr(self, i)
if not callable(ob):
display(Markdown(f'### Attribute: {i}\n'))
display(ob)
else:
display(Markdown(f'### Callable: {i}\n'))
try:
print(ob())
except Exception as e:
help(ob)
[docs]def parse_note(txt):
"""
Extract kwargs from txt note. Recognizes bs, log2, padding, normalize, recommend_p.
Input is a semicolon separated list of ``key=value`` pairs (CSS declaration style).
Split on ; and then look for k=v pairs.
bs can be entered as 1/32 etc.
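Example::

    parse_note('bs=1/64; log2=17; normalize=False')
    # {'bs': 0.015625, 'log2': 17, 'normalize': False}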
:param txt: input text
:return value: dictionary of keyword: typed value
"""
stxt = txt.split(';')
ans = {}
for s in stxt:
kw = s.split('=')
if len(kw) == 2:
k = kw[0].strip()
v = kw[1].strip()
if re.match('bs|recommend_p', k):
if re.match(r'(\d+\.?\d*|\d*\.\d+)([eE](\+|\-)?\d+)?/(\d+\.?\d*|\d*\.\d+)([eE](\+|\-)?\d+)?', v):
v = eval(v)
else:
v = float(v)
elif re.match('log2|padding', k):
v = int(v)
elif re.match('normalize', k):
v = v == 'True'
ans[k] = v
return ans
[docs]def parse_note_ex(txt, log2, bs, recommend_p, kwargs):
"""
Avoid duplication: this is how the function is used in Underwriter.build.
"""
kw = parse_note(txt)
if 'log2' in kw and log2 == 0:
log2 = kw.pop('log2')
if 'bs' in kw and bs == 0:
bs = kw.pop('bs')
if 'recommend_p' in kw:
# always take the recommend_p from the note
recommend_p = kw.pop('recommend_p')
# rest are passed through
kwargs.update(kw)
return log2, bs, recommend_p, kwargs
[docs]def introspect(ob):
"""
Discover the non-private methods and properties of an object ob. Returns
a pandas DataFrame.
"""
d = [i for i in dir(ob) if i[0] != '_']
df = pd.DataFrame({'name': d})
ans = []
for i in d:
g = getattr(ob, i)
c = callable(g)
v = ''
t = ''
h = ''
l = 0
if not c:
v = str(g)
t = type(g)
h = ''
if isinstance(getattr(ob.__class__, i, None), property):
c = 'property'
else:
c = 'field'
try:
l = len(g)
except:
pass
else:
c = 'method'
h = g.__doc__
ans.append([c, v, t, h, l])
df[['callable', 'value', 'type', 'help', 'length']] = ans
df = df.sort_values(['callable', 'length', 'name'])
return df
[docs]def explain_validation(rv):
"""
Explain the validation result rv.
Don't over-report: if the CV fails there is no need to also report that the skewness fails.
"""
if rv == Validation.NOT_UNREASONABLE:
return "not unreasonable"
elif rv & Validation.NOT_UPDATED:
return "n/a, not updated"
elif rv & Validation.REINSURANCE:
return "n/a, reinsurance"
else:
explanation = 'fails '
if rv & Validation.SEV_MEAN:
# explanation += f'sev mean: {ob.sev_m: .4e} vs {ob.est_sev_m: .4e}\n'
explanation += f'sev mean, '
if rv & Validation.AGG_MEAN:
explanation += f'agg mean, '
if rv & Validation.ALIASING:
explanation += "agg mean error >> sev, possible aliasing; try larger bs, "
if not(rv & Validation.SEV_MEAN) and (rv & Validation.SEV_CV):
# explanation += f'sev cv: {ob.sev_cv: .4e} vs {ob.est_sev_cv: .4e}, '
explanation += f'sev cv, '
if not(rv & Validation.AGG_MEAN) and (rv & Validation.AGG_CV):
explanation += f'agg cv, '
if not (rv & Validation.SEV_CV) and (rv & Validation.SEV_SKEW):
# explanation += f'sev skew: {ob.sev_skew: .4e} vs {ob.est_sev_skew: .4e}, '
explanation += f'sev skew, '
if not (rv & Validation.AGG_CV) and (rv & Validation.AGG_SKEW):
explanation += f'agg skew, '
return explanation[:-2]