3 changes: 2 additions & 1 deletion .gitignore
@@ -191,8 +191,9 @@ test_data/
/imap_processing/tests/swe/l1_validation/
/imap_processing/tests/swe/l2_validation/
imap_processing/tests/lo/test_cdfs/imap_lo_l1c_pset_20260101-repoint01261_v001.cdf
/imap_processing/tests/glows/validation_data/combined_de_l1a.csv
# Ignore specific SPICE kernels that get downloaded from NAIF automatically for tests
# marked with @pytest.mark.external_kernel
**/de440*.bsp
**/pck0001*.tpc
**/earth_*_combined.bpc
22 changes: 22 additions & 0 deletions imap_processing/glows/l0/glows_l0_data.py
@@ -160,6 +160,28 @@ def __post_init__(self) -> None:
int(self.DE_DATA, 2).to_bytes(len(self.DE_DATA) // 8, "big")
)

# Sort by SEQ
def __lt__(self, other: "DirectEventL0") -> bool:
"""
Define less-than comparison for DirectEventL0.

This is used when sorting lists of DirectEvents.
The L0 values should be sorted according to SEQ, which is the official
sequencing value for packets.

Parameters
----------
other : DirectEventL0
Another DirectEventL0 object for comparison.

Returns
-------
bool
True if the current object's SEQ is less than the other object's SEQ
(i.e., SEQ sorts in traditional ascending order).
"""
return self.SEQ < other.SEQ

def within_same_sequence(self, other: "DirectEventL0") -> bool:
"""
Compare the L0 fields that should be identical for packets within one sequence.
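Defining `__lt__` is all that plain `sorted()` needs to order packet objects by SEQ. A minimal sketch of the pattern, using a hypothetical stand-in class rather than the real `DirectEventL0` (which carries the full CCSDS header and data fields):

```python
from dataclasses import dataclass


@dataclass
class PacketStub:
    """Hypothetical stand-in for DirectEventL0 with only a SEQ field."""

    SEQ: int

    def __lt__(self, other: "PacketStub") -> bool:
        # sorted() and list.sort() only require __lt__ between elements.
        return self.SEQ < other.SEQ


packets = [PacketStub(2), PacketStub(0), PacketStub(1)]
assert [p.SEQ for p in sorted(packets)] == [0, 1, 2]
```

`functools.total_ordering` or `@dataclass(order=True)` would generate the full comparison set, but a lone `__lt__` is enough for sorting and leaves the other operators undefined.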
57 changes: 44 additions & 13 deletions imap_processing/glows/l1a/glows_l1a.py
@@ -1,5 +1,7 @@
"""Methods for GLOWS Level 1A processing and CDF writing."""

import logging
from itertools import groupby
from pathlib import Path

import numpy as np
@@ -14,6 +16,8 @@
met_to_ttj2000ns,
)

logger = logging.getLogger(__name__)


def create_glows_attr_obj() -> ImapCdfAttributes:
"""
@@ -93,18 +97,48 @@ def process_de_l0(
Dictionary with keys of days and values of lists of DirectEventL1A objects.
Each day has one CDF file associated with it.
"""
de_list: list[DirectEventL1A] = []

for de in de_l0:
# Put non-first packets into the last direct event list.
if de.SEQ != 0:
# If the direct event is part of a sequence and is not the first,
# add it to the last direct event in the list
de_list[-1].merge_de_packets(de)
l1a_output: list[DirectEventL1A] = []

# Sort by SEC, so groupby only has one instance of each SEC
sorted_l0 = sorted(de_l0, key=lambda x: x.SEC)

for sec, de in groupby(sorted_l0, lambda x: x.SEC):
de_list = list(de)
if len(de_list) == 1:
# Only one packet found for this SEC
new_de = DirectEventL1A(de_list[0])
if new_de.l0.LEN != 1:
# We're missing packets off the end
new_de.finish_incomplete_packet()

l1a_output.append(new_de)
else:
de_list.append(DirectEventL1A(de))
sorted_des = sorted(de_list)
if sorted_des[0].SEQ != 0:
logger.warning(f"GLOWS: First SEQ not found for DE SEC {sec}")
# Processing cannot be run on this packet.
continue
first_de = DirectEventL1A(sorted_des[0])
for each_de in sorted_des[1:]:
try:
first_de.merge_de_packets(each_de)
except (ValueError, IndexError) as e:
# We don't want to stop processing for DE errors
logger.warning(
f"ERROR ENCOUNTERED in GLOWS DE processing. "
f"Excluding packet from output. Error: {e}"
)
continue

# A complete sequence ends at SEQ == LEN - 1; anything short needs finishing.
if sorted_des[-1].SEQ != first_de.l0.LEN - 1:
first_de.finish_incomplete_packet()

l1a_output.append(first_de)

return de_list
# Filter out DE records with no direct_events (incomplete packet sequences)
l1a_output = [de for de in l1a_output if de.direct_events is not None]

return l1a_output


def generate_de_dataset(
@@ -128,9 +162,6 @@ def generate_de_dataset(
"""
# TODO: Block header per second, or global attribute?

# Filter out DE records with no direct_events (incomplete packet sequences)
de_l1a_list = [de for de in de_l1a_list if de.direct_events is not None]

# Store timestamps for each DirectEventL1a object.
time_data = np.zeros(len(de_l1a_list), dtype=np.int64)

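The rewritten `process_de_l0` leans on the standard sort-then-group idiom: `itertools.groupby` only groups adjacent elements, so the list must first be sorted on the same key (here SEC). A minimal sketch of the control flow, using hypothetical `(SEC, SEQ)` tuples in place of real L0 packets:

```python
from itertools import groupby

# Hypothetical packets as (SEC, SEQ) pairs; the real code uses DirectEventL0.
packets = [(101, 1), (100, 0), (101, 0), (100, 1), (102, 1)]

# groupby only merges adjacent runs, so sort on the grouping key first.
packets.sort(key=lambda p: p[0])

for sec, group in groupby(packets, key=lambda p: p[0]):
    seqs = sorted(p[1] for p in group)
    if seqs[0] != 0:
        # Mirrors the warning path: without SEQ 0 the sequence
        # cannot be reassembled, so the whole second is skipped.
        print(f"First SEQ not found for DE SEC {sec}")
        continue
    print(sec, seqs)  # 100 [0, 1] then 101 [0, 1]
```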
14 changes: 12 additions & 2 deletions imap_processing/glows/l1a/glows_l1a_data.py
@@ -340,6 +340,7 @@ class DirectEventL1A:
-------
merge_de_packets
Add another Level0 instance.
finish_incomplete_packet
Fill in missing sequence numbers and status data for an incomplete sequence.
"""

l0: DirectEventL0
@@ -383,6 +384,7 @@ def merge_de_packets(self, second_l0: DirectEventL0) -> None:
f"Sequence for direct event L1A is out of order or "
f"incorrect. Attempted to append sequence counter "
f"{second_l0.SEQ} after {self.most_recent_seq}."
f" New DE time: {second_l0.SEC}, current time: {self.l0.SEC}."
)

# Track any missing sequence counts
@@ -392,7 +394,6 @@
# Determine if new L0 packet matches existing L0 packet
match = self.l0.within_same_sequence(second_l0)

# TODO: Should this raise an error? Log? something else?
if not match:
raise ValueError(
f"While attempting to merge L0 packet {second_l0} "
@@ -404,10 +405,19 @@

self.most_recent_seq = second_l0.SEQ
# if this is the last packet in the sequence, process the DE data
# TODO: What if the last packet never arrives?
if self.l0.LEN == self.most_recent_seq + 1:
self._process_de_data()

def finish_incomplete_packet(self) -> None:
"""
Finish an incomplete packet.

This fills in the missing sequence numbers and the status data, but no direct
events. It can only run if at least the first packet of the sequence is present.
"""
self.missing_seq += list(range(self.most_recent_seq + 1, self.l0.LEN))
self.status_data = StatusData(self.de_data[:40])

def _process_de_data(self) -> None:
"""
Will process direct event bytes.
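`finish_incomplete_packet` records every sequence counter after the last received packet, up to the advertised packet count `LEN`, as missing. The arithmetic in isolation (the function name here is illustrative, not part of the module):

```python
def missing_tail(most_recent_seq: int, length: int) -> list[int]:
    """SEQ values that never arrived after the last received packet."""
    return list(range(most_recent_seq + 1, length))


# A six-packet sequence (LEN == 6) that stopped after SEQ 2:
assert missing_tail(2, 6) == [3, 4, 5]
# A complete sequence (last SEQ == LEN - 1) yields nothing:
assert missing_tail(5, 6) == []
```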
3 changes: 3 additions & 0 deletions imap_processing/tests/external_test_data_config.py
@@ -258,4 +258,7 @@
("swe_l0_unpacked-data_20240510_v001_VALIDATION_L2_bins_v0F_15.dat", "swe/l2_validation/"),
("swe_l0_unpacked-data_20240510_v001_VALIDATION_L2_bins_v1H_14_6.dat", "swe/l2_validation/"),
("swe_l0_unpacked-data_20240510_v001_VALIDATION_L2_bins_v0H_14_6.dat", "swe/l2_validation/"),

# GLOWS
("combined_de_l1a.csv", "glows/validation_data")
] # fmt: skip
6 changes: 5 additions & 1 deletion imap_processing/tests/glows/test_glows_l1a_cdf.py
@@ -43,9 +43,13 @@ def test_generate_histogram_dataset(l1a_test_data):
def test_generate_de_dataset(l1a_test_data):
_, de_l1a = l1a_test_data
glows_attrs = create_glows_attr_obj()

dataset = generate_de_dataset(de_l1a, glows_attrs)
assert len(dataset["epoch"].values) == len(de_l1a)
non_none_len = len([de for de in de_l1a if de.de_data is not None])
assert len(dataset["epoch"].values) == non_none_len

# Output dataarrays are padded to the longest length in the entire set of packets.
# Test data for the first and last DE need to be padded to this length
assert (
dataset["direct_events"].data[0]
== np.pad(
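The padding assertion exists because a CDF variable must be rectangular: every DE record is padded along the event axis to the longest record in the dataset. A minimal sketch of that padding with `np.pad`, assuming zero-fill and a four-field event layout (the real fill value and shapes come from the CDF attributes):

```python
import numpy as np

# Hypothetical records with 2, 5, and 3 events of 4 fields each.
records = [np.ones((2, 4)), np.ones((5, 4)), np.ones((3, 4))]
max_len = max(r.shape[0] for r in records)

# Pad only the event axis; leave the four event fields untouched.
padded = np.stack(
    [np.pad(r, ((0, max_len - r.shape[0]), (0, 0))) for r in records]
)
assert padded.shape == (3, 5, 4)
assert (padded[0, 2:] == 0).all()  # trailing rows are fill
```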
45 changes: 14 additions & 31 deletions imap_processing/tests/glows/test_glows_l1a_data.py
@@ -1,4 +1,3 @@
import ast
import dataclasses
import json
from pathlib import Path
@@ -424,26 +423,25 @@ def test_generate_status_data():
assert dataclasses.asdict(output) == expected


@pytest.mark.external_test_data
def test_expected_de_results(l1a_test_data):
_, de_data = l1a_test_data

# Validation data is generated from the code sent over by the GLOWS team. It
# contains the first 20 packets.
validation_data = pd.read_csv(
Path(__file__).parent
/ "validation_data"
/ "direct_events_validation_data_l1a.csv",
converters={"de_data": ast.literal_eval},
Path(__file__).parent / "validation_data" / "combined_de_l1a.csv",
converters={
"de_data": lambda x: [
[int(i) for i in n.split(" ") if i != ""] for n in x.split("\n")
]
},
)
assert validation_data.index.size == 5703

for index in validation_data.index:
de = de_data[validation_data["packet_counter"][index]]
de = de_data[index]

assert (
de.l0.ccsds_header.SRC_SEQ_CTR
== validation_data["seq_count_in_pkts_file"][index]
)
assert de.l0.SEC == validation_data["imap_start_time_seconds"][index]
assert (
de.status_data.imap_sclk_last_pps
== validation_data["imap_sclk_last_pps"][index]
@@ -521,26 +519,11 @@ def test_expected_de_results(l1a_test_data):

assert de.l0.LEN == validation_data["number_of_de_packets"][index]

assert (
de.direct_events[
validation_data["de_data_counter"][index]
].timestamp.seconds
== validation_data["de_data"][index][0]
)
assert (
de.direct_events[
validation_data["de_data_counter"][index]
].timestamp.subseconds
== validation_data["de_data"][index][1]
)
assert (
de.direct_events[validation_data["de_data_counter"][index]].impulse_length
== validation_data["de_data"][index][2]
)
assert (
de.direct_events[validation_data["de_data_counter"][index]].multi_event
== validation_data["de_data"][index][3]
)
de_val = validation_data["de_data"][index]
for de_counter, direct_event in enumerate(de.direct_events):
assert direct_event.timestamp.seconds == de_val[de_counter][0]
assert direct_event.timestamp.subseconds == de_val[de_counter][1]
assert direct_event.impulse_length == de_val[de_counter][2]


def test_expected_hist_results(l1a_dataset):
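The new `converters` entry replaces `ast.literal_eval` because the combined CSV apparently stores each `de_data` cell as newline-separated rows of space-separated integers rather than a Python literal. The lambda, written out as a standalone function with a made-up sample cell:

```python
def parse_de_cell(cell: str) -> list[list[int]]:
    """One row of ints per line; empty tokens from double spaces are dropped."""
    return [
        [int(token) for token in line.split(" ") if token != ""]
        for line in cell.split("\n")
    ]


# Illustrative cell contents, not taken from the actual validation file.
cell = "315532900 123 45 0\n315532901  678 12 1"
assert parse_de_cell(cell) == [
    [315532900, 123, 45, 0],
    [315532901, 678, 12, 1],
]
```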