# Source code for pyiwfm.calibration.results_extraction
"""
Unified results extraction pipeline for IWFM models.
Extracts time series at arbitrary locations from any all-node HDF5 output
(HeadAll or SubsidenceAll) using finite element interpolation.
For HEAD data, supports transmissivity-weighted multi-layer averaging.
For SUBSIDENCE data, supports layer summation (additive compaction) and
cumulative-to-incremental conversion.
This generalizes HeadAllExtractor to handle any all-node output type.
Example
-------
>>> extractor = ResultsExtractor(model, results_path, data_type='SUBSIDENCE')
>>> extractor.prepare(specs)
>>> result = extractor.extract()
>>> extractor.write_smp(Path("SUB_OUT.smp"), result)
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
import numpy as np
from numpy.typing import NDArray
if TYPE_CHECKING:
from pyiwfm.core.model import IWFMModel
logger = logging.getLogger(__name__)
@dataclass
class ExtractionSpec:
    """Describes a single location at which results are extracted.

    Attributes
    ----------
    name : str
        Identifier for the extraction location.
    x, y : float
        Point coordinates in model units (typically feet).
    layer : int
        1-based target model layer. ``0`` requests all-layer aggregation
        (average for HEAD, sum for SUBSIDENCE); ``-1`` requests a
        transmissivity-weighted multi-layer average (HEAD only).
    element : int
        Model element containing the point; ``0`` means auto-detect.
    bos, tos : float
        Bottom / top of screen elevation (feet) used for T-weighted
        averaging; NaN when not supplied.
    """

    name: str
    x: float
    y: float
    layer: int = 0
    element: int = 0
    bos: float = float("nan")
    tos: float = float("nan")
@dataclass
class ExtractionResult:
    """Container for extracted time series.

    Attributes
    ----------
    times : NDArray
        Timestamps (datetime64), one per extracted timestep.
    names : list[str]
        Extraction point names, in output order.
    values : dict[str, NDArray[np.float64]]
        Aggregated series per point: {name: array of shape (n_times,)}.
    per_layer : dict[str, NDArray[np.float64]]
        Layer-resolved series per point: {name: (n_times, n_layers) array}.
    data_type : str
        Either 'HEAD' or 'SUBSIDENCE'.
    incremental : bool
        True when values hold timestep-to-timestep differences.
    """

    times: NDArray[np.datetime64]
    names: list[str] = field(default_factory=list)
    values: dict[str, NDArray[np.float64]] = field(default_factory=dict)
    per_layer: dict[str, NDArray[np.float64]] = field(default_factory=dict)
    data_type: str = "HEAD"
    incremental: bool = False
class ResultsExtractor:
    """Extract time series at arbitrary locations from all-node HDF5 output.

    Parameters
    ----------
    model : IWFMModel
        Loaded IWFM model (supplies mesh geometry and stratigraphy).
    results_path : Path
        Path to the all-node HDF5 output file (HeadAll or SubsidenceAll).
    data_type : str
        'HEAD' or 'SUBSIDENCE'.
    incremental : bool
        If True and data_type is 'SUBSIDENCE', convert cumulative values
        to incremental (current - previous timestep). Ignored for HEAD.
    """

    def __init__(
        self,
        model: IWFMModel,
        results_path: Path,
        data_type: Literal["HEAD", "SUBSIDENCE"] = "HEAD",
        incremental: bool = True,
    ) -> None:
        self._model = model
        self._results_path = Path(results_path)
        self._data_type = data_type.upper()
        # Incremental conversion is only meaningful for cumulative
        # subsidence output; HEAD always stays absolute.
        if self._data_type == "SUBSIDENCE":
            self._incremental = incremental
        else:
            self._incremental = False
        self._specs: list[ExtractionSpec] = []
        # name -> {"element", "node_ids", "weights"} FE interpolation data
        self._fe_weights: dict[str, dict[str, Any]] = {}
        # name -> per-layer transmissivity weights (layer == -1 specs)
        self._t_weights: dict[str, NDArray[np.float64]] = {}
        # name -> boolean mask of layers active at the FE nodes (HEAD, layer=0)
        self._active_layers: dict[str, NDArray[np.bool_]] = {}
def prepare(self, specs: list[ExtractionSpec]) -> None:
    """Pre-compute FE interpolation weights for all extraction points.

    For each spec the containing element and FE shape-function weights are
    found; points outside the mesh are skipped with a warning. HEAD specs
    with ``layer == -1`` additionally get per-layer transmissivity weights
    (uniform weights when screen elevations or aquifer Kh are unavailable),
    and HEAD specs with ``layer == 0`` get an active-layer mask so inactive
    layers are excluded from the all-layer average.

    Parameters
    ----------
    specs : list[ExtractionSpec]
        Extraction specifications with coordinates in model units.
    """
    from pyiwfm.core.interpolation import FEInterpolator

    self._specs = specs
    grid = self._model.grid
    if grid is None:
        raise ValueError("Model must have a grid loaded")
    strat = self._model.stratigraphy
    interp = FEInterpolator(grid)
    n_layers = self._model.n_layers
    n_outside = 0
    for spec in specs:
        # Locate the containing element and FE shape-function weights.
        elem_id, node_ids, coeffs = interp.interpolate(spec.x, spec.y)
        if elem_id == 0:
            # Point falls outside the mesh; counted and reported below.
            n_outside += 1
            continue
        self._fe_weights[spec.name] = {
            "element": elem_id,
            "node_ids": node_ids,
            "weights": coeffs,
        }
        # T-weights for multi-layer averaging (HEAD only, layer == -1).
        if self._data_type == "HEAD" and spec.layer == -1:
            t_weights = None
            if not (np.isnan(spec.bos) or np.isnan(spec.tos)):
                from pyiwfm.calibration.iwfm2obs import (
                    MultiLayerWellSpec,
                    compute_multilayer_weights,
                )

                ml_well = MultiLayerWellSpec(
                    name=spec.name,
                    x=spec.x,
                    y=spec.y,
                    element_id=elem_id,
                    bottom_of_screen=spec.bos,
                    top_of_screen=spec.tos,
                )
                gw = self._model.groundwater
                kh = None
                if gw and gw.aquifer_params and gw.aquifer_params.kh is not None:
                    kh = gw.aquifer_params.kh.T
                if kh is not None and strat is not None:
                    t_weights = compute_multilayer_weights(
                        ml_well,
                        grid,
                        strat,
                        kh,
                        fe_node_ids=node_ids,
                        fe_weights=coeffs,
                    )
            if t_weights is None:
                # No screen interval or no Kh/stratigraphy available:
                # fall back to equal weighting over all layers.
                t_weights = np.ones(n_layers) / n_layers
            self._t_weights[spec.name] = t_weights
    # Build active-layer masks for HEAD Layer=0 (Fortran averages only
    # over layers where at least one FE interpolation node is active).
    if self._data_type == "HEAD" and strat is not None:
        for spec in specs:
            if spec.layer == 0 and spec.name in self._fe_weights:
                fe_info = self._fe_weights[spec.name]
                node_indices = np.array([nid - 1 for nid in fe_info["node_ids"]])
                # active_node is (n_nodes, n_layers); a layer counts as
                # active when ANY of the FE nodes is active there.
                active = np.any(strat.active_node[node_indices, :], axis=0)
                self._active_layers[spec.name] = active
    if n_outside > 0:
        logger.warning("%d points outside model mesh, skipped", n_outside)
    logger.info(
        "Prepared %d points for %s extraction (%d with FE weights)",
        len(specs),
        self._data_type,
        len(self._fe_weights),
    )
def extract(self, timesteps: list[int] | None = None) -> ExtractionResult:
    """Extract values at all prepared locations.

    Iterates timesteps in the outer loop so each HDF5 frame is read
    exactly once, regardless of the number of extraction points.

    Parameters
    ----------
    timesteps : list[int], optional
        Specific timestep indices to extract. If None, extract all.

    Returns
    -------
    ExtractionResult
        Aggregated (and, for incremental subsidence, differenced) series.
    """
    from pyiwfm.io.head_loader import LazyHeadDataLoader

    n_layers = self._model.n_layers
    loader = LazyHeadDataLoader(self._results_path, n_layers=n_layers)
    all_times = loader.times
    if timesteps is not None:
        time_indices = timesteps
    else:
        time_indices = list(range(len(all_times)))
    times = np.array([all_times[i] for i in time_indices])
    n_times = len(time_indices)
    result = ExtractionResult(
        times=times,
        data_type=self._data_type,
        incremental=self._incremental,
    )
    # Build list of active specs with pre-computed index arrays
    active_specs: list[tuple[ExtractionSpec, NDArray, NDArray]] = []
    for spec in self._specs:
        if spec.name not in self._fe_weights:
            continue
        fe_info = self._fe_weights[spec.name]
        node_indices = np.array([nid - 1 for nid in fe_info["node_ids"]])
        coeffs = fe_info["weights"]
        active_specs.append((spec, node_indices, coeffs))
    # Pre-allocate per-layer arrays
    per_layer_arrays: dict[str, NDArray[np.float64]] = {}
    for spec, _, _ in active_specs:
        per_layer_arrays[spec.name] = np.full((n_times, n_layers), np.nan, dtype=np.float64)
    # Outer loop: timesteps (each frame read once)
    for ti, ts_idx in enumerate(time_indices):
        frame = loader.get_frame(ts_idx)  # (n_nodes, n_layers)
        for spec, node_indices, coeffs in active_specs:
            for layer in range(n_layers):
                vals = frame[node_indices, layer]
                # Interpolate only when every FE node has data at this layer;
                # otherwise the entry stays NaN.
                if not np.any(np.isnan(vals)):
                    per_layer_arrays[spec.name][ti, layer] = float(np.dot(coeffs, vals))
    # Populate result and aggregate
    for spec, _, _ in active_specs:
        per_layer = per_layer_arrays[spec.name]
        result.per_layer[spec.name] = per_layer
        result.names.append(spec.name)
        aggregated = self._aggregate_layers(per_layer, spec, n_layers)
        if self._incremental:
            # Cumulative -> incremental: first step is defined as zero change.
            incr = np.full(n_times, np.nan, dtype=np.float64)
            # BUG FIX: guard against n_times == 0 (e.g. timesteps=[]),
            # which previously raised IndexError on incr[0].
            if n_times > 0:
                incr[0] = 0.0
                incr[1:] = aggregated[1:] - aggregated[:-1]
            result.values[spec.name] = incr
        else:
            result.values[spec.name] = aggregated
    logger.info(
        "Extracted %s for %d points across %d timesteps",
        self._data_type,
        len(result.names),
        n_times,
    )
    return result
def _aggregate_layers(
self,
per_layer: NDArray[np.float64],
spec: ExtractionSpec,
n_layers: int,
) -> NDArray[np.float64]:
"""Aggregate per-layer values into a single time series.
HEAD:
- layer > 0: single layer
- layer == 0: average over layers with valid data
- layer == -1: T-weighted multi-layer average
SUBSIDENCE:
- layer > 0: single layer
- layer == 0: sum over all layers (additive compaction)
"""
n_times = per_layer.shape[0]
result = np.full(n_times, np.nan, dtype=np.float64)
if spec.layer > 0:
# Single specific layer
result = per_layer[:, spec.layer - 1].copy()
elif self._data_type == "SUBSIDENCE":
# Sum over all layers (additive compaction)
for ti in range(n_times):
valid = ~np.isnan(per_layer[ti, :])
if valid.any():
result[ti] = np.nansum(per_layer[ti, :])
elif spec.layer == -1 and self._data_type == "HEAD":
# T-weighted multi-layer average
t_weights = self._t_weights.get(spec.name)
if t_weights is not None:
for ti in range(n_times):
valid = ~np.isnan(per_layer[ti, :])
if valid.any():
w = t_weights[valid]
w_sum = w.sum()
if w_sum > 0:
result[ti] = float(np.dot(w, per_layer[ti, valid]) / w_sum)
else:
# layer == 0, HEAD: average over active layers only.
# Matches Fortran ResultsExtract which uses Stratigraphy%ActiveNode
# to exclude inactive layers from the average.
active_mask = self._active_layers.get(spec.name)
for ti in range(n_times):
valid = ~np.isnan(per_layer[ti, :])
if active_mask is not None:
valid &= active_mask
if valid.any():
result[ti] = float(np.mean(per_layer[ti, valid]))
return result
def write_smp(self, output_path: Path, result: ExtractionResult) -> None:
    """Write results to SMP format for PEST/IWFM2OBS.

    One fixed-width line per non-NaN value: name (A25), MM/DD/YYYY date
    (A12), HH:MM:SS time (A12), value (11.3f). NaN entries are skipped.

    Parameters
    ----------
    output_path : Path
        Output SMP file path.
    result : ExtractionResult
        Results from :meth:`extract`.
    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w") as f:
        for name in result.names:
            if name not in result.values:
                continue
            series = result.values[name]
            for idx, stamp in enumerate(result.times):
                value = series[idx]
                if np.isnan(value):
                    continue
                # Second-resolution ISO form: YYYY-MM-DDTHH:MM:SS
                iso = str(np.datetime64(stamp, "s").astype("datetime64[s]"))
                year, month, day = iso[:10].split("-")
                clock = iso[11:19] if len(iso) > 10 else "00:00:00"
                date_str = f"{month}/{day}/{year}"
                # Fixed-format SMP: (A25,A12,A12,A11)
                f.write(f"{name:<25s}{date_str:>12s}{clock:>12s}{value:11.3f}\n")
    logging.getLogger(__name__).info(
        "Wrote SMP to %s (%d locations)", output_path, len(result.names)
    )
