# -*- coding: utf-8 -*-
# Time-stamp: <2021-03-16 16:38:41 ycopin>
"""
.. _synth:
synth - Flux integration and synthetic magnitudes
=================================================
.. autosummary::
Filter
pixel_fracs
"""
__author__ = "Yannick Copin <y.copin@ipnl.in2p3.fr>"
import os
import warnings
from math import log10, log
import numpy as N
from . import spectrum
CLIGHT = 2.99792458e18 #: Speed of light [A/s]
HPLANCK = 6.62606896e-27 #: Planck constant [erg s]
# Functions and classes ##############################
[docs]class SynthComputationWarning(RuntimeWarning):
"""
Synthetic photometry computation warning.
"""
pass
[docs]class Filter:
"""Filter class."""
def __init__(self, string):
"""
Read filter curve from ascii-file (if `string` is a valid filename) or
define top-hat bandpass from `string` = 'name,lmin,lmax'.
"""
if os.path.exists(string):
self.read(string) # will set self attributes
elif len(string.split(',')) == 3:
self.filename = ''
self.name, lmin, lmax = string.split(',')
self.x = [float(lmin), float(lmax)]
assert 0 < self.x[0] < self.x[1], \
"Bandpass is not defined by increasing wavelength boundaries."
self.y = None
else:
raise IOError(f"Cannot find filter file '{string}'")
# Compute some useful quantities
self.start = self.x[0]
self.end = self.x[-1]
self._zp = {} # Zero-point cache
[docs] @classmethod
def bandpass(cls, name, lmin, lmax):
"""Create a bandpass filter."""
return cls(','.join((name, str(lmin), str(lmax))))
def __str__(self):
if self.filename:
s = "Filter '%s' [%s], %d pts in [%.1f,%.1f], " % \
(self.name, self.filename,
len(self.x), self.start, self.end)
else:
s = f"Bandpass '{self.name}' [{self.start:.1f},{self.end:.1f}], "
return s
[docs] def read(self, filename):
"""
Read filter transmission curve from ascii-file `filename`.
Expected format (all initial `KEYWORD` lines are optional)::
# KEYWORD FNAME filtername
# lbda [A] transmission [fraction]
"""
import re
self.filename = os.path.basename(filename)
self.name = self.filename # Plain name
x, y = [], []
for line in open(filename, 'r'):
line = line.strip()
if not line: # Skip blank lines
continue
elif line[0] == '#':
# This is a comment: look for "KEYWORD FNAME" filter name
match = re.match(r"# KEYWORD FNAME (.+)", line)
if match:
self.name = match.groups()[0].strip()
else:
# This is a data line
tokens = line.split()
if len(tokens) != 2:
raise IOError(
f"Cannot decipher lambda,transm in '{line}'.")
x.append(float(tokens[0]))
y.append(float(tokens[1]))
# Convert data to arrays
self.x = N.asarray(x, dtype='f')
self.y = N.asarray(y, dtype='f')
# Sanity checks
if not (N.diff(self.x) > 0).all():
raise IOError("Filter wavelengths not strictly increasing.")
if not ((self.y >= 0) & (self.y <= 1)).all():
raise IOError("Filter transmissions outside of [0, 1].")
[docs] def effLambda(self):
r"""
Return filter effective wavelength:
.. math::
\lambda_{\text{eff}} &= \int \lambda T(\lambda) d\lambda /
\int T(\lambda) d\lambda \\
&= (\lambda_{\min} + \lambda_{\max}) / 2
\quad\text{for a top-hat bandpass}
"""
if self.filename:
z = self.x * self.y
m = z.mean() / self.y.mean() # Transmission averaged wavelength
else:
m = (self.start + self.end) / 2 # Mid-wavelength
return m
[docs] def interpolate(self, x, monotonic=True):
"""
Filter interpolation.
Interpolate filter over input array `x` (not necessarily
regularly sampled).
`monotonic` uses PCHIP monotonic cubic interpolation to preserve
monotonicity in the interpolated data at the price of potential
discontinuities in second-derivatives. Non-`monotonic` uses
standard spline interpolation: beware that interpolation doesn't
go wild *outside* the filter definition range, or even *inside*.
"""
from scipy.interpolate import UnivariateSpline, PchipInterpolator
y = pixel_fracs(x, self.start, self.end)
if self.filename: # Real interpolation of filter curve
inside = y > 0
# Interpolate only over initial extent of filter
if monotonic:
spline = PchipInterpolator(self.x, self.y)
y[inside] = spline(x[inside])
else:
spline = UnivariateSpline(self.x, self.y, s=0)
y[inside] = N.maximum(spline(x[inside]), 0)
return y
[docs] def integrate(self, spec, p=1, wrange=None, monotonic=True):
r"""
Integrate spectrum flux over a filter.
.. math::
I_p &= \int (h\nu)^{-p} F_\nu(\nu) T(\nu) d\nu \\
&= \int \left(\frac{\lambda}{hc}\right)^{p}
F_\lambda(\lambda) T(\lambda) d\lambda
**Special cases:**
* p = 0: :math:`\int F_\lambda(\lambda) T(\lambda) d\lambda`
(energy magnitudes)
* p = 1: :math:`\int \lambda/(hc)
F_\lambda(\lambda) T(\lambda) d\lambda` (photon magnitudes)
Set `wrange` to integrate over a restricted wavelength range
(`None` is full range).
.. Note:: spectrum is supposed to be regularly sampled in
wavelength.
.. TODO::
* split this function into elementary parts, and compute integral
from linear algebraic expression (easier to manipulate for
variance/covariance computations).
* **validate computations**.
"""
if wrange is None: # Full range by default
start = spec.start - spec.step / 2
end = spec.end + spec.step / 2
else: # Restricted range
start, end = wrange
if start is None:
start = spec.start - spec.step / 2
if end is None:
end = spec.end + spec.step / 2
if self.end < start or self.start > end:
# Filter is fully outside spectrum range: nothing to be done
return N.nan, N.nan
elif self.start < start or self.end > end:
# Filter is partially outside spectrum range
if wrange is None:
# By default, don't compute incomplete integral
warnings.warn(
"Filter '%s' [%.1f-%.1f] and spectrum '%s' "
"[%.1f-%.1f] are only partially overlapping." %
(self.name, self.start, self.end, spec.name, start, end),
SynthComputationWarning)
return N.nan, N.nan
# Interpolate filter over spectrum (natively regularly sampled)
f = self.interpolate(spec.x, monotonic=monotonic)
# Take into account fractions of pixels if needed
if wrange is not None:
if self.filename:
f *= spec.fracs(start, end)
else: # f already contains fraction of pixels
f = N.minimum(f, spec.fracs(start, end))
# For the integral, the (lambda/hc)**p term is transfered in
# the filter transmission curve f.
if not p == 0:
f *= (spec.x / (HPLANCK * CLIGHT)) ** p
# Since F_nu dnu := F_l dl, one has int [1/(h nu)]
# F_nu(nu) S(nu) dnu = int [l/(hc)] F_l(l) S(l) dl, and
# the integral can be done in lambda.
fx = (spec.y * f).sum() * spec.step # Plain summation
dfx = N.sqrt((spec.v * f ** 2).sum()) * spec.step if spec.hasVar else 0
return fx, dfx
[docs] def zeroPoint(self, p=1):
r"""
Filter zero-point.
.. math::
10^{z_{p} / 2.5} &= \int (h\nu)^{-p} T(\nu) d\nu \\
&= \int \left(\frac{\lambda}{hc}\right)^{p}
\left(\frac{c}{\lambda^2}\right) T(\lambda) d\lambda
The integral is performed on sampled filter transmissions
using Simpson's integration rule, since the
wavelengthes are not necessarily regularly sampled.
**Special cases:**
* p = 0: :math:`z_0 = \int T(\nu) d\nu` (energy magnitudes)
* p = 1: :math:`z_1 = \int T(\nu) / (h\nu) d\nu` (photon magnitudes)
"""
if p in self._zp:
return self._zp[p] # Return cached value
from scipy.integrate import simps
if self.filename: # Simpson integration
if p == 0:
zp = simps(self.y, self.x)
else:
zp = simps(self.y * (self.x / (HPLANCK * CLIGHT)) ** p, self.x)
else: # Analytic expression
lmin, lmax = self.x
if p == 1:
zp = log(lmax / lmin) / HPLANCK
else:
zp = (CLIGHT / (HPLANCK * CLIGHT) ** p / (p - 1) *
(lmax ** (p - 1) - lmin ** (p - 1)))
# Sanity check
assert zp > 0, "Filter zero-point is supposed to be positive."
zp = 2.5 * log10(zp) # Convert to magnitude
self._zp[p] = zp # Cache value
return zp
[docs] def magn(self, spec, p=1, m0=48.59):
r"""
Compute magnitude in filter for spectrum.
.. math::
m = -2.5\log\left(\int (h\nu)^{-p} F_\nu(\nu) T(\nu) d\nu\right) +
z_p - m_0
**Reference:** `Vega and AB Photometric Systems
<http://dls.physics.ucdavis.edu/calib/vegaab.html>`_
"""
fx, dfx = self.integrate(spec, p=p)
if fx <= 0 or N.isnan(fx): # Nothing we can do about negative flux!
warnings.warn(
"Integral of spectrum '%s' in filter '%s' is negative (%f)" %
(spec.name, self.name, fx),
SynthComputationWarning)
return N.nan, N.nan
elif N.isnan(fx): # Could not compute integral
return N.nan, N.nan
else:
m = -2.5 * log10(fx) + self.zeroPoint(p=p) - m0
dm = 1.0857362047581294 / fx * dfx if dfx > 0 else 0 # 2.5/log(10) = 1.0857...
return m, dm
[docs]def pixel_fracs(x, xmin, xmax):
"""
Return which fraction of each (Voronoi) pixel centered on (not necessarily
regularly sampled, but strictly increasing) `x` is contained within
wavelength range [`xmin`, `xmax`].
"""
assert xmin < xmax, "pixel_fracs: xmin >= xmax"
x = N.asarray(x) # Center of pixels
n = len(x)
# Pixel boundaries
hdx = N.diff(x) / 2 # Half px width
assert (hdx > 0).all(), "pixel_fracs: input array not strictly increasing"
xb = N.concatenate(([x[0] - hdx[0]], # 1st px left boundary
x[1:] - hdx, # Middle pixel left boundaries
[x[-1] + hdx[-1]])) # Last px right boundary
# Fractions
f = N.zeros(n)
f[(xmin <= x) & (x <= xmax)] = 1. # Rough approach 1st
# Compute precise fractions at domain boundaries
imin = xb.searchsorted(xmin) # xb[imin - 1] < xmin < xb[imin]
imax = xb.searchsorted(xmax) # xb[imax - 1] < xmax < xb[imax]
if 0 < imin <= n:
f[imin - 1] = (min(xb[imin], xmax) - xmin) / (xb[imin] - xb[imin - 1])
if 0 < imax <= n:
f[imax - 1] = (xmax - max(xb[imax - 1], xmin)) / \
(xb[imax] - xb[imax - 1])
return f
# Make pixel_fracs a method of the spectrum.Spectrum class
spectrum.Spectrum.fracs = lambda self, xmin, xmax: pixel_fracs(self.x, xmin, xmax)