Skip to content
221 changes: 221 additions & 0 deletions chandra_aca/manvr_mon_images.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
"""Read ACA maneuver monitor window image data"""

# Standard library imports
import enum
import os
from pathlib import Path
from typing import Generator

# Third-party imports
import astropy.table as apt
import astropy.units as u
import numpy as np
from cxotime import CxoTime, CxoTimeLike

# Ska/Chandra imports
from chandra_aca.dark_model import dark_temp_scale

DN_TO_ELEC = np.float32(5.0 / 1.696) # Convert DN (per readout) to e-/s
BAD_PIXEL_LOW = -5 # Limit below which a pixel is considered corrupted (DN)
BAD_PIXEL_HIGH = 30000 # Upper limit for bad pixel (DN)


class ImgStatus(enum.Enum):
"""Bit flags for image masks."""

SUM_OUTLIER = np.uint8(0b001)
CORR_SUM_OUTLIER = np.uint8(0b010)
HAS_BAD_PIX = np.uint8(0b100)


def get_years_doys(
start: CxoTimeLike, stop: CxoTimeLike
) -> Generator[tuple[str, str], None, None]:
"""Generate (year, doy) tuples for days between start and stop with margin."""
start = CxoTime(start) - 1.5 * u.d
stop = CxoTime(stop) + 1.5 * u.d
date = start
while date < stop:
yield date.date[:4], date.date[5:8]
date += 1 * u.d


def imgs_root_dir_path(data_dir: Path | str | None = None) -> Path:
"""Get root path to store monitor window images."""
if data_dir is None:
data_dir = Path(os.environ["SKA"]) / "data" / "manvr_mon_images"
return Path(data_dir)


def read_manvr_mon_images( # noqa: PLR0915
start: CxoTimeLike,
stop: CxoTimeLike,
t_ccd_ref: float | None = -10.0,
scale_4c: float | None = None,
require_same_row_col: bool = True,
exact_interval: bool = False,
data_dir: Path | str | None = None,
) -> apt.Table:
"""Read ACA maneuver monitor window images from archived data files.

This function loads processed monitor window images from the compressed .npz
archive files created by save_imgs(). It concatenates data across multiple
files and date ranges, applies temperature correction if requested, and
returns an astropy Table.

Parameters
----------
start : CxoTimeLike
Start time for data retrieval (any format accepted by CxoTime).
stop : CxoTimeLike
Stop time for data retrieval (any format accepted by CxoTime).
t_ccd_ref : float or None, optional
Reference CCD temperature in Celsius for dark current scaling. If None, no
temperature correction is applied. Default is -10.0.
scale_4c : float or None, optional
Scaling factor in dark current temperature dependence. If None, uses default
from dark_temp_scale(). Default is None.
require_same_row_col : bool, optional
If True, only include images where all slots have the same row0 and col0
values across the entire time range. This uses the median values to filter.
Default is True.
exact_interval : bool, optional
If True, only include images with times exactly within start and stop.
Otherwise include all times within the maneuvers that are included within start
and stop. Default is False.
data_dir : Path or str, optional
Root directory containing the archived image files organized as
data_dir/YYYY/DOY/*.npz. Default is ``$SKA/data/manvr_mon_images``.

Returns
-------
dat : apt.Table
Table containing concatenated monitor window data with columns:
- time: observation times in CXO seconds since 1998.0
- img_raw: raw monitor window images in DN [slot, row, col]
- img_corr: corrected images in e-/s with temperature scaling [slot, row, col]
- mask: combined bit mask for image status flags [slot]
- sum_outlier: boolean flag for images with total sum outliers [slot]
- corr_sum_outlier: boolean flag for bgd-subtracted sum outliers [slot]
- bad_pixels: boolean flag for images with bad pixels [slot]
- t_ccd: CCD temperatures in Celsius
- earth_limb_angle: Earth limb angle in degrees
- moon_limb_angle: Moon limb angle in degrees
- rate: spacecraft rate in arcsec/sec
- idx_manvr: maneuver index for each sample
- row0: row0 position for each slot [slot]
- col0: col0 position for each slot [slot]

Notes
-----
- Raw images in DN are converted to e-/s using factor 5.0/1.696
- Temperature correction uses dark_temp_scale() from chandra_aca.dark_model
- Early CCD temperature samples are replaced with 5th sample to avoid artifacts
"""
start = CxoTime(start)
stop = CxoTime(stop)
data_path = imgs_root_dir_path(data_dir)

imgs_list = []
masks_list = []
times_list = []
row0s_list = []
col0s_list = []
t_ccds_list = []
earth_limb_angles_list = []
moon_limb_angles_list = []
rates_list = []
idx_manvrs_list = []
idx_manvr = 0

