Source code for sdo.aia._filtergrams

from typing import Self, Literal
import os
import pathlib
import dataclasses
import numpy as np
import astropy.units as u
import astropy.time
import astropy.wcs
import astropy.io.fits
import sunpy.net.attrs
import aiapy.calibrate.utils
import named_arrays as na
import sdo

__all__ = [
    "Filtergram",
]


[docs] @dataclasses.dataclass(eq=False, repr=False) class Filtergram( na.FunctionArray[ na.ExplicitTemporalSpectralWcsPositionalVectorArray, na.ScalarArray, ], ): """ A representation of an AIA image sequence using any number of filters. """ axis_time: str = "time" """The logical axis corresponding to changes in time.""" axis_wavelength: str = "wavelength" """The logical axis corresponding to changes in wavelength.""" axis_detector_x: str = "detector_x" """The logical axis corresponding to changes in detector :math:`x`-coordinate.""" axis_detector_y: str = "detector_y" """The logical axis corresponding to changes in detector :math:`y`-coordinate."""
[docs] @classmethod def from_time_range( cls, time_start: str | astropy.time.Time, time_stop: str | astropy.time.Time, wavelength: u.Quantity | na.ScalarArray, user_email: None | str = None, series: Literal["aia.lev1_euv_12s", "aia.lev1_uv_24s"] = "aia.lev1_euv_12s", axis_time: str = "time", axis_detector_x: str = "detector_x", axis_detector_y: str = "detector_y", limit: None | int = None, ): """ Given a time range and a wavelength, download the corresponding AIA filtergram. .. important:: Your email must be `registered with JSOC <http://jsoc.stanford.edu/ajax/register_email.html>`_ to use this method. Parameters ---------- time_start The start time of the search period time_stop The end time of the search period. wavelength The wavelengths to download. Must be a valid AIA wavelength. user_email An email address used to notify the user that their JSOC request is complete. This email must be registered with JSOC before using this function. If :obj:`None`, the value is taken from the ``JSOC_EMAIL`` environment variable. series The data series to download. See the `sunpy documentation <https://docs.sunpy.org/en/stable/tutorial/acquiring_data/jsoc.html#querying-the-jsoc>`_ for more information. axis_time The logical axis corresponding to changes in time. axis_detector_x The logical axis corresponding to changes in detector :math:`x`-coordinate. axis_detector_y The logical axis corresponding to changes in detector :math:`y`-coordinate. limit The maximum number of files to download for each wavelength. """ time_start = astropy.time.Time(time_start) time_stop = astropy.time.Time(time_stop) wavelength = na.as_named_array(wavelength) if wavelength.ndim == 0: axis_wavelength = "wavelength" wavelength = wavelength.add_axes(axis_wavelength) elif wavelength.ndim == 1: axis_wavelength = wavelength.axes[0] else: # pragma: nocover raise ValueError(f"`wavelength` must be 0D or 1D, got {wavelength.shape=}") directory = sdo.directory_default directory.mkdir(parents=True, exist_ok=True) directory_level_1 = directory / "level_1" directory_level_15 = directory / "level_15" directory_level_15.mkdir(parents=True, exist_ok=True) pointing_table = aiapy.calibrate.utils.get_pointing_table( source="JSOC", time_range=( time_start - 12 * u.h, time_stop + 12 * u.h, ), ) if user_email is None: user_email = os.environ["JSOC_EMAIL"] attrs = ( sunpy.net.attrs.jsoc.Notify(user_email), sunpy.net.attrs.jsoc.Segment("image"), sunpy.net.attrs.jsoc.Series(series), ) if limit is not None: timedelta = (time_stop - time_start).to(u.s) time_start = time_start + timedelta / 2 period = timedelta / limit attrs = attrs + (sunpy.net.attrs.Sample(period),) attrs = attrs + (sunpy.net.attrs.Time(time_start, time_stop),) files = [] for w in wavelength.ndindex(): channel = wavelength[w].ndarray attrs_w = attrs + (sunpy.net.attrs.jsoc.Wavelength(channel),) search = sunpy.net.Fido.search(*attrs_w) files_wavelength = sunpy.net.Fido.fetch( search, path=directory_level_1, progress=False, ) files_wavelength = sorted(files_wavelength) files_15 = [] for file in files_wavelength: file_15 = directory_level_15 / pathlib.Path(file).name files_15.append(file_15) if not file_15.is_file(): aia_map = sunpy.map.Map(file) aia_map = aiapy.calibrate.update_pointing( smap=aia_map, pointing_table=pointing_table, ) # aia_map = aiapy.calibrate.register(aia_map) aia_map.save(file_15) files_wavelength = files_15 files_wavelength = np.array(files_wavelength) files_wavelength = na.ScalarArray(files_wavelength, axes=axis_time) files.append(files_wavelength) files = na.stack(files, axis=axis_wavelength) files = files.transpose((axis_time, axis_wavelength)) return cls.from_fits( path=files, wavelength=wavelength, axis_time=axis_time, axis_wavelength=axis_wavelength, axis_detector_x=axis_detector_x, axis_detector_y=axis_detector_y, )
[docs] @classmethod def from_fits( cls, path: pathlib.Path | na.ScalarArray[pathlib.Path], wavelength: u.Quantity | na.ScalarArray, axis_time: str = "time", axis_wavelength: str = "wavelength", axis_detector_x: str = "detector_x", axis_detector_y: str = "detector_y", ) -> Self: """ Given a single FITS file or an array of FITS files with the same OBSID, construct a SpectrographObservation object. Parameters ---------- path A single FITS file or an array of FITS files to load. window The spectral window to load. axis_time The logical axis corresponding to changes in time. axis_wavelength The logical axis corresponding to changes in wavelength. axis_detector_x The logical axis corresponding to changes in detector :math:`x`-coordinate. axis_detector_y The logical axis corresponding to changes in detector :math:`y`-coordinate. """ path = na.asarray(path) shape_base = path.shape hdul_prototype = astropy.io.fits.open(path.ndarray.item(0)) index_window = 0 hdu_prototype = hdul_prototype[index_window] wcs_prototype = astropy.wcs.WCS(hdu_prototype) axes_wcs = list(reversed(wcs_prototype.axis_type_names)) ix = axes_wcs.index("HPLN") iy = axes_wcs.index("HPLT") axes_wcs[ix] = axis_detector_x axes_wcs[iy] = axis_detector_y shape_wcs = wcs_prototype.array_shape shape_wcs = {ax: sz for ax, sz in zip(axes_wcs, shape_wcs)} index_max = { axis_detector_x: slice(None, shape_wcs[axis_detector_x]), axis_detector_y: slice(None, shape_wcs[axis_detector_y]), } self = cls.empty( shape_base=shape_base, shape_wcs=shape_wcs, axis_time=axis_time, axis_wavelength=axis_wavelength, axis_detector_x=axis_detector_x, axis_detector_y=axis_detector_y, ) for index in path.ndindex(): file = path[index].ndarray hdul = astropy.io.fits.open( name=file, output_verify="silentfix", ) hdu = hdul[index_window] self.outputs[index] = na.ScalarArray( ndarray=hdu.data << u.DN, axes=tuple(shape_wcs), )[index_max] time = astropy.time.Time(hdu.header["DATE-OBS"]).jd self.inputs.time[index] = time wcs = astropy.wcs.WCS(hdu).wcs crval = self.inputs.crval crval.position.x[index] = wcs.crval[~ix] << u.deg crval.position.y[index] = wcs.crval[~iy] << u.deg crpix = self.inputs.crpix crpix.components[axis_detector_x][index] = wcs.crpix[~ix] crpix.components[axis_detector_y][index] = wcs.crpix[~iy] cdelt = self.inputs.cdelt cdelt.position.x[index] = wcs.cdelt[~ix] << u.deg cdelt.position.y[index] = wcs.cdelt[~iy] << u.deg pc_wcs = wcs.get_pc() pc = self.inputs.pc pc.position.x.components[axis_detector_x][index] = pc_wcs[~ix, ~ix] pc.position.x.components[axis_detector_y][index] = pc_wcs[~ix, ~iy] pc.position.y.components[axis_detector_x][index] = pc_wcs[~iy, ~ix] pc.position.y.components[axis_detector_y][index] = pc_wcs[~iy, ~iy] t = astropy.time.Time( val=self.inputs.time.ndarray, format="jd", ) t.format = "isot" self.inputs.time.ndarray = t self.inputs.wavelength = wavelength return self
[docs] @classmethod def empty( cls, shape_base: dict[str, int], shape_wcs: dict[str, int], axis_time: str = "time", axis_wavelength: str = "wavelength", axis_detector_x: str = "detector_x", axis_detector_y: str = "detector_y", ) -> Self: """ Create an empty :class:`Filtergrams` object. Parameters ---------- shape_base The shape of the result excluding the axes handled by WCS. shape_wcs The shape of the axes handled by WCS. axis_time The logical axis corresponding to changes in time. axis_wavelength The logical axis corresponding to changes in wavelength. axis_detector_x The logical axis corresponding to changes in detector :math:`x`-coordinate. axis_detector_y The logical axis corresponding to changes in detector :math:`y`-coordinate. """ inputs = na.ExplicitTemporalSpectralWcsPositionalVectorArray( time=na.ScalarArray.zeros(shape_base), wavelength=na.ScalarArray.zeros(shape_base) << u.AA, crval=na.PositionalVectorArray( position=na.Cartesian2dVectorArray( x=na.ScalarArray.empty(shape_base) << u.arcsec, y=na.ScalarArray.empty(shape_base) << u.arcsec, ), ), crpix=na.CartesianNdVectorArray( components=dict( detector_x=na.ScalarArray.empty(shape_base), detector_y=na.ScalarArray.empty(shape_base), ) ), cdelt=na.PositionalVectorArray( position=na.Cartesian2dVectorArray( x=na.ScalarArray.empty(shape_base) << u.arcsec, y=na.ScalarArray.empty(shape_base) << u.arcsec, ), ), pc=na.PositionalMatrixArray( position=na.Cartesian2dMatrixArray( x=na.CartesianNdVectorArray( components=dict( detector_x=na.ScalarArray.empty(shape_base), detector_y=na.ScalarArray.empty(shape_base), ), ), y=na.CartesianNdVectorArray( components=dict( detector_x=na.ScalarArray.empty(shape_base), detector_y=na.ScalarArray.empty(shape_base), ), ), ), ), shape_wcs={a: shape_wcs[a] + 1 for a in shape_wcs}, ) shape = na.broadcast_shapes(shape_base, shape_wcs) outputs = na.ScalarArray.empty(shape) << u.DN return cls( inputs=inputs, outputs=outputs, # timedelta=timedelta, axis_time=axis_time, axis_wavelength=axis_wavelength, axis_detector_x=axis_detector_x, axis_detector_y=axis_detector_y, )