Source code for inspec.sir

# -*- coding: utf-8 -*-
# Time-stamp: <2021-04-13 02:28 ycopin@lyonovae03>

"""
.. _sir:

sir - SIR spectrum manipulations
================================

Single-dither and combined spectra from `EUC_SIR_W-COMBSPEC*.fits`.

.. rubric:: Data repository:

* repository: `ousir:S1RD@ta@metis.lambrate.inaf.it`
* path: `home/ousir/ftp/data/euclid/[prodname]/data/`
* FTP command: `sftp://[repository]/[path]/EUC_SIR_W-COMBSPEC*.fits`

.. rubric:: Internal structure:

* `ext[0].header`: `TILE_ID` and `N_OBJ`
* for each object `NNN` from 0 to `N_OBJ-1`,

  * `ext['N_META'].header`: `N_DITH`, `OBJECT_ID`, `RA_OBJ`, `DEC_OBJ`
  * `ext['N_COMBINED1D_SIGNAL'].data`: columns `WAVELENGTH`, `SIGNAL`, `QUALITY`
    and `VAR`. `QUALITY=0` for good data.
  * for each dither `X` from 1 to `N_DITH`:

    * `ext['N_DITH1D_X_SIGNAL']`, with `X`=`DITH_ID` (1-4)
    * data: same columns as combined extension
    * header: `GWA_POS` (str) and `GWA_TILT` (int)

.. autosummary::

   Spectrum
   Dithers
"""

import os
import warnings
from collections import OrderedDict

import numpy as N
import matplotlib.pyplot as P
import astropy.io.fits as F

import inspec.combine as C
from inspec.mpl import save_figures
from inspec.statistics import mad


__author__ = "Yannick Copin <y.copin@ipnl.in2p3.fr>"


