Source code for inspec.combine

# -*- coding: utf-8 -*-

"""
.. _combine:

combine - Spectral combination utilities
========================================

Optimal combination in presence of outliers.

See also:

* `Dixon's Q test <http://sebastianraschka.com/Articles/2014_dixon_test.html>`_

Requirements:

* python >= 3.6 (f-string)
* numpy >= v1.12 (https://github.com/numpy/numpy/issues/7720)
* matplotllib >= 2.2 (hist density kwarg)

.. autosummary::

   outlier_clipping
"""

import warnings

import numpy as N
import scipy.stats as SS
import astropy.stats as AS

import matplotlib.pyplot as P

__author__ = "Yannick Copin <y.copin@ipnl.in2p3.fr>"


# Sample statistics ==============================


[docs]def sample_mean(x, dx): """ Standard (unweighted) mean and associated error along last axis. """ dx2 = N.broadcast_to(dx**2, x.shape) if N.ma.isMaskedArray(x): m, n = N.ma.average(x, axis=-1, returned=True) n = N.ma.masked_equal(n, 0) dx2 = N.ma.masked_array(dx2, mask=x.mask) else: m = x.mean(axis=-1) n = x.shape[-1] # Error on mean from individual variances dm = (dx2.sum(axis=-1))**0.5 / n return m, dm
[docs]def sample_weighted_mean(x, dx, intrinsic=0): """ Inverse-variance weighted mean (including intrinsic dispersion if any) and associated error along last axis. """ # Inverse variance weighting w = N.broadcast_to((dx**2 + intrinsic**2)**(-1), x.shape) if N.ma.isMaskedArray(x): wm, wsum = N.ma.average(x, weights=w, axis=-1, returned=True) wsum = N.ma.masked_equal(wsum, 0) else: wm, wsum = N.average(x, weights=w, axis=-1, returned=True) # Error on weighted mean dwm = wsum**(-0.5) return wm, dwm
[docs]def sample_median(x, dx): """ Median and associated error along last axis. Reference: Kendall & Stuart (1977), via `SWarp v2.21 User's Guide <https://www.astromatic.net/pubsvn/software/swarp/trunk/doc/swarp.pdf>`_ """ sqrtw = N.broadcast_to(dx**(-1), x.shape) if N.ma.isMaskedArray(x): m = N.ma.median(x, axis=-1) sqrtw = N.masked_array(sqrtw, mask=x.mask) n = N.ma.count(sqrtw, axis=-1) fac = 2 / N.pi * N.where(n % 2, (n + N.pi - 2) # Odd (n + N.pi / 2 - 1)) # Even else: m = N.median(x, axis=-1) n = x.shape[-1] fac = 2 / N.pi * ((n + N.pi - 2) if (n % 2) else (n + N.pi / 2 - 1)) # Error on median dm = (sqrtw.mean(axis=-1)**2 * fac)**(-0.5) return m, dm
# M-estimators ==============================
[docs]def weights_AS(x, dx, mu): return dx**(-2) / (1 + ((x - mu) / dx)**2)
[docs]def mean_AS(x, dx=None, mu=None, itermax=0, fracmax=0.1, verbose=False): """ Robust mean estimate using Artifical Skepticism (Stetson 1989). * Initial `mu` defaults to median estimate. * If `itermax > 0` or `fracmax > 0`, use iterative Expectation-Maximization algorithm up to convergence (step < fracmax × estimated error) or maximum iterations * Error on output `mu` neglect the covariance between weighted points (through previous estimate of mean). .. Warning:: native support for 1D-arrays only; multi-dimensional array support through sub-optimal index trickery. """ if (itermax <= 0) and (fracmax <= 0): raise ValueError("mean_AS requires a valid convergence criterion.") if dx is None: dx = 1. if N.ndim(x) > 1: if mu is None: # Initialize from median estimate mu = N.ma.median(x, axis=-1, keepdims=True) # Broadcast to same shape and ravel on all but last axes inshape = N.shape(x)[:-1] n = N.shape(x)[-1] _x, _dx = ( arr.reshape(-1, n) for arr in N.broadcast_arrays(x, dx) ) # Loop over last axis res = N.reshape([ mean_AS(subx, dx=subdx, mu=submu, itermax=itermax, fracmax=fracmax, verbose=False) for subx, subdx, submu in zip(_x, _dx, mu) ], inshape + (2,)) return res[..., 0], res[..., 1] if mu is None: # Initialize from median estimate mu = N.ma.median(x, axis=-1) # (n,) iter_ = 0 frac = N.inf while ((iter_ < itermax or itermax <= 0) and (frac > fracmax)): # Expectation-Maximization loop iter_ += 1 w = weights_AS(x, dx, mu) # Expectation (n,) pmu = mu # Previous iteration mu, sow = N.average(x, axis=-1, weights=w, # Maximization (1,) returned=True) # Estimated error on mu (1,) dmu = ((w / sow * dx)**2).sum(axis=-1) ** 0.5 frac = N.abs(mu - pmu) / dmu # Fractional evolution (1,) if verbose: print(f"Iter #{iter_}: mu={mu} ± {dmu}, frac={frac}") if (frac > fracmax > 0) and (iter_ == itermax): raise RuntimeError( "mean_AS did not converge after maximum number of iterations.") return mu, dmu
# z-score and pull ==============================
[docs]def zscore(x, dx=None, modified=False, intrinsic=0): """ Compute the z-score of observations *x* along last axis. """ if modified: # Robust location (median) and scale (nMAD) if dx is not None: warnings.warn("Modified z-score do not make use of quoted errors.") m = N.ma.median(x, axis=-1, keepdims=True) s = 1.4826022185056018 * N.ma.median(N.abs(x - m), axis=-1, keepdims=True) elif dx is None: # Standard (unweighted) mean and stddev m = N.ma.mean(x, axis=-1, keepdims=True) # Mean s = N.ma.std(x, axis=-1, keepdims=True, ddof=1) # StdDev else: # Weighted mean s = N.hypot(dx, intrinsic) # Quadratic sum of std deviations m = N.expand_dims( N.average(x, weights=N.broadcast_to(s**(-2), x.shape), axis=-1), axis=-1) return (x - m) / s # (modified) z-score
[docs]def pull(x, dx, fixed_mean=None, fixed_mean_error=0): r""" Pull distribution from *x*, *dx* along last axis. * Input data: *x* = :math:`x_i`, associated error *dx* = :math:`s_i` * Optimal (variance-weighted) mean: :math:`X = (\sum_i x_i/s_i^2) / (\sum_i 1/s_i^2)` * Variance on weighted mean: :math:`\sigma_X^2 = 1/(\sum_i 1/s_i^2)` * Pull: :math:`p_i = (x_i - X_i(x))/\sqrt{\sigma_{X_i}^2 + s_i^2}` where :math:`X_i` and :math:`\sigma_{X_i}^2` are computed *without* point i. If errors *dx* are correct, the pull distribution is centered on 0 with standard deviation of 1. Pull is related to z-score :math:`z_i = (x_i - \bar{X_i})/s_i` with :math:`\bar{X_i}` and :math:`s_i` being the sample mean and sample standard deviation, respectively. """ if fixed_mean is not None: return (x - fixed_mean) / N.hypot(dx, fixed_mean_error) # Build up all leave-one-out combinations, similar to # itertools.combination(range(n), n-1) n = x.shape[-1] # Working on last axis # 0, ..., n-1, 0, ..., n-1, ..., n times (n²,) i = N.resize(N.arange(n), n * n) i[::n + 1] = -1 # Mark successively 0, 1, 2, ..., n-1 # Remove marked indices & reshape (n, n-1): # [[1, 2, 3, ...], [0, 2, 3, ...], [0, 1, 3, ...]] j = i[i >= 0].reshape((n, n - 1)) # Handle masked arrays average = N.ma.average if N.ma.isMaskedArray(x) else N.average # Weighted mean, and error on weighted mean, on last axis w = N.broadcast_to(dx**(-2), x.shape) # Variance (optimal) weighting xi, wsum = average(x.take(j, axis=-1), # x.shape + (n-1,) weights=w.take(j, axis=-1), axis=-1, returned=True) # x.shape dxi = wsum**(-0.5) # x.shape return (x - xi) / N.hypot(dx, dxi) # Pull, Same shape as input x
# Outliers ==============================
[docs]def outlier_clipping(x, dx, method='pull', clip=5, side=0, maxiter=None, verbose=False, nmin=2): """ Iterative outlier-clipping of *x*, *dx* along last axis. Availaible methods: * sigma: unweighted sigma-clipping * weighted: weighted sigma-clipping * robust: nMAD-clipping * pull: pull-clipping Clipping is symmetric (two-tailed) if *side = 0*, or asymmetric (lower or upper one-tailed) if *side = ±1*. .. Warning:: `N.ma.amax(keepdims=True)` is buggy in numpy v1.11 (https://github.com/numpy/numpy/issues/7720). Use explicit `N.ma.expand_dims`. .. Warning:: if input `x` is a masked array, the mask will be updated with detected outliers. Provide a copy if you don't want it to be modified. """ if method == 'sigma': # Plain (unweighted) sigma-clipping _score = lambda x, dx: zscore(x, dx=None, modified=False) elif method == 'weighted': # Weighted sigma-clipping _score = zscore elif method == 'robust': # Robust clipping (no support for errors) _score = lambda x, dx: zscore(x, dx=None, modified=True) elif method == 'pull': # Pull-clipping _score = pull else: raise NotImplementedError(f"Unknown outlier clipping method {method}.") n = x.shape[-1] if maxiter is None: maxiter = n - 1 elif maxiter > n - 1: maxiter = n - 1 warnings.warn( f"Outlier clipping max. iteration set to max. value {maxiter}") if side == 0: # Two-sided test label = 'two-tailed' _detect_outliers = lambda p: ( (N.ma.abs(p) == N.ma.max(N.ma.abs(p), axis=-1, keepdims=True)) & (N.ma.abs(p) > clip)) elif side == -1: # One-sided test on minimal value label = 'lower one-tailed' _detect_outliers = lambda p: ( (p == N.ma.min(p, axis=-1, keepdims=True)) & (p < -clip)) elif side == +1: # One-sided test on maximal value label = 'upper one-tailed' _detect_outliers = lambda p: ( (p == N.ma.max(p, axis=-1, keepdims=True)) & (p > clip)) else: raise NotImplementedError( "Outlier clipping is implemented for side=0 and ±1.") if verbose: print(f"Outlier {method}-clipping: {label}, clip={clip}, nmin={nmin}") niter = 0 outliers = N.zeros_like(x, dtype=bool) # Outlier mask mx = N.ma.masked_array(x) while niter < maxiter: niter += 1 p = _score(mx, dx) # Score computed from masked elements only will be masked: it should # not, but set to 0. Restore original mask, it will only differ for # spuriously-masked single elements. p.mask = mx.mask # Cannot define outliers with less than nmin elements: set score to 0 p *= (mx.count(axis=-1, keepdims=True) >= nmin) new_outliers = _detect_outliers(p) # New outliers # WARNING: N.count_nonzero does not apply to masked array # https://github.com/numpy/numpy/issues/18573 # One could also simply use new_outliers.sum() nnew = N.count_nonzero(new_outliers.compressed()) if nnew: # Update outlier mask (incl. masked elements) outliers |= new_outliers.filled(True) mx.mask |= outliers # Mask outliers if verbose: print(f" Iter #{niter}/{maxiter}: " f"{nnew} outliers (total: {outliers.sum()})") else: break return outliers
[docs]def grubbs_gmax(n, alpha=0.05, one_sided=False): """ Critical value for Grubbs' test for outliers. """ sl = alpha / n # one-sided if not one_sided: # two-sided sl /= 2 # Critical value (squared) of the t distribution w/ n-2 DoF and # significance level sl cv2 = SS.t.ppf(1 - sl, n - 2) ** 2 gmax = (n - 1) * N.sqrt(cv2 / (n - 2 + cv2) / n) return gmax
[docs]def grubbs_test(x, dx=None, side=0, alpha=0.05, modified=False, verbose=False): """ Grubbs' test for outliers (Grubbs 1969 and Stefansky 1972, also known as *maximum normed residual test*) is used to detect a *single* outlier in a univariate data set that follows an approximately normal distribution. >>> t = N.array([199.31, 199.53, 200.19, 200.82, 201.92, 201.95, 202.18, 245.57]) >>> grubbs_test(t, side=+1, alpha=0.05, modified=False, verbose=True) H0: there are no outliers in the data Ha: the maximum value is an outlier <BLANKLINE> Test statistic: G = [ 2.46876461] Significance level: α = 0.05 Critical value for an upper one-tailed test: G_max = 2.03165200155 Critical region: Reject H0 if G > G_max array([False, False, False, False, False, False, False, True]) Source: `NIST/SEMATECH e-Handbook of Statistical Methods <http://www.itl.nist.gov/div898/handbook/eda/section3/eda35h.htm>`_ .. Warning:: Because of potential screening effects (multiple outliers weaken the single-outlier test performance), it is not appropriate to apply a single-outlier test sequentially in order to detect multiple outliers. """ if side not in (-1, 0, +1): raise NotImplementedError( "Grubbs' test for outliers is implemented for side=0 and ±1.") n = x.shape[-1] gmax = grubbs_gmax(n, alpha, one_sided=bool(side)) # (modified) z-score z = zscore(x, dx=dx, modified=modified) # Grubbs' test statistic if side == 0: # Two-sided test g = N.amax(N.abs(z), axis=-1, keepdims=True) adj = 'extremum' label = 'a two-tailed' is_outlier = (z == g) & (z > gmax) elif side == -1: # One-sided test on minimal value g = N.amin(z, axis=-1, keepdims=True) adj = 'minimum' label = 'a lower one-tailed' is_outlier = (z == g) & (z < -gmax) elif side == +1: # One-sided test on maximal value g = N.amax(z, axis=-1, keepdims=True) adj = 'maximum' label = 'an upper one-tailed' is_outlier = (z == g) & (z > gmax) if verbose: print(f"""\ H0: there are no outliers in the data Ha: the {adj} value is an outlier Test statistic: G = {g} Significance level: α = {alpha} Critical value for {label} test: G_max = {gmax} Critical region: Reject H0 if G > G_max""") return is_outlier # Outlier flags (w/ at most *one* identified outlier)
[docs]def chauvenet_dmax(n, p=0.5, one_sided=False): """ Critical value for Chauvenet criterion for outliers. """ sl = p / (2*n) # one-sided if not one_sided: # two-sided sl /= 2 dmax = -SS.norm.ppf(sl) return dmax
[docs]def chauvenet_criterion(x, dx=None, side=0, p=0.5, modified=False, verbose=False): """ Chauvenet criterion to identify outliers in a univariate data set that follows an approximately normal distribution. >>> t = N.array([199.31, 199.53, 200.19, 200.82, 201.92, 201.95, 202.18, 245.57]) >>> chauvenet_criterion(t, side=0, p=0.5, modified=False, verbose=True) H0: there are no outliers in the data Ha: the extremal value are outliers <BLANKLINE> Test statistic: D = [0.45 0.44 0.39 0.35 0.28 0.28 0.27 2.47] Rejection probability: P = 0.5 Critical value for a two-tailed test: D_max = 2.15387469406 Critical region: Reject H0 if D > D_max array([False, False, False, False, False, False, False, True]) """ if side not in (-1, 0, +1): raise NotImplementedError( "Chauvenet criterion is implemented for side=0 and ±1.") n = x.shape[-1] dmax = chauvenet_dmax(n, p, one_sided=bool(side)) # (modified) z-score z = zscore(x, dx=dx, modified=modified) # Chauvenet criterion if side == 0: # Two-sided test d = N.abs(z) adj = 'extremal' label = 'a two-tailed' is_outlier = (d > dmax) elif side == -1: # One-sided test on minimal value d = z adj = 'minimal' label = 'a lower one-tailed' is_outlier = (d < -dmax) elif side == +1: # One-sided test on maximal value d = z adj = 'maximal' label = 'an upper one-tailed' is_outlier = (d > dmax) if verbose: print(f"""\ H0: there are no outliers in the data Ha: the {adj} value are outliers Test statistic: D = {d} Rejection probability: P = {p} Critical value for {label} test: D_max = {dmax} Critical region: Reject H0 if D > D_max""") return is_outlier # Outlier flags (w/ at most *one* identified outlier)
[docs]def detection_rates(outliers, identified, rates=('TPR', 'TNR', 'MCC'), verbose=True): """ Compute various success and failure rates by comparing true outliers to identified ones. Source: https://en.wikipedia.org/wiki/Confusion_matrix """ output = {} p = output['P'] = outliers.sum() # Positive cases n = output['N'] = (~outliers).sum() # Negative cases # Useful quantities tp = output['TP'] = (identified & outliers).sum() # True positive ident. tn = output['TN'] = (~identified & ~outliers).sum() # True negative ident. fp = output['FP'] = (identified & ~outliers).sum() # False positive errors fn = output['FN'] = (~identified & outliers).sum() # False negative errors # Compute all meaningful rates output['TPR'] = tp / p # True Pos. Rate = Sensitivity = Completeness output['TNR'] = tn / n # True Neg. Rate = Specificity output['PPV'] = tp / (tp + fp) # Pos. Predictive Value = Precision = Purity output['NPV'] = tn / (tn + fn) # Neg. Predictive Value output['FPR'] = fp / n # False Pos. Rate = 1 - (TNR = specificity) output['FNR'] = fn / p # False Neg. Rate = 1 - (TPR = sensitivity) output['FDR'] = fp / (tp + fp) # False Discovery Rate = 1 - PPV output['ACC'] = (tp + tn) / (n + p) # Accuracy output['F1'] = 2 * tp / (2 * tp + fp + fn) # F1-score = 2/(1/tpr + 1/ppv) # Matthews correlation coefficient (beware overfloat!) # output['MCC'] = (tp * tn - fp * fn) / ((tp + fp) * p * n * (tn + fn))**0.5 output['MCC'] = ((output['TPR'] * output['TNR'] - output['FPR'] * output['FNR']) / ((output['TPR'] + fp / p) * (output['TNR'] + fn / n))**0.5) if verbose: print(f"Input outliers: {p}/{p+n} ({p/(p + n):.2%})") print(f"Identified outliers: {identified.sum()} (TP: {tp}, FP: {fp}, FN: {fn})") print(f"Sensitivity TPR=1-FNR: {output['TPR']:.2%}") print(f"Specificity TNR=1-FPR: {output['TNR']:.2%}") print(f"Precision PPV: {output['PPV']:.2%}") print(f"Accuracy: {output['ACC']:.2%}") print(f"Matthews Correlation Coefficient: {output['MCC']:.3f}") if rates is None: # Return all rates in dict return output else: # Return specific rates for rate in rates: if rate.upper() not in output: raise KeyError(f"Unknown rate '{rate}'.") return [ output[rate.upper()] for rate in rates ]
# Plotting funtions ==============================
[docs]def plot_pull(x, dx, fixed_mean=None, ax=None, **kwargs): """Pull histogram.""" ps = pull(x, dx, fixed_mean=fixed_mean) pmean = ps.mean() pstd = ps.std(ddof=1) _, pvalue = SS.normaltest(ps) print(f"Pull: mean={pmean:+.3f}, std={pstd:.3f}, normality p-value={pvalue:g}") if ax is None: fig = P.figure() ax = fig.add_subplot(1, 1, 1, xlabel='Pull' if fixed_mean is None else f'Pull wrt. {fixed_mean}') _, bins = AS.freedman_bin_width(ps, return_bins=True) label = kwargs.pop('label', f'{len(x)} entries') label += f': μ={pmean:+.3f}, σ={pstd:.3f}' ax.hist(ps, bins=bins, histtype='stepfilled', label=label, **kwargs) return ax
[docs]def probplot(x, dist='norm', sparams=(), fit=True, ax=None, **kwargs): """ Probability plot (observed vs. expected quantiles). .. Note:: Just a wrapper to `scipy.stats.probplot`. """ if ax is None: fig = P.figure() ax = fig.add_subplot(1, 1, 1, xlabel='Expected quantiles', ylabel='Observed quantiles', title=f'Probability plot ({dist})') kwargs.setdefault('marker', '.') kwargs.setdefault('linestyle', 'none') if fit: (osm, osr), (a, b, r) = SS.probplot( x, sparams=sparams, dist=dist, fit=True) print(f"Probplot best-fit ax+b: a={a:g}, b={b:g}, R²={r:g}") label = ' '.join((kwargs.pop('label', ''), f": R²={r:.3f}")) l, = ax.plot(osm, osr, **kwargs) ax.plot(osm, a * osm + b, color=l.get_color(), ls='--', lw=2, label=label) else: osm, osr = SS.probplot(x, sparams=sparams, dist=dist, fit=False) kwargs.setdefault('label', None) ax.plot(osm, osr, **kwargs) return ax
[docs]def plot_rv(rv, eps=1e-3, ax=None, **kwargs): """ Plot (frozen) random variable PDF. Plot (x, PDF(x)) for (frozen) random variable 'rv', in PPF range (eps, 1 - eps). """ if ax is None: fig = P.figure() ax = fig.add_subplot(1, 1, 1, xlabel="x", ylabel="PDF(x)") x = N.linspace(rv.ppf(eps), rv.ppf(1 - eps), 100) y = rv.pdf(x) label = kwargs.pop('label', None) ax.plot(x, y, label=label, **kwargs) return ax
[docs]def plot_samples(x, dx=None, outliers=None, identified=None): """ Plot sample points, along with marginalized histogram. Flag outliers in red if any. """ from mpl_toolkits.axes_grid1 import make_axes_locatable assert x.ndim == 2 # (nsamples, points per sample) samples = N.resize(N.arange(len(x)), x.T.shape).T # x.shape if outliers is None: outliers = N.zeros_like(x, dtype=bool) # Scatter plot fig = P.figure() ax = fig.add_subplot(1, 1, 1, xlabel='Samples', ylabel='Value') if dx is not None: # Error bars assert identified is None ax.errorbar(samples[~outliers], x[~outliers], yerr=dx[~outliers], color='C0', marker=',', ls='none', alpha=0.5, elinewidth=0.5) ax.errorbar(samples[outliers], x[outliers], yerr=dx[outliers], color='C1', marker='.', ls='none', alpha=0.5, elinewidth=0.5) elif identified is None: # Simple outlier flag ax.plot(samples[~outliers], x[~outliers], color='C0', marker=',', ls='none', label='Inliers') ax.plot(samples[outliers], x[outliers], color='C1', marker='.', ls='none', label=f'Outliers ({(outliers.sum() / x.size):.2%})') ax.legend(loc='best', fontsize='x-small', numpoints=1, framealpha=0.5) else: # Combined outlier vs. identified rates = detection_rates(outliers, identified, rates=None, verbose=False) ax.plot(samples[~outliers & ~identified], x[~outliers & ~identified], color='C0', marker=',', ls='none', label=f"True negative ({rates['TNR']:.2%})") ax.plot(samples[outliers & identified], x[outliers & identified], color='C1', marker='.', ls='none', label=f"True positive ({rates['TPR']:.2%})") ax.plot(samples[~outliers & identified], x[~outliers & identified], color='C2', marker='o', ms=5, ls='none', label=f"False positive ({rates['FPR']:.2%})") ax.plot(samples[outliers & ~identified], x[outliers & ~identified], color='C3', marker='s', ms=5, ls='none', label=f"False negative ({rates['FNR']:.2%})") ax.legend(loc='best', fontsize='x-small', numpoints=1, framealpha=0.5) # Histogram axis divider = make_axes_locatable(ax) axHist = divider.append_axes("right", 1.5, pad=0.1, sharey=ax) _, bins = AS.freedman_bin_width(x.ravel(), return_bins=True) if identified is None: axHist.hist([x[~outliers], x[outliers]], bins=bins, color=['C0', 'C1'], density=True, stacked=True, log=True, histtype='stepfilled', orientation='horizontal') else: axHist.hist([x[~outliers & ~identified], # TN x[outliers & identified], # TP x[~outliers & identified], # FP x[outliers & ~identified]], # FN bins=bins, color=['C0', 'C1', 'C2', 'C3'], density=True, stacked=True, log=True, histtype='stepfilled', orientation='horizontal') # axHist.yaxis.set_major_formatter(P.NullFormatter()) P.setp(axHist.yaxis.get_ticklabels(), visible=False) return fig
[docs]def plot_comparison(x, y, dx=None, dy=None, xlabel='x', ylabel='y'): from mpl_toolkits.axes_grid1 import make_axes_locatable # Scatter plot fig = P.figure() ax = fig.add_subplot(1, 1, 1, aspect='equal', xlabel=xlabel, ylabel=ylabel) ax.scatter(x, y, c='C0', marker='.') ax.errorbar(x, y, xerr=dx, yerr=dy, fmt='none', ecolor='k', alpha=0.3, elinewidth=0.5) # Histogram axes divider = make_axes_locatable(ax) axHx = divider.append_axes("top", 1.5, pad=0.1, sharex=ax) axHy = divider.append_axes("right", 1.5, pad=0.1, sharey=ax) _, binsx = AS.freedman_bin_width(x, return_bins=True) _, binsy = AS.freedman_bin_width(y, return_bins=True) axHx.hist(x, bins=binsx, density=True, log=True, histtype='stepfilled', orientation='vertical', label=f"µ={x.mean():.3f}, σ=:{x.std(ddof=1):.3f}") axHx.hist(y, bins=binsy, density=True, log=True, lw=2, histtype='step', orientation='vertical') axHy.hist(y, bins=binsy, density=True, log=True, histtype='stepfilled', orientation='horizontal', label=f"µ={y.mean():.3f}, σ={y.std(ddof=1):.3f}") axHy.hist(x, bins=binsx, density=True, log=True, lw=2, histtype='step', orientation='horizontal') P.setp(axHx.xaxis.get_ticklabels(), visible=False) P.setp(axHy.yaxis.get_ticklabels(), visible=False) axHx.set_title(f"{xlabel} vs. {ylabel}") axHx.legend(loc='upper right', fontsize='x-small', numpoints=1, framealpha=0.5) axHy.legend(loc='upper right', fontsize='x-small', numpoints=1, framealpha=0.5) return fig
if __name__ == '__main__': from mpl import save_figures P.matplotlib.style.use("seaborn") N.random.seed(42) n = 4 # Nb of points per sample m = 10000 # Nb of samples size = (m, n) print(f"{m} realizations of {n}-samples") # Error distribution dx_dist = SS.lognorm(0.25) # Log-normal distribution # Outlier distribution eps = 5e-2 nsig = 10 print(f"Probability of {nsig}-sigma log-normal outliers: {eps}") outliers = N.random.rand(m, n) <= eps # Outlier mask outliers_dist = SS.lognorm(0.25, scale=nsig) # Outlier distribution outliers_val = outliers_dist.rvs(size=size) # Outlier values ax = plot_rv(dx_dist, label='StdDev') ax = plot_rv(outliers_dist, label='Outliers', ax=ax) ax.legend(fontsize='small') ax.set_title("Input distributions") # We assume the outlying process does not affect the measurement variance dx = dx_dist.rvs(size=size) # Generate a random sample of *m* n-tuple, centered on 0 with a # standard error of dx. x = N.random.normal(loc=0, scale=dx, size=size) # Pure gaussian errors # Add a fraction `epsilon` of `nsig` sigma positive outliers x += N.where(outliers, outliers_val, 0) fig = plot_samples(x, dx, outliers=outliers) fig.suptitle(f"Input: {eps:.1%} of {nsig}-sigma log-normal outliers", fontsize='large') noutliers = outliers.sum(axis=-1) # Nb of outliers in each sample for i in range(n + 1): print(f" {i} outlier(s): " f"p={SS.binom.pmf(i, n, eps)}, n={len(noutliers[noutliers == i])}") p = pull(x, dx) fig = plot_samples(p, outliers=outliers) fig.suptitle("Pull distribution on original samples", fontsize='large') ax = None ax_pb = None # Standard mean print(" Standard mean ".center(70, '-')) sm, dsm = sample_mean(x, dx) ax = plot_pull(sm, dsm, fixed_mean=0, ax=ax, alpha=0.5, label='Standard mean', log=True) ax_pb = probplot(sm, ax=ax_pb, label='Standard mean') # Weighted mean print(" Weighted mean ".center(70, '-')) wm, dwm = sample_weighted_mean(x, dx) ax = plot_pull(wm, dwm, fixed_mean=0, ax=ax, alpha=0.5, label='Weighted mean', log=True) ax_pb = probplot(wm, ax=ax_pb, label='Weighted mean') # Median print(" Median ".center(70, '-')) med, dmed = sample_median(x, dx) ax = plot_pull(med, dmed, fixed_mean=0, ax=ax, alpha=0.5, label='Median', log=True) ax_pb = probplot(med, ax=ax_pb, label='Median') # Artificial skepticism print(" Artificial Skepticism ".center(70, '-')) tm, dtm = mean_AS(x, dx) ax = plot_pull(tm, dtm, fixed_mean=0, ax=ax, alpha=0.5, label='AS', log=True) ax_pb = probplot(tm, ax=ax_pb, label='AS') # Grubbs' test for outliers print(" Upper Grubbs' test for outliers (standard) ".center(70, '-')) alpha = 0.01 identified = grubbs_test(x, dx, side=+1, alpha=alpha, modified=False) print("Grubbs' critical value for an upper one-tailed test for outliers: " f"gmax={grubbs_gmax(n, alpha, one_sided=True)}") detection_rates(outliers, identified, verbose=True) print(" Upper Grubbs' test for outliers (robust) ".center(70, '-')) alpha = 0.01 identified = grubbs_test(x, dx, side=+1, alpha=alpha, modified=True) detection_rates(outliers, identified, verbose=True) # Weighted sigma-clipping: equivalent to an upper one-tailed Grubbs' test print(" Upper wsigma-clipping ".center(70, '-')) clip = 3. side = +1 identified = outlier_clipping(x, dx, method='weighted', clip=clip, side=side, verbose=True) detection_rates(outliers, identified, verbose=True) fig = plot_samples(x, outliers=outliers, identified=identified) fig.suptitle("Identified outliers (wsigma-clipping)", fontsize='large') mx = N.ma.masked_array(x, mask=identified) pm, dpm = sample_weighted_mean(mx, dx) # Weighted-mean on non-outliers fig = plot_comparison( # x=med, dx=dmed, xlabel='Median', x=tm, dx=dtm, xlabel='Artificial Skepticism', y=pm, dy=dpm, ylabel='Pull-clipped weighted mean') mp = pull(x, dx, fixed_mean=pm.reshape(-1, 1), fixed_mean_error=dpm.reshape(-1, 1)) fig = plot_samples(mp, outliers=outliers, identified=identified) fig.suptitle("Pull distribution on wsigma-clipped samples", fontsize='large') ax = plot_pull(pm, dpm, fixed_mean=0, ax=ax, alpha=0.5, label='WSigma-clipping', log=True) ax_pb = probplot(pm, ax=ax_pb, label='WSigma-clipping') # Pull-clipping: equivalent to an upper one-tailed Grubbs' test print(" Upper pull-clipping ".center(70, '-')) clip = 3.5 side = +1 identified = outlier_clipping(x, dx, method='pull', clip=clip, side=side, verbose=True) detection_rates(outliers, identified, verbose=True) fig = plot_samples(x, outliers=outliers, identified=identified) fig.suptitle("Identified outliers (pull-clipping)", fontsize='large') mx = N.ma.masked_array(x, mask=identified) pm, dpm = sample_weighted_mean(mx, dx) # Weighted-mean on non-outliers mp = pull(x, dx, fixed_mean=pm.reshape(-1, 1), fixed_mean_error=dpm.reshape(-1, 1)) fig = plot_samples(mp, outliers=outliers, identified=identified) fig.suptitle("Pull distribution on pull-clipped samples", fontsize='large') ax = plot_pull(pm, dpm, fixed_mean=0, ax=ax, alpha=0.5, label='Pull-clipping', log=True) ax_pb = probplot(pm, ax=ax_pb, label='Pull-clipping') ax.set_title(f"{eps:.1%} of {nsig}-sigma log-normal outliers") ax.legend(fontsize='small') ax_pb.set_title(f"{eps:.1%} of {nsig}-sigma log-normal outliers") ax_pb.legend(loc='upper left', fontsize='small') print(" Clip study ".center(70, '-')) def study_clip(x, dx, clips, method='pull', side=+1): """Detection rates as function of clip.""" rates = N.array([ detection_rates(outliers, outlier_clipping(x, dx, method=method, clip=clip, side=side), rates=('TPR', 'TNR', 'MCC'), verbose=False) for clip in clips ]) for clip, (tpr, tnr, mcc) in zip(clips, rates): print(f"{method}, clip={clip}: " f"TPR={tpr:5.1%}, TNR={tnr:5.1%}, MCC={mcc:.3f}") fig = P.figure() ax = fig.add_subplot(1, 1, 1, xlabel='Clip', ylabel='Fraction or Correlation coefficient', title=f'{method}-clipping') ax.plot(clips, rates[:, 0], label='Sensitivity (TPR)') ax.plot(clips, rates[:, 1], label='Specificity (TNR)') ax.plot(clips, rates[:, 2], label='Matthews CC') ax.legend(loc='best', fontsize='small') ax.set_ylim(0.8, 1.01) return ax # study_clip(x, dx, N.linspace(2, 7, 11), method='sigma') study_clip(x, dx, N.linspace(2, 7, 11), method='weighted') # study_clip(x, dx, N.linspace(2, 7, 11), method='robust') study_clip(x, dx, N.linspace(2, 7, 11), method='pull') # Application to spectral combination def plot_spectra(x, dx, estimates=[]): n = x.shape[-1] index = N.arange(x.shape[0]) # Wavelength fig = P.figure() ax = fig.add_subplot(1, 1, 1, xlabel="Index", ylabel="Flux") # Individual spectra dx = N.broadcast_to(dx, x.shape) for i in range(n): ax.fill_between(index, x[:, i] - dx[:, i], x[:, i] + dx[:, i], facecolor='0.5', edgecolor='none', alpha=0.5) for name, (m, dm) in estimates: rms = float(N.mean(m**2) ** 0.5) # Handle MaskedArrays l, = ax.plot(index, m, lw=2, label=name + f' (RMS={rms:.3f})') ax.fill_between(index, m - dm, m + dm, edgecolor='none', color=l.get_color(), alpha=0.5) ax.legend(fontsize='small') return fig if False: plot_spectra(x, dx, estimates=[('Weighted mean', (wm, dwm)), ('Median', (med, dmed)), ('Pull-clipping', (pm, dpm))]) save_figures(prefix='combine_figs', ext='pdf', joint=True) P.show()