# -*- coding: utf-8 -*-
# Time-stamp: <2021-04-13 02:28 ycopin@lyonovae03>
"""
.. _sir:
sir - SIR spectrum manipulations
================================
Single-dither and combined spectra from `EUC_SIR_W-COMBSPEC*.fits`.
.. rubric:: Data repository:
* repository: `ousir:S1RD@ta@metis.lambrate.inaf.it`
* path: `home/ousir/ftp/data/euclid/[prodname]/data/`
* FTP command: `sftp://[repository]/[path]/EUC_SIR_W-COMBSPEC*.fits`
.. rubric:: Internal structure:
* `ext[0].header`: `TILE_ID` and `N_OBJ`
* for each object `NNN` from 0 to `N_OBJ-1`,
* `ext['N_META'].header`: `N_DITH`, `OBJECT_ID`, `RA_OBJ`, `DEC_OBJ`
* `ext['N_COMBINED1D_SIGNAL'].data`: columns `WAVELENGTH`, `SIGNAL`, `QUALITY`
and `VAR`. `QUALITY=0` for good data.
* for each dither `X` from 1 to `N_DITH`:
* `ext['N_DITH1D_X_SIGNAL']`, with `X`=`DITH_ID` (1-4)
* data: same columns as combined extension
* header: `GWA_POS` (str) and `GWA_TILT` (int)
.. autosummary::
Spectrum
Dithers
"""
import os
import warnings
from collections import OrderedDict
import numpy as N
import matplotlib.pyplot as P
import astropy.io.fits as F
import inspec.combine as C
from inspec.mpl import save_figures
from inspec.statistics import mad
__author__ = "Yannick Copin <y.copin@ipnl.in2p3.fr>"
[docs]def warning_simple(message, category, filename, lineno, file=None, line=None):
return f'\033[93mWARNING\033[0m {message}\n'
warning_default = warnings.formatwarning
warnings.formatwarning = warning_simple
[docs]class Spectrum:
"""
A spectrum, made of (w, f ± df) masked-array.
"""
binwidth = 13.4 #: Wavelength step [Å]
def __init__(self, w, f, df, label='anonymous'):
self.label = label
self.w = N.ma.masked_array(w)
self.f = N.ma.masked_array(f)
self.df = N.ma.masked_array(df)
# Check mask consistency
assert N.array_equal(f.mask, w.mask)
assert N.array_equal(f.mask, df.mask)
def __str__(self):
return f"{self.label}: {N.ma.count(self.f)}/{len(self.f)} px"
[docs]class Dithers:
"""
A dict of single-dither and combined spectra.
"""
def __init__(self):
self.spectra = OrderedDict()
[docs] def get_dithers(self, fdf=False):
"""
Return list of single-dither spectra, or stacked (f, df) arrays.
"""
specs = [ self.spectra[key]
for key in self.spectra if key.startswith('D#') ]
if fdf:
# (2, ndith, nw)
return N.ma.swapaxes([ N.ma.stack([s.f, s.df])
for s in specs ], 0, 1)
else:
return specs
[docs] @classmethod
def init_from_combspec(cls, hdl, obj_idx):
"""
Initialize Dithers from object in HDU list.
"""
self = cls()
self.filename = hdl._file.name # Filename
assert 0 <= obj_idx < hdl[0].header['N_OBJ'], \
f"Invalid objidx {obj_idx} (max={hdl[0].header['N_OBJ'] - 1})."
self.obj_idx = obj_idx # Object index (0-N_OBJ)
meta_name = f'{obj_idx:d}_META' # Extension name
meta_idx = hdl.index_of(meta_name) # Extension idx (from name)
self.ndith = hdl[meta_name].header['N_DITH'] # Nb of dithers
self.objID = hdl[meta_name].header['HIERARCH OBJECT_ID']
for ext_idx in range(meta_idx + 1, meta_idx + self.ndith + 2):
ext_name = hdl[ext_idx].name # Extension name (from idx)
q = hdl[ext_idx].data["QUALITY"]
mask = q > 0
w = N.ma.masked_array(hdl[ext_idx].data['WAVELENGTH'], mask=mask)
f = N.ma.masked_array(hdl[ext_idx].data['SIGNAL'], mask=mask)
df = N.ma.masked_array(hdl[ext_idx].data['VAR']**0.5, mask=mask)
# Unflagged NaNs: https://euclid.roe.ac.uk/issues/14802
if N.any(N.isnan(f)):
warnings.warn(
f"#{obj_idx:03d} [{self.objID}]: "
f"{N.count_nonzero(N.isnan(f))} unflagged NaNs in signal.")
f = N.ma.fix_invalid(f)
w.mask |= f.mask
df.mask |= f.mask
label = 'combined' if 'COMBINED' in ext_name \
else f"D#{hdl[ext_idx].header['DITH_ID']}"
self.spectra[label] = Spectrum(w, f, df, label=label)
# # Inconsistent flags: https://euclid.roe.ac.uk/issues/14804
# cmask = self.spectra['combined'].f.mask
# dmask = self.combined_dither_masks()
# if not N.all(cmask == dmask):
# warnings.warn(
# "#{:03d}: inconsistent masks (combined: {}, dithers: {})."
# .format(obj_idx,
# N.count_nonzero(cmask), N.count_nonzero(dmask)))
if not self.spectra['combined'].f.count(): # No valid combined px
dmask = self.combined_dither_masks()
if N.count_nonzero(~dmask): # Valid dither px
warnings.warn(
f"#{obj_idx:03d} [{self.objID}]: "
"empty combined while non-empty dithers.")
return self
[docs] def combined_dither_masks(self):
"""
Combined mask from dithers.
"""
return ~N.logical_or.reduce([ ~s.f.mask
for s in self.get_dithers() ], axis=0)
def __str__(self):
basename = os.path.basename(self.filename)
s = f"{basename}#{self.obj_idx}: {self.ndith} dithers"
# for label, spec in self.spectra.items():
# s += f"\n {label}: {spec.f.count()}/{len(spec.f)} px"
return s
[docs] def combine(self, stat='wmean', clipping=None, **kwargs):
"""
Combine using stat and clipping, and store under key='[clipping-]stat'.
stat: mean|wmean|median
clipping: None|sigma|wsigma|robust|pull
"""
w = self.spectra['combined'].w.copy()
f, df = self.get_dithers(fdf=True)
if clipping in ['pull', 'sigma', 'weighted', 'robust']:
method = clipping
elif clipping == 'wsigma':
method = 'weighted'
elif clipping is not None:
raise NotImplementedError(f"Unknown clipping method {clipping!r}")
if stat == 'wmean':
fn = C.sample_weighted_mean
elif stat == 'mean':
fn = C.sample_mean
elif stat == 'median':
fn = C.sample_median
else:
raise NotImplementedError(f"Unknown combine statistic {stat!r}")
if clipping:
# Clipping only works for at least 2 dithers.
if self.ndith > 1:
identified = C.outlier_clipping(
f.T, df.T, method=method, **kwargs).T
f.mask |= identified
df.mask |= identified
key = f'{clipping}-{stat}' # See decipher_strategy
else:
key = stat
c, dc = fn(f.T, df.T)
w.mask = c.mask
dc.mask = c.mask
label = f"{os.path.basename(self.filename)}#{self.obj_idx}: {key}"
self.spectra[key] = Spectrum(w, c, dc, label=label)
return c, dc
[docs] def plot(self, ax=None, discard=None):
"""
Plot combined and single-dither spectra.
"""
if ax is None:
fig, ax = P.subplots(1, 1)
ax.set(xlabel="Wavelength [Å]", ylabel="Flux")
basename = os.path.basename(self.filename)
ax.set_title(f"{basename}#{self.obj_idx} [{self.objID}]",
fontsize='x-small')
ax.grid()
# Secondary axis in px
w = self.spectra['combined'].w.data
w2i = lambda x: (x - w[0]) / Spectrum.binwidth
i2w = lambda i: w[0] + i * Spectrum.binwidth
ax2 = ax.secondary_xaxis('top', functions=(w2i, i2w))
ax2.set_xlabel("Pixel", fontsize='small')
for name, spec in self.spectra.items():
if discard and name in discard:
continue
isDither = name.startswith('D#')
kwargs = dict(lw=2 if not isDither else 1,
zorder=1 if not isDither else 0)
l, = ax.plot(spec.w, spec.f,
label=f"{name} ({spec.w.count()} px)",
**kwargs)
ax.fill_between(spec.w, spec.f - spec.df, spec.f + spec.df,
fc=l.get_color(),
alpha=0.2, label='_')
ax.legend(fontsize='small')
return ax
[docs] def get_pull(self, clip=5, side=0, maxiter=None, return_final=False):
"""
Get pull of individual dithers.
It should not be straight single-iteration pull values because of
screening effects. Rather, it should iteratively correspond to pull
values just before clipping.
"""
f, df = self.get_dithers(fdf=True)
if maxiter is None:
maxiter = f.shape[0] - 1
pull = C.pull(f.T, df.T).T # 1st iteration pull (ndither, nlbda)
p = None
niter = 0
while niter < maxiter:
niter += 1
# Outlier clipping
identified = C.outlier_clipping(f.T, df.T, method='pull',
clip=clip, side=side, maxiter=1).T
# WARNING: N.count_nonzero does not apply to masked array
# https://github.com/numpy/numpy/issues/18573
if N.count_nonzero(identified.compressed()) == 0:
break
f.mask |= identified # mask identified outliers
df.mask |= identified
p = C.pull(f.T, df.T).T # new pull (ndither, nlbda)
# Update pull array with new pull values
pull[~p.mask] = p[~p.mask]
return pull if not return_final else (pull, p)
[docs]def decipher_strategy(key):
"""
Decipher '[clipping-]stat' strategy.
"""
tokens = key.split('-')
if len(tokens) == 1:
clipping, stat = None, tokens[0]
elif len(tokens) == 2:
clipping, stat = tokens
else:
raise NotImplementedError(f"Unknown combine strategy {key!r}")
return clipping, stat
[docs]def plot_spectra(hdl, obj_ids, keys=['combined'], nperfig=50, **kwargs):
sub_ids = [ obj_ids[i:i+nperfig]
for i in range(0, len(obj_ids), nperfig) ]
figs = []
for ids in sub_ids:
fig, ax = P.subplots(1, 1, figsize=(8, 4 + 4*N.log10(len(ids))))
figs.append(fig)
ax.set(xlabel="Wavelength [Å]",
ylabel="Flux + offset [a.u.]", yticklabels=[])
ax.set_title(os.path.basename(hdl._file.name), fontsize='small')
fig.tight_layout()
for i, idx in enumerate(ids):
d = Dithers.init_from_combspec(hdl, idx)
print(d)
norm = 0
for j, key in enumerate(keys):
if key not in d.spectra: # Perform on-the-fly combination
clipping, stat = decipher_strategy(key)
d.combine(stat=stat, clipping=clipping, **kwargs)
s = d.spectra[key]
if j == 0: # Plot baseline and annotation
ax.plot([s.w.data[0], s.w.data[-1]], [-i, -i],
lw=0.5, c='0.5', zorder=-1)
c = 'k' if s.w.count() else 'r' # Annotation color
ax.annotate(f"#{idx:03d}[{d.ndith}]", (s.w.data[-1], -i),
ha='right', c=c)
if not s.w.count(): # Nothing to plot
continue
if norm == 0: # Set norm and offset from 1st valid spectrum
#norm = 0.2 / s.f.std()
#offset = norm * s.f.mean()
norm = 0.1 / mad(s.f.compressed()) # Robust estimate of std
offset = norm * N.median(s.f.compressed())
y = norm * s.f - offset - i
dy = norm * s.df
c = f'C{j:02d}'
ax.plot(s.w, y, lw=0.5, c=c)
ax.fill_between(s.w, y - dy, y + dy, fc=c, alpha=0.2)
if d.ndith <= 2:
break
for j, key in enumerate(keys):
ax.plot([], [], color=f'C{j:02d}', label=key)
#ax.legend()
fig.legend()
ax.set_ylim(-i - 2, +2)
return figs
[docs]def plot_pulls(hdl, obj_ids, clip=5, ax=None):
import scipy.stats as SS
pulls = []
finalPulls = []
for idx in obj_ids:
d = Dithers.init_from_combspec(hdl, idx)
print(d)
p, fp = d.get_pull(clip=clip, return_final=True)
pulls.append(p)
if fp is not None:
finalPulls.append(fp)
pulls = N.ma.concatenate(pulls).compressed()
finalPulls = N.ma.concatenate(finalPulls).compressed()
pmean = finalPulls.mean()
pstd = finalPulls.std(ddof=1)
if ax is None:
fig, ax = P.subplots(1, 1)
ax.set(xlabel="Pull (unscreened)")
ax.set_title(os.path.basename(hdl._file.name), fontsize='small')
fig.tight_layout()
# pmin, pmax = N.percentile(pulls, (1, 100 - 1))
pmin, pmax = -10, +20
bins = N.histogram_bin_edges(pulls, bins='auto', range=(pmin, pmax))
ax.hist(pulls, histtype='step',
bins=bins, density=True, log=True,
label=f"{len(pulls)} entries [{pulls.min():+.2f},{pulls.max():+.2f}]")
ax.hist(finalPulls, histtype='stepfilled', alpha=0.2,
bins=bins, density=True, log=True,
label=f"{len(finalPulls)} entries: μ={pmean:+.3f}, σ={pstd:.3f}")
x = N.linspace(-4, 4, 100)
ax.plot(x, SS.norm(0, 1).pdf(x), c='k', lw=2,
label="N(μ=0,σ=1)")
ax.plot(x, SS.norm(pmean, pstd).pdf(x), c='C03', lw=2, ls='--',
label=f"N(μ={pmean:+.2f},σ={pstd:+.2f})")
ax.legend(loc='upper right')
# C.probplot(pulls, ax=ax2)
# C.probplot(finalPulls, ax=ax2)
return ax
[docs]def plot_stats(hdl, ax=None):
"""
Compute and plot stats about COMBSPEC HDU-list.
"""
if ax is None:
fig, ax = P.subplots(1, 1)
ax.set_title(f"{hdl._file.name} [{hdl[0].header['N_OBJ']} objects]",
fontsize='small')
fig.tight_layout()
# N_DITH from each XXX_META extension header
ndithers = N.array([ hdu.header['N_DITH']
for hdu in hdl if hdu.name.endswith('_META') ])
# Count occurence of each nb of dithers (except 0)
counts = N.bincount(ndithers)[1:]
labels = '1 2 3 4'.split()
w, _, _ = ax.pie(counts, autopct='%.1f%%') # labels=labels
ax.legend(w, labels, title="N_DITH")
return fig
if __name__ == '__main__':
import statistics as S
from glob import glob
clip = 4 # pull-clipping
pmax = 1e-3 # autofit maximal p-value
dmax = 3 # autofit maximal continuum degree
nmax = 3 # autofit maximal number of emission lines
path = "~/Science/Euclid/SGS/SIR/" \
"EUC_SIR_SCIENCE_31279_SC8_GSIR_SWF1_R1_H_V1.3_C6_P2.1_001"
fglob = "EUC_SIR_W-COMBSPEC_45033_*.fits"
fnames = glob(os.path.join(os.path.expanduser(path), fglob))
fname = fnames[0]
hdl = F.open(fname)
obj_ids = (781, 788, 810, 819, 821, 822)
obj_ids = (925, 926, 930, 932, 939, 990, 994)
obj_ids = (337, 361, 449, 498) # 149
obj_ids = (181, 245, 332, 333, 339, 374, 402, 466)
obj_ids = [4,]
obj_ids = range(1000) # All objects, BEWARE!
obj_ids = range(100)
if len(obj_ids) < 10:
for obj_idx in obj_ids:
d = Dithers.init_from_combspec(hdl, obj_idx)
d.combine(stat='wmean', clipping='pull', clip=clip)
# Auto-fit of combined spectrum
s = d.spectra['pull-wmean']
cont, lines = S.fit_adaptive_spectrum(
s.w.data, s.f.filled(0), s.df.filled(0)**2,
pmax=pmax, dmax=dmax, nmax=nmax)
label = f"autofit C{cont.degree}+{len(lines.lines)}L"
d.spectra[label] = Spectrum(
s.w, cont(s.w) + lines(s.w), s.w * 0,
label=label)
ax = d.plot(discard=['combined'])
ax = plot_pulls(hdl, obj_ids, clip=clip)
# figs = plot_spectra(hdl, obj_ids,
# keys='combined pull-wmean'.split(), #
# nperfig=20)
if obj_ids == range(1000) and False:
save_figures(prefix=fname, ext='pdf', joint=True)
P.close("all") # Prevent figure accumulation
else:
P.show()