Skip to content

Commit

Permalink
Add Spectrum and CountSpectrum objects
Browse files Browse the repository at this point in the history
  • Loading branch information
samaloney committed Apr 15, 2024
1 parent 0a47673 commit 87cf14f
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 14 deletions.
10 changes: 6 additions & 4 deletions docs/reference/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ Reference
Software and API.

.. automodapi:: sunkit_spex
.. automodapi:: sunkit_spex.thermal
.. automodapi:: sunkit_spex.integrate
.. automodapi:: sunkit_spex.io
.. automodapi:: sunkit_spex.emission
.. automodapi:: sunkit_spex.constants
.. automodapi:: sunkit_spex.emission
.. automodapi:: sunkit_spex.fitting_legacy
.. automodapi:: sunkit_spex.fitting_legacy.io
.. automodapi:: sunkit_spex.integrate
.. automodapi:: sunkit_spex.io
.. automodapi:: sunkit_spex.photon_power_law
.. automodapi:: sunkit_spex.spectrum
.. automodapi:: sunkit_spex.spectrum.spectrum
.. automodapi:: sunkit_spex.thermal
21 changes: 11 additions & 10 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,27 @@ packages = find:
python_requires = >=3.9
setup_requires = setuptools_scm
install_requires =
sunpy
parfive
scipy
xarray
quadpy
orthopy
ndim
matplotlib
emcee
corner
emcee
matplotlib
ndcube
ndim
nestle
numdifftools

orthopy
parfive
quadpy
scipy
sunpy
xarray

[options.extras_require]
test =
pytest
pytest-astropy
pytest-cov
pytest-xdist

docs =
sphinx
sphinx-automodapi
Expand Down
Empty file.
264 changes: 264 additions & 0 deletions sunkit_spex/spectrum/spectrum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
import numpy as np
from gwcs import WCS as GWCS
from gwcs import coordinate_frames as cf
from ndcube import NDCube

import astropy.units as u
from astropy.coordinates import SpectralCoord
from astropy.modeling.tabular import Tabular1D
from astropy.utils import lazyproperty

__all__ = ['gwcs_from_array', 'SpectralAxis', 'Spectrum', 'CountSpectrum']


def gwcs_from_array(array):
"""
Create a new WCS from provided tabular data. This defaults to being
a GWCS object.
"""
orig_array = u.Quantity(array)

coord_frame = cf.CoordinateFrame(naxes=1,
axes_type=('SPECTRAL',),
axes_order=(0,))
spec_frame = cf.SpectralFrame(unit=array.unit, axes_order=(0,))

# In order for the world_to_pixel transformation to automatically convert
# input units, the equivalencies in the lookup table have to be extended
# with spectral unit information.
SpectralTabular1D = type("SpectralTabular1D", (Tabular1D,),
{'input_units_equivalencies': {'x0': u.spectral()}})

forward_transform = SpectralTabular1D(np.arange(len(array)),
lookup_table=array)
# If our spectral axis is in descending order, we have to flip the lookup
# table to be ascending in order for world_to_pixel to work.
if len(array) == 0 or array[-1] > array[0]:
forward_transform.inverse = SpectralTabular1D(
array, lookup_table=np.arange(len(array)))
else:
forward_transform.inverse = SpectralTabular1D(
array[::-1], lookup_table=np.arange(len(array))[::-1])

class SpectralGWCS(GWCS):
def pixel_to_world(self, *args, **kwargs):
if orig_array.unit == '':
return u.Quantity(super().pixel_to_world_values(*args, **kwargs))
return super().pixel_to_world(*args, **kwargs).to(
orig_array.unit, equivalencies=u.spectral())

tabular_gwcs = SpectralGWCS(forward_transform=forward_transform,
input_frame=coord_frame,
output_frame=spec_frame)

# Store the intended unit from the origin input array
# tabular_gwcs._input_unit = orig_array.unit

return tabular_gwcs


class SpectralAxis(SpectralCoord):
"""
Coordinate object representing spectral values corresponding to a specific
spectrum. Overloads SpectralCoord with additional information (currently
only bin edges).
Parameters
----------
bin_specification: str, optional
Must be "edges" or "centers". Determines whether specified axis values
are interpreted as bin edges or bin centers. Defaults to "centers".
"""