[docs]def warning_simple(message, category, filename, lineno, file=None, line=None): return f'\033[93mWARNING\033[0m {message}\n'
warning_default = warnings.formatwarning warnings.formatwarning = warning_simple
[docs]class Spectrum: """ A spectrum, made of (w, f ± df) masked-array. """ binwidth = 13.4 #: Wavelength step [Å] def __init__(self, w, f, df, label='anonymous'): self.label = label self.w = N.ma.masked_array(w) self.f = N.ma.masked_array(f) self.df = N.ma.masked_array(df) # Check mask consistency assert N.array_equal(f.mask, w.mask) assert N.array_equal(f.mask, df.mask) def __str__(self): return f"{self.label}: {N.ma.count(self.f)}/{len(self.f)} px"
[docs]class Dithers: """ A dict of single-dither and combined spectra. """ def __init__(self): self.spectra = OrderedDict()
[docs] def get_dithers(self, fdf=False): """ Return list of single-dither spectra, or stacked (f, df) arrays. """ specs = [ self.spectra[key] for key in self.spectra if key.startswith('D#') ] if fdf: # (2, ndith, nw) return N.ma.swapaxes([ N.ma.stack([s.f, s.df]) for s in specs ], 0, 1) else: return specs
[docs] @classmethod def init_from_combspec(cls, hdl, obj_idx): """ Initialize Dithers from object in HDU list. """ self = cls() self.filename = hdl._file.name # Filename assert 0 <= obj_idx < hdl[0].header['N_OBJ'], \ f"Invalid objidx {obj_idx} (max={hdl[0].header['N_OBJ'] - 1})." self.obj_idx = obj_idx # Object index (0-N_OBJ) meta_name = f'{obj_idx:d}_META' # Extension name meta_idx = hdl.index_of(meta_name) # Extension idx (from name) self.ndith = hdl[meta_name].header['N_DITH'] # Nb of dithers self.objID = hdl[meta_name].header['HIERARCH OBJECT_ID'] for ext_idx in range(meta_idx + 1, meta_idx + self.ndith + 2): ext_name = hdl[ext_idx].name # Extension name (from idx) q = hdl[ext_idx].data["QUALITY"] mask = q > 0 w = N.ma.masked_array(hdl[ext_idx].data['WAVELENGTH'], mask=mask) f = N.ma.masked_array(hdl[ext_idx].data['SIGNAL'], mask=mask) df = N.ma.masked_array(hdl[ext_idx].data['VAR']**0.5, mask=mask) # Unflagged NaNs: https://euclid.roe.ac.uk/issues/14802 if N.any(N.isnan(f)): warnings.warn( f"#{obj_idx:03d} [{self.objID}]: " f"{N.count_nonzero(N.isnan(f))} unflagged NaNs in signal.") f = N.ma.fix_invalid(f) w.mask |= f.mask df.mask |= f.mask label = 'combined' if 'COMBINED' in ext_name \ else f"D#{hdl[ext_idx].header['DITH_ID']}" self.spectra[label] = Spectrum(w, f, df, label=label) # # Inconsistent flags: https://euclid.roe.ac.uk/issues/14804 # cmask = self.spectra['combined'].f.mask # dmask = self.combined_dither_masks() # if not N.all(cmask == dmask): # warnings.warn( # "#{:03d}: inconsistent masks (combined: {}, dithers: {})." # .format(obj_idx, # N.count_nonzero(cmask), N.count_nonzero(dmask))) if not self.spectra['combined'].f.count(): # No valid combined px dmask = self.combined_dither_masks() if N.count_nonzero(~dmask): # Valid dither px warnings.warn( f"#{obj_idx:03d} [{self.objID}]: " "empty combined while non-empty dithers.") return self
[docs] def combined_dither_masks(self): """ Combined mask from dithers. """ return ~N.logical_or.reduce([ ~s.f.mask for s in self.get_dithers() ], axis=0)
def __str__(self): basename = os.path.basename(self.filename) s = f"{basename}#{self.obj_idx}: {self.ndith} dithers" # for label, spec in self.spectra.items(): # s += f"\n {label}: {spec.f.count()}/{len(spec.f)} px" return s
[docs] def combine(self, stat='wmean', clipping=None, **kwargs): """ Combine using stat and clipping, and store under key='[clipping-]stat'. stat: mean|wmean|median clipping: None|sigma|wsigma|robust|pull """ w = self.spectra['combined'].w.copy() f, df = self.get_dithers(fdf=True) if clipping in ['pull', 'sigma', 'weighted', 'robust']: method = clipping elif clipping == 'wsigma': method = 'weighted' elif clipping is not None: raise NotImplementedError(f"Unknown clipping method {clipping!r}") if stat == 'wmean': fn = C.sample_weighted_mean elif stat == 'mean': fn = C.sample_mean elif stat == 'median': fn = C.sample_median else: raise NotImplementedError(f"Unknown combine statistic {stat!r}") if clipping: # Clipping only works for at least 2 dithers. if self.ndith > 1: identified = C.outlier_clipping( f.T, df.T, method=method, **kwargs).T f.mask |= identified df.mask |= identified key = f'{clipping}-{stat}' # See decipher_strategy else: key = stat c, dc = fn(f.T, df.T) w.mask = c.mask dc.mask = c.mask label = f"{os.path.basename(self.filename)}#{self.obj_idx}: {key}" self.spectra[key] = Spectrum(w, c, dc, label=label) return c, dc
[docs] def plot(self, ax=None, discard=None): """ Plot combined and single-dither spectra. """ if ax is None: fig, ax = P.subplots(1, 1) ax.set(xlabel="Wavelength [Å]", ylabel="Flux") basename = os.path.basename(self.filename) ax.set_title(f"{basename}#{self.obj_idx} [{self.objID}]", fontsize='x-small') ax.grid() # Secondary axis in px w = self.spectra['combined'].w.data w2i = lambda x: (x - w[0]) / Spectrum.binwidth i2w = lambda i: w[0] + i * Spectrum.binwidth ax2 = ax.secondary_xaxis('top', functions=(w2i, i2w)) ax2.set_xlabel("Pixel", fontsize='small') for name, spec in self.spectra.items(): if discard and name in discard: continue isDither = name.startswith('D#') kwargs = dict(lw=2 if not isDither else 1, zorder=1 if not isDither else 0) l, = ax.plot(spec.w, spec.f, label=f"{name} ({spec.w.count()} px)", **kwargs) ax.fill_between(spec.w, spec.f - spec.df, spec.f + spec.df, fc=l.get_color(), alpha=0.2, label='_') ax.legend(fontsize='small') return ax
[docs] def get_pull(self, clip=5, side=0, maxiter=None, return_final=False): """ Get pull of individual dithers. It should not be straight single-iteration pull values because of screening effects. Rather, it should iteratively correspond to pull values just before clipping. """ f, df = self.get_dithers(fdf=True) if maxiter is None: maxiter = f.shape[0] - 1 pull = C.pull(f.T, df.T).T # 1st iteration pull (ndither, nlbda) p = None niter = 0 while niter < maxiter: niter += 1 # Outlier clipping identified = C.outlier_clipping(f.T, df.T, method='pull', clip=clip, side=side, maxiter=1).T # WARNING: N.count_nonzero does not apply to masked array # https://github.com/numpy/numpy/issues/18573 if N.count_nonzero(identified.compressed()) == 0: break f.mask |= identified # mask identified outliers df.mask |= identified p = C.pull(f.T, df.T).T # new pull (ndither, nlbda) # Update pull array with new pull values pull[~p.mask] = p[~p.mask] return pull if not return_final else (pull, p)
[docs]def decipher_strategy(key): """ Decipher '[clipping-]stat' strategy. """ tokens = key.split('-') if len(tokens) == 1: clipping, stat = None, tokens[0] elif len(tokens) == 2: clipping, stat = tokens else: raise NotImplementedError(f"Unknown combine strategy {key!r}") return clipping, stat
[docs]def plot_spectra(hdl, obj_ids, keys=['combined'], nperfig=50, **kwargs): sub_ids = [ obj_ids[i:i+nperfig] for i in range(0, len(obj_ids), nperfig) ] figs = [] for ids in sub_ids: fig, ax = P.subplots(1, 1, figsize=(8, 4 + 4*N.log10(len(ids)))) figs.append(fig) ax.set(xlabel="Wavelength [Å]", ylabel="Flux + offset [a.u.]", yticklabels=[]) ax.set_title(os.path.basename(hdl._file.name), fontsize='small') fig.tight_layout() for i, idx in enumerate(ids): d = Dithers.init_from_combspec(hdl, idx) print(d) norm = 0 for j, key in enumerate(keys): if key not in d.spectra: # Perform on-the-fly combination clipping, stat = decipher_strategy(key) d.combine(stat=stat, clipping=clipping, **kwargs) s = d.spectra[key] if j == 0: # Plot baseline and annotation ax.plot([s.w.data[0], s.w.data[-1]], [-i, -i], lw=0.5, c='0.5', zorder=-1) c = 'k' if s.w.count() else 'r' # Annotation color ax.annotate(f"#{idx:03d}[{d.ndith}]", (s.w.data[-1], -i), ha='right', c=c) if not s.w.count(): # Nothing to plot continue if norm == 0: # Set norm and offset from 1st valid spectrum #norm = 0.2 / s.f.std() #offset = norm * s.f.mean() norm = 0.1 / mad(s.f.compressed()) # Robust estimate of std offset = norm * N.median(s.f.compressed()) y = norm * s.f - offset - i dy = norm * s.df c = f'C{j:02d}' ax.plot(s.w, y, lw=0.5, c=c) ax.fill_between(s.w, y - dy, y + dy, fc=c, alpha=0.2) if d.ndith <= 2: break for j, key in enumerate(keys): ax.plot([], [], color=f'C{j:02d}', label=key) #ax.legend() fig.legend() ax.set_ylim(-i - 2, +2) return figs
[docs]def plot_pulls(hdl, obj_ids, clip=5, ax=None): import scipy.stats as SS pulls = [] finalPulls = [] for idx in obj_ids: d = Dithers.init_from_combspec(hdl, idx) print(d) p, fp = d.get_pull(clip=clip, return_final=True) pulls.append(p) if fp is not None: finalPulls.append(fp) pulls = N.ma.concatenate(pulls).compressed() finalPulls = N.ma.concatenate(finalPulls).compressed() pmean = finalPulls.mean() pstd = finalPulls.std(ddof=1) if ax is None: fig, ax = P.subplots(1, 1) ax.set(xlabel="Pull (unscreened)") ax.set_title(os.path.basename(hdl._file.name), fontsize='small') fig.tight_layout() # pmin, pmax = N.percentile(pulls, (1, 100 - 1)) pmin, pmax = -10, +20 bins = N.histogram_bin_edges(pulls, bins='auto', range=(pmin, pmax)) ax.hist(pulls, histtype='step', bins=bins, density=True, log=True, label=f"{len(pulls)} entries [{pulls.min():+.2f},{pulls.max():+.2f}]") ax.hist(finalPulls, histtype='stepfilled', alpha=0.2, bins=bins, density=True, log=True, label=f"{len(finalPulls)} entries: μ={pmean:+.3f}, σ={pstd:.3f}") x = N.linspace(-4, 4, 100) ax.plot(x, SS.norm(0, 1).pdf(x), c='k', lw=2, label="N(μ=0,σ=1)") ax.plot(x, SS.norm(pmean, pstd).pdf(x), c='C03', lw=2, ls='--', label=f"N(μ={pmean:+.2f},σ={pstd:+.2f})") ax.legend(loc='upper right') # C.probplot(pulls, ax=ax2) # C.probplot(finalPulls, ax=ax2) return ax
[docs]def plot_stats(hdl, ax=None): """ Compute and plot stats about COMBSPEC HDU-list. """ if ax is None: fig, ax = P.subplots(1, 1) ax.set_title(f"{hdl._file.name} [{hdl[0].header['N_OBJ']} objects]", fontsize='small') fig.tight_layout() # N_DITH from each XXX_META extension header ndithers = N.array([ hdu.header['N_DITH'] for hdu in hdl if hdu.name.endswith('_META') ]) # Count occurence of each nb of dithers (except 0) counts = N.bincount(ndithers)[1:] labels = '1 2 3 4'.split() w, _, _ = ax.pie(counts, autopct='%.1f%%') # labels=labels ax.legend(w, labels, title="N_DITH") return fig
if __name__ == '__main__': import statistics as S from glob import glob clip = 4 # pull-clipping pmax = 1e-3 # autofit maximal p-value dmax = 3 # autofit maximal continuum degree nmax = 3 # autofit maximal number of emission lines path = "~/Science/Euclid/SGS/SIR/" \ "EUC_SIR_SCIENCE_31279_SC8_GSIR_SWF1_R1_H_V1.3_C6_P2.1_001" fglob = "EUC_SIR_W-COMBSPEC_45033_*.fits" fnames = glob(os.path.join(os.path.expanduser(path), fglob)) fname = fnames[0] hdl = F.open(fname) obj_ids = (781, 788, 810, 819, 821, 822) obj_ids = (925, 926, 930, 932, 939, 990, 994) obj_ids = (337, 361, 449, 498) # 149 obj_ids = (181, 245, 332, 333, 339, 374, 402, 466) obj_ids = [4,] obj_ids = range(1000) # All objects, BEWARE! obj_ids = range(100) if len(obj_ids) < 10: for obj_idx in obj_ids: d = Dithers.init_from_combspec(hdl, obj_idx) d.combine(stat='wmean', clipping='pull', clip=clip) # Auto-fit of combined spectrum s = d.spectra['pull-wmean'] cont, lines = S.fit_adaptive_spectrum( s.w.data, s.f.filled(0), s.df.filled(0)**2, pmax=pmax, dmax=dmax, nmax=nmax) label = f"autofit C{cont.degree}+{len(lines.lines)}L" d.spectra[label] = Spectrum( s.w, cont(s.w) + lines(s.w), s.w * 0, label=label) ax = d.plot(discard=['combined']) ax = plot_pulls(hdl, obj_ids, clip=clip) # figs = plot_spectra(hdl, obj_ids, # keys='combined pull-wmean'.split(), # # nperfig=20) if obj_ids == range(1000) and False: save_figures(prefix=fname, ext='pdf', joint=True) P.close("all") # Prevent figure accumulation else: P.show()