Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added the IO functions removed from PR #141 #150

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
229 changes: 228 additions & 1 deletion pyirf/io/gadf.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from astropy.table import QTable
import astropy.units as u
from astropy.io import fits
from astropy.io.fits import Header, BinTableHDU
import numpy as np
from astropy.time import Time
import pyirf.binning as binning
from pyirf.cuts import compare_irf_cuts

from ..version import __version__

Expand All @@ -13,6 +15,11 @@
"create_energy_dispersion_hdu",
"create_psf_table_hdu",
"create_rad_max_hdu",
"compare_irf_cuts_in_files",
"read_fits_bins_lo_hi",
"read_irf_grid",
"read_aeff2d_hdu",
"read_energy_dispersion_hdu",
]


Expand Down Expand Up @@ -183,7 +190,6 @@ def create_energy_dispersion_hdu(
edisp["THETA_LO"], edisp["THETA_HI"] = binning.split_bin_lo_hi(fov_offset_bins[np.newaxis, :].to(u.deg))
# transpose as FITS uses opposite dimension order
edisp["MATRIX"] = u.Quantity(energy_dispersion.T[np.newaxis, ...]).to(u.one)

# required header keywords
header = DEFAULT_HEADER.copy()
header["HDUCLAS1"] = "RESPONSE"
Expand Down Expand Up @@ -303,3 +309,224 @@ def create_rad_max_hdu(
_add_header_cards(header, **header_cards)

return BinTableHDU(rad_max_table, header=header, name=extname)


def compare_irf_cuts_in_files(files, extname='THETA_CUTS'):
"""
Reads in a list of IRF files and checks if the same cuts have been applied in all of them

Parameters
----------
files: list of strings
files to be read
extname: string
name of the extension with cut values to read the data from in fits file
Returns
-------
match: Boolean
if the cuts are the same in all the files
"""
cuts = read_irf_cuts(files, extname=extname)
return compare_irf_cuts(cuts)


def read_irf_cuts(files, extname='THETA_CUTS'):
"""
Reads in a list of IRF files, extracts cuts from them and returns them as a list

Parameters
----------
files: list of strings
files to be read
extname: string
name of the extension with cut values to read the data from in fits file
Returns
-------
cuts: list
list of cuts
"""

single_file = False
if isinstance(files, str):
files = [files]
single_file = True

cuts = list()
for file_name in files:
table = QTable.read(file_name, hdu=extname)
cuts.append(table)

# if the function is run on single file do not need the first axis dimension
if single_file:
cuts = cuts[0]

return cuts


def read_fits_bins_lo_hi(file_name, extname, tags):
"""
Reads from a fits file two arrays of tag_LO and tag_HI.
If more then one file is given it checks that all of them are consistent before returning the value

Parameters
----------
file_name: string or list of string
file to be read, if a list of
extname: string
name of the extension to read the data from in fits file
tags: list of string
names of the field in the extension to extract, _LO and _HI will be added

Returns
-------
bins: list of astropy.units.Quantity
list of ntuples (LO, HI) of bins (with size of extnames)
"""

# to allow the function work on either single file or a list of files convert a single string into a list
if isinstance(file_name, str):
file_name = [file_name]

# same to allow to run over single tag
if isinstance(tags, str):
tags = [tags]

tags_lo_hi = [(tag + '_LO', tag + '_HI') for tag in tags]

old_table = None
bins = list()
for this_file in file_name:
table = QTable.read(this_file, hdu=extname)
for tag_lo, tag_hi in tags_lo_hi:
if old_table is not None:
if not old_table[tag_lo][0].shape == table[tag_lo][0].shape:
raise ValueError('Non matching bins in ' + extname)
if not ((old_table[tag_lo][0] == table[tag_lo][0]).all() and (old_table[tag_hi][0] == table[tag_hi][0]).all()):
raise ValueError('Non matching bins in ' + extname)
else:
bins.append([table[tag_lo][0], table[tag_hi][0]])
old_table = table

return bins


def read_irf_grid(files, extname, field_name, hduclas):
"""
Reads in a grid of IRFs for a bunch of different parameters and stores them in lists

Parameters
----------
files: string or list of strings
files to be read
extname: string
name of the extension to read the data from in fits file
field_name: string
name of the field in the extension to extract
hduclas: list of strings
list with expected HDUCLAS1, HDUCLAS2, ... headers.

Returns
-------
irfs_all: np.array
array of IRFs, first axis specify the file number(if more then one is given),
second axis is the offset angle, rest of the axes are IRF-specific
theta_bins: astropy.units.Quantity[angle]
theta bins
"""

expected_headers = [[hduclasx] for hduclasx in hduclas]
if expected_headers[0] == [None]: expected_headers[0] = ["GADF"]
if expected_headers[1] == [None]: expected_headers[1] = ["RESPONSE"]
if expected_headers[3] == [None]: expected_headers[3] = ["POINT-LIKE", "FULL-ENCLOSURE"]

# to allow the function work on either single file or a list of files convert a single string into a list
single_file = False
if isinstance(files, str):
files = [files]
single_file = True

irfs_all = list()

for this_file in files:
hdul = fits.open(this_file)
header = hdul[extname].header
for i, expected in enumerate(expected_headers):
header_i = ('HDUCLAS' + str(i)).replace('0', 'S')
if header[header_i] not in expected:
raise ValueError('Expected ' + header_i + ' in ' + str(expected) + ' and got ' + header[header_i] + " in " + this_file)
# [0] because there the IRFs are written as a single row of the table
# we transpose because of a different axis sequence in fits file and in pyirf
irfs_all.append(QTable.read(this_file, hdu=extname)[field_name][0].T)

# convert the list to a multidimentional table
irfs_all = np.stack(irfs_all)

# if the function is run on single file do not need the first axis dimension
if single_file:
irfs_all = irfs_all[0]

return irfs_all


def read_aeff2d_hdu(file_name, extname="EFFECTIVE AREA"):
"""
Reads effective area from FITS file

Parameters
----------
file_name: string or list of strings
file(s) to be read
extname:
Name for BinTableHDU

Returns
-------
effective_area: astropy.units.Quantity[area]
Effective area array of shape (n_energy_bins, n_fov_offset_bins) or (n_files, n_energy_bins, n_fov_offset_bins)
true_energy_bins: astropy.units.Quantity[energy]
Bin edges in true energy
fov_offset_bins: astropy.units.Quantity[angle]
Bin edges in the field of view offset.
"""

field_name = "EFFAREA"
effective_area = read_irf_grid(file_name, extname, field_name, [None, None, "EFF_AREA", None, "AEFF_2D"])
true_energy_bins, fov_offset_bins = read_fits_bins_lo_hi(file_name, extname, ["ENERG", "THETA"])
true_energy_bins = binning.join_bin_lo_hi(*true_energy_bins)
fov_offset_bins = binning.join_bin_lo_hi(*fov_offset_bins)
return effective_area, true_energy_bins, fov_offset_bins


def read_energy_dispersion_hdu(file_name, extname="EDISP"):
"""
Reads energy dispersion table from a FITS file

Parameters
----------
file_name: string or list of strings
file(s) to be read
extname:
Name for BinTableHDU

Returns
-------
energy_dispersion: numpy.ndarray
Energy dispersion array, with shape
(n_energy_bins, n_migra_bins, n_source_offset_bins)
or (n_files, n_energy_bins, n_migra_bins, n_source_offset_bins)
true_energy_bins: astropy.units.Quantity[energy]
Bin edges in true energy
migration_bins: numpy.ndarray
Bin edges for the relative energy migration (``reco_energy / true_energy``)
fov_offset_bins: astropy.units.Quantity[angle]
Bin edges in the field of view offset.
"""

field_name = "MATRIX"
energy_dispersion = read_irf_grid(file_name, extname, field_name, [None, None, "EDISP", None, "EDISP_2D"])
true_energy_bins, migration_bins, fov_offset_bins = read_fits_bins_lo_hi(file_name, extname, ["ENERG", "MIGRA", "THETA"])
true_energy_bins = binning.join_bin_lo_hi(*true_energy_bins)
migration_bins = binning.join_bin_lo_hi(*migration_bins)
fov_offset_bins = binning.join_bin_lo_hi(*fov_offset_bins)

return energy_dispersion, true_energy_bins, migration_bins, fov_offset_bins
93 changes: 92 additions & 1 deletion pyirf/io/tests/test_gadf.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
'''
Test export to GADF format
Test export to / import from GADF format
'''
import astropy.units as u
import numpy as np
from astropy.io import fits
from astropy.table import QTable
from pyirf.io import gadf
import pytest
import tempfile

Expand Down Expand Up @@ -181,3 +183,92 @@ def test_rad_max_schema(rad_max_hdu):

_, hdu = rad_max_hdu
RAD_MAX.validate_hdu(hdu)


def test_read_cuts():
"""Test of reading cuts."""
from pyirf.io.gadf import read_irf_cuts

enbins = np.logspace(-2, 3) * u.TeV
thcuts = np.linspace(0.5, 0.1) * u.deg
names = ("Energy", "Theta2")
t1 = QTable([enbins, thcuts], names=names)
hdu = fits.BinTableHDU(t1, header=fits.Header(), name='THETA CUTS')
with tempfile.NamedTemporaryFile(suffix='.fits') as f:
fits.HDUList([fits.PrimaryHDU(), hdu]).writeto(f.name)
cuts = read_irf_cuts(f.name)
assert u.allclose(cuts["Energy"], t1["Energy"], rtol=1.e-15)

# now check on two files
cuts = read_irf_cuts([f.name, f.name])[0]
assert u.allclose(cuts["Theta2"], t1["Theta2"], rtol=1.e-15)


def test_compare_irf_cuts_files():
"""Test of comparing cuts."""
from pyirf.io.gadf import compare_irf_cuts_in_files

enbins = np.logspace(-2, 3) * u.TeV
thcuts1 = np.linspace(0.5, 0.1) * u.deg
thcuts2 = np.linspace(0.6, 0.2) * u.deg
names = ("Energy", "Theta2")
t1 = QTable([enbins, thcuts1], names=names)
hdu1 = fits.BinTableHDU(t1, header=fits.Header(), name='THETA CUTS')
t2 = QTable([enbins, thcuts2], names=names)
hdu2 = fits.BinTableHDU(t2, header=fits.Header(), name='THETA CUTS')
with tempfile.NamedTemporaryFile(suffix='.fits') as f1:
fits.HDUList([fits.PrimaryHDU(), hdu1]).writeto(f1.name)
with tempfile.NamedTemporaryFile(suffix='.fits') as f2:
fits.HDUList([fits.PrimaryHDU(), hdu2]).writeto(f2.name)
assert compare_irf_cuts_in_files([f1.name, f1.name])
assert compare_irf_cuts_in_files([f1.name, f2.name]) is False


def test_read_write_energy_dispersion(edisp_hdus):
"""Test consistency of reading and writing for migration matrix."""

edisp, hdus = edisp_hdus

for hdu in hdus:
with tempfile.NamedTemporaryFile(suffix='.fits') as f:
fits.HDUList([fits.PrimaryHDU(), hdu]).writeto(f.name)
edisp2d, e_bins, mig_bins, th_bins = gadf.read_energy_dispersion_hdu(f.name, extname="EDISP")
# check if the values of migration matrix are the same
assert u.allclose(edisp, edisp2d, atol=1e-16)
# check if the sequence of variables is fine
bins_shape = (e_bins.shape[0] - 1, mig_bins.shape[0] - 1, th_bins.shape[0] - 1)
assert bins_shape == edisp2d.shape

# now check with reading two files
edisp2d, e_bins, mig_bins, th_bins = gadf.read_energy_dispersion_hdu([f.name, f.name], extname="EDISP")
assert u.allclose(edisp, edisp2d[0], atol=1e-16)
bins_shape = (2, e_bins.shape[0] - 1, mig_bins.shape[0] - 1, th_bins.shape[0] - 1)
assert bins_shape == edisp2d.shape

# now try to read it as a wrong IRF type
with pytest.raises(ValueError):
gadf.read_aeff2d_hdu(f.name, extname="EDISP")


def test_read_write_effective_area2d(aeff2d_hdus):
"""Test consistency of reading and writing for effective area."""

area, hdus = aeff2d_hdus

for hdu in hdus:
with tempfile.NamedTemporaryFile(suffix='.fits') as f:
fits.HDUList([fits.PrimaryHDU(), hdu]).writeto(f.name)
aeff2d, e_bins, th_bins = gadf.read_aeff2d_hdu(f.name, extname="EFFECTIVE AREA")
assert u.allclose(area, aeff2d, atol=-1e-16 * u.m**2)
bins_shape = (e_bins.shape[0] - 1, th_bins.shape[0] - 1)
assert bins_shape == aeff2d.shape

# now check with reading two files
aeff2d, e_bins, th_bins = gadf.read_aeff2d_hdu([f.name, f.name], extname="EFFECTIVE AREA")
assert u.allclose(area, aeff2d[0], atol=-1e-16 * u.m**2)
bins_shape = (2, e_bins.shape[0] - 1, th_bins.shape[0] - 1)
assert bins_shape == aeff2d.shape

# now try to read it as a wrong IRF type
with pytest.raises(ValueError):
gadf.read_energy_dispersion_hdu(f.name, extname="EFFECTIVE AREA")