for year, doy in get_years_doys(start, stop):
dir = Path(data_path, year, doy)
for path in sorted(dir.glob("*.npz")):
with np.load(path) as dat:
n_samp = len(dat["t_ccd"])
# Move the sample dimension (-1) to the front so they concat properly.
# Imgs will then be indexed as [sample, slot, row, col].
imgs_list.append(np.moveaxis(dat["slot_imgs"], -1, 0))
# Masks will be [sample, slot].
masks_list.append(np.moveaxis(dat["slot_masks"], -1, 0))
# Repeat slot_row0s n_samp times for each sample.
row0s_list.append(np.repeat(dat["slot_row0s"][None, :], n_samp, axis=0))
col0s_list.append(np.repeat(dat["slot_col0s"][None, :], n_samp, axis=0))
t_ccds = dat["t_ccd"]
t_ccds[:5] = t_ccds[5]
t_ccds_list.append(t_ccds)
earth_limb_angles_list.append(dat["earth_limb_angle"])
moon_limb_angles_list.append(dat["moon_limb_angle"])
rates_list.append(dat["rate"])
times_list.append(dat["time0"] + 4.1 * np.arange(n_samp))
idx_manvrs_list.append(idx_manvr + np.zeros(n_samp, dtype=np.int32))
idx_manvr += 1

dat = apt.Table()

dat["time"] = np.concatenate(times_list)
dat["img_raw"] = np.concatenate(imgs_list)
masks = np.concatenate(masks_list)
dat["mask"] = masks
dat["sum_outlier"] = (masks & ImgStatus.SUM_OUTLIER.value) != 0
dat["corr_sum_outlier"] = (masks & ImgStatus.CORR_SUM_OUTLIER.value) != 0
dat["bad_pixels"] = (masks & ImgStatus.HAS_BAD_PIX.value) != 0
dat["t_ccd"] = np.concatenate(t_ccds_list)
dat["earth_limb_angle"] = np.concatenate(earth_limb_angles_list)
dat["moon_limb_angle"] = np.concatenate(moon_limb_angles_list)
dat["rate"] = np.concatenate(rates_list)
dat["idx_manvr"] = np.concatenate(idx_manvrs_list)
dat["row0"] = np.concatenate(row0s_list)
dat["col0"] = np.concatenate(col0s_list)

dat["img_corr"] = dat["img_raw"] * DN_TO_ELEC
if t_ccd_ref is not None:
dark_scale = dark_temp_scale(dat["t_ccd"], t_ccd_ref, scale_4c=scale_4c)
dat["img_corr"] *= dark_scale.astype(np.float32)[:, None, None, None]

# Archived bad_pixels flag is based on img_raw < 0, which is too strict. Many pixels
# are slightly negative due to background subtraction. Recompute here using -5 DN.
# Note that slot 5 goes to -15 DN for some reason but we treat all slots the same
# here.
bad_pix = np.any(
(dat["img_raw"] < BAD_PIXEL_LOW) | (dat["img_raw"] > BAD_PIXEL_HIGH),
axis=(2, 3),
)
dat["bad_pixels"] = bad_pix

# Remake masks now as well
dat["mask"][:] = np.uint8(0)
dat["mask"][dat["sum_outlier"]] |= ImgStatus.SUM_OUTLIER.value
dat["mask"][dat["corr_sum_outlier"]] |= ImgStatus.CORR_SUM_OUTLIER.value
dat["mask"][dat["bad_pixels"]] |= ImgStatus.HAS_BAD_PIX.value

i0, i1 = np.searchsorted(dat["time"], [start.secs, stop.secs])
if not exact_interval:
idx_manvr0 = dat["idx_manvr"][i0]
i0 = np.searchsorted(dat["idx_manvr"], idx_manvr0, side="left")
idx_manvr1 = dat["idx_manvr"][i1 - 1]
i1 = np.searchsorted(dat["idx_manvr"], idx_manvr1, side="right")
dat = dat[i0:i1]

if len(dat) > 0:
# Make idx_manvr start at 0
dat["idx_manvr"] -= dat["idx_manvr"][0]

if require_same_row_col:
# Compute the median of row0 and col0 across all samples and slots
# then choose only rows with those values.
median_row0 = np.median(dat["row0"], axis=0)
median_col0 = np.median(dat["col0"], axis=0)
ok = np.all(
(dat["row0"] == median_row0[None, :])
& (dat["col0"] == median_col0[None, :]),
axis=1,
)
dat = dat[ok]

# Formatting
dat["time"].info.format = ".3f"
dat["img_corr"].info.format = ".0f"
dat["t_ccd"].info.format = ".2f"

return dat
155 changes: 155 additions & 0 deletions chandra_aca/tests/test_manvr_mon_images.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import numpy as np
import pytest
from cxotime import CxoTime

from chandra_aca.dark_model import dark_temp_scale
from chandra_aca.manvr_mon_images import DN_TO_ELEC, read_manvr_mon_images


def test_read_manvr_mon_images_no_temperature_correction():
"""Test reading without temperature correction"""
# This also tests specifying a start time before the start of data, which is on
# 2023:295.
dat = read_manvr_mon_images(start="2023:100", stop="2023:310", t_ccd_ref=None)
assert np.allclose(dat["img_raw"] * DN_TO_ELEC, dat["img_corr"])
assert len(dat) == 46065


