Source code for aggregate.portfolio

from collections import namedtuple
from collections.abc import Iterable
from copy import deepcopy
import json
import logging
import matplotlib as mpl
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from matplotlib.ticker import (MultipleLocator, StrMethodFormatter, MaxNLocator,
                               FixedLocator, FixedFormatter, AutoMinorLocator)
import numpy as np
import pandas as pd
from pandas.io.formats.format import EngFormatter
from pandas.plotting import scatter_matrix
from pathlib import Path
import re
import scipy.stats as ss
from scipy import interpolate
from scipy.interpolate import interp1d
from scipy.optimize import bisect
from scipy.spatial import ConvexHull
from textwrap import fill
import warnings
from IPython.core.display import HTML, display

from .constants import *
from .distributions import Aggregate, Severity
from .spectral import Distortion
from .utilities import (ft, ift, axiter_factory, AxisManager, html_title,
                        suptitle_and_tight, pprint_ex,
                        MomentAggregator, Answer, subsets, round_bucket,
                        make_mosaic_figure, iman_conover, approximate_work,
                        make_var_tvar, more, explain_validation, )
import aggregate.random_agg as ar


# fontsize : int or float or {'xx-small', 'x-small', 'small', 'medium', 'large', 'x-large', 'xx-large'}
# matplotlib.rcParams['legend.fontsize'] = 'xx-small'
logger = logging.getLogger(__name__)


class Portfolio(object):
    """
    Portfolio creates and manages a portfolio of Aggregate objects, each modeling one unit
    of business. Applications include

    - Model a book of insurance
    - Model a large account with several sub lines
    - Model a reinsurance portfolio or large treaty

    """

    # namer helper classes
    premium_capital_renamer = {
        'Assets': "0. Assets",
        'T.A': '1. Allocated assets',
        'T.P': '2. Market value liability',
        'T.L': '3. Expected incurred loss',
        'T.M': '4. Margin',
        'T.LR': '5. Loss ratio',
        'T.Q': '6. Allocated equity',
        'T.ROE': '7. Cost of allocated equity',
        'T.PQ': '8. Premium to surplus ratio',
        'EPD': '9. Expected pol holder deficit'
    }
[docs] def __init__(self, name, spec_list, uw=None): """ Create a new :class:`Portfolio` object. :param name: The name of the portfolio. No spaces or underscores. :param spec_list: A list of 1. dictionary: Aggregate object dictionary specifications or 2. Aggregate: An actual aggregate objects or 3. tuple (type, dict) as returned by uw['name'] or 4. string: Names referencing objects in the optionally passed underwriter 5. a single DataFrame: empirical samples (the total column, if present, is ignored); a p_total column is used for probabilities if present :returns: new :class:`Portfolio` object. """ self.name = name self.agg_list = [] self.line_names = [] self._valid = None self.sample_df = None logger.debug(f'Portfolio.__init__| creating new Portfolio {self.name}') # logger.debug(f'Portfolio.__init__| creating new Portfolio {self.name} at {super(Portfolio, self).__repr__()}') ma = MomentAggregator() max_limit = 0 if len(spec_list) == 1 and isinstance(spec_list[0], pd.DataFrame): # create from samples...slightly different looping behavior logger.info('Creating from sample DataFrame') spec_list = spec_list[0] if isinstance(spec_list, pd.DataFrame): spec_list = spec_list.copy().astype(float) if 'p_total' not in spec_list: logger.info('Adding p_total column to DataFrame with equal probs') spec_list['p_total'] = np.repeat(1 / len(spec_list), len(spec_list)) # it is helpful to know what sample the object is created self.sample_df = spec_list for spec in spec_list: if isinstance(spec, Aggregate): # directly passed in an agg object a = spec agg_name = spec.name elif isinstance(spec, str) and isinstance(spec_list, pd.DataFrame): if spec not in ['total', 'p_total']: # hack: issue: close values mess up the discrete distribution # 2**-30 = 9.313225746154785e-10 approx 1e-9, so to ensure we don't # have any merging issues in the discrete distribution, we round # to 8 decimal places. temp = spec_list[[spec, 'p_total']] temp['rounded'] = np.round(temp[spec].astype(float), 8) s = temp.groupby('rounded').sum() a = Aggregate(name=spec, exp_en=1, sev_name='dhistogram', sev_xs=s.index.values, sev_ps=s.p_total.values, freq_name='fixed') agg_name = spec else: a = None elif isinstance(spec, str): # look up object in uw return actual instance # note here you could do uw.aggregate[spec] and get the dictionary def # or uw.write(spec) to return the already-created (and maybe updated) object # we go the latter route...if user wants they can pull off the dict item themselves if uw is None: raise ValueError(f'Must pass valid Underwriter instance to create aggs by name') try: a_out = uw.write(spec) except Exception as e: logger.error(f'Item {spec} not found in your underwriter') raise e # a is a disct (kind, name) -> (obj or spec, program) pair. 
Portfolios are ?always created so # here, spec is the name assert len(a_out) == 1 # remember, the thing you make must be called a as part of the loop a = a_out[('agg', spec)][0] assert isinstance(a, Aggregate) agg_name = a.name elif isinstance(spec, tuple): # uw returns type, spec assert spec[0] == 'agg' a = Aggregate(**spec[1]) agg_name = spec[1]['name'] elif isinstance(spec, dict): a = Aggregate(**spec) agg_name = spec['name'][0] if isinstance(spec['name'], list) else spec['name'] else: raise ValueError(f'Invalid type {type(spec)} passed to Portfolio, expect Aggregate, str or dict.') if a is not None: # deals with total in DataFrame intput mode self.agg_list.append(a) self.line_names.append(agg_name) self.__setattr__(agg_name, a) ma.add_fs(a.report_ser[('freq', 'ex1')], a.report_ser[('freq', 'ex2')], a.report_ser[('freq', 'ex3')], a.report_ser[('sev', 'ex1')], a.report_ser[('sev', 'ex2')], a.report_ser[('sev', 'ex3')]) max_limit = max(max_limit, np.max(np.array(a.limit))) self.line_names_ex = self.line_names + ['total'] self.line_name_pipe = "|".join(self.line_names_ex) for n in self.line_names: # line names cannot equal total if n == 'total': raise ValueError('Line names cannot equal total, it is reserved for...total') # make a pandas data frame of all the statistics_df temp_report = pd.concat([a.report_ser for a in self.agg_list], axis=1) # max_limit = np.inf # np.max([np.max(a.get('limit', np.inf)) for a in spec_list]) temp = pd.DataFrame(ma.stats_series('total', max_limit, 0.999, remix=False)) self.statistics_df = pd.concat([temp_report, temp], axis=1) # future storage self.density_df = None self.independent_density_df = None self.augmented_df = None self._epd_2_assets = None self._assets_2_epd = None self._priority_capital_df = None self.priority_analysis_df = None self.audit_df = None self.independent_audit_df = None self.padding = 0 self.tilt_amount = 0 self._var_tvar_function = None self._cdf = None self._pdf = None self.bs = 0 self.log2 = 0 self.ex = 0 self.last_update = 0 self.hash_rep_at_last_update = '' self._distortion = None self.sev_calc = '' self._remove_fuzz = 0 self.approx_type = "" self.approx_freq_ge = 0 self.discretization_calc = '' self.normalize = None self._renamer = None self._line_renamer = None self._tm_renamer = None # if created by uw it stores the program here self.program = '' self.audit_percentiles = [.9, .95, .99, .996, .999, .9999, 1 - 1e-6] self.dists = None self.dist_ans = None self.figure = None # for consistency with Aggregates self.agg_m = self.statistics_df.loc[('agg', 'ex1'), 'total'] self.agg_cv = self.statistics_df.loc[('agg', 'cv'), 'total'] self.agg_skew = self.statistics_df.loc[('agg', 'skew'), 'total'] # variance and sd come up in exam questions self.agg_sd = self.agg_m * self.agg_cv self.agg_var = self.agg_sd * self.agg_sd # these are set when the object is updated self.est_m = self.est_cv = self.est_skew = self.est_sd = self.est_var = 0 # enhanced portfolio items self.EX_premium_capital = None self.EX_multi_premium_capital = None self.EX_accounting_economic_balance_sheet = None self.validation_eps = VALIDATION_EPS
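    # Usage sketch (illustrative only, not executed as part of the module): a Portfolio can be
    # built directly from a list of Aggregate dictionary specs. The unit names and parameter
    # values below are hypothetical.
    #
    #   port = Portfolio('MyBook', [
    #       dict(name='property', exp_en=50, sev_name='lognorm', sev_mean=100, sev_cv=1.5,
    #            freq_name='poisson'),
    #       dict(name='liability', exp_en=10, sev_name='gamma', sev_mean=250, sev_cv=2.0,
    #            freq_name='poisson'),
    #   ])
    #   port.update(log2=16, bs=1/4)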
    def more(self, regex):
        """
        More information about methods and properties matching regex.
        """
        more(self, regex)
[docs] def add_exa_sample(self, sample, S_calculation='forwards'): """ Computes a version of density_df using sample to compute E[Xi | X]. Then fill in the other ex.... variables using code from Portfolio.add_exa, stripped down to essentials. If no p_total is given then samples are assumed equally likely. total is added if not given (sum across rows) total is then aligned to the bucket size self.bs using (total/bs).round(0)*bs. The other loss columns are then scaled so they sum to the adjusted total Next, group by total, sum p_total and average the lines to create E[Xi|X] This sample is merged into a stripped down density_df. Then the other ex... columns are added. Excludes eta mu columns. Anticipated use: replace density_df with this, invalidate quantile function and then compute various allocation metrics. The index on the input sample is ignored. Formally ``extensions.samples.add_exa_sample``. """ # starter information # cut_eps = np.finfo(float).eps bs = self.bs # working copy sample_in = sample.copy() if 'total' not in sample: # p_total may be in sample cols = list(sample.columns) if 'p_total' in sample: cols.remove('p_total') sample_in['total'] = sample_in[cols].sum(axis=1) # index may be called total; that causes confusion; throw away input index sample_in = sample_in.reset_index(drop=True) # want to align the index to that of self.density_df; all multiples of self.bs # at the same time, want to scale all elements # temp0 gives the multiples of bs for the index; temp is the scaling for # all the other columns; temp0 will all be exact temp0 = (sample_in.total / bs).round(0) * bs temp = (temp0 / sample_in.total).to_numpy().reshape((len(sample_in), 1)) # re-scale loss samples so they sum to total, need to extract p_total first if 'p_total' not in sample_in: # equally likely probs logger.info('Adding p_total to sample_in') # logger.info('Adding p_total to sample_in') p_total = 1.0 / len(sample_in) else: # use input probs p_total = sample_in['p_total'] # re-scale sample_in = sample_in * temp # exact for total sample_in['total'] = temp0 # and put probs back sample_in['p_total'] = p_total # Group by X values, aggregate probs and compute E[Xi | X] exeqa_sample = sample_in.groupby(by='total').agg( **{f'exeqa_{i}': (i, np.mean) for i in self.line_names}) # need to do this after rescaling to get correct (rounded) total values probs = sample_in.groupby(by='total').p_total.sum() # want all probs to be positive probs = np.maximum(0, probs.fillna(0.0)) # working copy of self's density_df with relevant columns df = self.density_df.filter( regex=f'^(loss|(p|e)_({self.line_name_pipe})|(e|p)_total)$').copy() # want every value in sample_in.total to be in the index of df # this code verifies that has occurred # for t in sample_in.total: # try: # df.index.get_loc(t) # except KeyError: # print(f'key error for t={t}') # # or, if you prefer, # # test = df[['loss', 'p_total']].merge(sample_in, left_index=True, right_on='total', how='outer', indicator=True) # test.groupby('_merge')[['loss']].count() # # shows nothing right_only. # fix p_total and hence S and F # fill in these values (note, all this is to get an answer the same # shape as df, so it can be swapped in) df['p_total'] = probs df['p_total'] = df['p_total'].fillna(0.) 
# macro, F, S df['F'] = df.p_total.cumsum() # np.cumsum(df.p_total) if S_calculation == 'forwards': df['S'] = 1 - df.F else: # add_exa method; you'd think the fill value should be 0, which # will be the case when df.p_total sums to 1 (or more) df['S'] = \ df.p_total.shift(-1, fill_value=min(df.p_total.iloc[-1], max(0, 1. - (df.p_total.sum()))))[::-1].cumsum()[::-1] # this avoids irritations later on df.F = np.minimum(df.F, 1) df.S = np.minimum(df.S, 1) # where is S=0 Seq0 = (df.S == 0) # invalidate quantile functions self._var_tvar_function = None # E[X_i | X=a], E(xi eq a) # all in one go (outside loop) df = pd.merge(df, exeqa_sample, how='left', left_on='loss', right_on='total').fillna(0.0).set_index('loss', drop=False) # check exeqa sums to correct total. note this only happens ae, ie when # p_total > 0 assert np.allclose(df.query('p_total > 0').loss, df.query('p_total > 0')[[f'exeqa_{i}' for i in self.line_names]].sum(axis=1)) assert df.index.is_unique df['exeqa_total'] = df.loss # add additional variables via loop for col in self.line_names_ex: # ### Additional Variables # * exeqa_line = $E(X_i \mid X=a)$ # * exlea_line = $E(X_i \mid X\le a)$ # * e_line = $E(X_i)$ # * exgta_line = $E(X_i \mid X \ge a)$ # * exi_x_line = $E(X_i / X \mid X = a)$ # * and similar for le and gt a # * exa_line = $E(X_i(a))$ # * Price based on same constant ROE formula (later we will do $g$s) # need the stand alone LEV calc # E(min(Xi, a) # needs to be shifted down by one for the partial integrals.... # stemp = 1 - df['p_' + col].cumsum() stemp = df['p_' + col].shift(-1, fill_value=min(df['p_' + col].iloc[-1], max(0, 1. - (df['p_' + col].sum()))))[::-1].cumsum()[::-1] df['lev_' + col] = stemp.shift(1, fill_value=0).cumsum() * self.bs # E[X_i | X<= a] temp is used in le and gt calcs temp = np.cumsum(df['exeqa_' + col] * df.p_total) df['exlea_' + col] = temp / df.F # E[X_i | X>a] df['exgta_' + col] = (df['e_' + col] - temp) / df.S # E[X_i / X | X > a] (note=a is trivial!) temp = df.loss.iloc[0] # loss=0, should always be zero df.loss.iloc[0] = 1 # avoid divide by zero # unconditional E[X_i/X] df['exi_x_' + col] = np.sum( df['exeqa_' + col] * df.p_total / df.loss) temp_xi_x = np.cumsum(df['exeqa_' + col] * df.p_total / df.loss) df['exi_xlea_' + col] = temp_xi_x / df.F df.loc[0, 'exi_xlea_' + col] = 0 # selection, 0/0 problem # more generally F=0 error: V # df.loc[df.exlea_total == 0, 'exi_xlea_' + col] = 0 # ?? not an issue for samples; don't have exlea_total anyway?? # put value back df.loss.iloc[0] = temp fill_value = np.nan assert df.index.is_unique, "Index is not unique!" df['exi_xgta_' + col] = ((df[f'exeqa_{col}'] / df.loss * df.p_total).shift(-1, fill_value=fill_value)[ ::-1].cumsum()) / df.S # need this NOT to be nan otherwise exa won't come out correctly df.loc[Seq0, 'exi_xgta_' + col] = 0. df['exi_xeqa_' + col] = df['exeqa_' + col] / df['loss'] df.loc[0, 'exi_xeqa_' + col] = 0 # need the loss cost with equal priority rule df[f'exa_{col}'] = (df.S * df['exi_xgta_' + col]).shift(1, fill_value=0).cumsum() * self.bs # put in totals for the ratios... this is very handy in later use for metric in ['exi_xlea_', 'exi_xgta_', 'exi_xeqa_']: df[metric + 'sum'] = df.filter(regex=metric).sum(axis=1) df = df.set_index('loss', drop=False) df.index.name = None return df
    @staticmethod
    def create_from_sample(name, sample_df, bs, log2=16, **kwargs):
        """
        Create from a multivariate sample, update with bs, execute switcheroo, and return
        new Portfolio object.

        OED: switcheroo, n. a change of position or an exchange, esp. one intended to surprise
        or deceive; a reversal or turn-about; spec. an unexpected change or ‘twist’ in a story.
        Also attributive, reversible, reversed.
        """
        logger.info(f'Creating Portfolio {name} from sample_df. Handles adding total and converting to floats.')
        port = Portfolio(name, sample_df)
        logger.info(f'Updating with bs={bs}, log2={log2}, remove_fuzz=True')
        port.update(bs=bs, log2=log2, remove_fuzz=True, **kwargs)
        # archive the original density_df
        port.independent_density_df = port.density_df.copy()
        # execute switcheroo
        logger.info('Creating exa_sample and executing switcheroo')
        port.density_df = port.add_exa_sample(sample_df)
        # update total stats
        logger.info('Updating total statistics (WARNING: these are now empirical)')
        port.independent_audit_df = port.audit_df.copy()
        # just update the total stats
        port.make_audit_df(['total'], None)
        # return newly created object
        logger.info('Returning new Portfolio object')
        return port
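    # Illustrative sketch (assumptions: a DataFrame ``samples`` with one numeric column per unit
    # and, optionally, a ``p_total`` probability column; bucket size is hypothetical):
    #
    #   port = Portfolio.create_from_sample('FromSample', samples, bs=1/64, log2=16)
    #   port.density_df.filter(regex='exeqa_').head()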
    def sample_compare(self, ax=None):
        """
        Compare the sample sum to the independent sum of the marginals.
        """
        if self.independent_density_df is None:
            raise ValueError('No independent_density_df, cannot compare')
        if ax is not None:
            ax.plot(self.independent_density_df.index, self.independent_density_df['S'], lw=1, label='independent')
            ax.plot(self.density_df.index, self.density_df['S'], lw=1, label='sample')
            ax.legend()
        df = pd.concat((self.independent_audit_df.T, self.audit_df.T),
                       keys=['independent', 'sample'], axis=1).xs('total', 1, 1, drop_level=False)
        return df
    def sample_density_compare(self, fuzz=0):
        """
        Compare from density_df.
        """
        bit = pd.concat((self.independent_density_df.filter(regex='p_'),
                         self.density_df.filter(regex='p_total|exeqa_')),
                        axis=1, keys=['independent', 'sample'])
        bit = bit.loc[(bit[('independent', 'p_total')] > fuzz) + (bit[('sample', 'p_total')] > fuzz)]
        bit[('', 'difference')] = bit[('independent', 'p_total')] - bit[('sample', 'p_total')]
        return bit
[docs] def pricing_bounds(self, premium, a=0, p=0, n_tps=64, kind='tail', slow=False, verbose=250): """ Compute the natural allocation premium ranges by unit consistent with total premium at asset level a or p (one of which must be provided). Unlike typical case with even s values, this is run at the actual S values of the Portfolio. Visualize:: from pandas.plotting import scatter_matrix ans = port.pricing_bounds(premium, p=0.98) scatter_matrix(ans.allocs, marker='.', s=5, alpha=1, figsize=(10, 10), diagonal='kde' ) """ from .bounds import Bounds if a == 0: assert p > 0, 'Must provide either a or p' a = self.q(p) # need a -= self.bs? S = self.density_df.loc[:a, 'S'].copy() # last entry needs to include all remaining losses from a-bs onwards, hence: S.iloc[-1] = 0. bounds = Bounds(self) bounds.add_one = True bounds.tvar_cloud('total', premium, a, n_tps, S.values, kind=kind) # TODO: (hack) pl=1 and s=1 is driving an error NAN - need to replace with 0. But WHY? gS = bounds.cloud_df.fillna(0).values.T gps = -np.diff(gS, axis=1, prepend=1) # sum products for allocations deal_losses = self.density_df.filter(regex='exeqa_[A-Z0-9]').loc[:a] if self.sf(a) > 0: # see notes below in slow method logger.info('Adjusting tail of deal_losses') deal_losses.iloc[-1] = self.density_df.loc[a-self.bs].filter(regex='exi_xgta_[A-Z]') * a # compute the allocations allocs = pd.DataFrame( gps @ (deal_losses.to_numpy()), columns=[i.replace('exeqa_', 'alloc_') for i in deal_losses.columns], index=bounds.weight_df.index) # this is a good audit: should have max = min allocs['total'] = allocs.sum(1) allocs = allocs.rename(columns=lambda x: x.replace('alloc_', '')) # summary stats stats = pd.concat((pd.concat((allocs.min(0), allocs.mean(0), allocs.max(0)), keys=['min', 'mean', 'max'], axis=0).unstack(1), pd.concat((allocs.idxmin(0), allocs.idxmax(0)), keys=['min', 'max'], axis=0).unstack(1)), axis=1).fillna('') if slow: # alternative method (slow) logger.info('Calculating extreme natural allocations: mechanical method') # probabilities of total loss pt = self.density_df.p_total S = 1 - np.minimum(1, pt.loc[:a].cumsum()) # individual deal losses deal_losses = self.density_df.loc[:a].filter(regex='exeqa_') if self.sf(a) > 0: # need to adjust for losses in default region # use the linear natural allocation (note price uses the lifted allocation) # replace the last row with E[Xi/X | X>=a] a # need >=, so pull from the prior row # exi_xgta includes a sum column for the total, must exclude that deal_losses.iloc[-1] = self.density_df.\ drop(columns='exi_xgta_sum').\ loc[[a-self.bs]].filter(regex='exi_xgta_') * a # deal_losses # linear natural allocation for each total outcome ans = {} i = 0 for _, r in bounds.weight_df.reset_index().iterrows(): pl, pu, tl, tu, w = r.values d = Distortion('bitvar', w, df=[pl, pu]) # pricing kernel, this in-lines the function rap (that was in extensions.sample) gS = np.array(d.g(S)) z = -np.diff(gS[:-1], prepend=1, append=0) ans[(pl, pu)] = z @ deal_losses i += 1 if verbose and i % verbose == 0: logger.info(f'Completed {i} out of {len(bounds.weight_df)} biTVaRs') # all pricing ranges: rows = (pl, pu) pairs defining extreme distortion # cols = deals allocs_slow = pd.concat(ans.values(), keys=ans.keys()).unstack(2) # get max/min/mean natural allocation by deal and compare to actual pricing comp = {} for c in allocs_slow.iloc[:, :-1]: col = allocs_slow[c] nm = c.split('_')[1] comp[nm] = [col.min(), col.mean(), col.max()] comp = pd.DataFrame(comp.values(), index=comp.keys(), columns=['min', 'mean', 
'max']) else: comp = allocs_slow = None # assemble answer ans = namedtuple('pricing_bounds', 'bounds allocs stats comp allocs_slow p_star') p_star = bounds.p_star('total', premium, a, kind=kind) return ans(bounds, allocs, stats, comp, allocs_slow, p_star)
    @property
    def distortion(self):
        return self._distortion
    def remove_fuzz(self, df=None, eps=0, force=False, log=''):
        """
        Remove fuzz at threshold eps. If eps is not passed, use np.finfo(float).eps.

        Apply to self.density_df unless df is not None.
        Only apply if self.remove_fuzz or force.

        :param eps:
        :param df: apply to dataframe df, default = self.density_df
        :param force: do regardless of self.remove_fuzz
        :return:
        """
        if df is None:
            df = self.density_df
        if eps == 0:
            eps = np.finfo(float).eps
        if self._remove_fuzz or force:
            logger.debug(f'Portfolio.remove_fuzz | Removing fuzz from {self.name} dataframe, caller {log}')
            df[df.select_dtypes(include=['float64']).columns] = \
                df.select_dtypes(include=['float64']).applymap(lambda x: 0 if abs(x) < eps else x)
    def __repr__(self):
        """
        Goal: unambiguous.

        :return:
        """
        # return str(self.to_dict())  # this messes up when port = self has been enhanced...
        # cannot use ex, etc. because object may not have been updated
        return f'{self.name} at {super().__repr__()}'
    def _repr_html_(self):
        """
        Updated to mimic Aggregate.
        """
        s = [f'<h3>Portfolio object: {self.name}</h3>']
        _n = len(self.agg_list)
        _s = "" if _n <= 1 else "s"
        s.append(f'Portfolio contains {_n} aggregate component{_s}.')
        if self.bs > 0:
            s.append(f'Updated with bucket size {self.bs:.6g}, log2 = {self.log2}, validation: {self.explain_validation()}')
        df = self.describe
        return '\n'.join(s) + df.fillna('').to_html()
def __str__(self): """ Default behavior """ if self.audit_df is None: ex = self.statistics_df.loc[('agg', 'mean'), 'total'] empex = np.nan isupdated = False else: ex = self.audit_df.loc['total', 'Mean'] empex = self.audit_df.loc['total', 'EmpMean'] isupdated = True s = [f'Portfolio object {self.name:s}', f'Theoretic expected loss {ex:,.1f}', f'Estimated expected loss {empex:,.1f}', f'Error {empex / ex - 1:.6g}' ] s.append( f'Updated {isupdated}' ) if self.bs > 0: if self.bs > 1: s.append(f'bs {self.bs}') else: s.append(f'bs 1 / {int(1/self.bs)}') s.append(f'log2 {self.log2}') s.append(f'validation_eps {self.validation_eps}') s.append(f'padding {self.padding}') s.append(f'sev_calc {self.sev_calc}') s.append(f'normalize {self.normalize}') s.append(f'remove_fuzz {self._remove_fuzz}') s.append(f'approx_type {self.approx_type}') s.append(f'approx_freq_ge {self.approx_freq_ge}') s.append(f'distortion {repr(self._distortion)}') if isupdated: s.append('') with pd.option_context('display.width', 140, 'display.float_format', lambda x: f'{x:,.5g}'): # get it on one row s.append(str(self.describe)) # s.append(super(Portfolio, self).__repr__()) return '\n'.join(s) def __hash__(self): """ hashing behavior :return: """ return hash(repr(self.__dict__)) def __iter__(self): """ make Portfolio iterable: for each x in Portfolio :return: """ return iter(self.agg_list) def __getitem__(self, item): """ allow Portfolio[slice] to return bits of agg_list :param item: :return: """ if type(item) == str: return self.agg_list[self.line_names.index(item)] return self.agg_list[item] @property def statistics(self): """ Same as statistics df, to be consistent with Aggregate objects :return: """ return self.statistics_df @property def info(self): s = [] s.append(f'portfolio object name {self.name}') s.append(f'aggregate objects {len(self.line_names):d}') if self.bs > 0: bss = f'{self.bs:.6g}' if self.bs >= 1 else f'1/{int(1/self.bs)}' s.append(f'bs {bss}') s.append(f'log2 {self.log2}') s.append(f'padding {self.padding}') s.append(f'sev_calc {self.sev_calc}') s.append(f'normalize {self.normalize}') s.append(f'last update {self.last_update}') s.append(f'hash {self.hash_rep_at_last_update:x}') return '\n'.join(s) @property def describe(self): """ Theoretic and empirical stats. Used in _repr_html_. 
Leverage Aggregate object stats; same format """ sev_m = self.statistics_df.loc[('sev', 'ex1'), 'total'] sev_cv = self.statistics_df.loc[('sev', 'cv'), 'total'] sev_skew = self.statistics_df.loc[('sev', 'skew'), 'total'] n_m = self.statistics_df.loc[('freq', 'ex1'), 'total'] n_cv = self.statistics_df.loc[('freq', 'cv'), 'total'] n_skew = self.statistics_df.loc[('freq', 'skew'), 'total'] a_m = self.statistics_df.loc[('agg', 'ex1'), 'total'] a_cv = self.statistics_df.loc[('agg', 'cv'), 'total'] a_skew = self.statistics_df.loc[('agg', 'skew'), 'total'] df = pd.DataFrame({'E[X]': [n_m, sev_m, a_m], 'CV(X)': [n_cv, sev_cv, a_cv], 'Skew(X)': [n_skew, sev_skew, a_skew]}, index=['Freq', 'Sev', 'Agg']) df.index.name = 'X' if self.audit_df is not None: df.loc['Sev', 'Est E[X]'] = np.nan df.loc['Agg', 'Est E[X]'] = self.est_m df['Err E[X]'] = df['Est E[X]'] / df['E[X]'] - 1 df.loc['Sev', 'Est CV(X)'] = np.nan df.loc['Agg', 'Est CV(X)'] = self.est_cv df['Err CV(X)'] = df['Est CV(X)'] / df['CV(X)'] - 1 df.loc['Sev', 'Est Skew(X)'] = np.nan df.loc['Agg', 'Est Skew(X)'] = self.est_skew df = df[['E[X]', 'Est E[X]', 'Err E[X]', 'CV(X)', 'Est CV(X)', 'Err CV(X)', 'Skew(X)', 'Est Skew(X)']] t1 = [a.describe for a in self] + [df] t2 = [a.name for a in self] + ['total'] df = pd.concat(t1, keys=t2, names=['unit', 'X']) if self.audit_df is not None: # add estimated severity sev_est_m = (df.xs('Sev', axis=0, level=1)['Est E[X]'].iloc[:-1].astype(float) * df.xs('Freq', axis=0, level=1)['E[X]'].iloc[:-1]).sum() / df.loc[('total', 'Freq'), 'E[X]'] df.loc[('total', 'Sev'), 'Err E[X]'] = sev_est_m / df.loc[('total', 'Sev'), 'E[X]'] - 1 df.loc[('total', 'Sev'), 'Est E[X]'] = sev_est_m return df @property def spec(self): """ Get the dictionary specification. :return: """ d = dict() d['name'] = self.name d['spec_list'] = [a._spec for a in self.agg_list] return d @property def spec_ex(self): """ All relevant info. :return: """ return {'type': type(self), 'spec': self.spec, 'bs': self.bs, 'log2': self.log2, 'aggs': len(self.agg_list)}
    def json(self, stream=None):
        """
        Write object as json.

        :param stream:
        :return: stream or text
        """
        args = dict()
        args["bs"] = self.bs
        args["log2"] = self.log2
        args["padding"] = self.padding
        args["tilt_amount"] = self.tilt_amount
        args["distortion"] = repr(self._distortion)
        args["sev_calc"] = self.sev_calc
        args["remove_fuzz"] = self._remove_fuzz
        args["approx_type"] = self.approx_type
        args["approx_freq_ge"] = self.approx_freq_ge
        args["last_update"] = str(self.last_update)
        args["hash_rep_at_last_update"] = str(self.hash_rep_at_last_update)

        d = self.spec
        d['args'] = args

        logger.debug(f'Portfolio.json | dumping {self.name} to {stream}')
        s = json.dumps(d)  # , default_flow_style=False, indent=4)
        logger.debug(f'Portfolio.json | {s}')
        if stream is None:
            return s
        else:
            return stream.write(s)
    def save(self, filename='', mode='a'):
        """
        Persist to json in filename; if none, save to user.json.

        :param filename:
        :param mode: for file open
        :return:
        """
        if filename == "":
            filename = Path.home() / 'agg/user.json'
            filename.parent.mkdir(parents=True, exist_ok=True)

        with filename.open(mode=mode, encoding='utf-8') as f:
            self.json(stream=f)
        logger.debug(f'Portfolio.save | {self.name} saved to {filename}')
    def __add__(self, other):
        """
        Add two Portfolio objects: INDEPENDENT sum
        (down the road can look for the same severity...)

        :param other:
        :return:
        """
        assert isinstance(other, Portfolio)
        new_spec = []
        for a in self.agg_list:
            c = deepcopy(a._spec)
            c['name'] = c['name']
            new_spec.append(c)
        for a in other.agg_list:
            c = deepcopy(a._spec)
            c['name'] = c['name']
            new_spec.append(c)
        return Portfolio(f'({self.name}) + ({other.name})', new_spec)

    def __rmul__(self, other):
        """
        new = other * self; treat as scale change.

        :param other:
        :return:
        """
        assert other > 0
        new_spec = []
        for a in self.agg_list:
            new_spec.append(deepcopy(a._spec))
        for d in new_spec:
            # d is a dictionary agg spec, need to adjust the severity
            s = d['severity']
            if 'mean' in s:
                s['mean'] *= other
            elif 'scale' in s:
                s['scale'] *= other
            else:
                raise ValueError(f"Cannot adjust {s['name']} for scale")
        return Portfolio(f'{other} x {self.name}', new_spec)

    def __mul__(self, other):
        """
        new = self * other, other integer, sum of other independent copies.

        :param other:
        :return:
        """
        assert isinstance(other, int)
        new_spec = []
        for a in self.agg_list:
            new_spec.append(deepcopy(a._spec))
        for d in new_spec:
            # d is a dictionary agg spec, need to adjust the frequency
            # TODO better freq dists; deal with Bernoulli where n = log < 1
            d['frequency']['n'] *= other
        return Portfolio(f'Sum of {other} copies of {self.name}', new_spec)
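    # Sketch of the arithmetic above (hypothetical, assuming ``port`` is an existing Portfolio):
    #
    #   bigger = 2 * port        # __rmul__: scale severities by 2 (inflation-style change)
    #   doubled = port * 2       # __mul__: sum of 2 independent copies (frequency doubled)
    #   combined = port + port   # __add__: independent sum of the two books
    #
    # None of these results is updated; call .update() on the new object before use.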
    def snap(self, x):
        """
        Snap value x to the index of density_df.

        :param x:
        :return:
        """
        ix = self.density_df.index.get_indexer([x], 'nearest')[0]
        return self.density_df.iloc[ix, 0]
    def audits(self, kind='all', **kwargs):
        """
        Produce audit plots to assess accuracy of outputs.

        Currently only exeqa available.

        :param kind:
        :param kwargs: passed to pandas plot, e.g. set xlim
        :return:
        """
        if kind == 'all':
            kind = ['exeqa']
        for k in kind:
            if k == 'exeqa':
                temp = self.density_df.filter(regex='exeqa_.*(?<!total)$').copy()
                temp['sum'] = temp.sum(axis=1)
                temp['err'] = temp['sum'] - temp.index
                f, axs = plt.subplots(1, 2, figsize=(8, 3.75), constrained_layout=True)
                ax = axs.flatten()
                a = temp['err'].abs().plot(logy=True, title=f'Exeqa Sum Error', ax=ax[1], **kwargs)
                a.plot(self.density_df.loss, self.density_df.p_total, label='p_total')
                a.plot(self.density_df.loss, self.density_df.p_total * temp.err, label='prob wtd err')
                a.grid('b')
                a.legend(loc='lower left')
                if 'xlim' in kwargs:
                    kwargs['ylim'] = kwargs['xlim']
                temp.filter(regex='exeqa_.*(?<!total)$|sum').plot(title='exeqa and sum of parts',
                                                                  ax=ax[0], **kwargs).grid('b')
        f.suptitle(f'E[Xi | X=x] vs. Sum of Parts\nbs={self.bs}, log2={self.log2}, padding={self.padding}',
                   fontsize='x-large')
        return f  # for doc maker
    def q(self, p, kind='lower'):
        """
        Return quantile function of density_df.p_total.

        Definition 2.1 (Quantiles)

        x(α) = qα(X) = inf{x ∈ R : P[X ≤ x] ≥ α} is the lower α-quantile of X
        x(α) = qα(X) = inf{x ∈ R : P[X ≤ x] > α} is the upper α-quantile of X.

        ``kind=='middle'`` has been removed.

        :param p:
        :param kind: 'lower' or 'upper'.
        :return:
        """
        if kind == 'middle':
            logger.warning(f'kind=middle is deprecated, replacing with kind=lower')
            kind = 'lower'
        assert kind in ['lower', 'upper'], 'kind must be lower or upper'

        if self._var_tvar_function is None:
            # revised June 2023
            ser = self.density_df.query('p_total > 0').p_total
            self._make_var_tvar(ser)

        return self._var_tvar_function[kind](p)
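    # Quantile usage sketch (assuming ``port`` has been updated):
    #
    #   var99_lower = port.q(0.99)            # lower quantile, the default
    #   var99_upper = port.q(0.99, 'upper')   # upper quantile; differs only at probability atoms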
    def _make_var_tvar(self, ser):
        """
        There is no severity version here, so this knows where to store the answer,
        cf. the Aggregate version.
        """
        self._var_tvar_function = {}
        qf = make_var_tvar(ser)
        self._var_tvar_function['upper'] = qf.q_upper
        self._var_tvar_function['lower'] = qf.q_lower
        self._var_tvar_function['tvar'] = qf.tvar
    def cdf(self, x):
        """
        Distribution function.

        :param x:
        :return:
        """
        if self._cdf is None:
            # Dec 2019: kind='linear' --> kind='previous'
            self._cdf = interpolate.interp1d(self.density_df.loss, self.density_df.F, kind='previous',
                                             bounds_error=False, fill_value='extrapolate')
        return 0. + self._cdf(x)
    def sf(self, x):
        """
        Survival function.

        :param x:
        :return:
        """
        return 1 - self.cdf(x)
    def pdf(self, x):
        """
        Probability density function, assuming a continuous approximation of the bucketed density.

        :param x:
        :return:
        """
        if self._pdf is None:
            self._pdf = interpolate.interp1d(self.density_df.loss, self.density_df.p_total, kind='linear',
                                             bounds_error=False, fill_value='extrapolate')
        return self._pdf(x) / self.bs
    def pmf(self, x):
        """
        Probability mass function, treating the aggregate as discrete.
        x must be in the index (?)
        """
        if self.density_df is None:
            raise ValueError("Must update before computing probabilities!")
        try:
            return self.density_df.loc[x, 'p_total']
        except KeyError:
            return 0.0
            # raise KeyError(f'Value {x} must be in index for probability mass function.')
    def var(self, p):
        """
        Value at risk = alias for quantile function.

        :param p:
        :return:
        """
        return self.q(p)
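    # Probability function sketch (assuming ``port`` is updated with bucket size ``port.bs``):
    #
    #   x = port.q(0.99)
    #   port.cdf(x), port.sf(x)      # step-function F and S read off density_df
    #   port.pdf(x)                  # continuous approximation: interpolated p_total / bs
    #   port.pmf(x)                  # discrete mass at x; x should lie on the bs grid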
    def tvar(self, p, kind=''):
        """
        Compute the tail value at risk at threshold p. Revised June 2023.

        Really this function returns ES, CVaR, but in modern terminology this is called TVaR.

        Definition 2.6 (Tail mean and Expected Shortfall)
        Assume E[X−] < ∞. Then
        x̄(α) = TM_α(X) = α^{−1} E[X 1{X≤x(α)}] + x(α)(α − P[X ≤ x(α)])
        is the α-tail mean at level α of X.
        Acerbi and Tasche (2002)

        McNeil etc. p66-70 - this follows from the definition of ES as an integral of the quantile function.

        :param p:
        :param kind: No longer needed as the new method is exact (equals the old tail) and about 1000x faster.
        :return:
        """
        if kind != '' and getattr(self, 'tvar-warning', 0) == 0:
            logger.warning('kind is no longer used in TVaR, new method equivalent to kind=tail but much faster. '
                           'Argument kind will be removed in the future.')
            setattr(self, 'tvar-warning', 1)
        if kind == 'inverse':
            logger.warning('kind=inverse called...??!!')
        assert self.density_df is not None, 'Must recompute prior to computing tail value at risk.'

        if self._var_tvar_function is None:
            # revised June 2023
            ser = self.density_df.query('p_total > 0').p_total
            self._make_var_tvar(ser)

        return self._var_tvar_function['tvar'](p)
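    # TVaR sketch: TVaR(p) is the conditional mean beyond the p-VaR (with the usual atom
    # adjustment), so it always sits at or above q(p). Assuming ``port`` is updated:
    #
    #   p = 0.99
    #   assert port.tvar(p) >= port.q(p)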
    def tvar_threshold(self, p, kind):
        """
        Find the value pt such that TVaR(pt) = VaR(p) using the bisection method.

        Will fail if p=0 because signs are the same.
        """
        # target value
        a = self.q(p, kind)
        if p == 0:
            # mean is mean
            return 0

        def f(p):
            return self.tvar(p) - a

        p1 = bisect(f, 0, 1)
        # loop = 0
        # p1 = max(.1, 1 - 2 * (1 - p))
        # fp1 = f(p1)
        # delta = 1e-5
        # while abs(fp1) > 1e-6 and loop < 20:
        #     df1 = (f(p1 + delta / 2) - f(p1 - delta / 2)) / delta
        #     p1 = p1 - fp1 / df1
        #     fp1 = f(p1)
        #     loop += 1
        # if loop == 20:
        #     raise ValueError(f'Difficulty computing TVaR to match VaR at p={p}; last guess {p1}')
        return p1
    def equal_risk_var_tvar(self, p_v, p_t):
        """
        Solve for equal risk VaR and TVaR: find pv and pt such that the sum of individual line
        VaR/TVaR at pv/pt equals the total VaR(p_v) or TVaR(p_t).

        These won't return elements in the index because you have to interpolate,
        hence using kind=middle.
        """
        # these two should obviously be the same
        target_v = self.q(p_v, 'middle')
        target_t = self.tvar(p_t)

        def fv(p):
            return sum([float(a.q(p, 'middle')) for a in self]) - target_v

        def ft(p):
            return sum([float(a.tvar(p)) for a in self]) - target_t

        ans = np.zeros(2)
        for i, f in enumerate([fv, ft]):
            p1 = 1 - 2 * (1 - (p_v if i == 0 else p_t))
            fp1 = f(p1)
            loop = 0
            delta = 1e-5
            while abs(fp1) > 1e-6 and loop < 10:
                dfp1 = (f(p1 + delta) - fp1) / delta
                p1 = p1 - fp1 / dfp1
                fp1 = f(p1)
                loop += 1
            if loop == 10:
                raise ValueError(f'Trouble finding equal risk {"TVaR" if i else "VaR"} at p_v={p_v}, p_t={p_t}. '
                                 'No convergence after 10 iterations. ')
            ans[i] = p1
        return ans
    def equal_risk_epd(self, a):
        """
        Determine the common EPD threshold so the sum of stand-alone assets equals a.
        """
        def f(p):
            return sum([self.epd_2_assets[(l, 0)](p) for l in self.line_names]) - a

        p1 = self.assets_2_epd[('total', 0)](a)
        fp1 = f(p1)
        loop = 0
        delta = 1e-5
        while abs(fp1) > 1e-6 and loop < 10:
            dfp1 = (f(p1 + delta) - fp1) / delta
            p1 = p1 - fp1 / dfp1
            fp1 = f(p1)
            loop += 1
        if loop == 10:
            raise ValueError(f'Trouble finding equal risk EPD at pe={p1}. No convergence after 10 iterations. ')
        return p1
    def merton_perold(self, p, kind='lower'):
        """
        Compute Merton-Perold capital allocation at VaR(p) capital using VaR as risk measure.

        TODO: TVaR version of Merton-Perold
        """
        # figure total assets
        a = self.q(p, kind)
        # shorthand abbreviation
        df = self.density_df
        loss = df.loss
        ans = []
        total = 0
        for l in self.line_names:
            q = self.density_df.loss.iloc[
                np.searchsorted(self.density_df[f'ημ_{l}'].cumsum(), .995, side='right')]
            diff = a - q
            ans.append(diff)
            total += diff
        ans.append(total)
        return ans
    def cotvar(self, p):
        """
        Compute the p co-tvar asset allocation using ISA.

        Asset alloc = exgta = tail expected value, treating TVaR like a pricing variable.
        """
        av = self.q(p)
        return self.density_df.loc[av, [f'exgta_{l}' for l in self.line_names_ex]].values
    def as_severity(self, limit=np.inf, attachment=0, conditional=False):
        """
        Convert portfolio into a severity without recomputing.

        Throws an error if self not updated.

        :param limit:
        :param attachment:
        :param conditional:
        :return:
        """
        if self.density_df is None:
            raise ValueError('Must update prior to converting to severity')
        return Severity(sev_name=self, sev_a=self.log2, sev_b=self.bs,
                        exp_attachment=attachment, exp_limit=limit, sev_conditional=conditional)
    def approximate(self, approx_type='slognorm', output='scipy'):
        """
        Create an approximation to self using method of moments matching.

        Returns a dictionary specification of the portfolio aggregate.
        If updated, uses empirical moments, otherwise uses theoretic moments.

        :param approx_type: slognorm | sgamma | normal
        :param output: return a dict or agg language specification
        :return:
        """
        if approx_type == 'all':
            return {kind: self.approximate(kind)
                    for kind in ['norm', 'gamma', 'lognorm', 'sgamma', 'slognorm']}

        if self.audit_df is None:
            # not updated
            m = self.statistics_df.loc[('agg', 'mean'), 'total']
            cv = self.statistics_df.loc[('agg', 'cv'), 'total']
            skew = self.statistics_df.loc[('agg', 'skew'), 'total']
        else:
            # use statistics_df matched to the computed aggregate
            m, cv, skew = self.audit_df.loc['total', ['EmpMean', 'EmpCV', 'EmpSkew']]

        name = f'{approx_type[0:4]}.{self.name[0:5]}'
        agg_str = f'agg {name} 1 claim sev '
        note = f'frozen version of {self.name}'
        return approximate_work(m, cv, skew, name, agg_str, note, approx_type, output)
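    # Usage sketch (illustrative; see the docstring for output options):
    #
    #   approx_all = port.approximate('all')   # dict keyed by approximation type
    #   sln = port.approximate('slognorm')     # single moment-matched approximation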
    def collapse(self, approx_type='slognorm'):
        """
        Returns new Portfolio with the fit.

        Deprecated... prefer uw.write(self.fit()) to go through the agg language approach.

        :param approx_type: slognorm | sgamma
        :return:
        """
        spec = self.fit(approx_type, output='dict')
        logger.debug(f'Portfolio.collapse | Collapse created new Portfolio with spec {spec}')
        logger.warning(f'Portfolio.collapse | Collapse is deprecated; use fit() instead.')
        return Portfolio(f'Collapsed {self.name}', [spec])
    def percentiles(self, pvalues=None):
        """
        Report on percentiles and large losses.
        Uses interpolation; audit_df uses nearest.

        :param pvalues: optional vector of p values to use. If None, sensible defaults provided.
        :return: DataFrame of percentiles indexed by line and p value
        """
        df = pd.DataFrame(columns=['line', 'log', 'Agg Quantile'])
        df = df.set_index(['line', 'log'])
        # df.columns.name = 'perspective'
        if pvalues is None:
            pvalues = [0.5, 0.75, 0.8, 0.85, 0.9, 0.95, 0.98, 0.99, 0.994, 0.995, 0.999, 0.9999]
        for line in self.line_names_ex:
            q_agg = interpolate.interp1d(self.density_df[f'p_{line}'].cumsum(), self.density_df.loss,
                                         kind='linear', bounds_error=False, fill_value='extrapolate')
            for p in pvalues:
                qq = q_agg(p)
                df.loc[(line, p), :] = [float(qq)]
        df = df.unstack(level=1)
        return df
    def recommend_bucket(self):
        """
        Data to help estimate a good bucket size.

        :return:
        """
        df = pd.DataFrame(columns=['line', 'bs10'])
        df = df.set_index('line')
        for a in self.agg_list:
            df.loc[a.name, :] = [a.recommend_bucket(10)]
        df['bs11'] = df['bs10'] / 2
        df['bs12'] = df['bs10'] / 4
        df['bs13'] = df['bs10'] / 8
        df['bs14'] = df['bs10'] / 16
        df['bs15'] = df['bs10'] / 32
        df['bs16'] = df['bs10'] / 64
        df['bs17'] = df['bs10'] / 128
        df['bs18'] = df['bs10'] / 256
        df['bs19'] = df['bs10'] / 512
        df['bs20'] = df['bs10'] / 1024
        df.loc['total', :] = df.sum()
        return df
    def best_bucket(self, log2=16, recommend_p=RECOMMEND_P):
        """
        Recommend the best bucket. Rounded recommended bucket for log2 points.

        TODO: Is this really the best approach?!

        :param log2:
        :param recommend_p:
        :return:
        """
        # bs = sum([a.recommend_bucket(log2, p=recommend_p) for a in self])
        bs = sum([a.recommend_bucket(log2, p=recommend_p) ** 2 for a in self]) ** 0.5
        return round_bucket(bs)
[docs] def update(self, log2, bs, approx_freq_ge=100, approx_type='slognorm', remove_fuzz=False, sev_calc='discrete', discretization_calc='survival', normalize=True, padding=1, tilt_amount=0, trim_df=False, add_exa=True, force_severity=True, recommend_p=RECOMMEND_P, approximation=None, debug=False): """ TODO: currently debug doesn't do anything... Create density_df, performs convolution. optionally adds additional information if ``add_exa=True`` for allocation and priority analysis tilting: [@Grubel1999]: Computation of Compound Distributions I: Aliasing Errors and Exponential Tilting (ASTIN 1999) tilt x numbuck < 20 is recommended log. 210 num buckets and max loss from bucket size Aggregate reinsurance in parser has replaced the aggregate_cession_function (a function of a Portfolio object that adjusts individual line densities; applied after line aggs created but before creating not-lines; actual statistics do not reflect impact.) Agg re by unit is now applied in the Aggregate object. TODO: consider aggregate covers at the portfolio level...Where in parse - at the top! :param log2: :param bs: bucket size :param approx_freq_ge: use method of moments if frequency is larger than ``approx_freq_ge`` :param approx_type: type of method of moments approx to use (slognorm or sgamma) :param remove_fuzz: remove machine noise elements from FFT :param sev_calc: how to calculate the severity, discrete (point masses as xs) or continuous (uniform between xs points) :param discretization_calc: survival or distribution (accurate on right or left tails) :param normalize: if true, normalize the severity so sum probs = 1. This is generally what you want; but :param padding: for fft 1 = double, 2 = quadruple :param tilt_amount: for tiling methodology - see notes on density for suggested parameters :param epds: epd points for priority analysis; if None-> sensible defaults :param trim_df: remove unnecessary columns from density_df before returning :param add_exa: run add_exa to append additional allocation information needed for pricing; if add_exa also add epd info :param force_severity: force computation of severities for aggregate components even when approximating :param recommend_p: percentile to use for bucket recommendation. :param approximation: if not None, use these instructions ('exact') :param debug: if True, print debug information :return: """ self._valid = None # reset valid flag if approximation is not None: if approximation == 'exact': approx_freq_ge = 1e9 if log2 <= 0: raise ValueError('log2 must be >= 0') self.log2 = log2 if bs == 0: self.bs = self.best_bucket(log2, recommend_p) logger.info(f'bs=0 enterered, setting bs={bs:.6g} using self.best_bucket rounded to binary fraction.') else: self.bs = bs self.padding = padding self.tilt_amount = tilt_amount self.approx_type = approx_type self.sev_calc = sev_calc self._remove_fuzz = remove_fuzz self.approx_type = approx_type self.approx_freq_ge = approx_freq_ge self.discretization_calc = discretization_calc self.normalize = normalize if self.hash_rep_at_last_update == hash(self): # this doesn't work logger.warning(f'Nothing has changed since last update at {self.last_update}') return self._var_tvar_function = None ft_line_density = {} # line_density = {} # not_line_density = {} # add the densities # tilting: [@Grubel1999]: Computation of Compound Distributions I: Aliasing Errors and Exponential Tilting # (ASTIN 1999) # tilt x numbuck < 20 recommended log. 
210 # num buckets and max loss from bucket size N = 1 << log2 MAXL = N * bs xs = np.linspace(0, MAXL, N, endpoint=False) # make all the single line aggs # note: looks like duplication but will all be references # easier for add_exa to have as part of the portfolio module # tilt if self.tilt_amount != 0: tilt_vector = np.exp(self.tilt_amount * np.arange(N)) else: tilt_vector = None # where the answer will live self.density_df = pd.DataFrame(index=xs) self.density_df['loss'] = xs ft_all = None for agg in self.agg_list: raw_nm = agg.name nm = f'p_{agg.name}' # agg.update_work handles the reinsurance too agg.update_work(xs, self.padding, tilt_vector, 'exact' if agg.n < approx_freq_ge else approx_type, sev_calc, discretization_calc, normalize, force_severity, debug) ft_line_density[raw_nm] = agg.ftagg_density self.density_df[nm] = agg.agg_density if ft_all is None: ft_all = np.copy(ft_line_density[raw_nm]) else: ft_all *= ft_line_density[raw_nm] self.density_df['p_total'] = np.real(ift(ft_all, self.padding, tilt_vector)) # ft_line_density['total'] = ft_all # make the not self.line_density = sum of all but the given line # have the issue here that if you divide and the dist # is symmetric then you get a div zero... ft_nots = {} for line in self.line_names: ft_not = np.ones_like(ft_all) if np.any(ft_line_density[line] == 0): # have to build up for not_line in self.line_names: if not_line != line: ft_not *= ft_line_density[not_line] else: if len(self.line_names) > 1: ft_not = ft_all / ft_line_density[line] ft_nots[line] = ft_not self.remove_fuzz(log='update') # make audit statistics_df df theoretical_stats = self.statistics_df.T.filter(regex='agg') theoretical_stats.columns = ['EX1', 'EX2', 'EX3', 'Mean', 'CV', 'Skew', 'Limit', 'P99.9Est'] theoretical_stats = theoretical_stats[['Mean', 'CV', 'Skew', 'Limit', 'P99.9Est']] self.make_audit_df(columns=self.line_names_ex, theoretical_stats=theoretical_stats) # add exa details if add_exa: self.add_exa(self.density_df, ft_nots=ft_nots) else: # at least want F and S to get quantile functions self.density_df['F'] = np.cumsum(self.density_df.p_total) self.density_df['S'] = 1 - self.density_df.F self.ex = self.audit_df.loc['total', 'EmpMean'] # pull out estimated stats to match Aggergate self.est_m, self.est_cv, self.est_skew = self.audit_df.loc['total', ['EmpMean', 'EmpCV', 'EmpSkew']] self.est_sd = self.est_m * self.est_cv self.est_var = self.est_sd ** 2 self.last_update = np.datetime64('now') self.hash_rep_at_last_update = hash(self) if trim_df: self.trim_df() # invalidate stored functions self._var_tvar_function = None self._cdf = None
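    # Typical update workflow sketch (hypothetical settings; bs=0 lets best_bucket choose):
    #
    #   port.update(log2=16, bs=0, remove_fuzz=True)   # convolve units via FFT, build density_df
    #   port.describe                                   # theoretic vs empirical moments
    #   port.valid                                      # quick sanity check on discretization error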
[docs] def make_audit_df(self, columns, theoretical_stats=None): """ Add or update the audit_df. """ # these are the values set by the audit column_names = ['Sum probs', 'EmpMean', 'EmpCV', 'EmpSkew', "EmpKurt", 'EmpEX1', 'EmpEX2', 'EmpEX3'] + \ ['P' + str(100 * i) for i in self.audit_percentiles] if self.audit_df is None: self.audit_df = pd.DataFrame(columns=column_names) for col in columns: sump = np.sum(self.density_df[f'p_{col}']) t = self.density_df[f'p_{col}'] * self.density_df['loss'] ex1 = np.sum(t) t *= self.density_df['loss'] ex2 = np.sum(t) t *= self.density_df['loss'] ex3 = np.sum(t) t *= self.density_df['loss'] ex4 = np.sum(t) m, cv, s = MomentAggregator.static_moments_to_mcvsk(ex1, ex2, ex3) # empirical kurtosis kurt = (ex4 - 4 * ex3 * ex1 + 6 * ex1 ** 2 * ex2 - 3 * ex1 ** 4) / ((m * cv) ** 4) - 3 ps = np.zeros((len(self.audit_percentiles))) temp = self.density_df[f'p_{col}'].cumsum() for i, p in enumerate(self.audit_percentiles): ps[i] = (temp > p).idxmax() newrow = [sump, m, cv, s, kurt, ex1, ex2, ex3] + list(ps) # TODO this is fragile if you request another set of percentiles # add the row, then subset it (when called first time you don't have # the Err columns) self.audit_df.loc[col, column_names] = newrow if theoretical_stats is not None and 'Mean' not in self.audit_df.columns: # merges on the first five columns: mean, cv, sk, limit, p99.9E self.audit_df = pd.concat((theoretical_stats, self.audit_df), axis=1, sort=True) try: self.audit_df['MeanErr'] = self.audit_df['EmpMean'] / self.audit_df['Mean'] - 1 self.audit_df['CVErr'] = self.audit_df['EmpCV'] / self.audit_df['CV'] - 1 self.audit_df['SkewErr'] = self.audit_df['EmpSkew'] / self.audit_df['Skew'] - 1 except ZeroDivisionError as e: raise e
@property def valid(self): """ Check if the model appears valid. See documentation for Aggregate.valid. An answer of True does not guarantee the model is valid, but False means it is definitely suspect. (Similar to the null hypothesis in a statistical test). Called and reported automatically by qd for Aggregate objects. Checks the relative errors (from ``self.describe``) for: * severity mean < eps * severity cv < 10 * eps * severity skew < 100 * eps (skewness is more difficult to estimate) * aggregate mean < eps and < 2 * severity mean relative error (larger values indicate possibility of aliasing and that ``bs`` is too small). * aggregate cv < 10 * eps * aggregate skew < 100 * esp eps = 1e-3 by default; change in ``validation_eps`` attribute. Test only applied for CV and skewness when they are > 0. :return: True if all tests are passed, else False. """ if self._valid is not None: return self._valid rv = Validation.NOT_UNREASONABLE if self.density_df is None: self._valid = Validation.NOT_UPDATED return Validation.NOT_UPDATED for a in self.agg_list: r = a.valid if r & Validation.REINSURANCE: logger.info(f'Aggregate {a.name} has reinsurance, validation n/a') elif not r: logger.info(f'Aggregate {a.name} fails validation') rv |= r if rv != Validation.NOT_UNREASONABLE: logger.info(f'Exiting: Portfolio validation steps skipped due to failed or n/a Aggregate validation') self._valid = rv return rv else: logger.info('No Aggregate object fails validation') # apply validation to the Portfolio total df = self.describe.xs('total', level=0, axis=0).abs() try: df['Err Skew(X)'] = df['Est Skew(X)'] / df['Skew(X)'] - 1 except ZeroDivisionError: df['Err Skew(X)'] = np.nan except TypeError: df['Err Skew(X)'] = np.nan eps = self.validation_eps if df.loc['Sev', 'Err E[X]'] > eps: logger.info('FAIL: Portfolio Sev mean error > eps') rv |= Validation.SEV_MEAN if df.loc['Agg', 'Err E[X]'] > eps: logger.info('FAIL: Portfolio Agg mean error > eps') rv |= Validation.AGG_MEAN if abs(df.loc['Sev', 'Err E[X]']) > 0 and df.loc['Agg', 'Err E[X]'] > 10 * df.loc['Sev', 'Err E[X]']: logger.info('FAIL: Agg mean error > 10 * sev error') rv |= Validation.ALIASING try: if np.inf > df.loc['Sev', 'CV(X)'] > 0 and df.loc['Sev', 'Err CV(X)'] > 10 * eps: logger.info('FAIL: Portfolio Sev CV error > eps') rv |= Validation.SEV_CV if np.inf > df.loc['Agg', 'CV(X)'] > 0 and df.loc['Agg', 'Err CV(X)'] > 10 * eps: logger.info('FAIL: Portfolio Agg CV error > eps') rv |= Validation.AGG_CV if np.inf > df.loc['Sev', 'Skew(X)'] > 0 and df.loc['Sev', 'Err Skew(X)'] > 100 * eps: logger.info('FAIL: Portfolio Sev skew error > eps') rv |= Validation.SEV_SKEW if np.inf > df.loc['Agg', 'Skew(X)'] > 0 and df.loc['Agg', 'Err Skew(X)'] > 100 * eps: logger.info('FAIL: Portfolio Agg skew error > eps') rv |= Validation.AGG_SKEW except (TypeError, ZeroDivisionError): pass if rv == Validation.NOT_UNREASONABLE: logger.info('Portfolio does not fail any validation: not unreasonable') self._valid = rv return rv
    def explain_validation(self):
        """
        Explain the validation result. Can pass in if already calculated.
        """
        return explain_validation(self.valid)
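    # Validation sketch: ``valid`` returns a Validation flag value; NOT_UNREASONABLE means no
    # test failed. For a human-readable summary:
    #
    #   port.explain_validation()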
    @property
    def priority_capital_df(self):
        if self._priority_capital_df is None:
            # default priority analysis
            # never use these, so moved out of main update
            # TODO make a property and create on demand
            logger.debug('Adding EPDs in Portfolio.update')
            epds = np.hstack(
                [np.linspace(0.5, 0.1, 4, endpoint=False)] +
                [np.linspace(10 ** -n, 10 ** -(n + 1), 9, endpoint=False) for n in range(1, 7)])
            epds = np.round(epds, 7)
            self._priority_capital_df = pd.DataFrame(index=pd.Index(epds))
            for col in self.line_names:
                for i in range(3):
                    self._priority_capital_df['{:}_{:}'.format(col, i)] = self.epd_2_assets[(col, i)](epds)
                self._priority_capital_df['{:}_{:}'.format('total', 0)] = self.epd_2_assets[('total', 0)](epds)
                col = 'not ' + col
                for i in range(2):
                    self._priority_capital_df['{:}_{:}'.format(col, i)] = self.epd_2_assets[(col, i)](epds)
            self._priority_capital_df['{:}_{:}'.format('total', 0)] = self.epd_2_assets[('total', 0)](epds)
            self._priority_capital_df.columns = self._priority_capital_df.columns.str.split("_", expand=True)
            self._priority_capital_df.sort_index(axis=1, level=1, inplace=True)
            self._priority_capital_df.sort_index(axis=0, inplace=True)
        return self._priority_capital_df
    def trim_df(self):
        """
        Trim out unwanted columns from density_df.

        epd used in graphics.

        :return:
        """
        self.density_df = self.density_df.drop(
            self.density_df.filter(regex='^e_|^exi_xlea|^[a-z_]+ημ').columns,
            axis=1
        )
[docs] def gradient(self, epsilon=1 / 128, kind='homog', method='forward', distortion=None, remove_fuzz=True, extra_columns=None, do_swap=True): """ Compute the gradient of various quantities relative to a change in the volume of each portfolio component. Focus is on the quantities used in rate calculations: S, gS, p_total, exa, exag, exi_xgta, exi_xeqq, exeqa, exgta etc. homog: inhomog: :param epsilon: the increment to use; scale is 1+epsilon :param kind: homog[ogeneous] or inhomog: homog computes impact of f((1+epsilon)X_i)-f(X_i). Inhomog scales the frequency and recomputes. Note inhomog will have a slight scale issues with E[Severity] :param method: forward, central (using epsilon/2) or backwards :param distortion: if included derivatives of statistics using the distortion, such as exag are also computed :param extra_columns: extra columns to compute dervs of. Note there is virtually no overhead of adding additional columns :param do_swap: force the step to replace line with line+epsilon in all not line2's line2!=line1; whether you need this or not depends on what variables you to be differentiated. E.g. if you ask for exa_total only you don't need to swap. But if you want exa_A, exa_B you do, otherwise the d/dA exa_B won't be correct. TODO: replace with code! :return: DataFrame of gradients and audit_df in an Answer class """ if kind == 'inhomog' or kind[:7] == 'inhomog': raise NotImplementedError(f'kind=={kind} not yet implemented') if method == 'central': raise NotImplementedError(f'method=={method} not yet implemented') if method not in ('forward', 'backwards', 'central'): raise ValueError('Inadmissible option passed to gradient.') if self.tilt_amount: raise ValueError('Gradients do not allow tilts') # central = run this code forwards and backwards with epsilon / 2 and average?! 
# Forwards or backwards if method == 'forward': delta = 1 + epsilon dx = epsilon pm = '+' else: delta = 1 - epsilon dx = -epsilon pm = '-' # FFT functions for use in exa calculations; padding needs to be consistent with agg def loc_ft(x): return ft(x, self.padding, None) def loc_ift(x): return ift(x, self.padding, None) # setup (compare self.update) xs = self.density_df['loss'].values tilt_vector = None # (1+e)X computed for each line agg_epsilon_df = pd.DataFrame(index=xs) # compute the individual line (1+epsilon)X_i and then the revised total new_aggs = {} for base_agg in self.agg_list: agg = base_agg.rescale(delta, kind) new_aggs[base_agg.name] = agg _a = agg.update_work(xs, self.padding, tilt_vector, 'exact' if agg.n < self.approx_freq_ge else self.approx_type, self.sev_calc, self.discretization_calc) agg_epsilon_df[f'p_{agg.name}'] = agg.agg_density # the total with the line incremented agg_epsilon_df[f'p_total_{agg.name}'] = \ np.real(self.ift(agg.ftagg_density * loc_ft(self.density_df[f'ημ_{agg.name}']))) self.remove_fuzz(df=agg_epsilon_df, force=remove_fuzz, log='gradient') percentiles = [0.9, 0.95, 0.99, 0.996, 0.999, 0.9999, 1 - 1e-6] audit_df = pd.DataFrame( columns=['Sum probs', 'EmpMean', 'EmpCV', 'EmpSkew', 'EmpEX1', 'EmpEX2', 'EmpEX3'] + ['P' + str(100 * i) for i in percentiles]) # 949 = epsilon 916 Delta ep = chr(949) D = chr(916) for col in agg_epsilon_df.columns: sump = np.sum(agg_epsilon_df[col]) t = agg_epsilon_df[col] * xs ex1 = np.sum(t) t *= xs ex2 = np.sum(t) t *= xs ex3 = np.sum(t) m, cv, s = MomentAggregator.static_moments_to_mcvsk(ex1, ex2, ex3) ps = np.zeros((len(percentiles))) temp = agg_epsilon_df[col].cumsum() for i, p in enumerate(percentiles): ps[i] = (temp > p).idxmax() audit_df.loc[f'{col[2:]}{pm}{ep}', :] = [sump, m, cv, s, ex1, ex2, ex3] + list(ps) for l in self.line_names_ex: audit_df.loc[l, :] = self.audit_df.loc[l, :] # differences for l in self.line_names: audit_df.loc[f'{l}{D}', :] = audit_df.loc[f'{l}{pm}{ep}'] - audit_df.loc[l] audit_df.loc[f'total_{l}{D}', :] = audit_df.loc[f'total_{l}{pm}{ep}'] - audit_df.loc['total'] audit_df = audit_df.sort_index() # now need to iterate through each line to compute differences # variables we want to differentiate # note asking for derv of exa_A makes things a lot more complex...see swap function below # may want to default to not including that? columns_of_interest = ['S'] + [f'exa_{line}' for line in self.line_names_ex] if extra_columns: columns_of_interest += extra_columns # these are the columns add_exa expects columns_p_only = ['loss'] + [f'p_{line}' for line in self.line_names_ex] + \ [f'ημ_{line}' for line in self.line_names] # first, need a base and add exag to coi if distortion: _x = self.apply_distortion(distortion, create_augmented=False) base = _x.augmented_df columns_of_interest.extend(['gS'] + [f'exag_{line}' for line in self.line_names_ex]) else: base = self.density_df # and then a holder for the answer answer = pd.DataFrame(index=pd.Index(xs, name='loss'), columns=pd.MultiIndex.from_arrays(((), ()), names=('partial_wrt', 'line'))) answer.columns.name = 'derivatives' # the exact same as add exa; same padding no tilt def ae_ft(x): return ft(x, 1, None) def swap(adjust_line): """ in the not line swap A for Ae E.g. X = A + B + C and adjust_Line = A. Then not A is the same, but for not B and not C you need to swap A with A+epsilon. This function accomplishes the swap. 
:param ημ: :param A: :param Ae: :return: collection of all not lines with adjusted adjust_line adjusted_not_fft[line] is fft of not_line with adjust_line swapped out for line + epsilon """ # look if there are just two lines then this is easy......but want to see if this works too... adjusted_not_fft = {} adjust_line_ft = ae_ft(agg_epsilon_df[f'p_{adjust_line}']) base_line_ft = ae_ft(base[f'p_{adjust_line}']) adj_factor = adjust_line_ft / base_line_ft adj_factor[np.logical_and(base_line_ft == 0, adjust_line_ft == 0)] = 0 n_and = np.sum(np.logical_and(base_line_ft == 0, adjust_line_ft == 0)) n_or = np.sum(np.logical_or(base_line_ft == 0, adjust_line_ft == 0)) # TODO sort this out...often not actually the same... logger.info(f'SAME? And={n_and} Or={n_or}; Zeros in fft(line) and ' 'fft(line + epsilon for {adjust_line}.') for line in self.line_names: if line == adjust_line: # nothing changes, adjust_line not in not adjust_line it doesn't need to change adjusted_not_fft[line] = ae_ft(base[f'ημ_{line}']) else: adjusted_not_fft[line] = ae_ft(base[f'ημ_{line}']) * adj_factor return adjusted_not_fft # finally perform iteration and compute differences for line in self.line_names: gradient_df = base[columns_p_only].copy() gradient_df[f'p_{line}'] = agg_epsilon_df[f'p_{line}'] gradient_df['p_total'] = agg_epsilon_df[f'p_total_{line}'] if do_swap: # we also need to update ημ_lines whenever it includes line (i.e. always # call original add_exa function, operates on gradient_df in-place self.add_exa(gradient_df, ft_nots=swap(line)) else: self.add_exa(gradient_df) if distortion is not None: # apply to line + epsilon gradient_df = self.apply_distortion(distortion, df_in=gradient_df, create_augmented=False).augmented_df # compute differentials and store answer! answer[[(line, i) for i in columns_of_interest]] = (gradient_df[columns_of_interest] - base[columns_of_interest]) / dx return Answer(gradient=answer, audit=audit_df, new_aggs=new_aggs)
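    # Illustrative usage sketch (added for exposition; not part of the library source). Assuming
    # `port` is an already-updated Portfolio with hypothetical units 'A' and 'B', a forward-difference
    # gradient could be pulled as:
    #
    #     ans = port.gradient(epsilon=1 / 128, kind='homog', method='forward')
    #     ans.gradient     # DataFrame, MultiIndex columns (partial_wrt, line), values d(col)/d(volume)
    #     ans.audit        # audit_df comparing base and epsilon-shifted empirical moments
    #     ans.new_aggs     # the rescaled Aggregate objects used for each perturbation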
[docs] def report(self, report_list='quick'): """ :param report_list: :return: """ full_report_list = ['statistics', 'quick', 'audit', 'priority_capital', 'priority_analysis'] if report_list == 'all': report_list = full_report_list for r in full_report_list: if r in report_list: html_title(f'{r} Report for {self.name}', 1) if r == 'priority_capital': if self._priority_capital_df is not None: display(self._priority_capital_df.loc[1e-3:1e-2, :].style) else: html_title(f'Report {r} not generated', 2) elif r == 'quick': if self.audit_df is not None: df = self.audit_df[['Mean', 'EmpMean', 'MeanErr', 'CV', 'EmpCV', 'CVErr', 'P99.0']] display(df.style) else: html_title(f'Report {r} not generated', 2) else: df = getattr(self, r + '_df', None) if df is not None: try: display(df.style) except ValueError: display(df) else: html_title(f'Report {r} not generated', 2)
@property def report_df(self): if self.audit_df is not None: summary_sl = (slice(None), ['mean', 'cv', 'skew']) bit1 = self.statistics_df.loc[summary_sl, :] bit1.index = ['freq_m', 'sev_m', 'agg_m', 'freq_cv', 'sev_cv', 'agg_cv', 'freq_skew', 'sev_skew', 'agg_skew'] bit2 = self.audit_df[['EmpMean', 'EmpCV', 'EmpSkew', 'EmpKurt', 'P99.0', 'P99.6']].T bit2.index = ['agg_emp_m', 'agg_emp_cv', 'agg_emp_skew', 'agg_emp_kurt', 'P99.0_emp', 'P99.6_emp'] df = pd.concat((bit1, bit2), axis=0) df.loc['agg_m_err', :] = df.loc['agg_emp_m'] / df.loc['agg_m'] - 1 df.loc['agg_cv_err', :] = df.loc['agg_emp_cv'] / df.loc['agg_cv'] - 1 df.loc['agg_skew_err', :] = df.loc['agg_emp_skew'] / df.loc['agg_skew'] - 1 df = df.loc[['freq_m', 'freq_cv', 'freq_skew', 'sev_m', 'sev_cv', 'sev_skew', 'agg_m', 'agg_emp_m', 'agg_m_err', 'agg_cv', 'agg_emp_cv', 'agg_cv_err', 'agg_skew', 'agg_emp_skew', 'agg_skew_err', 'agg_emp_kurt', 'P99.0_emp','P99.6_emp']] df.columns.name = 'unit' df.index.name = 'statistic' else: df = None return df @property def pprogram(self): """ pretty print the program to html """ return pprint_ex(self.program, 20) @property def pprogram_html(self): """ pretty print the program to html """ return pprint_ex(self.program, 0, html=True)
[docs]    def limits(self, stat='range', kind='linear', zero_mass='include'):
        """
        Suggest sensible plotting limits for stat=range, density, ... (same as Aggregate).

        Should optionally return a locator for plots?

        Called by plotting routines. Single point of failure!

        Must work without q function when not computed (apply_reins_work for occ reins...uses report_ser instead).

        :param stat: range or density or logy (for log density/survival function...ensure consistency)
        :param kind: linear or log (this is the y-axis, not log of range...that is rarely plotted)
        :param zero_mass: include or exclude, for densities
        :return:
        """

        # fudge l/r factors
        def f(x):
            fl, fr = 0.02, 1.02
            return [-fl * x, fr * x]

        # lower bound for log plots
        eps = 1e-16

        # if not computed have no business asking for limits
        assert self.density_df is not None

        if stat == 'range':
            if kind == 'linear':
                return f(self.q(0.999))
            else:
                # wider range for log density plots
                return f(self.q(1 - 1e-10))
        elif stat == 'density':
            mx = self.density_df.filter(regex='p_[a-zA-Z]').max().max()
            mxx0 = self.density_df.filter(regex='p_[a-zA-Z]').iloc[1:].max().max()
            if kind == 'linear':
                if zero_mass == 'include':
                    return f(mx)
                else:
                    return f(mxx0)
            else:
                return [eps, mx * 1.5]
        elif stat == 'logy':
            mx = min(1, self.density_df.filter(regex='p_[A-Za-z]').max().max())
            return [1e-12, mx * 2]
        else:
            # if you fall through to here, wrong args
            raise ValueError(f'Inadmissible stat/kind passed, expected range/density and log/linear.')
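    # Illustrative usage sketch (added for exposition; not part of the library source). The pair
    # returned by limits feeds straight into matplotlib/pandas plotting, e.g.
    #
    #     xl = port.limits()                                      # x-range for linear plots
    #     yl = port.limits(stat='density', zero_mass='exclude')   # y-range ignoring any mass at zero
    #     port.density_df.filter(regex='p_[a-zA-Z]').plot(xlim=xl, ylim=yl)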
[docs]    def plot(self, axd=None, figsize=(2 * FIG_W, FIG_H)):
        """
        Default plot of unit densities on linear and log scales.

        :param axd: dictionary with plots A and B for density and log density
        :param figsize: arguments passed to make_mosaic_figure if no axd
        :return:
        """
        if axd is None:
            self.figure, axd = make_mosaic_figure('AB', figsize=figsize)

        ax = axd['A']
        xl = self.limits()
        yl = self.limits(stat='density', zero_mass='exclude')
        bit = self.density_df.filter(regex='p_[a-zA-Z]')
        if bit.shape[1] == 3:
            # put total first = Book standard
            bit = bit.iloc[:, [2, 0, 1]]
        bit.plot(ax=ax, xlim=xl, ylim=yl)
        ax.set(xlabel='Loss', ylabel='Density')
        ax.legend()

        ax = axd['B']
        xl = self.limits(kind='log')
        yl = self.limits(stat='logy')
        bit.plot(ax=ax, logy=True, xlim=xl, ylim=yl)
        ax.set(xlabel='Loss', ylabel='Log density')
        ax.legend().set(visible=False)
# ax = axd['C'] # self.density_df.filter(regex='p_[a-zA-Z]')[::-1].cumsum().plot(ax=ax, xlim=xl, logy=True)
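    # Illustrative usage sketch (added for exposition; not part of the library source):
    #
    #     port.plot()                                           # default AB mosaic: density and log density
    #     fig, axd = plt.subplot_mosaic('AB', figsize=(8, 3))   # or supply your own axes dict with keys A and B
    #     port.plot(axd=axd)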
[docs]    def scatter(self, marker='.', s=5, alpha=1, figsize=(10, 10), diagonal='kde', **kwargs):
        """
        Create a scatter plot of marginals against one another, using pandas.plotting scatter_matrix.

        Designed for use with samples. Plots exeqa columns.
        """
        bit = self.density_df.query('p_total > 0').filter(regex='exeqa_[a-zA-Z]')
        # pass through the documented arguments rather than hard-coded defaults
        ax = scatter_matrix(bit, marker=marker, s=s, alpha=alpha, figsize=figsize,
                            diagonal=diagonal, **kwargs)
        return ax
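    # Illustrative usage sketch (added for exposition; not part of the library source); most useful
    # when the Portfolio was built from a sample DataFrame:
    #
    #     axs = port.scatter(diagonal='hist')                   # pairwise plots of the exeqa_* columns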
[docs] def uat_interpolation_functions(self, a0, e0): """ Perform quick audit of interpolation functions :param a0: base assets :param e0: base epd :return: """ # audit interpolation functions temp = pd.DataFrame(columns=['line', 'priority', 'epd', 'a from e', 'assets', 'e from a']) e2a = self.epd_2_assets a2e = self.assets_2_epd for i in range(3): for c in self.line_names + ['total'] + ['not ' + i for i in self.line_names]: # if i == 0 and c == 'total' or c != 'total': if (c, i) in a2e: e = a2e[(c, i)](a0) a = e2a[(c, i)](e0) temp.loc[c + "_" + str(i), :] = (c, i, e, e2a[(c, i)](e), a, a2e[(c, i)](a)) display(temp.style)
[docs] def add_exa(self, df, ft_nots=None): """ Use fft to add exeqa_XXX = E(X_i | X=a) to each dist also add exlea = E(X_i | X <= a) = sum_{x<=a} exa(x)*f(x) where f is for the total ie. self.density_df['exlea_attrit'] = np.cumsum( self.density_df.exa_attrit * self.density_df.p_total) / self.density_df.F and add exgta = E(X_i | X>a) since E(X) = E(X | X<= a)F(a) + E(X | X>a)S(a) we have exgta = (ex - exlea F) / S and add the actual expected losses (not theoretical) the empirical amount: self.density_df['e_attrit'] = np.sum( self.density_df.p_attrit * self.density_df.loss) Mid point adjustment is handled by the example creation routines self.density_df.loss = self.density_df.loss - bs/2 **YOU CANNOT HAVE A LINE with a name starting t!!!** See LCA_Examples for original code Alternative approach to exa: use UC=unconditional versions of exlea and exi_xgta: * exleaUC = np.cumsum(port.density_df['exeqa\_' + col] * port.density_df.p_total) # unconditional * exixgtaUC =np.cumsum( self.density_df.loc[::-1, 'exeqa\_' + col] / self.density_df.loc[::-1, 'loss'] * self.density_df.loc[::-1, 'p_total'] ) * exa = exleaUC + exixgtaUC * self.density_df.loss :param df: data frame to add to. Initially add_exa was only called by update and wrote to self.density_df. But now it is called by gradient too which writes to gradient_df, so we need to pass in this argument :param ft_nots: FFTs of the not lines (computed in gradients) so you don't round trip an FFT; gradients needs to recompute all the not lines each time around and it is stilly to do that twice """ # eps is used NOT to do silly things when x is so small F(x)< eps # below this percentile you do not try to condition on the event! # np.finfo(np.float).eps = 2.2204460492503131e-16 cut_eps = np.finfo(float).eps # get this done # defuzz(self.density_df, cut_eps) # bucket size bs = self.bs # self.density_df['loss'].iloc[1] - self.density_df['loss'].iloc[0] # index has already been reset # sum of p_total is so important...we will rescale it... if not np.all(df.p_total >= 0): # have negative densities...get rid of them first_neg = df.query(f'p_total < {-cut_eps}') logger.log(WL, # f'Portfolio.add_exa | p_total has a negative value starting at {first_neg.head()}; NOT setting to zero...') f'p_total has {len(first_neg)} negative values; NOT setting to zero...') sum_p_total = df.p_total.sum() logger.info(f'{self.name}: sum of p_total is 1 - {1 - sum_p_total:12.8e} NOT rescaling.') # df.p_total /= sum_p_total df['F'] = np.cumsum(df.p_total) # this method is terrible because you lose precision for very small values of S # df['S'] = 1 - df.F # old method # df['S'] = np.hstack((df.p_total.to_numpy()[:0:-1].cumsum()[::-1], # min(df.p_total.iloc[-1], # max(0, 1. - (df.p_total.sum()))))) # which was ugly and not quite right because the it ended ... pn plast vs pn+plast, plast # Dec 2020 # TODO: fix and rationalize with Aggregate.density_df; this can't be right, can it? # Fill value is just S(last point) #df['S'] = \ # df.p_total.shift(-1, fill_value=min(df.p_total.iloc[-1], max(0, 1. - (df.p_total.sum()))))[::-1].cumsum()[::-1] # Jan 2023 update: use the same method as Aggregate.density_df # And the end of all our exploring # Will be to arrive where we started # And know the place for the first time. # T.S. 
Eliot # The loss of accuracy is not germaine in portfolio where you have gone through # FFTs df['S'] = 1 - df.F # get rounding errors, S may not go below zero logger.info( f'Portfolio.add_exa | {self.name}: S <= 0 values has length {len(np.argwhere((df.S <= 0).to_numpy()))}') # E(min(X, a)) # needs to be shifted down by one for the partial integrals.... # temp = np.hstack((0, np.array(df.iloc[:-1, :].loc[:, 'S'].cumsum()))) # df['exa_total'] = temp * bs # df['exa_total'] = self.cumintegral(df['S']) df['exa_total'] = df.S.shift(1, fill_value=0).cumsum() * self.bs df['lev_total'] = df['exa_total'] # $E(X\wedge a)=\int_0^a tf(t)dt + aS(a)$ therefore exlea # (EXpected $X$ given Less than or Equal to **a**) # $$=E(X \mid X\le a)=\frac{E(X\wedge a)-aS(a)}{F(a)}$$ df['exlea_total'] = \ (df.exa_total - df.loss * df.S) / df.F # fix very small values # don't pretend you know values! # find the largest value where exlea_total > loss, which has to be an error # 100 bs is a hack, move a little beyond last problem observation # from observation looks about right with 1<<16 buckets # TODO What is this crap? n_ = df.shape[0] if n_ < 1100: mult = 1 elif n_ < 15000: mult = 10 else: mult = 100 loss_max = df[['loss', 'exlea_total']].query(' exlea_total>loss ').loss.max() if np.isnan(loss_max): loss_max = 0 else: loss_max += mult * bs # try nan in place of 0 V df.loc[0:loss_max, 'exlea_total'] = np.nan # df.loc[df.F < 2 * cut_eps, 'exlea_total'] = df.loc[ # df.F < 2*cut_eps, 'loss'] # if F(x)<very small then E(X | X<x) = x, you are certain to be above the threshold # this is more stable than dividing by the very small F(x) df['e_total'] = np.sum(df.p_total * df.loss) df['exgta_total'] = df.loss + (df.e_total - df.exa_total) / df.S df['exeqa_total'] = df.loss # E(X | X=a) = a(!) included for symmetry was exa # where is S=0 Seq0 = (df.S == 0) for col in self.line_names: # ### Additional Variables # # * exeqa_line = $E(X_i \mid X=a)$ # * exlea_line = $E(X_i \mid X\le a)$ # * e_line = $E(X_i)$ # * exgta_line = $E(X_i \mid X \ge a)$ # * exi_x_line = $E(X_i / X \mid X = a)$ # * and similar for le and gt a # * exa_line = $E(X_i(a))$ # * Price based on same constant ROE formula (later we will do $g$s) # # # THE MONEY CALCULATION # EX_i | X=a, E(xi eq a) # # # # if ft_nots is None: df['exeqa_' + col] = \ np.real(self.ift(self.ft(df.loss * df['p_' + col]) * self.ft(df['ημ_' + col]))) / df.p_total else: df['exeqa_' + col] = \ np.real(self.ift(self.ft(df.loss * df['p_' + col]) * ft_nots[col])) / df.p_total # these are unreliable estimates because p_total=0 # JUNE 25: this makes a difference! df.loc[df.p_total < cut_eps, 'exeqa_' + col] = 0 # E(X_{i, 2nd priority}(a)) # need the stand alone LEV calc # E(min(Xi, a) # needs to be shifted down by one for the partial integrals.... stemp = 1 - df['p_' + col].cumsum() # temp = np.hstack((0, stemp.iloc[:-1].cumsum())) # df['lev_' + col] = temp * bs df['lev_' + col] = stemp.shift(1, fill_value=0).cumsum() * self.bs # EX_i | X<= a; temp is used in le and gt calcs temp = np.cumsum(df['exeqa_' + col] * df.p_total) df['exlea_' + col] = temp / df.F # revised version for small losses: do not know this value df.loc[0:loss_max, 'exlea_' + col] = 0 # df.loc[0:loss_max, 'loss'] # constant value, helpful in calculations df['e_' + col] = np.sum(df['p_' + col] * df.loss) # EX_i | X>a df['exgta_' + col] = (df['e_' + col] - temp) / df.S # E{X_i / X | X > a} (note=a is trivial!) 
temp = df.loss.iloc[0] # loss df.loss.iloc[0] = 1 # avoid divide by zero # unconditional E(X_i/X) df['exi_x_' + col] = np.sum( df['exeqa_' + col] * df.p_total / df.loss) temp_xi_x = np.cumsum(df['exeqa_' + col] * df.p_total / df.loss) df['exi_xlea_' + col] = temp_xi_x / df.F df.loc[0, 'exi_xlea_' + col] = 0 # df.F=0 at zero # more generally F=0 error: V df.loc[df.exlea_total == 0, 'exi_xlea_' + col] = 0 # put value back df.loss.iloc[0] = temp # this is so important we will calculate it directly rather than the old: # df['exi_xgta_' + col] = (df['exi_x_' + col] - temp_xi_x) / df.S # the last value is undefined because we know nothing about what happens beyond our array # above that we need a shift: > second to last value will only involve the last row (the John Major problem) # hence # Nov 2020, consider added fill values using same approach as exi_xgtag_[CHANGE NOT MADE, investigate] # fill_value = df[f'exeqa_{col}'].iloc[-1] / df.loss.iloc[-1] * df.S.iloc[-1] fill_value = np.nan # logger.info(f'exi_xgta_ fill_value = {fill_value}') df['exi_xgta_' + col] = ((df[f'exeqa_{col}'] / df.loss * df.p_total).shift(-1, fill_value=fill_value)[ ::-1].cumsum()) / df.S # print(df['exi_xgta_' + col].tail()) # need this NOT to be nan otherwise exa won't come out correctly df.loc[Seq0, 'exi_xgta_' + col] = 0. # df['exi_xgta_ημ_' + col] = \ # (df['exi_x_ημ_' + col] - temp_xi_x_not) / df.S # as for line # fill_value = df[f'exeqa_ημ_{col}'].iloc[-1] / df.loss.iloc[-1] * df.S.iloc[-1] df['exi_xeqa_' + col] = df['exeqa_' + col] / df['loss'] df.loc[0, 'exi_xeqa_' + col] = 0 # need the loss cost with equal priority rule # exa_ = E(X_i(a)) = E(X_i | X<= a)F(a) + E(X_i / X| X>a) a S(a) # = exlea F(a) + exixgta * a * S(a) # and hence get loss cost for line i # df['exa_' + col] = \ # df['exlea_' + col] * df.F + df.loss * \ # df.S * df['exi_xgta_' + col] # df['exa_ημ_' + col] = \ # df['exlea_ημ_' + col] * df.F + df.loss * \ # df.S * df['exi_xgta_ημ_' + col] # alt calc using S: validated the same, going with this as a more direct calc df[f'exa_{col}'] = (df.S * df['exi_xgta_' + col]).shift(1, fill_value=0).cumsum() * self.bs # put in totals for the ratios... this is very handy in later use for metric in ['exi_xlea_', 'exi_xgta_', 'exi_xeqa_']: df[metric + 'sum'] = df.filter(regex=metric + '[^η]').sum(axis=1)
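    # Worked toy check of the identities used above (added for exposition; not part of the library
    # source). On a small discrete distribution, exlea and exgta recover E[X] via
    # E[X] = E[X | X <= a] F(a) + E[X | X > a] S(a):
    #
    #     import numpy as np
    #     xs = np.array([0., 1., 2., 3.])
    #     p = np.array([.4, .3, .2, .1])
    #     F = p.cumsum(); S = 1 - F
    #     ex = (xs * p).sum()                                   # E[X] = 1.0
    #     a = 2
    #     exlea = (xs[xs <= a] * p[xs <= a]).sum() / F[2]       # E[X | X <= a]
    #     exgta = (xs[xs > a] * p[xs > a]).sum() / S[2]         # E[X | X > a]
    #     assert np.isclose(ex, exlea * F[2] + exgta * S[2])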
[docs] def ft(self, x, tilt=None): """ FT of x with padding and tilt applied """ return ft(x, self.padding, tilt)
[docs] def ift(self, x, tilt=None): """ IFT of x with padding and tilt applied """ return ift(x, self.padding, tilt)
[docs] def add_exa_details(self, df, eta_mu=False): """ From add_exa, details for epd functions and eta_mu flavors. Note ``eta_mu=True`` is required for ``epd_2`` functions. """ cut_eps = np.finfo(float).eps bs = self.bs # epds for total on a standalone basis (all that makes sense) df['epd_0_total'] = \ np.maximum(0, (df['e_total'] - df['lev_total'])) / \ df['e_total'] # E[1/X 1_{X>a}] used for reimbursement effectiveness graph # Nov 2020 tweaks index_inv = 1.0 / df.loss df['e1xi_1gta_total'] = (df['p_total'] * index_inv).shift(-1)[::-1].cumsum() # TODO What is this crap? n_ = df.shape[0] if n_ < 1100: mult = 1 elif n_ < 15000: mult = 10 else: mult = 100 loss_max = df[['loss', 'exlea_total']].query(' exlea_total>loss ').loss.max() if np.isnan(loss_max): loss_max = 0 else: loss_max += mult * bs # where is S=0 Seq0 = (df.S == 0) # will need two decorators for epd functions: these handle swapping the arguments and # protecting against value errors def minus_arg_wrapper(a_func): def new_fun(x): try: x = a_func(-x) except ValueError: x = 999 return x return new_fun def minus_ans_wrapper(a_func): def new_fun(x): try: x = -a_func(x) except ValueError: x = 999 return x return new_fun if eta_mu: # recrecate the ημ columns (NO padding!) ft_line_density = {} ft_all = ft(self.density_df.p_total, self.padding, None) for line in self.line_names: # create all for inner loop below ft_line_density[line] = ft(self.density_df[f'p_{line}'], self.padding, None) for line in self.line_names: ft_not = np.ones_like(ft_all) if np.any(ft_line_density[line] == 0): # have to build up for not_line in self.line_names: if not_line != line: ft_not *= ft_line_density[not_line] else: if len(self.line_names) > 1: ft_not = ft_all / ft_line_density[line] self.density_df[f'ημ_{line}'] = np.real(ift(ft_not, self.padding, None)) for col in self.line_names: # fill in ημ if eta_mu: # rarely if ever ussed; not greatly tested df['exeqa_ημ_' + col] = \ np.real(self.ift(self.ft(df.loss * df['ημ_' + col]) * self.ft(df['p_' + col]))) / df.p_total # these are unreliable estimates because p_total=0 JUNE 25: this makes a difference! df.loc[df.p_total < cut_eps, 'exeqa_ημ_' + col] = 0 df['e2pri_' + col] = \ np.real(self.ift(self.ft(df['lev_' + col]) * self.ft(df['ημ_' + col]))) stemp = 1 - df['ημ_' + col].cumsum() # temp = np.hstack((0, stemp.iloc[:-1].cumsum())) # df['lev_ημ_' + col] = temp * bs df['lev_ημ_' + col] = stemp.shift(1, fill_value=0).cumsum() * self.bs temp_not = np.cumsum(df['exeqa_ημ_' + col] * df.p_total) df['exlea_ημ_' + col] = temp_not / df.F # revised version for small losses: do not know this value df.loc[0:loss_max, 'exlea_ημ_' + col] = 0 # df.loc[0:loss_max, 'loss'] df['e_ημ_' + col] = np.sum(df['ημ_' + col] * df.loss) # not version df['exi_x_ημ_' + col] = np.sum( df['exeqa_ημ_' + col] * df.p_total / df.loss) # as above temp_xi_x_not = np.cumsum( df['exeqa_ημ_' + col] * df.p_total / df.loss) df['exi_xlea_ημ_' + col] = temp_xi_x_not / df.F df.loc[0, 'exi_xlea_ημ_' + col] = 0 # df.F=0 at zero # more generally F=0 error: df.loc[df.exlea_total == 0, 'exi_xlea_ημ_' + col] = 0 # per about line 2052 above fill_value = np.nan df['exi_xgta_ημ_' + col] = ((df[f'exeqa_ημ_{col}'] / df.loss * df.p_total).shift(-1, fill_value=fill_value)[ ::-1].cumsum()) / df.S df.loc[Seq0, 'exi_xgta_ημ_' + col] = 0. 
df['exi_xeqa_ημ_' + col] = df['exeqa_ημ_' + col] / df['loss'] df.loc[0, 'exi_xeqa_ημ_' + col] = 0 df['exa_ημ_' + col] = (df.S * df['exi_xgta_ημ_' + col]).shift(1, fill_value=0).cumsum() * self.bs # E[1/X 1_{X>a}] used for reimbursement effectiveness graph # Nov 2020 # df['e1xi_1gta_total'] = (df['p_total'] * index_inv).shift(-1)[::-1].cumsum() df[f'e1xi_1gta_{col}'] = (df[f'p_{col}'] * index_inv).shift(-1)[::-1].cumsum() # epds df['epd_0_' + col] = \ np.maximum(0, (df['e_' + col] - df['lev_' + col])) / \ df['e_' + col] df['epd_1_' + col] = \ np.maximum(0, (df['e_' + col] - df['exa_' + col])) / \ df['e_' + col] if eta_mu: df['epd_2_' + col] = \ np.maximum(0, (df['e_' + col] - df['e2pri_' + col])) / \ df['e_' + col] df['epd_0_ημ_' + col] = \ np.maximum(0, (df['e_ημ_' + col] - df['lev_ημ_' + col])) / \ df['e_ημ_' + col] df['epd_1_ημ_' + col] = \ np.maximum(0, (df['e_ημ_' + col] - df['exa_ημ_' + col])) / \ df['e_ημ_' + col] # epd interpolation functions # capital and epd functions: for i = 0 and 1 we want line and not line loss_values = df.loss.values # only have type 2 when eta_mu is True options = [0, 1, 2] if eta_mu else [0, 1] for i in options: epd_values = -df['epd_{:}_{:}'.format(i, col)].values # if np.any(epd_values[1:] <= epd_values[:-1]): # print(i, col) # print( 1e12*(epd_values[1:][epd_values[1:] <= epd_values[:-1]] - # epd_values[:-1][epd_values[1:] <= epd_values[:-1]])) # raise ValueError('Need to be sorted ascending') self._epd_2_assets[(col, i)] = minus_arg_wrapper( interpolate.interp1d(epd_values, loss_values, kind='linear', assume_sorted=True, fill_value='extrapolate')) self._assets_2_epd[(col, i)] = minus_ans_wrapper( interpolate.interp1d(loss_values, epd_values, kind='linear', assume_sorted=True, fill_value='extrapolate')) if eta_mu: for i in [0, 1]: epd_values = -df['epd_{:}_ημ_{:}'.format(i, col)].values self._epd_2_assets[('not ' + col, i)] = minus_arg_wrapper( interpolate.interp1d(epd_values, loss_values, kind='linear', assume_sorted=True, fill_value='extrapolate')) self._assets_2_epd[('not ' + col, i)] = minus_ans_wrapper( interpolate.interp1d(loss_values, epd_values, kind='linear', assume_sorted=True, fill_value='extrapolate')) epd_values = -df['epd_0_total'].values # if np.any(epd_values[1:] <= epd_values[:-1]): # print('total') # print(epd_values[1:][epd_values[1:] <= epd_values[:-1]]) # raise ValueError('Need to be sorted ascending') loss_values = df.loss.values self._epd_2_assets[('total', 0)] = minus_arg_wrapper( interpolate.interp1d(epd_values, loss_values, kind='linear', assume_sorted=True, fill_value='extrapolate')) self._assets_2_epd[('total', 0)] = minus_ans_wrapper( interpolate.interp1d(loss_values, epd_values, kind='linear', assume_sorted=True, fill_value='extrapolate'))
    @property
    def epd_2_assets(self):
        """
        Dictionary of expected policyholder deficit (EPD) to asset interpolation functions, keyed
        (line, priority); the inverse mapping is ``assets_2_epd``.
        Note that the Merton-Perold method requires the eta_mu fields, hence ``add_exa_details``
        is called with ``eta_mu=True``.
        """
        if self._epd_2_assets is None:
            self._epd_2_assets = {}
            self._assets_2_epd = {}
            self.add_exa_details(self.density_df, eta_mu=True)
        return self._epd_2_assets

    @property
    def assets_2_epd(self):
        if self._assets_2_epd is None:
            self._epd_2_assets = {}
            self._assets_2_epd = {}
            self.add_exa_details(self.density_df, eta_mu=True)
        return self._assets_2_epd
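    # Illustrative usage sketch (added for exposition; not part of the library source). Keys are
    # (line, priority); from the construction above, priority 0 is stand-alone, 1 is equal priority
    # and 2 is second priority (eta_mu only):
    #
    #     a = float(port.epd_2_assets[('total', 0)](0.01))      # assets giving a 1% EPD
    #     e = float(port.assets_2_epd[('total', 0)](a))         # round-trip back to (approximately) 0.01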
[docs] def calibrate_distortion(self, name, r0=0.0, df=[0.0, .9], premium_target=0.0, roe=0.0, assets=0.0, p=0.0, kind='lower', S_column='S', S_calc='cumsum'): """ Find transform to hit a premium target given assets of ``assets``. Fills in the values in ``g_spec`` and returns params and diagnostics...so you can use it either way...more convenient :param name: name of distortion :param r0: fixed parameter if applicable :param df: t-distribution degrees of freedom :param premium_target: target premium :param roe: or ROE :param assets: asset level :param p: :param kind: :param S_column: column of density_df to use for calibration (allows routine to be used in other contexts; if so used must input a premium_target directly. If assets they are used; else max assets used :return: """ # figure assets assert S_calc in ('S', 'cumsum') if S_column == 'S': if assets == 0: assert (p > 0) assets = self.q(p, kind) # figure premium target el = self.density_df.loc[assets, 'exa_total'] if premium_target == 0: assert (roe > 0) # expected losses with assets premium_target = (el + roe * assets) / (1 + roe) else: # if assets not entered, calibrating to unlimited premium; set assets = max loss and let code trim it if assets == 0: assets = self.density_df.loss.iloc[-1] el = self.density_df.loc[assets, 'exa_total'] # extract S and trim it: we are doing int from zero to assets # integration including ENDpoint is if S_calc == 'S': Splus = self.density_df.loc[0:assets, S_column].values else: Splus = (1 - self.density_df.loc[0:assets, 'p_total'].cumsum()).values last_non_zero = np.argwhere(Splus) ess_sup = 0 if len(last_non_zero) == 0: # no nonzero element last_non_zero = len(Splus) + 1 else: last_non_zero = last_non_zero.max() # remember length = max index + 1 because zero based if last_non_zero + 1 < len(Splus): # now you have issues... # truncate at first zero; numpy indexing because values S = Splus[:last_non_zero + 1] ess_sup = self.density_df.index[last_non_zero + 1] logger.info( 'Portfolio.calibrate_distortion | Mass issues in calibrate_distortion...' f'{name} at {last_non_zero}, loss = {ess_sup}') else: # this calc sidesteps the Splus created above.... 
if S_calc == 'original': S = self.density_df.loc[0:assets - self.bs, S_column].values else: S = (1 - self.density_df.loc[0:assets - self.bs, 'p_total'].cumsum()).values # now all S values should be greater than zero and it is decreasing assert np.all(S > 0) and np.all(S[:-1] >= S[1:]) if name == 'ph': lS = np.log(S) shape = 0.95 # starting param def f(rho): trho = S ** rho ex = np.sum(trho) * self.bs ex_prime = np.sum(trho * lS) * self.bs return ex - premium_target, ex_prime elif name == 'wang': n = ss.norm() shape = 0.95 # starting param def f(lam): temp = n.ppf(S) + lam tlam = n.cdf(temp) ex = np.sum(tlam) * self.bs ex_prime = np.sum(n.pdf(temp)) * self.bs return ex - premium_target, ex_prime elif name == 'ly': # linear yield model; min rol is ro/(1+ro) shape = 1.25 # starting param mass = ess_sup * r0 / (1 + r0) def f(rk): num = r0 + S * (1 + rk) den = 1 + r0 + rk * S tlam = num / den ex = np.sum(tlam) * self.bs + mass ex_prime = np.sum(S * (den ** -1 - num / (den ** 2))) * self.bs return ex - premium_target, ex_prime elif name == 'clin': # capped linear, input rf as min rol shape = 1 mass = ess_sup * r0 def f(r): r0_rS = r0 + r * S ex = np.sum(np.minimum(1, r0_rS)) * self.bs + mass ex_prime = np.sum(np.where(r0_rS < 1, S, 0)) * self.bs return ex - premium_target, ex_prime elif name in ['roe', 'ccoc']: # constant roe # TODO Note if you input the roe this is easy! shape = 0.25 def f(r): v = 1 / (1 + r) d = 1 - v mass = ess_sup * d r0_rS = d + v * S ex = np.sum(np.minimum(1, r0_rS)) * self.bs + mass ex_prime = np.sum(np.where(r0_rS < 1, S, 0)) * self.bs return ex - premium_target, ex_prime elif name == 'lep': # layer equivalent pricing # params are d=r0/(1+r0) and delta* = r/(1+r) d = r0 / (1 + r0) shape = 0.25 # starting param # these hard to compute variables do not change with the parameters rSF = np.sqrt(S * (1 - S)) mass = ess_sup * d def f(r): spread = r / (1 + r) - d temp = d + (1 - d) * S + spread * rSF ex = np.sum(np.minimum(1, temp)) * self.bs + mass ex_prime = (1 + r) ** -2 * np.sum(np.where(temp < 1, rSF, 0)) * self.bs return ex - premium_target, ex_prime elif name == 'tt': # wang-t-t ... issue with df, will set equal to 5.5 per Shaun's paper # finding that is a reasonable level; user can input alternative # TODO bivariate solver for t degrees of freedom? # param is shape like normal t = ss.t(df) shape = 0.95 # starting param def f(lam): temp = t.ppf(S) + lam tlam = t.cdf(temp) ex = np.sum(tlam) * self.bs ex_prime = np.sum(t.pdf(temp)) * self.bs return ex - premium_target, ex_prime elif name == 'cll': # capped loglinear shape = 0.95 # starting parameter lS = np.log(S) lS[0] = 0 ea = np.exp(r0) def f(b): uncapped = ea * S ** b ex = np.sum(np.minimum(1, uncapped)) * self.bs ex_prime = np.sum(np.where(uncapped < 1, uncapped * lS, 0)) * self.bs return ex - premium_target, ex_prime elif name == 'dual': # dual moment # be careful about partial at s=1 shape = 2.0 # starting parameter lS = -np.log(1 - S) # prob a bunch of zeros... 
lS[S == 1] = 0 def f(rho): temp = (1 - S) ** rho trho = 1 - temp ex = np.sum(trho) * self.bs ex_prime = np.sum(temp * lS) * self.bs return ex - premium_target, ex_prime elif name == 'tvar': # tvar shape = 0.9 # starting parameter def f(rho): temp = np.where(S <= 1-rho, S / (1 - rho), 1) temp2 = np.where(S <= 1-rho, S / (1 - rho)**2, 1) ex = np.sum(temp) * self.bs ex_prime = np.sum(temp2) * self.bs return ex - premium_target, ex_prime elif name == 'wtdtvar': # weighted tvar with fixed p parameters shape = 0.5 # starting parameter p0, p1 = df s = np.array([0., 1-p1, 1-p0, 1.]) def f(w): pt = (1 - p1) / (1 - p0) * (1 - w) + w gs = np.array([0., pt, 1., 1.]) g = interp1d(s, gs, kind='linear') trho = g(S) ex = np.sum(trho) * self.bs ex_prime = (np.sum(np.minimum(S / (1 - p1), 1) - np.minimum(S / (1 - p0), 1))) * self.bs return ex - premium_target, ex_prime else: raise ValueError(f'calibrate_distortion not implemented for {name}') # numerical solve except for tvar, and roe when premium is known if name in ('roe', 'ccoc'): assert el and premium_target r = (premium_target - el) / (assets - premium_target) shape = r r0 = 0 fx = 0 else: i = 0 fx, fxp = f(shape) # print(premium_target) # print('dist iter error shape deriv') max_iter = 200 if name == 'tvar' else 50 while abs(fx) > 1e-5 and i < max_iter: # print(f'{name}\t{i: 3d}\t{fx: 8.3f}\t{shape:8.3f}\t{fxp:8.3f}') shape = shape - fx / fxp fx, fxp = f(shape) i += 1 # print(f'{name}\t{i: 3d}\t{fx+premium_target: 8.3f}\t{shape:8.3f}\t{fxp:8.3f}\n\n') if abs(fx) > 1e-5: logger.warning( f'Portfolio.calibrate_distortion | Questionable convergenge! {name}, target ' f'{premium_target} error {fx}, {i} iterations') # build answer dist = Distortion(name=name, shape=shape, r0=r0, df=df) dist.error = fx dist.assets = assets dist.premium_target = premium_target return dist
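    # Illustrative usage sketch (added for exposition; not part of the library source). Calibrate a
    # single proportional hazard distortion to a 10% cost of capital at the 99.6% VaR asset level:
    #
    #     dist = port.calibrate_distortion(name='ph', roe=0.10, p=0.996)
    #     dist.shape, dist.premium_target, dist.error           # fitted parameter and solver diagnostics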
[docs] def calibrate_distortions(self, LRs=None, COCs=None, ROEs=None, As=None, Ps=None, kind='lower', r0=0.03, df=5.5, strict='ordered', S_calc='cumsum'): """ Calibrate assets a to loss ratios LRs and asset levels As (iterables) ro for LY, it :math:`ro/(1+ro)` corresponds to a minimum rate online :param LRs: LR or ROEs given :param ROEs: ROEs override LRs :param COCs: CoCs override LRs, preferred terms to ROE; ROE maintained for backwards compatibility. :param As: Assets or probs given :param Ps: probability levels for quantiles :param kind: :param r0: for distortions that have a min ROL :param df: for tt :param strict: if=='ordered' then use the book nice ordering else if True only use distortions with no mass at zero, otherwise use anything reasonable for pricing :param S_calc: :return: """ if COCs is not None: ROEs = COCs ans = pd.DataFrame( columns=['$a$', 'LR', '$S$', '$\\iota$', '$\\delta$', '$\\nu$', '$EL$', '$P$', 'Levg', '$K$', 'ROE', 'param', 'error', 'method'], dtype=float) ans = ans.set_index(['$a$', 'LR', 'method'], drop=True) dists = {} if As is None: if Ps is None: raise ValueError('Must specify assets or quantile probabilities') else: As = [self.q(p, kind) for p in Ps] for a in As: exa, S = self.density_df.loc[a, ['exa_total', 'S']] if ROEs is not None: # figure loss ratios LRs = [] for r in ROEs: delta = r / (1 + r) nu = 1 - delta prem = nu * exa + delta * a LRs.append(exa / prem) for lr in LRs: P = exa / lr profit = P - exa K = a - P iota = profit / K delta = iota / (1 + iota) nu = 1 - delta if strict == 'ordered': # d_list = ['roe', 'ph', 'wang', 'dual', 'tvar'] d_list = ['ccoc', 'ph', 'wang', 'dual', 'tvar'] else: d_list = Distortion.available_distortions(pricing=True, strict=strict) for dname in d_list: dist = self.calibrate_distortion(name=dname, r0=r0, df=df, premium_target=P, assets=a, S_calc=S_calc) dists[dname] = dist ans.loc[(a, lr, dname), :] = [S, iota, delta, nu, exa, P, P / K, K, profit / K, dist.shape, dist.error] # very helpful to keep these... self.dist_ans = ans self.dists = dists return ans
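    # Illustrative usage sketch (added for exposition; not part of the library source). Calibrate the
    # standard set of distortions at the 99.6% quantile for two target loss ratios; results are stored
    # in port.dist_ans and port.dists for later use by apply_distortion and price:
    #
    #     cd = port.calibrate_distortions(Ps=[0.996], LRs=[0.80, 0.90], r0=0.03)
    #     port.distortion_df                                    # tidied version of the calibration table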
@property def distortion_df(self): """ Nicely formatted version of self.dist_ans (that exhibited several bad choices!). ROE returned as COC in modern parlance. """ if self.dist_ans is None: return None df = self.dist_ans.iloc[:, [0,4,5,6,7,8,9,10]].copy() df.index.names = ['a', 'LR', 'method'] df.columns = ['S', 'L', 'P', 'PQ', 'Q', 'COC', 'param', 'error'] return df
[docs] def apply_distortions(self, dist_dict, As=None, Ps=None, kind='lower', efficient=True): """ Apply a list of distortions, summarize pricing and produce graphical output show loss values where :math:`s_ub > S(loss) > s_lb` by jump :param kind: :param dist_dict: dictionary of Distortion objects :param As: input asset levels to consider OR :param Ps: input probs (near 1) converted to assets using ``self.q()`` :return: """ ans = [] if As is None: As = np.array([float(self.q(p, kind)) for p in Ps]) for g in dist_dict.values(): _x = self.apply_distortion(g, efficient=efficient) df = _x.augmented_df # extract range of S values if As[0] in df.index: temp = df.loc[As, :].filter(regex=r'^loss|^S|exa[g]?_[^η][\.:~a-zA-Z0-9]*$|exag_sumparts|lr_').copy() else: logger.warning(f'{As} not in index...max value is {max(df.index)}...selecing that!') temp = df.iloc[[-1], :].filter(regex=r'^loss|^S|exa[g]?_[^η][\.:~a-zA-Z0-9]*$|exag_sumparts|lr_').copy() # jump = sensible_jump(len(temp), num_assets) # temp = temp.loc[::jump, :].copy() temp['method'] = g.name ans.append(temp) ans_table = pd.concat(ans) ans_table['return'] = np.round(1 / ans_table.S, 0) df2 = ans_table.copy() df2 = df2.set_index(['loss', 'method', 'return', 'S']) df2.columns = df2.columns.str.split('_', expand=True) ans_stacked = pd.DataFrame(df2.stack().stack()).reset_index() ans_stacked.columns = ['assets', 'method', 'return', 'S', 'line', 'stat', 'value'] # figure reasonable max and mins for LR plots mn = ans_table.filter(regex='^lr').min().min() mn1 = mn mx = ans_table.filter(regex='^lr').max().max() mn = np.round(mn * 20, 0) / 20 mx = np.round(mx * 20, 0) / 20 if mx >= 0.9: mx = 1 if mn <= 0.2: mn = 0 if mn1 < mn: mn -= 0.1 return ans_table, ans_stacked
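    # Illustrative usage sketch (added for exposition; not part of the library source). Summarize
    # pricing across all previously calibrated distortions at a common asset level:
    #
    #     table, stacked = port.apply_distortions(port.dists, Ps=[0.996])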
[docs] def apply_distortion(self, dist, view='ask', plots=None, df_in=None, create_augmented=True, S_calculation='forwards', efficient=True): """ Apply the distortion, make a copy of density_df and append various columns to create augmented_df. augmented_df depends on the distortion but only includes variables that work for all asset levels, e.g. 1. marginal loss, lr, roe etc. 2. bottom up totals Top down total depend on where the "top" is and do not work in general. They are handled in analyze_distortions where you explicitly provide a top. Does not touch density_df: that is independent of distortions Optionally produce graphics of results controlled by plots a list containing none or more of: 1. basic: exag_sumparts, exag_total df.exa_total 2. extended: the full original set Per 0.11.0: no mass at 0 allowed. If you want to use a distortion with mass at 0 you must use a close approximation. :type create_augmented: object :param dist: agg.Distortion :param view: bid or ask price :param plots: iterable of plot types :param df_in: when called from gradient you want to pass in gradient_df and use that; otherwise use self.density_df :param create_augmented: store the output in self.augmented_df :param S_calculation: if forwards, recompute S summing p_total forwards...this gets the tail right; the old method was backwards, which does not change S :param efficient: just compute the bare minimum (T. series, not M. series) and return :return: density_df with extra columns appended """ if not efficient: # make sure eta-mu columns have been computed _ = self.assets_2_epd # initially work will "full precision" if df_in is None: df = self.density_df.copy() else: df = df_in # PREVIOUSLY: did not make this adjustment because loss of resolution on small S values # however, it doesn't work well for very thick tailed distributions, hence intro of S_calculation # July 2020 (during COVID-Trump madness) try this instead if S_calculation == 'forwards': logger.debug('Using updated S_forwards calculation in apply_distortion! ') df['S'] = 1 - df.p_total.cumsum() if type(dist) == str: # try looking it up in calibrated distortions dist = self.dists[dist] # make g and ginv and other interpolation functions if view == 'bid': g = dist.g_dual g_prime = lambda x: dist.g_prime(1 - x) elif view == 'ask': g = dist.g g_prime = dist.g_prime else: raise ValueError(f'kind must be bid or ask, not {view}') # maybe important that p_total sums to 1 # this appeared not to make a difference, and introduces an undesirable difference from # the original density_df # df.loc[df.p_total < 0, :] = 0 # df['p_total'] = df['p_total'] / df['p_total'].sum() # df['F'] = df.p_total.cumsum() # df['S'] = 1 - df.F # floating point issues: THIS HAPPENS, so needs to be corrected... 
cut_eps = np.finfo(float).eps if len(df.loc[df.S < 0, 'S'] > 0): logger.log(WL, f"{len(df.loc[df.S < -cut_eps, 'S'] > 0)} negative S < -eps values being set to zero...") # logger.log(WL, f"{len(df.loc[df.S < 0, 'S'] > 0)} negative S values being set to zero...") df.loc[df.S < 0, 'S'] = 0 # add the exag and distorted probs df['gS'] = g(df.S) df['gF'] = 1 - df.gS # updated for ability to prepend 0 in newer numpy # df['gp_total'] = np.diff(np.hstack((0, df.gF))) # also checked these two method give the same result: # bit2['gp1'] = -np.diff(bit2.gS, prepend=1) # bit2['gp2'] = np.diff(1 - bit2.gS, prepend=0) # validated using grt.test_df(): this is correct # t = grt.test_df(20,2) # t.A = t.A / t.A.sum() # # t['B'] = t.A.shift(-1, fill_value=0)[::-1].cumsum() # t['C'] = -np.diff(t.B, prepend=1) # t df['gp_total'] = -np.diff(df.gS, prepend=1) # weirdness occurs here were 0 appears as -0: get rid of that df.loc[df.gp_total==0, 'gp_total'] = 0.0 # figure out where to truncate df (which turns into augmented_df) lnp = '|'.join(self.line_names) # in discrete distributions there are "gaps" of impossible values; do not want to worry about these idx_pne0 = df.query(' p_total > 0 ').index exeqa_err = np.abs( (df.loc[idx_pne0].filter(regex=f'exeqa_({lnp})').sum(axis=1) - df.loc[idx_pne0].loss) / df.loc[idx_pne0].loss) exeqa_err.iloc[0] = 0 # print(exeqa_err) # +1 because when you iloc you lose the last element (two code lines down) idx = int(exeqa_err[exeqa_err < 1e-4].index[-1] / self.bs + 1) # idx = np.argmax(exeqa_err > 1e-4) logger.debug(f'index of max reliable value = {idx}') if idx: # if exeqa_err > 1e-4 is empty, np.argmax returns zero...do not want to truncate at zero in that case df = df.iloc[:idx] # where gS==0, which should be empty set gSeq0 = (df.gS == 0) logger.debug(f'S==0 values: {df.gS.loc[gSeq0]}') if idx: logger.debug(f'Truncating augmented_df at idx={idx}, loss={idx*self.bs}, len(S==0) = {np.sum(gSeq0)} elements') # print(f'Truncating augmented_df at idx={idx}, loss={idx*self.bs}\nS==0 on len(S==0) = {np.sum(gSeq0)} elements') else: logger.debug(f'augmented_df not truncated (no exeqa error), len(S==0) = {np.sum(gSeq0)} elements') # print(f'augmented_df not truncated (no exeqa error\nS==0 on len(S==0) = {np.sum(gSeq0)} elements') # S better be decreasing if not np.alltrue(df.S.iloc[1:] <= df.S.iloc[:-1].values): logger.error('S = density_df.S is not non-increasing...carrying on but you should investigate...') for line in self.line_names: # avoid double count: going up sum needs to be stepped one back, hence use cumintegral is perfect # for <=a cumintegral, for > a reverse and use cumsum (no step back) # UC = unconditional # old # # exleaUC = self.cumintegral(self.density_df[f'exeqa_{line}'] * df.gp_total, 1) # # correct that increasing integrals need the offset # exleaUC1 = np.cumsum(self.density_df[f'exeqa_{line}'] * df.gp_total) # # old # exixgtaUC = np.cumsum( # self.density_df.loc[::-1, f'exeqa_{line}'] / self.density_df.loc[::-1, 'loss'] * # df.loc[::-1, 'gp_total']) # # or shift...NO should be cumsum for gt # exixgtaUC1 = self.cumintegral( # self.density_df.loc[::-1, f'exeqa_{line}'] / self.density_df.loc[::-1, 'loss'] * # df.loc[::-1, 'gp_total'], 1)[::-1] # # when computed using np.cumsum exixgtaUC is a pd.Series has an index so when it is mult by .loss # (which also has an index) it gets re-sorted into ascending order # when computed using cumintegral it is a numpy array with no index and so need reversing # the difference between UC and UC1 is a shift up by 1. 
# # Here is a little tester example to show what goes on # # test = pd.DataFrame(dict(x=range(20))) # test['a'] = 10 * test.x # test['y'] = test.x * 3 + 5 # bit = np.cumsum(test['y'][::-1]) # test['z'] = bit # test['w'] = bit / test.a # test # # # again, exi_xgtag is super important, so we will compute it bare bones the same way as exi_xgta # df[f'exi_xgtag_{line}'] = exixgtaUC / df.gS # # # df['exi_xgtag_' + line] = ((df[f'exeqa_{line}'] / df.loss * # df.gp_total).shift(-1)[::-1].cumsum()) / df.gp_total.shift(-1)[::-1].cumsum() # exa uses S in the denominator...and actually checking values there is a difference between the sum and gS # # mass hints removed # original # df['exi_xgtag_' + line] = ((df[f'exeqa_{line}'] / df.loss * # df.gp_total).shift(-1)[::-1].cumsum()) / df.gS # df['exi_xgtag_ημ_' + line] = ((df[f'exeqa_ημ_{line}'] / df.loss * # df.gp_total).shift(-1)[::-1].cumsum()) / df.gS # Nov 2020 # shift[-1] because result is for > a, so the first entry sums from bucket 1... # need the denominator to equal the numerator sum of p values # the shift up needs to be filled in with the last S value (for the tail) otherwise that probability # is lost...hence we need to set fill_values: last_gS = df.gS.iloc[-1] last_x = df[f'exeqa_{line}'].iloc[-1] / df.loss.iloc[-1] * last_gS logger.debug(f'Tail adjustment for {line}: {last_x:.6g}') df['exi_xgtag_' + line] = \ ((df[f'exeqa_{line}'] / df.loss * df.gp_total). shift(-1, fill_value=last_x)[::-1].cumsum()) / df.gS # need these to be zero so nan's do not propagate df.loc[gSeq0, 'exi_xgtag_' + line] = 0. if not efficient: last_x = df[f'exeqa_ημ_{line}'].iloc[-1] / df.loss.iloc[-1] * last_gS df['exi_xgtag_ημ_' + line] = \ ((df[f'exeqa_ημ_{line}'] / df.loss * df.gp_total). shift(-1, fill_value=last_x)[::-1].cumsum()) / df.gS df.loc[gSeq0, 'exi_xgtag_ημ_' + line] = 0. # # # following the Audit Vignette this is the way to go: # in fact, need to shift both down because cumint (prev just gS, but int beta g...integrands on same # basis # df[f'exag_{line}'] = (df[f'exi_xgtag_{line}'].shift(1) * df.gS.shift(1)).cumsum() * self.bs # df[f'exag_ημ_{line}'] = (df[f'exi_xgtag_ημ_{line}'].shift(1) * df.gS.shift(1)).cumsum() * self.bs # np.allclose( # (df[f'exi_xgtag_{line}'].shift(1, fill_value=0) * df.gS.shift(1, fill_value=0)).cumsum() * self.bs, # (df[f'exi_xgtag_{line}'] * df.gS).shift(1, fill_value=0).cumsum() * self.bs) # Doh df[f'exag_{line}'] = (df[f'exi_xgtag_{line}'] * df.gS).shift(1, fill_value=0).cumsum() * self.bs if not efficient: df[f'exag_ημ_{line}'] = (df[f'exi_xgtag_ημ_{line}'] * df.gS).shift(1, fill_value=0).cumsum() * self.bs # maybe sometime you want this unchecked item? # df[f'exleag_1{line}'] = np.cumsum( df[f'exeqa_{line}'] * df.p_total ) # it makes a difference NOT to divivde by df.gS but to compute the actual weights you are using (you mess # around with the last weight) # # # # these are all here for debugging...see # C:\S\TELOS\spectral_risk_measures_monograph\spreadsheets\[AS_IJW_example.xlsx] # df[f'exag1_{line}'] = exleaUC + exixgtaUC1 * self.density_df.loss + mass # df[f'exi_xgtag1_{line}'] = exixgtaUC1 / df.gS # df[f'exleaUC_{line}'] = exleaUC # df[f'exleaUCcs_{line}'] = exleaUCcs # df[f'U_{line}'] = exixgtaUC # df[f'U1_{line}'] = exixgtaUC1 # df[f'RAW_{line}'] = self.density_df.loc[::-1, f'exeqa_{line}'] / self.density_df.loc[::-1, 'loss'] * \ # df.loc[::-1, 'gp_total'] if efficient: # need to get to T.M and T.Q for pricing... laser in on those... 
# duplicated and edited code from below df['exag_total'] = df.gS.shift(1, fill_value=0).cumsum() * self.bs df['M.M_total'] = (df.gS - df.S) df['M.Q_total'] = (1 - df.gS) # hummmm.aliases, but...? # df['M.L_total'] = df['S'] # df['M.P_total'] = df['gS'] # df['T.L_total'] = df['exa_total'] # df['T.P_total'] = df['exag_total'] # critical insight is the layer ROEs are the same for all lines by law invariance # lhopital's rule estimate of g'(1) = ROE(1) # this could blow up... ϵ = 1e-4 gprime1 = g_prime(1) # (g(1 - ϵ) - (1 - ϵ)) / (1 - g(1 - ϵ)) df['M.ROE_total'] = np.where(df['M.Q_total']!=0, df['M.M_total'] / df['M.Q_total'], gprime1) # where is the ROE zero? need to handle separately else Q will blow up roe_zero = (df['M.ROE_total'] == 0.0) # print(f"g'(0)={gprime1:.5f}\nroe zero vector {roe_zero}") for line in self.line_names_ex: df[f'T.M_{line}'] = df[f'exag_{line}'] - df[f'exa_{line}'] mm_l = df[f'T.M_{line}'].diff().shift(-1) / self.bs # careful about where ROE==0 mq_l = mm_l / df['M.ROE_total'] mq_l.iloc[-1] = 0 mq_l.loc[roe_zero] = np.nan df[f'T.Q_{line}'] = mq_l.shift(1).cumsum() * self.bs df.loc[0, f'T.Q_{line}'] = 0 if create_augmented: self.augmented_df = df return Answer(augmented_df=df) # sum of parts: careful not to include the total twice! # not used # df['exag_sumparts'] = df.filter(regex='^exag_[^η]').sum(axis=1) # LEV under distortion g # originally # df['exag_total'] = self.cumintegral(df['gS']) # revised cumintegral does: v.shift(1, fill_value=0).cumsum() * bs df['exag_total'] = df.gS.shift(1, fill_value=0).cumsum() * self.bs # df.loc[0, 'exag_total'] = 0 # comparison of total and sum of parts # Dec 2019 added info to compute the total margin and capital allocation by layer # [MT].[L LR Q P M]_line: marginal or total (ground-up) loss, loss ratio etc. # do NOT divide MARGINAL versions by bs because they are per 1 wide layer # df['lookslikemmtotalxx'] = (df.gS - df.S) df['M.M_total'] = (df.gS - df.S) df['M.Q_total'] = (1 - df.gS) # hummmm.aliases, but...? df['M.L_total'] = df['S'] df['M.P_total'] = df['gS'] # df['T.L_total'] = df['exa_total'] # df['T.P_total'] = df['exag_total'] # critical insight is the layer ROEs are the same for all lines by law invariance # lhopital's rule estimate of g'(1) = ROE(1) gprime1 = g_prime(1) # (g(1 - ϵ) - (1 - ϵ)) / (1 - g(1 - ϵ)) df['M.ROE_total'] = np.where(df['M.Q_total']!=0, df['M.M_total'] / df['M.Q_total'], gprime1) # where is the ROE zero? need to handle separately else Q will blow up roe_zero = (df['M.ROE_total'] == 0.0) # print(f"g'(0)={gprime1:.5f}\nroe zero vector {roe_zero}") for line in self.line_names_ex: # these are not used # df[f'exa_{line}_pcttotal'] = df['exa_' + line] / df.exa_total # df[f'exag_{line}_pcttotal'] = df['exag_' + line] / df.exag_total # hummm more aliases df[f'T.L_{line}'] = df[f'exa_{line}'] df[f'T.P_{line}'] = df[f'exag_{line}'] df.loc[0, f'T.P_{line}'] = 0 # TOTALs = ground up cumulative sums # exag is the layer (marginal) premium and exa is the layer (marginal) loss df[f'T.LR_{line}'] = df[f'exa_{line}'] / df[f'exag_{line}'] df[f'T.M_{line}'] = df[f'exag_{line}'] - df[f'exa_{line}'] df.loc[0, f'T.M_{line}'] = 0 # MARGINALs # MM should be per unit width layer so need to divide by bs # prepend=0 satisfies: if # d['B'] = d.A.cumsum() # d['C'] = np.diff(d.B, prepend=0) # then d.C == d.A, which is what you want. 
# note this overwrites M.M_total set above # T.M starts at zero, by previous line and sense: no assets ==> no prem or loss # old # df[f'M.M_{line}'] = np.diff(df[f'T.M_{line}'], prepend=0) / self.bs # new: df[f'M.M_{line}'] = df[f'T.M_{line}'].diff().shift(-1) / self.bs # careful about where ROE==0 df[f'M.Q_{line}'] = df[f'M.M_{line}'] / df['M.ROE_total'] df[f'M.Q_{line}'].iloc[-1] = 0 df.loc[roe_zero, f'M.Q_{line}'] = np.nan # WHAT IS THE LAYER AT ZERO? Should it have a price? What is that price? # TL and TP at zero are both 1 if line != 'total': df[f'M.L_{line}'] = df[f'exi_xgta_{line}'] * df['S'] df[f'M.P_{line}'] = df[f'exi_xgtag_{line}'] * df['gS'] df[f'M.LR_{line}'] = df[f'M.L_{line}'] / df[f'M.P_{line}'] # for total need to reflect layer width... # Jan 2020 added shift down df[f'T.Q_{line}'] = df[f'M.Q_{line}'].shift(1).cumsum() * self.bs df.loc[0, f'T.Q_{line}'] = 0 df[f'T.ROE_{line}'] = df[f'T.M_{line}'] / df[f'T.Q_{line}'] # leverage df[f'T.PQ_{line}'] = df[f'T.P_{line}'] / df[f'T.Q_{line}'] df[f'M.PQ_{line}'] = df[f'M.P_{line}'] / df[f'M.Q_{line}'] # in order to undo some numerical issues, things will slightly not add up # but that appears to be a numerical issue around the calc of exag df['T.L_total'] = df['exa_total'] df['T.P_total'] = df['exag_total'] df['T.Q_total'] = df.loss - df['exag_total'] df['T.M_total'] = df['exag_total'] - df['exa_total'] df['T.PQ_total'] = df['T.P_total'] / df['T.Q_total'] df['T.LR_total'] = df['T.L_total'] / df['T.P_total'] df['T.ROE_total'] = df['T.M_total'] / df['T.Q_total'] f_distortion = f_byline = f_bylineg = f_exas = None if plots == 'all': plots = ['basic', 'extended'] if plots: if 'basic' in plots: f_distortion, ax = plt.subplots(1, 1, figsize=(4, 4)) ax.plot(df.filter(regex='^exag_[^η]').sum(axis=1), label='Sum of Parts') ax.plot(df.exag_total, label='Total') ax.plot(df.exa_total, label='Loss') ax.legend() ax.set_title(f'Mass audit for {dist.name}') ax.legend(loc="upper right") ax.grid() if 'extended' in plots: # yet more graphics, but the previous one is the main event # truncate for graphics max_x = 1.1 * self.q(1 - 1e-6) df_plot = df.loc[0:max_x, :] f_exas, axs, axiter = AxisManager.make_figure(12, sharex=True) ax = next(axiter) df_plot.filter(regex='^p_').sort_index(axis=1).plot(ax=ax) ax.set_ylim(0, df_plot.filter(regex='p_[^η]').iloc[1:, :].max().max()) ax.set_title("Densities") ax.legend(loc="upper right") ax.grid() ax = next(axiter) df_plot.loc[:, ['p_total', 'gp_total']].plot(ax=ax) ax.set_title("Total Density and Distortion") ax.legend(loc="upper right") ax.grid() ax = next(axiter) df_plot.loc[:, ['S', 'gS']].plot(ax=ax) ax.set_title("S, gS") ax.legend(loc="upper right") ax.grid() # exi_xlea removed for prefix in ['exa', 'exag', 'exeqa', 'exgta', 'exi_xeqa', 'exi_xgta']: # look ahead operator: does not match n just as the next char, vs [^n] matches everything except n ax = next(axiter) df_plot.filter(regex=f'^{prefix}_(?!ημ)[a-zA-Z0-9]+$').sort_index(axis=1).plot(ax=ax) ax.set_title(f'{prefix} by line') ax.legend(loc="upper left") ax.grid() if prefix.find('xi_x') > 0: # fix scale for proportions ax.set_ylim(-0.025, 1.025) ax = next(axiter) # _ = df_plot[[f'exa_{i}' for i in self.line_names]] / df.exa_total # _.sort_index(axis=1).plot(ax=ax) ax.set_title('Proportion of loss: T.L_line / T.L_total') ax.set_ylim(0, 1.05) ax.legend(loc='upper left') ax.grid() ax = next(axiter) # _ = df_plot[[f'exag_{i}' for i in self.line_names]] / df.exag_total # _.sort_index(axis=1).plot(ax=ax) ax.set_title('Proportion of premium: T.P_line / 
T.P_total') # ax.set_ylim(0, 1.05) ax.legend(loc='upper left') ax.grid() ax = next(axiter) df_plot.filter(regex='^M.LR_').sort_index(axis=1).plot(ax=ax) ax.set_title('LR with the Natural (constant layer ROE) Allocation') ax.legend(loc='lower right') ax.grid() # by line plots nl = len(self.line_names_ex) f_byline, axs, axiter = AxisManager.make_figure(nl) for line in self.line_names: ax = next(axiter) df_plot.filter(regex=f'ex(le|eq|gt)a_{line}').sort_index(axis=1).plot(ax=ax) ax.set_title(f'{line} EXs') # ? ax.set(ylim=[0, self.q(0.999, 'lower')]) ax.legend(loc='upper left') ax.grid() AxisManager.tidy_up(f_byline, axiter) # compare exa with exag for all lines f_bylineg, axs, axiter = AxisManager.make_figure(nl) for line in self.line_names_ex: ax = next(axiter) df_plot.filter(regex=f'exa[g]?_{line}$').sort_index(axis=1).plot(ax=ax) ax.set_title(f'{line} T.L and T.P') ax.legend(loc='lower right') ax.grid() AxisManager.tidy_up(f_bylineg, axiter) if create_augmented: self.augmented_df = df # store associated distortion self._distortion = dist return Answer(augmented_df=df, f_distortion=f_distortion, f_byline=f_byline, f_bylineg=f_bylineg, f_exas=f_exas)
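    # Illustrative usage sketch (added for exposition; not part of the library source). Apply one
    # calibrated distortion and inspect the augmented density, e.g. the natural allocation premium
    # (T.P), margin (T.M) and capital (T.Q) columns by unit:
    #
    #     ans = port.apply_distortion(port.dists['ph'], efficient=False)
    #     aug = ans.augmented_df                                # also stored as port.augmented_df
    #     aug.filter(regex=r'^T\.[PMQ]_').head()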
[docs]    def var_dict(self, p, kind='lower', total='total', snap=False):
        """
        Make a dictionary of values at risk for each line and the whole portfolio.

        Returns {line: var(p, kind)}; the total is keyed 'total' by default, or by ``self.name``
        if ``total != 'total'``.

        For ``kind='epd'`` the input should be a small EPD target; if p >= 0.7 it is assumed to be
        a probability level and 1 - p is used instead.

        Example::

            for p, arg in zip([.996, .996, .996, .985, .01], ['var', 'lower', 'upper', 'tvar', 'epd']):
                print(port.var_dict(p, arg, snap=True))

        :param p: probability level (or EPD target for kind='epd')
        :param kind: var (defaults to lower), upper, lower, tvar, epd
        :param total: if 'total' (default) the total entry is keyed 'total'; any other value
                      (e.g. 'name') re-keys it as self.name
        :param snap: snap tvars and epds to the index
        :return:
        """
        if kind == 'var':
            kind = 'lower'
        if kind == 'tvar':
            d = {a.name: a.tvar(p) for a in self.agg_list}
            d['total'] = self.tvar(p)
        elif kind == 'epd':
            if p >= 0.7:
                p = 1 - p
                logger.debug(f'Illogical value of p={1-p:.5g} passed for epd; using {p:.5g}')
            d = {ln: float(self.epd_2_assets[(ln, 0)](p)) for ln in self.line_names_ex}
        else:
            d = {a.name: a.q(p, kind) for a in self.agg_list}
            d['total'] = self.q(p, kind)
        if total != 'total':
            d[self.name] = d['total']
            del d['total']
        if snap and kind in ['tvar', 'epd']:
            d = {k: self.snap(v) for k, v in d.items()}
        return d
[docs] def gamma(self, a=None, p=None, kind='lower', compute_stand_alone=False, axs=None, plot_mode='return'): """ Return the vector gamma_a(x), the conditional layer effectiveness given assets a. Assets specified by percentile level and type (you need a in the index) gamma can be created with no base and no calibration - it does not depend on a distortion. It only depends on total losses. Returns the total and by layer versions, see "Main Result for Conditional Layer Effectiveness; Piano Diagram" in OneNote In total gamma_a(x) = E[ (a ^ X) / X | X > x] is the average rate of reimbursement for losses above x given capital a. It satisfies int_0^\infty gamma_a(x) S(x) dx = E[a ^ X]. Notice the integral is to infinity, regardless of the amount of capital a. By line gamma_{a,i}(x) = E[ E[X_i | X] / X {(X ^ a) / X} 1_{X>x} ] / E[ {E[X_i | X] / X} 1_{X>x} ]. The denominator equals total weights. It is the line-i recovery weighted layer effectiveness. It equals alpha_i(x) S(x). Now we have E[X_i(a)] = int_0^infty gamma_{a,i}(x) alpha_i(x) S(x) dx Note that you need upper and lower q's in aggs now too. Nov 2020: added arguments for plots; revised axes, separate plots by line :param a: input a or p and kind as usual :param p: asset level percentile :param kind: lower or upper :param compute_stand_alone: add stand-alone evaluation of gamma :param axs: enough axes; only plot if not None :param plot_mode: return or linear scale for y axis :return: """ if a is None: assert p is not None a = self.q(p, kind) else: p = self.cdf(a) ql = self.q(p, 'lower') qu = self.q(p, 'upper') logger.log(WL, f'Input a={a} to gamma; computed p={p:.8g}, lower and upper quantiles are {ql:.8g} and {qu:.8g}') # alter in place or return a copy? For now return a copy... temp = self.density_df.filter(regex='^(p_|e1xi_1gta_|exi_xgta_|exi_xeqa_|' 'exeqa_)[a-zA-Z]|^S$|^loss$').copy() # var dictionaries var_dict = self.var_dict(p, kind, 'total') extreme_var_dict = self.var_dict(1 - (1 - p) / 100, kind, 'total') # rate of payment factor min_xa = np.minimum(temp.loss, a) / temp.loss temp['min_xa_x'] = min_xa # total is a special case ln = 'total' gam_name = f'gamma_{self.name}_{ln}' # unconditional version avoid divide and multiply by a small number # exeqa is closest to the raw output... temp[f'exi_x1gta_{ln}'] = (temp[f'loss'] * temp.p_total / temp.loss).shift(-1)[::-1].cumsum() * self.bs # equals this temp.S?! 
s_ = temp.p_total.shift(-1)[::-1].cumsum() * self.bs # print(s_[:-1:-1], temp[f'exi_x1gta_{ln}'].iloc[:-2], temp.S.iloc[:-1] * self.bs) t1, t2 = np.allclose(s_[:-1:-1], temp[f'exi_x1gta_{ln}'].iloc[-1]), \ np.allclose(temp[f'exi_x1gta_{ln}'].iloc[:-1], temp.S.iloc[:-1] * self.bs) logger.debug(f'TEMP: the following should be close: {t1}, {t2} (expect True/True)') # this V 1.0 is exi_x for total temp[gam_name] = (min_xa * 1.0 * temp.p_total).shift(-1)[::-1].cumsum() / \ (temp[f'exi_x1gta_{ln}']) * self.bs for ln in self.line_names: # sa = stand alone; need to figure the sa capital from the agg objects hence var_dict if axs is not None or compute_stand_alone: a_l = var_dict[ln] a_l_ = a_l - self.bs xinv = temp[f'e1xi_1gta_{ln}'] gam_name = f'gamma_{ln}_sa' s = 1 - temp[f'p_{ln}'].cumsum() temp[f'S_{ln}'] = s temp[gam_name] = 0 temp.loc[0:a_l_, gam_name] = (s[0:a_l_] - s[a_l] + a_l * xinv[a_l]) / s[0:a_l_] temp.loc[a_l:, gam_name] = a_l * xinv[a_l:] / s[a_l:] temp[gam_name] = temp[gam_name].shift(1) # method unreliable in extreme right tail, but now changed plotting, not an issue # temp.loc[extreme_var_dict[ln]:, gam_name] = np.nan # actual computation for l within portfolio allowing for correlations gam_name = f'gamma_{self.name}_{ln}' # unconditional version avoid divide and multiply by a small number # exeqa is closest to the raw output... temp[f'exi_x1gta_{ln}'] = (temp[f'exeqa_{ln}'] * temp.p_total / temp.loss).shift(-1)[::-1].cumsum() * self.bs temp[gam_name] = (min_xa * temp[f'exi_xeqa_{ln}'] * temp.p_total).shift(-1)[::-1].cumsum() / \ (temp[f'exi_x1gta_{ln}']) * self.bs if axs is not None: axi = iter(axs.flat) nr, nc = axs.shape v = self.var_dict(.996, 'lower', 'total') ef = EngFormatter(3, True) if plot_mode == 'return': def transformer(x): # transform on y scale return np.where(x==0, np.nan, 1.0 / (1 - x)) yscale = 'log' else: def transformer(x): # transform on y scale return x yscale = 'linear' # 1/s is re-used s1 = 1 / temp.S for i, ln in enumerate(self.line_names_ex): r, c = i // nc, i % nc ax = next(axi) if ln != 'total': # line 1/s ls1 = 1/temp[f'S_{ln}'] ax.plot(ls1, transformer(temp[f'gamma_{ln}_sa']), c='C1', label=f'SA {ln}') ax.plot(s1, transformer(temp[f'gamma_{self.name}_total']), lw=1, c='C7', label='total') c = 'C1' else: ls1 = s1 c = 'C0' ax.plot(s1, transformer(temp[f'gamma_{self.name}_{ln}']), c='C0', label=f'Pooled {ln}') temp['WORST'] = np.maximum(0, 1 - (1 - p) * ls1) temp['BEST'] = 1 temp.loc[v[ln]:, 'BEST'] = v[ln] / temp.loc[v[ln]:, 'loss'] ax.fill_between(ls1, transformer(temp.WORST), transformer(temp.BEST), lw=.5, ls='--', alpha=.1, color=c, label="Possible range") ax.plot(ls1, transformer(temp.BEST), lw=.5, ls='--', alpha=1, c=c) ax.plot(ls1, transformer(temp.WORST), lw=.5, ls=':', alpha=1, c=c) ax.set(xlim=[1, 1e9], xscale='log', yscale=yscale, xlabel='Loss return period' if r==nr-1 else None, ylabel='Coverage Effectiveness (RP)' if c==0 else None, title=f'{ln}, a={ef(v[ln]).strip()}') ax.axvline(1/(1-.996), ls='--', c='C7', lw=.5, label='Capital p') ll = ticker.LogLocator(10, numticks=10) ax.xaxis.set_major_locator(ll) lm = ticker.LogLocator(base=10.0, subs=np.arange(1.0, 10.0) * 0.1, numticks = 10) ax.xaxis.set_minor_locator(lm) ax.grid(lw=.25) ax.legend(loc='upper right') # tidy up axes try: while 1: ax.figure.delaxes(next(axi)) except StopIteration: pass temp.drop(columns=['BEST', 'WORST']) return Answer(gamma_df=temp.sort_index(axis=1), base=self.name, assets=a, p=p, kind=kind)
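    # Illustrative usage sketch (added for exposition; not part of the library source). Conditional
    # layer effectiveness at the 99.6% lower quantile, with stand-alone comparisons:
    #
    #     ans = port.gamma(p=0.996, kind='lower', compute_stand_alone=True)
    #     ans.gamma_df.filter(regex='^gamma_').head()           # pooled and stand-alone gamma curves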
[docs] def price(self, p, distortion=None, *, allocation='lifted', view='ask', efficient=True): """ Price using regulatory capital and pricing distortion functions. Compute E_price (X wedge E_reg(X) ) where E_price uses the pricing distortion and E_reg uses the regulatory distortion derived from p. p can be input as a probability level converted to assets using `kind`, a level of assets directly (snapped to index). Regulatory capital distortion is applied on unlimited basis. Do not attempt to use with a weight_df dataframe from Bounds. For that, use the bounds object logic directly which is much more efficient. The argument ``kind`` has been dropped, it is always ``'var'``. If that is not the case, convert your asset level to a VaR threshold. Updated: May 2023 Turns out, really awkward to return the dictionary. In most calls there is just one distortion passed in. The result of the last distortion are reported in ans.price, and there is a new price_dict for the price of each distortion. T :param p: float; if >1 assets if <1 a prob converted to quantile :param distortion: a distortion, list or dictionary (name: dist) of distortions. If None then ``self.dists`` dictionary is used. :param allocation: 'lifted' (default for legacy reasons) or 'linear': treatment in default scenarios. See PIR. :param view: bid or ask :param efficient: for apply_distortion, lifted only :return: PricingResult namedtuple with 'price', 'assets', 'reg_p', 'distortion', 'df' """ # warnings.warn('In 0.13.0 the default allocation will become linear not lifted.', DeprecationWarning) assert allocation in ('lifted', 'linear'), "allocation must be 'lifted' or 'linear'" PricingResult = namedtuple('PricingResult', ['df', 'price', 'price_dict', 'a_reg', 'reg_p']) if isinstance(distortion, Distortion): distortion = {str(distortion): distortion} elif isinstance(distortion, list): distortion = {str(d): d for d in distortion} elif distortion is None: assert self.dists is not None, 'Must pass a distortion or calibrate distortions prior to calling' distortion = self.dists # figure regulatory assets; applied to unlimited losses if p > 1: a_reg = self.snap(p) reg_p = self.cdf(a_reg) else: a_reg = self.q(p) reg_p = p if allocation == 'lifted': dfs = {} price = {} last_price = 0 for k, v in distortion.items(): # this code is unchanged from original logger.info(f'Executing for {k}, lifted') ans_ad = self.apply_distortion(v, view=view, create_augmented=False, efficient=efficient) if a_reg in ans_ad.augmented_df.index: aug_row = ans_ad.augmented_df.loc[a_reg] else: logger.warning('Regulatory assets not in augmented_df. 
Using last.') aug_row = ans_ad.augmented_df.iloc[-1] # holder for the answer df = pd.DataFrame(columns=['line', 'L', 'P', 'M', 'Q'], dtype=float) df.columns.name = 'statistic' df = df.set_index('line', drop=True) for line in self.line_names_ex: df.loc[line, :] = [aug_row[f'exa_{line}'], aug_row[f'exag_{line}'], aug_row[f'T.M_{line}'], aug_row[f'T.Q_{line}']] df['a'] = df.P + df.Q df['LR'] = df.L / df.P df['PQ'] = df.P / df.Q df['COC'] = df.M / df.Q price[k] = last_price = df.loc['total', 'P'] dfs[k] = df.sort_index() df = pd.concat(dfs.values(), keys=dfs.keys(), names=['distortion', 'unit']) ans = PricingResult(df, last_price, price, a_reg, reg_p) elif allocation == 'linear': # code mirrors pricing_bounds # slice for extracting # sle = slice(self.bs, a_reg) sle = slice(0, a_reg) S = self.density_df.loc[sle, ['S']].copy() loss = self.density_df.loc[sle, ['loss']] # deal losses for allocations exeqa = self.density_df.filter(regex='exeqa_').loc[sle] # last entry needs to include all remaining losses from a-bs onwards, hence: S.loc[a_reg, 'S'] = 0. ps = pd.DataFrame(-np.diff(S, prepend=1, axis=0), index=S.index) dfs = {} price = {} last_price = 0 for k, v in distortion.items(): logger.info(f'Executing for {k}, linear') if view == 'ask': gS = v.g(S) else: gS = 1 - v.g(1 - S) gS = pd.DataFrame(gS, index=S.index, columns=['S']) gps = pd.DataFrame(-np.diff(gS, prepend=1, axis=0), index=S.index) if self.sf(a_reg) > (1 - self.density_df.p_total.sum()): print('Adjusting tail losses, but skipping\n' f'Triggering sf(areg) > 1 - p_total: {self.sf(a_reg):.5g} code ') # logger.info(f'Triggering sf(areg) > 1 - p_total: {1-self.sf(a_reg):.5g} code ') # NOTE: this adjustment requires the whole tail; it has been computed in # density_df. However, when you come to risk adjusted version it hasn't # been computed. That's why the code above falls back to apply distortion. # see notes below in slow method if 1: # painful issue here with the naming leading to rner = lambda x: x.replace('exi_xgta_', 'exeqa_') # this regex does not capture the sum column if present exeqa.loc[a_reg, :] = self.density_df.filter(regex='exi_xgta_.+$(?<!exi_xgta_sum)').\ rename(columns=rner).loc[a_reg - self.bs] * a_reg # there is no exi_xgta_total, so that comes out as missing # need to fill in value if np.isnan(exeqa.loc[a_reg, 'exeqa_total']): exeqa.loc[a_reg, 'exeqa_total'] = exeqa.loc[a_reg].fillna(0).sum() # the lifted/natural difference is that here scenarios in the tail are not re- # weighted using risk adjusted probabilities. They are collapsed with objective # probs. # these are at the layer level, these compute Eq 14.20 p. 372 # note that by construction S(a) = 0 so there is no extra mass at the end exp_loss = ((ps.to_numpy() * self.bs) / loss.to_numpy() * exeqa )[::-1].cumsum()[::-1] alloc_prem = ((gps.to_numpy() * self.bs) / loss.to_numpy() * exeqa)[::-1].cumsum()[::-1] margin = alloc_prem - exp_loss # deal with last row KLUDGE, s=0, coc = gs-s/(1-gs)=0 # think about what this should be... poss shift? 
# reciprocal cost of capital = capital / margin = 1 - gS / (gS - S) rcoc = (1 - gS) / (gS - S) # compute 1/roe at s=1 gprime = v.g_prime(1) fv = gprime / (1 - gprime) # print(f'computed s=1 capital factor={fv}') # if gS-S=0 then gS=S=1 is possible (certain small losses); then fully loss funded, no equity, hence: rcoc = rcoc.fillna(fv).shift(1, fill_value=fv) # at S=0 also have gS-S=0, could have infinite capital = margin * rcoc.values # from IPython.display import display as d2 # d2(pd.concat((S, gS, rcoc, self.density_df.filter(regex='exi_xgta_').loc[sle], margin, capital), axis=1, # keys=['S', 'gS', 'rcoc', 'alpha', 'margin', 'capital'])) # these are integrals of alpha S and beta gS exp_loss_sum = exp_loss.replace([np.inf, -np.inf, np.nan], 0).sum() alloc_prem_sum = alloc_prem.replace([np.inf, -np.inf, np.nan], 0).sum() capital_sum = capital.replace([np.inf, -np.inf, np.nan], 0).sum() df = pd.concat((exp_loss_sum, alloc_prem_sum, capital_sum), axis=1, keys=['L', 'P', 'Q']) . \ rename(index=lambda x: x.replace('exeqa_', '')). \ sort_index() df['M'] = df.P - df.L df['LR'] = df.L / df.P df['PQ'] = df.P / df.Q df['COC'] = df.M / df.Q df['a'] = df.P + df.Q price[k] = last_price = df.loc['total', 'P'] df = df[['L', 'P', 'M', 'Q', 'a', 'LR', 'PQ', 'COC']] dfs[k] = df df = pd.concat(dfs.values(), keys=dfs.keys(), names=['distortion', 'unit']) ans = PricingResult(df, last_price, price, a_reg, reg_p) return ans
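    # Usage sketch (editor's illustration, not library code). The distortion name and
    # parameter below are placeholders; any Distortion instance, list or dict works:
    #
    #     d = Distortion('ph', 0.7)                   # proportional hazard, illustrative shape
    #     res = port.price(0.995, d, allocation='linear', view='ask')
    #     res.price                                   # total premium for the last distortion
    #     res.df                                      # L, P, M, Q, a, LR, PQ, COC by unit
    #     res.a_reg, res.reg_p                        # regulatory assets and matching prob level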
[docs]    def price_ccoc(self, p, ccoc):
        """
        Convenience function to price with a constant cost of capital equal to ``ccoc``
        at VaR level ``p``. Does not invoke a Distortion. Returns the standard DataFrame
        format.
        """
        a = self.q(p)
        el = self.density_df.loc[a, 'exa_total']
        p = (el + ccoc * a) / (1 + ccoc)
        q = a - p
        m = p - el
        df = pd.DataFrame([[el, p, p - el, a - p, a, el / p, p / q, m / q]],
                          columns=['L', 'P', "M", 'Q', 'a', 'LR', 'PQ', 'COC'],
                          index=['total'])
        return df
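    # Worked algebra behind price_ccoc (editor's note): with a = q(p), L = E[X ∧ a] and
    # r = ccoc, the method solves P = (L + r a) / (1 + r), Q = a - P, M = P - L, so that
    # M / Q = r exactly. Illustrative numbers:
    #
    #     # a = 1000, L = 900, r = 0.10  =>  P = (900 + 100) / 1.1 = 909.09
    #     # Q = 90.91, M = 9.09, M / Q = 0.10
    #     df = port.price_ccoc(0.995, 0.10)           # same layout as price().df, index ['total']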
[docs] def analyze_distortions(self, a=0, p=0, kind='lower', efficient=True, augmented_dfs=None, regex='', add_comps=True): """ Run analyze_distortion on self.dists :param a: :param p: the percentile of capital that the distortions are calibrated to :param kind: var, upper var, tvar, epd :param efficient: :param augmented_dfs: input pre-computed augmented_dfs (distortions applied) :param regex: apply only distortion names matching regex :param add_comps: add traditional pricing comps to the answer :return: """ import re a, p = self.set_a_p(a, p) dfs = [] ans = Answer() if augmented_dfs is None: augmented_dfs = {} ans['augmented_dfs'] = augmented_dfs for k, d in self.dists.items(): if regex == '' or re.match(regex, k): if augmented_dfs is not None and k in augmented_dfs: use_self = True self.augmented_df = augmented_dfs[k] logger.info(f'Found {k} in provided augmented_dfs...') else: use_self = False logger.info(f'Running distortion {d} through analyze_distortion, p={p}...') # first distortion...add the comps...these are same for all dists ad_ans = self.analyze_distortion(d, p=p, kind=kind, add_comps=(len(dfs) == 0) and add_comps, efficient=efficient, use_self=use_self) dfs.append(ad_ans.exhibit) ans[f'{k}_exhibit'] = ad_ans.exhibit ans[f'{k}_pricing'] = ad_ans.pricing if not efficient and k not in augmented_dfs: # remember, augmented_dfs is part of ans augmented_dfs[k] = ad_ans.augmented_df ans['comp_df'] = pd.concat(dfs).sort_index() return ans
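    # Usage sketch (editor's illustration, not library code). Assumes ``port.dists`` has
    # already been populated by a prior calibration step:
    #
    #     ans = port.analyze_distortions(p=0.995, kind='lower', efficient=True)
    #     ans.comp_df                       # stacked exhibit comparing all calibrated distortions
    #     ans['ph_pricing']                 # per-distortion pricing; the key name is illustrative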
[docs] def analyze_distortion(self, dname, dshape=None, dr0=.025, ddf=5.5, LR=None, ROE=None, p=None, kind='lower', A=None, use_self=False, plot=False, a_max_p=1-1e-8, add_comps=True, efficient=True): """ Graphic and summary DataFrame for one distortion showing results that vary by asset level. such as increasing or decreasing cumulative premium. Characterized by the need to know an asset level, vs. apply_distortion that produced values for all asset levels. Returns DataFrame with values upto the input asset level...differentiates from apply_distortion graphics that cover the full range. analyze_pricing will then zoom in and only look at one asset level for micro-dynamics... Logic of arguments: :: if data_in == 'self' use self.augmented_df; this implies a distortion self.distortion else need to build the distortion and apply it if dname is a distortion use it else built one calibrated to input data LR/ROE/a/p: if p then a=q(p, kind) else p = MESSY if LR then P and ROE; if ROE then Q to P to LR these are used to calibrate distortion A newly made distortion is run through apply_distortion with no plot Logic to determine assets similar to calibrate_distortions. Can pass in a pre-calibrated distortion in dname Must pass LR or ROE to determine profit Must pass p or A to determine assets Output is an `Answer` class object containing :: Answer(augmented_df=deets, trinity_df=df, distortion=dist, fig1=f1 if plot else None, fig2=f2 if plot else None, pricing=pricing, exhibit=exhibit, roe_compare=exhibit2, audit_df=audit_df) Originally `example_factory`. example_factory_exhibits included: do the work to extract the pricing, exhibit and exhibit 2 DataFrames from deets Can also accept an ans object with an augmented_df element (how to call from outside) POINT: re-run exhibits at different p/a thresholds without recalibrating add relevant items to audit_df a = q(p) if both given; if not both given derived as usual Figures show :param dname: name of distortion :param dshape: if input use dshape and dr0 to make the distortion :param dr0: :param ddf: r0 and df params for distortion :param LR: otherwise use loss ratio and p or a loss ratio :param ROE: :param p: p value to determine capital. :param kind: type of VaR, upper or lower :param A: :param use_self: if true use self.augmented and self.distortion...else recompute :param plot: :param a_max_p: percentile to use to set the right hand end of plots :param add_comps: add old-fashioned method comparables (included = True as default to make backwards comp.) 
:param efficient: :return: various dataframes in an Answer class object """ # setup: figure what distortion to use, apply if necessary if use_self: # augmented_df was called deets before, FYI augmented_df = self.augmented_df if type(dname) == str: dist = self.dists[dname] elif isinstance(dname, Distortion): dist = dname else: raise ValueError(f'Unexpected dname={dname} passed to analyze_distortion') a_cal = self.q(p, kind) exa = self.density_df.loc[a_cal, 'exa_total'] exag = augmented_df.loc[a_cal, 'exag_total'] K = a_cal - exag LR = exa / exag ROE = (exag - exa) / K else: # figure assets a_cal (for calibration) from p or A if p: # have p a_cal = self.q(p, kind) exa = self.density_df.loc[a_cal, 'exa_total'] else: # must have A...must be in index assert A is not None p = self.cdf(A) a_cal = self.q(p) exa = self.density_df.loc[a_cal, 'exa_total'] if a_cal != A: logger.info(f'a_cal:=q(p)={a_cal} is not equal to A={A} at p={p}') if dshape or isinstance(dname, Distortion): # specified distortion, fill in if isinstance(dname, Distortion): dist = dname else: dist = Distortion(dname, dshape, dr0, ddf) _x = self.apply_distortion(dist, create_augmented=False, efficient=efficient) augmented_df = _x.augmented_df exa = self.density_df.loc[a_cal, 'exa_total'] exag = augmented_df.loc[a_cal, 'exag_total'] profit = exag - exa K = a_cal - exag ROE = profit / K LR = exa / exag else: # figure distortion from LR or ROE and apply if LR is None: assert ROE is not None delta = ROE / (1 + ROE) nu = 1 - delta exag = nu * exa + delta * a_cal LR = exa / exag else: exag = exa / LR profit = exag - exa K = a_cal - exag ROE = profit / K # was wasteful # cd = self.calibrate_distortions(LRs=[LR], As=[a_cal], r0=dr0, df=ddf) dist = self.calibrate_distortion(dname, r0=dr0, df=ddf, roe=ROE, assets=a_cal) _x = self.apply_distortion(dist, create_augmented=False, efficient=efficient) augmented_df = _x.augmented_df # other helpful audit values # keeps track of details of calc for Answer audit_df = pd.DataFrame(dict(stat=[p, LR, ROE, a_cal, K, dist.name, dist.shape]), index=['p', 'LR', "ROE", 'a_cal', 'K', 'dname', 'dshape']) audit_df.loc['a'] = a_cal audit_df.loc['kind'] = kind # make the pricing summary DataFrame and exhibit: this just contains info about dist # in non-efficient the renamer and existing columns collide and you get duplicates # transpose turns into rows... pricing = augmented_df.rename(columns=self.tm_renamer).loc[[a_cal]].T. \ filter(regex='^(T)\.(L|P|M|Q)_', axis=0).copy() pricing = pricing.loc[~pricing.index.duplicated()] # put in exact Q rather than add up the parts...more accurate pricing.at['T.Q_total', a_cal] = a_cal - exag # TODO EVIL! 
this reorders the lines and so messes up when port has lines not in alpha order pricing.index = pricing.index.str.split('_|\.', expand=True) pricing = pricing.sort_index() pricing = pd.concat((pricing, pricing.xs(('T','P'), drop_level=False).rename(index={'P': 'PQ'}) / pricing.xs(('T', 'Q')).to_numpy(), pricing.xs(('T', 'L'), drop_level=False).rename(index={'L': 'LR'}) / pricing.xs(('T', 'P')).to_numpy(), pricing.xs(('T', 'M'), drop_level=False).rename(index={'M': 'ROE'}) / pricing.xs(('T', 'Q')).to_numpy() )) pricing.index.set_names(['Method', 'stat', 'line'], inplace=True) pricing = pricing.sort_index() # make nicer exhibit exhibit = pricing.unstack(2).copy() exhibit.columns = exhibit.columns.droplevel(level=0) exhibit.loc[('T', 'a'), :] = exhibit.loc[('T', 'P'), :] + exhibit.loc[('T', 'Q'), :] exhibit.loc[('T', 'a'), :] = exhibit.loc[('T', 'a'), :] * a_cal / exhibit.at[('T', 'a'), 'total'] ans = Answer(augmented_df=augmented_df, distortion=dist, audit_df=audit_df, pricing=pricing, exhibit=exhibit) if add_comps: logger.debug('Adding comps...') ans = self.analyze_distortion_add_comps(ans, a_cal, p, kind, ROE) if plot: ans = self.analyze_distortion_plots(ans, dist, a_cal, p, self.q(a_max_p), ROE, LR) # ans['exhibit'] = ans.exhibit.swaplevel(0,1).sort_index() ans['exhibit'] = ans.exhibit.rename(index={'T': f'Dist {dist.name}'}).sort_index() return ans
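    # Usage sketch (editor's illustration, not library code). Per the docstring, pass a
    # distortion name (or instance) plus ROE or LR for profit and p or A for assets:
    #
    #     ans = port.analyze_distortion('ph', dr0=0.025, ROE=0.10, p=0.995, kind='lower')
    #     ans.exhibit                       # by-unit L, P, M, Q exhibit at a_cal = q(0.995)
    #     ans.audit_df                      # calibration details: p, LR, ROE, a_cal, K, dname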
[docs] def analyze_distortion_add_comps(self, ans, a_cal, p, kind, ROE): """ make exhibit with comparison to old-fashioned methods: equal risk var/tvar, scaled var/tvar, stand-alone var/tvar, merton perold, co-TVaR. Not all of these methods is additive. covar method = proportion of variance (additive) Other methods could be added, e.g. a volatility method? **Note on calculation** Each method computes allocated assets a_i (which it calls Q_i) = Li + Mi + Qi All methods are constant ROEs for capital We have Li in exhibit. Hence: L = Li P = (Li + ROE ai) / (1 + ROE) = v Li + d ai Q = a - P M = P - L ratios In most cases, ai is computed directly, e.g. as a scaled proportion of total assets etc. The covariance method is slightly different. Mi = vi M, vi = Cov(Xi, X) / Var(X) Pi = Li + Mi Qi = Mi / ROE ai = Pi + Qi and sum ai = sum Li + sum Mi + sum Qi = L + M + M/ROE = L + M + Q = a as required. To fit it in the same scheme as all other methods we compute qi = Li + Mi + Qi = Li + vi M + vi M / ROE = li + viM(1+1/ROE) = Li + vi M/d, d=ROE/(1+ROE) :param ans: answer containing dist and augmented_df elements :param a_cal: :param p: :param kind: :param LR: :param ROE: :return: ans Answer object with updated elements """ exhibit = ans.exhibit.copy() # display(exhibit) total_margin = exhibit.at[('T', 'M'), 'total'] logger.debug(f'Total margin = {total_margin}') # shut the style police up: done = [] # some reasonable traditional metrics # tvar threshold giving the same assets as p on VaR logger.debug(f'In analyze_distortion_comps p={p} and a_cal={a_cal}') try: p_t = self.tvar_threshold(p, kind) except (ZeroDivisionError, ValueError) as e: logger.warning(f'Error computing p_t threshold for VaR at p={p}') logger.warning(str(e)) p_t = np.nan try: pv, pt = self.equal_risk_var_tvar(p, p_t) except (ZeroDivisionError, ValueError) as e: logger.warning(f'Error computing pv, pt equal_risk_var_tvar for VaR, p={p}, kind={kind}, p_t={p_t}') logger.warning(str(e)) pv = np.nan pt = np.nan try: pe = self.assets_2_epd[('total', 0)](a_cal) p_e = self.equal_risk_epd(a_cal) except Exception as e: logger.warning(f'Error {e} calibrating EPDs') pe = p_e = 1 - p try: temp = exhibit.loc[('T', 'L'), :] exhibit.loc[('EL', 'Q'), :] = temp / temp['total'] * a_cal done.append('EL') exhibit.loc[('VaR', 'Q'), self.line_names_ex] = [float(a.q(p, kind)) for a in self] + [self.q(p, kind)] done.append('var') exhibit.loc[('TVaR', 'Q'), self.line_names_ex] = [float(a.tvar(p_t)) for a in self] + [self.tvar(p_t)] done.append('tvar') # print(done) exhibit.loc[('ScaledVaR', 'Q'), :] = exhibit.loc[('VaR', 'Q'), :] exhibit.loc[('ScaledTVaR', 'Q'), :] = exhibit.loc[('TVaR', 'Q'), :] exhibit.loc[('ScaledVaR', 'Q'), 'total'] = 0 exhibit.loc[('ScaledTVaR', 'Q'), 'total'] = 0 sumvar = exhibit.loc[('ScaledVaR', 'Q'), :].sum() sumtvar = exhibit.loc[('ScaledTVaR', 'Q'), :].sum() exhibit.loc[('ScaledVaR', 'Q'), :] = \ exhibit.loc[('ScaledVaR', 'Q'), :] * exhibit.at[('VaR', 'Q'), 'total'] / sumvar exhibit.loc[('ScaledTVaR', 'Q'), :] = \ exhibit.loc[('ScaledTVaR', 'Q'), :] * exhibit.at[('TVaR', 'Q'), 'total'] / sumtvar exhibit.at[('ScaledVaR', 'Q'), 'total'] = exhibit.at[('VaR', 'Q'), 'total'] exhibit.at[('ScaledTVaR', 'Q'), 'total'] = exhibit.at[('TVaR', 'Q'), 'total'] # these are NOT additive exhibit.at[('VaR', 'Q'), 'total'] = sumvar exhibit.at[('TVaR', 'Q'), 'total'] = sumtvar exhibit.loc[('EqRiskVaR', 'Q'), self.line_names_ex] = [float(a.q(pv, kind)) for a in self] + [self.q(p)] done.append('eqvar') # print(done) exhibit.loc[('EqRiskTVaR', 'Q'), 
self.line_names_ex] = [float(a.tvar(pt)) for a in self] + [self.tvar(p_t)] done.append('eqtvar') # print(done) exhibit.loc[('MerPer', 'Q'), self.line_names_ex] = self.merton_perold(p) done.append('merper') # print(done) # co-tvar at threshold to match capital exhibit.loc[('coTVaR', 'Q'), self.line_names_ex] = self.cotvar(p_t) done.append('cotvar') # print(done) # epd and scaled epd methods at 1-p threshold vd = self.var_dict(pe, kind='epd', total='total') logger.debug(f'Computing EPD at {pe:.3%} threshold, total = {vd["total"]}') exhibit.loc[('EPD', 'Q'), vd.keys()] = vd.values() exhibit.loc[('EPD', 'Q'), 'total'] = 0. exhibit.loc[('ScaledEPD', 'Q'), :] = exhibit.loc[('EPD', 'Q'), :] * vd['total'] / \ exhibit.loc[('EPD', 'Q')].sum() exhibit.loc[('ScaledEPD', 'Q'), 'total'] = vd['total'] exhibit.loc[('EPD', 'Q'), 'total'] = exhibit.loc[('EPD', 'Q')].iloc[:-1].sum() exhibit.loc[('EqRiskEPD', 'Q'), self.line_names_ex] = [float( self.epd_2_assets[(l, 0)](p_e) ) for l in self.line_names] + \ [float(self.epd_2_assets[('total',0)](pe))] done.append('epd') # print(done) # covariance = percent of variance allocation # qi = Li + Mi + Qi = Li + vi M + vi M / ROE = li + viM(1+1/ROE) = Li + vi M/d, d=ROE/(1+ROE) d = ROE / (1 + ROE) vars_ = self.audit_df['EmpEX2'] - self.audit_df['EmpEX1']**2 vars_ = vars_ / vars_.iloc[-1] exhibit.loc[('covar', 'Q'), :] = exhibit.loc[('T', 'L'), :] + vars_ * total_margin / d done.append('covar') except Exception as e: logger.warning('Some general out of bounds error on VaRs and TVaRs, setting all equal to zero. ' f'Last completed steps {done[-1] if len(done) else "none"}, ' 'out of var, tvar, eqvar, eqtvar merper. ') logger.warning(f'The built in warning is type {type(e)} saying {e}') for c in ['VaR', 'TVaR', 'ScaledVaR', 'ScaledTVaR', 'EqRiskVaR', 'EqRiskTVaR', 'MerPer', 'coTVaR']: if c.lower() in done: # these numbers have been computed pass else: # put in placeholder exhibit.loc[(c, 'Q'), :] = np.nan exhibit = exhibit.sort_index() # subtract the premium to get the actual capital try: for m in ['EL', 'VaR', 'TVaR', 'ScaledVaR', 'ScaledTVaR', 'EqRiskVaR', 'EqRiskTVaR', 'MerPer', 'coTVaR', 'EPD', 'ScaledEPD', 'EqRiskEPD', 'covar']: # Q as computed above gives assets not equity...so adjust # usual calculus: P = (L + ra)/(1+r); Q = a-P, remember Q above is a (sorry) exhibit.loc[(m, 'L'), :] = exhibit.loc[('T', 'L'), :] exhibit.loc[(m, 'P'), :] = (exhibit.loc[('T', 'L'), :] + ROE * exhibit.loc[(m, 'Q'), :]) / (1 + ROE) exhibit.loc[(m, 'Q'), :] -= exhibit.loc[(m, 'P'), :].values exhibit.loc[(m, 'M'), :] = exhibit.loc[(m, 'P'), :] - exhibit.loc[('T', 'L'), :].values exhibit.loc[(m, 'LR'), :] = exhibit.loc[('T', 'L'), :] / exhibit.loc[(m, 'P'), :] exhibit.loc[(m, 'ROE'), :] = exhibit.loc[(m, 'M'), :] / exhibit.loc[(m, 'Q'), :] exhibit.loc[(m, 'PQ'), :] = exhibit.loc[(m, 'P'), :] / exhibit.loc[(m, 'Q'), :] exhibit.loc[(m, 'a'), :] = exhibit.loc[(m, 'P'), :] + exhibit.loc[(m, 'Q'), :] except Exception as e: logger.error(f'Exception {e} creating LR, P, Q, ROE, or PQ') # print(ans.distortion.name) # display(exhibit) ans.audit_df.loc['TVaR@'] = p_t ans.audit_df.loc['erVaR'] = pv ans.audit_df.loc['erTVaR'] = pt ans.audit_df.loc['EPD@'] = pe ans.audit_df.loc['erEPD'] = p_e ans.update(exhibit=exhibit) return ans
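    # Numeric check of the covariance method described above (editor's illustration with
    # made-up numbers): allocated assets are a_i = L_i + v_i * M / d with d = ROE / (1 + ROE),
    # and they add up to total assets L + M + Q because Q = M / ROE.
    #
    #     ROE = 0.10; d = ROE / (1 + ROE)
    #     L = {'A': 60.0, 'B': 40.0}; M = 12.0
    #     v = {'A': 0.7, 'B': 0.3}                       # Cov(Xi, X) / Var(X), sums to 1
    #     a_i = {k: L[k] + v[k] * M / d for k in L}
    #     sum(a_i.values())                              # 232.0 = 100 + 12 + 120 = L + M + Q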
[docs] def analyze_distortion_plots(self, ans, dist, a_cal, p, a_max, ROE, LR): """ Create plots from an analyze_distortion ans class note: this only looks at distortion related items...it doesn't use anything from the comps :param ans: :param dist: :param a_cal: :param p: :param a_max: :param ROE: :param LR: :return: """ augmented_df = ans.augmented_df # top down stats: e.g. T.P[a_cal] - T.P and zero above a_cal # these are only going to apply to total...will not bother with by line # call this series V.P with v indicating a down arrow and a letter after and close to T # hack out space for nc in ['L', 'P', 'M', 'Q', 'LR', 'ROE', 'PQ']: augmented_df[f'V.{nc}_total'] = 0 augmented_df.loc[0:a_cal, 'V.L_total'] = \ augmented_df.at[a_cal, 'T.L_total'] - augmented_df.loc[0:a_cal, 'T.L_total'] augmented_df.loc[0:a_cal, 'V.P_total'] = \ augmented_df.at[a_cal, 'T.P_total'] - augmented_df.loc[0:a_cal, 'T.P_total'] augmented_df.loc[0:a_cal, 'V.M_total'] = \ augmented_df.at[a_cal, 'T.M_total'] - augmented_df.loc[0:a_cal, 'T.M_total'] augmented_df.loc[0:a_cal, 'V.Q_total'] = \ augmented_df.at[a_cal, 'T.Q_total'] - augmented_df.loc[0:a_cal, 'T.Q_total'] augmented_df.loc[0:a_cal, 'V.LR_total'] = \ augmented_df.loc[0:a_cal, 'V.L_total'] / augmented_df.loc[0:a_cal, 'V.P_total'] augmented_df.loc[0:a_cal, 'V.PQ_total'] = \ augmented_df.loc[0:a_cal, 'V.P_total'] / augmented_df.loc[0:a_cal, 'V.Q_total'] augmented_df.loc[0:a_cal, 'V.ROE_total'] = \ augmented_df.loc[0:a_cal, 'V.M_total'] / augmented_df.loc[0:a_cal, 'V.Q_total'] # bottom up calc is already done: it is the T. series in augmented_df # marginal calc also done: M. series # f_6_part = f_trinity = f_8_part = f_distortion = f_close = None # plot the distortion f_distortion, ax = plt.subplots(1, 1) dist.plot(ax=ax) # six part up and down plot def tidy(a, y=True): """ function to tidy up the graphics """ n = 6 a.set(xlabel='Assets') a.xaxis.set_major_locator(FixedLocator([a_cal])) ff = f'A={a_cal:,.0f}' a.xaxis.set_major_formatter(FixedFormatter([ff])) a.xaxis.set_minor_locator(MaxNLocator(n)) a.xaxis.set_minor_formatter(StrMethodFormatter('{x:,.0f}')) if y: a.yaxis.set_major_locator(MaxNLocator(n)) a.yaxis.set_minor_locator(AutoMinorLocator(4)) # gridlines with various options # https://matplotlib.org/3.1.0/gallery/color/named_colors.html a.grid(which='major', axis='x', c='cornflowerblue', alpha=1, linewidth=1) a.grid(which='major', axis='y', c='lightgrey', alpha=0.5, linewidth=1) a.grid(which='minor', axis='x', c='lightgrey', alpha=0.5, linewidth=1) a.grid(which='minor', axis='y', c='gainsboro', alpha=0.25, linewidth=0.5) # tick marks a.tick_params('x', which='major', labelsize=7, length=10, width=0.75, color='cornflowerblue', direction='out') a.tick_params('y', which='major', labelsize=7, length=5, width=0.75, color='black', direction='out') a.tick_params('both', which='minor', labelsize=7, length=2, width=0.5, color='black', direction='out') # plots f_6_part, axs = plt.subplots(3, 2, figsize=(8, 10), sharex=True, constrained_layout=True) axi = iter(axs.flatten()) # ONE ax = next(axi) # df[['Layer Loss', 'Layer Prem', 'Layer Capital']] augmented_df.filter(regex='^F|^gS|^S').rename(columns=self.renamer). \ plot(xlim=[0, a_max], ylim=[-0.025, 1.025], logy=False, title='F, S, gS: marginal premium and loss', ax=ax) tidy(ax) ax.legend(frameon=True, loc='center right') # TWO ax = next(axi) # df[['Layer Capital', 'Layer Margin']].plot(xlim=xlim, ax=ax) augmented_df.filter(regex='^M\.[QM]_total').rename(columns=self.renamer). 
\ plot(xlim=[0, a_max], ylim=[-0.05, 1.05], logy=False, title='Marginal Capital and Margin', ax=ax) tidy(ax) ax.legend(frameon=True, loc='center right') # THREE ax = next(axi) # df[['Premium↓', 'Loss↓', 'Capital↓', 'Assets↓', 'Risk Margin↓']].plot(xlim=xlim, ax=ax) augmented_df.filter(regex='^V\.(L|P|Q|M)_total').rename(columns=self.renamer). \ plot(xlim=[0, a_max], ylim=[0, a_max], logy=False, title=f'Decreasing LPMQ from {a_cal:.0f}', ax=ax) (a_cal - augmented_df.loc[:a_cal, 'loss']).plot(ax=ax, label='Assets') tidy(ax) ax.legend(frameon=True, loc='upper right') # FOUR ax = next(axi) augmented_df.filter(regex='^(T|V|M)\.LR_total').rename(columns=self.renamer). \ plot(xlim=[0, a_cal * 1.1], ylim=[-0.05, 1.05], ax=ax, title='Increasing, Decreasing and Marginal LRs') tidy(ax) ax.legend(frameon=True, loc='lower left') # FIVE ax = next(axi) augmented_df.filter(regex='^T\.(L|P|Q|M)_total|loss').rename(columns=self.renamer). \ plot(xlim=[0, a_max], ylim=[0, a_max], logy=False, title=f'Increasing LPMQ to {a_cal:.0f}', ax=ax) tidy(ax) ax.legend(frameon=True, loc='upper left') # SIX # could include leverage? ax = next(axi) augmented_df.filter(regex='^(M|T|V)\.ROE_(total)?$').rename(columns=self.renamer). \ plot(xlim=[0, a_max], # ylim=[-0.05, 1.05], logy=False, title=f'Increasing, Decreasing and Marginal ROE to {a_cal:.0f}', ax=ax) # df[['ROE↓', '*ROE↓', 'ROE↑', 'Marginal ROE', ]].plot(xlim=xlim, logy=False, ax=ax, ylim=ylim) # df[['ROE↓', 'ROE↑', 'Marginal ROE', 'P:S↓', 'P:S↑']].plot(xlim=xlim, logy=False, ax=a, ylim=[0,_]) ax.plot([0, a_max], [ROE, ROE], ":", linewidth=2, alpha=0.75, label='Avg ROE') # print('plot 6 completed\n' * 6) try: tidy(ax) ax.legend(loc='upper right') title = f'{self.name} with {str(dist)} Distortion\nCalibrated to LR={LR:.3f} and p={p:.3f}, ' \ f'Assets={a_cal:,.1f}, ROE={ROE:.3f}' f_6_part.suptitle(title, fontsize='x-large') except Exception as e: logger.error(f'Formatting error in last plot...\n{e}\n...continuing') # trinity plots def tidy2(a, k, xloc=0.25): n = 4 a.xaxis.set_major_locator(MultipleLocator(xloc)) a.xaxis.set_minor_locator(AutoMinorLocator(4)) a.xaxis.set_major_formatter(StrMethodFormatter('{x:.2f}')) a.yaxis.set_major_locator(MaxNLocator(2 * n)) a.yaxis.set_minor_locator(AutoMinorLocator(4)) a.grid(which='major', axis='x', c='lightgrey', alpha=0.5, linewidth=1) a.grid(which='major', axis='y', c='lightgrey', alpha=0.5, linewidth=1) a.grid(which='minor', axis='x', c='gainsboro', alpha=0.25, linewidth=0.5) a.grid(which='minor', axis='y', c='gainsboro', alpha=0.25, linewidth=0.5) # tick marks a.tick_params('both', which='major', labelsize=7, length=4, width=0.75, color='black', direction='out') a.tick_params('both', which='minor', labelsize=7, length=2, width=0.5, color='black', direction='out') # line to show where capital lies a.plot([0, 1], [k, k], linewidth=1, c='black', label='Assets') plots_done = [] try: f_trinity, axs = plt.subplots(1, 5, figsize=(8, 3), constrained_layout=True, sharey=True) axi = iter(axs.flatten()) xr = [-0.05, 1.05] audit = augmented_df.loc[:a_max, :] # ONE ax = next(axi) ax.plot(audit.gS, audit.loss, label='M.P_total') ax.plot(audit.S, audit.loss, label='M.L_total') ax.set(xlim=xr, title='Marginal Prem & Loss') ax.set(xlabel='Loss = S = Pr(X>a)\nPrem = g(S)', ylabel="Assets, a") tidy2(ax, a_cal) ax.legend(loc="upper right", frameon=True, edgecolor=None) plots_done.append(1) # TWO ax = next(axi) m = audit.F - audit.gF ax.plot(m, audit.loss, linewidth=2, label='M') ax.set(xlim=-0.01, title='Marginal Margin', xlabel='M = g(S) - 
S') tidy2(ax, a_cal, m.max() * 1.05 / 4) plots_done.append(2) # THREE ax = next(axi) ax.plot(1 - audit.gS, audit.loss, label='Q') ax.set(xlim=xr, title='Marginal Equity') ax.set(xlabel='Q = 1 - g(S)') tidy2(ax, a_cal) plots_done.append(3) # FOUR ax = next(axi) temp = audit.loc[self.q(1e-5):, :] r = (temp.gS - temp.S) / (1 - temp.gS) ax.plot(r, temp.loss, linewidth=2, label='ROE') ax.set(xlim=-0.05, title='Layer ROE') ax.set(xlabel='ROE = M / Q') tidy2(ax, a_cal, r.max() * 1.05 / 4) plots_done.append(4) # FIVE ax = next(axi) ax.plot(audit.S / audit.gS, audit.loss) ax.set(xlim=xr, title='Layer LR') ax.set(xlabel='LR = S / g(S)') tidy2(ax, a_cal) plots_done.append(5) except Exception as e: logger.error(f'Plotting error in trinity plots\n{e}\nPlots done {plots_done}\n...continuing') # # # # from original example_factory_sublines try: temp = augmented_df.filter(regex='exi_xgtag?_(?!sum)|^S|^gS|^(M|T)\.').copy() renamer = self.renamer augmented_df.index.name = 'Assets a' temp.index.name = 'Assets a' f_8_part, axs = plt.subplots(4, 2, figsize=(8, 10), constrained_layout=True, squeeze=False) ax = iter(axs.flatten()) # ONE a = (1 - augmented_df.filter(regex='p_').cumsum()).rename(columns=renamer).sort_index(1). \ plot(ylim=[0, 1], xlim=[0, a_max], title='Survival functions', ax=next(ax)) a.grid('b') # TWO a = augmented_df.filter(regex='exi_xgtag?').rename(columns=renamer).sort_index(1). \ plot(ylim=[0, 1], xlim=[0, a_max], title=r'$\alpha=E[X_i/X | X>a],\beta=E_Q$ by Line', ax=next(ax)) a.grid('b') # THREE total margins a = augmented_df.filter(regex=r'^T\.M').rename(columns=renamer).sort_index(1). \ plot(xlim=[0, a_max], title='Total Margins by Line', ax=next(ax)) a.grid('b') # FOUR marginal margins was dividing by bs end of first line # for some reason the last entry in M.M_total can be problematic. a = (augmented_df.filter(regex=r'^M\.M').rename(columns=renamer).sort_index(1).iloc[:-1, :]. plot(xlim=[0, a_max], title='Marginal Margins by Line', ax=next(ax))) a.grid('b') # FIVE a = augmented_df.filter(regex=r'^M\.Q|gF').rename(columns=renamer).sort_index(1). \ plot(xlim=[0, a_max], title='Capital = 1-gS = gF', ax=next(ax)) a.grid('b') for _ in a.lines: if _.get_label() == 'gF': _.set(linewidth=5, alpha=0.3) # recreate legend because changed lines a.legend() # SIX see apply distortion, line 1890 ROE is in augmented_df a = augmented_df.filter(regex='^ROE$|exi_xeqa').rename(columns=renamer).sort_index(1). \ plot(xlim=[0, a_max], title='M.ROE Total and $E[X_i/X | X=a]$ by line', ax=next(ax)) a.grid('b') # SEVEN improve scale selection a = augmented_df.filter(regex='M\.LR').rename(columns=renamer).sort_index(1). \ plot(ylim=[-.05, 1.5], xlim=[0, a_max], title='Marginal LR', ax=next(ax)) a.grid('b') # EIGHT a = augmented_df.filter(regex='T.LR_').rename(columns=renamer).sort_index(1). 
\ plot(ylim=[-.05, 1.25], xlim=[0, a_max], title='Increasing Total LR by Line', ax=next(ax)) a.grid('b') a.legend(loc='center right') except Exception as e: logger.error('Error', e) # # close up of plot 2 # bit = augmented_df.query(f'loss < {a_max}').filter(regex='exi_xgtag?_.*(?<!sum)$') f_close, ax = plt.subplots(1, 1, figsize=(8, 5)) ax = bit.rename(columns=renamer).plot(ylim=[-0.025, 1.025], ax=ax) ax.grid() nl = len(self.line_names) for i, l in enumerate(ax.lines[nl:]): ax.lines[i].set(linewidth=1, linestyle='--') l.set(color=ax.lines[i].get_color(), linewidth=2) ax.legend(loc='upper left') # slightly evil ans.update(fig_distortion=f_distortion, fig_six_up_down=f_6_part, fig_trinity=f_trinity, fig_eight=f_8_part, fig_close=f_close) return ans
[docs] def analysis_priority(self, asset_spec, output='df'): """ Create priority analysis report_ser. Can be called multiple times with different ``asset_specs`` asset_spec either a float used as an epd percentage or a dictionary. Entering an epd percentage generates the dictionary base = {i: self.epd_2_assets[('not ' + i, 0)](asset_spec) for i in self.line_names} :param asset_spec: epd :param output: df = pandas data frame; html = nice report, markdown = raw markdown text :return: """ ea = self.epd_2_assets ae = self.assets_2_epd if isinstance(asset_spec, dict): base = asset_spec else: if type(asset_spec) != float: raise ValueError("Input dictionary or float = epd target") base = {i: ea[('not ' + i, 0)](asset_spec) for i in self.line_names} if output == 'df': priority_analysis_df = pd.DataFrame( columns=['a', 'chg a', 'not_line epd sa @a', 'line epd @a 2pri', 'not_line epd eq pri', 'line epd eq pri', 'total epd'], index=pd.MultiIndex.from_arrays([[], []], names=['Line', 'Scenario'])) for col in set(self.line_names).intersection(set(base.keys())): notcol = 'not ' + col a_base = base[col] a = a_base e0 = ae[(notcol, 0)](a_base) e = e0 priority_analysis_df.loc[(col, 'base'), :] = ( a, a - a_base, e, ae[(col, 2)](a), ae[(notcol, 1)](a), ae[(col, 1)](a), ae[('total', 0)](a)) a = ea[(col, 2)](e0) priority_analysis_df.loc[(col, '2pri line epd = not line sa'), :] = ( a, a - a_base, ae[(notcol, 0)](a), ae[(col, 2)](a), ae[(notcol, 1)](a), ae[(col, 1)](a), ae[('total', 0)](a)) a = ea[(col, 2)](priority_analysis_df.ix[(col, 'base'), 'line epd eq pri']) priority_analysis_df.loc[(col, 'thought buying (line 2pri epd = base not line eq pri epd'), :] = ( a, a - a_base, ae[(notcol, 0)](a), ae[(col, 2)](a), ae[(notcol, 1)](a), ae[(col, 1)](a), ae[('total', 0)](a)) a = ea[(notcol, 1)](e0) priority_analysis_df.loc[(col, 'fair to not line, not line eq pri epd = base sa epd'), :] = ( a, a - a_base, ae[(notcol, 0)](a), ae[(col, 2)](a), ae[(notcol, 1)](a), ae[(col, 1)](a), ae[('total', 0)](a)) a = ea[(col, 1)](e0) priority_analysis_df.loc[(col, 'line eq pri epd = base not line sa'), :] = ( a, a - a_base, ae[(notcol, 0)](a), ae[(col, 2)](a), ae[(notcol, 1)](a), ae[(col, 1)](a), ae[('total', 0)](a)) a = ea[('total', 0)](e0) priority_analysis_df.loc[(col, 'total epd = base sa not line epd'), :] = ( a, a - a_base, ae[(notcol, 0)](a), ae[(col, 2)](a), ae[(notcol, 1)](a), ae[(col, 1)](a), ae[('total', 0)](a)) priority_analysis_df['pct chg'] = priority_analysis_df['chg a'] / priority_analysis_df.a return priority_analysis_df # else HTML or markdown output ans = [] for line in set(self.line_names).intersection(set(base.keys())): a = base[line] e = ae[(f'not {line}', 0)](a) a0 = float(ea[('total', 0)](e)) eb0a0 = ae[(f'not {line}', 0)](a0) eba0 = ae[(f'not {line}', 1)](a0) e2a0 = ae[(line, 2)](a0) e1a0 = ae[(line, 1)](a0) e2 = ae[(line, 2)](a) e1 = float(ae[(line, 1)](a)) a2 = float(ea[(line, 2)](e1)) af = float(ea[(f'not {line}', 1)](e)) af2 = float(ea[(line, 1)](e)) a3 = float(ea[(line, 2)](e)) a4 = float(ea[(f'not {line}', 1)](e)) story = f""" Consider adding **{line}** to the existing portfolio. The existing portfolio has capital {a:,.1f} and and epd of {e:.4g}. <ul> <li> If {line} is added as second priority to the existing lines with no increase in capital it has an epd of {e2:.4g}. <li> If the regulator requires the overall epd be a constant then the firm must increase capital to {a0:,.1f} or by {(a0 / a - 1) * 100:.2f} percent. 
- At the higher capital {line} has an epd of {e2a0:.4g} as second priority and the existing
  lines have an epd of {eb0a0:.4g} as first priority.
- The existing and {line} epds under equal priority are {eba0:.4g} and {e1a0:.4g}.
<li> If {line} *thought* it was added at equal priority it would have expected an epd of {e1:.4g}.
In order to achieve this epd as second priority would require capital of {a2:,.1f}, an increase of
{(a2 / a - 1) * 100:.2f} percent.
<li> In order for {line} to have an epd equal to the existing lines as second priority would require
capital of {a3:,.1f}, an increase of {(a3 / a - 1) * 100:.2f} percent.
<li> In order for {line} to be added at equal priority and for the existing lines to have an unchanged
epd requires capital of {af:,.1f}, an increase of {(af / a - 1) * 100:.2f} percent.
<li> In order for {line} to be added at equal priority and to have an epd equal to the existing line epd
requires capital of {af2:,.1f}, an increase of {(af2 / a - 1) * 100:.2f} percent.
<li> In order for the existing lines to have an unchanged epd at equal priority requires capital of
{a4:,.1f}, an increase of {(a4 / a - 1) * 100:.2f} percent.
</ul>
"""
            ans.append(story)
        ans = '\n'.join(ans)
        if output == 'html':
            display(HTML(ans))
        else:
            return ans
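    # Usage sketch (editor's illustration, not library code); 1e-4 is an illustrative
    # epd target applied to each not-line:
    #
    #     pri = port.analysis_priority(1e-4)             # DataFrame indexed by (Line, Scenario)
    #     port.analysis_priority(1e-4, output='html')    # narrative version of the same analysis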
[docs] def analysis_collateral(self, line, c, a, debug=False): """ E(C(a,c)) expected value of line against not line with collateral c and assets a, c <= a :param line: line of business with collateral, analyzed against not line :param c: collateral, c <= a required; c=0 reproduces exa, c=a reproduces lev :param a: assets, assumed less than the max loss (i.e. within the square) :param debug: :return: """ assert (c <= a) xs = self.density_df['loss'].values pline = self.density_df['p_' + line].values notline = self.density_df['ημ_' + line].values ans = [] gt, incr, int1, int2, int3 = 0, 0, 0, 0, 0 c1, c2, c3 = 0, 0, 0 n_c = int(c / self.bs) n_max = len(xs) # this works as a rhs array[0:n_max] is the whole array, array[n_max] is an error err_count = 0 for loss in np.arange(a + self.bs, 2 * xs.max(), self.bs): n_loss = int(loss / self.bs) # 0...loss INCLUSIVE c1 = c / a * loss n_c1 = min(n_max, int(c1 / self.bs)) # need to adjust for trimming when loss > max loss # loss indexes...see notes in blue book la = max(0, n_loss - (n_max - 1)) lc = min(n_loss, n_max - 1) lb = lc + 1 if la == 0: ld = None else: ld = la - 1 try: s1 = slice(la, max(la, min(lb, n_c))) s2 = slice(max(la, min(lb, n_c)), max(la, min(lb, n_c1))) s3 = slice(max(la, min(lb, n_c1)), lb) if ld is None: # means go all the way to zero, do not have to worry about n_loss - n_c > 0 being smaller s1r = slice(lc, min(lc, n_loss - n_c), -1) s2r = slice(min(lc, n_loss - n_c), min(lc, n_loss - n_c1), -1) s3r = slice(min(lc, n_loss - n_c1), ld, -1) else: s1r = slice(lc, max(ld, min(lc, n_loss - n_c)), -1) s2r = slice(max(ld, min(lc, n_loss - n_c)), max(ld, min(lc, n_loss - n_c1)), -1) s3r = slice(max(ld, min(lc, n_loss - n_c1)), ld, -1) int1 = np.sum(xs[s1] * pline[s1] * notline[s1r]) int2 = c * np.sum(pline[s2] * notline[s2r]) int3 = a / loss * np.sum(xs[s3] * pline[s3] * notline[s3r]) ptot = np.sum(pline[s3] * notline[s3r]) except ValueError as e: logger.error(e) logger.error(f"Value error: loss={loss}, loss/b={loss / self.bs}, c1={c1}, c1/b={c1 / self.bs}") logger.error(f"n_loss {n_loss}, n_c {n_c}, n_c1 {n_c1}") logger.error(f'la={la}, lb={lb}, lc={lc}, ld={ld}') logger.error('ONE:', *map(len, [xs[s1], pline[s1], notline[s1r]])) logger.error('TWO:', *map(len, [pline[s2], notline[s2r]])) logger.error('THR:', *map(len, [xs[s3], pline[s3], notline[s3r]])) err_count += 1 if err_count > 3: break if n_loss < n_max: p = self.density_df.loc[loss, 'p_total'] else: p = np.nan incr = (int1 + int2 + int3) gt += incr c1 += int1 c2 += int2 c3 += int3 if debug: ans.append([loss, int1, int2, int3, int3 * loss / a / ptot, ptot, incr, c1, c2, c3, gt, p]) if incr / gt < 1e-12: if debug: logger.info(f'incremental change {incr / gt:12.6f}, breaking') break exlea = self.density_df.loc[a, 'exlea_' + line] exgta = self.density_df.loc[a, 'exgta_' + line] exix = self.density_df.loc[a, 'exi_xgta_' + line] exeqa = self.density_df.loc[a, 'exeqa_' + line] p_total = self.density_df.loc[a, 'p_total'] F = self.density_df.loc[a, 'F'] exa = self.density_df.loc[a, 'exa_' + line] lev = self.density_df.loc[a, 'lev_' + line] df = pd.DataFrame( [(line, a, c, p_total, F, gt, a * exix * (1 - F), exeqa, exlea, exgta, exix, exa, gt + exlea * F, lev)], columns=['line', 'a', 'c', 'p_total', 'F', 'gt', 'exa_delta', 'exeqa', 'exlea', 'exgta', 'exix', 'exa', 'ecac', 'lev'], ) if debug: ans = pd.DataFrame(ans, columns=['loss', 'int1', 'int2', 'int3', 'exeqa', 'ptot', 'incr', 'c1', 'c2', 'c3', 'gt', 'log']) ans = ans.set_index('loss', drop=True) ans.index.name = 'loss' else: ans = None 
return df, ans
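    # Usage sketch (editor's illustration, not library code); 'A' is a hypothetical line
    # name and collateral is set to half the asset level:
    #
    #     a = port.q(0.995)
    #     summary, detail = port.analysis_collateral('A', c=a / 2, a=a, debug=True)
    #     summary                                        # one-row DataFrame: gt vs exa, lev etc.
    #     detail                                         # loss-by-loss increments (debug=True only)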
[docs] def uat_differential(self, line): """ Check the numerical and theoretical derivatives of exa agree for given line :param line: :return: """ test = self.density_df[f'exa_{line}'] dtest = np.gradient(test, test.index) dtest2 = self.density_df[f'exi_xgta_{line}'] * self.density_df.S ddtest = np.gradient(dtest) ddtest2 = -self.density_df[f'exeqa_{line}'] / self.density_df.loss * self.density_df.p_total f, axs = plt.subplots(1, 3, figsize=(12, 4)) axs[0].plot(test.index, test, label=f'exa_{line}') axs[1].plot(test.index, dtest, label='numdiff') axs[1].plot(test.index, dtest2, label='xi_xgta S(x)') axs[1].legend() axs[2].plot(test.index, ddtest, label='numdiff') axs[2].plot(test.index, ddtest2, label='-EXi(a)/a') axs[2].legend()
[docs] def uat(self, As=None, Ps=[0.98], LRs=[0.965], r0=0.03, num_plots=1, verbose=False): """ Reconcile apply_distortion(s) with price and calibrate :type Ps: object :param As: Asset levels :param Ps: probability levels used to determine asset levels using quantile function :param LRs: loss ratios used to determine profitability :param r0: r0 level for distortions :param verbose: controls level of output :return: """ # figure As if As is None: As = [] for p in Ps: As.append(self.q(p)) # 0. Calibrate params = self.calibrate_distortions(LRs=LRs, As=As, r0=r0) # 1. Apply and compare to calibration K = As[0] LR = LRs[0] idx = (K, LR) dd = Distortion.distortions_from_params(params, index=idx, r0=r0) if num_plots == 2: axiter = axiter_factory(None, len(dd)) elif num_plots == 3: axiter = axiter_factory(None, 30) else: axiter = None table, stacked = self.apply_distortions(dd, As, axiter, num_plots) table['lr err'] = table['lr_total'] - LR # 2. Price and compare to calibration pdfs = [] # pricing data frmes for name in Distortion.available_distortions(): pdf, _ = self.price(reg_g=K, pricing_g=dd[name]) pdf['dist'] = name pdfs.append(pdf) p = pd.concat(pdfs) p['lr err'] = p['lr'] - LR # a from apply, p from price a = table.query(f' loss=={K} ') # easier tests # sum of parts = total logger.info( f'Portfolio.uat | {self.name} Sum of parts all close to total: ' f'{np.allclose(a.exag_total, a.exag_sumparts)}') logger.info( f'Portfolio.uat | {self.name} Sum of parts vs total: ' f'{np.sum(np.abs(a.exag_total - a.exag_sumparts)):15,.1f}') pp = p[['dist', 'exag']] pp = pp.pivot(columns='dist').T.loc['exag'] aa = a.filter(regex='exa|method').set_index('method') test = pd.concat((aa, pp), axis=1, sort=True) for c in self.line_names_ex: test[f'err_{c}'] = test[c] / test[f'exag_{c}'] - 1 test['err sum/total'] = test['exag_sumparts'] / test['exag_total'] - 1 test = test[ [f'{i}{j}' for j in self.line_names_ex for i in ['exag_', '', 'err_']] + ['exag_sumparts', 'err sum/total']] lr_err = pd.DataFrame({'applyLR': a.lr_total, 'method': a.method, 'target': LR, 'errs': a.lr_total - LR}) lr_err = lr_err.reset_index(drop=False).set_index('method') lr_err = lr_err.rename(columns={'index': 'a'}) test = pd.concat((test, lr_err), axis=1, sort=True) overall_test = (test.filter(regex='err').abs()).sum().sum() if verbose: html_title(f'Combined, overall error {overall_test:.3e}') # (exag=apply)') display(test) if lr_err.errs.abs().max() > 1e-4: logger.error('Portfolio.uat | {self.name} UAT Loss Ratio Error {lr_err.errs.abs().max()}') if overall_test < 1e-7: logger.info(f'Portfolio.uat | {self.name} UAT All good, total error {overall_test:6.4e}') else: s = f'{self.name} UAT total error {overall_test:6.4e}' logger.error(f'Portfolio.uat | {s}') logger.error(f'Portfolio.uat | {s}') logger.error(f'Portfolio.uat | {s}') return a, p, test, params, dd, table, stacked
@property
    def renamer(self):
        """
        write a sensible renamer for the columns to use thusly

        self.density_df.rename(columns=renamer)

        write a tex version separately

        Create once per item...assume lines etc. never change

        :return: dictionary that can be used to rename columns
        """
        if self._renamer is None:
            # make it
            # key : (before, after)
            # ημ needs to come last because of epd_nu etc.
            # keep lEV and EX(a) separate because otherwise index has non-unique values
            self._renamer = {}
            meta_namer = dict(p_=('', ' density'),
                              lev_=('LEV[', 'a]'),
                              exag_=('EQ[', '(a)]'),
                              exa_=('E[', '(a)]'),
                              exlea_=('E[', ' | X<=a]'),
                              exgta_=('E[', ' | X>a]'),
                              exeqa_=('E[', ' | X=a]'),
                              e1xi_1gta_=('E[1/', ' 1(X>a)]'),
                              exi_x_=('E[', '/X]'),
                              exi_xgta_sum=('Sum Xi/X gt', ''),
                              # exi_xgta_sum=('Sum of E[Xi/X|X>a]', ''),
                              exi_xeqa_sum=("Sum Xi/X eq", ''),
                              # exi_xeqa_sum=("Sum of E[Xi/X|X=a]", ''),
                              exi_xgta_=('α=E[', '/X | X>a]'),
                              exi_xeqa_=('E[', '/X | X=a]'),
                              exi_xlea_=('E[', '/X | X<=a]'),
                              epd_0_=('EPD(', ') stand alone'),
                              epd_1_=('EPD(', ') within X'),
                              epd_2_=('EPD(', ') second pri'),
                              e2pri_=('E[X', '(a) second pri]'),
                              ημ_=('All but ', ' density')
                              )
            for l in self.density_df.columns:
                if re.search('^ημ_', l):
                    # nu_line -> not line density
                    self._renamer[l] = re.sub('^ημ_([0-9A-Za-z\-_.,]+)', r'not \1 density', l)
                else:
                    l0 = l.replace('ημ_', 'not ')
                    for k, v in meta_namer.items():
                        d1 = l0.find(k)
                        if d1 >= 0:
                            d1 += len(k)
                            b, a = v
                            self._renamer[l] = f'{b}{l0[d1:]}{a}'.replace('total', 'X')
                            break
            # augmented df items
            for l in self.line_names_ex:
                self._renamer[f'exag_{l}'] = f'EQ[{l}(a)]'
                self._renamer[f'exi_xgtag_{l}'] = f'β=EQ[{l}/X | X>a]'
                self._renamer[f'exi_xleag_{l}'] = f'EQ[{l}/X | X<=a]'
                self._renamer[f'e1xi_1gta_{l}'] = f'E[{l}/X 1(X >a)]'
            self._renamer['exag_sumparts'] = 'Sum of EQ[Xi(a)]'
            # more post-processing items
            for pre, m1 in zip(['M', 'T'], ['Marginal', 'Total']):
                for post, m2 in zip(['L', 'P', 'LR', 'Q', 'ROE', "PQ", "M"],
                                    ['Loss', 'Premium', 'Loss Ratio', 'Equity', 'ROE', 'Leverage (P:S)', "Margin"]):
                    self._renamer[f'{pre}.{post}'] = f'{m1} {m2}'
            for line in self.line_names_ex:
                for pre, m1 in zip(['M', 'T'], ['Marginal', 'Total']):
                    for post, m2 in zip(['L', 'P', 'LR', 'Q', 'ROE', "PQ", "M"],
                                        ['Loss', 'Premium', 'Loss Ratio', 'Equity', 'ROE', 'Leverage (P:S)', "Margin"]):
                        self._renamer[f'{pre}.{post}_{line}'] = f'{m1} {m2} {line}'
            self._renamer['A'] = 'Assets'
            # self._renamer['exi/xgta'] = 'α=E[Xi/X | X > a]'
            # self._renamer['exi/xgtag'] = 'β=E_Q[Xi/X | X > a]'
            # gamma files
            for l in self.line_names:
                self._renamer[f'gamma_{l}_sa'] = f"{l} stand-alone"
                self._renamer[f'gamma_{self.name}_{l}'] = f"{l} part of {self.name}"
                self._renamer[f'p_{l}'] = f'{l} stand-alone density'
                self._renamer[f'S_{l}'] = f'{l} stand-alone survival'
            self._renamer['p_total'] = f'{self.name} total density'
            self._renamer['S_total'] = f'{self.name} total survival'
            self._renamer[f'gamma_{self.name}_total'] = f"{self.name} total"
            # for enhanced exhibits --- these are a bit specialized!
            self._renamer['mv'] = "$\\mathit{MVL}(a)$??"
for orig in self.line_names_ex: l = self.line_renamer.get(orig, orig).replace('$', '') if orig == 'total': self._renamer[f'S'] = f"$S_{{{l}}}(a)$" else: self._renamer[f'S_{orig}'] = f"$S_{{{l}}}(a)$" self._renamer[f'lev_{orig}'] = f"$E[{l}\\wedge a]$" self._renamer[f'levg_{orig}'] = f"$\\rho({l}\\wedge a)$" self._renamer[f'exa_{orig}'] = f"$E[{l}(a)]$" self._renamer[f'exag_{orig}'] = f"$\\rho({l}\\subseteq X^c\\wedge a)$" self._renamer[f'ro_da_{orig}'] = "$\\Delta a_{ro}$" self._renamer[f'ro_dmv_{orig}'] = "$\\Delta \\mathit{MVL}_{ro}(a)$" self._renamer[f'ro_eva_{orig}'] = "$\\mathit{EGL}_{ro}(a)$" self._renamer[f'gc_da_{orig}'] = "$\\Delta a_{gc}$" self._renamer[f'gc_dmv_{orig}'] = "$\\Delta \\mathit{MVL}_{gc}(a)$" self._renamer[f'gc_eva_{orig}'] = "$\\mathit{EGL}_{gc}(a)$" return self._renamer @property def line_renamer(self): """ plausible defaults for nicer looking names replaces . or : with space and capitalizes (generally don't use . because it messes with analyze distortion.... leaves : alone converts X1 to tex converts XM1 to tex with minus (for reserves) :return: """ def rename(ln): # guesser ... if ln == 'total': return 'Total' if ln.find('.') > 0: return ln.replace('.', ' ').title() if ln.find(':') > 0: return ln.replace(':', ' ').title() # numbered lines ln = re.sub('([A-Z])m([0-9]+)', r'$\1_{-\2}$', ln) ln = re.sub('([A-Z])([0-9]+)', r'$\1_{\2}$', ln) return ln if self._line_renamer is None: self._line_renamer = { ln: rename(ln) for ln in self.line_names_ex} return self._line_renamer @property def tm_renamer(self): """ rename exa -> TL, exag -> TP etc. :return: """ if self._tm_renamer is None: self._tm_renamer = { f'exa_{l}' : f'T.L_{l}' for l in self.line_names_ex} self._tm_renamer.update({ f'exag_{l}' : f'T.P_{l}' for l in self.line_names_ex}) return self._tm_renamer @property def stat_renamer(self): return dict('')
[docs]    def set_a_p(self, a, p):
        """
        Sort out arguments for assets and prob level and make them consistent.

        neither => set defaults
        a only  => set p
        p only  => set a
        both    => do nothing
        """
        if a == 0 and p == 0:
            p = 0.995
            a = self.q(p)  # click to exact
            p = self.cdf(a)
        elif a:
            p = self.cdf(a)  # snap to index
            a = self.q(p)
        elif p:
            a = self.q(p)  # click to exact
            p = self.cdf(a)
        return a, float(p)
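    # Examples of the resolution rules above (editor's illustration):
    #
    #     port.set_a_p(0, 0)        # defaults: p = 0.995, a = q(0.995)
    #     port.set_a_p(0, 0.999)    # p given: a = q(0.999), p snapped back to cdf(a)
    #     port.set_a_p(5000, 0)     # a given: p = cdf(5000), a snapped to q(p)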
[docs]    @staticmethod
    def from_DataFrame(name, df):
        """
        Create a Portfolio from a pandas DataFrame; uses columns with appropriate names.

        Can be fed the agg output of uw.write_test( agg_program )

        :param name:
        :param df:
        :return:
        """
        # ...and this is why we love pandas so much
        spec_list = [g.dropna(axis=1).to_dict(orient='records') for n, g in df.groupby('name')]
        spec_list = [i[0] for i in spec_list]
        return Portfolio(name, spec_list)
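    # Usage sketch (editor's illustration, not library code); 'specs.csv' is a hypothetical
    # file with one row per aggregate spec and a 'name' column identifying each unit:
    #
    #     df = pd.read_csv('specs.csv')
    #     port = Portfolio.from_DataFrame('MyBook', df)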
[docs]    @staticmethod
    def from_Excel(name, ffn, sheet_name, **kwargs):
        """
        Read in from Excel. Works via a pandas DataFrame; kwargs passed through to
        pd.read_excel. Drops all blank columns (mostly for auditing purposes) and
        delegates to from_DataFrame.

        :param name:
        :param ffn: full file name, including path
        :param sheet_name:
        :param kwargs:
        :return:
        """
        df = pd.read_excel(ffn, sheet_name=sheet_name, **kwargs)
        df = df.dropna(axis=1, how='all')
        return Portfolio.from_DataFrame(name, df)
[docs] @staticmethod def from_dict_of_aggs(prefix, agg_dict, sub_ports=None, uw=None, bs=0, log2=0, padding=2, **kwargs): """ Create a portfolio from any iterable with values aggregate code snippets e.g. agg_dict = {label: agg_snippet } will create all the portfolios specified in subsets, or all if subsets=='all' labels for subports are concat of keys in agg_dict, so recommend you use A:, B: etc. as the snippet names. Portfolio names are prefix_[concat element names] agg_snippet is line agg blah without the tab or newline :param prefix: :param agg_dict: :param sub_ports: :param bs, log2, padding, kwargs: passed through to update; update if bs * log2 > 0 :return: """ agg_names = list(agg_dict.keys()) # holder for created portfolios ports = Answer() if sub_ports == 'all': sub_ports = subsets(agg_names) for sub_port in sub_ports: name = ''.join(sub_port) if prefix != '': name = f'{prefix}_{name}' pgm = f'port {name}\n' for l in agg_names: if l in sub_port: pgm += f'\t{agg_dict[l]}\n' if uw: ports[name] = uw.write(pgm) else: ports[name] = pgm if uw and bs * log2 > 0: ports[name].update(log2=log2, bs=bs, padding=padding, **kwargs) return ports
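    # Usage sketch (editor's illustration, not library code). The DecL snippets and the
    # Underwriter instance ``uw`` are placeholders; see the library documentation for the
    # exact agg syntax:
    #
    #     agg_dict = {'A': 'agg A 10 claims sev lognorm 50 cv 0.8 poisson',
    #                 'B': 'agg B  5 claims sev gamma 100 cv 1.2 poisson'}
    #     ports = Portfolio.from_dict_of_aggs('Test', agg_dict, sub_ports='all',
    #                                         uw=uw, bs=1/64, log2=16)
    #     ports['Test_AB']                  # the portfolio containing both units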
# enhanced portfolio methods (see description in class doc string)
[docs] def premium_capital(self, a=0, p=0): """ at a if given else p level of capital pricing story allows two asset levels...handle that with a concat was premium_capital_detail """ a, p = self.set_a_p(a, p) # story == run off # pricing report from adf dm = self.augmented_df.filter(regex=f'T.[MPQLROE]+.({self.line_name_pipe})').loc[[a]].T dm.index = dm.index.str.split('_', expand=True) self._raw_premium_capital = dm.unstack(1) self._raw_premium_capital = self._raw_premium_capital.droplevel(axis=1, level=0) # .drop(index=['Assets']) self._raw_premium_capital.loc['T.A', :] = self._raw_premium_capital.loc['T.Q', :] + \ self._raw_premium_capital.loc['T.P', :] self._raw_premium_capital.index.name = 'Item' self.EX_premium_capital = self._raw_premium_capital. \ rename(index=self.premium_capital_renamer, columns=self.line_renamer).sort_index()
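    # Usage sketch (editor's illustration, not library code). Requires a distortion to
    # have been applied so that ``augmented_df`` exists:
    #
    #     port.premium_capital(p=0.995)
    #     port.EX_premium_capital           # premium vs capital exhibit, rows per premium_capital_renamer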
[docs] def multi_premium_capital(self, As, keys=None): """ concatenate multiple prem_capital exhibits """ if keys is None: keys = [f'a={i:.1f}' for i in As] ans = [] for a in As: self.premium_capital(a) ans.append(self.EX_premium_capital.copy()) self.EX_multi_premium_capital = pd.concat(ans, axis=1, keys=keys, names=['Assets', "Line"])
[docs] def accounting_economic_balance_sheet(self, a=0, p=0): """ story version assumes line 0 = reserves and 1 = prospective....other than that identical usual a and p rules """ # check for update self.premium_capital(a, p) aebs = pd.DataFrame(0.0, index=self.line_names_ex + ['Assets', 'Equity'], columns=['Statutory', 'Objective', 'Market', 'Difference']) slc = slice(0, len(self.line_names_ex)) aebs.iloc[slc, 0] = self.audit_df['Mean'].values aebs.iloc[slc, 1] = self._raw_premium_capital.loc['T.L'] aebs.iloc[slc, 2] = self._raw_premium_capital.loc['T.P'] aebs.loc['Assets', :] = self._raw_premium_capital.loc['T.A', 'total'] aebs.loc['Equity', :] = aebs.loc['Assets'] - \ aebs.loc['total'] aebs[ 'Difference'] = aebs.Market - aebs.Objective # put in accounting order aebs = aebs.iloc[ [-2] + list(range(len(self.line_names) + 1)) + [-1]] aebs.index.name = 'Item' self.EX_accounting_economic_balance_sheet = aebs.rename(index=self.line_renamer)
[docs] def make_all(self, p=0, a=0, As=None): """ make all exhibits with sensible defaults if not entered, paid line is selected as the LAST line """ a, p = self.set_a_p(a, p) # exhibits that require a distortion if self.distortion is not None: self.premium_capital(a=a, p=p) if As is not None: self.multi_premium_capital(As) self.accounting_economic_balance_sheet(a=a, p=p)
[docs] def show_enhanced_exhibits(self, fmt='{:.5g}'): """ show all the exhibits created by enhanced_portfolio methods """ display(HTML(f'<h2>Exhibits for {self.name.replace("_", " ").title()} Portfolio</h2>')) for x in dir(self): if x[0:3] == 'EX_': ob = getattr(self, x) if isinstance(ob, pd.DataFrame): # which they all will be... display(HTML(f'<h3>{x[3:].replace("_", " ").title()}</h3>')) display(ob.style.format(fmt, subset=ob.select_dtypes(np.number).columns)) display(HTML(f'<hr>'))
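    # Usage sketch (editor's illustration, not library code) tying the enhanced exhibits
    # together:
    #
    #     port.make_all(p=0.995)            # builds the EX_ DataFrames with default settings
    #     port.show_enhanced_exhibits()     # renders every EX_ attribute as an HTML table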
[docs] def profit_segment_plot(self, ax, p, line_names, dist_name, colors=None, translations=None): """ Lee diagram for each requested line on a stand-alone basis, loss and risk adj premium using the dist_name distortion. Optionally specify colors, using C{n}. Optionally specify translations applied to each line. Generally, this applies to shift the cat line up by E[non cat] losses to show it overlays the total. For a Portfolio with line names CAT and NC:: port.gross.profit_segment_plot(ax, 0.99999, ['total', 'CAT', 'NC'], 'wang', [2,0,1]) add translation to cat line:: port.gross.profit_segment_plot(ax, 0.99999, ['total', 'CAT', 'NC'], 'wang', [2,0,1], [0, E[NC], 0]) :param ax: axis on which to render :param p: probability level to set upper and lower y axis limits (p and 1-p quantiles) :param line_names: :param dist_name: :param colors: :param translations: :return: """ dist = self.dists[dist_name] if colors is None: colors = range(len(line_names)) if translations is None: translations = [0] * len(line_names) for line, cn, translation in zip(line_names, colors, translations): c = f'C{cn}' f1 = self.density_df[f'p_{line}'].cumsum() idx = (f1 < p) * (f1 > 1.0 - p) f1 = f1[idx] gf = 1 - dist.g(1 - f1) x = self.density_df.loss[idx] + translation ax.plot(gf, x, '-', c=c, label=f'Risk Adj {line}' if translation == 0 else None) ax.plot(f1, x, '--', c=c, label=line if translation == 0 else None) if translation == 0: ax.fill_betweenx(x, gf, f1, color=c, alpha=0.5) else: ax.fill_betweenx(x, gf, f1, color=c, edgecolor='black', alpha=0.5) # if you plot a small line this needs to start at zer0! ax.set(ylim=[0, self.q(p)]) ax.legend(loc='upper left')
[docs] def natural_profit_segment_plot(self, ax, p, line_names, colors, translations):
        """
        Plot the natural allocations between the 1-p and p percentiles and
        optionally translate line(s). Works with augmented_df; no input
        distortion. The user must ensure the correct distortion has already
        been applied.

        :param ax: axis on which to render
        :param p: probability level setting the loss range plotted
        :param line_names: lines to plot
        :param colors: iterable of color numbers n, rendered as C{n}
        :param translations: iterable of vertical shifts applied to each line
        :return:
        """
        lw, up = self.q(1 - p), self.q(p)
        # common extract for all lines
        bit = self.augmented_df.query(f' {lw} <= loss <= {up} ')
        F = bit[f'F']
        gF = bit[f'gF']
        for line, cn, translation in zip(line_names, colors, translations):
            c = f'C{cn}'
            ser = bit[f'exeqa_{line}']
            ax.plot(F, ser, ls='dashed', c=c)
            ax.plot(gF, ser, c=c)
            if translation == 0:
                ax.fill_betweenx(ser + translation, gF, F, color=c, alpha=0.5, label=line)
            else:
                ax.fill_betweenx(ser, gF, F, color=c, alpha=0.5, label=line)
        # as in profit_segment_plot, the y axis needs to start at zero
        ax.set(ylim=[0, up], title=self.distortion)
        ax.legend(loc='upper left')
[docs] def density_sample(self, n=20, reg="loss|p_|exeqa_"):
        """
        Sample of n equally likely points from density_df, showing the
        interesting columns selected by the regex reg.

        :param n: number of points
        :param reg: regex to select the columns
        """
        ps = np.linspace(0.001, 0.999, n)
        xs = [self.q(i) for i in ps]
        return self.density_df.filter(regex=reg).loc[xs, :].rename(columns=self.renamer)
[docs] def biv_contour_plot(self, fig, ax, min_loss, max_loss, jump, log=True, cmap='Greys', min_density=1e-15, levels=30, lines=None, linecolor='w', colorbar=False, normalize=False, **kwargs): """ Make contour plot of line A vs line B. Assumes port only has two lines. Works with an extract density_df.loc[np.arange(min_loss, max_loss, jump), densities] (i.e., jump is the stride). Jump = 100 * bs is not bad...just think about how big the outer product will get! :param fig: :param ax: :param min_loss: the density for each line is sampled at min_loss:max_loss:jump :param max_loss: :param jump: :param log: :param cmap: :param min_density: smallest density to show on underlying log region; not used if log :param levels: number of contours or the actual contours if you like :param lines: iterable giving specific values of k to plot X+Y=k :param linecolor: :param colorbar: show color bar :param normalize: if true replace Z with Z / sum(Z) :param kwargs: passed to contourf (e.g., use for corner_mask=False, vmin,vmax) :return: """ # careful about origin when big prob of zero loss npts = np.arange(min_loss, max_loss, jump) ps = [f'p_{i}' for i in self.line_names] bit = self.density_df.loc[npts, ps] n = len(bit) Z = bit[ps[1]].to_numpy().reshape(n, 1) @ bit[ps[0]].to_numpy().reshape(1, n) if normalize: Z = Z / np.sum(Z) X, Y = np.meshgrid(bit.index, bit.index) if log: z = np.log10(Z) mask = np.zeros_like(z) mask[z == -np.inf] = True mz = np.ma.array(z, mask=mask) cp = ax.contourf(X, Y, mz, levels=levels, cmap=cmap, **kwargs) # cp = ax.contourf(X, Y, mz, levels=np.linspace(-12, 0, levels), cmap=cmap, **kwargs) if colorbar: cb = fig.colorbar(cp, fraction=.1, shrink=0.5, aspect=10) cb.set_label('Log10(Density)') else: mask = np.zeros_like(Z) mask[Z < min_density] = True mz = np.ma.array(Z, mask=mask) cp = ax.contourf(X, Y, mz, levels=levels, cmap=cmap, **kwargs) if colorbar: cb = fig.colorbar(cp) cb.set_label('Density') # put in X+Y=c lines if lines is None: lines = np.arange(max_loss / 4, 2 * max_loss, max_loss / 4) try: for x in lines: ax.plot([0, x], [x, 0], lw=.75, c=linecolor, label=f'Sum = {x:,.0f}') except: pass title = (f'Bivariate Log Density Contour Plot\n{self.name.replace("_", " ").title()}' if log else f'Bivariate Density Contour Plot\n{self.name.replace("_", " ")}') # previously limits set from min_loss, but that doesn't seem right ax.set(xlabel=f'Line {self.line_names[0]}', ylabel=f'Line {self.line_names[1]}', xlim=[-max_loss / 50, max_loss], ylim=[-max_loss / 50, max_loss], title=title, aspect=1) # for post-processing self.X = X self.Y = Y self.Z = Z
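    # Usage sketch (illustrative): a bivariate contour plot for a two-line
    # portfolio, sampling each marginal on a grid with stride 100 * bs as the
    # docstring suggests. `port` is a hypothetical updated two-line Portfolio.
    #
    #     fig, ax = plt.subplots(1, 1, figsize=(5, 5))
    #     port.biv_contour_plot(fig, ax, min_loss=0, max_loss=port.q(0.999),
    #                           jump=100 * port.bs, log=True, colorbar=True)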
[docs] def nice_program(self, wrap_col=90):
        """
        Return a wrapped version of the portfolio program.

        :return: the program string, wrapped at wrap_col characters
        """
        return fill(self.program, wrap_col, subsequent_indent='\t\t', replace_whitespace=False)
def short_renamer(self, prefix='', postfix=''): if prefix: prefix = prefix + '_' if postfix: postfix = '_' + postfix knobble = lambda x: 'Total' if x == 'total' else x return {f'{prefix}{i}{postfix}': knobble(i).title() for i in self.line_names_ex}
[docs] def twelve_plot(self, fig, axs, p=0.999, p2=0.9999, xmax=0, ymax2=0, biv_log=True, legend_font=0, contour_scale=10, sort_order=None, kind='two', cmap='viridis'): """ Twelve-up plot for ASTIN paper and book, by rc index: Greys for grey color map 11 density 12 log density 13 biv density plot 21 kappa 22 alpha (from alpha beta plot 4) 23 beta (?with alpha) row 3 = line A, row 4 = line B from alpha beta four 2 1 S, gS, aS, bgS 32 margin 33 shift margin 42 cumul margin 43 natural profit compare **Args** self = portfolio or enhanced portfolio object p control xlim of plots via quantile; used if xmax=0 p2 controls ylim for 33 and 34: stand alone M and natural M; used if ymax2=0 biv_log - bivariate plot on log scale legend_font - fine tune legend font size if necessary sort_order = plot sorts by column and then .iloc[:, sort_order], if None [1,2,0] from common_scripts.py """ a11, a12, a13, a21, a22, a23, a31, a32, a33, a41, a42, a43 = axs.flat col_list = plt.rcParams['axes.prop_cycle'].by_key()['color'] lss = ['solid', 'dashed', 'dotted', 'dashdot'] if sort_order is None: sort_order = [1, 2, 0] # range(len(self.line_names_ex)) if xmax == 0: xmax = self.q(p) ymax = xmax # density and log density temp = self.density_df.filter(regex='p_').rename(columns=self.short_renamer('p')).sort_index(axis=1).loc[:xmax] temp = temp.iloc[:, sort_order] temp.index.name = 'Loss' l1 = temp.plot(ax=a11, lw=1) l2 = temp.plot(ax=a12, lw=1, logy=True) l1.lines[-1].set(linewidth=1.5) l2.lines[-1].set(linewidth=1.5) a11.set(title='Density') a12.set(title='Log density') a11.legend() a12.legend() # biv den plot if kind == 'two': # biv den plot # min_loss, max_loss, jump = 0, xmax, (2 ** (self.log2 - 8)) * self.bs xmax = self.snap(xmax) min_loss, max_loss, jump = 0, xmax, self.snap(xmax / 255) min_density = 1e-15 levels = 30 color_bar = False # careful about origin when big prob of zero loss # handle for discrete distributions ps = [f'p_{i}' for i in self.line_names] title = 'Bivariate density' query = ' or '.join([f'`p_{i}` > 0' for i in self.line_names]) if self.density_df.query(query).shape[0] < 512: logger.info('Contour plot has few points...going discrete...') bit = self.density_df.query(query) n = len(bit) Z = bit[ps[1]].to_numpy().reshape(n, 1) @ bit[ps[0]].to_numpy().reshape(1, n) X, Y = np.meshgrid(bit.index, bit.index) norm = mpl.colors.Normalize(vmin=-10, vmax=np.log10(np.max(Z.flat))) cm = mpl.cm.ScalarMappable(norm=norm, cmap=cmap) mapper = cm.to_rgba a13.scatter(x=X.flat, y=Y.flat, s=1000 * Z.flatten(), c=mapper(np.log10(Z.flat))) # edgecolor='C2', lw=1, facecolors='none') a13.set(xlim=[min_loss - (max_loss - min_loss) / 10, max_loss], ylim=[min_loss - (max_loss - min_loss) / 10, max_loss]) else: npts = np.arange(min_loss, max_loss, jump) bit = self.density_df.loc[npts, ps] n = len(bit) Z = bit[ps[1]].to_numpy().reshape(n, 1) @ bit[ps[0]].to_numpy().reshape(1, n) Z = Z / np.sum(Z) X, Y = np.meshgrid(bit.index, bit.index) if biv_log: z = np.log10(Z) mask = np.zeros_like(z) mask[z == -np.inf] = True mz = np.ma.array(z, mask=mask) cp = a13.contourf(X, Y, mz, levels=np.linspace(-17, 0, levels), cmap=cmap) if color_bar: cb = fig.colorbar(cp) cb.set_label('Log10(Density)') else: mask = np.zeros_like(Z) mask[Z < min_density] = True mz = np.ma.array(Z, mask=mask) cp = a13.contourf(X, Y, mz, levels=levels, cmap=cmap) if color_bar: cb = fig.colorbar(cp) cb.set_label('Density') a13.set(xlim=[min_loss, max_loss], ylim=[min_loss, max_loss]) # put in X+Y=c lines lines = np.arange(contour_scale / 4, 2 * contour_scale + 
1, contour_scale / 4) logger.debug(f'Contour lines based on {contour_scale} gives {lines}') for x in lines: a13.plot([0, x], [x, 0], ls='solid', lw=.35, c='k', alpha=0.5, label=f'Sum = {x:,.0f}') a13.set(xlabel=f'Line {self.line_names[0]}', ylabel=f'Line {self.line_names[1]}', title=title, aspect=1) else: # kind == 'three' plot survival functions...bivariate plots elsewhere l1 = (1 - temp.cumsum()).plot(ax=a13, lw=1) l1.lines[-1].set(linewidth=1.5) a13.set(title='Survival Function') a13.legend() # kappa bit = self.density_df.loc[:xmax]. \ filter(regex=f'^exeqa_({self.line_name_pipe})$') bit = bit.iloc[:, sort_order] bit.rename(columns=self.short_renamer('exeqa')).replace(0, np.nan). \ sort_index(axis=1).iloc[:, sort_order].plot(ax=a21, lw=1) a21.set(title='$\\kappa_i(x)=E[X_i\\mid X=x]$') # ugg for ASTIN paper example # a21.set(xlim=[0,5], ylim=[0,5]) a21.set(xlim=[0, xmax], ylim=[0, xmax], aspect='equal') a21.legend(loc='upper left') # alpha and beta aug_df = self.augmented_df aug_df.filter(regex=f'exi_xgta_({self.line_name_pipe})'). \ rename(columns=self.short_renamer('exi_xgta')). \ sort_index(axis=1).plot(ylim=[-0.05, 1.05], ax=a22, lw=1) for ln, ls in zip(a22.lines, lss[1:]): ln.set_linestyle(ls) a22.legend() a22.set(xlim=[0, xmax], title='$\\alpha_i(x)=E[X_i/X\\mid X>x]$'); bit = aug_df.query(f'loss < {xmax}').filter(regex=f'exi_xgtag?_({self.line_name_pipe})') bit.rename(columns=self.short_renamer('exi_xgtag')). \ sort_index(axis=1).plot(ylim=[-0.05, 1.05], ax=a23) for i, l in enumerate(a23.lines[len(self.line_names):]): if l.get_label()[0:3] == 'exi': a23.lines[i].set(linewidth=2, ls=lss[1 + i]) l.set(color=f'C{i}', linestyle=lss[1 + i], linewidth=1, alpha=.5, label=None) a23.legend(loc='upper left'); a23.set(xlim=[0, xmax], title="$\\beta_i(x)=E_{Q}[X_i/X \\mid X> x]$"); aug_df.filter(regex='M.M').rename(columns=self.short_renamer('M.M')). \ sort_index(axis=1).iloc[:, sort_order].plot(ax=a32, lw=1) a32.set(xlim=[0, xmax], title='Margin density $M_i(x)$') aug_df.filter(regex='T.M').rename(columns=self.short_renamer('T.M')). 
\ sort_index(axis=1).iloc[:, sort_order].plot(ax=a42, lw=1) a42.set(xlim=[0, xmax], title='Margin $\\bar M_i(x)$'); # by line S, gS, aS, bgS adf = self.augmented_df.loc[:xmax] if kind == 'two': zipper = zip(range(2), sorted(self.line_names), [a31, a41]) else: zipper = zip(range(3), sorted(self.line_names), [a31, a41, a33]) for i, line, a in zipper: a.plot(adf.loss, adf.S, c=col_list[2], ls=lss[1], lw=1, alpha=0.5, label='$S$') a.plot(adf.loss, adf.gS, c=col_list[2], ls=lss[0], lw=1, alpha=0.5, label='$g(S)$') a.plot(adf.loss, adf.S * adf[f'exi_xgta_{line}'], c=col_list[i], ls=lss[1], lw=1, label=f'$\\alpha S$ {line}') a.plot(adf.loss, adf.gS * adf[f'exi_xgtag_{line}'], c=col_list[i], ls=lss[0], lw=1, label=f"$\\beta g(S)$ {line}") a.set(xlim=[0, ymax]) a.set(title=f'Line = {line}') a.legend() a.set(xlim=[0, ymax]) a.legend(loc='upper right') alpha = 0.05 if kind == 'two': # three mode this is used for the third line # a33 from profit segment plot # special ymax for last two plots ymax = ymax2 if ymax2 > 0 else self.q(p2) # if could have changed p2 = self.cdf(ymax) for cn, ln in enumerate(sort_order): line = sorted(self.line_names_ex)[ln] c = col_list[cn] s = lss[cn] # print(line, s) f1 = self.density_df[f'p_{line}'].cumsum() idx = (f1 < p2) * (f1 > 1.0 - p2) f1 = f1[idx] gf = 1 - self.distortion.g(1 - f1) x = self.density_df.loss[idx] a33.plot(gf, x, c=c, ls=s, lw=1, label=None) a33.plot(f1, x, ls=s, c=c, lw=1, label=None) # a33.plot(f1, x, ls=lss[1], c=c, lw=1, label=None) a33.fill_betweenx(x, gf, f1, color=c, alpha=alpha, label=line.title()) a33.set(ylim=[0, ymax], title='Stand-alone $M$') a33.legend(loc='upper left') # .set_visible(False) # a43 from natural profit segment plot # common extract for all lines lw, up = self.q(1 - p2), ymax bit = self.augmented_df.query(f' {lw} <= loss <= {up} ') F = bit[f'F'] gF = bit[f'gF'] # x = bit.loss for cn, ln in enumerate(sort_order): line = sorted(self.line_names_ex)[ln] c = col_list[cn] s = lss[cn] ser = bit[f'exeqa_{line}'] if kind == 'three': a43.plot(1 / (1 - F), ser, lw=1, ls=lss[1], c=c) a43.plot(1 / (1 - gF), ser, lw=1, c=c) a43.set(xlim=[1, 1e4], xscale='log') a43.fill_betweenx(ser, 1 / (1 - gF), 1 / (1 - F), color=c, alpha=alpha, lw=0.5, label=line.title()) else: a43.plot(F, ser, lw=1, ls=s, c=c) a43.plot(gF, ser, lw=1, ls=s, c=c) a43.fill_betweenx(ser, gF, F, color=c, alpha=alpha, lw=0.5, label=line.title()) a43.set(ylim=[0, ymax], title='Natural $M$') a43.legend(loc='upper left') # .set_visible(False) if legend_font: for ax in axs.flat: try: if ax is not a13: ax.legend(prop={'size': 7}) except: pass
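    # Usage sketch (illustrative): twelve_plot expects a 4 x 3 grid of axes and
    # reads augmented_df, so a calibrated distortion must already have been
    # applied. `port` and `dist` are hypothetical; the apply_distortion call is
    # an assumption about the calling workflow.
    #
    #     port.apply_distortion(dist)
    #     fig, axs = plt.subplots(4, 3, figsize=(10.5, 14), constrained_layout=True)
    #     port.twelve_plot(fig, axs, p=0.999, p2=0.9999)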
[docs] def stand_alone_pricing_work(self, dist, p, kind, roe, S_calc='cumsum'): """ Apply dist to the individual lines of self, with capital standard determined by a, p, kind=VaR, TVaR, etc. Return usual data frame with L LR M P PQ Q ROE, and a Dist can be a distortion, traditional, or defaut pricing modes. For latter two you have to input an ROE. ROE not required for a distortion. :param self: a portfolio object :param dist: "traditional", "default", or a distortion (already calibrated) :param p: probability level for assets :param kind: var (or lower, upper), tvar or epd (note, p should be small for EPD, to pander, if p is large we use 1-p) :param roe: for traditional methods input roe :return: exhibit is copied and augmented with the stand-alone statistics from common_scripts.py """ assert S_calc in ('S', 'cumsum') var_dict = self.var_dict(p, kind=kind, total='total', snap=True) exhibit = pd.DataFrame(0.0, index=['L', 'LR', 'M', 'P', "PQ", 'Q', 'ROE'], columns=['sop']) def tidy_and_write(exhibit, ax, exa, prem): """ finish up calculation and store answer """ roe_ = (prem - exa) / (ax - prem) exhibit.loc[['L', 'LR', 'M', 'P', 'PQ', 'Q', 'ROE'], l] = \ (exa, exa / prem, prem - exa, prem, prem / (ax - prem), ax - prem, roe_) if dist == 'traditional - no default': # traditional roe method, no allowance for default method = dist d = roe / (1 + roe) v = 1 - d for l in self.line_names_ex: ax = var_dict[l] # no allowance for default exa = self.audit_df.at[l, 'EmpMean'] prem = v * exa + d * ax tidy_and_write(exhibit, ax, exa, prem) elif dist == 'traditional': # traditional but allowing for default method = dist d = roe / (1 + roe) v = 1 - d for l in self.line_names_ex: ax = var_dict[l] exa = self.density_df.loc[ax, f'lev_{l}'] prem = v * exa + d * ax tidy_and_write(exhibit, ax, exa, prem) else: # distortion method method = f'sa {str(dist)}' for l, ag in zip(self.line_names_ex, self.agg_list + [None]): # use built in apply distortion if ag is None: # total if S_calc == 'S': S = self.density_df.S else: # revised S = (1 - self.density_df['p_total'].cumsum()) # some dist return np others don't this converts to numpy... gS = pd.Series(dist.g(S), index=S.index) exag = gS.shift(1, fill_value=0).cumsum() * self.bs else: ag.apply_distortion(dist) exag = ag.density_df.exag ax = var_dict[l] exa = self.density_df.loc[ax, f'lev_{l}'] prem = exag.loc[ax] tidy_and_write(exhibit, ax, exa, prem) exhibit.loc['a'] = exhibit.loc['P'] + exhibit.loc['Q'] exhibit['sop'] = exhibit.filter(regex='[A-Z]').sum(axis=1) exhibit.loc['LR', 'sop'] = exhibit.loc['L', 'sop'] / exhibit.loc['P', 'sop'] exhibit.loc['ROE', 'sop'] = exhibit.loc['M', 'sop'] / (exhibit.loc['a', 'sop'] - exhibit.loc['P', 'sop']) exhibit.loc['PQ', 'sop'] = exhibit.loc['P', 'sop'] / exhibit.loc['Q', 'sop'] exhibit['method'] = method exhibit = exhibit.reset_index() exhibit = exhibit.set_index(['method', 'index']) exhibit.index.names = ['method', 'stat'] exhibit.columns.name = 'line' exhibit = exhibit.sort_index(axis=1) return exhibit
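    # Editor's note: for the 'traditional' methods above the premium is a
    # weighted average of expected loss and assets, P = v * E[loss] + d * a,
    # with d = roe / (1 + roe) and v = 1 - d. A minimal numeric check (the
    # values are made up for illustration):
    #
    #     roe, exa, a = 0.10, 80.0, 150.0
    #     d = roe / (1 + roe)                        # 0.0909...
    #     v = 1 - d
    #     prem = v * exa + d * a                     # 86.36...
    #     implied_roe = (prem - exa) / (a - prem)    # recovers 0.10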
[docs] def stand_alone_pricing(self, dist, p=0, kind='var', S_calc='cumsum'):
        """
        Run distortion pricing, use it to determine an ROE, then compute
        traditional and default pricing and consolidate the answer.

        :param dist: a calibrated Distortion or a list of Distortions
        :param p: probability level for assets
        :param kind: var, tvar, or epd capital standard
        :param S_calc: 'S' or 'cumsum', how the survival function is computed
        :return: concatenated exhibit DataFrame

        from common_scripts.py
        """
        assert isinstance(dist, (Distortion, list))
        if type(dist) != list:
            dist = [dist]
        ex1s = []
        for d in dist:
            ex1s.append(self.stand_alone_pricing_work(d, p=p, kind=kind, roe=0, S_calc=S_calc))
            if len(ex1s) == 1:
                roe = ex1s[0].at[(f'sa {str(d)}', 'ROE'), 'total']
        ex2 = self.stand_alone_pricing_work('traditional - no default', p=p, kind=kind, roe=roe, S_calc=S_calc)
        ex3 = self.stand_alone_pricing_work('traditional', p=p, kind=kind, roe=roe, S_calc=S_calc)
        return pd.concat(ex1s + [ex2, ex3])
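    # Usage sketch (illustrative): price each line stand-alone with a calibrated
    # distortion and compare to the traditional methods at the distortion-implied
    # ROE. `port` is a hypothetical updated Portfolio; the Wang parameter is
    # arbitrary.
    #
    #     from aggregate.spectral import Distortion
    #     d = Distortion('wang', 0.1)
    #     ex = port.stand_alone_pricing(d, p=0.995, kind='var')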
[docs] def calibrate_blends(self, a, premium, s_values, gs_values=None, spread_values=None, debug=False): """ Input s values and gs values or (market) yield or spread. A bond with prob s (small) of default is quoted with a yield (to maturity) of r over risk free (e.g., a cat bond spread, or a corporate bond spread over the appropriate Treasury). As a discount bond, the price is v = 1 - d. B(s) = bid price for 1(U<s) (bond residual value) A(s) = ask price for 1(U<s) (insurance policy) By no arb A(s) + B(1-s) = 1. By definition g(s) = A(s) (using LI so the particular U doesn't matter. Applied to U = F(X)). Let v = 1 / (1 + r) and d = 1 - v be the usual theory of interest quantities. Hence B(1-s) = v = 1 - A(s) = 1 - g(s) and therefore g(s) = 1 - v = d. The rate of risk discount δ and risk discount factor (nu) ν are defined so that B(1-s) = ν * (1 - s), it is the extra discount applied to the actuarial value that is bid for the bond. It is a function of s. Therefore ν = (1 - d) / (1 - s) = price of bond / actuarial value of payment. Then, g(s) = 1 - B(1-s) = 1 - ν (1 - s) = ν s + δ. Thus, if return (i.e., market yield spreads) are input, they convert to discount factors to define g points. Blend can be defined by extrapolating the last points in a credit curve. If that fails, increase the return on the highest s point and fill in with a constant return to 1. The ROE on the investment is not the promised return, because the latter does not allow for default. Set up to be a function of the Portfolio = self. Calibrated to hit premium at asset level a. a must be in the index. a = self.pricing_summary.at['a', kind] premium = self.pricing_summary.at['P', kind] method = extend or roe Input blend_d0 is the Book's blend, with roe above the equity point blend_d is calibrated to the same premium as the other distortions method = extend if f_blend_extend or ccoc ccoc = pick and equity point and back into its required roe. Results in a poor fit to the calibration data extend = extrapolate out the last slope from calibrtion data Initially tried interpolating the bond yield curve up, but that doesn't work. (The slope is too flat and it interpolates too far. Does not look like a blend distortion.) Now, adding the next point off the credit yield curve as the "equity" point and solving for ROE. If debug, returns more output, for diagnostics. """ global logger # corresponding gs values to s_values if gs_values is None: gs_values = 1 - 1 / (1 + np.array(spread_values)) # figure out the convex hull points out of input s, gs s_values, gs_values = convex_points(s_values, gs_values) if len(s_values) < 4: raise ValueError('Input s,gs points do not generate enough separate points.') # calibration prefob to compute rho df = self.density_df bs = self.bs # survival function points S = (1 - df.p_total[0:a-bs].cumsum()) def pricer(g): nonlocal S, bs return np.sum(g(S)) * bs # figure the four extreme values: # extrapolate or interp to 0 and l or r end # it appears you need to do this extra step to lock in the parameters. def make_g(s, gs): i = interp1d(s, gs, bounds_error=False, fill_value='extrapolate') def f(x): return np.minimum(1, i(x)) return f ans = [] dists = {} for left in ['e', '0']: for right in ['e', '1']: s = s_values.copy() gs = gs_values.copy() if left == 'e': s = s[1:] gs = gs[1:] if right == 'e': s = s[:-1] gs = gs[:-1] dists[(left, right)] = make_g(s, gs) new_p = pricer(dists[(left, right)]) ans.append((left, right, new_p, new_p > premium)) # horrible but small and short... 
df = pd.DataFrame(ans, columns=['left', 'right', 'premium', 'gt']) wts = {} wdists = {} for i, j in [(0,1), (0,2), (0,3), (1,2), (1,3), (2,3)]: pi = df.iat[i, 2] pj = df.iat[j, 2] if min(pi, pj) <= premium <= max(pi, pj): il = df.iat[i, 0] ir = df.iat[i, 1] jl = df.iat[j, 0] jr = df.iat[j, 1] w = (premium - pj) / (pi - pj) wts[(il, ir, jl, jr)] = w # feed into a dummy distortion temp = Distortion('ph', .599099) temp.name = 'blend' # temp.display_name = f'Extension ({il}, {ir}), ({jl}, {jr})' temp.g = lambda x: ( w * dists[(il, ir)](x) + (1 - w) * dists[(jl, jr)](x)) temp.g_inv = None wdists[(il, ir, jl, jr)] = temp if len(wdists) == 0: # failed to extrapolate, but still want a reasonable blend logger.warning('Failed to fit blend') # TODO placeholder - FIX! wdists[0] = Distortion('ph', .599099) wdists[0].name = 'blend' if debug is True: return wdists, df, pricer, dists, wts else: return wdists
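    # Editor's sketch of the spread-to-g conversion described in calibrate_blends:
    # a market yield spread r maps to a discount d = 1 - 1/(1 + r), and g(s) = d
    # at the corresponding exceedance probability s. The values below are
    # illustrative only; they mirror the gs_values computation in the code above.
    #
    #     s_values = np.array([0.005, 0.01, 0.02])       # default probabilities
    #     spread_values = np.array([0.02, 0.035, 0.06])  # yield spreads
    #     gs_values = 1 - 1 / (1 + spread_values)        # g(s) calibration points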
[docs] def bodoff(self, *, p=0.99, a=0): """ Determine Bodoff layer asset allocation at asset level a or VaR percentile p, one of which must be provided. Uses formula 14.42 on p. 284 of Pricing Insurance Risk. :param p: VaR percentile :param a: asset level :return: Bodoff layer asset allocation by unit """ if p==0 and a==0: raise ValueError('Must provide either p or a') if p > 0: a = self.q(p) ans = self.density_df.filter(regex='exi_xgta_') \ .loc[:a - self.bs, :].sum() * self.bs ans = ans.to_frame().T ans.index = [a] ans.index.name = 'a' ans = ans.drop(columns='exi_xgta_sum') ans.columns = [i.replace('exi_xgta_', '') for i in ans.columns] return ans
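    # Usage sketch (illustrative): Bodoff's percentile-layer allocation integrates
    # alpha_i(x) = E[X_i/X | X > x] over 0 <= x < a, which is what summing the
    # exi_xgta_ columns times bs computes above. `port` is a hypothetical updated
    # Portfolio.
    #
    #     alloc = port.bodoff(p=0.99)                 # allocation at the 99% VaR asset level
    #     alloc = port.bodoff(p=0, a=port.q(0.99))    # same thing via an explicit asset level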
[docs] def sample(self, n, replace=True, desired_correlation=None, keep_total=True): """ Pull multivariate sample. Apply Iman Conover to induce correlation if required. """ df = pd.DataFrame(index=range(n)) for c in self.line_names: pc = f'p_{c}' df[c] = self.density_df[['loss', pc]].\ query(f'`{pc}` > 0').\ sample(n, replace=replace, weights=pc, ignore_index=True, random_state=ar.RANDOM).\ drop(columns=pc) if desired_correlation is not None: df = iman_conover(df, desired_correlation) else: df['total'] = df.sum(axis=1) df = df.set_index('total', drop=not keep_total) df = df.reset_index(drop=True) return df
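    # Usage sketch (illustrative): draw a multivariate sample and induce rank
    # correlation between two units via Iman-Conover. The correlation matrix and
    # `port` are hypothetical.
    #
    #     corr = np.array([[1.0, 0.6], [0.6, 1.0]])
    #     sample_df = port.sample(10000, desired_correlation=corr)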
@property def unit_names(self): # what these should have been called! return self.line_names @property def unit_names_ex(self): # what these should have been called! return self.line_names_ex @property def n_units(self): return len(self.line_names)
[docs]def check01(s):
    """
    Add 0 and 1 at the start and end of s if they are missing.
    """
    if 0 not in s:
        s = np.hstack((0, s))
    if 1 not in s:
        s = np.hstack((s, 1))
    return s
[docs]def make_array(s, gs):
    """
    Convert s and gs to numpy arrays and pad each with 0 and 1.
    """
    s = np.array(s)
    gs = np.array(gs)
    s = check01(s)
    gs = check01(gs)
    return np.array((s, gs)).T
[docs]def convex_points(s, gs): """ Extract the points that make the convex envelope, including 0 1 Testers:: %%sf 1 1 5 5 s_values, gs_values = [.001,.0011, .002,.003, 0.005, .008, .01], [0.002,.02, .03, .035, 0.036, .045, 0.05] s_values, gs_values = [.001, .002,.003, .009, .011, 1], [0.02, .03, .035, .05, 0.05, 1] s_values, gs_values = [.001, .002,.003, .009, .01, 1], [0.02, .03, .035, .0351, 0.05, 1] s_values, gs_values = [0.01, 0.04], [0.03, 0.07] points = make_array(s_values, gs_values) ax.plot(points[:, 0], points[:, 1], 'x') s_values, gs_values = convex_points(s_values, gs_values) ax.plot(s_values, gs_values, 'r+') ax.set(xlim=[-0.0025, .1], ylim=[-0.0025, .1]) hull = ConvexHull(points) for simplex in hull.simplices: ax.plot(points[simplex, 0], points[simplex, 1], 'k-', lw=.25) """ points = make_array(s, gs) hull = ConvexHull(points) hv = hull.vertices[::-1] hv = np.roll(hv, -np.argmin(hv)) return points[hv, :].T
[docs]def make_awkward(log2, scale=False): """ Decompose a uniform random variable on range(2**log2) into two parts using Eamonn Long's base 4 method. Usage: :: awk = make_awkward(16) awk.density_df.filter(regex='p_[ABt]').cumsum().plot() awk.density_df.filter(regex='exeqa_[AB]|loss').plot() """ n = 1 << (log2 // 2) sc = 1 << log2 xs = [int(bin(i)[2:], 4) for i in range(n)] ys = [2 * i for i in xs] ps = [1 / n] * n if scale is True: xs = np.array(xs) / sc ys = np.array(ys) / sc A = Aggregate('A', exp_en=1, sev_name='dhistogram', sev_xs=xs, sev_ps=ps, freq_name='empirical', freq_a=np.array([1]), freq_b=np.array([1])) B = Aggregate('B', exp_en=1, sev_name='dhistogram', sev_xs=ys, sev_ps=ps, freq_name='empirical', freq_a=np.array([1]), freq_b=np.array([1])) awk = Portfolio('awkward', [A, B]) awk.update(log2, 1/sc if scale else 1, remove_fuzz=True, padding=0) return awk