def write_cache(self, output_path: Path, result: ExtractionResult) -> None:
    """Write extraction results to an HDF5 cache file.

    Layout: attrs ``data_type``/``incremental``; datasets ``times`` and
    ``names`` (byte strings), ``values`` (n_times, n_names) and, when
    available, ``per_layer`` (n_times, n_names, n_layers).

    Parameters
    ----------
    output_path : Path
        Output HDF5 file path.
    result : ExtractionResult
        Results from :meth:`extract`.
    """
    import h5py

    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with h5py.File(output_path, "w") as hf:
        hf.attrs["data_type"] = result.data_type
        hf.attrs["incremental"] = result.incremental
        # BUG FIX: size the byte-string dtypes to the data instead of the
        # previous fixed S30/S50, which silently truncated long
        # timestamps/names on round-trip through load_cache.
        time_bytes = [str(t).encode("utf-8") for t in result.times]
        hf.create_dataset(
            "times",
            data=np.array(time_bytes) if time_bytes else np.empty(0, dtype="S1"),
        )
        name_bytes = [n.encode("utf-8") for n in result.names]
        hf.create_dataset(
            "names",
            data=np.array(name_bytes) if name_bytes else np.empty(0, dtype="S1"),
        )
        # Aggregated values: (n_times, n_names)
        n_times = len(result.times)
        n_names = len(result.names)
        vals_arr = np.full((n_times, n_names), np.nan)
        for ni, name in enumerate(result.names):
            if name in result.values:
                vals_arr[:, ni] = result.values[name]
        hf.create_dataset("values", data=vals_arr, compression="gzip")
        # Per-layer: (n_times, n_names, n_layers)
        if result.per_layer:
            n_layers = next(iter(result.per_layer.values())).shape[1]
            pl_arr = np.full((n_times, n_names, n_layers), np.nan)
            for ni, name in enumerate(result.names):
                if name in result.per_layer:
                    pl_arr[:, ni, :] = result.per_layer[name]
            hf.create_dataset("per_layer", data=pl_arr, compression="gzip")
    logger.info("Wrote cache to %s", output_path)
