# LSST Data Management System
# Copyright 2008-2016 AURA/LSST.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program. If not,
# see <https://www.lsstcorp.org/LegalNotices/>.
"""Miscellaneous functions to support lsst.validate.drp."""
from __future__ import print_function, division
import os
import math
import numpy as np
import scipy.stats
import yaml
import lsst.afw.geom as afwGeom
import lsst.afw.coord as afwCoord
import lsst.daf.persistence as dafPersist
import lsst.pipe.base as pipeBase
[docs]def averageRaDec(ra, dec):
"""Calculate average RA, Dec from input lists using spherical geometry.
Parameters
----------
ra : list of float
RA in [radians]
dec : list of float
Dec in [radians]
Returns
-------
float, float
meanRa, meanDec -- Tuple of average RA, Dec [radians]
"""
assert(len(ra) == len(dec))
angleRa = [afwGeom.Angle(r, afwGeom.radians) for r in ra]
angleDec = [afwGeom.Angle(d, afwGeom.radians) for d in dec]
coords = [afwCoord.IcrsCoord(ar, ad) for (ar, ad) in zip(angleRa, angleDec)]
meanRa, meanDec = afwCoord.averageCoord(coords)
return meanRa.asRadians(), meanDec.asRadians()
[docs]def averageRaDecFromCat(cat):
return averageRaDec(cat.get('coord_ra'), cat.get('coord_dec'))
[docs]def averageRaFromCat(cat):
meanRa, meanDec = averageRaDecFromCat(cat)
return meanRa
[docs]def averageDecFromCat(cat):
meanRa, meanDec = averageRaDecFromCat(cat)
return meanDec
[docs]def getCcdKeyName(dataid):
"""Return the key in a dataId that's referring to the CCD or moral equivalent.
Parameters
----------
dataid : dict
A dictionary that will be searched for a key that matches
an entry in the hardcoded list of possible names for the CCD field.
Notes
-----
Motiviation: Different camera mappings use different keys to indicate
the different amps/ccds in the same exposure. This function looks
through the reference dataId to locate a field that could be the one.
"""
possibleCcdFieldNames = ['ccd', 'ccdnum', 'camcol']
for name in possibleCcdFieldNames:
if name in dataid:
return name
else:
return 'ccd'
[docs]def repoNameToPrefix(repo):
"""Generate a base prefix for plots based on the repo name.
Examples
--------
>>> repoNameToPrefix('a/b/c')
'a_b_c_'
>>> repoNameToPrefix('/bar/foo/')
'bar_foo_'
>>> repoNameToPrefix('CFHT/output')
'CFHT_output_'
>>> repoNameToPrefix('./CFHT/output')
'CFHT_output_'
>>> repoNameToPrefix('.a/CFHT/output')
'a_CFHT_output_'
"""
return repo.lstrip('\.').strip(os.sep).replace(os.sep, "_") + "_"
[docs]def discoverDataIds(repo, **kwargs):
"""Retrieve a list of all dataIds in a repo.
Parameters
----------
repo : str
Path of a repository with 'src' entries.
Returns
-------
list
dataIds in the butler that exist.
Notes
-----
May consider making this an iterator if large N becomes important.
However, will likely need to know things like, "all unique filters"
of a data set anyway, so would need to go through chain at least once.
"""
butler = dafPersist.Butler(repo)
thisSubset = butler.subset(datasetType='src', **kwargs)
# This totally works, but would be better to do this as a TaskRunner?
dataIds = [dr.dataId for dr in thisSubset
if dr.datasetExists(datasetType='src') and dr.datasetExists(datasetType='calexp')]
# Make sure we have the filter information
for dId in dataIds:
response = butler.queryMetadata(datasetType='src', format=['filter'], dataId=dId)
filterForThisDataId = response[0]
dId['filter'] = filterForThisDataId
return dataIds
[docs]def loadParameters(configFile):
"""Load configuration parameters from a yaml file.
Parameters
----------
configFile : str
YAML file that stores visit, filter, ccd,
good_mag_limit, medianAstromscatterRef, medianPhotoscatterRef, matchRef
and other parameters
Returns
-------
pipeBase.Struct
with configuration parameters
"""
with open(configFile, mode='r') as stream:
data = yaml.load(stream)
return pipeBase.Struct(**data)
[docs]def loadDataIdsAndParameters(configFile):
"""Load data IDs, magnitude range, and expected metrics from a yaml file.
Parameters
----------
configFile : str
YAML file that stores visit, filter, ccd,
and additional configuration parameters such as
brightSnr, medianAstromscatterRef, medianPhotoscatterRef, matchRef
Returns
-------
pipeBase.Struct
with attributes of
dataIds - dict
and configuration parameters
"""
parameters = loadParameters(configFile).getDict()
ccdKeyName = getCcdKeyName(parameters)
try:
dataIds = constructDataIds(parameters['filter'], parameters['visits'],
parameters[ccdKeyName], ccdKeyName)
except KeyError:
# If the above parameters are not in the `parameters` dict,
# presumably because they were not in the configFile
# then we return no dataIds.
dataIds = []
return pipeBase.Struct(dataIds=dataIds, **parameters)
[docs]def constructDataIds(filters, visits, ccds, ccdKeyName='ccd'):
"""Returns a list of dataIds consisting of every combination of visit & ccd for each filter.
Parameters
----------
filters : str or list
If str, will be interpreted as one filter to be applied to all visits.
visits : list of int
ccds : list of int
ccdKeyName : str, optional
Name to distinguish different parts of a focal plane.
Generally 'ccd', but might be 'ccdnum', or 'amp', or 'ccdamp'.
Refer to your `obs_*/policy/*Mapper.paf`.
Returns
-------
list
dataIDs suitable to be used with the LSST Butler.
Examples
--------
>>> dataIds = constructDataIds('r', [100, 200], [10, 11, 12])
>>> print(dataIds)
[{'filter': 'r', 'visit': 100, 'ccd': 10}, {'filter': 'r', 'visit': 100, 'ccd': 11}, {'filter': 'r', 'visit': 100, 'ccd': 12}, {'filter': 'r', 'visit': 200, 'ccd': 10}, {'filter': 'r', 'visit': 200, 'ccd': 11}, {'filter': 'r', 'visit': 200, 'ccd': 12}]
"""
if isinstance(filters, str):
filters = [filters for _ in visits]
assert len(filters) == len(visits)
dataIds = [{'filter': f, 'visit': v, ccdKeyName: c}
for (f, v) in zip(filters, visits)
for c in ccds]
return dataIds
[docs]def loadRunList(configFile):
"""Load run list from a YAML file.
Parameters
----------
configFile : str
YAML file that stores visit, filter, ccd,
Returns
-------
list
run list lines.
Examples
--------
An example YAML file would include entries of (for some CFHT data)
visits: [849375, 850587]
filter: 'r'
ccd: [12, 13, 14, 21, 22, 23]
or (for some DECam data)
visits: [176837, 176846]
filter: 'z'
ccdnum: [10, 11, 12, 13, 14, 15, 16, 17, 18]
Note 'ccd' for CFHT and 'ccdnum' for DECam. These entries will be used to build
dataIds, so these fields should be as the camera mapping defines them.
`visits` and `ccd` (or `ccdnum`) must be lists, even if there's only one element.
"""
stream = open(configFile, mode='r')
data = yaml.load(stream)
ccdKeyName = getCcdKeyName(data)
runList = constructRunList(data['filter'], data['visits'],
data[ccdKeyName], ccdKeyName=ccdKeyName)
return runList
[docs]def constructRunList(filter, visits, ccds, ccdKeyName='ccd'):
"""Construct a comprehensive runList for processCcd.py.
Parameters
----------
filter : str or list
visits : list of int
ccds : list of int
Returns
-------
list
list of strings suitable to be used with the LSST Butler.
Examples
--------
>>> runList = constructRunList([100, 200], 'r', [10, 11, 12])
>>> print(runList)
['--id visit=100 ccd=10^11^12', '--id visit=200 ccd=10^11^12']
>>> runList = constructRunList([100, 200], 'r', [10, 11, 12], ccdKeyName='ccdnum')
>>> print(runList)
['--id visit=100 ccdnum=10^11^12', '--id visit=200 ccdnum=10^11^12']
Note
-----
The LSST parsing convention is to use '^' as list separators
for arguments to `--id`. While surprising, this convention
allows for CCD names to include ','. E.g., 'R1,2'.
Currently ignores `filter` because `visit` should be unique w.r.t filter.
"""
runList = ["--id visit=%d %s=%s" % (v, ccdKeyName, "^".join([str(c) for c in ccds]))
for v in visits]
return runList
[docs]def calcOrNone(func, x, ErrorClass, **kwargs):
"""Calculate the `func` and return result. If it raises ErrorClass, return None."""
try:
out = func(x, **kwargs)
except ErrorClass as e:
print(e)
out = None
return out
[docs]def getRandomDiffRmsInMas(array):
"""Calculate the RMS difference in mmag between a random pairs of magnitudes.
Input
-----
array : list or np.array
Magnitudes from which to select the pair [mag]
Returns
-------
float
RMS difference
Notes
-----
The LSST SRD recommends computing repeatability from a histogram of
magnitude differences for the same star measured on two visits
(using a median over the magDiffs to reject outliers).
Because we have N>=2 measurements for each star, we select a random
pair of visits for each star. We divide each difference by sqrt(2)
to obtain RMS about the (unknown) mean magnitude,
instead of obtaining just the RMS difference.
See Also
--------
getRandomDiff : Get the difference
Examples
--------
>>> mag = [24.2, 25.5]
>>> rms = getRandomDiffRmsInMas(mag)
>>> print(rms)
212.132034
"""
# For scalars, math.sqrt is several times faster than numpy.sqrt.
return (1000/math.sqrt(2)) * getRandomDiff(array)
[docs]def getRandomDiff(array):
"""Get the difference between two randomly selected elements of an array.
Input
-----
array : list or np.array
Returns
-------
float or int
Difference between two random elements of the array.
Notes
-----
* As implemented the returned value is the result of subtracting
two elements of the input array. In all of the imagined uses
that's going to be a scalar (float, maybe int).
In principle, however the code as implemented returns the result
of subtracting two elements of the array, which could be any
arbitrary object that is the result of the subtraction operator
applied to two elements of the array.
* This is not the most efficient way to extract a pair,
but it's the easiest to write.
* Shuffling works correctly for low N (even N=2), where a naive
random generation of entries would result in duplicates.
* In principle it might be more efficient to shuffle the indices,
then extract the difference. But this probably only would make a
difference for arrays whose elements were objects that were
substantially larger than a float. And that would only make
sense for objects that had a subtraction operation defined.
"""
copy = array.copy()
np.random.shuffle(copy)
return copy[0] - copy[1]
[docs]def computeWidths(array):
"""Compute the RMS and the scaled inter-quartile range of an array.
Input
-----
array : list or np.array
Returns
-------
float, float
RMS and scaled inter-quartile range (IQR).
Notes
-----
We estimate the width of the histogram in two ways:
using a simple RMS,
using the interquartile range (IQR)
The IQR is scaled by the IQR/RMS ratio for a Gaussian such that it
if the array is Gaussian distributed, then the scaled IQR = RMS.
"""
rmsSigma = math.sqrt(np.mean(array**2))
iqrSigma = np.subtract.reduce(np.percentile(array, [75, 25])) / (scipy.stats.norm.ppf(0.75)*2)
return rmsSigma, iqrSigma
[docs]def sphDist(ra1, dec1, ra2, dec2):
"""Calculate distance on the surface of a unit sphere.
Input and Output are in radians.
Notes
-----
Uses the Haversine formula to preserve accuracy at small angles.
Law of cosines approach doesn't work well for the typically very small
differences that we're looking at here.
"""
# Haversine
dra = ra1-ra2
ddec = dec1-dec2
a = np.square(np.sin(ddec/2)) + \
np.cos(dec1)*np.cos(dec2)*np.square(np.sin(dra/2))
dist = 2 * np.arcsin(np.sqrt(a))
# This is what the law of cosines would look like
# dist = np.arccos(np.sin(dec1)*np.sin(dec2) + np.cos(dec1)*np.cos(dec2)*np.cos(ra1 - ra2))
# Could use afwCoord.angularSeparation()
# but (a) that hasn't been made accessible through the Python interface
# and (b) I'm not sure that it would be faster than the numpy interface.
# dist = afwCoord.angularSeparation(ra1-ra2, dec1-dec2, np.cos(dec1), np.cos(dec2))
return dist
[docs]def matchVisitComputeDistance(visit_obj1, ra_obj1, dec_obj1,
visit_obj2, ra_obj2, dec_obj2):
"""Calculate obj1-obj2 distance for each visit in which both objects are seen.
For each visit shared between visit_obj1 and visit_obj2,
calculate the spherical distance between the obj1 and obj2.
Parameters
----------
visit_obj1 : scalar, list, or numpy.array of int or str
List of visits for object 1.
ra_obj1 : scalar, list, or numpy.array of float
List of RA in each visit for object 1.
dec_obj1 : scalar, list or numpy.array of float
List of Dec in each visit for object 1.
visit_obj2 : list or numpy.array of int or str
List of visits for object 2.
ra_obj2 : list or numpy.array of float
List of RA in each visit for object 2.
dec_obj2 : list or numpy.array of float
List of Dec in each visit for object 2.
Results
-------
list of float
spherical distances (in radians) for matching visits.
"""
distances = []
for i in range(len(visit_obj1)):
for j in range(len(visit_obj2)):
if (visit_obj1[i] == visit_obj2[j]):
if np.isfinite([ra_obj1[i], dec_obj1[i],
ra_obj2[j], dec_obj2[j]]).all():
distances.append(sphDist(ra_obj1[i], dec_obj1[i],
ra_obj2[j], dec_obj2[j]))
return distances
[docs]def arcminToRadians(arcmin):
return np.deg2rad(arcmin/60)
[docs]def radiansToMilliarcsec(rad):
return np.rad2deg(rad)*3600*1000
[docs]def calcRmsDistances(groupView, annulus, magRange=None, verbose=False):
"""Calculate the RMS distance of a set of matched objects over visits.
Parameters
----------
groupView : lsst.afw.table.GroupView
GroupView object of matched observations from MultiMatch.
annulus : 2-element list or tuple of float
Distance range in which to compare object. [arcmin]
E.g., `annulus=[19, 21]` would consider all objects
separated from each other between 19 and 21 arcminutes.
magRange : 2-element list or tuple of float, optional
Magnitude range from which to select objects.
Default of `None` will result in all objects being considered.
verbose : bool, optional
Output additional information on the analysis steps.
Returns
-------
list
rmsDistMas
"""
# Default is specified here separately because defaults that are mutable
# get overridden by previous calls of the function.
if magRange is None:
magRange = [17.0, 21.5]
# First we make a list of the keys that we want the fields for
importantKeys = [groupView.schema.find(name).key for
name in ['id', 'coord_ra', 'coord_dec',
'object', 'visit', 'base_PsfFlux_mag']]
# Includes magRange through closure
def magInRange(cat):
mag = cat.get('base_PsfFlux_mag')
w, = np.where(np.isfinite(mag))
medianMag = np.median(mag[w])
return magRange[0] <= medianMag and medianMag < magRange[1]
groupViewInMagRange = groupView.where(magInRange)
# List of lists of id, importantValue
matchKeyOutput = [obj.get(key)
for key in importantKeys
for obj in groupViewInMagRange.groups]
jump = len(groupViewInMagRange)
ra = matchKeyOutput[1*jump:2*jump]
dec = matchKeyOutput[2*jump:3*jump]
visit = matchKeyOutput[4*jump:5*jump]
# Calculate the mean position of each object from its constituent visits
# `aggregate` calulates a quantity for each object in the groupView.
meanRa = groupViewInMagRange.aggregate(averageRaFromCat)
meanDec = groupViewInMagRange.aggregate(averageDecFromCat)
annulusRadians = arcminToRadians(annulus)
rmsDistances = list()
for obj1, (ra1, dec1, visit1) in enumerate(zip(meanRa, meanDec, visit)):
dist = sphDist(ra1, dec1, meanRa[obj1+1:], meanDec[obj1+1:])
objectsInAnnulus, = np.where((annulusRadians[0] <= dist) &
(dist < annulusRadians[1]))
for obj2 in objectsInAnnulus:
distances = matchVisitComputeDistance(
visit[obj1], ra[obj1], dec[obj1],
visit[obj2], ra[obj2], dec[obj2])
if not distances:
if verbose:
print("No matching visits found for objs: %d and %d" %
(obj1, obj2))
continue
finiteEntries, = np.where(np.isfinite(distances))
if len(finiteEntries) > 0:
rmsDist = np.std(np.array(distances)[finiteEntries])
rmsDistances.append(rmsDist)
return rmsDistances, annulus, magRange