Source code for pyiwfm.io.hydrograph_loader
"""
Lazy hydrograph data loader backed by HDF5.
Provides the same interface as ``IWFMHydrographReader`` but reads from
HDF5 cache files produced by ``hydrograph_converter.py``. This avoids
loading the full text file into memory and enables LRU-cached access.
"""
from __future__ import annotations
import logging
from collections import OrderedDict
from pathlib import Path
import h5py
import numpy as np
from numpy.typing import NDArray
logger = logging.getLogger(__name__)
[docs]
class LazyHydrographDataLoader:
"""Lazy loader for hydrograph time series backed by HDF5.
Exposes the same public interface as ``IWFMHydrographReader`` so the
web viewer route code can use either interchangeably.
Parameters
----------
file_path : Path or str
Path to the HDF5 cache file.
cache_size : int
Number of timestep rows to keep in the LRU cache.
"""
[docs]
def __init__(self, file_path: Path | str, cache_size: int = 100) -> None:
self._file_path = Path(file_path)
self._cache_size = cache_size
self._cache: OrderedDict[int, NDArray[np.float64]] = OrderedDict()
self._times: list[str] = []
self._hydrograph_ids: list[int] = []
self._layers: list[int] = []
self._node_ids: list[int] = []
self._n_columns = 0
self._n_timesteps = 0
self._load_metadata()
# ------------------------------------------------------------------
# Metadata
# ------------------------------------------------------------------
def _load_metadata(self) -> None:
if not self._file_path.exists():
logger.warning("Hydrograph HDF5 not found: %s", self._file_path)
return
try:
with h5py.File(self._file_path, "r") as f:
if "data" not in f:
logger.warning("No 'data' dataset in %s", self._file_path)
return
ds = f["data"]
self._n_timesteps = ds.shape[0]
self._n_columns = ds.shape[1] if ds.ndim > 1 else 1
if "times" in f:
raw = f["times"][:]
self._times = [t.decode() if isinstance(t, bytes) else str(t) for t in raw]
if "hydrograph_ids" in f:
self._hydrograph_ids = f["hydrograph_ids"][:].tolist()
if "layers" in f:
self._layers = f["layers"][:].tolist()
if "node_ids" in f:
self._node_ids = f["node_ids"][:].tolist()
logger.info(
"Hydrograph HDF5 loaded: %d timesteps, %d columns from %s",
self._n_timesteps,
self._n_columns,
self._file_path.name,
)
except (OSError, KeyError, ValueError, TypeError) as e:
logger.error("Failed to load hydrograph HDF5 metadata: %s", e)
# ------------------------------------------------------------------
# Properties (IWFMHydrographReader interface)
# ------------------------------------------------------------------
@property
def n_columns(self) -> int:
return self._n_columns
@property
def n_timesteps(self) -> int:
return self._n_timesteps
@property
def times(self) -> list[str]:
return self._times
@property
def hydrograph_ids(self) -> list[int]:
return self._hydrograph_ids
@property
def layers(self) -> list[int]:
return self._layers
@property
def node_ids(self) -> list[int]:
return self._node_ids
# ------------------------------------------------------------------
# Data access
# ------------------------------------------------------------------
def _load_row(self, row_idx: int) -> NDArray[np.float64]:
"""Load a single row from the HDF5 dataset."""
with h5py.File(self._file_path, "r") as f:
ds = f["data"]
result: NDArray[np.float64] = ds[row_idx].astype(np.float64)
return result
def _evict_if_needed(self) -> None:
while len(self._cache) >= self._cache_size:
self._cache.popitem(last=False)
[docs]
def get_row(self, row_idx: int) -> NDArray[np.float64]:
"""Get a cached row by index."""
if row_idx in self._cache:
self._cache.move_to_end(row_idx)
return self._cache[row_idx]
self._evict_if_needed()
row = self._load_row(row_idx)
self._cache[row_idx] = row
return row
[docs]
def get_time_series(self, column_index: int) -> tuple[list[str], list[float]]:
"""Get time series for a specific column.
Parameters
----------
column_index : int
0-based column index.
Returns
-------
tuple[list[str], list[float]]
(times, values) where times are ISO 8601 strings.
"""
if column_index < 0 or column_index >= self._n_columns:
return [], []
# For full-column extraction, read the whole column at once
# (more efficient than row-by-row for column access).
with h5py.File(self._file_path, "r") as f:
ds = f["data"]
values = ds[:, column_index].astype(np.float64).tolist()
return self._times, values
[docs]
def find_column_by_node_id(self, node_id: int) -> int | None:
"""Find column index for a given node/element ID."""
if node_id in self._node_ids:
return self._node_ids.index(node_id)
return None