@staticmethod
def load_cache(cache_path: Path) -> ExtractionResult:
    """Load previously cached extraction results.

    Parameters
    ----------
    cache_path : Path
        Path to the HDF5 cache file written by :meth:`write_cache`.

    Returns
    -------
    ExtractionResult
    """
    import h5py

    with h5py.File(cache_path, "r") as hf:
        times = np.array([np.datetime64(s.decode()) for s in hf["times"][:]])
        names = [s.decode().strip() for s in hf["names"][:]]
        # BUG FIX: h5py can return string attributes as bytes (depending on
        # h5py version / how the file was written); normalize to str so
        # downstream comparisons like data_type == "HEAD" keep working.
        raw_type = hf.attrs.get("data_type", "HEAD")
        data_type = raw_type.decode() if isinstance(raw_type, bytes) else str(raw_type)
        incremental = bool(hf.attrs.get("incremental", False))
        result = ExtractionResult(
            times=times,
            names=names,
            data_type=data_type,
            incremental=incremental,
        )
        if "values" in hf:
            vals_arr = hf["values"][:]
            for ni, name in enumerate(names):
                result.values[name] = vals_arr[:, ni]
        if "per_layer" in hf:
            pl_arr = hf["per_layer"][:]
            for ni, name in enumerate(names):
                result.per_layer[name] = pl_arr[:, ni, :]
    return result
