Source code for aggregate.extensions.case_studies

# History
# Removed hacks aimed at reproducing book exhibits (color, roe hack, blend fix etc.) these are
# now all applied by default. Set up for new syntax case specification.
# gridoff removed and grid lines cut out.
# Integrates code from common_header, common_scripts, and hack

import aggregate as agg
from aggregate import Aggregate, round_bucket, make_mosaic_figure, FigureManager, Bounds, plot_max_min
import argparse
from collections import OrderedDict
from datetime import datetime
from inspect import signature
from itertools import product
from jinja2 import Environment, FileSystemLoader
import json
import logging
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import matplotlib as mpl
from matplotlib.colors import ListedColormap
from numbers import Number
import numpy as np
import os
import pandas as pd
from pandas.io.formats.format import EngFormatter
from pathlib import Path
from platform import platform
import psutil
import re
from scipy.integrate import cumtrapz
import shlex
from subprocess import Popen
from titlecase import titlecase as title
import webbrowser

from IPython.display import HTML, display

from .. constants import *

# general set up
pd.set_option("display.float_format", EngFormatter(3, True))
pd.set_option('display.max_rows', 500)

# get the logger
logger = logging.getLogger('aggregate.case_studies')

# up to the user to deal with warnings
# logging.captureWarnings(True)  -> all warnings emitted by the warnings module
# will automatically be logged at level WARNING
# warnings.filterwarnings('ignore', category=UserWarning)
# warnings.filterwarnings('ignore', category=RuntimeWarning)
# warnings.filterwarnings('ignore', category=pd.core.common.SettingWithCopyWarning)
# warnings.filterwarnings('ignore', category=pd.errors.PerformanceWarning)
# warnings.simplefilter(action='ignore', category=FutureWarning)

# utilities
stat_renamer = {'L': 'Expected Loss', 'P': "Premium", 'LR': 'Loss Ratio',
                'M': "Margin", 'PQ': 'Leverage', 'Q': 'Capital', 'ROE': "ROE", 'a': 'Assets',
                'rp': "Return Period", 'epdr': 'EPD Ratio', 'EPDR': 'EPD Ratio'}