_equivalent_unit = SpectralCoord._equivalent_unit + (u.pixel,)

def __new__(cls, value, *args, bin_specification="centers", **kwargs):

# Enforce pixel axes are ascending
if ((type(value) is u.quantity.Quantity) and
(value.size > 1) and
(value.unit is u.pix) and
(value[-1] <= value[0])):
raise ValueError("u.pix spectral axes should always be ascending")

# Convert to bin centers if bin edges were given, since SpectralCoord
# only accepts centers
if bin_specification == "edges":
bin_edges = value
value = SpectralAxis._centers_from_edges(value)

obj = super().__new__(cls, value, *args, **kwargs)

if bin_specification == "edges":
obj._bin_edges = bin_edges

return obj

@staticmethod
def _edges_from_centers(centers, unit):
"""
Calculates interior bin edges based on the average of each pair of
centers, with the two outer edges based on extrapolated centers added
to the beginning and end of the spectral axis.
"""
a = np.insert(centers, 0, 2*centers[0] - centers[1])
b = np.append(centers, 2*centers[-1] - centers[-2])
edges = (a + b) / 2
return edges*unit

@staticmethod
def _centers_from_edges(edges):
"""
Calculates the bin centers as the average of each pair of edges
"""
return (edges[1:] + edges[:-1]) / 2

@lazyproperty
def bin_edges(self):
"""
Calculates bin edges if the spectral axis was created with centers
specified.
"""
if hasattr(self, '_bin_edges'):
return self._bin_edges
else:
return self._edges_from_centers(self.value, self.unit)


class Spectrum(NDCube):
r"""
Spectrum container for data with one spectral axis.
Note that "1D" in this case refers to the fact that there is only one
spectral axis. `Spectrum` can contain "vector 1D spectra" by having the
``flux`` have a shape with dimension greater than 1.
Notes
-----
A stripped down version of `Spectrum1D` from `specutils`.
Parameters
----------
data : `~astropy.units.Quantity`
The data for this spectrum. This can be a simple `~astropy.units.Quantity`,
or an existing `~Spectrum1D` or `~ndcube.NDCube` object.
uncertainty : `~astropy.nddata.NDUncertainty`
Contains uncertainty information along with propagation rules for
spectrum arithmetic. Can take a unit, but if none is given, will use
the unit defined in the flux.
spectral_axis : `~astropy.units.Quantity` or `~specutils.SpectralAxis`
Dispersion information with the same shape as the dimension specified by spectral_dimension
of shape plus one if specifying bin edges.
spectral_dimension : `int` default 0
The dimension of the data which represents the spectral information default to first dimension index 0.
mask : `~numpy.ndarray`-like
Array where values in the flux to be masked are those that
``astype(bool)`` converts to True. (For example, integer arrays are not
masked where they are 0, and masked for any other value.)
meta : dict
Arbitrary container for any user-specific information to be carried
around with the spectrum container object.
"""

def __init__(self, data, *, uncertainty=None, spectral_axis=None,
spectral_dimension=0, mask=None, meta=None, **kwargs):

# If the flux (data) argument is already a Spectrum (as it would
# be for internal arithmetic operations), avoid setup entirely.
if isinstance(data, Spectrum):
super().__init__(data)
return

# Ensure that the flux argument is an astropy quantity
if data is not None:
if not isinstance(data, u.Quantity):
raise ValueError("Flux must be a `Quantity` object.")
elif data.isscalar:
data = u.Quantity([data])

# Ensure that the unit information codified in the quantity object is
# the One True Unit.
kwargs.setdefault('unit', data.unit if isinstance(data, u.Quantity)
else kwargs.get('unit'))

# If flux and spectral axis are both specified, check that their lengths
# match or are off by one (implying the spectral axis stores bin edges)
if data is not None and spectral_axis is not None:
if spectral_axis.shape[0] == data.shape[spectral_dimension]:
bin_specification = "centers"
elif spectral_axis.shape[0] == data.shape[spectral_dimension]+1:
bin_specification = "edges"
else:
raise ValueError(
"Spectral axis length ({}) must be the same size or one "
"greater (if specifying bin edges) than that of the spextral"
"axis ({})".format(spectral_axis.shape[0],
data.shape[spectral_dimension]))

