Skip to content
This repository has been archived by the owner on Jan 14, 2025. It is now read-only.

Commit

Permalink
Merge pull request #190 from lincc-frameworks/flux_to_mag
Browse files Browse the repository at this point in the history
Flux to Mag Converter Function
  • Loading branch information
dougbrn authored Aug 9, 2023
2 parents d166744 + 7ded283 commit 0e8f0d2
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 3 deletions.
62 changes: 59 additions & 3 deletions src/tape/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ def _load_column_mapper(self, column_mapper, **kwargs):
if column_mapper.map["provenance_col"] is not None:
self._provenance_col = column_mapper.map["provenance_col"]
if column_mapper.map["nobs_total_col"] is not None:
self._nobs_total_col = column_mapper.map["nobs_total_col"]
self._nobs_tot_col = column_mapper.map["nobs_total_col"]
if column_mapper.map["nobs_band_cols"] is not None:
self._nobs_band_cols = column_mapper.map["nobs_band_cols"]

Expand Down Expand Up @@ -955,8 +955,8 @@ def from_parquet(
columns = [self._time_col, self._flux_col, self._err_col, self._band_col]
if self._provenance_col is not None:
columns.append(self._provenance_col)
if self._nobs_total_col is not None:
columns.append(self._nobs_total_col)
if self._nobs_tot_col is not None:
columns.append(self._nobs_tot_col)
if self._nobs_band_cols is not None:
for col in self._nobs_band_cols:
columns.append(col)
Expand Down Expand Up @@ -1094,6 +1094,62 @@ def from_source_dict(self, source_dict, column_mapper=None, npartitions=1, **kwa
self._object_dirty = False
return self

def convert_flux_to_mag(self, flux_col, zero_point, err_col=None, zp_form="mag", out_col_name=None):
"""Converts a flux column into a magnitude column.
Parameters
----------
flux_col: 'str'
The name of the ensemble flux column to convert into magnitudes.
zero_point: 'str'
The name of the ensemble column containing the zero point
information for column transformation.
err_col: 'str', optional
The name of the ensemble column containing the errors to propagate.
Errors are propagated using the following approximation:
Err= (2.5/log(10))*(flux_error/flux), which holds mainly when the
error in flux is much smaller than the flux.
zp_form: `str`, optional
The form of the zero point column, either "flux" or
"magnitude"/"mag". Determines how the zero point (zp) is applied in
the conversion. If "flux", then the function is applied as
mag=-2.5*log10(flux/zp), or if "magnitude", then
mag=-2.5*log10(flux)+zp.
out_col_name: 'str', optional
The name of the output magnitude column, if None then the output
is just the flux column name + "_mag". The error column is also
generated as the out_col_name + "_err".
Returns
----------
ensemble: `tape.ensemble.Ensemble`
The ensemble object with a new magnitude (and error) column.
"""
if out_col_name is None:
out_col_name = flux_col + "_mag"

if zp_form == "flux": # mag = -2.5*np.log10(flux/zp)
self._source = self._source.assign(
**{out_col_name: lambda x: -2.5 * np.log10(x[flux_col] / x[zero_point])}
)

elif zp_form == "magnitude" or zp_form == "mag": # mag = -2.5*np.log10(flux) + zp
self._source = self._source.assign(
**{out_col_name: lambda x: -2.5 * np.log10(x[flux_col]) + x[zero_point]}
)

else:
raise ValueError(f"{zp_form} is not a valid zero_point format.")

# Calculate Errors
if err_col is not None:
self._source = self._source.assign(
**{out_col_name + "_err": lambda x: (2.5 / np.log(10)) * (x[err_col] / x[flux_col])}
)

return self

def _generate_object_table(self):
"""Generate the object table from the source table."""
counts = self._source.groupby([self._id_col, self._band_col])[self._time_col].aggregate("count")
Expand Down
54 changes: 54 additions & 0 deletions tests/tape_tests/test_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,60 @@ def test_coalesce(dask_client, drop_inputs):
assert col in ens._source.columns


@pytest.mark.parametrize("zp_form", ["flux", "mag", "magnitude", "lincc"])
@pytest.mark.parametrize("err_col", [None, "error"])
@pytest.mark.parametrize("out_col_name", [None, "mag"])
def test_convert_flux_to_mag(dask_client, zp_form, err_col, out_col_name):
ens = Ensemble(client=dask_client)

source_dict = {
"id": [0, 0, 0, 0, 0],
"time": [1, 2, 3, 4, 5],
"flux": [30.5, 70, 80.6, 30.2, 60.3],
"zp_mag": [25.0, 25.0, 25.0, 25.0, 25.0],
"zp_flux": [10**10, 10**10, 10**10, 10**10, 10**10],
"error": [10, 10, 10, 10, 10],
"band": ["g", "g", "g", "g", "g"],
}

if out_col_name is None:
output_column = "flux_mag"
else:
output_column = out_col_name

# map flux_col to one of the flux columns at the start
col_map = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="error", band_col="band")
ens.from_source_dict(source_dict, column_mapper=col_map)

if zp_form == "flux":
ens.convert_flux_to_mag("flux", "zp_flux", err_col, zp_form, out_col_name)

res_mag = ens._source.compute()[output_column].to_list()[0]
assert pytest.approx(res_mag, 0.001) == 21.28925

if err_col is not None:
res_err = ens._source.compute()[output_column + "_err"].to_list()[0]
assert pytest.approx(res_err, 0.001) == 0.355979
else:
assert output_column + "_err" not in ens._source.columns

elif zp_form == "mag" or zp_form == "magnitude":
ens.convert_flux_to_mag("flux", "zp_mag", err_col, zp_form, out_col_name)

res_mag = ens._source.compute()[output_column].to_list()[0]
assert pytest.approx(res_mag, 0.001) == 21.28925

if err_col is not None:
res_err = ens._source.compute()[output_column + "_err"].to_list()[0]
assert pytest.approx(res_err, 0.001) == 0.355979
else:
assert output_column + "_err" not in ens._source.columns

else:
with pytest.raises(ValueError):
ens.convert_flux_to_mag("flux", "zp_mag", err_col, zp_form, "mag")


def test_find_day_gap_offset(dask_client):
ens = Ensemble(client=dask_client)

Expand Down

0 comments on commit 0e8f0d2

Please sign in to comment.