[docs]def add_defaults(dict_in, kind='agg'): """ add default values to dict_inin. Leave existing values unchanged Used to output to a data frame, where you want all columns completed :param dict_in: :param kind: :return: """ print('running add_defaults\n' * 10) # use inspect to get the defaults # obtain signature sig = signature(Aggregate.__init__) # self and name --> bound signature bs = sig.bind(None, '') bs.apply_defaults() # remove self bs.arguments.pop('self') defaults = bs.arguments if kind == 'agg': defaults.update(dict_in) elif kind == 'sev': for k, v in defaults.items(): if k[0:3] == 'sev' and k not in dict_in and k != 'sev_wt': dict_in[k] = v
def distortion_namer(x): if len(x) < 4: return x x = x.split(',')[0] x = x[3:] x = {'Wang-normal': "Wang", "Proportional Hazard": "PH", "Constant ROE": "CCoC", "roe": 'CCoC', "blend": "Blend", "Tail VaR": "TVaR", "Dual Moment": "Dual", "Blend": "Blend"}.get(x, x) return x
[docs]def universal_renamer(x): """ """ abbreviations = dict(LR='Loss Ratio', L='Loss', P='Premium', M='Margin', Q='Capital', a='Assets', PQ='Leverage', ROE='Rate of Return', MM='Miller-Modigliani', ER='Expense Ratio', EL='Expected Loss', CR='Combined Ratio', RP='Return Period', EmpMean='Net', HS='Hu-SCS') # expand pure abbreviations, only do one replacement if isinstance(x, Number): return x x = abbreviations.get(x, x) # de underscore x = x.replace('p_', '') x = x.replace('_', ' ') # stip off starting T. and V. x = re.sub(r'^([TMV])\.', lambda m: { 'M': 'Marginal ', 'T': 'Primary ', 'V': 'Excess '}.get(m[1]), x) # sensible title case x = title(x) # deal with embedded abbreviations (e.g. plan lr -> Plan Lr or Plan ROE -> Plan Roe) x = re.sub(r'\b(Roe|Cv|Lr|Rol|Uw|Epd|Ph|Hs)\b', lambda x: x[1].upper(), x) x = x.replace('Scaled', 'Scaled ') x = x.replace('Sop', 'SoP') x = x.replace('EqRisk', 'Equal Risk ') x = x.replace('MerPer', 'Merton-Perold ') return x
[docs]def urn(df): """ apply universal renamer """ return df.rename(index=universal_renamer).rename(columns=universal_renamer)
class CaseStudy(object): _stats_ = ['L', 'M', 'P', 'LR', 'Q', 'ROE', 'PQ', 'a'] _dist_ = ['EL', 'Dist ccoc', 'Dist ph', 'Dist wang', 'Dist dual', 'Dist tvar', 'Dist blend', ] _classical_ = ['EL', 'ScaledEPD', 'ScaledTVaR', 'ScaledVaR', 'EqRiskEPD', 'EqRiskTVaR', 'EqRiskVaR', 'coTVaR', 'covar'] def __init__(self): """ Create an empty CaseStudy. Use ``factory`` to populate. """ # variables set in other functions self.gs_values = None self.s_values = None self.blend_distortions = None self.blend_d = None self.roe_d = None self.dist_dict = None self.uw = agg.Underwriter(update=False) self.tab13_1 = None self.sop = None self.lrs = None self.classic_pricing = None self.lns = None self.prices = None self.dgross = None self.dnet = None self.distortion_pricing = None self.classical_pricing = None self.summaries = None self.cap_table = None self.cap_table_total = None self.cap_table_marginal = None self.debt_stats = None self.walk = None self.progression = None self.p_stars = None self.diff_g = None self.exeqa = None self.boundss = None self.ad_compss = None self.case_id = "" self.case_name = "" self.case_description = "" self.a_name = "" self.b_name = "" self.a_distribution = "" self.b_distribution = "" self.b_distribution_gross = "" self.b_distribution_net = "" self.re_line = "" self.re_type = "" self.re_description = '' self.reg_p = 0. self.roe = 0. self.d2tc = 0. self.f_discrete = False self.bs = 0. self.log2 = 0 self.padding = 1 # graphics defaults self.fw = FIG_W self.fh = FIG_H color_mode = 'color' cycle_mode = 'c' self.smfig = FigureManager(cycle=cycle_mode, color_mode=color_mode, font_size=10, legend_font='small', default_figsize=(self.fw, self.fh)) self.colormap = 'viridis' self.figure_bg_color = 'aliceblue' plt.rcParams["axes.facecolor"] = 'lightsteelblue' plt.rc('legend', fc='lightsteelblue', ec='lightsteelblue') # discounting and return self.v = 0. self.d = 0. self.gross = None self.net = None self.ports = None self.pricings = None self.re_summary = None # graphics and presentation self.show = True self.figtype = 'png' # output related self.cache_base = Path.home() / 'aggregate/cases' self.cache_base.mkdir(exist_ok=True) self.cache_dir = None def to_json(self): """ Persist to json in file aggregate/cases/case_id.json :param fn: :return: """ json.dump(self.to_dict(), (Path.home() / f'aggregate/cases/{self.case_id}.json'). open('w', encoding='utf-8')) def to_dict(self): """ Definition to dictionary :return: """ ans = {} # use inspect to get the arguments sig = signature(self.factory) for k in sig.parameters.keys(): ob = getattr(self, k) # these are not json serializable?!! # TODO is this the best? if isinstance(ob, np.ndarray): ob = list(ob) ans[k] = ob return ans def read_json(self, fn): """ Load from json object. Files in aggregate/cases. :param fn: :return: """ args = json.load((Path.home() / f'aggregate/cases/{fn}.json').open('r', encoding='utf=8')) self.factory(**args) @staticmethod def list(): """ List cases with stored json files. :return: """ for p in (Path.home() / 'aggregate/cases').glob('*.json'): print(p) def factory(self, case_id, case_name, case_description, a_distribution, b_distribution_gross, b_distribution_net, reg_p, roe, d2tc, f_discrete, s_values, gs_values, bs, log2, padding): """ Create CaseStudy from case_id and all arguments in generic format with explicit reinsurance. The reinsured unit is always B. Once created, run ``self.full_monty()s to create all exhibits and ``self.browse_exhibits()` to view them. :param case_id: unique id for case, must be acceptable as a filename :param case_name: name of case :param case_description: description of case :param a_distribution: DecL program for unit A :param b_distribution_gross: DecL program for unit B gross of reinsurance :param b_distribution_net: DecL program for unit B net of reinsurance :param reg_p: regulatory capital probability threshold :param roe: target cost of capital :param d2tc: debt to total capital limit, for better blend distortion :param f_discrete: True if the output is a discrete distribution :param s_values: list of s values used to calibrate the blended distributions. They correspond to the return period and prices for cat bonds with small s values. :param gs_values: list of g(s) values :param bs: bin size for discrete distributions :param log2: log2 of the number of bins for discrete distributions :param padding: for update. """ self.case_id = case_id # originally option_id self.case_name = case_name self.case_description = case_description # DecL programs self.a_distribution = a_distribution self.b_distribution_gross = b_distribution_gross self.b_distribution_net = b_distribution_net # derivable self.a_name = a_distribution.split(' ')[1] self.b_name = b_distribution_gross.split(' ')[1] self.re_line = self.b_name self.reg_p = reg_p self.roe = roe self.v = 1 / (1 + self.roe) self.d = 1 - self.v self.d2tc = d2tc self.f_discrete = f_discrete self.blend_distortions = None self.s_values = s_values self.gs_values = gs_values self.log2 = log2 self.bs = bs self.padding = padding # new style output from uw as list of Answer(...) object out = self.uw.write(f'port Gross_{self.case_id} {self.a_distribution} {self.b_distribution_gross}') self.gross = out[0].object # sort out better bucket if self.bs == 0 or np.isnan(self.bs): self.bs = self.gross.best_bucket(self.log2) self.gross.update(log2=self.log2, bs=self.bs, padding=self.padding, remove_fuzz=True) out = self.uw.write(f'port Net_{self.case_id} {self.a_distribution} {self.b_distribution_net}') self.net = out[0].object self.net.update(log2=self.log2, bs=self.bs, remove_fuzz=True) self.ports = OrderedDict(gross=self.gross, net=self.net) self.pricings = OrderedDict() self.pricings['gross'] = pricing(self.gross, reg_p, roe) self.pricings['net'] = pricing(self.net, reg_p, roe) # are these used? self.re_type = self.net[self.re_line].reinsurance_kinds() self.re_description = self.net[self.re_line].reinsurance_description(kind='both') # TODO replace! Output as Table G in Ch 7 # self.re_summary = pd.DataFrame([self.re_line, self.re_type, self.re_attach_p, # self.re_attach, self.re_detach_p, # self.re_detach - self.re_attach], index=[ # 'Reinsured Line', 'Reinsurance Type', 'Attachment Probability', 'Attachment', 'Exhaustion Probability', # 'Limit'], columns=[self.case_id]) # self.re_summary.index.name = 'item' # cap table needs pricing_summary self.make_audit_exhibits() # set up the common distortions # make the cap table logger.info('Calibrating blend distortions') self.make_cap_table() prem = self.pricings['gross'].at['P', 'total'] a = self.pricings['gross'].at['a', 'total'] self.blend_distortions = self.gross.calibrate_blends(a, prem, s_values, gs_values, debug=False) self.roe_d = self.approx_roe(e=1e-10) self.dist_dict = OrderedDict(ccoc=self.roe_d, **self.blend_distortions) k = list(self.blend_distortions.keys())[0] self.blend_d = self.blend_distortions[k] self.cache_dir = self.cache_base / f'{self.case_id}' self.cache_dir.mkdir(exist_ok=True) def __repr__(self): return f'''Case Study object {self.case_id} @ {self.cache_dir} ({super().__repr__()}) Portfolios: {self.gross.name} (EL={self.gross.agg_m:.2f}) and {self.net.name} ({self.net.est_m:.2f}). Lines: {", ".join(self.gross.line_names)} (ELs={", ".join([f"{a.agg_m:.2f}" for a in self.gross])}). ''' def browse_exhibits(self): # book exhibits webbrowser.open(Path.home() / f'aggregate/cases/{self.case_id}_book.html') # extended exhibits webbrowser.open(Path.home() / f'aggregate/cases/{self.case_id}_extended.html') def full_monty(self): """ All updating and exhibit generation. No output. For use with command line. :param self: :return: """ logger.info('Start Full Monty Update') self.make_all() process_memory() logger.info('display exhibits') self.show = False self.show_exhibits('all') # process_memory() self.show_extended_exhibits() process_memory() logger.info('create graphics') self.show_graphs('all') process_memory() self.show_extended_graphs() process_memory() logger.info(f'{self.case_id} computed') # save the results mrr = ManualRenderResults(self) mrr.render_custom('[A-Y]', suffix='book') mrr.render_custom('Z*', suffix='extended') logger.info(f'{self.case_id} saved to HTML...complete!') def approx_roe(self, e=1e-15): """ Make an approximation to the ccoc distortion with no mass, using a slope from ``e`` to 0. :param e: :return: """ aroe = pd.DataFrame({'col_x': [0, e, 1], 'col_y': [0, self.v * e + self.d, 1]}) approx_roe_di = agg.Distortion('convex', None, None, df=aroe, col_x='col_x', col_y='col_y', display_name='ccoc') return approx_roe_di def make_audit_exhibits(self): # Chapter 7 Case Study Intro Exhibits # 1. MVSK # 2. VaR, TVaR, EPD, sop vs. total # 3. Density plots, linear and log scale # 4. Bivariate density plot # 5. TVaR and VaR Graphs cols = ['EmpMean', 'EmpCV', 'EmpSkew', 'EmpKurt', 'P90.0', 'P95.0', 'P99.0', 'P99.6', 'P99.9'] audit_all = pd.concat((port.audit_df.T.loc[cols] for port in self.ports.values( )), keys=self.ports.keys(), names=['view', 'line'], axis=1) audit_all.index.name = 'statistic' for k, port in self.ports.items(): p = 0.975 vd = port.var_dict(p, 'lower') s = pd.Series(list(vd.values()), index=[(k, j) for j in vd.keys()]) audit_all.loc[f'VaR {100 * p:.1f}', s.index] = s vd = port.var_dict(p, 'tvar') s = pd.Series(list(vd.values()), index=[(k, j) for j in vd.keys()]) audit_all.loc[f'TVaR {100 * p:.1f}', s.index] = s for p in [.9, .95, 0.975, .99, .996, .999]: vd = port.var_dict(p, 'tvar') s = pd.Series(list(vd.values()), index=[(k, j) for j in vd.keys()]) audit_all.loc[f'TVaR {100 * p:.1f}', s.index] = s vd = port.var_dict(1 - p, 'epd') s = pd.Series(list(vd.values()), index=[(k, j) for j in vd.keys()]) audit_all.loc[f'EPD {100 * (1 - p):.1f}', s.index] = s audit_all = audit_all.rename( index={f'P{i}': f'VaR {i:.1f}' for i in [ 90., 95., 0.975, 99., 99.6, 99.9]} ).rename( index=dict(EmpMean='Mean', EmpCV='CV', EmpSkew='Skewness', EmpKurt='Kurtosis')) bit = audit_all.filter(regex='[RE]', axis=0).copy() if len(self.gross.line_names) == 2: ln1, ln2 = self.gross.line_names bit[('gross', 'sum')] = bit[('gross', ln1)] + bit[('gross', ln2)] bit[('net', 'sum')] = bit[('net', ln1)] + bit[('net', ln2)] bit[('gross', 'benefit')] = bit[('gross', 'sum')] / \ bit[('gross', 'total')] - 1 bit[('net', 'benefit')] = bit[('net', 'sum')] / bit[('net', 'total')] - 1 bit = bit.sort_index(axis=0) bit = bit.sort_index(axis=1) bit = bit.iloc[[12, 13, 14, 15, 16, 17, 6, 7, 8, 9, 10, 11, 3, 5, 4, 2, 1, 0], :] pricing_summary = pd.concat(self.pricings.values(), keys=self.pricings.keys(), names=['portfolio', 'xx'], axis=1).droplevel(1, axis=1) self.audit_all = audit_all self.audit_var_tvar = bit self.pricing_summary = pricing_summary # return audit_all, bit, pricing_summary @classmethod def _display_work(cls, exhibit_id, df, caption, ff=None, save=True, cache_dir=None, show=False, align='right'): """ Allow calling without creating an object, e.g. to consistently format the _cases_ database additional_properties is a list of selector, property pairs E.g. [(col-list, dict-of-properties)] :param exhibit_id: :param df: :param caption: :param ff: :param save: :return: """ # revised set: used in agg, greys look better cell_hover = { 'selector': 'td:hover', 'props': [('background-color', '#ffffb3')] } index_names = { 'selector': '.index_name', 'props': 'font-style: italic; color: white; background-color: #777777; ' 'font-weight:bold; border: 1px solid white; text-transform: capitalize; ' 'text-align:left;' } headers = { 'selector': 'th:not(.index_name)', 'props': 'background-color: #DDDDDD; color: black; border: 1px solid #ffffff;' } center_heading = { 'selector': 'th.col_heading', 'props': 'text-align: center;' } left_index = { 'selector': '.row_heading', 'props': 'text-align: left;' } # note: this is only difference with generic aggregate method: td = { 'selector': 'td', 'props': f'text-align: {align};' } all_styles = [cell_hover, index_names, headers, center_heading, left_index, td] # do the styling styled_df = df.style.set_table_styles(all_styles) if exhibit_id != '': caption = f'({exhibit_id}) {caption}' styled_df = styled_df.format(ff).set_caption(caption) # .set_table_attributes(f'id={exhibit_id}') if show is True: # ? and save is True: display(styled_df) if save is True: styled_df.to_html(buf=Path(cache_dir / f'{exhibit_id}.html')) # shut up already... # process_memory() return styled_df def _display(self, exhibit_id, df, caption, ff=None, save=True): """ caption = string caption ff = float format function """ # figure the optimal by column formatting fc = lambda x: f'{x:,.1f}' f3 = lambda x: f'{x:.3f}' f5g = lambda x: f'{x:.5g}' # fp = lambda x: f'{x:.1%}' # guess sensible defaults if ff is None: fmts = {} for n, r in df.agg([np.mean, np.min, np.max, np.std]).T.iterrows(): if r.amax < 1 and r.amin < 1e-4 and r.amin > 0: fmts[n] = f5g elif r.amax < 1 and r.amin > 1e-4: fmts[n] = f3 elif r.amax < 10 and r.amin >= 0: fmts[n] = f3 else: # was fc, but none of these numbers is that large fmts[n] = f3 ff = fmts return self._display_work(exhibit_id, df, f'<div id="{exhibit_id}" /> ' + caption, ff, save, self.cache_dir, self.show) # return self._display_work(exhibit_id, df, caption, ff, save, self.cache_dir, self.show) logger.info(f'Exhibit {exhibit_id} processed') def show_exhibits(self, *chapters): if chapters[0] == 'all': chapters = [2, 4, 7, 9, 11, 13, 15] # ================================================================================== # ================================================================================== if 2 in chapters: if self.show: display(HTML('<hr><h2>Chapter 2: Case Study Intro</h2>')) ff = lambda x: f'{x:,.3f}' # table_no = [2.5, 2.6, 2.7][self.case_number] # caption = f'Table {table_no}: ' caption = f"""{self.case_name} estimated mean, CV, skewness and kurtosis by line and in total, gross and net. {self.re_description} applied to {self.re_line}.""" self._display("A", urn(self.audit_all.iloc[:4]), caption, ff) # ================================================================================== # ================================================================================== if 4 in chapters: if self.show: display(HTML('<hr><h2>Chapter 4: Risk Measures, VaR and TVaR</h2>')) ff = lambda x: f'{x:,.1f}' if abs(x) > 1 else f'{x:.3g}' # table_no = [4.6, 4.7, 4.8][self.case_number] # caption = f'Table {table_no}: ' caption = f"""{self.case_name} estimated VaR, TVaR and EPD by line and in total, gross and net. EPD shows assets required for indicated EPD percentage. Sum column shows sum of parts by line with no diversification and benefit shows percentage reduction compared to total. {self.re_description} applied to {self.re_line}.""" self._display("E", urn(self.audit_var_tvar), caption, ff) # ================================================================================== # ================================================================================== if 7 in chapters: if self.show: display(HTML('<hr><h2>Chapter 7: Guide to Practice Chapters</h2>')) # table_no = 7.2 # caption = f'Table {table_no}: ' caption = f"""Pricing summary for {self.case_name} using a a {self.reg_p} capital standard and {self.roe:.1%} constant cost of capital for all layers.""" self._display("F", urn(self.pricing_summary), caption, lambda x: f'{x:,.3f}') if self.show: display(HTML('<hr>')) # table_no = 7.3 # caption = f'Table {table_no}: ' # TODO come up with a replacement reinsurance using reinsurance_audit_df # caption = f"""Reinsurance summary for {self.case_name}.""" # self._display("G", urn(self.re_summary), caption, None) # ================================================================================== # ================================================================================== if 9 in chapters: if self.show: display(HTML('<hr><h2>Chapter 9: Classical Portfolio Pricing</h2>')) # table_no = [9.2, 9.5, 9.8][self.case_number] # caption = f'Table {table_no}: ' caption = f"""Classical pricing by method for {self.case_name}. Pricing calibrated to total gross portfolio and applied to each line on a stand-alone basis. Sorted by gross premium for {self.lns[1]}.""" ff = lambda x: '' if np.isnan(x) else (f'{x:,.3f}' if abs(x) < 5 else f'{x:,.1f}') self._display("H", urn(self.classic_pricing), caption, ff) # table_no = [9.3, 9.6, 9.9][self.case_number] # caption = f'Table {table_no}: ' caption = f"""Sum of parts (SoP) stand-alone vs. diversified classical pricing by method for {self.case_name}. Delta columns show the difference.""" if self.show: display(HTML('<hr>')) self._display("I", urn(self.sop), caption, ff) if self.show: display(HTML('<hr>')) # table_no = [9.4, 9.7, 9.10][self.case_number] # caption = f'Table {table_no}: ' caption = f"""Implied loss ratios from classical pricing by method for {self.case_name}. Pricing calibrated to total gross portfolio and applied to each line on a stand-alone basis.""" self._display("J", urn(self.lrs), caption, ff) if self.show: display(HTML('<hr>')) # table_no = 9.11 # caption = f'Table {table_no}: ' caption = f"""Comparison of stand-alone and sum of parts (SoP) premium for {self.case_name}.""" tab911 = self.modern_monoline_sa.loc[ [('No Default', 'Loss'), ('No Default', 'Premium'), ('No Default', 'Capital'), ('With Default', 'Loss'), ('With Default', 'Premium'), ('With Default', 'Capital')]].iloc[:, [2, 3, 5, 6]] tab911.columns = ['Gross SoP', 'Gross Total', 'Net SoP', 'Net Total'] tab911['Gross Redn'] = tab911['Gross Total'] / tab911['Gross SoP'] - 1 tab911['Net Redn'] = tab911['Net Total'] / tab911['Net SoP'] - 1 tab911 = tab911.iloc[:, [0, 1, 4, 2, 3, 5]] self._display("K", tab911, caption, lambda x: f'{x:,.1f}' if x > 10 else f'{x:.1%}') if self.show: display(HTML('<hr>')) # table_no = [9.12, 9.13, 9.14][self.case_number] # caption = f'Table {table_no}: ' caption = f"""Constant CoC pricing by unit for {self.case_name}, with {self.roe} cost of capital and p={self.reg_p}. The column sop shows the sum by unit. {self.re_description} All units produce the same rate of return, by construction.""" self._display("L", urn(self.modern_monoline_sa), caption, ff) # lambda x: f'{x:.4g}') # ================================================================================== # ================================================================================== if 11 in chapters: if self.show: display(HTML('<hr><h2>Chapter 11: Modern Portfolio Pricing</h2>')) # table_no = 11.5 # caption = f'Table {table_no}: ' caption = f"""Parameter estimates for the five base spectral risk measures.""" tab115 = self.gross.dist_ans.droplevel((0, 1), axis=0).loc[ ['ccoc', 'ph', 'wang', 'dual', 'tvar'], ['param', 'error', '$P$', '$K$', 'ROE', '$S$']]. \ rename(columns={'$P$': 'P', '$K$': 'K', '$S$': 'S', 'ROE': 'ι'}) self._display("N", urn(tab115), caption, None) # table_no = [11.7, 11.8, 11.9][self.case_number] # caption = f'Table {table_no}: ' if self.show: display(HTML('<hr>')) caption = f"""Pricing by unit and distortion for {self.case_name}, calibrated to CCoC pricing with {self.roe} cost of capital and p={self.reg_p}. Losses and assets are the same for all distortions. The column sop shows sum of parts by unit, the difference with the total shows the impact of diversification. {self.re_description}""" self._display("Q", urn(self.modern_monoline_by_distortion), caption, None) # ================================================================================== # ================================================================================== if 13 in chapters: if self.show: display(HTML('<hr><h2>Chapter 13: Classical Pricing Allocation</h2>')) # caption = 'Table 13.1: ' caption = f"""Comparison of gross expected losses by line. Second column shows allocated recovery with total assets. Third column shows stand-alone limited expected value with stand-alone {self.reg_p}-VaR assets.""" self._display("R", self.tab13_1, caption, None) # table_no = [13.2, 13.3, 13.4][self.case_number] # caption = f'Table {table_no}: ' caption = f"""Constant {self.roe:.2f} ROE pricing for {self.case_name}, classical PCP methods.""" self._display("S", urn(self.classical_pricing), caption, None) # ================================================================================== # ================================================================================== if 15 in chapters: if self.show: display(HTML('<hr><h2>Chapter 15: Modern Pricing Allocation</h2>')) # table_no = [13.35, 13.36, 13.37][self.case_number] # caption = f'Table {table_no}: ' caption = f"""Constant {self.roe:.2f} ROE pricing for {self.case_name}, distortion, SRM methods.""" self._display("V", urn(self.distortion_pricing), caption, lambda x: f'{x:.2f}' if abs(x) > 5 else f'{x:.3f}') if self.show: display(HTML('<hr>')) # table_no = [15.38, 15.39, 15.40][self.case_number] # caption = f'Table {table_no}: ' caption = f"""{self.case_name} percentile layer of capital allocations compared to distortion allocations.""" self._display("Y", urn(self.bodoff_compare), caption, lambda x: f'{x:.4g}') def show_extended_exhibits(self): """ make the "extra" discrete and one-off exhibits handles creation (unlike exhibits....?) :param arvv: :return: """ if self.f_discrete is True: bit = self.make_discrete() caption = 'Tables 11.1 and 11.2: computing expected loss and premium, PH 0.5 distortion.' self._display("Z-11-1", bit, caption) # , self.default_float_format2) bit = self.gross.augmented_df.query('loss==80').filter(regex='^(exag?_total)$').copy() bit.index.name = 'a' bit.columns = ['Limited Loss', 'Premium'] caption = 'Table 11.3 outcomes with assets a=80 (answers only), PH 0.5 distortion.' self._display("Z-11-2", bit, caption) # , self.default_float_format2) bit = self.make_discrete('tvar') caption = 'Table 11.4 (Exercise): computing expected loss and premium, calibrated TVaR distortion.' self._display("Z-11-3", bit, caption) #, self.default_float_format2) bit = self.gross.augmented_df.query('loss==80').filter(regex='^(exag?_total)$').copy() bit.index.name = 'a' bit.columns = ['Limited Loss', 'Premium'] caption = 'Table (new) outcomes with assets a=80 (answers only), TVaR distortion.' self._display("Z-11-4", bit, caption) # , self.default_float_format2) else: # progress of benefits - always show self.make_progression() caption = f'Table (new): Premium stand-alone by unit, sum, and total, and natural allocation by distortion.' self._display('Z-15-1', urn(self.progression.xs('Premium', axis=0, level=1)['gross']), caption) f1 = lambda x: f'{x:.3f}' fp = lambda x: f'{x:.0%}' fp1 = lambda x: f'{x:.1%}' ff = dict(zip(self.walk.columns, (f1, f1, f1, fp1, fp1, fp1, fp, fp, fp))) # TODO be explicit about the distortion caption = 'Table (new): Progression of premium benefit by distortion. ' \ 'Differences between allocated and stand-alone, ' \ 'the implied premium reduction, and the split by unit.' self._display('Z-15-2', self.walk, caption, ff) # cap table related for kind in ['gross', 'net']: self.make_cap_table(kind=kind) self.ports[kind].apply_distortion(self.blend_d) caption = f'Table (new): {kind.title()} asset tranching with S&P bond default rates and a {self.d2tc:.1%} debt to ' \ 'total capital limit.' self._display(f'Z-TR-{kind[0]}', self.cap_table, caption); self._display(f'Z-TRT-{kind[0]}', self.cap_table_total, f'Return by tranche, total {kind} view, blend distortion.'); self._display(f'Z-TRM-{kind[0]}', self.cap_table_marginal, f'Return by tranche, marginal {kind} view, blend distortion.'); self.show_tranching_graph(kind) def make_cap_table(self, kind='gross'): """ TODO: this should be a Portfolio method. Input should be a. prem and EL are computable given distortion has been applied. XXXX Don't want that... want to be able to apply without ref to augmented df Suggest reasonable debt tranching for kind=(gross|net) subject to self.d2tc debt to total capital limit. Uses S&P bond default analysis. Creates self.cap_table and self.debt_stats This is a cluster. There must be a better way... This version did all the difficult tranching...which is based on dubious info anyways... """ port = self.ports[kind] a, premium, el = self.pricing_summary.loc[['a', 'P', 'L'], kind] capital = a - premium debt = self.d2tc * capital equity = capital - debt debt_attach = a - debt prob_debt_attach = port.sf(debt_attach) cap_table = port.density_df.loc[[port.snap(i) for i in [a, premium+equity, premium, el]], ['loss', 'F', 'S']] cap_table['Amount'] = -np.diff(cap_table.loss, append=0) # add prob attaches cap_table['Pct Assets'] = cap_table.Amount / cap_table.iloc[0, 0] cap_table['Cumul Pct'] = cap_table.loss / cap_table.iloc[0, 0] cap_table['Attaches'] = cap_table.loss.shift(-1, fill_value=0) cap_table = cap_table.rename(columns={'loss': 'Exhausts', 'S': 'Pr Attaches'}) cap_table['Pr Attaches'] = cap_table['Pr Attaches'].shift(-1, fill_value=1) cap_table['Pr Exhausts'] = [port.sf(i) for i in cap_table.Exhausts] cap_table = cap_table[ ['Amount', 'Pct Assets', 'Attaches', 'Pr Attaches', 'Exhausts', 'Pr Exhausts', 'Cumul Pct']] cap_table.index = ['Debt', 'Equity', 'Premium', 'Loss'] cap_table.columns.name = 'Quantity' cap_table.index.name = 'Tranche' # make the total and incremental views if port.augmented_df is not None: # first call to create cap table is before any pricing.... total_renamer = {'F': 'Adequacy', 'capital': 'Capital', 'exa_total': 'Loss', 'exag_total': 'Premium', 'margin': 'Margin', 'lr': 'LR', 'coc': 'CoC', 'loss': 'Assets'} bit = port.augmented_df.loc[[port.snap(i) for i in cap_table.Exhausts], ['loss', 'F', 'exa_total', 'exag_total']].sort_index(ascending=False) bit['lr'] = bit.exa_total / bit.exag_total bit['margin'] = (bit.exag_total - bit.exa_total) bit['capital'] = bit.loss - bit.exag_total bit['coc'] = bit.margin / bit.capital bit['Discount'] = bit.coc / (1 + bit.coc) # leverage here does not make sense because of reserves bit.index = cap_table.index bit = bit.rename(columns=total_renamer) self.cap_table_total = bit marginal_renamer = { 'F': 'Adequacy', # that the loss is in the layer 'loss': 'Assets', 'exa_total': 'Loss', 'exag_total': 'Premium', 'margin': 'Margin', 'capital': 'Capital', 'lr': 'LR', 'coc': 'CoC', } bit = port.augmented_df.loc[[0] + [port.snap(i) for i in cap_table.Exhausts], ['loss', 'F', 'exa_total', 'exag_total']].sort_index(ascending=False) bit = bit.diff(-1).iloc[:-1] bit['lr'] = bit.exa_total / bit.exag_total bit['margin'] = (bit.exag_total - bit.exa_total) bit['capital'] = bit.loss - bit.exag_total bit['coc'] = bit.margin / bit.capital bit.index = self.cap_table_total.index bit.loc['Total', :] = bit.sum() bit.loc['Total', 'lr'] = bit.loc['Total', 'exa_total'] / bit.loc['Total', 'exag_total'] bit.loc['Total', 'coc'] = bit.loc['Total', 'margin'] / bit.loc['Total', 'capital'] bit['Discount'] = bit.coc / (1 + bit.coc) bit = bit.rename(columns=marginal_renamer) self.cap_table_marginal = bit # return ans, tranches, bit, spdf, cap_table # self.debt_stats = pd.Series( # [a, el, premium, capital, equity, debt, debt_attach, prob_debt_attach, attach_rating, exhaust_rating], # index=['a', 'EL', 'P', 'Capital', 'Equity', 'Debt', 'D_attach', 'Pr(D_attach)', 'Attach Rating', # 'Exhaust Rating']) self.cap_table = cap_table def make_discrete(self, distortion_name='default'): """ the extra exhibits for discrete cases (ch11) """ if self.f_discrete == 0: logger.error('Ch 11 extended exhibis for discrete-based only.') return if distortion_name == 'default': d = agg.Distortion('ph', 0.5) self.gross.apply_distortion(d) else: self.gross.apply_distortion(self.gross.dists[distortion_name]) bit = self.gross.augmented_df.query('p_total > 0').filter(regex='loss|p_t|S').copy() bit.index = range(1, len(bit) + 1) bit['X p'] = bit.loss * bit.p_total bit['X gp'] = bit.loss * bit.gp_total bit.columns = ['X', 'p', 'S', 'g(S)', 'Δg(S)', 'X p', 'X Δg(S)'] bit = bit.iloc[:, [0, 1, 2, -2, 3, 4, -1]] bit.columns.name = 'j' bit['ΔX'] = bit.X.shift(-1) - bit.X bit['S ΔX'] = bit.S * bit['ΔX'] bit['g(S) ΔX'] = bit['g(S)'] * bit['ΔX'] bit.loc['Sum'] = bit.sum() bit.loc['Sum', ['X', 'S', 'g(S)']] = np.nan bit = bit[['X', 'ΔX', 'p', 'S', 'X p', 'S ΔX', 'g(S)', 'Δg(S)', 'X Δg(S)', 'g(S) ΔX']] return bit def get_f_axs(self, nr, nc): w = nc * self.fw h = nr * self.fh return self.smfig(nr, nc, (w, h)) def apply_distortions(self, dnet_='', dgross_=''): """ make the 12 up plot """ # if nothing is passed in dgross = 'wang' dnet = 'wang' # book default distortions if self.case_id.startswith('discrete'): dnet = 'tvar' dgross = 'ccoc' elif self.case_id.startswith('tame'): dnet = 'tvar' dgross = 'ccoc' elif self.case_id.startswith('cnc'): dnet = 'ph' dgross = 'dual' # dnet = 'ccoc' # dgross = 'ccoc' elif self.case_id.startswith('hs'): dnet = 'wang' dgross = 'blend' # do what's asked for if dnet_ != '': dnet = dnet_ if dgross_ != '': dgross = dgross_ self.dnet = dnet self.dgross = dgross net = self.ports['net'] net.apply_distortion(net.dists[dnet]) gross = self.ports['gross'] gross.apply_distortion(gross.dists[dgross]) def _display_plot(self, plot_id, f, caption): """ Save the graphic """ # save the graph p = self.cache_dir / f'{plot_id}.{self.figtype}' f.savefig(p, dpi=600) if self.show is False: plt.close(f) # make the container html snippet pth = str(p.relative_to(self.cache_base).as_posix()) blob = f""" <figure id="{plot_id}"> <img src="/{pth}" width="100%" alt="Figure {f}" style="width:{100}%"> <figcaption class="caption">{caption}</figcaption> </figure> """ (self.cache_dir / f'{plot_id}.html').write_text(blob, encoding='utf-8') process_memory() def case_twelve_plot(self): """ make the 12 up plot must call apply_distortions first! """ # variables if self.case_id.startswith('discrete'): ylim = [-0.00025 / 4, .0075 / 4] xlim = [-5, 105] sort_order = [0, 1, 2] multiple_locator = .5e2 legend_loc = 'center right' elif self.case_id.startswith('tame'): ylim = [-0.00025 / 4, .0075 / 4] xlim = [-20, 175] sort_order = [2, 0, 1] multiple_locator = 1e2 legend_loc = 'lower left' elif self.case_id.startswith('cnc'): ylim = [-0.00025 / 4, .0075 / 4] xlim = [-20, 300] sort_order = [2, 0, 1] multiple_locator = 1e2 legend_loc = 'lower left' elif self.case_id.startswith('hs'): ylim = [-0.00025 / 4, .01 / 4] xlim = [-20, 2000] sort_order = [2, 0, 1] multiple_locator = 5e2 legend_loc = 'center right' else: # initial version... ylim = self.gross.limits(stat='density') xlim = self.gross.limits() xm = round_bucket(xlim[1]) xlim = [-xm / 50 , xm] sort_order = [2, 0, 1] multiple_locator = round_bucket(xm / 10) legend_loc = 'center right' # net / gross flavors 15.2-3 captions = [ f'(TN) {self.case_name}, net twelve plot with {self.dnet} distortion.', f'(TG) {self.case_name}, gross twelve plot with {self.dgross} distortion.' ] for port, nm, caption in zip((self.net, self.gross), ('T_net', 'T_gross'), captions): f, axs = self.smfig(4, 3, (10.8, 12.0)) # ad comps does not create augmented port.apply_distortion(port.dists['dual'], efficient=False) port.twelve_plot(f, axs, xmax=xlim[1], ymax2=xlim[1], contour_scale=port.q(self.reg_p), sort_order=sort_order, cmap=self.colormap) axs[0][0].set(ylim=ylim, xlim=xlim) axs[0][1].set(xlim=xlim) axs[0][2].yaxis.set_major_locator(ticker.MultipleLocator(multiple_locator)) axs[1][0].yaxis.set_major_locator(ticker.MultipleLocator(multiple_locator)) axs[2][2].yaxis.set_major_locator(ticker.MultipleLocator(multiple_locator)) axs[3][2].yaxis.set_major_locator(ticker.MultipleLocator(multiple_locator)) axs[2][2].legend().set(visible=False) axs[3][2].legend().set(visible=False) if self.case_id == 'hs': axs[3][1].set(ylim=[-10, 200]) axs[1][2].legend(loc=legend_loc) self._display_plot(nm, f, caption) def show_extended_graphs(self): """ Create relevant extended graphs TODO this is over the top and too slow. :return: """ if self.f_discrete: return # blended distortion f, axs = self.smfig(1, 3, (12.0, 4.0), ) ax0, ax1, ax2 = axs.flat ps = np.hstack((np.linspace(0, 0.1, 2000, endpoint=False), np.linspace(0.1, 1, 100))) ph = self.gross.dists['ph'].g(ps) # ax.plot(ps, blend, label='Naive blend') lsi = iter([':', '--', '-.'] * 5) for k, v in self.dist_dict.items(): temp = v.g(ps) ls = next(lsi) for ax in axs.flat: ax.plot(ps, temp, lw=1, ls=ls, label=k) for ax in axs.flat: ax.plot(ps, ps, c='k', lw=0.5) ax.plot(self.s_values, self.gs_values, 'x', lw=.5, label='Calibration pricing data') ax.plot(ps, ph, lw=0.5, label='PH') ax.legend() ax1.set(xlim=[0, 0.1], ylim=[0, 0.1]) ax2.set(xscale='log', yscale='log', xlim=[.8e-6, 1], ylim=[.8e-6, 1]) caption = f'(Z-BL-1) Figure (new): Calibrated blend distortions, compard to CCoC. ' \ 'The x marks show default and pricing data used in calibration. ' \ 'Middle plot zooms into 0 < s < 0.1. Righthand plot uses a log/log scale. ' \ 'PH distortion added for comparison.' self._display_plot('Z-BL-1', f, caption) return f def show_tranching_graph(self, kind): """ graph of tranching revised vertical view kind = gross or net called by show_extended_exhibits """ port = self.ports[kind] # new style = vertical, but focused on what is shown in the cap_table range f, axs = make_mosaic_figure('AB', w=4, h=6, return_array=True) ax0, ax1 = axs.flat for ax in axs.flat: if ax is ax0: ax.plot(port.density_df.F, port.density_df.loss, lw=1.5, label='Loss') else: ax.plot(1 / port.density_df.S, port.density_df.loss, lw=1.5, label='Loss') col = 0 for n, x in self.cap_table_total.iterrows(): col += 1 ax.axhline(x.Assets, c=f'C{col}', lw=1.5, label=f'{n}, ¤{x.Assets:,.0f} @ p={x.Adequacy:.3%}') pts = self.cap_table_total.Assets ax.yaxis.set_major_locator(ticker.FixedLocator(pts.values)) ax.yaxis.set_major_formatter(ticker.FixedFormatter([f'{i}: ¤{v:,.0f}' for i, v in pts.items()])) ax0.yaxis.set_major_formatter(ticker.StrMethodFormatter('{x:,.0f}')) ax0.axvline(0, lw=0.25, c='k') ax0.axvline(1, lw=0.25, c='k') ax1.axvline(1 / (1 - self.cap_table_total.Adequacy[0]), lw=0.25, c='k') ymin = port.q(0.0001) ymax = 1.05 * self.cap_table_total.Assets[0] ax0.set(ylabel='Loss', xlabel='Non exceedance probability') ax0.set(ylim=[ymin, ymax], yscale='linear') ax1.set(ylim=[ymin, ymax], xlim=[0.8, 2 / (1 - self.cap_table_total.Adequacy[0])], xscale='log', xlabel='Log return period') ax0.legend(loc='upper left') caption = f'(Z-TR-1) Figure (new): {kind} capital tranching with a {self.d2tc:.1%} debt ' \ 'to total capital limit.' self._display_plot(f'Z-TR-1-{kind[0]}', f, caption) def show_similar_risks_graphs(self, base='gross', new='net'): """ axd from mosaic Treats the bounds correctly in computing the tvars for the new portfolio Provenance : from make_port in Examples_2022_post_publish, similar_risks_graphs_sa """ bounds = self.boundss[base] df = bounds.weight_df.copy() df['test'] = df['t_upper'] * df.weight + df.t_lower * (1 - df.weight) # tvars for the new risk new_a = self.pricing_summary.at['a', new] new_prem = self.pricing_summary.at['P', new] tvarf = self.boundss[new].tvar_with_bound tvar1 = {p: float(tvarf(p, new_a)) for p in bounds.tps} df['t1_lower'] = [tvar1[p] for p in df.index.get_level_values(0)] df['t1_upper'] = [tvar1[p] for p in df.index.get_level_values(1)] df['t1'] = df.t1_upper * df.weight + df.t1_lower * (1 - df.weight) a = self.pricing_summary.at['a', base] prem = self.pricing_summary.at['P', base] roe_d = agg.Distortion('ccoc', self.roe) tvar_d = agg.Distortion('tvar', bounds.p_star('total', prem)) idx = df.index.get_locs(df.idxmax()['t1'])[0] pl, pu, tl, tu, w = df.reset_index().iloc[idx, :-4] max_d = agg.Distortion('wtdtvar', w, df=[pl, pu]) tmax = float(df.iloc[idx]['t1']) print('Ties for max: ', len(df.query('t1 == @tmax'))) print('Near ties for max: ', len(df.query('t1 >= @tmax - 1e-4'))) idn = df.index.get_locs(df.idxmin()['t1'])[0] pln, pun, tl, tu, wn = df.reset_index().iloc[idn, :-4] min_d = agg.Distortion('wtdtvar', wn, df=[pln, pun]) # make the plot mosaic f = plt.figure(constrained_layout=True, figsize=(16, 8)) axd = f.subplot_mosaic( """ AAAABBFF AAAACCFF AAAADDEE AAAADDEE """ ) ax = axd['A'] plot_max_min(bounds, ax) n = len(ax.lines) roe_d.plot(ax=ax, both=False) tvar_d.plot(ax=ax, both=False) max_d.plot(ax=ax, both=False) min_d.plot(ax=ax, both=False) ax.lines[n + 0].set(label='ccoc') ax.lines[n + 2].set(color='green', label='tvar') ax.lines[n + 4].set(color='red', label='max') ax.lines[n + 6].set(color='purple', label='min') ax.legend(loc='upper left') ax.set(title=f'Max ({pl:.4f}, {pu:.4f}), min ({pln:.4f}, {pun:.4f})') ax = axd['B'] bounds.weight_image(ax) ax.set(title=f'Weight for p1 on {base}') bit = df['t1'].unstack(1) ax = axd['C'] img = ax.contourf(bit.columns, bit.index, bit, cmap='viridis_r', levels=20) ax.set(xlabel='p1', ylabel='p0', title=f'Pricing on {new} Risk', aspect='equal') ax.get_figure().colorbar(img, ax=ax, shrink=.5, aspect=16, label='rho(X_new)') ax.plot(pu, pl, '.', c='w') ax.plot(pun, pln, 's', ms=3, c='white') ax = axd['D'] port = self.ports[base] pnew = self.ports[new] def plot_lee(port, ax, lw=2, scale='linear'): """ Lee diagram by hand """ p_ = np.linspace(0, 1, 10000, endpoint=False) qs = [port.q(p) for p in p_] if scale == 'linear': ax.step(p_, qs, lw=lw, label=port.name) ax.set(xlim=[-0.05, 1.05], ylim=[-0.05, port.q(0.9999) + .05]) else: ax.plot(1 / (1 - p_), qs, lw=lw, label=port.name) ax.set(xscale='log', xlim=[0.5, 2e4], ylim=[-0.05, port.q(0.9999) + .05]) plot_lee(pnew, ax, scale='log') plot_lee(port, ax, lw=1, scale='log') ax.set(title=f'Lee Diagrams - log return') ax.legend(loc='upper left') ax = axd['E'] plot_lee(pnew, ax) plot_lee(port, ax, lw=1) ax.set(title=f'Lee Diagrams - linear') ax.legend(loc='upper left') # port.density_df.p_total.plot(ax=ax, label=port.name) # pnew.density_df.p_total.plot(ax=ax, label=pnew.name) # ax.set(xlim=[0, pnew.q(0.9999)], title='Densities') # ax.legend(loc='upper right') ax = axd['F'] plot_max_min(bounds, ax) for c, dd in zip(['r', 'g', 'b'], ['ph', 'wang', 'dual']): port.dists[dd].plot(ax=ax, both=False, lw=1) ax.lines[n].set(c=c, label=dd) n += 2 ax.set(title='PH, Wang and Dual') ax.legend(loc='lower right') caption = f'(Z-TR-2) Figure (new): Analysis of extreme distortions for net portfolio based on gross calibration. ' \ 'Large plot shows the extreme distortions achieving the greatest and least premium for the net portfolio.' self._display_plot('Z-TR-2', f, caption) return df def loss_density_spectrum(self): """ Loss (exeqa) by line and the distortion spectrums. This is different for discrete distributions. :param self: :return: """ logger.info("15.11") f, axs = self.get_f_axs(3, 2) def diff(f): """ manual differentiation """ eps = 1e-9 def fprime(x): return (f(x + eps) - f(x - eps)) / (2 * eps) return fprime if self.case_id.startswith('discrete'): self.exeqa = {} for port in [self.gross, self.net]: bit = port.density_df.query('p_total > 0').filter(regex='exeqa_[tX]|F').set_index('F', drop=True) bit.loc[0] = 0. bit = bit.sort_index().rename(columns=lambda x: x.split('_')[1].title()) self.exeqa[port.name] = bit ps = bit.index gprime = {} dist_list = ['ccoc', 'ph', 'wang', 'dual', 'tvar', 'blend'] for dn in dist_list: gprime[dn] = diff(self.gross.dists[dn].g) self.diff_g = pd.DataFrame({dn: gprime[dn](1 - ps) for dn in dist_list}, index=bit.index) names = {'ccoc': 'CCoC', 'ph': 'Prop Hazard', 'wang': 'Wang', 'dual': "Dual", 'tvar': "TVaR", 'blend': "Blend"} lbl = 'Gross E[Xi | X]' ax0 = axs[0, 0] for (k, b), ax in zip(self.exeqa.items(), axs.flat): b.plot(drawstyle='steps-pre', ax=ax) ax.set(xlim=[-0.025, 1.025], ylabel=lbl, xlabel='p') ax.lines[0].set(lw=3, alpha=.5) ax.axhline(0, lw=.5) ax.xaxis.set_major_locator(ticker.MultipleLocator(.25)) ax.xaxis.set_minor_locator(ticker.MultipleLocator(1 / 16)) # gross and net have same y lim if ax is ax0: ylim_ = ax.get_ylim() else: ax.set(ylim=ylim_) lbl = 'Net E[Xi | X]' ax.legend(loc='upper left') axi = iter(axs.flat[2:]) ylim = 5 for d1, d2 in zip(*[iter(dist_list)] * 2): ax = next(axi) # removed c='k' ax.plot(self.diff_g.index, self.diff_g[d1], c='C2', lw=1, label=names[d1], drawstyle='steps-post') if d1 == 'ccoc': ax.plot([1], [ylim - 0.1], '^', c='C2', ms=10, label='CCoC mass') ax.plot(self.diff_g.index, self.diff_g[d2], c='C3', lw=1, label=names[d2], drawstyle='steps-post') # ax.yaxis.set_major_locator(ticker.MultipleLocator(.25)) ax.axhline(1, lw=.5, c='k') ax.set(xlim=[-0.025, 1.025], ylim=[0, ylim], xlabel=None, ylabel='Distortion weight (spectrum)') ax.legend(loc='upper left') axs[2, 0].set(xlabel='Loss non-exceedance probability') axs[1, 1].set(ylabel=None) axs[2, 1].set(ylabel=None) # plot them all together ax = axs[2, 1] # omit blend self.diff_g.iloc[:, :-1].plot(ax=ax, lw=1, drawstyle='steps-pre') ax.set(xlim=[-0.025, 1.025], yscale='linear', xlabel='Loss non-exceedance probability') ax.axhline(1, lw=.5, c='k') else: self.exeqa = {} for port in [self.gross, self.net]: bit = port.density_df.filter(regex='loss|^S$|^F$|exeqa_[A-Zt]') if self.case_id == 'hs': bit = bit.iloc[::8] bit['return'] = 1 / bit.S bit0 = bit.set_index('return').filter(regex='ex').rename(columns=lambda x: x.split('_')[1]).drop( 'total', axis=1).cumsum(axis=1) bit0 = bit0.loc[bit0.index < np.inf] ns = 1000 if self.case_id == 'hs': ns = 10000 qs = pd.qcut(bit0.index, ns, duplicates='drop') bit0['qs'] = qs bit0 = bit0.reset_index(drop=False).groupby('qs').agg(np.mean).set_index('return') self.exeqa[port.name] = bit0 ps = 1 / bit0.index gprime = {} dist_list = ['ccoc', 'ph', 'wang', 'dual', 'tvar', 'blend'] for dn in dist_list: gprime[dn] = diff(self.gross.dists[dn].g) self.diff_g = pd.DataFrame({dn: gprime[dn](ps) for dn in dist_list}, index=bit0.index) names = {'ccoc': 'CCoC', 'ph': 'Prop Hazard', 'wang': 'Wang', 'dual': "Dual", 'tvar': "TVaR", 'blend': "Blend"} lbl = 'Gross E[Xi | X]' for (k, b), ax in zip(self.exeqa.items(), axs.flat): b.plot(ax=ax) ylim = {'tame': 200, 'cnc': 500, 'hs': 2500}.get(self.case_id, self.gross.limits('range', 'log')[1]) ax.set(xlim=[1, .5e6], ylim=[0, ylim], xscale='log', ylabel=lbl, xlabel=None) lbl = 'Net E[Xi | X]' ax.legend(loc='upper left') axi = iter(axs.flat[2:]) ylim = 5 if k == 'hs': ylim = 50 for d1, d2 in zip(*[iter(dist_list)] * 2): ax = next(axi) # removed c='k' ax.plot(self.diff_g.index, self.diff_g[d1], c='C2', lw=1, label=names[d1]) if d1 == 'ccoc': ax.plot([.4e5], [ylim - 0.1], '^', c='C2', ms=10, label='CCoC mass') ax.plot(self.diff_g.index, self.diff_g[d2], c='C3', lw=1, label=names[d2]) ax.yaxis.set_major_locator(ticker.MultipleLocator(ylim / 5)) ax.axhline(1, lw=.5, c='k') ax.set(xlim=[0.5, .5e5], ylim=[0, ylim], xscale='log', xlabel=None, ylabel='Distortion weight (spectrum)') ax.legend(loc='upper left') axs[2, 0].set(xlabel='Loss return period') axs[1, 1].set(ylabel=None) axs[2, 1].set(ylabel=None) # plot them all together ax = axs[2, 1] # omit blend self.diff_g.iloc[:, :-1].plot(ax=ax, lw=1) ax.set(xlim=[0.5, 0.5e5], ylim=[.1, 200], xscale='log', yscale='log', xlabel='Loss return period') caption = ( f'(W) Figure 15.11: {self.case_name}, loss spectrum (gross/net top row). Rows 2 and show VaR weights ' 'by distortion. In the second row, the CCoC distortion includes a mass putting weight ' f'𝑑 = {self.roe}{1 + self.roe} at the maximum loss, corresponding to an infinite density. ' 'The lower right-hand plot compares all five distortions on a log-log scale.') self._display_plot('W', f, caption) def show_graphs(self, *chapters): if chapters[0] == 'all': chapters = [2, 4, 7, 9, 11, 13, 15] # ================================================================================================== # ================================================================================================== if 2 in chapters: # fig 2.2, 2.4, and 2.6 logger.info('Figures 2.2, 2.4, 2.6') # f, axs = self.get_f_axs(2, 2) # ax0, ax1, ax2, ax3 = axs.flat f, axd = make_mosaic_figure('AB\nCD') axd_top = {'A': axd['A'], 'B': axd['B']} axd_bottom = {'A': axd['C'], 'B': axd['D']} # order series so total is first self.gross.plot(axd_top) self.net.plot(axd_bottom) caption = f'(B) {self.case_name}, gross (top) and net (bottom) densities on a nominal (left) and log (right) scale.' self._display_plot('B', f, caption) # fig 2.3, 2.5, and 2.6 =================================================================================== logger.info('Figures 2.3, 2.5, 2.7') if self.case_id == 'cnc': xmax = 500 sim_xlim = [-10, 500] sim_ylim = [-10, 500] sim_scale = 250 elif self.case_id == 'tame': xmax = 250 / 2 sim_xlim = [-5, 250 / 2] sim_ylim = [-5, 250 / 2] sim_scale = 125 / 2 elif self.case_id == 'hs': xmax = 2000 sim_xlim = [-10, 2000] sim_ylim = [-10, 2000] sim_scale = 2000 elif self.case_id == 'discrete': xmax = 110 sim_xlim = [-5, 110] sim_ylim = [-5, 110] sim_scale = 100 else: logger.warning('Check scales are reasonable...') sim_xlim = self.gross.limits() xmax = round_bucket(sim_xlim[1]) sim_xlim = [-xmax / 50, xmax] sim_ylim = sim_xlim sim_scale = xmax // 4 # ? QDFC # need to make this work for the other version ln1, ln0 = self.gross.line_names pln0 = f'p_{ln0}' pln1 = f'p_{ln1}' port = self.gross sample = pd.concat((port.density_df[['loss', pln0]].sample(10 ** 6, replace=True, weights=pln0).reset_index( drop=True).rename(columns={'loss': ln0}), port.density_df[['loss', pln1]].sample(10 ** 6, replace=True, weights=pln1).reset_index( drop=True).rename(columns={'loss': ln1})), axis=1) sample['loss'] = sample[ln1] + sample[ln0] sample = sample.set_index('loss').sort_index() sample['e_total'] = sample[ln0] / sample.index sample['bin'] = pd.qcut(sample.index, 1000, duplicates='drop') # kappa_estimate = sample.reset_index().groupby('bin')[['loss', 'e_total']].agg(np.mean).set_index('loss') # make the image f, ax2 = self.smfig(1, 1, (3, 3)) bit = sample.sample(n=25000 if self.case_id == 'tame' else 250000) ax2.scatter(x=bit[ln1], y=bit[ln0], marker='.', s=1, alpha=.1) ax2.set(xlim=sim_xlim, ylim=sim_ylim) ax2.set(aspect='equal') ax2.axis('off') bit_fn = Path.home() / f'aggregate/temp/scatter_{self.case_id}.png' f.savefig(bit_fn, dpi=1000) plt.close(f) del f # assemble parts f, axs = self.smfig(1, 3, (9, 3)) ax0, ax1, ax2 = axs.flat axi = iter(axs.flat) bivariate_density_plots(axi, self.ports.values(), xmax=xmax, levels=15, biv_log=True, contour_scale=xmax, cmap=self.colormap) img = plt.imread(bit_fn) ax2.imshow(img, extent=[*sim_xlim, *sim_ylim]) ax2.set(xlim=sim_xlim, ylim=sim_ylim, ) tvar = self.gross.q(.99) ax2.plot([tvar, 0], [0, tvar], 'k', lw=1) for ax in axs.flat: ax.xaxis.set_major_locator(ticker.MultipleLocator(sim_scale)) ax.yaxis.set_major_locator(ticker.MultipleLocator(sim_scale)) ax2.set(aspect='equal', title=self.gross.name + ' sample') ax0.set(title='Gross') ax1.set(title='Net', ylabel=None) x, y = self.ports['gross'].line_names ax2.set(title='Gross sample', xlabel=f'Line {x}') caption = f'(C) {self.case_name}, bivariate densities: gross (left), net (center), and a sample from gross (right). Impact of reinsurance is clear in net plot.' self._display_plot('C', f, caption) # ================================================================================================== # ================================================================================================== if 4 in chapters: # fig 4.10, 11, 12 =================================================================================== logger.info('Figures 4.10, 4.11, 4.12') f, axs = self.get_f_axs(2, 2) # ax0, ax1, ax2, ax3 = axs.flat for (k, port), ax0, ax1 in zip(self.ports.items(), axs[0], axs[1]): bounds = self.boundss[k] a = self.pricing_summary.at['a', k] if self.case_id not in ('discrete', 'awkward', 'differentialble', 'notdifferentiable'): df = pd.DataFrame({'p': bounds.tps, 'TVaR': [float(port.tvar(i)) for i in bounds.tps], 'Bounded TVaR': bounds.tvars, 'VaR': [port.q(i) for i in bounds.tps]}).set_index('p') pu = df.index[-2] for i in range(3): pu = (1 + pu) / 2 tvarb = bounds.tvar_with_bound(pu, a) tvar = bounds.tvar_unlimited_function(pu) q = port.q(pu) df.loc[pu, :] = [tvar, tvarb, q] df = df.sort_index() ds = 'default' else: # for discrete need more careful, non-interpolating version of tvar df = pd.DataFrame({'p': bounds.tps, 'TVaR': [bounds.tvar_with_bound(i, a, 'tail') for i in bounds.tps], 'VaR': [port.q(i) for i in bounds.tps] }).set_index('p') ds = 'steps-pre' df['Bounded VaR'] = np.minimum(df.VaR, a) ax = ax0 df.plot(ax=ax, drawstyle=ds) # ax.lines[-1].set(lw=3, ls=':', alpha=.5) ax0.set(xlabel='$p$') if self.case_id == 'discrete': ax.set(ylim=[0, 110]) else: ax.set(ylim=[port.q(0.001), port.q(max(0.9999, self.reg_p))]) ax.set(title=f'{k.title()}, a={a:,.1f}') ax.legend(loc='upper left') ax = ax1 df.index = 1 / (1 - df.index) df.plot(ax=ax, drawstyle=ds) # ax.lines[-1].set(lw=3, ls=':', alpha=.5) if self.case_id == 'discrete': ax.set(ylim=[0, 110]) else: pass # ax.set(ylim=[0, 500]) # port.q(max(0.9999, reg_p))]) ax.set(xlabel='Return period', xscale='log', title='Return period view') ax.legend().set(visible=False) caption = f'(D) Figure 4.10: {self.case_name}, TVaR, and VaR for unlimited and limited variables, gross (left) and net (right). Lower view uses a log return period horizontal axis.' self._display_plot('D', f, caption) # ================================================================================================== # ================================================================================================== if 11 in chapters or 11.3 in chapters: # fig 11.3, 4, 5 # warning: quite slow! logger.info('Figures 11.3, 11.4, 11.5') f, axs = self.smfig(1, 3, (3 * 3, 1 * 3)) bounds = self.boundss['gross'] port = self.ports['gross'] premium, a = self.pricing_summary.loc[['P', 'a'], 'gross'] p_star = bounds.p_star('total', premium, a) bounds.cloud_view(axs=axs, n_resamples=0, alpha=1, pricing=True, title=f'Premium={premium:,.1f}, p={self.reg_p:.1g}\na={a:,.0f}, p*={p_star:.3f}', distortions=[{k: port.dists[k] for k in ['ccoc', 'tvar']}, {k: port.dists[k] for k in ['ph', 'wang', 'dual']}]) # for ax in axs.flat: # ax.set(aspect='equal') # f.suptitle(f'Portfolio {k}', fontsize='x-large') for ax in axs.flatten()[1:]: ax.legend(ncol=1, loc='lower right') for ax in axs.flatten(): ax.set(title=None) caption = f'(M) Distortion envelope for {self.case_name}, gross. Left plot shows the distortion envelope, middle overlays the CCoC and TVaR distortions, right overlays proportional hazard, Wang, and dual moment distortions.' self._display_plot('M', f, caption) if 11 in chapters or 11.6 in chapters: # fig 11.6, 7, 8 logger.info('Figures 11.6, 11.7, 11.8') f, axs = self.smfig(4, 3, (3 * 2.5, 4 * 2.5)) port = self.ports['gross'] for dn, ls in zip(['ph', 'wang', 'dual'], ['-', '--', ':']): axi = iter(axs.flatten()) g_ins_stats(axi, dn, port.dists[dn], ls=ls) for dn, ls in zip(['ccoc', 'tvar', 'blend'], ['-', '--', ':']): axi = iter(axs.flatten()[6:]) g_ins_stats(axi, dn, port.dists[dn], ls=ls) caption = f'(O) {self.case_name}, variation in premium, loss ratio, markup (premium to loss), margin, discount rate, and premium to capital leverage for six distortions, shown in two groups of three. Top six plots show proportional hazard, Wang, and dual moment; lower six: CCoC, TVaR, and Blend.' self._display_plot('O', f, caption) if 11 in chapters or 11.9 in chapters: # fig 11.9, 10, 11 logger.info('Figures 11.9, 11.10, 11.11') f, axs = self.smfig(6, 4, (4 * 2.5, 6 * 2.5)) port = self.ports['gross'] for i, dn in zip(range(6), ['ccoc', 'ph', 'wang', 'dual', 'tvar', 'blend']): axi = iter(axs.flatten()[4 * i:]) macro_market_graphs(axi, port, dn, 200) if i > 0: axi = iter(axs.flatten()[4 * i:]) next(axi).legend().set(visible=False) next(axi).legend().set(visible=False) next(axi).legend().set(visible=False) next(axi).legend().set(visible=False) caption = f'(P) {self.case_name}, variation in SRM properties as the asset limit (x-axis) is varied. Column 1: total premium and loss; 2: total assets, premium, and capital; 3; total and layer loss ratio; and 4: total and layer discount factor. ' \ 'By row CCoC, PH, Wang, Dual, TVaR, and Blend.' self._display_plot('P', f, caption) # ================================================================================================== # ================================================================================================== if 15 in chapters: # fig 15.2--15.7 (Twelve Plot) self.case_twelve_plot() # fig 15.8, 15.9, 15.10 logger.info("Figures 15.8, 15.9, 15.10") f, axs = self.get_f_axs(1, 2) ax0, ax1 = axs.flat if self.case_id == 'tame': xlim = [1, 1e9] elif self.case_id == 'cnc': xlim = [1, 1e5] elif self.case_id == 'hs': xlim = [1, 1e5] elif self.case_id == 'discrete': xlim = [0, 100] else: # TO DO! think about this! xlim = self.gross.limits() for port, kind, ax in zip([self.gross, self.net], ['Gross', 'Net'], axs.flat): bit = port.augmented_df.filter(regex='M\.Q|F|^S$') ax.plot(1 / bit.S, bit.filter(regex='Q')) # ['M.Q_A']) for ln, nm in zip(ax.lines, bit.filter(regex='Q').columns): ln.set(label=nm[4:]) ax.xaxis.set_minor_locator(ticker.LogLocator(numticks=10)) ax.axhline(0, lw=.5, c='k') ax.set(xscale='log', xlim=xlim, title=f'{kind} capital density', xlabel='Total loss return period', ylabel='Layer capital rate', ylim=[-0.1, 1.05]) if ax is ax0: ax.legend() # loc='lower right') # Figure 15.8 caption = f"(U) {self.case_name}, capital density for {self.case_name}, with {str(self.ports['gross'].dists[self.dgross])} gross and {str(self.ports['net'].dists[self.dnet])} net distortion." self._display_plot('U', f, caption) # fig 15.11 self.loss_density_spectrum() # fig 15.12, 15.13, 15.14 logger.info('Figures 15.12, 15.13, 15.14.') f, axs = self.get_f_axs(1, 2) ax0, ax1 = axs.flat # Bodoff allocation... lims = [0, self.gross.q(self.reg_p) * 1.1] for port, bit, ax in zip([self.gross, self.net], [self.bitg, self.bitn], axs.flat): bit.iloc[:, -2:].plot(ax=ax) ax.set(xlim=lims, ylim=lims, xlabel='Assets', ylabel='Allocation') ax.axvline(port.q(self.reg_p), ls=':', label='Capital') ax.legend() if ax is ax0: ax.set(title='Gross') else: ax.set(title='Net') # Figure 15.12 caption = f'(X) {self.case_name}, percentile layer of capital allocations by asset level, showing {self.reg_p} capital. (Same distortions.)' self._display_plot('X', f, caption) def make_boundss_p_stars(self, n_tps=1024, n_s=512): boundss = {} p_stars = {} for k, port in self.ports.items(): bounds = Bounds(port) # tvar_cloud -> compute_weights -> tvar_array ; tvar array used below for the graph # bounds.tvar_array('total', 1024, a) kind = 'interp' bounds.tvar_cloud( 'total', self.pricing_summary.at['P', k], self.pricing_summary.at['a', k], n_tps, n_s, kind) boundss[k] = bounds p_stars[k] = bounds.p_star('total', self.pricing_summary.at['P', k], self.pricing_summary.at['a', k], kind='tail' if k in ('discrete', 'awkward') else 'interp') self.boundss = boundss self.p_stars = p_stars # return boundss, p_stars def make_classic_pricing(self): port = self.ports['gross'] port.calibrate_distortions(ROEs=[self.roe], Ps=[self.reg_p], strict='ordered', df=[0, .98]) # f_roe_fix, put pre-computed approx back port.dists['ccoc'] = self.roe_d # and then SET the distortions for net self.ports['net'].dists = self.ports['gross'].dists.copy() cp = ClassicalPremium(self.ports, self.pricing_summary.loc['P', 'gross']) # set power for Fischer cp.p = 2 out = cp.calibrate('gross', 'total', self.pricing_summary.loc['P', 'gross']) if self.case_id not in ['cnc', 'tame']: if 'Exponential' in out: del out['Exponential'] logger.info('deleting Exponential method') # uses unlimited (audit stats mean) losses prices = cp.prices('gross', 'total', out) bits = {} lns = self.ports['gross'].line_names_ex bits['gross'] = pd.concat([pd.DataFrame(out.items(), columns=['method', 'parameter']).set_index('method')] + [pd.DataFrame(cp.prices('gross', ln, out).items(), columns=['method', 'premium']).set_index('method') for ln in lns] , axis=1, keys=['parameters'] + lns, names=['line', 'item']) bits['net'] = pd.concat([pd.DataFrame(out.items(), columns=['method', 'parameter']).set_index('method')] + [pd.DataFrame(cp.prices('net', ln, out).items(), columns=['method', 'premium']).set_index('method') for ln in lns] , axis=1, keys=['parameters'] + lns, names=['line', 'item']) try: bit = pd.concat([bits['gross'], bits['net'].iloc[:, [2, 3]]], keys=bits.keys(), names=['view', 'line', 'item'], axis=1) except IndexError: logger.warning('Alternative logic...') bit = pd.concat([bits['gross'], bits['net']], keys=bits.keys(), names=['view', 'line', 'item'], axis=1) bit = bit.droplevel('item', axis=1).swaplevel('view', 'line', axis=1).rename( columns={'parameters': ' parameters'}).sort_index(axis=1) ll = list(bit.columns) ll[0] = (ll[0][0], 'value') bit.columns = pd.MultiIndex.from_tuples(ll) bit[('total', 'ceded')] = bit[('total', 'gross')] - bit[('total', 'net')] bit = bit.sort_values((lns[1], 'gross')) emp_means = pd.concat((port.audit_df.T.loc[['EmpMean']] for port in self.ports.values()), keys=self.ports.keys(), names=['view', 'line'], axis=1) emp_means[('ceded', 'total')] = emp_means[('gross', 'total')] - emp_means[('net', 'total')] emp_means = emp_means.swaplevel(0, 1, axis=1).sort_index(axis=1) if self.case_id == 'hs': drop_line = 'SCS' elif self.case_id == 'tame': drop_line = 'A' elif self.case_id == 'cnc': drop_line = 'NonCat' elif self.case_id == 'discrete': drop_line = 'X1' else: drop_line = None if drop_line is not None: emp_means = emp_means.drop((drop_line, 'net'), axis=1) bit = pd.concat((emp_means, bit)).sort_index(ascending=[True, False], axis=1) lrs = (bit.loc['EmpMean'] / bit).iloc[:, 1:] if self.case_id in ['discrete', 'tame', 'cnc', 'hs'] or 1: # not sure why you wouldn't want this? no_re_line = self.ports['gross'].line_names.copy() no_re_line.remove(self.re_line) no_re_line = no_re_line[0] sop = bit.copy().drop(columns=[(' parameters', 'value'), ('total', 'ceded')]) sop[('sop', 'gross')] = sop[(self.re_line, 'gross')] + sop[(no_re_line, 'gross')] sop[('sop', 'net')] = sop[(self.re_line, 'net')] + sop[(no_re_line, 'gross')] sop[('delta', 'gross')] = sop[('sop', 'gross')] - sop[('total', 'gross')] sop[('delta', 'net')] = sop[('sop', 'net')] - sop[('total', 'net')] sop = sop[['total', 'sop', 'delta']] sop = sop.sort_index(axis=1, ascending=[False, True]) sop.index.name = 'method' else: sop = None bit.index.name = 'method' lrs.index.name = 'method' # self.port = port # self.cp = cp self.prices = prices # self.bits = bits self.lns = lns self.classic_pricing = bit self.lrs = lrs # self.no_re_line = no_re_line self.sop = sop # Table 13.1 tab13_1 = pd.DataFrame(index=self.gross.line_names_ex) tab13_1.index.name = 'Unit' vd = self.gross.var_dict(self.reg_p) tab13_1['a'] = vd.values() tab13_1['E[Xi(a)]'] = \ self.gross.density_df.filter(regex='exa_[A-Zt]').rename(columns=lambda x: x.replace('exa_', '')).loc[ vd['total']] tab13_1['E[Xi ∧ ai]'] = [self.gross.density_df.loc[v, f'lev_{k}'] for k, v in vd.items()] tab13_1 = tab13_1.rename(index={'total': 'Total'}) tab13_1.loc['SoP'] = tab13_1.iloc[:-1].sum() self.tab13_1 = tab13_1 def make_modern_monoline(self): bitsgross = [] for port in self.ports.values(): # unlike bitgross = port.stand_alone_pricing(self.roe_d, p=self.reg_p, kind='var') bitgross = bitgross.swaplevel(0, 1).sort_index(ascending=[1, 0]).rename( index={'traditional - no default': 'no default', f'sa {self.roe_d}': 'constant roe'}) # if len(bitsgross) == 0: # display(bitgross) bitsgross.append(bitgross) bit = pd.concat([bg.loc[['L', 'M', 'P', 'LR', 'Q', 'ROE', 'PQ', 'a']].drop( 'constant roe', axis=0, level=1) for bg in bitsgross], axis=1, keys=self.ports.keys(), names=['portfolio', 'line']) rob = bit.rename(index=universal_renamer).rename(index={'Traditional': 'With Default'}) \ .swaplevel(axis=0).sort_index() order = np.array([3, 5, 6, 4, 1, 7, 2, 0]) order = np.hstack((order, order + 8)) rob = rob.iloc[order, [0, 1, 2, 3, 4, 6, 7]] rob.index.names = ['method', 'statistic'] self.modern_monoline_sa = rob # add self.blend_d to list of distortions for port in self.ports.values(): if 'ccoc' not in port.dists: port.dists['ccoc'] = self.dist_dict['ccoc'] if 'blend' not in port.dists and 'blend' in self.dist_dict: port.dists['blend'] = self.dist_dict['blend'] elif 'blend' not in port.dists: # TODO arbitrary?! k = list(self.blend_distortions.keys())[-1] port.dists['blend'] = self.dist_dict[k] # Ch. Modern Mono Practice across distortions # all using GROSS calibrated distortions: distortions = [self.ports['gross'].dists[dn] for dn in ['ccoc', 'ph', 'wang', 'dual', 'tvar', 'blend']] bit_apply_gross = [] for port in self.ports.values(): temp = port.stand_alone_pricing(distortions, p=self.reg_p, kind='var') temp = temp.swaplevel(0, 1).sort_index(ascending=[1, 0]) bit_apply_gross.append(temp.filter(regex='sa', axis=0).rename(index=distortion_namer)) bit = pd.concat([bg.loc[['L', 'M', 'P', 'LR', 'Q', 'ROE', 'PQ', 'a']] for bg in bit_apply_gross], axis=1, keys=self.ports.keys(), names=['portfolio', 'line']) rob2 = bit.rename(index=universal_renamer) # .swaplevel(axis=0) # order = np.array([5, 3, 1, 4, 2, 0]) order = np.array([0, 4, 2, 5, 3, 1]) order = np.hstack([0] + [order + i * 6 for i in range(1, 7)] + [-6]) if self.case_id == 'tame': cols = [0, 1, 2, 3, 5, 6, 7] else: cols = [0, 1, 2, 3, 4, 6, 7] rob2 = rob2.iloc[order, cols] rob2.index.names = ['statistic', 'distortion'] # is this different? self.modern_monoline_by_distortion = rob2 def make_ad_comps(self): # ad = analyze distortion: combines all pricing methods into one summary data frame # TIME CONSUMING STEP...computed here for all allocations used here and Ch 090 110 for the HUSCS line # remember above, we replaced the dists for net with that calibrated on gross # print(self.ports['gross'].dists == self.ports['net'].dists) ad_compss = {} for k, port in self.ports.items(): if k not in ad_compss: ad_comps = port.analyze_distortions(p=self.reg_p, efficient=False, regex='ph|dual|wang|tvar|ccoc|blend') ad_compss[k] = ad_comps if k == 'net': # for classical CCoC need to just use roe distortion ad_compss['net_classical'] = port.analyze_distortions(p=self.reg_p, efficient=False, regex='ccoc') summaries = {} for k, ad_comps in ad_compss.items(): deets = ad_comps.comp_df.swaplevel(0, 1).sort_index(ascending=(1, 0)) summary = pd.concat(( deets.loc[[('L', 'EL')]], deets.loc[CaseStudy._stats_[1:]])) summaries[k] = summary self.ad_compss = ad_compss self.summaries = summaries # this is a bit mix n match self.classical_pricing = extract_sort_order(self.summaries, self._classical_, True) self.distortion_pricing = extract_sort_order(self.summaries, self._dist_, False) def make_bodoff_comparison(self): # tables 15.38-40 and Fig15.12-14 # note the allocation is assets, not capital bitg = self.gross.augmented_df.filter(regex='loss|exi_xgta_[A-Z]|F|^S$') bitn = self.net.augmented_df.filter(regex='loss|exi_xgta_[A-Z]|F|^S$') for bit in [bitg, bitn]: bit.set_index('loss', inplace=True) for ln in self.gross.line_names: bit[ln] = bit[f'exi_xgta_{ln}'].shift(1, fill_value=0).cumsum() * self.gross.bs cbit = self.distortion_pricing.loc['a'].copy() r = f'^{self.gross.line_names[0]}|^{self.gross.line_names[1]}' g = bitg.loc[self.gross.q(self.reg_p), :].filter(regex=r) n = bitn.loc[self.net.q(self.reg_p), :].filter(regex=r) g['total'] = g.sum() n['total'] = n.sum() cbit.loc['PLC', :] = 0 for i in g.index: cbit.loc['PLC', ('gross', i)] = g[i] cbit.loc['PLC', ('net', i)] = n[i] cbit.loc['PLC', ('ceded', 'diff')] = cbit.loc['PLC', ('gross', 'total')] - cbit.loc['PLC', ('net', 'total')] # TODO rename these! self.bitg = bitg self.bitn = bitn self.bodoff_compare = cbit def make_progression(self): """ Integrate modern stand-alone and portfolio pricing :return: """ vas = ['Premium', 'Loss Ratio', 'Rate of Return'] dns = ['Ccoc', 'PH', 'Wang', 'Dual', 'TVaR', 'Blend'] idx = product(vas, dns) bit_sa = self.modern_monoline_by_distortion.loc[idx] vas = ['P', 'LR', 'ROE'] dns = ['Dist ccoc', 'Dist ph', 'Dist wang', 'Dist dual', 'Dist tvar', 'Dist blend'] idx = product(vas, dns) renamer = {'P': 'Premium', 'LR': 'Loss Rato', 'ROE': 'Rate of return', 'Dist ccoc': 'CCoC', 'Dist ph': 'PH', 'Dist wang': 'Wang', 'Dist dual': 'Dual', 'Dist tvar': 'TVaR', 'Dist blend': 'Blend'} bit_d = self.distortion_pricing.loc[idx] bit_d = bit_d.drop(('ceded', 'diff'), axis=1).rename(index=renamer) bit = pd.concat((bit_sa, bit_d), keys=['Stand-alone', 'Allocated']) bit.index.names = ['Approach', 'Statistic', 'Distortion'] bit.columns.names = ['Perspective', 'Unit'] # and subset to just the premium walk b0 = urn(bit.xs('Premium', axis=0, level=1)['gross'].fillna('')).copy() b0.loc['Allocated', 'SoP'] = b0.loc['Allocated', 'Total'].values b0 = b0.drop(columns=['Total']) b0.columns = list(b0.columns[:-1]) + ['Total'] # difference # TODO SORT OUT - why are there missing values? b0d = b0.loc['Stand-Alone'].replace('', 0.) - b0.loc['Allocated'] walk = pd.concat((b0d, b0d / b0.loc['Stand-Alone'].replace('', 1), b0d.div(b0d['Total'].values, axis=0) ), keys=['Pooling ¤ Benefit', 'Pct Premium Reduction', 'Pct of Benefit'], axis=1, names=['View', 'Unit'] ) self.progression = bit self.walk = walk def make_all(self, dnet='', dgross='', n_tps=256, n_s=128): """ run code to make all the graphs and exhibits applies book default distortions or user selected Was n_tps=1024, n_s=512 which seems overkill """ # self.make_audit_exhibits() # print('audit exhibits done') self.make_boundss_p_stars(n_tps, n_s) process_memory() logger.info('boundss and pstar done') self.make_classic_pricing() process_memory() logger.info('classic pricing done') self.make_modern_monoline() process_memory() logger.info('modern monoline done') self.make_ad_comps() process_memory() logger.info('ad comps done') self.apply_distortions(dnet, dgross) process_memory() logger.info('apply distortions done') self.make_bodoff_comparison() process_memory() logger.info('Bodoff exhibits done') @staticmethod def extract_image(p, ref, w=100, caption=''): import base64 if caption == '': caption = 'some made up bullshit' data_uri = base64.b64encode(p.read_bytes()).decode('utf-8') img_tag = f''' <figure> <img src="data:image/png;base64,{data_uri}" alt="Figure {ref}" style="width:{w}%"> <figcaption>{caption}</figcaption> </figure> ''' return img_tag
[docs]def extract_sort_order(summaries, _varlist_, classical=False): ''' Pull out exhibits. Note difference: classical uses net_classical, calibrated to roe non-classical uses pricing calibrated to ''' parts = ['gross'] if classical: parts.append('net_classical') else: parts.append('net') bit = pd.concat([summaries[k] for k in parts], keys=['gross', 'net'], axis=1) ceded_name = 'diff' bit[('ceded', ceded_name)] = bit[('gross', 'total')] - bit[('net', 'total')] bit.loc['LR', ('ceded', ceded_name)] = bit.loc['L', ('ceded', ceded_name)].to_numpy() / bit.loc[ 'P', ('ceded', ceded_name)].to_numpy() bit.loc['ROE', ('ceded', ceded_name)] = bit.loc['M', ('ceded', ceded_name)].to_numpy() / bit.loc[ 'Q', ('ceded', ceded_name)].to_numpy() bit.loc['PQ', ('ceded', ceded_name)] = bit.loc['P', ('ceded', ceded_name)].to_numpy() / bit.loc[ 'Q', ('ceded', ceded_name)].to_numpy() df = bit.loc[(slice(None), _varlist_), :].copy() df['dm1'] = [CaseStudy._stats_.index(i) for i in df.index.get_level_values(0)] df['dm2'] = [_varlist_.index(i) for i in df.index.get_level_values(1)] df = df.sort_values(['dm1', 'dm2']) df = df.drop(columns=['dm1', 'dm2']) return df
[docs]def g_ins_stats(axi, dn0, dist, ls='-'): """ Six part plot with EL, premium, loss ratio, profit, layer ROE and P:S for g axi = axis iterator """ g = dist.g N = 1000 ps = np.linspace(1 / (2 * N), 1, N, endpoint=False) gs = g(ps) # ['Premium, $g(s)$', 'Loss Ratio, $s/g(s)$', 'Markup, $g(s)/s$', # 'Margin, $g(s)-s$', 'Discount Rate, $(g(s)-s)/(1-s)$', 'Premium Leverage, $g(s)/(1-g(s))$']): dn = {'ph': "PH", 'wang': "Wang", 'dual': "Dual", 'ccoc': 'CCoC', 'tvar': 'TVaR', 'blend': 'Blend'}[dn0] if dn0 == 'blend': dn = 'Blend' elif dn0 == 'ccoc': # TODO HORRIBLE dn = f'{dn}, 0.10' else: dn = f'{dn}, {dist.shape:.3f}' # FIG: six panel display for i, ys, key in zip(range(1, 7), [gs, ps / gs, gs / ps, gs - ps, (gs - ps) / (1 - ps), gs / (1 - gs)], ['Premium', 'Loss ratio', 'Markup', 'Margin', 'Discount rate', 'Premium leverage']): ax = next(axi) if i == 1: ax.plot(ps, ys, ls=ls, label=dn) else: ax.plot(ps, ys, ls=ls) if i in [1, 2, 4]: ax.plot(ps, ps, color='C2', linewidth=0.25) ax.set(title=key) if i == 3 or i == 6: ax.axis([-0.025, 1.025, 0, 5]) ax.set(aspect=1 / 5) else: ax.axis([-0.025, 1.025, -0.025, 1.025]) ax.set(aspect=1) if i == 1: ax.legend(loc='lower right')
[docs]class ClassicalPremium(object): """ manage classical premium examples Net, no loading Expected value, constant loading Maximum loss --> no VaR (as proxy for maximum loss) Variance Std Dev Semi-variance (Artzner p. 210) Exponential (zero utility, convex!) Esscher Originally in hack.py """ __classical_methods__ = ['Expected Value', 'VaR', 'Variance', 'Standard Deviation', 'Semi-Variance', 'Exponential', 'Esscher', 'Dutch', 'Fischer'] def __init__(self, ports, calibration_premium): """ set up and calibrate pricing methods """ self.ports = ports self.calibration_premium = calibration_premium
[docs] def distribution(self, port_name, line_name): """ classical methods all depend on the distribution...so pull it out pull the object that will provide q etc. pull the audit stats """ df = self.ports[port_name].density_df.filter(regex=f'p_{line_name}|loss'). \ rename(columns={f'p_{line_name}': 'p'}) df['F'] = df.p.cumsum() if line_name == 'total': ob = self.ports[port_name] else: ob = self.ports[port_name][line_name] stats = self.ports[port_name].audit_df.T[line_name] mn = stats['EmpEX1'] var = stats['EmpEX2'] - stats['EmpEX1'] ** 2 sd = var ** 0.5 return df, ob, stats, mn, var, sd
[docs] def calibrate(self, port_name, line_name, calibration_premium, df=None, ob=None, stats=None, mn=None, var=None, sd=None): """ calibrate all methods... """ from scipy.optimize import newton self.calibration_premium = calibration_premium # gather inputs if df is None: df, ob, stats, mn, var, sd = self.distribution(port_name, line_name) ans = {} ans['Expected Value'] = calibration_premium / mn - 1 ans['VaR'] = float(ob.cdf(calibration_premium)) ans['Variance'] = (calibration_premium - mn) / var ans['Standard Deviation'] = (calibration_premium - mn) / sd for method in self.__classical_methods__: if method not in ans: if method in ['Exponential', 'Esscher']: x0 = 1e-10 elif method == 'TVaR': x0 = ans['VaR'] - 0.025 else: x0 = 0.5 try: a = newton(lambda x: self.price(x, port_name, line_name, method, df, ob, stats, mn, var, sd) - calibration_premium, x0=x0) ans[method] = float(a) except RuntimeError as e: print(method, e) return ans
[docs] def prices(self, port_name, line_name, method_dict): """ run lots of prices """ df, ob, stats, mn, var, sd = self.distribution(port_name, line_name) ans = {} for method, param in method_dict.items(): ans[method] = self.price(param, port_name, line_name, method, df, ob, stats, mn, var, sd) return ans
[docs] def price(self, param, port_name, line_name, method, df=None, ob=None, stats=None, mn=None, var=None, sd=None): """ apply method to port_name, line_name with parameter(s) (all one param) these are all classical methods method_dict = {method name : param } param = float | [float param, p ge 1 value] latter fro Fischer method """ if df is None: df, ob, stats, mn, var, sd = self.distribution(port_name, line_name) if method == 'Expected Value': return mn * (1 + param) if method == 'VaR': return ob.q(param) if method == 'TVaR': return ob.tvar(param) if method == 'Variance': return mn + param * var if method == 'Standard Deviation': return mn + param * sd if method == 'Semi-Variance': sd_plus = np.sum(np.maximum(0, df.loss - mn) ** 2 * df.p) return mn + param * sd_plus if method == 'Exponential': # (1/k) ln E[e^kX] eax = np.sum(np.exp(param * df.loss) * df.p) return (1 / param) * np.log(eax) if method == 'Esscher': # E[Xe^aX] / E[e^aX] eax = np.sum(np.exp(param * df.loss) * df.p) exax = np.sum(df.loss * np.exp(param * df.loss) * df.p) return exax / eax if method == 'Dutch': excess = np.sum(np.maximum(df.loss - mn, 0) * df.p) return mn + param * excess if method == 'Fischer': # remember you must set p! excess = np.sum(np.maximum(df.loss - mn, 0) ** self.p * df.p) ** (1 / self.p) return mn + param * excess
[docs] def pricing_exhibit(self, port_name, line_name, calibration_premium, re_line): """ calibrate and apply to all other portfolios """ df, ob, stats, mn, var, sd = self.distribution(port_name, line_name) parameters = self.calibrate(port_name, line_name, calibration_premium, df, ob, stats, mn, var, sd) df = pd.DataFrame({(pfn, ln): pd.Series(self.prices(pfn, ln, parameters)) for pfn in self.ports.keys() for ln in self.ports['gross'].line_names_ex}) df[('ceded', re_line)] = df[('gross', re_line)] - df[('net', re_line)] el = pd.Series(np.hstack((self.ports['gross'].audit_df['EmpMean'].values, self.ports['net'].audit_df['EmpMean'].values, mn - self.ports['net'].audit_df.loc[re_line, 'EmpMean'])), index=df.columns) df = pd.concat((df, el / df.replace(0, np.nan)), keys=['P', 'LR'], names=['stat', 'view', 'line'], axis=1) return df
[docs] def illustrate(self, port_name, line_name, ax, margin, *, p=0, K=0, n_big=10000, n_sample=25, show_bounds=True, padding=2): ''' illustrate simulations at p level probability probability level determines capital or capital K input margin: premium = (1 + margin) * EL, margin = rho n_big = number of policies - max of horizontal axis n_sample = number of iterations to plot Theoretic bounds use the actual moments, simulated use those from the process being estimated From common_scripts Pentagon took out re-computation... ''' if line_name == 'total': print('Error - cannot use total...') raise ValueError() # ag = self.ports[port_name] else: ag = self.ports[port_name][line_name] if p and K == 0: self.ruin, self.u, self.mean, self._dfi = ag.cramer_lundberg(margin, kind='interpolate', padding=padding) K = self.u(p) # print(f'Using K={K}') elif K == 0: raise ValueError('Must input one of K and p') ea = np.sum(ag.density_df.loss * ag.density_df.p) ans = [(-1, ea)] ns = np.arange(n_big) prems = K + (1 + margin) * ea * ns means = K + margin * ea * ns # transparency of lines... alpha = min(1, 50 / n_sample) bombs = 0 for i in range(n_sample): samp = ag.density_df.loss.sample( n=n_big, weights=ag.density_df.p, replace=True) ans.append((i, np.mean(samp), np.max(samp))) ser = prems - samp.cumsum() idzero = np.argmax(ser <= 0) if idzero > 0: ser.iloc[idzero:] = ser.iloc[idzero] rc = np.random.rand() ax.plot(ns, ser, ls='-', lw=0.5, color=(rc, rc, rc), alpha=alpha) if idzero: ax.plot(ns[idzero], ser.iloc[idzero], 'x', ms=5) bombs += 1 ax.plot(ns, means, 'k-', linewidth=2) ef = EngFormatter(3, True) ax.set(xlabel='Number of risks', ylabel=f'Cumulative capital, $u_0={ef(K)}$') # LIL bounds lln = np.nan_to_num(np.log(abs(np.log(ns)))) lln[lln < 0] = 0 if show_bounds: var_a = (ag.agg_m * ag.agg_cv) ** 2 lb = means - np.sqrt(2.0 * var_a * ns * lln) ub = means + np.sqrt(2.0 * var_a * ns * lln) ax.plot(ns, lb, '--', lw=1) ax.plot(ns, ub, '--', lw=1) # titles etc. # title = f'{ag.name:s} at {margin:.3f} margin\nFailure rate: {bombs} / {n_sample} = {bombs / n_sample:.3f}' title = f'{bombs}/{n_sample} = {bombs / n_sample:.3f} ruins' # if p: # title += f' (p={p:.3f})' ax.set(title=title) # make xy plot # ax.spines['right'].set_visible(False) # ax.spines['top'].set_visible(False) # ax.spines['bottom'].set_position(('data', 0)) # ax.spines['left'].set_position(('data', 0)) return pd.DataFrame(ans, columns=['n', 'ea', 'min'])
[docs]def macro_market_graphs(axi, port, dn, rp): """ Create more succinct 4x4 graph to illustrate macro market structure and result, LR, P:S and ROE, etc. Use a port object with calibrated distortion see ch04_macro_market_stats_original from: common_scripts.py ch04_macro_market_stats updated line colors June 2022: removed all color='k' :param dist: :param rp: return period :param sigma: :return: """ dist = port.dists[dn] g = dist.g dn = {'ph': "PH", 'wang': "Wang", 'dual': "Dual", 'ccoc': 'CCoC', 'tvar': 'TVaR', 'blend': 'Blend'}[dn] # figure assets and basic stats A = port.q(1 - 1 / rp) xs = port.density_df.loss.to_numpy() sxs = port.density_df.S.to_numpy() gsxs = g(sxs) h = xs[1] el = cumtrapz(sxs, dx=h, initial=0) prem = cumtrapz(gsxs, dx=h, initial=0) # marginal layer statistics # be careful about divide by zero eps = np.finfo(float).eps iota = np.where(gsxs > 1 - eps, np.inf, (gsxs - sxs) / (1 - gsxs)) nu = np.where(np.isinf(iota), np.nan, 1 / (1 + iota)) delta = 1 - nu # delta = 1 - nu lr = sxs / gsxs leverage = gsxs / (1 - gsxs) # cumulative statistics, from the bottom to the limit eps = A * eps * 100 ciota = np.where(prem > xs - eps, np.inf, (prem - el) / (xs - prem)) cnu = np.where(np.isinf(ciota), np.nan, 1 / (1 + ciota)) clr = np.where(prem == 0, np.nan, el / prem) cdelta = 1 - cnu a = next(axi) a.plot(xs, prem, '-', label='Prem') a.plot(xs, el, '--', label='Loss') a.set(title=dn) # f'$\\bar P$ and $\\bar S$, {dn}') a.legend(loc='lower right') a = next(axi) a.plot(xs, prem, '-', label='premium') a.plot(xs, xs, '-', lw=.75, label='assets') a.plot(xs, xs - prem, '-.', label='capital') a.set(title='$a, \\bar P$ and $\\bar Q$') a.legend(loc='upper left') a = next(axi) a.plot(xs, clr, '-', label='cumul') a.plot(xs, lr, ':', label='layer') a.set(ylim=[-0.01, 1.01], title='Loss ratio') a.legend(loc='lower right') a = next(axi) a.plot(xs, cdelta, '-', label='cumul') a.plot(xs, delta, ':', label='layer') a.set(ylim=[-0.01, 1.01], title='Discount rate $\\delta$') a.legend(loc='upper right')
[docs]def pricing(port, p, roe, as_dataframe=True): """ Make nice stats output for pricing from common_scripts.py """ a = port.q(p) loss = port.density_df.loc[a, 'lev_total'] v = 1 / (1 + roe) d = 1 - v premium = loss * v + d * a q = a - premium ans = pd.Series([loss, premium - loss, premium, loss / premium, q, (premium - loss) / q, a, premium / q], index=['L', 'M', 'P', 'LR', 'Q', 'ROE', 'a', 'PQ']) if as_dataframe: df = pd.DataFrame(ans, columns=['total']) df.index.name = 'stat' return df else: return ans
[docs]def bivariate_density_plots(axi, ports, xmax, contour_scale, biv_log=True, cmap='viridis', levels=30, color_bar=False): """ bivarate plots of each line against the others for each portfolio in case arguments as for twelve_plot / bivden plot axi = iterator with enough exes ports = iterable of ports (list, dict.values(), etc.) from common_scripts.py """ min_density = 1e-15 # careful about origin when big prob of zero loss # handle for discrete distributions for port in ports: # this can fail to create enough points (e.g. big log2), you lose resolution at left hand end # min_loss, max_loss, jump = 0, xmax, (2 ** (port.log2 - 8)) * port.bs # this version ensures you get a reasonable number of points .... you still have to pick xmax carefullly xmax = port.snap(xmax) min_loss, max_loss, jump = 0, xmax, port.snap(xmax / 255) ps = [f'p_{i}' for i in port.line_names] title = port.name query = ' or '.join([f'`p_{i}` > 0' for i in port.line_names]) nobs = port.density_df.query(query).shape[0] nlines = len(port.line_names) if nobs < 512: logger.info('Contour plot has few points...going discrete...') bit = port.density_df.query(query) n = len(bit) for i, j in product(range(nlines), range(nlines)): if i < j: a13 = next(axi) Z = bit[ps[j]].to_numpy().reshape(n, 1) @ bit[ps[i]].to_numpy().reshape(1, n) X, Y = np.meshgrid(bit.index, bit.index) norm = mpl.colors.Normalize(vmin=-10, vmax=np.log10(np.max(Z.flat))) cm = mpl.cm.ScalarMappable(norm=norm, cmap='viridis') mapper = cm.to_rgba a13.scatter(x=X.flat, y=Y.flat, s=1000 * Z.flatten(), c=mapper(np.log10(Z.flat))) # edgecolor='C2', lw=1, facecolors='none') a13.set(xlim=[min_loss - (max_loss - min_loss) / 10, max_loss], ylim=[min_loss - (max_loss - min_loss) / 10, max_loss]) else: npts = np.arange(min_loss, max_loss, jump) bit = port.density_df.loc[npts, ps] n = len(bit) for i, j in product(range(nlines), range(nlines)): if i < j: a13 = next(axi) Z = bit[ps[j]].to_numpy().reshape(n, 1) @ bit[ps[i]].to_numpy().reshape(1, n) Z = Z / np.sum(Z) X, Y = np.meshgrid(bit.index, bit.index) if biv_log: z = np.log10(Z) mask = np.zeros_like(z) mask[z == -np.inf] = True mz = np.ma.array(z, mask=mask) cp = a13.contourf(X, Y, mz, levels=np.linspace(-17, 0, levels), cmap=cmap) if color_bar: cb = a13.figure.colorbar(cp) cb.set_label('Log10(Density)') else: mask = np.zeros_like(Z) mask[Z < min_density] = True mz = np.ma.array(Z, mask=mask) cp = a13.contourf(X, Y, mz, levels=levels, cmap=cmap) if color_bar: cb = a13.figure.colorbar(cp) cb.set_label('Density') a13.set(xlim=[min_loss, max_loss], ylim=[min_loss, max_loss]) # put in X+Y=c lines lines = np.arange(contour_scale / 4, 2 * contour_scale + 1, contour_scale / 4) logger.debug(f'Contour lines based on {contour_scale} gives {lines}') for x in lines: a13.plot([0, x], [x, 0], ls='solid', lw=.35, c='k', alpha=0.5, label=f'Sum = {x:,.0f}') a13.set(xlabel=f'Line {port.line_names[i]}', ylabel=f'Line {port.line_names[j]}', title=title, aspect=1)
def process_memory(show_process=False): # memory usage in GB and process id process = psutil.Process(os.getpid()) mem_info = process.memory_info() m, p = mem_info.rss, process.pid mu = m // 100000000 m = m / (1 << 30) if show_process: logger.info(f'Process id = {p}\nMemory usage = {m:.3f}GB: |' + '=' * mu) else: logger.info(f'Memory usage = {m:.3f}GB: |' + '=' * mu) class ManualRenderResults(): APPNAME = 'Pricing Insurance Risk' def __init__(self, case_object): """ Create local HTML page for the results datasets. Based on pricinginsurancerisk templates. Uses pricinginsurancerisk.com css file. """ self.templates = Path(agg.__file__).parent / 'templates' self.case_object = case_object self.env = Environment(loader=FileSystemLoader(self.templates), autoescape=(['html', 'xml'])) @staticmethod def now(): return 'Created {date:%Y-%m-%d %H:%M:%S.%f}'.format(date=datetime.now()).rstrip('0') def render(self): """ Render all the exhibits. To render the book exhibits only (without the extended exhibits) run .render_custom('[A-Y]') :return: """ self.render_custom('[A-Y]', suffix='book') self.render_custom('Z.*', suffix='extended') def render_custom(self, *argv, suffix='custom'): """ Render a custom list of exhibits using standard glob file expansion. The book exhibits are A-Y with Tn and Tg net and gross. All extended exhibits are Z... :param argv: :return: """ base_dir0 = Path.home() / 'aggregate/cases' base_dir = base_dir0 / self.case_object.case_id if base_dir.exists() is False: raise ValueError('{self.case_id} directory not found') # this is the actual content blobs = [] ids = [] for pattern in argv: for p in sorted(base_dir.glob('*.html')): if re.match(pattern, p.stem): ids.append(p.stem) blobs.append(p.read_text(encoding='utf-8')) desc = [f'<p>{self.case_object.case_description}</p>'] spec = self.case_object.to_dict() desc.append(f'<p>Distributions</p><pre>{spec["a_distribution"]}') desc.append(f'{spec["b_distribution_gross"]}') desc.append(f'{spec["b_distribution_net"]}</pre>') desc.append(f'<p>Other parameters: ') # desc.append('<ul>') desc.append(f'reg_p = {spec["reg_p"]}, ') desc.append(f'roe = {spec["roe"]}, ') desc.append(f'd2tc = {spec["d2tc"]}, ') desc.append(f's_values = {spec["s_values"]}, ') desc.append(f'gs_values = {spec["gs_values"]}, ') desc.append(f'f_discrete = {spec["f_discrete"]}, ') desc.append(f'log2 = {spec["log2"]}, ') desc.append(f'bs = {spec["bs"]}, and') desc.append(f'padding = {spec["padding"]}.</p>') # desc.append('</ul>') desc = '\n'.join(desc) # menu bar items template = self.env.get_template('results_extended.html') h = template.render(title=self.APPNAME, case_name=self.case_object.case_name, case_id=self.case_object.case_id, case_description=desc, subpage='extended', blobs=blobs, ids=ids, timestamp=self.now(), templates=self.templates ) # rebase images, entered in snippets as /{case_id}/... # other rebasing issues removed because we use the custom installed template h = h.replace(f'src="/{self.case_object.case_id}', f'src="{self.case_object.case_id}') p = Path.home() / f'aggregate/cases/{self.case_object.case_id}_{suffix}.html' p.write_text(h, encoding='utf-8') logger.info(f'Rendered {len(blobs)} exhibits and plots.')