Source code for aggregate.extensions.ft

import logging
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.ticker as ticker
import numpy as np
import pandas as pd
from scipy.fft import irfft,  rfft, ifft as ift
from numpy import real, imag, roll
from .. import build, qd, Aggregate
from .. constants import FIG_H, FIG_W


# get the logger
logger = logging.getLogger(__name__)


[docs] def poisson_example(en, small2): """ Example to show how to compute Po(en) using 1 << small2 buckets. For AAS paper. Sample call:: poisson_example(10**8, 17) :param en: mean of Poisson, e.g., 10**8 :param small2: log2 number of buckets to use in FFT routine, 2**small2 should be about 10 * en ** 0.5 to get +/-5 standard deviations around the mean """ from scipy.stats import poisson B = 1 << small2 z = np.zeros(B); z[1] = 1 wrap = irfft(np.exp(en * (rfft(z) - 1))) k = en // B + 1 xs = k * B - (B >> 1) + np.arange(B) pmf = roll(wrap, B >> 1) df = pd.DataFrame({'x': xs, 'FFT pmf': pmf}) po = poisson(en) df['Exact pmf'] = po.pmf(df.x) df = df.set_index('x', drop=True) fig, [ax0, ax1] = plt.subplots(1, 2, figsize=(FIG_W * 2, FIG_H + 0.3), constrained_layout=True) ax0.plot(wrap); ax0.set(title=f'Raw FFT-based output, wrapped to [0, {B}]', xlabel=f'Wrapped outcome, n mod {B}', ylabel='Probability mass, Pr(N=n)'); df[['FFT pmf', 'Exact pmf']].plot(style=['-', ':'], ax=ax1, logy=True, title='Shifted FFT vs exact Poisson probabilities\n(log scale)', xlabel='Outcome, n', ylabel='Probability mass, Pr(N=n)'); ax1.set(ylim=[1e-17, 2 * df['FFT pmf'].max()]) ax1.yaxis.set_minor_locator(ticker.LogLocator(subs='all'))
[docs] def fft_wrapping_illustration(ez=10, en=20, sev_clause='', small2=0, cmap='plasma'): """ Illustrate wrapping by convolving a uniform distribution with mean ez en times (if ``ez>0`` or ``sev_clause!=''``) or using the input ``sev_clause``. ``sev_clause`` should be a ``dsev`` tailored to ``bs==1`` or just a Poisson if ez==1. Show in a space just big enough for the severity first and then big enough for the full aggregate. Center and right hand plot illustrate how the full components are sliced up and combined to the wrapped total. If small2 is zero it is taken to be the smallest value to "fit" the severity. In Poisson ez==1 mode, small2 equals the size of the small window to use for convolution. big2 is estimated to fit the whole distribution. (moved from figures.py) """ fig, axs = plt.subplots(1, 3, figsize=(3 * FIG_W, FIG_H + 0.3), constrained_layout=True) ax0, ax1, ax2 = axs.flat if ez == 0 or sev_clause != '': if sev_clause == '': raise ValueError('Must input one of ez>0 or a valid DecL sev_clause') sev = build(f'agg Junk 1 claim {sev_clause} fixed') ez = sev.sev_m q1 = sev.q(1) if small2 == 0: small2 = int(np.ceil(np.log2(q1))) xs = np.hstack((-np.inf, np.arange(1 << small2) + 0.5)) z = np.diff(sev.sev.cdf(xs)) # enough space for aggregate big2 = int(np.ceil(np.log2(q1 * en))) elif ez == 1: assert small2, 'Need to input small2 in Poisson mode' z = np.zeros(1 << small2) z[1] = 1 # full space sigma = np.sqrt(en) big2 = int(np.ceil(np.log2(en + 5 * sigma))) else: # enough space for severity and make sev if small2 == 0: small2 = int(np.ceil(np.log2(2 * ez))) z = np.zeros(1 << small2) z[:ez*2] = 1 / ez / 2 # enough space for aggregate big2 = int(np.ceil(np.log2(2 * ez * en))) if big2 <= 8: ds = 'steps-post' else: ds = 'default' if ez == 1: wrapped = irfft( np.exp(en * (rfft(z) - 1))) full = irfft( np.exp(en * (rfft(z, 1 << big2) - 1))) else: wrapped = irfft( rfft(z) ** en ) full = irfft( rfft(z, 1 << big2) ** en ) ax0.plot(wrapped, c='C0', drawstyle=ds) ax0.xaxis.set_major_locator(mpl.ticker.MultipleLocator(4)) ax0.set(title=f'Wrapped distribution\nlog2={small2}') lm = ax0.get_ylim() lm = (-lm[1] / 20, lm[1]* 1.1) ax0.set(ylim=lm) norm = mpl.colors.Normalize(0, 1, clip=True) cmappable = mpl.cm.ScalarMappable(norm=norm, cmap=cmap) mapper = cmappable.to_rgba cc = list(map(mapper, np.linspace(0, 1, 1 << big2-small2))) ax1.plot(full, label='Full computation', c='w', alpha=1, lw=3, drawstyle=ds) ax1.xaxis.set_major_locator(mpl.ticker.MultipleLocator(32)) for n, (s, c) in enumerate(zip(full.reshape((1<<big2-small2, 1<<small2)), cc)): ax1.plot(s, c=c, label=f'Part {n}', drawstyle=ds) ax1.plot(np.arange((1<<small2) * n, (1<<small2) * (n+1)), s, c=c, lw=2, drawstyle=ds, label=None) for n in range(1 << big2-small2): ax1.axvline(n * (1 << small2), lw=.25, c='C7') for n in range(1 << big2-small2): ax1.axvline(n * (1 << small2), lw=.25, c='C7') if big2 - small2 <= 3: ax1.legend(loc='center right') ax1.set(title=f'Full distribution\nlog2={big2}, {1<<big2-small2} components') wrapped_from_full = full.reshape((1<<big2-small2, 1<<small2)) ax2.plot(wrapped_from_full.T, label=None, c='C7', lw=.5, drawstyle=ds) ax2.plot(wrapped_from_full.sum(0), lw=3 , drawstyle=ds, label='Wrapped from full', c='C1') ax2.plot(wrapped, lw=1, label='Wrapped', c='C0', drawstyle=ds) ax2.xaxis.set_major_locator(mpl.ticker.MultipleLocator(4)) ax2.set(title='Wrapping components (grey)\nSums (blue, organge as left)') # ax2.legend(loc) ax2.set(ylim=lm) assert np.allclose(wrapped_from_full.sum(0), wrapped)
[docs] def ft_invert(log2, chf, frz_generator, params, loc=0, scale=1, xmax=0, xshift=0, suptitle='', wraps=None, disc_calc='density'): """ Illustrate "manual" inversion of a characteristic function using irfft, including optional scaling and location shift. :param params: a list of shape parameters. loc and scale are handled separately. :param chf: the characteristic function of the distribution, takes args params, loc, scale, t; routine handles conversion to Fourier Transform :param frz_generator: the scipy.stats function to create the underlying distribution. Used to compute the exact answer. If there is not analytic formula, you can pass in a numpy array with the values of the distribution at the points in xs computed by some other means (e.g., for the Tweedie distribution). :param loc: location paramteter :param scale: scale parameter :param xmax: if not zero, used to fix xmax. Otherwise, selected as frz.isf(1e-17) to capture the full range of theunderlying distribution. For thick tailed distributions, you usually input xmax manually. Note bs = xmax / n. If None set equal to n = 1 << log2, for use in discrete distributions (forced bucket size of 1). :param xshift: if not zero, used to shift the x-axis. The minimum x value equals xshift. To center the x-axis, set xshift = -xmax/2. Should be a multiple of bs = xmax / n. :param suptitle: optional suptitle for the figure :param wraps: optional list of wrap values. :param disc_calc: 'density' rescales pdf, 'surival' uses backward differences of sf (to match Aggregate class calculation) """ # number of buckets n = 1 << log2 if xmax is None: xmax = n # make frozen object if callable(frz_generator): if scale is None: # freq dists (Poisson) do not allow scaling frz = frz_generator(*params, loc=loc) frz.pdf = frz.pmf # for subsequent use scale = 1 else: frz = frz_generator(*params, loc=loc, scale=scale) # spatial upto xmax; used to create exact using the scipy stats object # sampling interval (wavelength) = xmax / n if xmax == 0: xmax = frz.isf(1e-17) # sampling domain, for exact and to "label" the Fourier Transform output # xs = np.arange(n) * xmax / n bs = xmax / n xs = np.arange(n) * bs + xshift if callable(frz_generator): if disc_calc == 'density': exact = frz.pdf(xs) exact = exact / exact.sum() * (frz.cdf(xs[-1]) - frz.cdf(xs[0])) else: xs1 = np.hstack((xs - bs / 2, xs[-1] + bs / 2)) exact = -np.diff(frz.sf(xs1)) else: # pass in values exact = frz_generator # convert chf to ft including scale and loc effects def loc_ft(t): nonlocal params, loc, scale # ft(t)= int f(x)exp(-2πi t x)dx # chf(t) = int f(x)exp(i t x)dx t1 = -t * 2 * np.pi ans = chf(*params, t1 * scale) if loc != 0: # for some reason ans *= np.exp(-t1 * loc) does not work ans = ans * np.exp(t1 * loc * 1j) return ans # sampling interval = bs = xmax / n [small bs, high sampling rate] # sampling freq is 1 / bs = n / xmax, the highest sampling freq for inverting the FT # note xmax = n * bs, so n / xmax = 1 / bs. # f(x) = int_R fhat(t) exp(2πi tx)dt ≈ int_-f_max_f^max_f ... f_max = n / xmax # sample the FT; using real fft, only need half the range ts = np.arange(n // 2 + 1) * f_max / n # ts = np.arange(n // 2 + 1) / xmax fx = loc_ft(ts) # for debugging ft_invert.fx = fx ft_invert.ts = ts x = irfft(fx) if xshift != 0: x = np.roll(x, -int(xshift / bs)) # plotting fig, axs = plt.subplots(2, 2, figsize=(2 * 3.5, 2 * 2.45), constrained_layout=True) ax0, ax1, ax2, ax3 = axs.flat for ax in axs[0].flat: ax.plot(xs, exact, label='exact', lw=1.5) ax.plot(xs, x, label='xs irfft', ls=':', lw=1.5) ax.legend(fontsize='x-small') ax0.set(title='Density', xlabel='Outcome, x') # mn = min(np.log10(exact).min(), np.log10(x).min()) mn0 = np.log10(x).min() * 1.25 mn = 10 ** np.floor(mn0) mx = max(np.log10(exact).max(), np.log10(x).max()) mx = 10 ** np.ceil(mx) if np.isnan(mn): mn = 1e-17 if np.isnan(mx): mx = 1 ax1.set(yscale='log', ylim=[mn, mx], title='Log density', xlabel='Outcome, x') # amplitude and phase ax2.plot(ts, np.abs(fx), '-', lw=1.5, c='C3') ax2.set(title='Amplitude', ylabel='|ft|', yscale='log', xlabel='frequency') if log2 <= 8: ax3.plot(ts, np.cumsum(np.angle(fx)) / (2 * np.pi), '-', marker='.', ms=3, c='C2') else: ax3.plot(ts, np.cumsum(np.angle(fx)) / (2 * np.pi), c='C2') ax3.set(title='Cumulative phase', ylabel='cumsum(arg(ft)) / $2\\pi$', xlabel='frequency') if suptitle != '': fig.suptitle(suptitle) if wraps is not None: fig2, ax = plt.subplots(1, 1, figsize=(3.5 * 2, 2.45 * 2), constrained_layout=True) rt = exact.copy() ax.plot(xs, exact, label='exact', lw=1.5) ax.plot(xs, x, label='xs irfft', ls=':', lw=1.5) for b in wraps: xs2 = b * n / f_max + xs adj = frz.pdf(xs2) adj = adj / np.sum(adj) * (frz.cdf((b + 1) * n / f_max) - frz.cdf(b * n / f_max)) rt += adj ax.plot(xs, rt, label=f'wrap {b}', lw=.5) ax.set(yscale='log', ylim=[mn, mx], title='Aliasing analysis', xlabel='Outcome, x', ylabel='Log density') ax.legend(fontsize='x-small', ncol=2, loc='upper right') return pd.DataFrame({'x': xs, 'p': x, 'p_exact': exact}).set_index('x')
[docs] def recentering_convolution_example(sev_clause, en, log2, agg_log2=0, bs=1, freq_clause='poisson', remove_fuzz=False): """ Illustrate how to find the "correct" part of an aggregate and recenter it appropriately. Aggregate is:: agg RecenteringExample en claims dsev xs ps poisson **Method** #. Compute aggregate with enough space for the supported part of the distribution (say, where density > 1e-15 or so) #. Subtract the mean by rolling left (negative shift) by mean / bs buckets (mod n) #. fft shift = roll (in either direction) by n / 2 buckets, because the distribution is centered at zero and has positive and negative parts. #. Set appropriate x values to align with the density. Density is from mean - n/2 to mean + n/2 - 1 (times bs). Reasonable defaults:: en = 5000 log2 = 15 xs = [3, 4, 7, 34] ps = [1/8, 1/8, 1/8, 5/8] (from hifreq.py) :param xs: array of x values for dsev :param ps: array of x values for dsev """ df, ag = recentering_convolution(sev_clause, freq_clause, en, log2, bs, remove_fuzz) # update the ag object if agg_log2 if agg_log2 == 0: # just plot fig, ax0 = plt.subplots(1, 1, figsize=( 3.5, 2.45), constrained_layout=True) df.plot(ax=ax0, c='C0') ax0.set( title=f'Shifted, centered, re-indexed aggregate\nlog2={log2} buckets') bit = None elif agg_log2 > 0: ag.update(approximation='exact', log2=agg_log2, bs=bs, padding=0) qd(ag) print('-'*80) # percentiles - help determining log2 needed for hi freq calculation qd(pd.Series([ag.agg_sd, ag.q(0.001), ag.q(0.999), ag.q(0.999999) - ag.q(0.000001)], index=['std dev', 'p001', 'p999', 'range'])) print('-'*80) # merge and compare fig, axs = plt.subplots(2, 3, figsize=( 3 * 3.5, 2 * 2.45), constrained_layout=True) ax0, ax1, ax2, ax3, ax4, ax5 = axs.flat df.plot(ax=ax0, c='C0') ax0.set( title=f'Shifted, centered, re-indexed aggregate\nlog2={log2} buckets') bit = pd.concat((ag.density_df.p_total, df), axis=1, join='inner') bit.plot(ax=ax1) ax1.lines[1].set(ls='--') ax1.legend(loc='upper right') ax1.set( title=f'Shifted vs. agg object\n' f'Linear scale; agg object log2 = {ag.log2}') bit.plot(ax=ax2, logy=True) ax2.lines[1].set(ls='--') ax2.legend(loc='upper right') ax2.set(title='Shifted vs. agg object\nLog scale') abs_error = (bit.p_total - bit.a).abs() abs_error.plot(ax=ax4, c='C2', logy=True, title=f'Abs error\n' f'Max {abs_error.max():.5g}; ' f'Avg {abs_error.mean():.5g}') rel_error = abs_error / bit.p_total rel_error = rel_error.loc[bit.p_total > 1e-15] rel_error.plot(ax=ax5, c='C2', logy=True, title=f'Rel error p_total > 1e-15\n' f'Max {rel_error.max():.5g}; ' f'Avg {rel_error.mean():.5g}') ax3.remove() else: raise ValueError('log2 >= 0!') return df, ag, bit
[docs] def recentering_convolution(sev_clause, freq_clause, en, log2, bs, remove_fuzz): """ Compute hifreq convol for sev_clause, a DecL severity statement. Must have only one component for now. Illustrates how to find the "correct" part of an aggregate and recenter it appropriately. Aggregate is:: agg RecenteringExample en claims dsev xs ps poisson **Method** #. Compute aggregate with enough space for the supported part of the distribution (say, where density > 1e-15 or so) #. Subtract the mean by rolling left (negative shift) by mean / bs buckets (mod n) #. fft shift = roll (in either direction) by n / 2 buckets, because the distribution is centered at zero and has positive and negative parts. #. Set appropriate x values to align with the density. Density is from mean - n/2 to mean + n/2 - 1 (times bs). """ ag = build(f'agg SevEg {en} claims {sev_clause} {freq_clause}', update=False) ez = ag.sev_m mean_bucket = int(round(ez * en / bs)) ft_len = 1 << log2 ag.xs = np.linspace(0, ft_len * bs, ft_len, endpoint=False) ag.log2 = log2 ag.bs = bs ag.padding = 0 z = ag.discretize('round', 'survival', True) assert len(z) == 1 z = z[0] fz = rfft(z, ft_len) # aggregate by hand # fa = np.exp(en*(fz - 1)) fa = ag.freq_pgf(en, fz) a = irfft(fa) # center roll left by ez and 1 << log2-1, former # is the mean offset, the latter is fftshift a = roll(a, -(mean_bucket % ft_len) + (1 << log2 - 1)) # set up aligned to the appropriate xs values df = pd.DataFrame( {'n': np.arange(mean_bucket - (1 << log2 - 1), mean_bucket + (1 << log2 - 1)) * bs, 'a': a} ).set_index('n') if remove_fuzz: # remove fuzz eps = np.finfo(float).eps df.loc[df.a.abs() < 2 * eps, 'a'] = 0 qd(stats(df).T) print('-'*80) return df, ag
[docs] def stats(df): ans = {} ns = {} ns[1] = np.array(df.index) ns[2] = ns[1] * ns[1] ns[3] = ns[2] * ns[1] for c in df: ans[c] = [np.sum(ns[i] * df[c]) for i in [1, 2, 3]] df = pd.DataFrame(ans, index=[1, 2, 3]) df.loc['var'] = df.loc[2] - df.loc[1] ** 2 df.loc['sd'] = df.loc['var'] ** .5 df.loc['cv'] = df.loc['sd'] / df.loc[1] df.loc['skew'] = (df.loc[3] - 3 * df.loc[2] * df.loc[1] + 2 * df.loc[1]**3) / df.loc['sd'] ** 3 return df
[docs] class FourierTools(object): """Manual inversion of a ch. f. using FFTs.""" def __init__(self, chf, fz, scale_mode=True): """ Class version of manual inversion of characteristic function. Splits functionality of aggregate.extensions.ft_invert into: 1. Numerical inversion, ``ft_invert`` 2. Computation of actual density 2. Graph to compare densities 3. Graph to compute effect of wrapping Compared to ``ft_invert`` function, the meaning of x_max and xshift are changed. For discrete rvs, x_max is always n - 1 and the bucketr size 1. For continuous rvs, it is either input or estimated as a quantile. The arguments completely define the distribution of interest. Other class functions vary the numerical variables, defining the window and number of points used in the FFT routine. See BLOG POST. :param chf: the characteristic function of the distribution, takes args t; routine handles conversion to Fourier Transform and adds loc and scale effects. Must use same shape parameters as fz. :param fz: the scipy.stats frozen distribution object. Used to compute the exact answer. (Note: ft_invert allowed the class, pre-creation or an array. That option no longer allowed.) If fz is 'discrete' or 'continuous' or 'mixed' it is a generic distribution with no closed form cdf/pdf, e.g. Tweeedie. Then you can't compute exact, obviously. :param scale_mode: if True, the scale parameter from fz is used in the Fourier Transform, otherwise it is unadjusted. """ self.chf = chf self.fz = fz self.scale_mode = scale_mode if isinstance(fz, str): # extremely limited functionality self.params = (0,) # allows passing as param 1 to the chf self.loc = 0. self.scale = 1. self.discrete = True if fz == 'discrete' else False self.distribution_name = 'User defined' elif isinstance(fz, Aggregate): self.params = (0,) self.loc = 0. self.scale = 1. self.discrete = fz.bs == 1 self.distribution_name = f'Aggregate({fz.name})' elif fz is None: # flying blind...do the best we can self.params = (0,) self.loc = 0. self.scale = 1. self.discrete = False self.distribution_name = 'Unknown' else: # extract shape params from fz; no longer used self.params = fz.args # location and scale with default 0, 1 kwds = fz.kwds self.loc = kwds.get('loc', 0.) self.scale = kwds.get('scale', 1) # is this a discrete or continuous variable? self.discrete = True if str(type(fz)).find('discrete') > 0 else False self.distribution_name = fz.dist.name # slightly ugly state, but this encodes n=2**log2, xmin, xmax, etc. self._df = self._df_exact = None self._ts = None # phases (angles) at which ft evaluated , for plotting self._fourier = None # store the sample of ft at self._ts for plotting self.last_fig = None # from the last run, note: x_range = P self.bs = self.x_range = self.x_min = self.x_max = self.log2 = 0 self.exact_calc = "" def __repr__(self): """Repr of the object.""" return f'FourierTools({self.distribution_name}{self.params}, loc={self.loc}, scale={self.scale})'
[docs] def describe(self): """More information.""" return f'{repr(self)}\nn={2**self.log2}, x_min={self.x_min}, x_max={self.x_max:.3g}, bs={self.bs:.3g}'
@property def df(self): """Return current state dataframe output.""" if self._df is not None: return self._df else: raise ValueError('Must run invert first!') @property def df_exact(self): """Return current state dataframe output.""" if self._df is not None: return self._df_exact else: raise ValueError('Must run compute_exact first!')
[docs] def fourier_transform(self, t): """ Create ft function by converting ch f to Fourier transform and including scale and loc. Recall: ft(t)= int f(x)exp(-2πi t x)dx chf(t) = int f(x)exp(i t x)dx. """ TWOPI = 6.283185307179586 t1 = -t * TWOPI if self.scale_mode: ans = self.chf(t1 * self.scale) if self.loc != 0: # for some reason ans *= np.exp(-t1 * loc) does not work ans = ans * np.exp(t1 * self.loc * 1j) else: ans = self.chf(t1) return ans
[docs] def invert(self, log2, x_min=0, bs=0, x_max=None, s=1e-17): """ Invert a characteristic function using irfft. Call with just log2 for positive support, to determine x_max and bs from quantiles. Call with bs fixed to a reasonable value to deduce x_max = x_min + n bs. Call with x_max to fix the range and deduce bs. :param x_min: minimum value of range. :param bs: bucket size to use, determines x_max :param x_max: minimum value of range. :param s: survival probability to determine tails """ # number of buckets self.log2 = log2 n = 1 << log2 if x_min is None: x_min = self.fz.ppf(s) if bs == 0: if self.discrete: x_max = x_min + n else: x_max = self.fz.isf(s) if x_max is None else x_max else: x_max = x_min + bs * n if x_min == 0 and x_max == 0: raise ValueError('Must provide x_min < 0 or x_max > 0. Current range is 0 to 0!') # spatial range is [x_min, x_max] # translate to [0, x_max - x_min] x_range = x_max - x_min # sampling interval (wavelength) = bs = x_range / n # sampling domain, for exact and to "label" the Fourier Transform output # sampling interval is bs (small bs means high sampling rate) # the highest sampling freq for inverting the FT is 1 / bs = n / x_range bs = x_range / n if self.discrete and bs != 1: logger.warning(f'{bs=}, not the expected 1 for a discrete rv') # f_max is 1 / bs f_max = 1 / bs # f(x) = int_R fhat(t) exp(2πi tx)dt ≈ int_-f_max_f^max_f ... # sample the FT; using real fft, only need half the range # self._ts = np.arange(n // 2 + 1) * f_max / n self._ts = np.linspace(0, 1/2, n // 2 + 1) * f_max self._fourier = self.fourier_transform(self._ts) # for debugging # ft_invert.fx = fx # ft_invert.self._ts = self._ts probs = irfft(self._fourier) if x_min != 0: probs = np.roll(probs, -int(x_min / bs)) # for df index self._df = pd.DataFrame({ 'x': np.linspace(x_min, x_max, n, endpoint=False), 'p': probs}).set_index('x') # store for future use self.bs, self.x_range, self.x_min, self.x_max = bs, x_range, x_min, x_max
[docs] def invert_simpson(self, *, log2=0, bs=0, x_min=None): """ Add Simpson's method approximation. Adds ``simpson`` column to ``self.df``. Uses method of Wang and Zhang, "Simpson's rule based FFT method to compute densities of stable distribution" (2008). Can no longer use the real fft and ifft methods because the input vector is not conjugate symmetric about its midpoint. Run after self.invert to set parameters or input directly. Defaults are None because 0 is a legitimate value for x_min. Convenient to state x_min and bs directly. Best to input all parameters if adjusting to avoid mismatches. Note x_max gets trumped by n, bs, and x_min. """ # parameters to use for the calculation log2 = log2 if log2 > 0 else self.log2 n = 1 << log2 x_min = x_min if x_min is not None else self.x_min bs = bs if bs > 0 else self.bs f_max = 1 / bs ks = np.arange(0, n) P = n * bs # period t_left = np.linspace(-f_max / 2, f_max / 2, n, endpoint=False) t_left = np.roll(t_left, n >> 1) if x_min != 0: # pre-FFT adjustment x_min_adj = np.exp(2 * np.pi * 1j * x_min * t_left) # post-FFT adjustment post_2 = np.exp(2 * np.pi * 1j * x_min / (2 * P)) post_3 = np.exp(2 * np.pi * 1j * x_min / P) else: # no adjustments needed x_min_adj = post_2 = post_3 = 1. phi_left = self.fourier_transform(t_left) # three terms left, middle, right of simpson's rule term1 = ift(x_min_adj * phi_left) term2 = post_2 * np.exp(np.pi * 1j * ks / n) * ift( x_min_adj * self.fourier_transform(t_left + 0.5 / n * f_max)) term3 = post_3 * np.exp(2 * np.pi * 1j * ks / n) * ift( x_min_adj * self.fourier_transform(t_left + 1 / n * f_max)) # weighted sum simpson = (term1 + 4. * term2 + term3) / 6. # check imaginary part small max_abs = np.abs(np.imag(simpson)).max() if max_abs > 1e-10: logger.warning(f'Answer has suspiciously large imaginary component {max_abs}') # create / update answer dataframe if self._df is None or self.bs != bs or self.x_min != x_min: # changed scale or location, recreate dataframe from scratch logger.info('recreating df') self._df = pd.DataFrame({ 'x': np.linspace(x_min, x_min + n * bs, n, endpoint=False), 'simpson': np.real(simpson)}).set_index('x') else: # append to existing dataframe self.df['simpson'] = np.real(simpson)
[docs] def compute_exact(self, calc='survival', max_points=257): """ Compute exact density using frozen scipy.stats object. :param calc: 'density' re-scales pdf, 'survival' uses backward differences of sf. :param max_points: maximum number of points to use; if more just interpolate this many points. :param decimate: decimate (take ``::decimate``) input xs to reduce number of calls to fz (which may be slow) :param min_points: opposite of decimate, ensure exact computed with min_points. Useful for example when log2 is "too small" to ensure exact distribution is rendered correctly. """ assert calc in ('survival', 'density'), 'calc must be "survival" or "density"' self.exact_calc = calc assert self._df is not None, 'Must recompute first. Run ft_invert().' xs = np.array(self._df.index) if len(xs) > max_points and not self.discrete: # mostly this is an issue for non-discrete distributions xs = np.linspace(xs[0], xs[-1] + self.bs, max_points) # if decimate > 1: # xs = xs[::decimate] # # if too few points beef up, non-discrete distributions only # if len(xs) < min_points and not self.discrete: # xs = np.linspace(xs[0], xs[-1] + self.bs, min_points) self.bs_exact = bs = xs[1] - xs[0] # discrete dists have pmf not pdf if getattr(self.fz, 'pdf', None) is None: pdf = self.fz.pmf else: pdf = self.fz.pdf if calc == 'density': logger.warning('Best to use survival calc rather than density method.') exact = pdf(xs) self._df_exact = pd.DataFrame({'x': xs, 'p': exact}).set_index('x') elif calc == 'survival': xs1 = np.hstack((xs - bs / 2, xs[-1] + bs / 2)) exact = -np.diff(self.fz.sf(xs1)) / bs self._df_exact = pd.DataFrame({'x': xs, 'p': exact}).set_index('x') # self.decimate = decimate return self._df_exact
[docs] def plot(self, suptitle='', xlim=None, verbose=True): """ Compare density, log density, and plot amplitude and argument of Fourier transform. :param suptitle: super title for the plot. """ assert self._df is not None, 'Must recompute first. Run ft_invert() and compute_exact().' has_exact = self._df_exact is not None if not has_exact: logger.warning('No exact! Maybe run compute_exact().') # plot four graphs per ft_invert if verbose: self.last_fig, axs = plt.subplots(2, 3, figsize=(3 * 2.5, 2 * 2), constrained_layout=True) ax0, ax1, ax2, ax3, ax4, ax5 = axs.flat else: self.last_fig, axs = plt.subplots(1, 2, figsize=(2 * 2.5, 1 * 2.5), constrained_layout=True) ax0, ax1 = axs.flat x = np.array(self._df.index) p = self._df.p.values b = x[1] - x[0] if has_exact: xe = np.array(self._df_exact.index) pe = self._df_exact.p.values be = xe[1] - xe[0] else: # avoid an error below when no exact pe = self.df.p.values if xlim is None: if x[0] == 0: lower = -x[-1] / 25 else: lower = x[0] xlim = [lower, x[-1]] for ax in [ax0, ax1]: ax.plot(x, p / b, label='Fourier', lw=1) if has_exact: ax.plot(xe, pe, ls='--', c='C3', label='exact', lw=1) ax.legend(fontsize='x-small') ax0.set(xlim=xlim, title='Density', xlabel='Outcome, x') # mn = min(np.log10(exact).min(), np.log10(x).min()) mn0 = np.log10(p / b).min() * 1.25 mn = 10 ** np.floor(mn0) mx = max(np.log10(pe).max(), np.log10(x).max()) mx = 10 ** np.ceil(mx) if np.isnan(mn): mn = 1e-17 if np.isnan(mx): mx = 1 ax1.set(yscale='log', ylim=[mn, mx], xlim=xlim, title='Log density', xlabel='Outcome, x') if not verbose: if suptitle != '': self.last_fig.suptitle(suptitle) return # else: verbose mode: full monty with six plots if self.discrete and len(self._df) <= 64: drawstyle = 'steps-post' else: drawstyle = 'default' for ax in [ax2, ax5]: ax.plot(x, np.cumsum(p), label='cdf Fourier', lw=1, c='C0', drawstyle=drawstyle) if has_exact: ax.plot(xe, np.cumsum(pe * be), label='cdf exact', ls='--', lw=.5, c='C3', drawstyle=drawstyle) ax.plot(x, np.cumsum(p[::-1])[::-1], label='sf Fourier', lw=1, c='C2', drawstyle=drawstyle) if has_exact: ax.plot(xe, np.cumsum(pe[::-1] * be)[::-1], label='sf exact', ls='--', lw=.5, c='C4', drawstyle=drawstyle) ax2.set(title='sf and cdf', xlabel='Outcome, x', xlim=xlim) ax2.legend() ax5.set(yscale='log', title='log sf and cdf', ylim=[mn, 10], xlim=xlim, xlabel='Outcome, x') # ft on ax3, probably should inline this self._plot_fourier1d(ax3) # amplitude and phase both on ax4 ax4r = ax4.twinx() ax4.plot(self._ts, np.abs(self._fourier), '-', lw=1.5, c='C4', label='Amplidude') # for amplitude, only look at nonzero fts, find index of non-zero (inz) items inz = np.abs(self._fourier) > 0 tnz = self._ts[inz] fnz = self._fourier[inz] anz = np.angle(fnz) uw = np.unwrap(anz) if len(self._df) <= 256: kw = {'ls': '-', 'marker': '.', 'ms': 3, 'lw': 1} else: kw = {'lw': 1} ax4r.plot(tnz, uw, c='C2', **kw, label='phase, unwrapped') kw['lw'] = 0.5 kw['marker'] = None kw['ls'] = ':' ax4r.plot(tnz, anz, c='C2', **kw, label='phase, wrapped') ax4r.legend(loc='upper right') ax4.legend(loc='center right') ax4.set(ylabel='Amplitude |ft|', yscale='log', xlabel='frequency', xlim=[-.05, 0.5 / self.bs + 0.05]) if self.bs == 1: ax4.set(xticks=[0, .25, .5]) ax4r.set(title='Amplitude and phase', ylabel='Phase / 2π', xlabel='frequency / 2π') if suptitle != '': self.last_fig.suptitle(suptitle)
[docs] def plot_wraps(self, wraps=None, calc='survival', add_tail=False): """ Illustrate wrapping. Only run when fz.pdf is easy to calc, otherwise too slow. :param wraps: optional list of wrap values. Eg [-1,1] plots one greater and one less than [0, P) :param calc: how to estimate the density outside the base range, same ``compute_exact``. :param add_tail: plot the shifted exact densities in plots 2 and 4. """ assert self._df is not None, 'Must recompute first. Run invert().' assert self._df_exact is not None, "Must run compute_exact() first." # extract values x = np.array(self._df.index) b = x[1] - x[0] p = self._df['p'].values / b # here and below divide by b to convert to a density xe = np.array(self._df_exact.index) be = xe[1] - xe[0] pe = self._df_exact.p.values x_range = self.x_max - self.x_min rt = pe.copy() # duplicated... mn0 = np.log10(x).min() * 1.25 mn = 10 ** np.floor(mn0) mx = max(np.log10(p).max(), np.log10(x).max()) mx = 10 ** np.ceil(mx) if np.isnan(mn): mn = 1e-17 if np.isnan(mx): mx = 1 # report answer ans = [] self.last_fig, axs = plt.subplots(2, 2, figsize=(2 * 2.5, 2 * 2.), constrained_layout=True) ax0, ax1, ax2, ax3 = axs.flat for ax in axs.flat: ax.plot(x, p, label='Fourier', lw=2) lw = .5 if calc == 'density': if hasattr(self.fz, 'pdf'): pdf = self.fz.pdf elif hasattr(self.fz, 'pmf'): pdf = self.fz.pmf else: raise ValueError('fz must have pdf or pmf method for density method') for i, b_wrap in enumerate(wraps): if type(b_wrap) == int: bl = f'{b_wrap:d}' else: bl = f'{b_wrap:.3f}' xs2 = b_wrap * x_range + xe if calc == 'survival': # for computing probs using survival method xs2d = np.hstack((xs2 - be / 2, xs2[-1] + be / 2)) adj = -np.diff(self.fz.sf(xs2d)) / be rt += adj elif calc == 'density': adj = pdf(xs2) rt += adj else: raise ValueError('calc must be "survival" or "density"') c = f'C{i+1}' ax0.plot(xe, rt, label=bl, lw=lw, c=c) ax1.plot(xe, adj, label=bl, lw=lw, c=c) if add_tail: ax1.plot(xs2, adj, label=bl, lw=lw, c=c, ls=':') ax3.plot(xs2, adj, label=bl, lw=lw, c=c, ls=':') ax2.plot(xe, rt, label=bl, lw=lw, c=c) ax3.plot(xe, adj, label=bl, lw=lw, c=c) ans.append([b_wrap, xs2[0], xs2[-1], self.fz.cdf(xs2[-1]), self.fz.cdf(xs2[0]), self.fz.cdf(xs2[-1]) - self.fz.cdf(xs2[0])]) for ax in axs.flat: ax.plot(xe, pe, label='exact', ls='--', lw=1, c='C3') ax0.set(yscale='linear', title='Cumulative aliasing', xlabel='Outcome, x', ylabel='Density') ax2.set(yscale='log', title='Cumulative - log scale', xlabel='Outcome, x', ylabel='Log density') ax1.set(yscale='linear', title='Incremental aliasing', xlabel='Outcome, x', ylabel='Density') ax3.set(yscale='log', title='Incremental - log scale', xlabel='Outcome, x', ylabel='Log density') ax2.legend(fontsize='x-small', ncol=2) if add_tail: ax1.legend(fontsize='x-small', ncol=2) if add_tail: if 0 not in wraps: wraps.append(0) if 1 not in wraps: wraps.append(1) wraps.append(max(wraps) + 1) wraps = sorted(wraps) for b_wrap in wraps: ax1.axvline(x[0] + b_wrap * x_range, lw=.25, c='k', ls=':') ax3.axvline(x[0] + b_wrap * x_range, lw=.25, c='k', ls=':') for ax in [ax1, ax3]: ax.set(xticks=self.x_min + np.arange(0, max(wraps), 2) * self.x_max) ax.xaxis.set_minor_locator(ticker.AutoMinorLocator(n=2)) df = pd.DataFrame(ans, columns=['Wrap', 'x0', 'x1', 'Pr(X≤x1)', 'Pr(X≤x0)', 'Pr(X in Wrap)']) df = df.set_index('Wrap') return df
[docs] def plot_simpson(self, ylim=1e-16): """Plot Simpson's approximation.""" fig, ax = plt.subplots(1, 1, figsize=(5, 3.25), constrained_layout=True) if self._df_exact is not None: self.df_exact.p.plot(ax=ax, lw=1, c='C1') xs = np.array(self.df.index) if 'p' in self.df: ax.plot(xs, self.df.p / self.bs, label='basic', c='C0', lw=1) ax.plot(xs, self.df.simpson / self.bs, label='simpson', c='C3', ls=':') ax.set(yscale='log', ylim=ylim) ax.legend()
def _plot_fourier1d(self, ax, min_abs=1e-20): """Create simple plot of Fourier transform on one axis.""" fhat = self._fourier.copy() c = np.abs(fhat) num_large = np.sum(c > min_abs) fhat = fhat[:num_large] c = c[:num_large] ax.plot(np.real(fhat), np.imag(fhat), '-o', ms=2, lw=.5, label='ft') fhat = fhat[c > 0] / c[c > 0] ax.plot(np.real(fhat), np.imag(fhat), '-o', ms=1, lw=.5, label='ft / |ft|') lim = [-1.05, 1.05] ax.set(xlim=lim, ylim=lim, aspect='equal', xlabel='real(ft)', ylabel='imag(ft)', title='Fourier transform') ax.legend(loc='upper left') ax.axhline(0, c='k', alpha=.5, lw=.5) ax.axvline(0, c='k', alpha=.5, lw=.5)
[docs] def plot_fourier3d(self, scale=True): """Three dimensional line plot of the Fourier transform using mayavi.""" # dont want to make mayavi a required package logger.warning('REMEMBER: this routine pops a separate window!') try: from mayavi import mlab except ModuleNotFoundError: raise ModuleNotFoundError('mayavi required, pip install mayavi') # Generate data t = self._ts f_t = self._fourier c = np.abs(f_t) # Use |f(t)| for coloring if scale: f_t = f_t / c f_t[np.isinf(f_t)] = np.nan x = np.real(f_t) y = np.imag(f_t) # scale to 0, 1 z = t / np.max(t) # Create the 3D line plot mlab.figure(size=(1000, 800)) # Set figure size mlab.plot3d(x, y, z, c, tube_radius=0.005, colormap='viridis') # Add axes and labels mlab.xlabel('Re(f)') mlab.ylabel('Im(f)') mlab.zlabel('scaled t') mlab.title('Scaled FT' if scale else "FT") # Show the plot mlab.show()
[docs] def plot_fourier3da(self): """Three dimensional line plot of the Fourier transform using plotly.""" # dont want to make plotly a required package try: import plotly.graph_objects as go except ModuleNotFoundError: raise ModuleNotFoundError('plotly required, pip install plotly') t = self._ts f_t = self._fourier x = np.real(f_t) y = np.imag(f_t) c = np.abs(f_t) # normalized plot data f_t_rhs = f_t / np.abs(f_t) x_rhs = np.real(f_t_rhs) y_rhs = np.imag(f_t_rhs) # Create the subplot figure fig = go.Figure() # LHS plot fig.add_trace( go.Scatter3d( x=x, y=y, z=t, mode="lines", line=dict( color=.5, width=4, ), name="blue: f(t)", ) ) # RHS plot fig.add_trace( go.Scatter3d( x=x_rhs, y=y_rhs, z=t, mode="lines", line=dict( color=c, colorscale="Plasma", width=4, colorbar=dict( title="|f(t)|", len=0.5, lenmode="fraction", ), ), name="colored: f(t) / |f(t)|", ) ) # Update layout for side-by-side 3D plots fig.update_layout( scene=dict( xaxis=dict(title="Re(f)", range=[-1, 1]), yaxis=dict(title="Im(f)", range=[-1, 1]), zaxis=dict(title="t", range=[0, .55]), aspectmode="manual", aspectratio=dict(x=1, y=1, z=1) ), width=900, # Wider figure for two plots height=800, title='Fourier transform and normalized transform.' ) return fig
[docs] def make_levy_chf(alpha, beta): """Make the ch of stable(alpha, beta) per Nolan book page 5 Def 1.3.""" assert 0 < alpha < 2, f'alpha must be in (0, 2]' if alpha == 1: def chf(t): return np.where(t==0, 1., np.exp(- np.abs(t) * (1 + 1j * beta * 2 / np.pi * np.sign(t) * np.log(np.abs(t))))) elif alpha < 2: # alpha not = 1 tan = np.tan(np.pi / 2 * alpha) def chf(t): return np.exp(- np.abs(t) ** alpha * (1 - 1j * beta * np.sign(t) * tan)) else: # actually normal def chf(t): return np.exp(-t**2 / 2) return chf