-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
DetectorMap: refactor to use I/O from datamodel
By using the I/O in datamodel, we ensure that we're always writing the correct format, but it means a bit of a refactor. We no longer have to determine the subclass of data on disk (that's done in the datamodel I/O code); now we just have to convert to/from the datamodel representation.
- Loading branch information
Showing
3 changed files
with
135 additions
and
297 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,161 +1,80 @@ | ||
import io | ||
import numpy as np | ||
import astropy.io.fits | ||
|
||
import lsst.afw.fits | ||
import lsst.geom | ||
|
||
from lsst.utils import continueClass | ||
|
||
import pfs.datamodel | ||
from .GlobalDetectorMap import GlobalDetectorMap, GlobalDetectorModel, GlobalDetectorModelScaling, FiberMap | ||
from .DetectorMapContinued import DetectorMap | ||
|
||
from .utils import headerToMetadata | ||
|
||
__all__ = ["GlobalDetectorMap", "GlobalDetectorModel", "GlobalDetectorModelScaling", "FiberMap"] | ||
|
||
|
||
@continueClass # noqa: F811 (redefinition) | ||
class GlobalDetectorMap: | ||
@classmethod | ||
def canReadFits(cls, fits): | ||
"""Return whether this class can read the FITS file | ||
def fromDatamodel(cls, detMap): | ||
"""Construct from the pfs.datamodel representation | ||
Parameters | ||
---------- | ||
fits : `astropy.io.fits.HDUList` | ||
FITS file. | ||
detMap : `pfs.datamodel.GlobalDetectorMap` | ||
datamodel representation of GlobalDetectorMap. | ||
Returns | ||
------- | ||
canRead : `bool` | ||
Whether we can read the file. | ||
self : `pfs.drp.stella.GlobalDetectorMap` | ||
drp_stella representation of GlobalDetectorMap. | ||
""" | ||
keyword = "HIERARCH pfs_detectorMap_class" | ||
return keyword in fits[0].header and fits[0].header[keyword] == "GlobalDetectorMap" | ||
|
||
@classmethod | ||
def fromFits(cls, fits): | ||
"""Read GlobalDetectorMap from open FITS file | ||
Parameters | ||
---------- | ||
fits : `astropy.io.fits.HDUList` | ||
FITS file. | ||
Returns | ||
------- | ||
out : `pfs.drp.stella.GlobalDetectorMap` | ||
DetectorMap read from FITS file. | ||
""" | ||
header = fits["FIBERS"].header | ||
table = fits["FIBERS"].data | ||
|
||
bbox = lsst.geom.BoxI(lsst.geom.PointI(header['MINX'], header['MINY']), | ||
lsst.geom.PointI(header['MAXX'], header['MAXY'])) | ||
|
||
# array.astype() required to force byte swapping (e.g., dtype('>f4') --> np.float32) | ||
# otherwise pybind doesn't recognise them as the proper type. | ||
fiberId = table["fiberId"].astype(np.int32) | ||
distortionOrder = header["ORDER"] | ||
|
||
scaling = GlobalDetectorModelScaling( | ||
fiberPitch=header["scaling.fiberPitch"], | ||
dispersion=header["scaling.dispersion"], | ||
wavelengthCenter=header["scaling.wavelengthCenter"], | ||
minFiberId=header["scaling.minFiberId"], | ||
maxFiberId=header["scaling.maxFiberId"], | ||
height=header["scaling.height"], | ||
buffer=header["scaling.buffer"], | ||
) | ||
|
||
spatialOffsets = table["spatialOffsets"].astype(np.float32) | ||
spectralOffsets = table["spectralOffsets"].astype(np.float32) | ||
xCoeff = fits["COEFFICIENTS"].data["x"].astype(float) | ||
yCoeff = fits["COEFFICIENTS"].data["y"].astype(float) | ||
rightCcd = fits["RIGHTCCD"].data["coeff"].astype(float) | ||
|
||
model = GlobalDetectorModel(bbox, distortionOrder, fiberId, scaling, xCoeff, yCoeff, rightCcd, | ||
spatialOffsets, spectralOffsets) | ||
|
||
# Read the primary header with lsst.afw.fits | ||
# This requires writing the FITS file into memory and reading it from there | ||
buffer = io.BytesIO() | ||
fits.writeto(buffer) | ||
ss = buffer.getvalue() | ||
size = len(ss) | ||
ff = lsst.afw.fits.MemFileManager(size) | ||
ff.setData(ss, size) | ||
metadata = ff.readMetadata(0) | ||
|
||
if not isinstance(detMap, pfs.datamodel.GlobalDetectorMap): | ||
raise RuntimeError(f"Wrong type: {detMap}") | ||
bbox = detMap.box.toLsst() | ||
scalingKwargs = {name: getattr(detMap.scaling, name) for name in | ||
("fiberPitch", "dispersion", "wavelengthCenter", "minFiberId", "maxFiberId", | ||
"height", "buffer")} | ||
scaling = GlobalDetectorModelScaling(**scalingKwargs) | ||
model = GlobalDetectorModel(bbox, detMap.order, detMap.fiberId, scaling, | ||
detMap.xCoeff, detMap.yCoeff, detMap.rightCcdCoeff, | ||
detMap.spatialOffsets, detMap.spectralOffsets) | ||
metadata = headerToMetadata(detMap.metadata) | ||
visitInfo = lsst.afw.image.VisitInfo(metadata) | ||
lsst.afw.image.stripVisitInfoKeywords(metadata) | ||
|
||
return cls(bbox, model, visitInfo, metadata) | ||
|
||
def toFits(self): | ||
"""Write DetectorMap to FITS | ||
def toDatamodel(self, identity=None): | ||
"""Convert to the pfs.datamodel representation | ||
Parameters | ||
---------- | ||
identity : `pfs.datamodel.CalibIdentity`, optional | ||
Identification of the calibration. Providing this is only necessary | ||
if you intend to write via the datamodel representation's ``write`` | ||
method; other means of writing that provide a filename directly do | ||
not require providing an ``identity`` here. | ||
Returns | ||
------- | ||
hdus : `astropy.io.fits.HDUList` | ||
FITS file. | ||
detMap : `pfs.datamodel.GlobalDetectorMap` | ||
datamodel representation of GlobalDetectorMap. | ||
""" | ||
bbox = self.getBBox() | ||
fiberId = self.getFiberId() | ||
model = self.getModel() | ||
scaling = model.getScaling() | ||
scalingKwargs = {name: getattr(scaling, name) for name in | ||
("fiberPitch", "dispersion", "wavelengthCenter", "minFiberId", "maxFiberId", | ||
"height", "buffer")} | ||
|
||
fits = astropy.io.fits.HDUList() | ||
header = astropy.io.fits.Header() | ||
metadata = self.metadata.deepCopy() | ||
if self.visitInfo is not None: | ||
lsst.afw.image.setVisitInfoMetadata(metadata, self.visitInfo) | ||
for key in metadata.names(): | ||
header[key if len(key) <= 8 else "HIERARCH " + key] = metadata.get(key) | ||
|
||
header["HIERARCH pfs_detectorMap_class"] = "GlobalDetectorMap" | ||
header["OBSTYPE"] = 'detectorMap' | ||
date = self.getVisitInfo().getDate() | ||
header["HIERARCH calibDate"] = date.toPython(date.UTC).strftime("%Y-%m-%d") | ||
|
||
phu = astropy.io.fits.PrimaryHDU(header=header) | ||
fits.append(phu) | ||
|
||
header = astropy.io.fits.Header() | ||
header["MINX"] = bbox.getMinX() | ||
header["MINY"] = bbox.getMinY() | ||
header["MAXX"] = bbox.getMaxX() | ||
header["MAXY"] = bbox.getMaxY() | ||
header["ORDER"] = self.getDistortionOrder() | ||
header["INHERIT"] = True | ||
|
||
model = self.getModel() | ||
scaling = model.getScaling() | ||
header["HIERARCH scaling.fiberPitch"] = scaling.fiberPitch | ||
header["HIERARCH scaling.dispersion"] = scaling.dispersion | ||
header["HIERARCH scaling.wavelengthCenter"] = scaling.wavelengthCenter | ||
header["HIERARCH scaling.minFiberId"] = scaling.minFiberId | ||
header["HIERARCH scaling.maxFiberId"] = scaling.maxFiberId | ||
header["HIERARCH scaling.height"] = scaling.height | ||
header["HIERARCH scaling.buffer"] = scaling.buffer | ||
|
||
table = astropy.io.fits.BinTableHDU.from_columns([ | ||
astropy.io.fits.Column(name="fiberId", format="J", array=fiberId), | ||
astropy.io.fits.Column(name="spatialOffsets", format="E", array=self.getSpatialOffsets()), | ||
astropy.io.fits.Column(name="spectralOffsets", format="E", array=self.getSpectralOffsets()), | ||
], header=header, name="FIBERS") | ||
fits.append(table) | ||
|
||
table = astropy.io.fits.BinTableHDU.from_columns([ | ||
astropy.io.fits.Column(name="x", format="E", array=model.getXCoefficients()), | ||
astropy.io.fits.Column(name="y", format="E", array=model.getYCoefficients()), | ||
], header=header, name="COEFFICIENTS") | ||
fits.append(table) | ||
|
||
table = astropy.io.fits.BinTableHDU.from_columns([ | ||
astropy.io.fits.Column(name="coeff", format="E", array=model.getRightCcdCoefficients()), | ||
], header=header, name="RIGHTCCD") | ||
fits.append(table) | ||
|
||
return fits | ||
return pfs.datamodel.GlobalDetectorMap( | ||
identity, pfs.datamodel.Box.fromLsst(self.bbox), self.getDistortionOrder(), self.fiberId, | ||
pfs.datamodel.GlobalDetectorMapScaling(**scalingKwargs), | ||
model.getXCoefficients(), model.getYCoefficients(), model.getRightCcdCoefficients(), | ||
self.getSpatialOffsets(), self.getSpectralOffsets(), metadata.toDict() | ||
) | ||
|
||
|
||
DetectorMap.register(GlobalDetectorMap) |
Oops, something went wrong.