# LSST Data Management System
# Copyright 2016 AURA/LSST.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program. If not,
# see <https://www.lsstcorp.org/LegalNotices/>.
"""Blob classes that reduce a multi-visit dataset and encapsulate data
for measurement classes, plotting functions, and JSON persistence.
"""
from __future__ import print_function, absolute_import
from scipy.optimize import curve_fit
import numpy as np
import lsst.afw.geom as afwGeom
import lsst.afw.image as afwImage
import lsst.afw.image.utils as afwImageUtils
import lsst.daf.persistence as dafPersist
from lsst.afw.table import (SourceCatalog, SchemaMapper, Field,
MultiMatch, SimpleRecord, GroupView)
from lsst.afw.fits.fitsLib import FitsError
from .util import (getCcdKeyName, averageRaDecFromCat)
from .base import BlobBase
__all__ = ['MatchedMultiVisitDataset', 'AnalyticAstrometryModel',
'AnalyticPhotometryModel', 'isExtended', 'magNormDiff',
'fitExp', 'fitAstromErrModel', 'fitPhotErrModel',
'positionRms', 'astromErrModel', 'photErrModel']
[docs]class MatchedMultiVisitDataset(BlobBase):
"""Container for matched star catalogs from multple visits, with filtering,
summary statistics, and modelling.
`MatchedMultiVisitDataset` instances are serializable to JSON.
Parameters
----------
repo : string
The repository. This is generally the directory on disk
that contains the repository and mapper.
dataIds : list of dict
List of `butler` data IDs of Image catalogs to compare to reference.
The `calexp` cpixel image is needed for the photometric calibration.
matchRadius : afwGeom.Angle(), optional
Radius for matching. Default is 1 arcsecond.
safeSnr : float, optional
Minimum median SNR for a match to be considered "safe".
verbose : bool, optional
Output additional information on the analysis steps.
Attributes
----------
filterName : `str`
Name of filter used for all observations.
mag : ndarray
Mean PSF magnitudes of stars over multiple visits (magnitudes).
magerr : ndarray
Median 1-sigma uncertainty of PSF magnitudes over multiple visits
(magnitudes).
magrms
RMS of PSF magnitudes over multiple visits (magnitudes).
snr
Median signal-to-noise ratio of PSF magnitudes over multiple visits
(magnitudes).
dist
RMS of sky coordinates of stars over multiple visits (milliarcseconds).
goodMatches
all good matches, as an afw.table.GroupView;
good matches contain only objects whose detections all have
1. a PSF Flux measurement with S/N > 1
2. a finite (non-nan) PSF magnitude. This separate check is largely
to reject failed zeropoints.
3. and do not have flags set for bad, cosmic ray, edge or saturated
*Not serialized.*
safeMatches
safe matches, as an afw.table.GroupView. Safe matches
are good matches that are sufficiently bright and sufficiently
compact.
*Not serialized.*
magKey
Key for `"base_PsfFlux_mag"` in the `goodMatches` and `safeMatches`
catalog tables.
*Not serialized.*
"""
def __init__(self, repo, dataIds, matchRadius=None, safeSnr=50.,
verbose=False):
BlobBase.__init__(self)
self.verbose = verbose
if not matchRadius:
matchRadius = afwGeom.Angle(1, afwGeom.arcseconds)
# Extract single filter
self.registerDatum(
'filterName', value=set([dId['filter'] for dId in dataIds]).pop(),
units='', description='Filter name')
# Register datums stored by this blob; will be set later
self.registerDatum(
'mag',
units='mag',
label='{band}'.format(band=self.filterName),
description='Mean PSF magnitudes of stars over multiple visits')
self.registerDatum(
'magrms',
units='mag',
label='RMS({band})'.format(band=self.filterName),
description='RMS of PSF magnitudes over multiple visits')
self.registerDatum(
'magerr',
units='mag',
label='sigma({band})'.format(band=self.filterName),
description='Median 1-sigma uncertainty of PSF magnitudes over '
'multiple visits')
self.registerDatum(
'snr',
units='',
label='SNR({band})'.format(band=self.filterName),
description='Median signal-to-noise ratio of PSF magnitudes over '
'multiple visits')
self.registerDatum(
'dist',
units='milliarcsecond',
label='d',
description='RMS of sky coordinates of stars over multiple visits')
# Match catalogs across visits
self._matchedCatalog = self._loadAndMatchCatalogs(
repo, dataIds, matchRadius)
self.magKey = self._matchedCatalog.schema.find("base_PsfFlux_mag").key
# Reduce catalogs into summary statistics.
# These are the serialiable attributes of this class.
self._reduceStars(self._matchedCatalog, safeSnr)
def _loadAndMatchCatalogs(self, repo, dataIds, matchRadius):
"""Load data from specific visit. Match with reference.
Parameters
----------
repo : string
The repository. This is generally the directory on disk
that contains the repository and mapper.
dataIds : list of dict
List of `butler` data IDs of Image catalogs to compare to
reference. The `calexp` cpixel image is needed for the photometric
calibration.
matchRadius : afwGeom.Angle(), optional
Radius for matching. Default is 1 arcsecond.
Returns
-------
afw.table.GroupView
An object of matched catalog.
"""
# Following
# https://github.com/lsst/afw/blob/tickets/DM-3896/examples/repeatability.ipynb
butler = dafPersist.Butler(repo)
dataset = 'src'
# 2016-02-08 MWV:
# I feel like I could be doing something more efficient with
# something along the lines of the following:
# dataRefs = [dafPersist.ButlerDataRef(butler, vId) for vId in dataIds]
ccdKeyName = getCcdKeyName(dataIds[0])
schema = butler.get(dataset + "_schema", immediate=True).schema
mapper = SchemaMapper(schema)
mapper.addMinimalSchema(schema)
mapper.addOutputField(Field[float]('base_PsfFlux_snr',
'PSF flux SNR'))
mapper.addOutputField(Field[float]('base_PsfFlux_mag',
'PSF magnitude'))
mapper.addOutputField(Field[float]('base_PsfFlux_magerr',
'PSF magnitude uncertainty'))
newSchema = mapper.getOutputSchema()
# Create an object that matches multiple catalogs with same schema
mmatch = MultiMatch(newSchema,
dataIdFormat={'visit': int, ccdKeyName: int},
radius=matchRadius,
RecordClass=SimpleRecord)
# create the new extented source catalog
srcVis = SourceCatalog(newSchema)
for vId in dataIds:
try:
calexpMetadata = butler.get("calexp_md", vId, immediate=True)
except FitsError as fe:
print(fe)
print("Could not open calibrated image file for ", vId)
print("Skipping %s " % repr(vId))
continue
except TypeError as te:
# DECam images that haven't been properly reformatted
# can trigger a TypeError because of a residual FITS header
# LTV2 which is a float instead of the expected integer.
# This generates an error of the form:
#
# lsst::pex::exceptions::TypeError: 'LTV2 has mismatched type'
#
# See, e.g., DM-2957 for details.
print(te)
print("Calibration image header information malformed.")
print("Skipping %s " % repr(vId))
continue
calib = afwImage.Calib(calexpMetadata)
oldSrc = butler.get('src', vId, immediate=True)
print(len(oldSrc), "sources in ccd %s visit %s" %
(vId[ccdKeyName], vId["visit"]))
# create temporary catalog
tmpCat = SourceCatalog(SourceCatalog(newSchema).table)
tmpCat.extend(oldSrc, mapper=mapper)
tmpCat['base_PsfFlux_snr'][:] = tmpCat['base_PsfFlux_flux'] \
/ tmpCat['base_PsfFlux_fluxSigma']
with afwImageUtils.CalibNoThrow():
_ = calib.getMagnitude(tmpCat['base_PsfFlux_flux'],
tmpCat['base_PsfFlux_fluxSigma'])
tmpCat['base_PsfFlux_mag'][:] = _[0]
tmpCat['base_PsfFlux_magerr'][:] = _[1]
srcVis.extend(tmpCat, False)
mmatch.add(catalog=tmpCat, dataId=vId)
# Complete the match, returning a catalog that includes
# all matched sources with object IDs that can be used to group them.
matchCat = mmatch.finish()
# Create a mapping object that allows the matches to be manipulated
# as a mapping of object ID to catalog of sources.
allMatches = GroupView.build(matchCat)
return allMatches
def _reduceStars(self, allMatches, safeSnr=50.0):
"""Calculate summary statistics for each star. These are persisted
as object attributes.
Parameters
----------
allMatches : afw.table.GroupView
GroupView object with matches.
safeSnr : float, optional
Minimum median SNR for a match to be considered "safe".
"""
# Filter down to matches with at least 2 sources and good flags
flagKeys = [allMatches.schema.find("base_PixelFlags_flag_%s" % flag).key
for flag in ("saturated", "cr", "bad", "edge")]
nMatchesRequired = 2
psfSnrKey = allMatches.schema.find("base_PsfFlux_snr").key
psfMagKey = allMatches.schema.find("base_PsfFlux_mag").key
psfMagErrKey = allMatches.schema.find("base_PsfFlux_magerr").key
extendedKey = allMatches.schema.find("base_ClassificationExtendedness_value").key
def goodFilter(cat, goodSnr=3):
if len(cat) < nMatchesRequired:
return False
for flagKey in flagKeys:
if cat.get(flagKey).any():
return False
if not np.isfinite(cat.get(psfMagKey)).all():
return False
psfSnr = np.median(cat.get(psfSnrKey))
# Note that this also implicitly checks for psfSnr being non-nan.
return psfSnr >= goodSnr
goodMatches = allMatches.where(goodFilter)
# Filter further to a limited range in S/N and extendedness
# to select bright stars.
safeMaxExtended = 1.0
def safeFilter(cat):
psfSnr = np.median(cat.get(psfSnrKey))
extended = np.max(cat.get(extendedKey))
return psfSnr >= safeSnr and extended < safeMaxExtended
safeMatches = goodMatches.where(safeFilter)
# Pass field=psfMagKey so np.mean just gets that as its input
self.snr = goodMatches.aggregate(np.median, field=psfSnrKey) # SNR
self.mag = goodMatches.aggregate(np.mean, field=psfMagKey) # mag
self.magrms = goodMatches.aggregate(np.std, field=psfMagKey) # mag
self.magerr = goodMatches.aggregate(np.median, field=psfMagErrKey)
# positionRms knows how to query a group so we give it the whole thing
# by going with the default `field=None`.
self.dist = goodMatches.aggregate(positionRms)
# These attributes are not serialized
self.goodMatches = goodMatches
self.safeMatches = safeMatches
[docs]class AnalyticPhotometryModel(BlobBase):
"""Serializable analytic photometry error model for multi-visit catalogs.
This model is originally presented in http://arxiv.org/abs/0805.2366v4
(Eq 4, 5):
.. math::
\sigma_1^2 &= \sigma_\mathrm{sys}^2 + \sigma_\mathrm{rand}^2 \\
x &= 10^{0.4(m-m_5)} \\
\sigma_\mathrm{rand}^2 &= (0.04 - \gamma) x + \gamma x^2~[\mathrm{mag}^2]
Parameters
----------
matchedMultiVisitDataset : `MatchedMultiVisitDataset`
A dataset containing matched statistics for stars across multiple
visits.
brightSnr : float, optional
Minimum SNR for a star to be considered "bright".
medianRef : float, optional
Median reference astrometric scatter in millimagnitudes
matchRef : int, optional
Should match at least matchRef stars.
Attributes
----------
brightSnr : float
Threshold in SNR for bright sources used in this model.
sigmaSys : float
Systematic error floor.
gamma : float
Proxy for sky brightness and read noise.
m5 : float
5-sigma photometric depth (magnitudes).
photRms : float
RMS photometric scatter for 'good' stars (millimagnitudes).
Notes
-----
The scatter and match defaults are appropriate to SDSS are stored here.
For SDSS, stars with mag < 19.5 should be completely well measured.
This limit is a band-dependent statement most appropriate to r.
"""
def __init__(self, matchedMultiVisitDataset, brightSnr=100, medianRef=100,
matchRef=500):
BlobBase.__init__(self)
self.registerDatum(
'brightSnr',
units='',
label='Bright SNR',
description='Threshold in SNR for bright sources used in this '
'model')
self.registerDatum(
'sigmaSys',
units='mag',
label='sigma(sys)',
description='Systematic error floor')
self.registerDatum(
'gamma',
units='',
label='gamma',
description='Proxy for sky brightness and read noise')
self.registerDatum(
'm5',
units='mag',
label='m5',
description='5-sigma depth')
self.registerDatum(
'photScatter',
units='mmag',
label='RMS',
description='RMS photometric scatter for good stars')
# FIXME add a description field to blobs?
# self._doc['doc'] \
# = "Photometric uncertainty model from " \
# "http://arxiv.org/abs/0805.2366v4 (Eq 4, 5): " \
# "sigma_1^2 = sigma_sys^2 + sigma_rand^2, " \
# "sigma_rand^2 = (0.04 - gamma) * x + gamma * x^2 [mag^2] " \
# "where x = 10**(0.4*(m-m_5))"
self._compute(
matchedMultiVisitDataset.snr,
matchedMultiVisitDataset.mag,
matchedMultiVisitDataset.magerr * 1000.,
matchedMultiVisitDataset.magrms * 1000.,
matchedMultiVisitDataset.dist,
len(matchedMultiVisitDataset.goodMatches),
brightSnr=brightSnr,
medianRef=medianRef,
matchRef=matchRef)
def _compute(self, snr, mag, mmagErr, mmagrms, dist, nMatch,
brightSnr=100,
medianRef=100, matchRef=500):
self.brightSnr = brightSnr
bright = np.where(np.asarray(snr) > self.brightSnr)
self.photScatter = np.median(np.asarray(mmagrms)[bright])
print("Photometric scatter (median) - SNR > %.1f : %.1f %s" %
(self.brightSnr, self.photScatter, "mmag"))
fit_params = fitPhotErrModel(mag[bright], mmagErr[bright])
self.sigmaSys = fit_params['sigmaSys']
self.gamma = fit_params['gamma']
self.m5 = fit_params['m5']
if self.photScatter > medianRef:
print('Median photometric scatter %.3f %s is larger than '
'reference : %.3f %s '
% (self.photScatter, "mmag", medianRef, "mag"))
if nMatch < matchRef:
print("Number of matched sources %d is too small (shoud be > %d)"
% (nMatch, matchRef))
[docs]class AnalyticAstrometryModel(BlobBase):
"""Serializable model of astronometry errors across multiple visits.
.. math::
\mathrm{astromRms} = C \theta / \mathrm{SNR} + \sigma_\mathrm{sys}
Parameters
----------
matchedMultiVisitDataset : `MatchedMultiVisitDataset`
A dataset containing matched statistics for stars across multiple
visits.
brightSnr : float, optional
Minimum SNR for a star to be considered "bright".
medianRef : float, optional
Median reference astrometric scatter in millimagnitudes
matchRef : int, optional
Should match at least matchRef stars.
Attributes
----------
brightSnr : float
Threshold SNR for bright sources used in this model.
C : float
Model scaling factor.
theta : float
Seeing (milliarcsecond).
sigmaSys : float
Systematic error floor (milliarcsecond).
astromRms : float
Astrometric scatter (RMS) for good stars (milliarcsecond).
Notes
-----
The scatter and match defaults are appropriate to SDSS are the defaults
for `medianRef` and `matchRef`
For SDSS, stars with mag < 19.5 should be completely well measured.
"""
def __init__(self, matchedMultiVisitDataset, brightSnr=100,
medianRef=100, matchRef=500):
BlobBase.__init__(self)
# FIXME add description field to blobs
# self._doc['doc'] \
# = "Astrometric astrometry model: mas = C*theta/SNR + sigmaSys"
self._compute(
matchedMultiVisitDataset.snr,
matchedMultiVisitDataset.dist,
len(matchedMultiVisitDataset.goodMatches),
brightSnr=brightSnr, medianRef=medianRef, matchRef=matchRef)
def _compute(self, snr, dist, nMatch,
brightSnr=100,
medianRef=100, matchRef=500):
print('Median value of the astrometric scatter - all magnitudes: '
'%.3f %s' % (np.median(dist), "mas"))
bright = np.where(np.asarray(snr) > brightSnr)
astromScatter = np.median(np.asarray(dist)[bright])
print("Astrometric scatter (median) - snr > %.1f : %.1f %s" %
(brightSnr, astromScatter, "mas"))
fit_params = fitAstromErrModel(snr[bright], dist[bright])
if astromScatter > medianRef:
print('Median astrometric scatter %.1f %s is larger than '
'reference : %.1f %s ' %
(astromScatter, "mas", medianRef, "mas"))
if nMatch < matchRef:
print("Number of matched sources %d is too small (shoud be > %d)" % (nMatch, matchRef))
self.registerDatum(
'brightSnr',
value=brightSnr,
units='',
label='Bright SNR',
description='Threshold in SNR for bright sources used in this '
'model')
self.registerDatum(
'C',
value=fit_params['C'],
units='',
description='Scaling factor')
self.registerDatum(
'theta',
value=fit_params['theta'],
units='milliarcsecond',
label='theta',
description='Seeing')
self.registerDatum(
'sigmaSys',
value=fit_params['sigmaSys'],
units='milliarcsecond',
label='sigma(sys)',
description='Systematic error floor')
self.registerDatum(
'astromRms',
value=astromScatter,
units='milliarcsecond',
label='RMS',
description='Astrometric scatter (RMS) for good stars')
[docs]def isExtended(source, extendedKey, extendedThreshold=1.0):
"""Is the source extended attribute above the threshold.
Parameters
----------
cat : collection with a .get method
for `extendedKey`
extendedKey
key to look up the extended object parameter from a schema.
Higher values of extendedness indicate a resolved object
that is larger than a point source.
"""
return source.get(extendedKey) >= extendedThreshold
[docs]def magNormDiff(cat):
"""Calculate the normalized mag/mag_err difference from the mean for a
set of observations of an objection.
Parameters
----------
cat : collection with a .get method
for flux, flux+"-"
Returns
-------
pos_median : float
median diff of positions in milliarcsec.
"""
mag = cat.get('base_PsfFlux_mag')
magerr = cat.get('base_PsfFlux_magerr')
mag_avg = np.mean(mag)
normDiff = (mag - mag_avg) / magerr
return normDiff
def expModel(x, a, b, norm):
return a * np.exp(x/norm) + b
def magerrModel(x, a, b):
return expModel(x, a, b, norm=5)
[docs]def fitExp(x, y, y_err, deg=2):
"""Fit an exponential quadratic to x, y, y_err.
"""
fit_params, fit_param_covariance = \
curve_fit(expModel, x, y, p0=[1, 0.02, 5], sigma=y_err)
return fit_params
[docs]def fitAstromErrModel(snr, dist):
"""Fit model of astrometric error from LSST Overview paper
Parameters
----------
snr : list or numpy.array
Signal-to-noise ratio of photometric observations
dist : list or numpy.array
Scatter in measured positions [mas]
Returns
-------
dict
The fit results for C, theta, sigmaSys along with their Units.
"""
fit_params, fit_param_covariance = \
curve_fit(astromErrModel, snr, dist, p0=[1, 0.01])
params = {'C': 1, 'theta': fit_params[0], 'sigmaSys': fit_params[1],
'cUnits': '', 'thetaUnits': 'mas', 'sigmaSysUnits': 'mas'}
return params
[docs]def fitPhotErrModel(mag, mmag_err):
"""Fit model of photometric error from LSST Overview paper
Parameters
----------
mag : list or numpy.array
Magnitude
mmag_err : list or numpy.array
Magnitude uncertainty or variation in *mmag*.
Returns
-------
dict
The fit results for sigmaSys, gamma, and m5 along with their Units.
"""
mag_err = mmag_err / 1000
fit_params, fit_param_covariance = \
curve_fit(photErrModel, mag, mag_err, p0=[0.01, 0.039, 24.35])
params = {'sigmaSys': fit_params[0], 'gamma': fit_params[1], 'm5': fit_params[2],
'sigmaSysUnits': 'mmag', 'gammaUnits': '', 'm5Units': 'mag'}
return params
[docs]def positionRms(cat):
"""Calculate the RMS for RA, Dec for a set of observations an object.
Parameters
----------
cat -- collection with a .get method
for 'coord_ra', 'coord_dec' that returns radians.
Returns
-------
pos_rms -- RMS of positions in milliarcsecond. Float.
This routine doesn't handle wrap-around
"""
ra_avg, dec_avg = averageRaDecFromCat(cat)
ra, dec = cat.get('coord_ra'), cat.get('coord_dec')
# Approximating that the cos(dec) term is roughly the same
# for all observations of this object.
ra_var = np.var(ra) * np.cos(dec_avg)**2
dec_var = np.var(dec)
pos_rms = np.sqrt(ra_var + dec_var) # radians
pos_rms = afwGeom.radToMas(pos_rms) # milliarcsec
return pos_rms
[docs]def astromErrModel(snr, theta=1000, sigmaSys=10, C=1, **kwargs):
"""Calculate expected astrometric uncertainty based on SNR.
mas = C*theta/SNR + sigmaSys
Parameters
----------
snr : list or numpy.array
S/N of photometric measurements
theta : float or numpy.array, optional
Seeing
sigmaSys : float
Systematic error floor
C : float
Scaling factor
theta and sigmaSys must be given in the same units.
Typically choices might be any of arcsec, milli-arcsec, or radians
The default values are reasonable astronominal values in milliarcsec.
But the only thing that matters is that they're the same.
Returns
-------
np.array
Expected astrometric uncertainty.
Units will be those of theta + sigmaSys.
"""
return C*theta/snr + sigmaSys
[docs]def photErrModel(mag, sigmaSys, gamma, m5, **kwargs):
"""Fit model of photometric error from LSST Overview paper
http://arxiv.org/abs/0805.2366v4
Photometric errors described by
Eq. 4
sigma_1^2 = sigma_sys^2 + sigma_rand^2
Eq. 5
sigma_rand^2 = (0.04 - gamma) * x + gamma * x^2 [mag^2]
where x = 10**(0.4*(m-m_5))
Parameters
----------
mag : list or numpy.array
Magnitude
sigmaSq : float
Limiting systematics floor [mag]
gamma : float
proxy for sky brightness and readout noise
m5 : float
5-sigma depth [mag]
Returns
-------
numpy.array
Result of noise estimation function
"""
x = 10**(0.4*(mag - m5))
sigmaRandSq = (0.04 - gamma) * x + gamma * x**2
sigmaSq = sigmaSys**2 + sigmaRandSq
return np.sqrt(sigmaSq)