Source code for aggregate.parser

# For historical interest, the journey to sorting out the parser ran as follows:
#
# 0. Base: 12 SR conflicts
# 1. Reinstated UMINUS: 25SR/15RR! Rejected rule
# 2. Removed percent: 25/14
# 3. MINUS "(" expr ")" %prec UMINUS --> MINUS expr %prec UMINUS: 25/14
# 4. protoexpr introduced as the first level of decoding a NUMBER, UMINUS, percent reinstated: 25/14
# 5. UMINUS at expression level: 25/14
# 6. Use same symbol but different precedence for scale and loc: 33/14?!!
# 7. UMINUS at proto_expression level: 25/14
# 8. Python style math (atom, power, factor, term, sum) (retains same symbol for LOC/SCALE): 29SR/NONE!
# 9. Removing percent made no difference... reinstated but made it the highest priority
# 10. Removed EXP and () 23 SR XXXX (26 with (),)
# 11. Removed SPECIFIED (23)
# 12. Removed name exposures layers builtin_sev; builtin_sev->sev and then use the sev rule (23)
# 13. builtin_sevs are defined by a dictionary...once looked up they are no different from regular sevs, so all special code removed... 23SR
# 14. EXP and () reinstated, 26 SR
# 15. Introduced sev_clause (includes sev and dsev): 29SR [if you try dfreq sev_clause you get 39SR] ...going with 39
# 16. Putting LOW prec in reduced it to 36...
# 17. Issue was driven by cases with optional arguments. Need to give the optional (reduce) case lower weight.
# 18. parameters to severity...
#
# Issue with scalar x RV + const and pulling out the parameters. If you allow 2 + 3 * lognorm it will never
# work with the same character. Hence need @. Similarly for #.
# zero param sevs are a problem too.
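# (For example, in 'sev 2 * lognorm 3 + 4' the parser must read 2 as a scale and
# 4 as a shift of the severity, not as scalar arithmetic, so a distinct character
# is needed for the other, inhomogeneous, kind of multiplication, hence @.
# Illustrative reading, based on the grammar rules below.)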
#
# Zero parameter severities did not work. You must enter at least one parameter, but it is ignored.
#
# Calculator is more bother than it is worth... keep exp, ** and /, but drop everything else (use f strings!)
# Result has SR conflicts but it parses all the test programs
#
# June 2023 have 21 shift/reduce conflicts.
# 20 of them are five groups of four: EXP, (, NUMBER, INFINITY
# The remaining one is [ around dfreq
#
# July 2023 changes
# make atom ** factor into factor ** factor so that (1 / 3) ** (3 / 4) works
# splice


import logging
import numpy as np
from numpy import exp
from pathlib import Path
import re
from .sly import Lexer, Parser

logger = logging.getLogger(__name__)

DEBUGFILE = Path.home() / 'aggregate/parser/parser.out'


class UnderwritingLexer(Lexer):
    """
    Implements the Lexer for the agg language.
    """

    tokens = {ID, BUILTIN_AGG, BUILTIN_SEV, NOTE,
              SEV, AGG, PORT,
              NUMBER,  # INFINITY,
              PLUS, MINUS, TIMES, DIVIDE, INHOMOG_MULTIPLY,
              LOSS, PREMIUM, AT, LR, CLAIMS, EXPOSURE, RATE,
              XS, PICKS, DISTORTION,
              CV, WEIGHTS, EQUAL_WEIGHT, XPS, SPLICE,
              MIXED, FREQ, TWEEDIE, ZM, ZT,
              NET, OF, CEDED, TO, OCCURRENCE, AGGREGATE, PART_OF, SHARE_OF, TOWER,
              AND, EXPONENT, EXP,
              DFREQ, DSEV, RANGE
              }

    ignore = ' \t,\\|'
    literals = {'[', ']', '!', '(', ')'}

    # per manual, need to list longer tokens before shorter ones
    # simple but effective notes
    NOTE = r'note\{[^\}]*\}'
    BUILTIN_AGG = r'agg\.[a-zA-Z][a-zA-Z0-9._:~\-]*'
    BUILTIN_SEV = r'sev\.[a-zA-Z][a-zA-Z0-9._:~\-]*'
    FREQ = 'binomial|pascal|poisson|bernoulli|geometric|fixed|neyman(a|A)?|logarithmic|negbin'
    DISTORTION = 'dist(ortion)?'
    # number regex including unary minus; needs to come before MINUS, else that grabs the minus sign in -3 etc.
    # includes inf, -inf and percents
    NUMBER = r'\-?(\d+\.?\d*|\d*\.\d+)([eE](\+|\-)?\d+)?%?|\-?inf'
    # do not use _ in unit names as part of portfolios. Can use ~ or . or : instead:
    # why? because p_ is used and _ is special
    # on honor system...really need two types of ID, it is OK in a portfolio name
    ID = r'[a-zA-Z][\._:~a-zA-Z0-9\-]*'
    EXPONENT = r'\^|\*\*'
    PLUS = r'\+'
    MINUS = r'\-'
    TIMES = r'\*'
    DIVIDE = '/'
    INHOMOG_MULTIPLY = '@'
    EQUAL_WEIGHT = '='
    RANGE = ':'

    ID['occurrence'] = OCCURRENCE
    ID['aggregate'] = AGGREGATE
    ID['exposure'] = EXPOSURE
    ID['tweedie'] = TWEEDIE
    ID['premium'] = PREMIUM
    ID['tower'] = TOWER
    ID['mixed'] = MIXED
    ID['picks'] = PICKS
    ID['prem'] = PREMIUM
    ID['claims'] = CLAIMS
    ID['splice'] = SPLICE
    ID['ceded'] = CEDED
    ID['claim'] = CLAIMS
    ID['dfreq'] = DFREQ
    ID['dsev'] = DSEV
    ID['loss'] = LOSS
    ID['port'] = PORT
    ID['rate'] = RATE
    ID['net'] = NET
    ID['sev'] = SEV
    ID['agg'] = AGG
    ID['xps'] = XPS
    ID['wts'] = WEIGHTS
    ID['and'] = AND
    ID['exp'] = EXP
    ID['at'] = AT
    ID['cv'] = CV
    ID['lr'] = LR
    ID['xs'] = XS
    ID['of'] = OF
    ID['to'] = TO
    ID['po'] = PART_OF
    ID['so'] = SHARE_OF
    ID['zm'] = ZM
    ID['zt'] = ZT

    @_(r'\n+')
    def newline(self, t):
        self.lineno += t.value.count('\n')

    def error(self, t):
        logger.error(f"Illegal character '{t.value[0]:s}'")
        self.index += 1
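
    # Illustrative smoke test of the lexer (assumed example, not part of the
    # class). sly's Lexer.tokenize yields tokens with .type and .value:
    #
    #   lexer = UnderwritingLexer()
    #   for tok in lexer.tokenize('agg Example 10 claims 100 xs 0 sev lognorm 50 cv 0.3 poisson'):
    #       print(tok.type, tok.value)
    #
    # emits AGG, ID, NUMBER, CLAIMS, NUMBER, XS, NUMBER, SEV, ID, NUMBER, CV,
    # NUMBER, FREQ tokens in order (keywords are remapped from ID as above).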

    @staticmethod
    def preprocess(program):
        """
        Separate preprocessor step, allowing it to be called separately.
        Preprocessing involves five steps:

        1. Remove // comments, through end of line
        2. Remove \\n in [ ] (vectors) that appear from using ``f'{np.linspace(...)}'``
        3. Backslash (line continuation) mapped to space
        4. \\n\\t is replaced with space, supporting the tabbed indented Portfolio layout
        5. Split on newlines

        :param program:
        :return:
        """

        # handle \n in vectors; first item is outside, then inside... (multidimensional??)
        out_in = re.split(r'\[|\]', program)
        assert len(out_in) % 2  # must be odd
        odd = [t.replace('\n', ' ') for t in out_in[1::2]]  # replace inside []
        even = out_in[0::2]  # otherwise, pass through
        # reassemble
        program = ' '.join([even[0]] +
                           [f'[{o}] {e}' for o, e in zip(odd, even[1:])])

        # remove comments C++-style // or # comments
        # must replace comments before changing other \ns
        program = re.sub(r'(//|#)[^\n]*$', r'\n', program, flags=re.MULTILINE)

        # preprocessing: line continuation; \n\t or \n____ to space (for port agg element indents),
        # ; to new line, split on new line
        program = program.replace('\\\n', ' ').replace('\n\t', ' ').replace('\n    ', ' ')

        # split program into lines, only accept len > 0
        program = [i.strip() for i in program.split('\n') if len(i.strip()) > 0]
        return program
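
    # Illustrative sketch of preprocess (assumed input, not part of the class):
    #
    #   UnderwritingLexer.preprocess('agg A \\\n 1 claim // a comment\ndsev [1 2\n3]')
    #
    # joins the continuation line, strips the // comment, replaces the newline
    # inside [ ] with a space, and returns the cleaned list, roughly
    # ['agg A 1 claim', 'dsev [1 2 3]'].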

class UnderwritingParser(Parser):
    """
    Implements the Parser for the agg language.

    Here are testers for the math expressions::

        from aggregate import build
        for t in ['-123', '-2%', '45%', '1e-3%', 'inf', '-inf', 'exp(1)', 'exp(1/2)', 'exp(-1)',
                  '-1/8', 'exp(10)/exp(3**2/2)', '2**10', '50/exp(.3**2/2)', '1/exp(1.9**2 / 2)']:
            a = build(t)
            print(a.name)
            assert float(a.name) == eval(t.replace('%', '/100').replace('exp', 'np.exp').replace('inf', 'np.inf'))

    To test on the test_suite::

        df = build.run_test_suite()
        assert len(df.query('error != 0')) == 0

    """
    expected_shift_reduce = 16  # set this to the number of expected shift/reduce conflicts
    debugfile = None
    # uncomment to write detailed grammar rules
    # debugfile = Path.home() / 'aggregate/parser/parser.out'
    if debugfile is not None:
        # this won't have been created the first time this runs in a clean environment, hence:
        debugfile.parent.mkdir(parents=True, exist_ok=True)
    tokens = UnderwritingLexer.tokens
    precedence = (
        ('nonassoc', LOW),               # used to force shift in rules
        ('nonassoc', INHOMOG_MULTIPLY),
        ('left', PLUS, MINUS),
        ('left', TIMES),                 # for scaling distributions
        ('nonassoc', DIVIDE),            # for internal math in expressions; nonassoc means 1/2/3 causes an error, forcing parens
        ('right', EXP),                  # exponential function
        ('right', EXPONENT),
    )

    def __init__(self, safe_lookup_function, debug=False):
        self.debug = debug
        # self.reset()
        # instance of uw class to look up severities
        self.safe_lookup = safe_lookup_function

    def logger(self, msg, p):
        if self.debug is False:
            return
        nm = p._namemap
        sl = p._slice
        ans = []
        for i, (k, v) in enumerate(nm.items()):
            # breaks out the parts; sl is a tuple of parse states
            rhs = v(sl, i)
            ans.append(f'[{i}] {k}={rhs!s}')
        ans = "; ".join(ans)
        logger.info(f'{msg:20s}\t{ans}')

    @staticmethod
    def enhance_debugfile(f_out=''):
        """
        Put links in the parser.out debug file, if DEBUGFILE != ''.

        :param f_out: Path or filename of output. If "" then DEBUGFILE.html used.
        :return:
        """
        if DEBUGFILE == '':
            return
        if f_out == '':
            f_out = DEBUGFILE.with_suffix('.html')
        else:
            f_out = Path(f_out)
        txt = Path(DEBUGFILE).read_text(encoding='utf-8')
        txt = txt.replace('Grammar:\n', '<h1>Grammar:</h1>\n\n<pre>\n').replace('->', '<-')
        txt = re.sub(r'^Rule ([0-9]+)', r'<div id="rule_\1" />Rule \1', txt, flags=re.MULTILINE)
        txt = re.sub(r'^state ([0-9]+)$', r'<div id="state_\1" /><b>state \1</b>', txt, flags=re.MULTILINE)
        txt = re.sub(r'^ \(([0-9]+)\) ', r' <a href="#rule_\1">Rule (\1)</a> ', txt, flags=re.MULTILINE)
        txt = re.sub(r'go to state ([0-9]+)', r'go to <a href="#state_\1">state (\1)</a>', txt, flags=re.MULTILINE)
        txt = re.sub(r'using rule ([0-9]+)', r'using <a href="#rule_\1">rule (\1)</a>', txt, flags=re.MULTILINE)
        txt = re.sub(r'in state ([0-9]+)', r'in <a href="#state_\1">state (\1)</a>', txt, flags=re.MULTILINE)
        f_out.write_text(txt + '\n</pre>', encoding='utf-8')

    @staticmethod
    def _check_vectorizable(value):
        """
        Check the value can be vectorized.
        """
        if isinstance(value, (float, int, np.ndarray)):
            return value
        else:
            return np.array(value)

    # final answer exit points =================================
    @_('sev_out')
    def answer(self, p):
        self.logger(f'answer <-- sev_out, created severity {p.sev_out[1]}', p)
        return p.sev_out

    @_('agg_out')
    def answer(self, p):
        self.logger(f'answer <-- agg_out, created aggregate {p.agg_out[1]}', p)
        return p.agg_out

    @_('port_out')
    def answer(self, p):
        self.logger(f'answer <-- port_out, created portfolio {p.port_out[1]}', p)
        return p.port_out

    @_('distortion_out')
    def answer(self, p):
        self.logger(f'answer <-- distortion_out, created distortion {p.distortion_out[1]} ', p)
        return p.distortion_out

    @_('expr')
    def answer(self, p):
        self.logger(f'expr_out <-- expr {p.expr} ', p)
        return 'expr', f'{p.expr}', p.expr

    # making distortions ======================================
    @_('DISTORTION name ID expr')
    def distortion_out(self, p):
        self.logger('distortion_out <-- DISTORTION ID name', p)
        # self.out_dict[("distortion", p.name)] =
        return 'distortion', p.name, {'name': p.ID, 'shape': p.expr}

    @_('DISTORTION name ID expr "[" numberl "]"')
    def distortion_out(self, p):
        self.logger('distortion_out <-- DISTORTION name ID [ numberl ]', p)
        # for bitvars etc. TODO apply edit to ID to check it is bitvar?
        # self.out_dict[('distortion', p.name)] =
        return 'distortion', p.name, {'name': p.ID, 'shape': p.expr, 'df': p.numberl}

    # building portfolios ======================================
    @_('PORT name note agg_list')
    def port_out(self, p):
        self.logger(f'port_out <-- PORT name note agg_list', p)
        # self.out_dict[("port", p.name)] =
        return 'port', p.name, {'spec': p.agg_list, 'note': p.note}

    @_('agg_list agg_out')
    def agg_list(self, p):
        self.logger(f'agg_list <-- agg_list, agg_out', p)
        p.agg_list.append(p.agg_out)
        return p.agg_list

    @_('agg_out')
    def agg_list(self, p):
        self.logger(f'agg_list <-- agg_out', p)
        return [p.agg_out]

    # building aggregates ======================================
    # simplify agg out with sev_clause
    @_('AGG name exposures layers sev_clause occ_reins freq agg_reins note')
    def agg_out(self, p):
        self.logger(f'agg_out <-- AGG name exposures layers SEV sev occ_reins freq agg_reins note', p)
        # self.out_dict[("agg", p.name)] =
        return 'agg', p.name, {'name': p.name, **p.exposures, **p.layers, **p.sev_clause,
                               **p.occ_reins, **p.freq, **p.agg_reins, 'note': p.note}

    @_('AGG name dfreq layers sev_clause occ_reins agg_reins note')
    def agg_out(self, p):
        self.logger(f'agg_out <-- AGG name dfreq layers sev_clause occ_reins agg_reins note', p)
        # self.out_dict[("agg", p.name)] =
        return 'agg', p.name, {'name': p.name, **p.dfreq, **p.layers, **p.sev_clause,
                               **p.occ_reins, **p.agg_reins, 'note': p.note}

    @_('AGG name TWEEDIE expr expr expr note')
    def agg_out(self, p):
        self.logger('agg_out <-- AGG name TWEEDIE expr expr expr note', p)
        # Tweedie distribution in mean, p, sigma^2 (dispersion) format (MUST be mean first!!)
        # variance function is sigma^2 mean^p
        # phi = sigma^2 in Jorgenson p. 127 notation
        # p = (2 + a)/(a + 1) to a = (2 - p)/(p - 1)
        # lambda = mu^(2-p) / ((2-p) sigma^2)
        # beta = lambda alpha / mu
        # if not here then relative import fails when you run the program to print the grammar
        from .utilities import tweedie_convert
        mu = p[3]
        pp = p[4]
        sig2 = p[5]
        ans = tweedie_convert(p=pp, μ=mu, σ2=sig2)
        alpha = ans['α']
        lam = ans['λ']
        beta = ans['β']
        # originally
        # alpha = (2 - pp) / (pp - 1)
        # lam = mu ** (2 - pp) / ((2 - pp) * sig2)
        # beta = lam * alpha / mu
        dout = {'name': p.name, 'exp_en': lam, 'freq_name': 'poisson',
                'sev_name': 'gamma', 'sev_a': alpha, 'sev_scale': beta,
                'note': f'Tw(p={pp}, μ={mu}, σ^2={sig2}) --> CP(λ={lam:8g}, ga(α={alpha:.8g}, β={beta:.8g}), '
                        f'scale={beta:.8g}'}
        # self.out_dict[('agg', p.name)] = dout
        return 'agg', p.name, dout
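
    # Hand-worked check of the commented conversion formulas above (assumed
    # example values, for illustration only). With mean mu=10, p=1.5, sigma^2=2:
    #   alpha = (2 - 1.5) / (1.5 - 1) = 1
    #   lam   = 10 ** (2 - 1.5) / ((2 - 1.5) * 2) = sqrt(10) ≈ 3.1623
    #   beta  = lam * alpha / mu ≈ 0.31623
    # so 'agg A tweedie 10 1.5 2' builds a compound Poisson-gamma aggregate.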

    @_('AGG name builtin_agg occ_reins agg_reins note')
    def agg_out(self, p):
        # for use when you change the agg and/or want a new name
        self.logger(f'agg_out <-- AGG name builtin_aggregate note', p)
        # rename; NOTE!! the code below will overwrite the new name!
        del p.builtin_agg['name']
        return 'agg', p.name, {'name': p.name, **p.builtin_agg, **p.occ_reins,
                               **p.agg_reins, 'note': p.note}

    @_('builtin_agg agg_reins note')
    def agg_out(self, p):
        # no change to the builtin agg, allows agg.A as a legitimate agg (called A)
        self.logger(f'agg_out <-- builtin_agg agg_reins note', p)
        # print(p.builtin_agg)
        # self.out_dict[("agg", p.builtin_agg['name'])] =
        return 'agg', p.builtin_agg['name'], {**p.builtin_agg, **p.agg_reins, 'note': p.note}

    # building severities ======================================
    # difference from sev_clause (below) is sev_out has a name
    @_('SEV name sev note')
    def sev_out(self, p):
        self.logger(f'sev_out <-- sev name sev note ', p)
        p.sev['name'] = p.name
        p.sev['note'] = p.note
        # self.out_dict[("sev", p.name)] = p.sev
        return 'sev', p.name, p.sev

    @_('SEV name dsev note')
    def sev_out(self, p):
        self.logger(f'sev_out <-- sev name dsev note ', p)
        p.dsev['name'] = p.name
        p.dsev['note'] = p.note
        # self.out_dict[("sev", p.name)] = p.dsev
        return 'sev', p.name, p.dsev

    # frequency term ===========================================
    # for all frequency distributions claim count is determined by exposure / severity
    # EXCEPT for dfreq (and old EMPIRICAL) where it is entered
    # only freq shape parameters need be entered at the end
    # one and two parameter mixing distributions
    @_('freq ZM expr')
    def freq(self, p):
        self.logger('freq <-- freq ZM expr', p)
        f = p.freq
        f['freq_zm'] = True
        f['freq_p0'] = p.expr
        return f

    @_('freq ZT')
    def freq(self, p):
        self.logger('freq <-- freq ZT', p)
        f = p.freq
        f['freq_zm'] = True
        f['freq_p0'] = 0.0
        return f

    @_('MIXED ID expr expr')
    def freq(self, p):
        self.logger(f'freq <-- MIXED ID {p.ID} expr expr', p)
        return {'freq_name': p.ID, 'freq_a': p[2], 'freq_b': p[3]}

    @_('MIXED ID expr')
    def freq(self, p):
        self.logger(f'freq <-- MIXED ID {p.ID} expr', p)
        return {'freq_name': p.ID, 'freq_a': p.expr}

    @_('FREQ expr expr')
    def freq(self, p):
        self.logger(f'freq <-- FREQ {p.FREQ} expr expr', p)
        if p.FREQ != 'pascal':
            logger.warning(f'Illogical choice of frequency {p.FREQ}, expected pascal')
        return {'freq_name': p.FREQ, 'freq_a': p[1], 'freq_b': p[2]}

    # binomial p
    @_('FREQ expr')
    def freq(self, p):
        self.logger(f'freq <-- FREQ expr {p.FREQ}', p)
        # one parameter distributions
        if p.FREQ not in ['binomial', 'neyman', 'neymana', 'neymanA', 'negbin']:
            logger.warning(f'Illogical choice of frequency {p.FREQ}, expected binomial or neyman A')
        return {'freq_name': p.FREQ, 'freq_a': p.expr}

    @_('FREQ')
    def freq(self, p):
        self.logger(f'freq <-- FREQ {p.FREQ} (zero param distributions)', p)
        # zero parameter distributions
        if p.FREQ not in ('poisson', 'bernoulli', 'fixed', 'geometric', 'logarithmic'):
            logger.error(
                f'Illogical choice for FREQ {p.FREQ}, should be poisson, bernoulli, geometric, logarithmic or fixed.')
        return {'freq_name': p.FREQ}

    # agg reins clause ========================================
    @_('AGGREGATE NET OF reins_list')
    def agg_reins(self, p):
        self.logger(f'agg_reins <-- AGGREGATE NET OF reins_list', p)
        return {'agg_reins': p.reins_list, 'agg_kind': 'net of'}

    @_('AGGREGATE CEDED TO reins_list')
    def agg_reins(self, p):
        self.logger(f'agg_reins <-- AGGREGATE CEDED TO reins_list', p)
        return {'agg_reins': p.reins_list, 'agg_kind': 'ceded to'}

    @_(" %prec LOW")
    def agg_reins(self, p):
        self.logger('agg_reins <-- missing agg reins', p)
        return {}

    # occ reins clause ========================================
    @_('OCCURRENCE NET OF reins_list')
    def occ_reins(self, p):
        self.logger(f'occ_reins <-- OCCURRENCE NET OF reins_list', p)
        return {'occ_reins': p.reins_list, 'occ_kind': 'net of'}

    @_('OCCURRENCE CEDED TO reins_list')
    def occ_reins(self, p):
        self.logger(f'occ_reins <-- OCCURRENCE CEDED TO reins_list', p)
        return {'occ_reins': p.reins_list, 'occ_kind': 'ceded to'}

    @_("")
    def occ_reins(self, p):
        self.logger('occ_reins <-- missing occ reins', p)
        return {}

    # reinsurance clauses ====================================
    @_('reins_list AND reins_clause')
    def reins_list(self, p):
        self.logger(f'reins_list <-- reins_list AND reins_clause', p)
        p.reins_list.append(p.reins_clause)
        return p.reins_list

    @_('reins_clause')
    def reins_list(self, p):
        self.logger(f'reins_list <-- reins_clause becomes reins_list', p)
        return [p.reins_clause]

    @_('tower')
    def reins_list(self, p):
        # would be dumb if it only contained one layer
        self.logger(f'reins_clause <-- tower', p)
        limit = p.tower[0]
        attach = p.tower[1]
        return [(1.0, l, a) for l, a in zip(limit, attach)]

    @_('expr XS expr')
    def reins_clause(self, p):
        self.logger(f'reins_clause <-- expr XS expr {p[0]} xs {p[2]}', p)
        return (1.0, p[0], p[2])

    @_('expr SHARE_OF expr XS expr')
    def reins_clause(self, p):
        self.logger(f'reins_clause <-- expr SHARE_OF expr XS expr {p[0]} s/o {p[2]} xs {p[4]}', p)
        # here expr is the proportion...always store as a proportion
        return (p[0], p[2], p[4])

    @_('expr PART_OF expr XS expr')
    def reins_clause(self, p):
        self.logger(f'reins_clause <-- expr PART_OF expr XS expr {p[0]} p/o {p[2]} xs {p[4]}', p)
        # here expr is the currency amount of cover
        if p[0] / p[2] < 0.05:
            logger.warning(f'Part of clause with proportion {p[0] / p[2]} is suspiciously small. '
                           'Did you mean share of?')
        return (p[0] / p[2], p[2], p[4])
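
    # Illustrative sketch (assumed example): 'occurrence net of 0.5 so 100 xs 100
    # and 500 xs 200' parses to occ_kind='net of' with
    # occ_reins=[(0.5, 100, 100), (1.0, 500, 200)], i.e. (share, limit, attachment)
    # triples, one per clause.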

    # severity term ============================================
    # %prec LOW removed
    @_('SEV sev')
    def sev_clause(self, p):
        return p.sev

    @_('dsev')
    def sev_clause(self, p):
        return p.dsev

    @_('BUILTIN_SEV')
    def sev_clause(self, p):
        # when the builtin does not need adjusting
        self.logger(f'sev_clause <-- BUILTIN_SEV ({p.BUILTIN_SEV})', p)
        built_in_dict = self.safe_lookup(p.BUILTIN_SEV)
        if 'name' in built_in_dict:
            del built_in_dict['name']
        return built_in_dict

    @_('sev picks')
    def sev(self, p):
        self.logger(f'sev <-- sev picks', p)
        return {**p.sev, **p.picks}

    @_('dsev "!"')
    def dsev(self, p):
        self.logger(f'dsev <-- unconditional (conditional=False) flag set', p)
        p.dsev['sev_conditional'] = False
        return p.dsev

    @_('sev "!"')
    def sev(self, p):
        self.logger(f'sev <-- unconditional (conditional=False) flag set', p)
        p.sev['sev_conditional'] = False
        return p.sev

    @_('sev2 weights splice')
    def sev(self, p):
        self.logger(f'sev <-- sev2 weights splice', p)
        p.sev2['sev_wt'] = p.weights
        p.sev2['sev_lb'] = p.splice['sev_lb']
        p.sev2['sev_ub'] = p.splice['sev_ub']
        return p.sev2

    @_('sev1 PLUS numbers', 'sev1 MINUS numbers')
    def sev2(self, p):
        self.logger(f'sev2 <-- sev1 {p[1]} numbers', p)
        p.sev1['sev_loc'] = UnderwritingParser._check_vectorizable(p.sev1.get('sev_loc', 0))
        sign = 1 if p[1] == '+' else -1
        p_numbers = UnderwritingParser._check_vectorizable(p.numbers)
        p.sev1['sev_loc'] += sign * p_numbers
        return p.sev1

    @_('sev1')
    def sev2(self, p):
        self.logger(f'sev2 <-- sev1', p)
        return p.sev1

    @_('numbers TIMES sev0')
    def sev1(self, p):
        self.logger(f'sev1 <-- numbers TIMES sev0', p)
        p_numbers = UnderwritingParser._check_vectorizable(p.numbers)
        if 'sev_mean' in p.sev0:
            p.sev0['sev_mean'] = UnderwritingParser._check_vectorizable(p.sev0.get('sev_mean', 0))
            p.sev0['sev_mean'] *= p_numbers
        # only scale if there is a scale (otherwise you double count)
        if 'sev_scale' in p.sev0:
            p.sev0['sev_scale'] = UnderwritingParser._check_vectorizable(p.sev0.get('sev_scale', 0))
            p.sev0['sev_scale'] *= p_numbers
        if 'sev_mean' not in p.sev0:
            # e.g. Pareto has no mean and it is important to set the scale
            # but if there is a mean it handles the scaling and setting scale will
            # confuse the distribution maker
            p.sev0['sev_scale'] = p_numbers
        # if there is a location it needs to scale too --- that's a curious choice!
        if 'sev_loc' in p.sev0:
            p.sev0['sev_loc'] = UnderwritingParser._check_vectorizable(p.sev0['sev_loc'])
            p.sev0['sev_loc'] *= p_numbers
        # logger.error(str(p.sev0))
        return p.sev0

    @_('sev0')
    def sev1(self, p):
        self.logger(f'sev1 <-- sev0', p)
        return p.sev0

    @_('ids numbers CV numbers')
    def sev0(self, p):
        self.logger(f'sev0 <-- ids numbers CV numbers', p)
        return {'sev_name': p.ids, 'sev_mean': p[1], 'sev_cv': p[3], 'sev_scale': 1.0}
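
    # Illustrative sketch of the sev0/sev1/sev2 layering (assumed example):
    # in 'sev 10 * lognorm 1.5 + 2', sev0 parses 'lognorm 1.5' to
    # {'sev_name': 'lognorm', 'sev_a': 1.5, 'sev_scale': 1.0}; sev1 applies the
    # scale, giving sev_scale=10; and sev2 applies the shift, giving sev_loc=2.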

    @_('ids numbers numbers')
    def sev0(self, p):
        self.logger(f'sev0 <-- ids numbers numbers', p)
        # two parameters for shape...must specify scale somehow; put in default scale of 1
        return {'sev_name': p.ids, 'sev_a': p[1], 'sev_b': p[2], 'sev_scale': 1.0}

    @_('ids numbers')
    def sev0(self, p):
        self.logger(f'sev0 <-- ids numbers', p)
        return {'sev_name': p.ids, 'sev_a': p[1], 'sev_scale': 1.0}

    # no weights with xps terms
    @_('ids xps')
    def sev0(self, p):
        self.logger(f'sev0 <-- ids xps (ids should be (c|d)histogram) or zero param (xps is none)', p)
        return {'sev_name': p.ids, **p.xps}

    @_('ids')
    def sev0(self, p):
        # for norm expon uniform levy, zero parameter severities
        # need to make sure there is a scale
        self.logger(f'sev0 <-- ids, zero parameter severity {p.ids}', p)
        return {'sev_name': p.ids, 'sev_scale': 1.0}

    @_('XPS doutcomes dprobs')
    def xps(self, p):
        self.logger('xps <-- XPS doutcomes dprobs', p)
        if len(p.dprobs) == 0:
            ps = np.ones_like(p.doutcomes) / len(p.doutcomes)
        else:
            ps = p.dprobs
        return {'sev_xs': p.doutcomes, 'sev_ps': ps}

    @_('DSEV doutcomes dprobs')
    def dsev(self, p):
        self.logger('dsev <-- DSEV doutcomes dprobs', p)
        # need to check probs has been populated
        if len(p.dprobs) == 0:
            ps = np.ones_like(p.doutcomes) / len(p.doutcomes)
        else:
            ps = p.dprobs
        return {'sev_name': 'dhistogram', 'sev_xs': p.doutcomes, 'sev_ps': ps}

    @_('DFREQ doutcomes dprobs')
    def dfreq(self, p):
        self.logger('dfreq <-- DFREQ doutcomes dprobs', p)
        # need to check probs has been populated
        if len(p.dprobs) == 0:
            b = np.ones_like(p.doutcomes) / len(p.doutcomes)
        else:
            b = p.dprobs
        return {'freq_name': 'empirical', 'freq_a': p.doutcomes, 'freq_b': b, 'exp_en': -1}

    @_('PICKS "[" numberl "]" "[" numberl "]"')
    def picks(self, p):
        self.logger('picks <-- PICKS "[" numberl "]" "[" numberl "]"', p)
        return {'sev_pick_attachments': p[2], 'sev_pick_losses': p[5]}

    # never valid for this to be a single number not in [], using this
    # format rather than numbers enforces an actual list
    @_('"[" numberl "]"')
    def doutcomes(self, p):
        self.logger('doutcomes <-- [numberl] (must be a list)', p)
        a = self._check_vectorizable(p.numberl)
        return a

    @_('"[" expr RANGE expr "]"')
    def doutcomes(self, p):
        self.logger('doutcomes <-- [expr : expr]', p)
        return np.arange(p[1], p[3] + 1)

    @_('"[" expr RANGE expr RANGE expr "]"')
    def doutcomes(self, p):
        self.logger('doutcomes <-- [expr : expr : expr]', p)
        return np.arange(p[1], p[3] + 0.5 * p[5], p[5])

    # see note above doutcomes
    @_('"[" numberl "]"')
    def dprobs(self, p):
        self.logger('dprobs <-- [numberl] (must be a list)', p)
        a = self._check_vectorizable(p.numberl)
        return a

    @_('')
    def dprobs(self, p):
        self.logger('dprobs <-- missing dprobs term', p)
        return []

    @_('WEIGHTS EQUAL_WEIGHT expr')
    def weights(self, p):
        self.logger(f'weights <-- WEIGHTS EQUAL_WEIGHTS expr ', p)
        return np.ones(int(p.expr)) / p.expr

    # force weights to be a vector
    @_('WEIGHTS "[" numberl "]"')
    def weights(self, p):
        self.logger(f'weights <-- WEIGHTS [numberl]', p)
        return p.numberl

    @_('')
    def weights(self, p):
        self.logger('weights <-- missing weights term', p)
        return 1.
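
    # Illustrative sketch (assumed example): 'dsev [1 2 3]' has no dprobs term,
    # so dprobs returns [] and equal weights are applied: sev_xs=[1, 2, 3],
    # sev_ps=[1/3, 1/3, 1/3]. 'dfreq [1:3]' expands the range to outcomes
    # [1, 2, 3] the same way.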

    @_('SPLICE "[" numberl "]" "[" numberl "]"')
    def splice(self, p):
        self.logger(f'splice <-- SPLICE [numberl] [numberl]', p)
        # explicitly enter lower and upper bounds for each splice
        # needed for mixed EM / Pareto example in Albrecher
        return {'sev_lb': p[2], 'sev_ub': p[5]}

    @_('SPLICE "[" numberl "]"')
    def splice(self, p):
        self.logger(f'splice <-- SPLICE [numberl]', p)
        return {'sev_lb': p.numberl[:-1], 'sev_ub': p.numberl[1:]}

    @_('')
    def splice(self, p):
        self.logger('splice <-- missing splice term', p)
        # not sure best return value; weights returns 1
        # return {'sev_lb': [0], 'sev_ub': [np.inf]}
        return {'sev_lb': 0., 'sev_ub': np.inf}

    # layer terms, optional ====================================
    @_('numbers XS numbers')
    def layers(self, p):
        self.logger(f'layers <-- numbers XS numbers', p)
        return {'exp_attachment': p[2], 'exp_limit': p[0]}

    @_('tower')
    def layers(self, p):
        self.logger(f'layers <-- tower', p)
        return {'exp_attachment': p.tower[1], 'exp_limit': p.tower[0]}

    @_('')
    def layers(self, p):
        self.logger('layers <-- missing layer term', p)
        return {}

    @_('TOWER doutcomes')
    def tower(self, p):
        # doutcomes allows a list, range, or range with step
        self.logger(f'tower <-- tower doutcomes', p)
        breaks = p.doutcomes
        # do not want this. it means net == 0 and ceded == gross in total which
        # is rarely what you want. User can put in themselves.
        # if breaks[0] != 0:
        #     breaks = np.hstack((0., breaks))
        # if not np.isinf(breaks[-1]):
        #     breaks = np.hstack((breaks, np.inf))
        limits = np.diff(breaks)
        attach = breaks[:-1]
        # logger.info('\n'.join([f'{x} xs {y}' for x, y in zip(limits, attach)]))
        return [limits, attach]

    # optional note ===========================================
    @_('NOTE')
    def note(self, p):
        self.logger(f'note <-- NOTE', p)
        return p.NOTE[5:-1]

    @_(" %prec LOW")
    def note(self, p):
        self.logger("note <-- missing note term", p)
        return ''

    # exposures ================================================
    @_('numbers CLAIMS')
    def exposures(self, p):
        self.logger(f'exposures <-- numbers CLAIMS', p)
        return {'exp_en': p.numbers}

    @_('numbers LOSS')
    def exposures(self, p):
        self.logger(f'exposures <-- numbers LOSS', p)
        return {'exp_el': p.numbers}

    @_('numbers PREMIUM AT numbers LR')
    def exposures(self, p):
        self.logger(f'exposures <-- numbers PREMIUM AT numbers LR', p)
        return {'exp_premium': p[0], 'exp_lr': p[3], 'exp_el': np.array(p[0]) * np.array(p[3])}

    @_('numbers EXPOSURE AT numbers RATE')
    def exposures(self, p):
        self.logger(f'exposures <-- numbers EXPOSURE AT numbers RATE', p)
        return {'exp_premium': p[0], 'exp_lr': p[3], 'exp_el': np.array(p[0]) * np.array(p[3])}

    # ID =======================================================
    @_('"[" idl "]"')
    def ids(self, p):
        self.logger(f'ids <-- [idl]', p)
        return p.idl

    @_('idl ID')
    def idl(self, p):
        self.logger(f'idl <-- idl ID ({p.ID})', p)
        p.idl.append(p.ID)
        return p.idl

    @_('ID')
    def idl(self, p):
        self.logger(f'idl <-- ID ({p.ID})', p)
        ans = [p.ID]
        self.logger(f'idl <-- ID', p)
        return ans

    @_('ID')
    def ids(self, p):
        self.logger(f'ids <-- ID ({p.ID})', p)
        return p.ID

    # elements made from named portfolios ========================
    @_('expr INHOMOG_MULTIPLY builtin_agg')
    def builtin_agg(self, p):
        """
        inhomogeneous change of scale
        """
        self.logger(f'builtin_agg <-- expr INHOMOG_MULTIPLY builtin_agg', p)
        bid = p.builtin_agg.copy()
        bid['name'] += '_i_scaled'
        bid['exp_en'] = self._check_vectorizable(bid.get('exp_en', 0)) * p.expr
        bid['exp_el'] = self._check_vectorizable(bid.get('exp_el', 0)) * p.expr
        bid['exp_premium'] = self._check_vectorizable(bid.get('exp_premium', 0)) * p.expr
        return bid
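
    # Illustrative sketch (assumed example): '2 @ agg.A' doubles the exposure
    # quantities (exp_en, exp_el, exp_premium) of the stored aggregate, two
    # independent copies in effect, whereas '2 * agg.A' (next rule) scales the
    # severity and layer terms instead, doubling the size of every loss.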

    @_('expr TIMES builtin_agg')
    def builtin_agg(self, p):
        """
        homogeneous change of scale
        """
        self.logger('builtin_agg <-- expr TIMES builtin_agg', p)
        # bid = built_in_dict, want to be careful not to add scale too much
        bid = p.builtin_agg
        bid['name'] += '_homog_scaled'
        if 'sev_mean' in bid:
            bid['sev_mean'] = self._check_vectorizable(bid['sev_mean']) * p.expr
        if 'sev_scale' in bid:
            bid['sev_scale'] = self._check_vectorizable(bid['sev_scale']) * p.expr
        if 'sev_loc' in bid:
            bid['sev_loc'] = self._check_vectorizable(bid['sev_loc']) * p.expr
        bid['exp_attachment'] = self._check_vectorizable(bid.get('exp_attachment', 0)) * p.expr
        bid['exp_limit'] = self._check_vectorizable(bid.get('exp_limit', np.inf)) * p.expr
        bid['exp_el'] = self._check_vectorizable(bid.get('exp_el', 0)) * p.expr
        bid['exp_premium'] = self._check_vectorizable(bid.get('exp_premium', 0)) * p.expr
        return bid

    @_('builtin_agg PLUS expr', 'builtin_agg MINUS expr')
    def builtin_agg(self, p):
        """
        translation (shift, change location) by expr

        :param p:
        :return:
        """
        self.logger('builtin_agg <-- builtin_agg PLUS expr', p)
        # bid = built_in_dict, want to be careful not to add scale too much
        bid = p.builtin_agg
        bid['name'] += '_shifted'
        sign = 1 if p[1] == "+" else -1
        # TODO make vector addable
        if 'sev_loc' in bid:
            bid['sev_loc'] += sign * p.expr
        else:
            bid['sev_loc'] = sign * p.expr
        return bid

    @_('BUILTIN_AGG')
    def builtin_agg(self, p):
        # ensure lookup only happens here
        self.logger(f'builtin_agg <-- BUILTIN_AGG ({p.BUILTIN_AGG})', p)
        built_in_dict = self.safe_lookup(p.BUILTIN_AGG)
        return built_in_dict

    @_('BUILTIN_SEV')
    def sev(self, p):
        # ensure lookup only happens here
        # unlike aggs, will never just say sev.A
        # usage: agg A 1 claim sev sev.B fixed; a little awkward but not used much
        # leaving it here allows for subsequent scaling and translation
        # if it is directly a sev_clause it cannot be adjusted
        self.logger(f'sev <-- BUILTIN_SEV ({p.BUILTIN_SEV})', p)
        built_in_dict = self.safe_lookup(p.BUILTIN_SEV)
        if 'name' in built_in_dict:
            del built_in_dict['name']
        return built_in_dict

    # ids =========================================================
    @_('ID')
    def name(self, p):
        self.logger(f'name <-- ID = {p.ID}', p)
        return p.ID

    # vectors of numbers ==========================================
    @_('"[" numberl "]"')
    def numbers(self, p):
        self.logger(f'numbers <-- [numberl]', p)
        return p.numberl

    # allow range notation in numbers
    @_('"[" expr RANGE expr "]"')
    def numbers(self, p):
        self.logger('numbers <-- [expr : expr]', p)
        return np.arange(p[1], p[3] + 1)

    @_('"[" expr RANGE expr RANGE expr "]"')
    def numbers(self, p):
        self.logger('numbers <-- [expr : expr : expr]', p)
        return np.arange(p[1], p[3] + 1, p[5])

    @_('numberl expr')
    def numberl(self, p):
        self.logger(f'numberl <-- numberl expr (adding {p.expr} to list {p.numberl})', p)
        p.numberl.append(p.expr)
        return p.numberl

    @_('expr')
    def numberl(self, p):
        self.logger(f'numberl <-- expr', p)
        ans = [p.expr]
        return ans

    @_('expr')
    def numbers(self, p):
        self.logger('numbers <-- expr', p)
        return p.expr

    # retired Python-style math grammar (atom, power, factor, term), kept for reference:
    # @_('term')
    # def expr(self, p):
    #     self.logger('expr <-- term', p)
    #     return p.term
    #
    # @_('term DIVIDE factor')
    # def term(self, p):
    #     self.logger('term <-- term / factor', p)
    #     return p.term / p.factor
    #
    # @_('factor')
    # def term(self, p):
    #     self.logger('term <-- factor', p)
    #     return p.factor
    #
    # @_('"(" term ")"')
    # def factor(self, p):
    #     return p.term
    #
    # @_('EXP "(" term ")"')
    # def factor(self, p):
    #     return exp(p.term)
    #
    # @_('power')
    # def factor(self, p):
    #     self.logger('factor <-- power', p)
    #     return p.power
    #
    # @_('factor EXPONENT factor')
    # def power(self, p):
    #     self.logger('power <-- factor EXPONENT factor', p)
    #     return p[0] ** p[2]
    #
    # @_('atom')
    # def power(self, p):
    #     self.logger('power <-- atom', p)
    #     return p.atom

    @_('atom')
    def expr(self, p):
        self.logger('expr <-- atom', p)
        return p.atom

    @_('atom DIVIDE atom')
    def atom(self, p):
        self.logger('atom <-- atom / atom', p)
        return p[0] / p[2]

    @_('"(" atom ")"')
    def atom(self, p):
        self.logger('atom <-- (atom)', p)
        return p.atom

    @_('EXP atom')
    def atom(self, p):
        self.logger('atom <-- EXP atom', p)
        return exp(p.atom)

    @_('atom EXPONENT atom')
    def atom(self, p):
        self.logger('atom <-- atom EXPONENT atom', p)
        return p[0] ** p[2]

    @_('NUMBER')
    def atom(self, p):
        self.logger(f'atom <-- NUMBER, {p.NUMBER}', p)
        if p.NUMBER.endswith('%'):
            t = float(p.NUMBER[:-1]) / 100
        elif p.NUMBER == "inf":
            t = np.inf
        elif p.NUMBER == "-inf":
            t = -np.inf
        else:
            t = float(p.NUMBER)
        return t
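
    # Illustrative sketch (assumed examples): the NUMBER atom maps '45%' to 0.45,
    # '1e-3%' to 1e-05, and 'inf' to np.inf; the expression testers in the class
    # docstring exercise these paths end to end.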

    def error(self, p):
        if p:
            raise ValueError(p)
        else:
            raise ValueError('Unexpected end of file')

def grammar(add_to_doc=False, save_to_fn=''):
    """
    Write the grammar at the top of this file as a docstring.

    To work with multi-rules, enter them on one line, like so::

        @_('builtin_agg PLUS expr', 'builtin_agg MINUS expr')

    :param add_to_doc: add the grammar to the docstring
    :param save_to_fn: save the grammar to a file
    """
    pout = Path(__file__).parent / '../docs/4_agg_language_reference/ref_include.rst'

    # get the grammar from the top of the file
    txt = Path(__file__).read_text(encoding='utf-8')
    stxt = txt.split('@_')
    ans = {}
    # 3:-3 get rid of junk at top and bottom (could change if file changes)
    for it in stxt[3:-3]:
        if it.find('# def') >= 0:
            # skip rows with a comment between @_ and def
            pass
        else:
            b = it.split('def')
            b0 = b[0].strip()[2:-2]
            # check if multirule
            if ', ' in b0:
                b0 = [i.replace("'", '') for i in b0.split(', ')]
            else:
                b0 = [b0]
            try:
                b1 = b[1].split("(self, p):")[0].strip()
            except:
                logger.error(f'Unexpected multirule behavior {it}')
                exit()
            if b1 in ans:
                ans[b1] += b0
            else:
                ans[b1] = b0
    s = ''
    for k, v in ans.items():
        s += f'{k:<20s}\t::= {v[0]:<s}\n'
        for rhs in v[1:]:
            s += f'{" "*20}\t | {rhs:<s}\n'
        s += '\n'

    # finally add the language words
    # this is a bit manual, but these shouldn't change much...
    # lang_words = '\n\nlanguage words go here\n\n'
    lang_words = '''FREQ ::= 'binomial|poisson|bernoulli|pascal|geometric|neymana?|fixed|logarithmic|negbin'
BUILTINID ::= 'sev|agg|port|meta.ID'
NOTE ::= 'note{TEXT}'
EQUAL_WEIGHT ::= "="
AGG ::= 'agg'
AGGREGATE ::= 'aggregate'
AND ::= 'and'
AT ::= 'at'
CEDED ::= 'ceded'
CLAIMS ::= 'claims|claim'
CONSTANT ::= 'constant'
CV ::= 'cv'
DFREQ ::= 'dfreq'
DSEV ::= 'dsev'
EXP ::= 'exp'
EXPONENT ::= '^|**'
INHOMOG_MULTIPLY ::= "@"
INFINITY ::= 'inf|unlim|unlimited'
LOSS ::= 'loss'
LR ::= 'lr'
MIXED ::= 'mixed'
NET ::= 'net'
OCCURRENCE ::= 'occurrence'
OF ::= 'of'
PART_OF ::= 'po'
PERCENT ::= '%'
PORT ::= 'port'
PREMIUM ::= 'premium|prem'
SEV ::= 'sev'
SHARE_OF ::= 'so'
TO ::= 'to'
WEIGHTS ::= 'wts|wt'
XPS ::= 'xps'
XS ::= "xs|x"
'''
    s += lang_words

    # create for docs in one file (that gets included by rst)
    if add_to_doc is True:
        pout.write_text(s, encoding='utf-8')

    # save to user folder grammar
    if save_to_fn == '':
        save_to_fn = Path.home() / 'aggregate/parser/grammar.md'
    Path(save_to_fn).write_text(s, encoding='utf-8')

    return s
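
# Illustrative usage sketch (assumed call): grammar() returns the rule listing
# as a string, e.g.
#
#   print(grammar())   # e.g. 'answer    ::= sev_out' with '| agg_out' etc. on following lines
#
# and, by default, also writes it to ~/aggregate/parser/grammar.md.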

if __name__ == '__main__':
    # print the grammar and add to this file as part of the docstring in 41_language_reference.rst
    grammar(add_to_doc=True)
    UnderwritingParser.enhance_debugfile()