Source code for inspec.statistics

# -*- coding: utf-8 -*-
# Time-stamp: <2021-03-17 23:15 ycopin@lyonovae03>

"""
.. _statistics:

statistics - Statistical utilities
==================================

.. autosummary::

   RobustFitter
   fit_adaptive_polynom
   fit_adaptive_emilines
   fit_adaptive_spectrum
"""

__author__ = "Yannick Copin <y.copin@ipnl.in2p3.fr>"

import warnings

import numpy as N
import scipy.stats as SS

import astropy.modeling as AM
from astropy.modeling.fitting import (_validate_model,
                                      _fitter_to_model_params,
                                      _model_to_fit_params,
                                      _convert_input)

# Sample statistics ==============================


[docs]def rms(arr, weights=None, axis=None): """ [Weighted] RMS. """ return N.average(arr**2, weights=weights, axis=axis) ** 0.5
[docs]def mad(a, axis=None): """ *Median Absolute Deviation* of an array along given axis. """ return N.median(N.abs(a - N.median(a, axis=axis, keepdims=True)), axis=axis)
[docs]def sample_mean(x, vx=None, axis=None): """ Sample mean and associated error. Compute [weighted] arithmetic mean and variance on mean of *x* along given *axis*. If variance *vx* is given, the computations are inverse variance-weighted. """ mask = ~N.isfinite(x) if vx is not None: mask |= ~N.isfinite(vx) | (vx <= 0) # Discard strange points w = N.ma.masked_where(mask, 1 / vx) # Inverse-variance weighting else: w = None # No weighting mx = N.ma.masked_where(mask, x) # [Weighted] mean mean, wsum = N.ma.average(mx, weights=w, axis=axis, returned=True) # Variance on mean if vx is not None: # (w**2*vx).sum() / w.sum()**2 = 1 / w.sum() vmean = 1 / wsum else: vmean = N.ma.var(mx, axis=axis, ddof=1) / wsum return mean, vmean # Masked arrays
[docs]def sample_std(x, axis=None, normal=False): r""" Sample standard deviation and associated error. Compute standard deviation (and its standard error) of *x* along given *axis*, from estimate of variance: .. math:: Var(\sigma^2) &= \frac{1}{N} \left(m^4 - \frac{N-3}{N-1}\sigma^4\right) \\ &\simeq \frac{1}{N} × 2\sigma^4 \quad\text{in the normal approximation} **See also:** `scipy.stats.bayes_mvs <docs.scipy.org/doc/scipy/reference/generated/scipy.stats.bayes_mvs.html>`_ """ var = N.var(x, axis=axis, ddof=1) # Variance = sigma**2 if axis is None: n = N.size(x) else: n = N.shape(x)[axis] if normal: dvar2 = 2. * var ** 2 / n # Variance of variance dvar**2 else: m4 = SS.moment(x, moment=4, axis=axis) # 4th central moment # Variance of variance dvar**2 dvar2 = (m4 - (n - 3.) / (n - 1.) * var ** 2) / n std = N.sqrt(var) # Std deviation sigma = sqrt(var) dstd = N.sqrt(dvar2) / (2 * std) # dstd = dvar/(2*std) return std, dstd
# Histogram utilities ==============================
[docs]def get_range(x, range=None, log=False, percentiles=False): """ Range utility. Get range from *x* and *range* = (*min*, *max*) or `None`. If *min* (resp. *max*) is `None`, *xmin* is set to min of *x*, or of strictly positive *x* if *log* (resp. *xmax* is set to the max of *x*). If *percentiles*, *range* is actually expressed in percentiles (in percents). >>> import numpy as N >>> get_range(N.linspace(0, 10, 101), range=(5, 95), percentiles=True) (0.5, 9.5) """ if range is not None: # Specified range if percentiles: # Range in percentiles vmin, vmax = range if vmin is None: # Take the min vmin = 0 if vmax is None: # Take the max vmax = 100 xmin, xmax = N.percentile(x, (vmin, vmax)) # Range in values else: xmin, xmax = range else: # Full range xmin, xmax = None, None xx = N.ravel(x) if xmin is None: # Automatic xmin = min(x) if log: # xmin = min(x>0) xmin = xx[xx > 0].min() # Might raise ValueError else: xmin = xx.min() if xmax is None: # Automatic xmax = max(x) xmax = xx.max() return xmin, xmax
[docs]def hist_binwidth(x, choice='FD', range=None, percentiles=False): """ Optimal histogram binwidth. Choices are: - 'S': Scott's choice - 'FD': Freedman & Diaconis (1981), fast, fair if single-peaked [default] - 'SS': Shimazaki and Shinomoto (2007), slow, best choice if double-peaked - 'BR': Birgé and Rozenholc (2006), slow Analysis is restricted to *range*=(*min*, *max*) if not `None` (full range by default). **References:** - `Histogram <http://en.wikipedia.org/wiki/Histogram>`_ - `Histogram Bin-width Optimization <http://176.32.89.45/~hideaki/res/histogram.html>`_ **See also:** similar functions in `Choosing Histogram Bins <http://docs.astropy.org/en/stable/visualization/histogram.html>`_ in Astropy v1.1. .. WARNING:: deprecated, use `N.histogram_bin_edges`. """ xx = N.ravel(x) xmin, xmax = get_range(xx, range, percentiles=percentiles) xx = xx[(xx >= xmin) & (xx <= xmax)] if choice == 'FD': # Freedman and Diaconis (1981) l, h = N.percentile(xx, [25., 75.]) h = 2 * (h - l) / len(xx) ** (1. / 3.) elif choice == 'S': # Scott's choice h = 3.49 * N.std(xx, ddof=1) / len(xx) ** (1. / 3.) elif choice == 'BR': # Birgé and Rozenholc (2006) def penalty(nbin): return nbin - 1 + N.log(nbin) ** 2.5 def likelihood(nbin): hist, bins = N.histogram(xx, bins=nbin) return (hist * N.log(nbin * N.maximum(hist, 1) / float(len(xx)))).sum() nbins = N.arange(2, round(len(xx) / N.log(len(xx))) + 1, dtype='i') nbin = nbins[N.argmax([likelihood(n) - penalty(n) for n in nbins])] h = (xmax - xmin) / nbin elif choice == 'SS': # Shimazaki and Shinomoto (2007) # http://web.mit.edu/hshimaza/www//res/histogram.html def objf(nbin): hist, bins = N.histogram(xx, bins=nbin) delta = bins[1] - bins[0] k = hist.mean() v = hist.var(ddof=0) return (2 * k - v) / delta ** 2 nbins = N.arange(2, round(len(xx) / N.log(len(xx))) + 1, dtype='i') nbin = nbins[N.argmin([objf(n) for n in nbins])] h = (xmax - xmin) / nbin else: raise ValueError(f"Unknow histogram binwidth's choice '{choice}'") if not h: raise ValueError("Cannot compute binwidth for Dirac-like distribution") return h
[docs]def hist_nbin(x, choice='FD', range=None, percentiles=False): """Optimal number of bins. See :func:`hist_binwidth` for details.""" xmin, xmax = get_range(x, range=range, percentiles=percentiles) return int(N.ceil((xmax - xmin) / hist_binwidth(x, choice=choice, range=range, percentiles=percentiles)))
[docs]def hist_bins(x, choice='FD', range=None, percentiles=False, log=False): """Optimal binning. See :func:`hist_binwidth` for details.""" xmin, xmax = get_range(x, range=range, percentiles=percentiles, log=log) if log: from math import log10 lxmin, lxmax = log10(xmin), log10(xmax) xx = N.ravel(x) xx = xx[xx >= xmin] return N.logspace(lxmin, lxmax, hist_nbin(N.log10(xx), choice=choice, range=(lxmin, lxmax))) else: return N.linspace(xmin, xmax, hist_nbin(x, choice=choice, range=range, percentiles=percentiles))
# Filters ==============================
[docs]def savitzky_golay(data, kernel=11, order=4, derivative=0): """ Savitzky-Golay filter. :param data: input numpy 1D-array :param kernel: a positive odd integer > order+2 giving the kernel size :param order: order of the polynomial :param derivative: 1 or 2 for 1st or 2nd derivatives :return: smoothed data as a numpy array """ try: kernel = abs(int(kernel)) order = abs(int(order)) except ValueError: raise ValueError("kernel and order must be castable to int") if kernel % 2 != 1 or kernel < 1: raise TypeError("kernel=%d is not a positive odd number" % kernel) if kernel < order + 2: raise TypeError("kernel=%d is not > order+2=%d" % (kernel, order + 2)) hsize = (kernel - 1) // 2 b = N.mat([ [k ** i for i in range(order + 1)] for k in range(-hsize, hsize + 1)]) weights = N.linalg.pinv(b).A[derivative] hsize = (len(weights) - 1) // 2 # Temporary data, extended with a mirror image to the left and right # left extension: f(x0-x) = f(x0)-(f(x)-f(x0)) = 2f(x0)-f(x) # right extension: f(xl+x) = f(xl)+(f(xl)-f(xl-x)) = 2f(xl)-f(xl-x) leftpad = 2 * data[0] - data[1:hsize + 1][::-1] rightpad = 2 * data[-1] - data[-(hsize + 1):-1][::-1] data = N.concatenate((leftpad, data, rightpad)) # Convolution sdata = N.convolve(data, weights, mode='valid') return sdata
# Robust fit ==============================
[docs]def p_better_fit(dchi2, ddof): """Is better fit if p < pmax.""" assert ddof >= 0, "d(DoF) = DoF_2 - DoF_1 should be positive" if ddof == 0: return 0 if dchi2 < 0 else 1 # Smaller chi2 is a better fit elif dchi2 >= 0: # Higher chi2 for a more complex fit is worse return 1 elif N.isneginf(dchi2): # Cannot be worse than infinite chi2 return 0 # ddof > 0 and dchi2 < 0: is it significant? return SS.distributions.chi2.sf(-dchi2, ddof) # p-value of chi2 improvement
[docs]class RobustFitter(AM.fitting.Fitter): def __init__(self, loss='pseudo_huber', optimizer=AM.optimizers.Simplex): self.loss = self.get_loss(loss) super().__init__(optimizer, statistic=self.statistic)
[docs] @classmethod def get_loss(cls, loss): """ `loss='xxx'` is a loss function name (`loss_xxx`) or a compound 'loss_neg/loss_pos' name. """ if '/' in loss: loss_neg, loss_pos = ( cls.get_loss(subloss) for subloss in loss.split('/') ) # Composite loss function return lambda a: N.where(a < 0, loss_neg(a), loss_pos(a)) else: try: return getattr(cls, 'loss_' + loss) except AttributeError: raise NotImplementedError( f"Loss function {loss!r} not implemented")
[docs] @classmethod def loss_squared(cls, a): return a ** 2
[docs] @classmethod def loss_squaredtop(cls, a, ascale=1.): return N.where(N.abs(a / ascale) < 1, a ** 2, ascale ** 2)
[docs] @classmethod def loss_biweight(cls, a, ascale=1.): ascale *= 2.23606797749979 # sqrt(5) so that psi peaks at ± ascale return N.where(N.abs(a / ascale) < 1, (1. - (1. - (a / ascale) ** 2) ** 3) / 3 * ascale ** 2, 1. / 3 * ascale ** 2)
[docs] @classmethod def loss_pseudo_huber(cls, a, ascale=1.): return 2. * (N.sqrt(1 + (a / ascale) ** 2) - 1.) * ascale ** 2
[docs] @classmethod def loss_cauchy(cls, a, ascale=1.): return N.log(1. + (a / ascale) ** 2) * ascale ** 2
[docs] @classmethod def derivative(cls, loss_fn, eps=1e-6): from scipy.misc import derivative return lambda x: derivative(loss_fn, x, dx=eps)
[docs] @staticmethod def residuals(measured_vals, updated_model, y_sigma, x): if y_sigma is None: return measured_vals - updated_model(x) else: return (measured_vals - updated_model(x)) / y_sigma
[docs] def statistic(self, measured_vals, updated_model, y_sigma, x): return self.loss( self.residuals(measured_vals, updated_model, y_sigma, x)).sum()
def __call__(self, model, x, y, y_sigma=None, **kwargs): model_copy = _validate_model(model, self._opt_method.supported_constraints) farg = (model_copy, y_sigma) + _convert_input(x, y) p0, _ = _model_to_fit_params(model_copy) fitparams, self.fit_info = self._opt_method( self.objective_function, p0, farg, **kwargs) _fitter_to_model_params(model_copy, fitparams) return model_copy
[docs] @classmethod def plot_losses(cls, amax=3, loss=None): """`loss` can be a list of valid loss function names.""" import mpl import matplotlib.pyplot as P import inspect from itertools import cycle linecycler = cycle(["-", "--", "-.", ":"]) # Loss functions if not loss: losses = [ (name, fn) for name, fn in inspect.getmembers(cls, inspect.ismethod) if name.startswith('loss_') ] else: losses = [ (name, cls.get_loss(name)) for name in loss ] fig = P.figure(figsize=(10, 4)) axrho = fig.add_subplot(1, 3, 1, xlabel='a', ylabel='ρ(a)') axpsi = fig.add_subplot(1, 3, 2, xlabel='a', ylabel='ψ(a)') axdis = fig.add_subplot(1, 3, 3, xlabel='a', ylabel='D(a)') x = N.linspace(-amax, amax, amax * 50) for name, rho in losses: name = name.replace('loss_', '') # Remove leading 'loss_' ls = next(linecycler) # Rho function: objfunc = sum_i rho(a_i) w/ a = (y_i - m_i)/dy_i axrho.plot(x, rho(x), c=mpl.red, ls=ls, marker='None', label=name) # Psi function: Psi(a) = d rho/d a psi = cls.derivative(rho) axpsi.plot(x, psi(x), c=mpl.red, ls=ls, marker='None') # PDF: D(x) = exp(-0.5 rho(x)) if name == 'pseudo_huber': xx = N.linspace(-amax, 2 * amax, amax * 75) dis = N.exp(-0.5 * rho(xx)) dis /= dis.sum() * (xx[1] - xx[0]) axdis.plot(xx, dis, c=mpl.red, ls=ls, marker='None') else: dis = N.exp(-0.5 * rho(x)) dis /= dis.sum() * (x[1] - x[0]) axdis.plot(x, dis, c=mpl.red, ls=ls, marker='None') axrho.legend(loc='best', fontsize='small') axrho.set_title("Loss function ρ") axpsi.set_title("ψ(a) = dρ/da") axdis.set_title("PDF") fig.tight_layout() return fig
[docs]def fit_adaptive_polynom(x, y, v=None, pmax=0.05, dmax=3, loss='pseudo_huber', verbose=0): """ Adaptive continuum fit. Fit increasingly higher-order polynoms until it does not significantly improve the objective function (according to the likelihood-ratio test). Use specific loss function, or standard (weighted) least-squares if null. """ assert len(y) == len(x), "Incompatible x and y input arrays" if v is not None: assert len(v) == len(x), "Incompatible variance array" v[~(v > 0)] = N.inf # Discard non-positive variances dy = N.sqrt(v) if v is not None else None w = 1 / v if v is not None else None if loss: # Robust fitter using loss function fitter = RobustFitter(loss=loss) else: fitter = AM.fitting.SimplexLSQFitter() # Std weighted least-square fitter # See https://github.com/astropy/astropy/issues/3574 degree = -1 chi2 = N.inf # Starting point: the worst fit ever ddof = 1 # We increase the DoF by 1 at each iteration fit = None while degree + ddof <= dmax: # Try a fit with an higher-order polynom degree += ddof model = AM.models.Polynomial1D(degree=degree) if degree == 0: # 1st iteration (fit is None) model.parameters = [N.average(y, weights=w)] # Initial guess else: # Initialize parameters from previous iteration model.parameters[:len(fit.parameters)] = fit.parameters if loss: # Robust fit newfit = fitter(model, x, y, dy, verblevel=verbose > 1) else: # LSQ fit newfit = fitter(model, x, y, weights=w, verblevel=verbose > 1) residuals = y - newfit(x) if dy is not None: residuals /= dy newchi2 = N.dot(residuals, residuals) # Test chi2 # Goodness of current fit (for information) dof = len(x) - degree - 1 pval = SS.distributions.chi2.sf(newchi2, dof) newmsg = f"Degree={degree}, Chi2={newchi2}, DoF={dof}, p-value={pval:g}" # Significance of fit improvement dchi2 = newchi2 - chi2 pval = p_better_fit(dchi2, ddof) if verbose == 2: print("Testing", newmsg) print(f"dChi2={dchi2:.2f}, dDoF={ddof}, p-value={pval:g}") if pval < pmax: # This is a significantly better fit, keep on going fit = newfit chi2 = newchi2 msg = newmsg ddof = 1 else: # This is not significantly better, try an higher order degree -= ddof # Restart from last improvement ddof += 1 if verbose: print("Converged to", msg) # Stick some metadata fit.pmax = pmax fit.dmax = dmax fit.loss = loss return fit
[docs]def find_peak(y, sgfilter=(11, 4), dy=None): """ Find significant emission lines. Locate maximum of *y*, potentially after Savitzky-Golay filtering. Return position (index), peak amplitude and estimate of stddev. """ if sgfilter: hsize, order = sgfilter z = savitzky_golay(y, hsize, order, derivative=0) else: z = y if dy is None: # imax = N.argmax(z) # Peak position # if imax in (0, len(y)): # raise ValueError("Cannot find definite maximum (imax={})".format(imax)) imax = N.argmax(z[1:-1]) + 1 # Flux peak position else: imax = N.argmax(z[1:-1] / dy[1:-1]) + 1 # Pull peak position if not (z[imax - 1:imax + 2] > 0).all(): raise ValueError(f"Cannot find positive maximum (imax={imax})") zmax = z[imax] # Peak amplitude sig2 = - N.diff(N.log(z[imax - 1:imax + 2]), n=2)[0] # Peak variance if sig2 <= 0: raise ValueError(f"Cannot find maximum (imax={imax})") return imax, zmax, N.sqrt(sig2)
[docs]def fit_adaptive_emilines(x, y, v=None, pmax=0.05, nmax=3, sgfilter=(11, 4), verbose=0): """ Adaptive emission line fit. Fit increasing number of emission lines until it does not significantly improve the chi2 (according to the likelihood-ratio test). """ from functools import reduce assert len(y) == len(x), "Incompatible x and y input arrays" if v is not None: assert len(v) == len(x), "Incompatible variance array" v[~(v > 0)] = N.inf # Discard non-positive variances dy = N.sqrt(v) if v is not None else None w = 1 / v if v is not None else None fitter = AM.fitting.SimplexLSQFitter() # Least-square fitter lines = [] # Adjusted lines z = N.copy(y) # Progressive left-over signal residuals = z / dy if dy is not None else z chi2 = N.dot(residuals, residuals) # Starting point: no line while len(lines) < nmax: try: # Initial guess imax, zmax, smax = find_peak(z, sgfilter=sgfilter, dy=dy) smax *= x[imax + 1] - x[imax - 1] except ValueError as err: warnings.warn(f"Line #{len(lines) + 1}: {err}") break line = AM.models.Gaussian1D(amplitude=zmax, mean=x[imax], stddev=smax, bounds={'amplitude': (0, None), 'stddev': (0, None)}) if verbose >= 2: print(f"Testing line a={zmax}, m={x[imax]}, s={smax}") newfit = fitter(line, x, z, weights=w) # Perform test fit residuals = z - newfit(x) # Test left-over if dy is not None: residuals /= dy newchi2 = N.dot(residuals, residuals) # Test chi2 # Goodness of current fit, including current line (for information) dof = len(x) - len(line.param_names) * (len(lines) + 1) pval = SS.distributions.chi2.sf(newchi2, dof) newmsg = f"#lines={len(lines) + 1}, Chi2={newchi2}, DoF={dof}, p-value={pval:g}" # Significance of fit improvement # Each line add new parameters ddof = len(line.param_names) dchi2 = newchi2 - chi2 pval = p_better_fit(dchi2, ddof) if verbose == 2: # print(newfit) print("Tested", newmsg) print(f"dChi2={dchi2:.2f}, dDoF={ddof}, p-value={pval:g}") if pval < pmax: # This is a significantly better fit, keep on going newfit.pval = pval lines.append(newfit) chi2 = newchi2 msg = newmsg z -= newfit(x) # Effectively remove adjusted line else: # This is not significantly better, stop now break if lines: if verbose: print("Converged to", msg) result = reduce(AM.Model.__add__, lines) result.lines = lines # Individual models # p-value of "best" next undetected line result.pnext = pval if pval >= pmax else None if verbose: print(result) else: if verbose: print(f"No significant line detected at the {pmax * 100.0}% level") result = lambda x: 0 * x result.lines = [] result.pnext = pval # p-value of "best" next undetected line # Stick some metadata # To be compared to [ line.pval for line in result.lines] result.pmax = pmax result.nmax = nmax return result
[docs]def fit_adaptive_spectrum(x, y, v=None, pmax=0.05, dmax=3, nmax=3, verbose=0): """ Adaptive continuum + emission line fit. Fit adaptive polynomial continuum and emission lines until it does not significantly improve the objective function (according to the likelihood-ratio test). Return `continuum, lines`. """ cont = fit_adaptive_polynom(x, y, v=v, pmax=pmax, dmax=dmax, loss='pseudo_huber', verbose=verbose) lines = fit_adaptive_emilines(x, y - cont(x), v=v, pmax=pmax, nmax=nmax, verbose=verbose) return cont, lines
if __name__ == '__main__': import mpl import matplotlib.pyplot as P try: import seaborn seaborn.set_style("whitegrid", {"font.family": ["DejaVu Sans"]}) except ImportError: pass N.random.seed(1234) x = N.linspace(1.21, 1.79, 600) y = 2 + N.sin(x * 4) # Continuum y += AM.models.Gaussian1D(amplitude=20, mean=+1.4, stddev=0.002)(x) y += AM.models.Gaussian1D(amplitude=10, mean=+1.6, stddev=0.002)(x) n = N.random.randn(len(x)) # Gaussian noise (mu=0, sig=1) pmax = 5e-2 # Minimal significance dmax = 3 # Continuum nmax = 5 # Nb of lines sgfilter = (11, 4) # Savitzky-Golay filter sgfilter = None fig, ax = P.subplots(1, 1, figsize=(10, 5)) ax.set_title( f"Adaptive spectrum fit: pmax={pmax:.2f}, dmax={dmax}, nmax={nmax}") ax.set_xlabel("x") ax.set_ylabel("y (dy=1)") ax.plot(x, y, c='k', label="Truth") ax.fill_between(x, y - 1, y + 1, color='0.5', alpha=0.5) print(f" dmax={dmax} + noise ".center(50, '-')) ax.errorbar(x, y + n, yerr=N.ones_like(y), fmt='ko', ms=3, ecolor='0.5', elinewidth=1, zorder=-2, label="Signal") if sgfilter is not None: hsize, order = sgfilter z = savitzky_golay(y + n, hsize, order, derivative=0) ax.plot(x, z, c='0.8', label="Filtered") # Adjust continua from itertools import cycle linecycler = cycle(["-", "--", "-.", ":"]) for loss, lbl in ((None, 'SQ'), ('pseudo_huber', 'PH')): cont = fit_adaptive_polynom(x, y + n, pmax=pmax, dmax=dmax, loss=loss, verbose=2) ax.plot(x, cont(x), zorder=3, lw=2, c=mpl.red, ls=next(linecycler), label=f"{lbl}: d°={cont.degree}") # Adjust emission lines on continuum-subtracted signal lines = fit_adaptive_emilines(x, y + n - cont(x), pmax=pmax, nmax=nmax, sgfilter=sgfilter, verbose=2) if lines is not None: ax.plot(x, cont(x) + lines(x), c=mpl.green, label=f"{len(lines.lines)} lines") ax.legend(loc='best', fontsize='small') if True: # Plot loss functions amax = 4 loss = None loss = ('squared', 'pseudo_huber') figL = RobustFitter.plot_losses(amax=amax, loss=loss) # Add histogram to distribution axes of fig1 nbins = hist_nbin(y + n) // 2 ax3 = figL.axes[2] ax3.hist(y + n - cont(x), bins=nbins, density=True, log=True, color='0.8', histtype='stepfilled') P.show()