# -*- coding: utf-8 -*-
"""
.. _combine:
combine - Spectral combination utilities
========================================
Optimal combination in presence of outliers.
See also:
* `Dixon's Q test <http://sebastianraschka.com/Articles/2014_dixon_test.html>`_
Requirements:
* python >= 3.6 (f-string)
* numpy >= v1.12 (https://github.com/numpy/numpy/issues/7720)
* matplotllib >= 2.2 (hist density kwarg)
.. autosummary::
outlier_clipping
"""
import warnings
import numpy as N
import scipy.stats as SS
import astropy.stats as AS
import matplotlib.pyplot as P
__author__ = "Yannick Copin <y.copin@ipnl.in2p3.fr>"
# Sample statistics ==============================
[docs]def sample_mean(x, dx):
"""
Standard (unweighted) mean and associated error along last axis.
"""
dx2 = N.broadcast_to(dx**2, x.shape)
if N.ma.isMaskedArray(x):
m, n = N.ma.average(x, axis=-1, returned=True)
n = N.ma.masked_equal(n, 0)
dx2 = N.ma.masked_array(dx2, mask=x.mask)
else:
m = x.mean(axis=-1)
n = x.shape[-1]
# Error on mean from individual variances
dm = (dx2.sum(axis=-1))**0.5 / n
return m, dm
[docs]def sample_weighted_mean(x, dx, intrinsic=0):
"""
Inverse-variance weighted mean (including intrinsic dispersion if any) and
associated error along last axis.
"""
# Inverse variance weighting
w = N.broadcast_to((dx**2 + intrinsic**2)**(-1), x.shape)
if N.ma.isMaskedArray(x):
wm, wsum = N.ma.average(x, weights=w,
axis=-1, returned=True)
wsum = N.ma.masked_equal(wsum, 0)
else:
wm, wsum = N.average(x, weights=w,
axis=-1, returned=True)
# Error on weighted mean
dwm = wsum**(-0.5)
return wm, dwm
# M-estimators ==============================
[docs]def weights_AS(x, dx, mu):
return dx**(-2) / (1 + ((x - mu) / dx)**2)
[docs]def mean_AS(x, dx=None, mu=None, itermax=0, fracmax=0.1, verbose=False):
"""
Robust mean estimate using Artifical Skepticism (Stetson 1989).
* Initial `mu` defaults to median estimate.
* If `itermax > 0` or `fracmax > 0`, use iterative Expectation-Maximization
algorithm up to convergence (step < fracmax × estimated error) or
maximum iterations
* Error on output `mu` neglect the covariance between weighted points
(through previous estimate of mean).
.. Warning:: native support for 1D-arrays only; multi-dimensional array
support through sub-optimal index trickery.
"""
if (itermax <= 0) and (fracmax <= 0):
raise ValueError("mean_AS requires a valid convergence criterion.")
if dx is None:
dx = 1.
if N.ndim(x) > 1:
if mu is None: # Initialize from median estimate
mu = N.ma.median(x, axis=-1, keepdims=True)
# Broadcast to same shape and ravel on all but last axes
inshape = N.shape(x)[:-1]
n = N.shape(x)[-1]
_x, _dx = ( arr.reshape(-1, n)
for arr in N.broadcast_arrays(x, dx) )
# Loop over last axis
res = N.reshape([
mean_AS(subx, dx=subdx, mu=submu,
itermax=itermax, fracmax=fracmax, verbose=False)
for subx, subdx, submu in zip(_x, _dx, mu) ],
inshape + (2,))
return res[..., 0], res[..., 1]
if mu is None: # Initialize from median estimate
mu = N.ma.median(x, axis=-1) # (n,)
iter_ = 0
frac = N.inf
while ((iter_ < itermax or itermax <= 0) and
(frac > fracmax)): # Expectation-Maximization loop
iter_ += 1
w = weights_AS(x, dx, mu) # Expectation (n,)
pmu = mu # Previous iteration
mu, sow = N.average(x, axis=-1, weights=w, # Maximization (1,)
returned=True)
# Estimated error on mu (1,)
dmu = ((w / sow * dx)**2).sum(axis=-1) ** 0.5
frac = N.abs(mu - pmu) / dmu # Fractional evolution (1,)
if verbose:
print(f"Iter #{iter_}: mu={mu} ± {dmu}, frac={frac}")
if (frac > fracmax > 0) and (iter_ == itermax):
raise RuntimeError(
"mean_AS did not converge after maximum number of iterations.")
return mu, dmu
# z-score and pull ==============================
[docs]def zscore(x, dx=None, modified=False, intrinsic=0):
"""
Compute the z-score of observations *x* along last axis.
"""
if modified: # Robust location (median) and scale (nMAD)
if dx is not None:
warnings.warn("Modified z-score do not make use of quoted errors.")
m = N.ma.median(x, axis=-1, keepdims=True)
s = 1.4826022185056018 * N.ma.median(N.abs(x - m),
axis=-1, keepdims=True)
elif dx is None: # Standard (unweighted) mean and stddev
m = N.ma.mean(x, axis=-1, keepdims=True) # Mean
s = N.ma.std(x, axis=-1, keepdims=True, ddof=1) # StdDev
else: # Weighted mean
s = N.hypot(dx, intrinsic) # Quadratic sum of std deviations
m = N.expand_dims(
N.average(x, weights=N.broadcast_to(s**(-2), x.shape), axis=-1),
axis=-1)
return (x - m) / s # (modified) z-score
[docs]def pull(x, dx, fixed_mean=None, fixed_mean_error=0):
r"""
Pull distribution from *x*, *dx* along last axis.
* Input data: *x* = :math:`x_i`, associated error *dx* =
:math:`s_i`
* Optimal (variance-weighted) mean:
:math:`X = (\sum_i x_i/s_i^2) / (\sum_i 1/s_i^2)`
* Variance on weighted mean: :math:`\sigma_X^2 = 1/(\sum_i 1/s_i^2)`
* Pull: :math:`p_i = (x_i - X_i(x))/\sqrt{\sigma_{X_i}^2 + s_i^2}`
where :math:`X_i` and :math:`\sigma_{X_i}^2` are computed
*without* point i.
If errors *dx* are correct, the pull distribution is centered on 0
with standard deviation of 1.
Pull is related to z-score :math:`z_i = (x_i - \bar{X_i})/s_i` with
:math:`\bar{X_i}` and :math:`s_i` being the sample mean and sample standard
deviation, respectively.
"""
if fixed_mean is not None:
return (x - fixed_mean) / N.hypot(dx, fixed_mean_error)
# Build up all leave-one-out combinations, similar to
# itertools.combination(range(n), n-1)
n = x.shape[-1] # Working on last axis
# 0, ..., n-1, 0, ..., n-1, ..., n times (n²,)
i = N.resize(N.arange(n), n * n)
i[::n + 1] = -1 # Mark successively 0, 1, 2, ..., n-1
# Remove marked indices & reshape (n, n-1):
# [[1, 2, 3, ...], [0, 2, 3, ...], [0, 1, 3, ...]]
j = i[i >= 0].reshape((n, n - 1))
# Handle masked arrays
average = N.ma.average if N.ma.isMaskedArray(x) else N.average
# Weighted mean, and error on weighted mean, on last axis
w = N.broadcast_to(dx**(-2), x.shape) # Variance (optimal) weighting
xi, wsum = average(x.take(j, axis=-1), # x.shape + (n-1,)
weights=w.take(j, axis=-1),
axis=-1, returned=True) # x.shape
dxi = wsum**(-0.5) # x.shape
return (x - xi) / N.hypot(dx, dxi) # Pull, Same shape as input x
# Outliers ==============================
[docs]def outlier_clipping(x, dx, method='pull', clip=5, side=0,
maxiter=None, verbose=False, nmin=2):
"""
Iterative outlier-clipping of *x*, *dx* along last axis.
Availaible methods:
* sigma: unweighted sigma-clipping
* weighted: weighted sigma-clipping
* robust: nMAD-clipping
* pull: pull-clipping
Clipping is symmetric (two-tailed) if *side = 0*, or asymmetric (lower or
upper one-tailed) if *side = ±1*.
.. Warning:: `N.ma.amax(keepdims=True)` is buggy in numpy v1.11
(https://github.com/numpy/numpy/issues/7720). Use explicit
`N.ma.expand_dims`.
.. Warning:: if input `x` is a masked array, the mask will be updated with
detected outliers. Provide a copy if you don't want it to be modified.
"""
if method == 'sigma': # Plain (unweighted) sigma-clipping
_score = lambda x, dx: zscore(x, dx=None, modified=False)
elif method == 'weighted': # Weighted sigma-clipping
_score = zscore
elif method == 'robust': # Robust clipping (no support for errors)
_score = lambda x, dx: zscore(x, dx=None, modified=True)
elif method == 'pull': # Pull-clipping
_score = pull
else:
raise NotImplementedError(f"Unknown outlier clipping method {method}.")
n = x.shape[-1]
if maxiter is None:
maxiter = n - 1
elif maxiter > n - 1:
maxiter = n - 1
warnings.warn(
f"Outlier clipping max. iteration set to max. value {maxiter}")
if side == 0: # Two-sided test
label = 'two-tailed'
_detect_outliers = lambda p: (
(N.ma.abs(p) == N.ma.max(N.ma.abs(p), axis=-1, keepdims=True)) &
(N.ma.abs(p) > clip))
elif side == -1: # One-sided test on minimal value
label = 'lower one-tailed'
_detect_outliers = lambda p: (
(p == N.ma.min(p, axis=-1, keepdims=True)) & (p < -clip))
elif side == +1: # One-sided test on maximal value
label = 'upper one-tailed'
_detect_outliers = lambda p: (
(p == N.ma.max(p, axis=-1, keepdims=True)) & (p > clip))
else:
raise NotImplementedError(
"Outlier clipping is implemented for side=0 and ±1.")
if verbose:
print(f"Outlier {method}-clipping: {label}, clip={clip}, nmin={nmin}")
niter = 0
outliers = N.zeros_like(x, dtype=bool) # Outlier mask
mx = N.ma.masked_array(x)
while niter < maxiter:
niter += 1
p = _score(mx, dx)
# Score computed from masked elements only will be masked: it should
# not, but set to 0. Restore original mask, it will only differ for
# spuriously-masked single elements.
p.mask = mx.mask
# Cannot define outliers with less than nmin elements: set score to 0
p *= (mx.count(axis=-1, keepdims=True) >= nmin)
new_outliers = _detect_outliers(p) # New outliers
# WARNING: N.count_nonzero does not apply to masked array
# https://github.com/numpy/numpy/issues/18573
# One could also simply use new_outliers.sum()
nnew = N.count_nonzero(new_outliers.compressed())
if nnew:
# Update outlier mask (incl. masked elements)
outliers |= new_outliers.filled(True)
mx.mask |= outliers # Mask outliers
if verbose:
print(f" Iter #{niter}/{maxiter}: "
f"{nnew} outliers (total: {outliers.sum()})")
else:
break
return outliers
[docs]def grubbs_gmax(n, alpha=0.05, one_sided=False):
"""
Critical value for Grubbs' test for outliers.
"""
sl = alpha / n # one-sided
if not one_sided: # two-sided
sl /= 2
# Critical value (squared) of the t distribution w/ n-2 DoF and
# significance level sl
cv2 = SS.t.ppf(1 - sl, n - 2) ** 2
gmax = (n - 1) * N.sqrt(cv2 / (n - 2 + cv2) / n)
return gmax
[docs]def grubbs_test(x, dx=None, side=0, alpha=0.05, modified=False, verbose=False):
"""
Grubbs' test for outliers (Grubbs 1969 and Stefansky 1972, also known as
*maximum normed residual test*) is used to detect a *single* outlier in a
univariate data set that follows an approximately normal distribution.
>>> t = N.array([199.31, 199.53, 200.19, 200.82,
201.92, 201.95, 202.18, 245.57])
>>> grubbs_test(t, side=+1, alpha=0.05, modified=False, verbose=True)
H0: there are no outliers in the data
Ha: the maximum value is an outlier
<BLANKLINE>
Test statistic: G = [ 2.46876461]
Significance level: α = 0.05
Critical value for an upper one-tailed test: G_max = 2.03165200155
Critical region: Reject H0 if G > G_max
array([False, False, False, False, False, False, False, True])
Source: `NIST/SEMATECH e-Handbook of Statistical Methods
<http://www.itl.nist.gov/div898/handbook/eda/section3/eda35h.htm>`_
.. Warning:: Because of potential screening effects (multiple outliers
weaken the single-outlier test performance), it is not appropriate to
apply a single-outlier test sequentially in order to detect multiple
outliers.
"""
if side not in (-1, 0, +1):
raise NotImplementedError(
"Grubbs' test for outliers is implemented for side=0 and ±1.")
n = x.shape[-1]
gmax = grubbs_gmax(n, alpha, one_sided=bool(side))
# (modified) z-score
z = zscore(x, dx=dx, modified=modified)
# Grubbs' test statistic
if side == 0: # Two-sided test
g = N.amax(N.abs(z), axis=-1, keepdims=True)
adj = 'extremum'
label = 'a two-tailed'
is_outlier = (z == g) & (z > gmax)
elif side == -1: # One-sided test on minimal value
g = N.amin(z, axis=-1, keepdims=True)
adj = 'minimum'
label = 'a lower one-tailed'
is_outlier = (z == g) & (z < -gmax)
elif side == +1: # One-sided test on maximal value
g = N.amax(z, axis=-1, keepdims=True)
adj = 'maximum'
label = 'an upper one-tailed'
is_outlier = (z == g) & (z > gmax)
if verbose:
print(f"""\
H0: there are no outliers in the data
Ha: the {adj} value is an outlier
Test statistic: G = {g}
Significance level: α = {alpha}
Critical value for {label} test: G_max = {gmax}
Critical region: Reject H0 if G > G_max""")
return is_outlier # Outlier flags (w/ at most *one* identified outlier)
[docs]def chauvenet_dmax(n, p=0.5, one_sided=False):
"""
Critical value for Chauvenet criterion for outliers.
"""
sl = p / (2*n) # one-sided
if not one_sided: # two-sided
sl /= 2
dmax = -SS.norm.ppf(sl)
return dmax
[docs]def chauvenet_criterion(x, dx=None,
side=0, p=0.5, modified=False, verbose=False):
"""
Chauvenet criterion to identify outliers in a univariate data set that
follows an approximately normal distribution.
>>> t = N.array([199.31, 199.53, 200.19, 200.82,
201.92, 201.95, 202.18, 245.57])
>>> chauvenet_criterion(t, side=0, p=0.5, modified=False, verbose=True)
H0: there are no outliers in the data
Ha: the extremal value are outliers
<BLANKLINE>
Test statistic: D = [0.45 0.44 0.39 0.35 0.28 0.28 0.27 2.47]
Rejection probability: P = 0.5
Critical value for a two-tailed test: D_max = 2.15387469406
Critical region: Reject H0 if D > D_max
array([False, False, False, False, False, False, False, True])
"""
if side not in (-1, 0, +1):
raise NotImplementedError(
"Chauvenet criterion is implemented for side=0 and ±1.")
n = x.shape[-1]
dmax = chauvenet_dmax(n, p, one_sided=bool(side))
# (modified) z-score
z = zscore(x, dx=dx, modified=modified)
# Chauvenet criterion
if side == 0: # Two-sided test
d = N.abs(z)
adj = 'extremal'
label = 'a two-tailed'
is_outlier = (d > dmax)
elif side == -1: # One-sided test on minimal value
d = z
adj = 'minimal'
label = 'a lower one-tailed'
is_outlier = (d < -dmax)
elif side == +1: # One-sided test on maximal value
d = z
adj = 'maximal'
label = 'an upper one-tailed'
is_outlier = (d > dmax)
if verbose:
print(f"""\
H0: there are no outliers in the data
Ha: the {adj} value are outliers
Test statistic: D = {d}
Rejection probability: P = {p}
Critical value for {label} test: D_max = {dmax}
Critical region: Reject H0 if D > D_max""")
return is_outlier # Outlier flags (w/ at most *one* identified outlier)
[docs]def detection_rates(outliers, identified, rates=('TPR', 'TNR', 'MCC'),
verbose=True):
"""
Compute various success and failure rates by comparing true outliers to
identified ones.
Source: https://en.wikipedia.org/wiki/Confusion_matrix
"""
output = {}
p = output['P'] = outliers.sum() # Positive cases
n = output['N'] = (~outliers).sum() # Negative cases
# Useful quantities
tp = output['TP'] = (identified & outliers).sum() # True positive ident.
tn = output['TN'] = (~identified & ~outliers).sum() # True negative ident.
fp = output['FP'] = (identified & ~outliers).sum() # False positive errors
fn = output['FN'] = (~identified & outliers).sum() # False negative errors
# Compute all meaningful rates
output['TPR'] = tp / p # True Pos. Rate = Sensitivity = Completeness
output['TNR'] = tn / n # True Neg. Rate = Specificity
output['PPV'] = tp / (tp + fp) # Pos. Predictive Value = Precision = Purity
output['NPV'] = tn / (tn + fn) # Neg. Predictive Value
output['FPR'] = fp / n # False Pos. Rate = 1 - (TNR = specificity)
output['FNR'] = fn / p # False Neg. Rate = 1 - (TPR = sensitivity)
output['FDR'] = fp / (tp + fp) # False Discovery Rate = 1 - PPV
output['ACC'] = (tp + tn) / (n + p) # Accuracy
output['F1'] = 2 * tp / (2 * tp + fp + fn) # F1-score = 2/(1/tpr + 1/ppv)
# Matthews correlation coefficient (beware overfloat!)
# output['MCC'] = (tp * tn - fp * fn) / ((tp + fp) * p * n * (tn + fn))**0.5
output['MCC'] = ((output['TPR'] * output['TNR'] -
output['FPR'] * output['FNR']) /
((output['TPR'] + fp / p) * (output['TNR'] + fn / n))**0.5)
if verbose:
print(f"Input outliers: {p}/{p+n} ({p/(p + n):.2%})")
print(f"Identified outliers: {identified.sum()} (TP: {tp}, FP: {fp}, FN: {fn})")
print(f"Sensitivity TPR=1-FNR: {output['TPR']:.2%}")
print(f"Specificity TNR=1-FPR: {output['TNR']:.2%}")
print(f"Precision PPV: {output['PPV']:.2%}")
print(f"Accuracy: {output['ACC']:.2%}")
print(f"Matthews Correlation Coefficient: {output['MCC']:.3f}")
if rates is None: # Return all rates in dict
return output
else: # Return specific rates
for rate in rates:
if rate.upper() not in output:
raise KeyError(f"Unknown rate '{rate}'.")
return [ output[rate.upper()] for rate in rates ]
# Plotting funtions ==============================
[docs]def plot_pull(x, dx, fixed_mean=None, ax=None, **kwargs):
"""Pull histogram."""
ps = pull(x, dx, fixed_mean=fixed_mean)
pmean = ps.mean()
pstd = ps.std(ddof=1)
_, pvalue = SS.normaltest(ps)
print(f"Pull: mean={pmean:+.3f}, std={pstd:.3f}, normality p-value={pvalue:g}")
if ax is None:
fig = P.figure()
ax = fig.add_subplot(1, 1, 1,
xlabel='Pull' if fixed_mean is None
else f'Pull wrt. {fixed_mean}')
_, bins = AS.freedman_bin_width(ps, return_bins=True)
label = kwargs.pop('label', f'{len(x)} entries')
label += f': μ={pmean:+.3f}, σ={pstd:.3f}'
ax.hist(ps, bins=bins, histtype='stepfilled', label=label, **kwargs)
return ax
[docs]def probplot(x, dist='norm', sparams=(), fit=True, ax=None, **kwargs):
"""
Probability plot (observed vs. expected quantiles).
.. Note:: Just a wrapper to `scipy.stats.probplot`.
"""
if ax is None:
fig = P.figure()
ax = fig.add_subplot(1, 1, 1,
xlabel='Expected quantiles',
ylabel='Observed quantiles',
title=f'Probability plot ({dist})')
kwargs.setdefault('marker', '.')
kwargs.setdefault('linestyle', 'none')
if fit:
(osm, osr), (a, b, r) = SS.probplot(
x, sparams=sparams, dist=dist, fit=True)
print(f"Probplot best-fit ax+b: a={a:g}, b={b:g}, R²={r:g}")
label = ' '.join((kwargs.pop('label', ''), f": R²={r:.3f}"))
l, = ax.plot(osm, osr, **kwargs)
ax.plot(osm, a * osm + b,
color=l.get_color(), ls='--', lw=2, label=label)
else:
osm, osr = SS.probplot(x, sparams=sparams, dist=dist, fit=False)
kwargs.setdefault('label', None)
ax.plot(osm, osr, **kwargs)
return ax
[docs]def plot_rv(rv, eps=1e-3, ax=None, **kwargs):
"""
Plot (frozen) random variable PDF.
Plot (x, PDF(x)) for (frozen) random variable 'rv', in PPF range (eps, 1 -
eps).
"""
if ax is None:
fig = P.figure()
ax = fig.add_subplot(1, 1, 1,
xlabel="x", ylabel="PDF(x)")
x = N.linspace(rv.ppf(eps), rv.ppf(1 - eps), 100)
y = rv.pdf(x)
label = kwargs.pop('label', None)
ax.plot(x, y, label=label, **kwargs)
return ax
[docs]def plot_samples(x, dx=None, outliers=None, identified=None):
"""
Plot sample points, along with marginalized histogram.
Flag outliers in red if any.
"""
from mpl_toolkits.axes_grid1 import make_axes_locatable
assert x.ndim == 2 # (nsamples, points per sample)
samples = N.resize(N.arange(len(x)), x.T.shape).T # x.shape
if outliers is None:
outliers = N.zeros_like(x, dtype=bool)
# Scatter plot
fig = P.figure()
ax = fig.add_subplot(1, 1, 1, xlabel='Samples', ylabel='Value')
if dx is not None: # Error bars
assert identified is None
ax.errorbar(samples[~outliers], x[~outliers], yerr=dx[~outliers],
color='C0', marker=',', ls='none', alpha=0.5, elinewidth=0.5)
ax.errorbar(samples[outliers], x[outliers], yerr=dx[outliers],
color='C1', marker='.', ls='none', alpha=0.5, elinewidth=0.5)
elif identified is None: # Simple outlier flag
ax.plot(samples[~outliers], x[~outliers],
color='C0', marker=',', ls='none', label='Inliers')
ax.plot(samples[outliers], x[outliers],
color='C1', marker='.', ls='none',
label=f'Outliers ({(outliers.sum() / x.size):.2%})')
ax.legend(loc='best', fontsize='x-small', numpoints=1, framealpha=0.5)
else: # Combined outlier vs. identified
rates = detection_rates(outliers, identified, rates=None, verbose=False)
ax.plot(samples[~outliers & ~identified], x[~outliers & ~identified],
color='C0', marker=',', ls='none',
label=f"True negative ({rates['TNR']:.2%})")
ax.plot(samples[outliers & identified], x[outliers & identified],
color='C1', marker='.', ls='none',
label=f"True positive ({rates['TPR']:.2%})")
ax.plot(samples[~outliers & identified], x[~outliers & identified],
color='C2', marker='o', ms=5, ls='none',
label=f"False positive ({rates['FPR']:.2%})")
ax.plot(samples[outliers & ~identified], x[outliers & ~identified],
color='C3', marker='s', ms=5, ls='none',
label=f"False negative ({rates['FNR']:.2%})")
ax.legend(loc='best', fontsize='x-small',
numpoints=1, framealpha=0.5)
# Histogram axis
divider = make_axes_locatable(ax)
axHist = divider.append_axes("right", 1.5, pad=0.1, sharey=ax)
_, bins = AS.freedman_bin_width(x.ravel(), return_bins=True)
if identified is None:
axHist.hist([x[~outliers], x[outliers]],
bins=bins, color=['C0', 'C1'],
density=True, stacked=True, log=True,
histtype='stepfilled', orientation='horizontal')
else:
axHist.hist([x[~outliers & ~identified], # TN
x[outliers & identified], # TP
x[~outliers & identified], # FP
x[outliers & ~identified]], # FN
bins=bins, color=['C0', 'C1', 'C2', 'C3'],
density=True, stacked=True, log=True,
histtype='stepfilled', orientation='horizontal')
# axHist.yaxis.set_major_formatter(P.NullFormatter())
P.setp(axHist.yaxis.get_ticklabels(), visible=False)
return fig
[docs]def plot_comparison(x, y, dx=None, dy=None, xlabel='x', ylabel='y'):
from mpl_toolkits.axes_grid1 import make_axes_locatable
# Scatter plot
fig = P.figure()
ax = fig.add_subplot(1, 1, 1,
aspect='equal',
xlabel=xlabel, ylabel=ylabel)
ax.scatter(x, y, c='C0', marker='.')
ax.errorbar(x, y, xerr=dx, yerr=dy, fmt='none',
ecolor='k', alpha=0.3, elinewidth=0.5)
# Histogram axes
divider = make_axes_locatable(ax)
axHx = divider.append_axes("top", 1.5, pad=0.1, sharex=ax)
axHy = divider.append_axes("right", 1.5, pad=0.1, sharey=ax)
_, binsx = AS.freedman_bin_width(x, return_bins=True)
_, binsy = AS.freedman_bin_width(y, return_bins=True)
axHx.hist(x, bins=binsx,
density=True, log=True,
histtype='stepfilled', orientation='vertical',
label=f"µ={x.mean():.3f}, σ=:{x.std(ddof=1):.3f}")
axHx.hist(y, bins=binsy,
density=True, log=True, lw=2,
histtype='step', orientation='vertical')
axHy.hist(y, bins=binsy,
density=True, log=True,
histtype='stepfilled', orientation='horizontal',
label=f"µ={y.mean():.3f}, σ={y.std(ddof=1):.3f}")
axHy.hist(x, bins=binsx,
density=True, log=True, lw=2,
histtype='step', orientation='horizontal')
P.setp(axHx.xaxis.get_ticklabels(), visible=False)
P.setp(axHy.yaxis.get_ticklabels(), visible=False)
axHx.set_title(f"{xlabel} vs. {ylabel}")
axHx.legend(loc='upper right', fontsize='x-small',
numpoints=1, framealpha=0.5)
axHy.legend(loc='upper right', fontsize='x-small',
numpoints=1, framealpha=0.5)
return fig
if __name__ == '__main__':
from mpl import save_figures
P.matplotlib.style.use("seaborn")
N.random.seed(42)
n = 4 # Nb of points per sample
m = 10000 # Nb of samples
size = (m, n)
print(f"{m} realizations of {n}-samples")
# Error distribution
dx_dist = SS.lognorm(0.25) # Log-normal distribution
# Outlier distribution
eps = 5e-2
nsig = 10
print(f"Probability of {nsig}-sigma log-normal outliers: {eps}")
outliers = N.random.rand(m, n) <= eps # Outlier mask
outliers_dist = SS.lognorm(0.25, scale=nsig) # Outlier distribution
outliers_val = outliers_dist.rvs(size=size) # Outlier values
ax = plot_rv(dx_dist, label='StdDev')
ax = plot_rv(outliers_dist, label='Outliers', ax=ax)
ax.legend(fontsize='small')
ax.set_title("Input distributions")
# We assume the outlying process does not affect the measurement variance
dx = dx_dist.rvs(size=size)
# Generate a random sample of *m* n-tuple, centered on 0 with a
# standard error of dx.
x = N.random.normal(loc=0, scale=dx, size=size) # Pure gaussian errors
# Add a fraction `epsilon` of `nsig` sigma positive outliers
x += N.where(outliers, outliers_val, 0)
fig = plot_samples(x, dx, outliers=outliers)
fig.suptitle(f"Input: {eps:.1%} of {nsig}-sigma log-normal outliers",
fontsize='large')
noutliers = outliers.sum(axis=-1) # Nb of outliers in each sample
for i in range(n + 1):
print(f" {i} outlier(s): "
f"p={SS.binom.pmf(i, n, eps)}, n={len(noutliers[noutliers == i])}")
p = pull(x, dx)
fig = plot_samples(p, outliers=outliers)
fig.suptitle("Pull distribution on original samples",
fontsize='large')
ax = None
ax_pb = None
# Standard mean
print(" Standard mean ".center(70, '-'))
sm, dsm = sample_mean(x, dx)
ax = plot_pull(sm, dsm, fixed_mean=0,
ax=ax, alpha=0.5, label='Standard mean', log=True)
ax_pb = probplot(sm, ax=ax_pb, label='Standard mean')
# Weighted mean
print(" Weighted mean ".center(70, '-'))
wm, dwm = sample_weighted_mean(x, dx)
ax = plot_pull(wm, dwm, fixed_mean=0,
ax=ax, alpha=0.5, label='Weighted mean', log=True)
ax_pb = probplot(wm, ax=ax_pb, label='Weighted mean')
# Median
print(" Median ".center(70, '-'))
med, dmed = sample_median(x, dx)
ax = plot_pull(med, dmed, fixed_mean=0,
ax=ax, alpha=0.5, label='Median', log=True)
ax_pb = probplot(med, ax=ax_pb, label='Median')
# Artificial skepticism
print(" Artificial Skepticism ".center(70, '-'))
tm, dtm = mean_AS(x, dx)
ax = plot_pull(tm, dtm, fixed_mean=0,
ax=ax, alpha=0.5, label='AS', log=True)
ax_pb = probplot(tm, ax=ax_pb, label='AS')
# Grubbs' test for outliers
print(" Upper Grubbs' test for outliers (standard) ".center(70, '-'))
alpha = 0.01
identified = grubbs_test(x, dx, side=+1, alpha=alpha, modified=False)
print("Grubbs' critical value for an upper one-tailed test for outliers: "
f"gmax={grubbs_gmax(n, alpha, one_sided=True)}")
detection_rates(outliers, identified, verbose=True)
print(" Upper Grubbs' test for outliers (robust) ".center(70, '-'))
alpha = 0.01
identified = grubbs_test(x, dx, side=+1, alpha=alpha, modified=True)
detection_rates(outliers, identified, verbose=True)
# Weighted sigma-clipping: equivalent to an upper one-tailed Grubbs' test
print(" Upper wsigma-clipping ".center(70, '-'))
clip = 3.
side = +1
identified = outlier_clipping(x, dx, method='weighted',
clip=clip, side=side, verbose=True)
detection_rates(outliers, identified, verbose=True)
fig = plot_samples(x, outliers=outliers, identified=identified)
fig.suptitle("Identified outliers (wsigma-clipping)", fontsize='large')
mx = N.ma.masked_array(x, mask=identified)
pm, dpm = sample_weighted_mean(mx, dx) # Weighted-mean on non-outliers
fig = plot_comparison( # x=med, dx=dmed, xlabel='Median',
x=tm, dx=dtm, xlabel='Artificial Skepticism',
y=pm, dy=dpm, ylabel='Pull-clipped weighted mean')
mp = pull(x, dx,
fixed_mean=pm.reshape(-1, 1),
fixed_mean_error=dpm.reshape(-1, 1))
fig = plot_samples(mp, outliers=outliers, identified=identified)
fig.suptitle("Pull distribution on wsigma-clipped samples",
fontsize='large')
ax = plot_pull(pm, dpm, fixed_mean=0,
ax=ax, alpha=0.5, label='WSigma-clipping', log=True)
ax_pb = probplot(pm, ax=ax_pb, label='WSigma-clipping')
# Pull-clipping: equivalent to an upper one-tailed Grubbs' test
print(" Upper pull-clipping ".center(70, '-'))
clip = 3.5
side = +1
identified = outlier_clipping(x, dx, method='pull',
clip=clip, side=side, verbose=True)
detection_rates(outliers, identified, verbose=True)
fig = plot_samples(x, outliers=outliers, identified=identified)
fig.suptitle("Identified outliers (pull-clipping)", fontsize='large')
mx = N.ma.masked_array(x, mask=identified)
pm, dpm = sample_weighted_mean(mx, dx) # Weighted-mean on non-outliers
mp = pull(x, dx,
fixed_mean=pm.reshape(-1, 1),
fixed_mean_error=dpm.reshape(-1, 1))
fig = plot_samples(mp, outliers=outliers, identified=identified)
fig.suptitle("Pull distribution on pull-clipped samples",
fontsize='large')
ax = plot_pull(pm, dpm, fixed_mean=0,
ax=ax, alpha=0.5, label='Pull-clipping', log=True)
ax_pb = probplot(pm, ax=ax_pb, label='Pull-clipping')
ax.set_title(f"{eps:.1%} of {nsig}-sigma log-normal outliers")
ax.legend(fontsize='small')
ax_pb.set_title(f"{eps:.1%} of {nsig}-sigma log-normal outliers")
ax_pb.legend(loc='upper left', fontsize='small')
print(" Clip study ".center(70, '-'))
def study_clip(x, dx, clips, method='pull', side=+1):
"""Detection rates as function of clip."""
rates = N.array([
detection_rates(outliers,
outlier_clipping(x, dx, method=method,
clip=clip, side=side),
rates=('TPR', 'TNR', 'MCC'),
verbose=False)
for clip in clips ])
for clip, (tpr, tnr, mcc) in zip(clips, rates):
print(f"{method}, clip={clip}: "
f"TPR={tpr:5.1%}, TNR={tnr:5.1%}, MCC={mcc:.3f}")
fig = P.figure()
ax = fig.add_subplot(1, 1, 1,
xlabel='Clip',
ylabel='Fraction or Correlation coefficient',
title=f'{method}-clipping')
ax.plot(clips, rates[:, 0], label='Sensitivity (TPR)')
ax.plot(clips, rates[:, 1], label='Specificity (TNR)')
ax.plot(clips, rates[:, 2], label='Matthews CC')
ax.legend(loc='best', fontsize='small')
ax.set_ylim(0.8, 1.01)
return ax
# study_clip(x, dx, N.linspace(2, 7, 11), method='sigma')
study_clip(x, dx, N.linspace(2, 7, 11), method='weighted')
# study_clip(x, dx, N.linspace(2, 7, 11), method='robust')
study_clip(x, dx, N.linspace(2, 7, 11), method='pull')
# Application to spectral combination
def plot_spectra(x, dx, estimates=[]):
n = x.shape[-1]
index = N.arange(x.shape[0]) # Wavelength
fig = P.figure()
ax = fig.add_subplot(1, 1, 1,
xlabel="Index",
ylabel="Flux")
# Individual spectra
dx = N.broadcast_to(dx, x.shape)
for i in range(n):
ax.fill_between(index, x[:, i] - dx[:, i], x[:, i] + dx[:, i],
facecolor='0.5', edgecolor='none', alpha=0.5)
for name, (m, dm) in estimates:
rms = float(N.mean(m**2) ** 0.5) # Handle MaskedArrays
l, = ax.plot(index, m, lw=2,
label=name + f' (RMS={rms:.3f})')
ax.fill_between(index, m - dm, m + dm,
edgecolor='none', color=l.get_color(), alpha=0.5)
ax.legend(fontsize='small')
return fig
if False:
plot_spectra(x, dx,
estimates=[('Weighted mean', (wm, dwm)),
('Median', (med, dmed)),
('Pull-clipping', (pm, dpm))])
save_figures(prefix='combine_figs', ext='pdf', joint=True)
P.show()