class FortranBackend:
    """Run ResultsExtract Fortran executable as a fast extraction backend.

    Generates an input file, invokes ``ResultsExtract_x64.exe``, and parses
    the IWFM columnar ``.out`` output back into an :class:`ExtractionResult`.
    This is ~300x faster than the pure-Python FE extraction for large
    datasets (e.g. 31K subsidence locations × 577 timesteps: 3s vs 17min).

    Parameters
    ----------
    exe_path : Path
        Path to ``ResultsExtract_x64.exe``.
    sim_file : Path
        Path to the IWFM simulation main file.
    data_type : str
        ``'HEAD'`` or ``'SUBSIDENCE'``.
    incremental : bool
        If True and data_type is ``'SUBSIDENCE'``, the Fortran code produces
        incremental output automatically.
    """

    def __init__(
        self,
        exe_path: Path,
        sim_file: Path,
        data_type: str = "SUBSIDENCE",
        incremental: bool = True,
    ) -> None:
        self._exe_path = Path(exe_path)
        self._sim_file = Path(sim_file)
        self._data_type = data_type.upper()
        # Incremental output only applies to cumulative subsidence.
        if self._data_type == "SUBSIDENCE":
            self._incremental = incremental
        else:
            self._incremental = False
[docs]
@staticmethod
def find_exe(search_dirs: list[Path] | None = None) -> Path | None:
"""Locate ``ResultsExtract_x64.exe`` in common locations."""
candidates = [Path("tools") / "ResultsExtract_x64.exe"]
if search_dirs:
candidates.extend(d / "ResultsExtract_x64.exe" for d in search_dirs)
for p in candidates:
if p.exists():
return p
return None
[docs]
def generate_input(
self,
specs: list[ExtractionSpec],
output_file: str = "RE_OUT.out",
factxy: float = 1.0,
) -> tuple[str, list[str]]:
"""Generate a ResultsExtract input file from ExtractionSpec objects.
Parameters
----------
specs : list[ExtractionSpec]
Extraction specifications (coordinates already in model units).
output_file : str
Name for the ResultsExtract output file.
factxy : float
Coordinate conversion factor written to the input file.
Returns
-------
tuple[str, list[str]]
(input file content, ordered list of spec names)
"""
lines: list[str] = []
lines.append(f" {self._sim_file!s:<40s} / SIMFILE")
lines.append(f" {self._data_type:<40s} / DATATYPE")
lines.append(f" {output_file:<40s} / OUTFILE")
lines.append(f" {len(specs):<40d} / NHYD")
lines.append(f" {factxy:<40.5f} / FACTXY")
names: list[str] = []
for i, spec in enumerate(specs, 1):
layer = max(spec.layer, 0)
lines.append(f" {i:<6d} 0 {layer:<4d} {spec.x:<18.4f} {spec.y:<18.4f} {spec.name}")
names.append(spec.name)
return "\n".join(lines) + "\n", names
[docs]
def run(
self,
specs: list[ExtractionSpec],
work_dir: Path | None = None,
factxy: float = 1.0,
) -> ExtractionResult:
"""Run ResultsExtract and parse the output.
Parameters
----------
specs : list[ExtractionSpec]
Extraction specifications (coordinates in model units).
work_dir : Path, optional
Working directory for the subprocess.
factxy : float
Coordinate conversion factor.
Returns
-------
ExtractionResult
Raises
------
FileNotFoundError
If the executable is not found.
RuntimeError
If ResultsExtract reports a FATAL error.
"""
import subprocess
if not self._exe_path.exists():
raise FileNotFoundError(f"ResultsExtract not found: {self._exe_path}")
cwd = Path(work_dir) if work_dir else Path.cwd()
out_name = f"RE_{self._data_type}_OUT.out"
input_content, names = self.generate_input(specs, out_name, factxy)
input_path = cwd / f"resultsextract_{self._data_type.lower()}_auto.in"
input_path.write_text(input_content)
logger.info(
"Running ResultsExtract (%s, %d locations)...",
self._data_type,
len(specs),
)
try:
proc = subprocess.run(
[str(self._exe_path), str(input_path)],
cwd=str(cwd),
capture_output=True,
text=True,
timeout=300,
)
if proc.returncode != 0:
raise RuntimeError(
f"ResultsExtract exited with code {proc.returncode}:\n"
f"{proc.stderr or proc.stdout}"
)
except subprocess.TimeoutExpired as exc:
raise RuntimeError("ResultsExtract timed out after 300 seconds") from exc
finally:
# Clean up input file regardless of success/failure
if input_path.exists():
input_path.unlink(missing_ok=True)
msg_file = cwd / "ResultsExtract_Messages.out"
if msg_file.exists():
msg_text = msg_file.read_text()
if "* FATAL:" in msg_text:
raise RuntimeError(f"ResultsExtract FATAL error:\n{msg_text}")
out_path = cwd / out_name
if not out_path.exists():
raise RuntimeError(f"ResultsExtract output not found: {out_path}")
return self._parse_output(out_path, names)
def _parse_output(
    self,
    out_path: Path,
    names: list[str],
) -> ExtractionResult:
    """Parse IWFM columnar .out file into ExtractionResult.

    The .out file has header lines starting with ``*`` followed by
    data lines: ``MM/DD/YYYY_HH:MM val1 val2 val3 ...``. IWFM writes
    end-of-day timestamps as ``24:00``; these are normalized to ``00:00``
    of the following day.

    Parameters
    ----------
    out_path : Path
        Path to the ResultsExtract output file.
    names : list[str]
        Column names in output order (from :meth:`generate_input`).
    """
    import re
    from datetime import datetime, timedelta

    # Compiled once; matched against every data line below.
    ts_pattern = re.compile(r"(\d{2}/\d{2}/\d{4})_(\d{2}:\d{2})")
    times_list: list[np.datetime64] = []
    data_rows: list[list[float]] = []
    n_names = len(names)
    with open(out_path) as f:
        for line in f:
            stripped = line.strip()
            if not stripped or stripped.startswith("*"):
                continue
            ts_match = ts_pattern.match(stripped)
            if not ts_match:
                continue
            date_str = ts_match.group(1)
            time_str = ts_match.group(2)
            # BUG FIX: IWFM's "24:00" means midnight at the END of the day,
            # i.e. 00:00 of the NEXT day. The previous code mapped it to
            # 00:00 of the SAME day, shifting timestamps 24 hours early.
            rollover = time_str.startswith("24:")
            if rollover:
                time_str = "00:00"
            try:
                dt = datetime.strptime(f"{date_str} {time_str}", "%m/%d/%Y %H:%M")
            except ValueError:
                continue
            if rollover:
                dt += timedelta(days=1)
            times_list.append(np.datetime64(dt))
            val_str = stripped[ts_match.end() :]
            vals: list[float] = []
            for v in val_str.split():
                try:
                    vals.append(float(v))
                except ValueError:
                    # Non-numeric tokens (e.g. Fortran '****' overflow) -> NaN
                    vals.append(float("nan"))
            # Pad short rows with NaN and drop any extra columns so every
            # row has exactly n_names entries.
            while len(vals) < n_names:
                vals.append(float("nan"))
            data_rows.append(vals[:n_names])
    times = np.array(times_list)
    result = ExtractionResult(
        times=times,
        names=list(names),
        data_type=self._data_type,
        incremental=self._incremental,
    )
    if data_rows:
        data = np.array(data_rows, dtype=np.float64)
        for ni, name in enumerate(names):
            result.values[name] = data[:, ni]
    logger.info(
        "Parsed ResultsExtract output: %d timesteps × %d locations",
        len(times),
        n_names,
    )
    return result