Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 106 additions & 11 deletions activitysim/core/skim_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -891,8 +891,8 @@ def _should_ignore(ignore, x):
)

d = _drop_unused_names(state, d)
# apply non-zarr dependent digital encoding
d = _apply_digital_encoding(d, skim_digital_encoding)
# note: digital encoding is deferred and applied later, after
# data is in its final memory location (shared memory or local)

if skim_tag in ("taz", "maz"):
# check alignment of TAZs that it matches land_use table
Expand All @@ -904,11 +904,79 @@ def _should_ignore(ignore, x):
else:
land_use_zone_id = None

time_periods = _dedupe_time_periods(network_los_preload)
return _finalize_skim_dataset(
d,
omx_file_paths=omx_file_paths,
omx_file_handles=omx_file_handles,
time_periods=time_periods,
land_use_zone_id=land_use_zone_id,
land_use_index=land_use.index.to_numpy(),
zone_system=network_los_preload.zone_system,
store_skims_in_shm=state.settings.store_skims_in_shm,
backing=backing,
skim_digital_encoding=skim_digital_encoding,
omx_ignore_patterns=state.settings.omx_ignore_patterns,
)


def _finalize_skim_dataset(
d,
omx_file_paths,
omx_file_handles,
time_periods,
land_use_zone_id,
land_use_index,
zone_system,
store_skims_in_shm,
backing,
skim_digital_encoding,
omx_ignore_patterns=None,
):
"""
Align, optionally share, and encode a skim dataset.

This covers the final phase of ``load_skim_dataset_to_shared_memory``:
zone alignment checks, writing into shared memory (with the deferred
``reload_from_omx_3d`` path when possible), and digital encoding.

Parameters
----------
d : xr.Dataset
The dataset as loaded from OMX / zarr, with unused variables already
dropped. May be backed by dask arrays.
omx_file_paths : list[Path]
Paths to the OMX source files (needed for `reload_from_omx_3d`).
omx_file_handles : list[openmatrix.File]
Already-open OMX file handles. Closed before returning.
time_periods : list[str]
Deduplicated time-period labels (e.g. ``["AM", "MD", "PM"]``).
land_use_zone_id : array-like or None
Original (pre-remap) zone IDs from the land-use table, or ``None``
for non-taz/maz skim tags.
land_use_index : array-like or None
Zero-based contiguous land-use index, or ``None``.
zone_system : int
``ONE_ZONE``, ``TWO_ZONE``, or ``THREE_ZONE``.
store_skims_in_shm : bool
Whether to store the dataset in shared memory.
backing : str
Shared-memory backing token / memmap path.
skim_digital_encoding : list[dict]
Digital encoding instructions to apply after data is in its final
memory location.
omx_ignore_patterns : list[str] or None
User-supplied OMX ignore patterns (from settings).

Returns
-------
xr.Dataset
"""
from activitysim.core.los import ONE_ZONE

dask_required = False
if network_los_preload.zone_system == ONE_ZONE:
if zone_system == ONE_ZONE and land_use_zone_id is not None:
# check TAZ alignment for ONE_ZONE system.
# other systems use MAZ for most lookups, which dynamically
# resolves to TAZ inside the Dataset code.
if d["otaz"].attrs.get("preprocessed") != "zero-based-contiguous":
try:
np.testing.assert_array_equal(land_use_zone_id, d.otaz)
Expand All @@ -918,10 +986,10 @@ def _should_ignore(ignore, x):
dask_required = True
else:
logger.info("otaz alignment ok")
d["otaz"] = land_use.index.to_numpy()
d["otaz"] = np.asarray(land_use_index)
d["otaz"].attrs["preprocessed"] = "zero-based-contiguous"
else:
np.testing.assert_array_equal(land_use.index, d.otaz)
np.testing.assert_array_equal(land_use_index, d.otaz)

if d["dtaz"].attrs.get("preprocessed") != "zero-based-contiguous":
try:
Expand All @@ -932,24 +1000,28 @@ def _should_ignore(ignore, x):
dask_required = True
else:
logger.info("dtaz alignment ok")
d["dtaz"] = land_use.index.to_numpy()
d["dtaz"] = np.asarray(land_use_index)
d["dtaz"].attrs["preprocessed"] = "zero-based-contiguous"
else:
np.testing.assert_array_equal(land_use.index, d.dtaz)
np.testing.assert_array_equal(land_use_index, d.dtaz)

if d.shm.is_shared_memory:
for f in omx_file_handles:
f.close()
return d
elif not state.settings.store_skims_in_shm:
elif not store_skims_in_shm:
logger.info(
"store_skims_in_shm is False, keeping skims in process-local memory"
)
for f in omx_file_handles:
f.close()
d = _apply_digital_encoding(d, skim_digital_encoding)
return d
else:
logger.info("writing skims to shared memory")
if dask_required:
# setting `load` to True uses dask to load the data into memory
d = _apply_digital_encoding(d, skim_digital_encoding)
d_shared_mem = d.shm.to_shared_memory(backing, mode="r", load=True)
else:
# setting `load` to false then calling `reload_from_omx_3d` avoids
Expand All @@ -958,11 +1030,34 @@ def _should_ignore(ignore, x):
# requires no realignment (i.e. the land use table and skims match
# exactly in order and length).
d_shared_mem = d.shm.to_shared_memory(backing, mode="r", load=False)
# Build an extended ignore list that includes any skims that were
# dropped as unused, so reload_from_omx_3d doesn't try to load them.
# We must account for 3D skims where OMX names have time period
# suffixes (e.g. WLK_Bus_Ivtt__AM) but the dataset variable is
# the collapsed name (e.g. WLK_Bus_Ivtt).
reload_ignore = list(omx_ignore_patterns or [])
ds_var_names = set(d.variables.keys())
# expand dataset variable names to include their time-period
# suffixed OMX equivalents, so we don't accidentally drop them
ds_omx_names = set()
for var_name in ds_var_names:
ds_omx_names.add(var_name)
for tp in time_periods:
ds_omx_names.add(f"{var_name}__{tp}")
all_omx_names = set()
for f in omx_file_handles:
all_omx_names.update(f.list_matrices())
dropped_names = all_omx_names - ds_omx_names
for name in dropped_names:
reload_ignore.append(f"^{re.escape(name)}$")
sh.dataset.reload_from_omx_3d(
d_shared_mem,
[str(i) for i in omx_file_paths],
ignore=state.settings.omx_ignore_patterns,
ignore=reload_ignore,
)
# apply digital encoding AFTER reload so raw OMX values are
# properly encoded in the shared memory dataset
d_shared_mem = _apply_digital_encoding(d_shared_mem, skim_digital_encoding)
for f in omx_file_handles:
f.close()
return d_shared_mem
Expand Down
Loading
Loading