def test_read_manvr_mon_images_with_temperature_correction():
"""Test reading without temperature correction"""
# This also tests specifying a start time before the start of data, which is on
# 2023:295.
dat = read_manvr_mon_images(
start="2025:300",
stop="2025:302",
t_ccd_ref=-5.0,
scale_4c=1.5,
)
dark_scale = dark_temp_scale(dat["t_ccd"], t_ccd_ref=-5.0, scale_4c=1.5)
img_corr = (
dat["img_raw"] * DN_TO_ELEC * dark_scale.astype(np.float32)[:, None, None, None]
)
assert np.allclose(dat["img_corr"], img_corr)

assert dat.colnames == [
"time",
"img_raw",
"mask",
"sum_outlier",
"corr_sum_outlier",
"bad_pixels",
"t_ccd",
"earth_limb_angle",
"moon_limb_angle",
"rate",
"idx_manvr",
"row0",
"col0",
"img_corr",
]

assert dat.info(out=None).pformat() == [
" name dtype shape unit format description class n_bad length",
"---------------- ------- --------- ---- ------ ----------- ------ ----- ------",
" time float64 .3f Column 0 6434",
" img_raw int16 (8, 8, 8) Column 0 6434",
" mask uint8 (8,) Column 0 6434",
" sum_outlier bool (8,) Column 0 6434",
"corr_sum_outlier bool (8,) Column 0 6434",
" bad_pixels bool (8,) Column 0 6434",
" t_ccd float64 .2f Column 0 6434",
"earth_limb_angle float32 Column 0 6434",
" moon_limb_angle float32 Column 0 6434",
" rate float32 Column 0 6434",
" idx_manvr int32 Column 0 6434",
" row0 int16 (8,) Column 0 6434",
" col0 int16 (8,) Column 0 6434",
" img_corr float32 (8, 8, 8) .0f Column 0 6434",
]


def test_read_manvr_mon_images_require_same_row_col():
"""Test reading with require_same_row_col option"""
dat = read_manvr_mon_images(
start="2025:001",
stop="2025:002",
require_same_row_col=True,
)

# Check that all row0 and col0 values are the same as the median
median_row0 = np.median(dat["row0"], axis=0)
median_col0 = np.median(dat["col0"], axis=0)
assert np.all(dat["row0"] == median_row0[None, :])
assert np.all(dat["col0"] == median_col0[None, :])


def test_read_manvr_mon_images_require_same_row_col_false():
"""Test reading with require_same_row_col option"""
dat = read_manvr_mon_images(
start="2025:001",
stop="2025:002",
require_same_row_col=False,
)

# Check for known case where the col0 values differ (in slot 7)
median_col0 = np.median(dat["col0"], axis=0)
assert not np.all(dat["col0"] == median_col0[None, :])


def test_read_manvr_mon_images_exact_interval():
"""Test exact_interval parameter behavior"""
# Test with a narrow time range that falls within a maneuver
start = "2025:301:02:00:00"
stop = "2025:301:02:10:00"

# Get data with exact_interval=False (default - include full maneuvers)
dat_full_manvr = read_manvr_mon_images(
start=start,
stop=stop,
exact_interval=False,
)

# Has data outside of start/stop but contained within maneuver starting at
# 2025:301:01:51:26.182.
exp_full = ["2025:301:01:51:31.741", "2025:301:02:21:35.741"]
assert np.all(CxoTime(dat_full_manvr["time"][[0, -1]]).date == exp_full)

# Get data with exact_interval=True (only exact time range)
dat_exact = read_manvr_mon_images(
start=start,
stop=stop,
exact_interval=True,
)

# Contained within start / stop
exp_exact = ["2025:301:02:00:00.141", "2025:301:02:09:58.741"]
assert np.all(CxoTime(dat_exact["time"][[0, -1]]).date == exp_exact)


@pytest.mark.parametrize("exact_interval", [True, False])
def test_read_manvr_mon_images_zero_length(exact_interval):
"""Test a short interval not containing manvr mon data (during NPNT)"""
dat = read_manvr_mon_images(
"2024:001:01:00:00", "2024:001:01:01:00", exact_interval=exact_interval
)
assert len(dat) == 0
exp = [
" name dtype shape unit format description class n_bad length",
"---------------- ------- --------- ---- ------ ----------- ------ ----- ------",
" time float64 .3f Column 0 0",
" img_raw int16 (8, 8, 8) Column 0 0",
" mask uint8 (8,) Column 0 0",
" sum_outlier bool (8,) Column 0 0",
"corr_sum_outlier bool (8,) Column 0 0",
" bad_pixels bool (8,) Column 0 0",
" t_ccd float64 .2f Column 0 0",
"earth_limb_angle float32 Column 0 0",
" moon_limb_angle float32 Column 0 0",
" rate float32 Column 0 0",
" idx_manvr int32 Column 0 0",
" row0 int16 (8,) Column 0 0",
" col0 int16 (8,) Column 0 0",
" img_corr float32 (8, 8, 8) .0f Column 0 0",
]

assert dat.info(out=None).pformat() == exp