Source code for aggregate.underwriter

# from collections import namedtuple
from copy import deepcopy
import logging
import numpy as np
import pandas as pd
from pathlib import Path
import re
# from IPython.display import HTML, display
# from inspect import signature

from .constants import *
from .portfolio import Portfolio
from .distributions import Aggregate, Severity
from .spectral import Distortion
from .parser import UnderwritingLexer, UnderwritingParser
from .utilities import (logger_level, round_bucket, Answer,
                        LoggerManager, qd, show_fig, more,
                        parse_note_ex)

logger = logging.getLogger(__name__)


# rejected: immutable
# WriteAnswer = namedtuple('WriteAnswer', ['kind', 'name', 'spec', 'program', 'object'])


[docs]class Underwriter(object): """ The ``Underwriter`` class manages the creation of Aggregate and Portfolio objects, and maintains a database of standard Severity (curves) and Aggregate (unit or line level) objects called the knowledge base. - Handles persistence to and from agg files - Is interface into program parser - Handles safe lookup from the knowledge for parser Objects have a kind and a name. The kind is one of 'sev', 'agg' or 'port'. The name is a string. They have a representation as a program. When the program is interpreted it produces a dictionary spec that can be used to create the object. The static method factory can create any object from the (kind, name, spec, program) quartet, though, strictly, program is not needed. The underwriter knowledge is stored in a dataframe indexed by kind and name with columns spec and program. """
[docs] def __init__(self, name='Rory', databases=None, update=False, log2=10, debug=False): """ Create an underwriter object. The underwriter is the interface to the knowledge base of the aggregate system. It is the interface to the parser and the interpreter, and to the database of curves, portfolios and aggregates. :param name: name of underwriter. Defaults to Rory, after Rory Cline, the best underwriter I know and a supporter of an analytic approach to underwriting. :param databases: name or list of database files to read in on creation. if None: nothing loaded; if 'default' (installed) or 'site' (user, in ~/aggregate/databases) database \\*.agg files in default or site directory are loaded. If 'all' both default and site databases loaded. A string refers to a single database; an interable of strings is also valid. See `read_database` for search path. :param update: if True, update database files with new objects. :param log2: log2 of number of buckets in discrete representation. 10 is 1024 buckets. :param debug: if True, print debug messages. """ self.name = name self.update = update if log2 <= 0: raise ValueError( 'log2 must be > 0. The number of buckets used equals 2**log2.') self.log2 = log2 self.debug = debug self._lexer = None self._parser = None # make sure all database entries are stored; they are read on demand self.databases = [] if databases is None else databases # stop pyCharm complaining # do not read in until needed for faster loading self._default_dir = None self._site_dir = None self._case_dir = None self._template_dir = None self._knowledge = pd.DataFrame(columns=['kind', 'name', 'spec', 'program'], dtype=object).set_index( ['kind', 'name']) # ensure description prints correctly. A bit cheaky. pd.set_option('display.max_colwidth', 100)
@property def lexer(self): if self._lexer is None: self._lexer = UnderwritingLexer() return self._lexer @property def parser(self): if self._parser is None: self._parser = UnderwritingParser(self.safe_lookup, self.debug) return self._parser @property def default_dir(self): # default_dir is installed by pip and contain installation files if self._default_dir is None: self._default_dir = Path(__file__).parent / 'agg' self._default_dir.mkdir(parents=True, exist_ok=True) return self._default_dir @property def site_dir(self): # site dir is in Users's home directory and stores their files if self._site_dir is None: self._site_dir = Path.home() / 'aggregate/databases' self._site_dir.mkdir(parents=True, exist_ok=True) return self._site_dir @property def case_dir(self): # case dir is in Users's home directory and stores their files if self._case_dir is None: self._case_dir = Path.home() / 'aggregate/cases' # check case dir exists self._case_dir.mkdir(parents=True, exist_ok=True) return self._case_dir @property def template_dir(self): # template dir is in Users's home directory and stores their files if self._template_dir is None: self._template_dir = self.default_dir.parent / 'templates' self._template_dir.mkdir(parents=True, exist_ok=True) return self._template_dir def read_databases(self): if self.databases is None: # nothing to do self.databases = [] elif self.databases == 'all': self.databases = ['default', 'site'] elif type(self.databases) == str: self.databases = [self.databases] if 'default' in self.databases: # add all databases in default_dir self.databases.remove('default') self.databases.extend(self.default_dir.glob('*.agg')) if 'site' in self.databases: # add all user databases self.databases.remove('site') self.databases.extend(self.site_dir.glob('*.agg')) # actually read databases for fn in self.databases: self.read_database(fn)
[docs] def read_database(self, fn): """ read database of curves, aggs and portfolios. These can live in the default directory that is part of the instalation or ~/aggregate/ fn can be a string filename, with or without extension. A .agg extension is added if there is no suffix. Search path: * in the current dir * in site_dir (user) * in default_dir (installation) :param fn: database file name """ p = Path(fn) if p.suffix == '': p = p.with_suffix('.agg') if p.exists(): db_path = p elif (self.site_dir / p).exists(): db_path = self.site_dir / fn elif (self.default_dir / p).exists(): db_path = self.default_dir / p else: logger.error(f'Database {fn} not found. Ignoring.') return try: program = db_path.read_text(encoding='utf-8') except Exception as e: logger.error( f'Error reading requested database {db_path.name}. Ignoring.') else: # read in, parse, save to sev/agg/port dictionaries # throw away answer...not creating anything # get rid of cosmetic spaces, but keep newline tabs (2 or more spaces) program = re.sub('^ +', '\t', program, flags=re.MULTILINE) program = re.sub(' +', ' ', program) logger.info(f'Reading database {fn}...') n = len(self._knowledge) self.interpret_program(program) n = len(self._knowledge) - n logger.info( f'Database {fn} read into knowledge, adding {n} entries.')
def __getitem__(self, item): """ handles self[item] item = 'Name' for all objects called Name item = ("kind", "name") for object of kind kind called name subscriptable: try user portfolios, b/in portfolios, line, severity to access specifically use severity or line methods :param item: :return: """ # much less fancy version: if not isinstance(item, (str, tuple)): raise ValueError( f'item must be a str (name of object) or tuple (kind, name), not {type(item)}.') assert self.knowledge is not None try: if type(item) == str: # name == item, any type rows = self._knowledge.xs( item, axis=0, level=1, drop_level=False) elif type(item) == tuple: # return a dataframe row rows = self._knowledge.loc[[item]] except KeyError: raise KeyError(f'Item {item} not found.') except TypeError as e: # TODO fix this "TypeError: unhashable type: 'slice'" raise KeyError(f'getitem TypeError looking for {item}, {e}') else: if len(rows) == 1: kind, name, spec, program = rows.reset_index().iloc[0] return Answer(kind=kind, name=name, spec=spec, program=program, object=None) else: raise KeyError( f'Error: no unique object found matching {item}. Found {len(rows)} objects.') def __repr__(self): import aggregate s = [] s.append(f'underwriter {self.name}') s.append(f'version {aggregate.__version__}') s.append(f'knowledge {len(self.knowledge)} programs') s.append(f'update {self.update}') for k in ['log2', 'debug']: s.append(f'{k:<19s}{getattr(self, k)}') s.append(f'validation_eps {VALIDATION_EPS}') sd = self.site_dir.resolve().relative_to(Path.home()) sd = f'~/{sd}' dd = self.default_dir.resolve() try: dd = dd.relative_to(Path.home()) dd = f'~/{dd}' except ValueError: dd = str(dd) s.append(f'site dir {sd}') s.append(f'default dir {dd}') s.append('') s.append('help') s.append('build.knowledge list of all programs') s.append('build.qshow(pat) show programs matching pattern') s.append('build.show(pat) build and display matching pattern') return '\n'.join(s)
[docs] def factory(self, answer): """ Create object of kind from spec, a dictionary. Creating from uw obviously needs the uw, so this is NOT a staticmethod! :param answer: an Answer class with members kind, name, spec, and program :return: creates answer.object """ kind, name, spec, program, obj = answer.values() if obj is not None: logger.error( f'Surprising: obj from Answer not None, type {type(obj)}. It will be overwritten.') if kind == 'agg': obj = Aggregate(**spec) obj.program = program elif kind == 'port': # Portfolio expects name, agg_list, uw # agg list is a list of spects that can be passed to Aggregate # need to drop the 'agg', name before the spec that gets returned # by the parser. Hence: agg_list = [k for i, j, k in spec['spec']] obj = Portfolio(name, agg_list, uw=self) obj.program = program elif kind == 'sev': if 'sev_wt' in spec and spec['sev_wt'] != 1: logger.log(WL, f'Mixed severity cannot be created, returning spec. You had {spec["sev_wt"]}, expected 1') obj = None else: obj = Severity(**spec) # ? set outside... obj.program = program elif kind == 'distortion': obj = Distortion(**spec) # ? set outside obj.program = program else: ValueError(f'Cannot build {kind} objects') # update the object answer['object'] = obj return answer
@property def knowledge(self): if len(self._knowledge) == 0 and len(self.databases) > 0: # knowledge - accounts and line known to the underwriter self.read_databases() return self._knowledge.sort_index()[['program', 'spec']] @property def version(self): import aggregate return aggregate.__version__ def test_suite(self): f = self.default_dir / 'test_suite.agg' txt = f.read_text(encoding='utf-8') return txt @property def test_suite_file(self): """ Return the test_suite filename, or None if it does not exist """ f = self.default_dir / 'test_suite.agg' if f.exists(): return f else: return None
[docs] def write(self, portfolio_program, log2=0, bs=0, update=None, **kwargs): """ Write a natural language program. Write carries out the following steps. 1. Read in the program and cleans it (e.g. punctuation, parens etc. are removed and ignored, replace ; with new line etc.) 2. Parse line by line to create a dictionary definition of sev, agg or port objects. 3. Replace sev.name, agg.name and port.name references with their objects. 4. If update set, update all created objects. Sample input:: port MY_PORTFOLIO agg Line1 20 loss 3 x 2 sev gamma 5 cv 0.30 mixed gamma 0.4 agg Line2 10 claims 3 x 2 sevgamma 12 cv 0.30 mixed gamma 1.2 agg Line 3100 premium at 0.4 3 x 2 sev 4 @ lognormal 3 cv 0.8 fixed 1 The indents are required if each agg item appears on a new line. See parser for full language spec! See Aggregate class for many examples. :param log2: :param bs: :param portfolio_program: :param update: override class default :param kwargs: passed to object's update method if ``update==True`` :return: single created object or dictionary name: object """ # prepare for update # what / how to do; little awkward: to make easier for user have to strip named update args # out of kwargs if update is None: update = self.update if update is True and log2 == 0: log2 = self.log2 # first see if portfolio_program refers to a built-in object try: # calls __getitem__ answer = self[portfolio_program] except (LookupError, TypeError): logger.debug( f'underwriter.write | object not found, processing as a program.') else: logger.debug(f'underwriter.write | {answer.kind} object found.') answer = self.factory(answer) if update: answer.object.update(log2, bs, **kwargs) # rationalize return to be the same as parsed programs # TODO test this code return [answer] # if you fall through to here then the portfolio_program did not refer to a built-in object # run the program, get the interpreter return value, the irv, which contains kind/name->spec,program irv = self.interpret_program(portfolio_program) rv = [] for answer in irv: # create objects and update if needed answer = self.factory(answer) if answer not in irv: logger.error('OK THAT FAILED' * 20) if answer.object is not None: # this can fail for named mixed severities, which can only # be created in context of an agg... that behaviour is # useful for named severities though... hence: if update: update_method = getattr(answer.object, 'update', None) if update_method is not None: update_method(log2, bs, **kwargs) rv.append(answer) # report on what has been done if rv is None: logger.log(WL, f'Program did not contain any output') else: if len(rv): logger.info(f'Program created {len(rv)} objects.') # return created objects return rv
[docs] def write_from_file(self, file_name, log2=0, bs=0, update=False, **kwargs): """ Read program from file. Delegates to write. :param file_name: :param log2: :param bs: :param update: :param kwargs: :return: """ portfolio_program = Path(file_name).read_text(encoding='utf-8') return self.write(portfolio_program, log2=log2, bs=bs, update=update, **kwargs)
[docs] def interpret_program(self, portfolio_program): """ Preprocess and then parse a program one line at a time. Each output is stored in the Underwriter's knowledge database. No objects are created. Error handling through parser. :param portfolio_program: :return: """ # Preprocess --------------------------------------------------------------------- portfolio_program = self.lexer.preprocess(portfolio_program) # create return value list rv = [] # Parse and Postprocess----------------------------------------------------------- # self.parser.reset() # program_line_dict = {} for program_line in portfolio_program: logger.debug(program_line) # preprocessor only returns lines of length > 0 try: # parser returns the type, name, and spec of the object # this is where you can marry up with the program kind, name, spec = self.parser.parse( self.lexer.tokenize(program_line)) except ValueError as e: if isinstance(e.args[0], str): logger.error(e) raise e else: t = e.args[0].type v = e.args[0].value i = e.args[0].index txt2 = program_line[0:i] + f'>>>' + program_line[i:] logger.error( f'Parse error in input "{txt2}"\nValue {v} of type {t} not expected') raise e else: # store in uw dictionary and create if needed logger.info( f'answer out: {kind} object {name} parsed successfully...adding to knowledge') self._knowledge.loc[(kind, name), :] = [spec, program_line] rv.append(Answer(kind=kind, name=name, spec=spec, program=program_line, object=None)) return rv
[docs] def safe_lookup(self, buildinid): """ Lookup buildinid=kind.name in uw to find expected kind and merge safely into self.arg_dict. Different from getitem because it splits the item into kind and name and double checks you get the expected kind. :param buildinid: a string in kind.name format :return: """ # allow for sev.WC.1 name kind, *name = buildinid.split('.') name = '.'.join(name) try: # lookup in Underwriter answer = self[(kind, name)] found_kind, found_name, spec, program, _ = answer.values() except LookupError as e: logger.error(f'ERROR id {kind}.{name} not found in the knowledge.') raise e logger.debug( f'UnderwritingParser.safe_lookup | retrieved {kind}.{name} as type {found_kind}.{found_name}') if found_kind != kind: raise ValueError( f'Error: type of {name} is {found_kind}, not expected {kind}') # don't want to pass back the original otherwise changes can be reflected in the knowledge spec = deepcopy(spec) return spec
[docs] @staticmethod def logger_level(level): """ Convenience function. :param level: :return: """ # set logger_level for all aggregate loggers logger_level(level)
[docs] def build(self, program, update=None, log2=0, bs=0, recommend_p=RECOMMEND_P, logger_level=None, **kwargs): """ Convenience function to make work easy for the user. Intelligent auto updating. Detects discrete distributions and sets ``bs = 1``. ``build`` method sets loger level to 30 by default. ``__call__`` is set equal to ``build``. :param program: :param update: build's update :param log2: 0 is default: Estimate log2 for discrete and self.log2 for all others. Inupt value over-rides and cancels discrete computation (good for large discrete outcomes where bucket happens to be 1.) :param bs: :param logger_level: temporary log(ger) level for this build :param recommend_p: passed to recommend bucket functions. Increase (closer to 1) for thick tailed distributions. :param kwargs: passed to update, e.g., padding. Note force_severity=True is applied automatically :return: created object(s) """ # automatically puts level back at the end if logger_level is not None: lm = LoggerManager(logger_level) # make stuff # write will return a dict with keys (kind, name) and value a WriteAnswer namedtuple rv = self.write(program, update=False, force_severity=True) if rv is None or len(rv) == 0: logger.log(WL, 'build produced no output') return None if update is None: update = self.update # in this loop bs_ and log2_ are the values actually used for each update; # they do not overwrite the input default values for answer in rv: if answer.object is None: # object not created logger.info(f'Object {answer.name} of kind {answer.kind} returned as ' 'a spec; no further processing.') elif isinstance(answer.object, Aggregate) and update is True: # try to guess good defaults d = answer.spec # extract hints from note string log2, bs, recommend_p, kwargs = parse_note_ex( d['note'], log2, bs, recommend_p, kwargs) if d['sev_name'] == 'dhistogram' and log2 == 0: bs_ = 1 # how big? if d['freq_name'] == 'fixed': max_loss = np.max(d['sev_xs']) * d['exp_en'] elif d['freq_name'] == 'empirical': max_loss = np.max(d['sev_xs']) * max(d['freq_a']) elif d['freq_name'] == 'bernoulli': # allow for max loss to occur max_loss = np.max(d['sev_xs']) else: # normal approx on count max_loss = np.max( d['sev_xs']) * d['exp_en'] * (1 + 3 * d['exp_en'] ** 0.5) # binaries are 0b111... len-2 * 2 is len - 1 log2_ = len(bin(int(max_loss))) - 1 logger.info(f'({answer.kind}, {answer.name}): Discrete mode, ' 'using bs=1 and log2={log2_}') else: if log2 == 0: log2_ = self.log2 else: log2_ = log2 if bs == 0: bs_ = round_bucket( answer.object.recommend_bucket(log2_, p=recommend_p)) else: bs_ = bs logger.info( f'({answer.kind}, {answer.name}): Normal mode, using bs={bs_} (1/{1/bs_}) and log2={log2_}') try: answer.object.update( log2=log2_, bs=bs_, debug=self.debug, force_severity=True, **kwargs) except ZeroDivisionError as e: logger.error(e) except AttributeError as e: logger.error(e) elif isinstance(answer.object, Severity): # there is no updating for severities pass elif isinstance(answer.object, Portfolio) and update is True: d = answer.spec # extract hints from note string log2, bs, recommend_p, kwargs = parse_note_ex( d['note'], log2, bs, recommend_p, kwargs) # figure stuff if log2 == -1: log2_ = 13 elif log2 == 0: log2_ = self.log2 else: log2_ = log2 if bs == 0: bs_ = answer.object.best_bucket(log2_, recommend_p) else: bs_ = bs logger.info(f'updating with {log2_}, bs=1/{1 / bs_}') logger.info( f'({answer.kind}, {answer.name}): bs={bs_} and log2={log2_}') answer.object.update(log2=log2_, bs=bs_, remove_fuzz=True, force_severity=True, debug=self.debug, **kwargs) elif isinstance(answer.object, Distortion): pass elif isinstance(answer.object, (Aggregate, Portfolio)) and update is False: pass else: logger.warning( f'Unexpected: output kind is {type(answer.object)}. (expr/number?)') pass if len(rv) == 1: # only one output...just return that # retun object if it exists, otherwise the ans namedtuple for answer in rv: if answer.object is None: return answer else: # "automatic validation" here! # if isinstance(answer.object, (Portfolio, Aggregate)): # rv = answer.object.valid # if rv == Validation.NOT_UNREASONABLE: # logger.warning(f'{answer.kind} {answer.name} is not unreasonable') # else: # logger.warning(f'WARNING: {answer.kind} {answer.name} FAIL VALIDATION') return answer.object else: # multiple outputs, see if there is just one portfolio...this is not ideal?! ports_found = 0 for answer in rv: if answer.kind == 'port': ports_found += 1 if ports_found == 1: # if only one, it must be answer if answer.object is None: return answer else: return answer.object # in all other cases, return the full list return rv
__call__ = build
[docs] def interpreter_file(self, *, filename='', where=''): """ Run a suite of test programs. For detailed analysis, run_one. filename is a string or Path. If a csv it is read into a dataframe, with the first column used as index. If it is an agg file (e.g. an agg database), it is preprocessed to remove comments and replace \\n\\t agg with a space, then split on new lines and converted to a dataframe. Other file formats are rejected. These methods are called interpreter\_... rather than interpret\_... because they are for testing and debugging the interpreter, not for actually interpreting anything! """ if filename == '': filename = Path.home() / 'aggregate/tests/test_suite.csv' elif type(filename) == str: filename = Path(filename) if filename.suffix == '.csv': df = pd.read_csv(filename, index_col=0) elif filename.suffix == '.agg': txt = filename.read_text(encoding='utf-8') stxt = re.sub('\n\tagg', ' agg', txt, flags=re.MULTILINE) stxt = stxt.split('\n') stxt = [i for i in stxt if len(i) and i[0] != '#'] df = pd.DataFrame(stxt, columns=['program']) else: raise ValueError( f'File suffix must be .csv or .agg, not {filename.suffix}') if where != '': df = df.loc[df.index.str.match(where)] # add One severity if not input # if txt.find('sev One dsev [1]') < 0: # logger.info('Adding One to knowledge.') self.write('sev One dsev [1]') return self._interpreter_work(df.iterrows())
[docs] def interpreter_line(self, program, name='one off', debug=True): """ Interpret single line of code in debug mode. name is index of output """ return self._interpreter_work(iter([(name, program)]), debug=debug)
[docs] def interpreter_list(self, program_list): """ Interpret elements in a list in debug mode. """ return self._interpreter_work(list(enumerate(program_list)), debug=True)
[docs] def run_test_suite(self): """ Run interpreter on the test suite """ df = self.interpreter_file(filename=self.test_suite_file) num_errors = df.error.sum() if num_errors != 0: logger.error('{num_errors} errors in test suite') return df
[docs] def _interpreter_work(self, iterable, debug=False): """ Do all the work for the test, allows input to be marshalled into the tester in different ways. Unlike production interpret_program, runs one line at a time. Each line is preprocessed and then run through a clean parser, and the output analyzed. Last column, program as input is only changed if the preprocessor changes the program :return: DataFrame """ lexer = UnderwritingLexer() parser = UnderwritingParser(self.safe_lookup, debug) ans = {} errs = 0 no_errs = 0 # detect non-trivial change def f(x, y): return 'same' if x.replace( ' ', '') == y.replace(' ', '').replace('\t', '') else y for test_name, program in iterable: if type(program) != str: program_in = program[0] program = lexer.preprocess(program_in) else: program_in = program program = lexer.preprocess(program_in) err = 0 if len(program) == 1: program = program[0] try: # print(program) kind, name, spec = parser.parse(lexer.tokenize(program)) except (ValueError, TypeError) as e: errs += 1 err = 1 kind = program.split()[0] # get something to say about the error ea = getattr(e, 'args', None) if ea is not None: # t = getattr(ea[0], 'type', ea[0]) # v = getattr(ea[0], 'value', ea[0]) i = getattr(ea[0], 'index', 0) if type(i) != int: i = 0 # print(i, ea) txt = program[0:i] + f'>>>' + program[i:] name = 'parse error' else: txt = str(e) name = 'other error' spec = txt else: no_errs += 1 ans[test_name] = [kind, err, name, spec, program, f(program, program_in)] elif len(program) > 1: logger.info( f'{program_in} preprocesses to {len(program)} lines; not processing.') logger.info(program) ans[test_name] = ['multiline', err, None, None, program, program_in] else: logger.info( f'{program_in} preprocesses to a blank line; ignoring.') ans[test_name] = ['blank', err, None, None, program, program_in] df_out = pd.DataFrame(ans, index=['kind', 'error', 'name', 'output', 'preprocessed program', 'program']).T df_out.index.name = 'index' return df_out
[docs] def more(self, regex): """ More information about methods and properties matching regex """ more(self, regex)
[docs] def qlist(self, regex): """ Wrapper for show to just list elements in knowledge that match ``regex``. Returns a dataframe. """ return self.show(regex, kind='', plot=False, describe=False, verbose=True)
[docs] def qshow(self, regex, tacit=True): """ Wrapper for show to just show (display) elements in knowledge that match ``regex``. No reutrn value if tacit, else returns a dataframe. """ def ff(x): fs = '{x:120s}' return fs.format(x=x) bit = self.show(regex, kind='', plot=False, describe=False)[['program']] bit['program'] = bit['program'].str.replace( r' note\{[^}]+\}', '').str.replace(' +', ' ') # , flags=re.MULTILINE) # bit['program'] = bit['program'].str.replace(' ( +)', ' ') #, flags=re.MULTILINE) # bit['program'] = bit['program'].str.replace(r' note\{[^}]+\}$| *', ' ' ) #, flags=re.MULTILINE) if tacit: qd(bit, line_width=160, max_colwidth=130, col_space=15, justify='left', max_rows=200, formatters={'program': ff}) else: return bit
[docs] def show(self, regex, kind='', plot=True, describe=True, logger_level=30, verbose=False, **kwargs): """ Create from knowledge by name or match to name. Optionally plot. Returns the created object plus dataframe with more detailed information. Allows exploration of preloaded databases. Eg ``regex = "A.*[234]`` to run examples named A...2, 3 and 4. See ``qshow`` for a wrapper that just returns the matches, with no object creation or plotting. Examples. :: from aggregate.utilities import pprint # pretty print all prgrams starting A; no object creation build.show('^A.*', 'agg', False, False).program.apply(pprint); # build and plot A..234 ans, df = build.show('^A.*') :param regex: for filtering name :param kind: the kind of object, port, agg, etc. :param plot: if True, plot (default True) :param describe: if True, print the describe dataframe :param logger_level: work silently! :param verbose: if True, return the dataframe and objects; else no return value :param kwargs: passed to build for calculation instructions :return: dictionary of created objects and DataFrame with info about each. """ # too painful getting the one thing out! ans = [] # temp logger level lm = LoggerManager(logger_level) if kind is None or kind == '': df = self.knowledge.droplevel('kind').filter( regex=regex, axis=0).copy() else: df = self.knowledge.loc[kind].filter(regex=regex, axis=0).copy() # severity causes an error: no est_m etc. if "One" in df.index: df = df.drop(index='One') if plot is False and describe is False: # just act like a filtered listing on knowledge return df.sort_values('name') # added detail columns df['log2'] = 0 df['bs'] = 0. df['agg_m'] = 0. df['agg_cv'] = 0. df['agg_sd'] = 0. df['agg_skew'] = 0. df['emp_m'] = 0. df['emp_cv'] = 0. df['emp_sd'] = 0. df['emp_skew'] = 0. df['valid'] = False for n, row in df.iterrows(): p = row.program try: a = self.build(p, **kwargs) ans.append(a) except NotImplementedError: logger.error(f'skipping {n}...element not implemented') else: if describe: pp = getattr(a, 'pprogram', None) if pp is not None: print(pp) qd(a) if plot is True: a.plot(figsize=(8, 2.4)) # print('\nDensity and Quantiles') print() show_fig(a.figure, format='svg') if describe: print('\n') df.loc[n, ['log2', 'bs', 'agg_m', 'agg_cv', 'agg_sd', 'agg_skew', 'emp_m', 'emp_cv', 'emp_sd', 'emp_skew', 'valid']] = ( a.log2, a.bs, a.agg_m, a.agg_cv, a.agg_sd, a.agg_skew, a.est_m, a.est_cv, a.est_sd, a.est_skew, a.explain_validation()) # if only one item, return it...much easier to use if len(ans) == 1: # noinspection PyUnboundLocalVariable ans = a if verbose: return ans, df
[docs] def dir(self, pattern=''): """ List all agg databases in site and default directories. If entries is True then read them and return named objects. :param pattern: glob pattern for filename; .agg is added """ if pattern == '': pattern = '*.agg' else: pattern += '.agg' entries = [] for dn, d in zip(['site', 'default'], [self.site_dir, self.default_dir]): for fn in d.glob(pattern): txt = fn.read_text(encoding='utf-8') stxt = txt.split('\n') for r in stxt: rs = r.split(' ') if rs[0] in ['agg', 'port', 'dist', 'distortion', 'sev']: entries.append([dn, fn.name] + rs[:2]) ans = pd.DataFrame(entries, columns=[ 'Directory', 'Database', 'kind', 'name']) return ans
# exported instance # self = dbuild = None logger_level(30) build = Underwriter(databases='test_suite', update=True, debug=False, log2=16) # uncomment to create debug build, add to __init__.py # debug_build = Underwriter(name='Debug', update=True, debug=True, log2=16)