# Attempt to parse the spectral axis. If none is given, try instead to
# parse a given wcs. This is put into a GWCS object to
# then be used behind-the-scenes for all operations.
if spectral_axis is not None:
# Ensure that the spectral axis is an astropy Quantity
if not isinstance(spectral_axis, u.Quantity):
raise ValueError("Spectral axis must be a `Quantity` or "
"`SpectralAxis` object.")

# If a spectral axis is provided as an astropy Quantity, convert it
# to a SpectralAxis object.
if not isinstance(spectral_axis, SpectralAxis):
if spectral_axis.shape[0] == data.shape[spectral_dimension] + 1:
bin_specification = "edges"
else:
bin_specification = "centers"
self._spectral_axis = SpectralAxis(
spectral_axis,
bin_specification=bin_specification)

wcs = gwcs_from_array(self._spectral_axis)

super().__init__(
data=data.value if isinstance(data, u.Quantity) else data,
wcs=wcs, mask=mask, meta=meta, uncertainty=uncertainty, **kwargs)


class CountSpectrum(Spectrum):
r"""
Spectrum container for count data with one spectral axis.
Specifically, the data must be supplied as counts or convertable to counts by multiplying by the provided
normalisation.
Parameters
----------
data : `~astropy.units.Quantity`
The data for this spectrum. This can be a simple `~astropy.units.Quantity`,
or an existing `~Spectrum1D` or `~ndcube.NDCube` object.
norm : `~astropy.units.Quantity`
The normalisation if the data unit is not counts then the product the unit of data and norm must be counts.
uncertainty : `~astropy.nddata.NDUncertainty`
Contains uncertainty information along with propagation rules for spectrum arithmetic. Can take a unit, but if
none is given, will use the unit defined in the flux.
spectral_axis : `~astropy.units.Quantity` or `~specutils.SpectralAxis`
Dispersion information with the same shape as the dimension specified by spectral_dimension
of shape plus one if specifying bin edges.
spectral_dimension : `int` default 0
The dimension of the data which represents the spectral information default to first dimension index 0.
mask : `~numpy.ndarray`-like
Array where values in the flux to be masked are those that
``astype(bool)`` converts to True. (For example, integer arrays are not
masked where they are 0, and masked for any other value.)
meta : dict
Arbitrary container for any user-specific information to be carried
around with the spectrum container object.
"""

def __init__(self, data, norm=None, **kwargs):
if data.unit != u.ct:
data_norm_unit = (data.unit * norm.unit).decompose().bases[0]
if data_norm_unit != u.ct:
raise ValueError('Data must be supplied in counts or the product of the norm and data has units of '
'counts')
data = data * norm
self.norm = norm
super().__init__(data, **kwargs)
28 changes: 28 additions & 0 deletions sunkit_spex/spectrum/tests/test_spectrum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import numpy as np
from numpy.testing import assert_array_equal

import astropy.units as u

from sunkit_spex.spectrum.spectrum import CountSpectrum, Spectrum


def test_spectrum_bin_edges():
spec = Spectrum(np.arange(1, 11)*u.watt, spectral_axis=np.arange(1, 12)*u.keV)
assert_array_equal(spec._spectral_axis, [1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5, 10.5] * u.keV)


def test_spectrum_bin_centers():
spec = Spectrum(np.arange(1, 11)*u.watt, spectral_axis=(np.arange(1, 11) - 0.5) * u.keV)
assert_array_equal(spec._spectral_axis, [0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5] * u.keV)


def test_countspectrum():
count_spec = CountSpectrum(np.arange(1, 11)*u.ct, spectral_axis=np.arange(1, 12)*u.keV)
assert isinstance(count_spec, CountSpectrum)
assert count_spec.norm is None


def test_countspectrum_with_normalization():
count_spec = CountSpectrum(np.arange(1, 11)*u.ct*u.s, spectral_axis=np.arange(1, 12)*u.keV, norm=np.ones(10)/u.s)
assert isinstance(count_spec, CountSpectrum)
assert_array_equal(count_spec.norm, np.ones(10)/u.s)

0 comments on commit 87cf14f

Please sign in to comment.