Source code for pyiwfm.io.budget

"""
Budget file reader for IWFM simulation output.

This module provides classes for reading IWFM budget binary and HDF5 files,
which contain water balance data for groundwater, streams, lakes, root zone,
and other model components.

Example
-------
Read a groundwater budget file:

>>> from pyiwfm.io.budget import BudgetReader
>>> reader = BudgetReader("GW_Budget.hdf")
>>> reader.descriptor
'GROUNDWATER BUDGET'
>>> print(reader.locations)
['Subregion 1', 'Subregion 2', ...]
>>> df = reader.get_dataframe(location="Subregion 1")
"""

from __future__ import annotations

import struct
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, BinaryIO, Literal, cast

import h5py
import numpy as np
import pandas as pd
from numpy.typing import NDArray

from pyiwfm.io.timeseries_ascii import iwfm_date_to_iso as iwfm_date_to_iso
from pyiwfm.io.timeseries_ascii import parse_iwfm_datetime as parse_iwfm_datetime

# Budget data type codes (from Budget_Parameters.f90)
BUDGET_DATA_TYPES = {
    1: "VR",  # Volumetric rate
    2: "VLB",  # Volume at beginning
    3: "VLE",  # Volume at end
    4: "AR",  # Area
    5: "LT",  # Length
    6: "VR_PotCUAW",  # Potential CUAW
    7: "VR_AgSupplyReq",  # Agricultural supply requirement
    8: "VR_AgShort",  # Agricultural shortage
    9: "VR_AgPump",  # Agricultural pumping
    10: "VR_AgDiv",  # Agricultural diversions
    11: "VR_AgOthIn",  # Agricultural other inflows
}

# DSS data type codes
DSS_DATA_TYPES = {
    1: "PER-CUM",  # Period cumulative
    2: "PER-AVER",  # Period average
}

# Unit markers in column headers
UNIT_MARKERS = {
    "@UNITVL@": "volume",
    "@UNITAR@": "area",
    "@UNITLT@": "length",
    "@LOCNAME@": "location",
    "@AREA@": "area_value",
}
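
# Illustrative note (not from the original source): get_column_headers() below
# replaces each marker with its mapped word in parentheses. The header string
# here is hypothetical, shown only to demonstrate the substitution:
#     >>> h = "Ending Storage @UNITVL@"
#     >>> for marker, replacement in UNIT_MARKERS.items():
#     ...     h = h.replace(marker, f"({replacement})")
#     >>> h
#     'Ending Storage (volume)'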


@dataclass
class TimeStepInfo:
    """Time step information from budget file."""

    track_time: bool = True
    delta_t: float = 1.0
    delta_t_minutes: int = 1440  # 1 day in minutes
    unit: str = "1DAY"
    start_datetime: datetime | None = None
    start_time: float = 0.0
    n_timesteps: int = 0


@dataclass
class ASCIIOutputInfo:
    """ASCII output formatting information."""

    title_len: int = 160
    n_titles: int = 0
    titles: list[str] = field(default_factory=list)
    title_persist: list[bool] = field(default_factory=list)
    format_spec: str = ""
    n_column_header_lines: int = 3


@dataclass
class LocationData:
    """Data structure for a single budget location."""

    n_columns: int = 0
    storage_units: int = 0
    column_headers: list[str] = field(default_factory=list)
    column_types: list[int] = field(default_factory=list)
    column_widths: list[int] = field(default_factory=list)
    dss_pathnames: list[str] = field(default_factory=list)
    dss_data_types: list[int] = field(default_factory=list)


@dataclass
class BudgetHeader:
    """Complete budget file header information."""

    descriptor: str = ""
    timestep: TimeStepInfo = field(default_factory=TimeStepInfo)
    n_areas: int = 0
    areas: NDArray[np.float64] = field(default_factory=lambda: np.array([]))
    ascii_output: ASCIIOutputInfo = field(default_factory=ASCIIOutputInfo)
    n_locations: int = 0
    location_names: list[str] = field(default_factory=list)
    location_data: list[LocationData] = field(default_factory=list)
    file_position: int = 0  # Position after header in binary file


def julian_to_datetime(julian_day: float) -> datetime:
    """Convert Julian day to Python datetime.

    IWFM uses astronomical Julian dates. Excel Julian dates have an offset
    of 2415020 days.
    """
    # Julian day 0 = January 1, 4713 BC (proleptic Julian calendar)
    # Reference point: Julian day 2440587.5 = January 1, 1970 (Unix epoch)
    unix_epoch_julian = 2440587.5
    days_since_epoch = julian_day - unix_epoch_julian
    return datetime(1970, 1, 1) + timedelta(days=days_since_epoch)


def excel_julian_to_datetime(excel_julian: float) -> datetime:
    """Convert Excel-style Julian day to Python datetime.

    Excel day 1 = January 1, 1900.
    """
    # Excel serial date day 0 is December 30, 1899
    excel_epoch = datetime(1899, 12, 30)
    return excel_epoch + timedelta(days=excel_julian)

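
# Quick sanity check for the two conversions above (values chosen purely for
# illustration): astronomical Julian day 2451545.0 is noon on 2000-01-01, and
# Excel serial day 36526 is midnight on 2000-01-01.
#     >>> julian_to_datetime(2451545.0)
#     datetime.datetime(2000, 1, 1, 12, 0)
#     >>> excel_julian_to_datetime(36526.0)
#     datetime.datetime(2000, 1, 1, 0, 0)
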
class BudgetReader:
    """
    Reader for IWFM budget files (binary or HDF5 format).

    This class reads budget output files generated by an IWFM simulation,
    including groundwater, stream, lake, and root zone budgets.

    Parameters
    ----------
    filepath : str or Path
        Path to the budget file (.bin or .hdf/.h5).

    Attributes
    ----------
    filepath : Path
        Path to the budget file.
    format : str
        File format ('binary' or 'hdf5').
    header : BudgetHeader
        Parsed header information.
    descriptor : str
        Budget type descriptor (e.g., 'GROUNDWATER BUDGET').
    locations : list[str]
        List of location names.
    n_locations : int
        Number of locations.
    n_timesteps : int
        Number of time steps in the file.

    Examples
    --------
    >>> reader = BudgetReader("GW_Budget.hdf")
    >>> reader.descriptor
    'GROUNDWATER BUDGET'
    >>> print(reader.locations)
    ['Subregion 1', 'Subregion 2', 'Subregion 3']
    >>>
    >>> # Get column headers for a location
    >>> headers = reader.get_column_headers("Subregion 1")
    >>> print(headers)
    ['Deep Percolation', 'Stream-GW Interaction', ...]
    >>>
    >>> # Read data as a numpy array
    >>> times, data = reader.get_values("Subregion 1")
    >>>
    >>> # Read data as a pandas DataFrame
    >>> df = reader.get_dataframe("Subregion 1")
    """

    def __init__(self, filepath: str | Path) -> None:
        self.filepath = Path(filepath)

        if not self.filepath.exists():
            raise FileNotFoundError(f"Budget file not found: {self.filepath}")

        # Detect format
        self.format = self._detect_format()

        # Read header
        self.header = self._read_header()

    def _detect_format(self) -> Literal["binary", "hdf5"]:
        """Detect file format from extension and content."""
        suffix = self.filepath.suffix.lower()
        if suffix in (".hdf", ".h5", ".hdf5"):
            return "hdf5"
        elif suffix in (".bin", ".out"):
            return "binary"
        else:
            # Try to detect from content
            try:
                with h5py.File(self.filepath, "r"):
                    return "hdf5"
            except Exception:
                pass
            return "binary"

    def _read_header(self) -> BudgetHeader:
        """Read header from file."""
        if self.format == "hdf5":
            return self._read_header_hdf5()
        else:
            return self._read_header_binary()

    @staticmethod
    def _h5_get(group: h5py.File | h5py.Group, key: str) -> Any:
        """Read a value from an HDF5 group.

        Checks child datasets/groups first, then HDF5 group attributes.
        IWFM stores most budget metadata as HDF5 attrs, not datasets.
        Returns the raw value (scalar or array) or None if not found.
        """
        # Check child datasets first
        if key in group:
            obj = group[key]
            if isinstance(obj, h5py.Dataset):
                if obj.ndim == 0:
                    return obj[()]
                return obj[:]
            return obj
        # Check HDF5 group attributes
        if key in group.attrs:
            return group.attrs[key]
        return None

    def _read_header_hdf5(self) -> BudgetHeader:
        """Read header from HDF5 file."""
        header = BudgetHeader()
        _get = self._h5_get

        with h5py.File(self.filepath, "r") as f:
            # Find attributes group (try /Attributes/ first, then root)
            if "Attributes" in f:
                attrs_group = f["Attributes"]
            else:
                attrs_group = f

            # Read descriptor
            val = _get(attrs_group, "Descriptor")
            if val is not None:
                header.descriptor = val.decode() if isinstance(val, bytes) else str(val)

            # Read areas
            val = _get(attrs_group, "NAreas")
            if val is not None:
                header.n_areas = int(val)
            val = _get(attrs_group, "Areas")
            if val is not None:
                header.areas = np.array(val)

            # Read timestep info
            ts = TimeStepInfo()
            val = _get(attrs_group, "NTimeSteps")
            if val is not None:
                ts.n_timesteps = int(val)

            # Timestep metadata (may be stored with TimeStep% prefix)
            for key in ["TimeStep%TrackTime", "TrackTime"]:
                val = _get(attrs_group, key)
                if val is not None:
                    ts.track_time = bool(int(val))
                    break
            for key in ["TimeStep%DeltaT", "DeltaT"]:
                val = _get(attrs_group, key)
                if val is not None:
                    ts.delta_t = float(val)
                    break
            for key in ["TimeStep%DeltaT_InMinutes", "DeltaT_InMinutes"]:
                val = _get(attrs_group, key)
                if val is not None:
                    ts.delta_t_minutes = int(val)
                    break
            for key in ["TimeStep%Unit", "Unit"]:
                val = _get(attrs_group, key)
                if val is not None:
                    ts.unit = val.decode().strip() if isinstance(val, bytes) else str(val).strip()
                    break
            for key in ["TimeStep%BeginTime", "BeginTime"]:
                val = _get(attrs_group, key)
                if val is not None:
                    ts.start_time = float(val)
                    break
            for key in ["TimeStep%BeginDateAndTime", "BeginDateAndTime"]:
                val = _get(attrs_group, key)
                if val is not None:
                    date_str = val.decode().strip() if isinstance(val, bytes) else str(val).strip()
                    ts.start_datetime = parse_iwfm_datetime(date_str)
                    break
            header.timestep = ts

            # Read ASCII output info
            ascii_out = ASCIIOutputInfo()
            for key in ["TitleLen", "ASCIIOutput%TitleLen"]:
                val = _get(attrs_group, key)
                if val is not None:
                    ascii_out.title_len = int(val)
                    break
            for key in ["NTitles", "ASCIIOutput%NTitles"]:
                val = _get(attrs_group, key)
                if val is not None:
                    ascii_out.n_titles = int(val)
                    break
            for key in ["cTitles", "ASCIIOutput%cTitles"]:
                val = _get(attrs_group, key)
                if val is not None:
                    ascii_out.titles = [t.decode() if isinstance(t, bytes) else str(t) for t in val]
                    break
            header.ascii_output = ascii_out

            # Read location info
            for key in ["nLocations", "NLocations"]:
                val = _get(attrs_group, key)
                if val is not None:
                    header.n_locations = int(val)
                    break

            val = _get(attrs_group, "cLocationNames")
            if val is not None:
                header.location_names = [
                    n.decode().strip() if isinstance(n, bytes) else str(n).strip() for n in val
                ]

            # Infer n_locations from location names when attribute is missing
            if header.n_locations == 0 and header.location_names:
                header.n_locations = len(header.location_names)

            # Infer n_areas from areas array when attribute is missing
            if header.n_areas == 0 and header.areas is not None:
                header.n_areas = len(header.areas)

            # Read location data (column info)
            header.location_data = []
            val = _get(attrs_group, "NLocationData")
            n_loc_data = int(val) if val is not None else 1

            for i in range(n_loc_data):
                loc_data = LocationData()
                prefix = f"LocationData{i + 1}%" if n_loc_data > 1 else ""

                # Number of columns
                for key in [f"{prefix}NDataColumns", "NDataColumns"]:
                    val = _get(attrs_group, key)
                    if val is not None:
                        loc_data.n_columns = int(val)
                        break

                # Column headers
                for key in [f"{prefix}cFullColumnHeaders", "cFullColumnHeaders"]:
                    val = _get(attrs_group, key)
                    if val is not None:
                        loc_data.column_headers = [
                            h.decode().strip() if isinstance(h, bytes) else str(h).strip()
                            for h in val
                        ]
                        break

                # Column types
                for key in [f"{prefix}iDataColumnTypes", "iDataColumnTypes"]:
                    val = _get(attrs_group, key)
                    if val is not None:
                        loc_data.column_types = list(val)
                        break

                # Column widths
                for key in [f"{prefix}iColWidth", "iColWidth"]:
                    val = _get(attrs_group, key)
                    if val is not None:
                        loc_data.column_widths = list(val)
                        break

                header.location_data.append(loc_data)

            # Fallback: derive column headers from DSS path names if still empty.
            # Extract column names (parts[5]) from the first group of DSS paths
            # that share the same location part (parts[1]). This is
            # location-agnostic: it does NOT require the DSS path location to
            # match cLocationNames[0], which fixes small watershed budgets where
            # the HDF location is "WATERSHED 1" but the DSS path uses "WSHED_1".
            if (
                header.location_data
                and not header.location_data[0].column_headers
                and header.location_names
            ):
                val = _get(attrs_group, "DSSOutput%cPathNames")
                if val is not None:
                    first_loc_part: str | None = None
                    col_names: list[str] = []
                    for p in val:
                        s = p.decode().strip() if isinstance(p, bytes) else str(p).strip()
                        parts = s.strip("/").split("/")
                        if len(parts) >= 6:
                            loc_part = parts[1].upper()
                            if first_loc_part is None:
                                first_loc_part = loc_part
                            if loc_part == first_loc_part:
                                col_names.append(parts[5])
                            else:
                                break  # moved to next location's paths
                    if col_names:
                        header.location_data[0].column_headers = col_names
                        if header.location_data[0].n_columns == 0:
                            header.location_data[0].n_columns = len(col_names)

            # Fallback: infer n_columns from data shape
            if header.location_data and header.location_data[0].n_columns == 0:
                for name in header.location_names:
                    if name in f:
                        obj = f[name]
                        if isinstance(obj, h5py.Dataset) and obj.ndim == 2:
                            header.location_data[0].n_columns = obj.shape[1]
                        break

            # Generate generic column headers if still empty
            if (
                header.location_data
                and not header.location_data[0].column_headers
                and header.location_data[0].n_columns > 0
            ):
                n = header.location_data[0].n_columns
                header.location_data[0].column_headers = [f"Column {j + 1}" for j in range(n)]

            # If only one location data, replicate for all locations
            if len(header.location_data) == 1 and header.n_locations > 1:
                header.location_data = [header.location_data[0]] * header.n_locations

            # Try to get timestep count from data if not in attributes
            if ts.n_timesteps == 0 and header.location_names:
                expected_cols = header.location_data[0].n_columns if header.location_data else 0
                for name in header.location_names:
                    if name in f:
                        loc_obj = f[name]
                        if isinstance(loc_obj, h5py.Dataset) and loc_obj.ndim == 2:
                            # Determine orientation using known n_columns
                            if expected_cols > 0 and loc_obj.shape[0] == expected_cols:
                                ts.n_timesteps = loc_obj.shape[1]
                            else:
                                ts.n_timesteps = loc_obj.shape[0]
                        elif isinstance(loc_obj, h5py.Group):
                            for key in loc_obj.keys():
                                ds = loc_obj[key]
                                if isinstance(ds, h5py.Dataset) and ds.ndim == 2:
                                    if expected_cols > 0 and ds.shape[0] == expected_cols:
                                        ts.n_timesteps = ds.shape[1]
                                    else:
                                        ts.n_timesteps = ds.shape[0]
                                    break
                        break

        return header

    def _read_header_binary(self) -> BudgetHeader:
        """Read header from binary file."""
        header = BudgetHeader()

        with open(self.filepath, "rb") as f:
            # Read descriptor (100 chars)
            header.descriptor = self._read_fortran_string(f, 100)

            # Read timestep info
            ts = TimeStepInfo()
            ts.n_timesteps = self._read_fortran_int(f)
            ts.track_time = self._read_fortran_logical(f)
            ts.delta_t = self._read_fortran_real8(f)
            ts.delta_t_minutes = self._read_fortran_int(f)
            ts.unit = self._read_fortran_string(f, 10)
            if ts.track_time:
                date_str = self._read_fortran_string(f, 21)
                ts.start_datetime = parse_iwfm_datetime(date_str)
            ts.start_time = self._read_fortran_real8(f)
            header.timestep = ts

            # Read areas
            header.n_areas = self._read_fortran_int(f)
            if header.n_areas > 0:
                header.areas = self._read_fortran_real8_array(f, header.n_areas)

            # Read ASCII output info
            ascii_out = ASCIIOutputInfo()
            ascii_out.title_len = self._read_fortran_int(f)
            ascii_out.n_titles = self._read_fortran_int(f)
            ascii_out.titles = []
            for _ in range(ascii_out.n_titles):
                ascii_out.titles.append(self._read_fortran_string(f, 1000))
            ascii_out.title_persist = []
            for _ in range(ascii_out.n_titles):
                ascii_out.title_persist.append(self._read_fortran_logical(f))
            ascii_out.format_spec = self._read_fortran_string(f, 500)
            ascii_out.n_column_header_lines = self._read_fortran_int(f)
            header.ascii_output = ascii_out

            # Read locations
            header.n_locations = self._read_fortran_int(f)
            header.location_names = []
            for _ in range(header.n_locations):
                header.location_names.append(self._read_fortran_string(f, 100))

            # Read location data
            n_loc_data = self._read_fortran_int(f)
            header.location_data = []
            for _ in range(n_loc_data):
                loc_data = LocationData()
                loc_data.n_columns = self._read_fortran_int(f)
                loc_data.storage_units = self._read_fortran_int(f)

                # Column headers
                loc_data.column_headers = []
                for _ in range(loc_data.n_columns):
                    loc_data.column_headers.append(self._read_fortran_string(f, 100))

                # Column types
                loc_data.column_types = list(self._read_fortran_int_array(f, loc_data.n_columns))

                # Column widths
                loc_data.column_widths = list(self._read_fortran_int_array(f, loc_data.n_columns))

                # Multi-line column headers (skip for now)
                for _ in range(ascii_out.n_column_header_lines):
                    for _ in range(loc_data.n_columns):
                        self._read_fortran_string(f, 100)

                # Column header format specs (skip)
                for _ in range(ascii_out.n_column_header_lines):
                    self._read_fortran_string(f, 500)

                header.location_data.append(loc_data)

            # DSS output info (skip for now)
            n_dss_pathnames = self._read_fortran_int(f)
            for _ in range(n_dss_pathnames):
                self._read_fortran_string(f, 80)
            n_dss_types = self._read_fortran_int(f)
            self._read_fortran_int_array(f, n_dss_types)

            # Record file position after header
            header.file_position = f.tell()

        return header

    def _read_fortran_string(self, f: BinaryIO, length: int) -> str:
        """Read Fortran string from binary file."""
        # Fortran unformatted record: 4-byte length, data, 4-byte length
        rec_len = struct.unpack("i", f.read(4))[0]
        data = f.read(rec_len)
        f.read(4)  # trailing length
        return data[:length].decode("ascii", errors="replace").strip()

    def _read_fortran_int(self, f: BinaryIO) -> int:
        """Read single Fortran integer from binary file."""
        f.read(4)  # record length
        val = struct.unpack("i", f.read(4))[0]
        f.read(4)  # trailing length
        return cast(int, val)

    def _read_fortran_real8(self, f: BinaryIO) -> float:
        """Read single Fortran REAL(8) from binary file."""
        f.read(4)  # record length
        val = struct.unpack("d", f.read(8))[0]
        f.read(4)  # trailing length
        return cast(float, val)

    def _read_fortran_logical(self, f: BinaryIO) -> bool:
        """Read single Fortran logical from binary file."""
        f.read(4)  # record length
        val = struct.unpack("i", f.read(4))[0]
        f.read(4)  # trailing length
        return bool(val != 0)

    def _read_fortran_int_array(self, f: BinaryIO, n: int) -> NDArray[np.int32]:
        """Read Fortran integer array from binary file."""
        f.read(4)  # record length
        data = np.frombuffer(f.read(4 * n), dtype=np.int32)
        f.read(4)  # trailing length
        return data

    def _read_fortran_real8_array(self, f: BinaryIO, n: int) -> NDArray[np.float64]:
        """Read Fortran REAL(8) array from binary file."""
        f.read(4)  # record length
        data = np.frombuffer(f.read(8 * n), dtype=np.float64)
        f.read(4)  # trailing length
        return data

    @property
    def descriptor(self) -> str:
        """Return budget type descriptor."""
        return self.header.descriptor

    @property
    def locations(self) -> list[str]:
        """Return list of location names."""
        return self.header.location_names

    @property
    def n_locations(self) -> int:
        """Return number of locations."""
        return self.header.n_locations

    @property
    def n_timesteps(self) -> int:
        """Return number of time steps."""
        return self.header.timestep.n_timesteps

    def get_location_index(self, location: str | int) -> int:
        """Get location index from name or index."""
        if isinstance(location, int):
            if 0 <= location < self.n_locations:
                return location
            raise IndexError(f"Location index {location} out of range [0, {self.n_locations})")

        # Search by name
        for i, name in enumerate(self.locations):
            if name == location or name.lower() == location.lower():
                return i

        raise KeyError(f"Location '{location}' not found. Available: {self.locations}")

    def get_column_headers(self, location: str | int = 0) -> list[str]:
        """
        Get column headers for a location.

        Parameters
        ----------
        location : str or int
            Location name or index.

        Returns
        -------
        list[str]
            List of column header names.
        """
        loc_idx = self.get_location_index(location)

        # Determine which location data to use
        if len(self.header.location_data) == 1:
            loc_data = self.header.location_data[0]
        else:
            loc_data = self.header.location_data[loc_idx]

        # Clean up column headers (remove unit markers)
        headers = []
        for h in loc_data.column_headers:
            for marker, replacement in UNIT_MARKERS.items():
                h = h.replace(marker, f"({replacement})")
            headers.append(h.strip())

        # IWFM HDF5 files may include "Time" as first column header even though
        # it is not a data column. Strip it when it causes a mismatch.
        if len(headers) > loc_data.n_columns > 0 and headers[0].lower() == "time":
            headers = headers[1:]

        return headers

    def get_values(
        self,
        location: str | int = 0,
        columns: list[int] | None = None,
    ) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
        """
        Read budget values for a location.

        Parameters
        ----------
        location : str or int
            Location name or index.
        columns : list[int], optional
            Column indices to read (0-based). If None, reads all columns.

        Returns
        -------
        tuple[NDArray[np.float64], NDArray[np.float64]]
            Tuple of (times, values) where:

            - times: 1D array of time values (or datetimes as floats)
            - values: 2D array of shape (n_timesteps, n_columns)
        """
        loc_idx = self.get_location_index(location)

        if self.format == "hdf5":
            return self._read_values_hdf5(loc_idx, columns)
        else:
            return self._read_values_binary(loc_idx, columns)

    def _read_values_hdf5(
        self,
        loc_idx: int,
        columns: list[int] | None = None,
    ) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
        """Read values from HDF5 file."""
        loc_name = self.locations[loc_idx]

        with h5py.File(self.filepath, "r") as f:
            if loc_name not in f:
                raise KeyError(f"Location '{loc_name}' not found in HDF5 file")

            loc_obj = f[loc_name]

            # Find the data: location may be a Dataset directly or a Group
            data = None
            if isinstance(loc_obj, h5py.Dataset):
                data = loc_obj[:]
            elif isinstance(loc_obj, h5py.Group):
                for key in loc_obj.keys():
                    if isinstance(loc_obj[key], h5py.Dataset):
                        data = loc_obj[key][:]
                        break

            if data is None:
                raise ValueError(f"No data found for location '{loc_name}'")

        # Data may be (n_columns, n_timesteps) or (n_timesteps, n_columns)
        if data.ndim == 1:
            data = data.reshape(1, -1)

        # Ensure shape is (n_timesteps, n_columns).
        # Use expected n_columns from header to determine orientation
        # instead of comparing shape dimensions (which fails when
        # n_timesteps < n_columns).
        if len(self.header.location_data) == 1:
            expected_cols = self.header.location_data[0].n_columns
        else:
            expected_cols = self.header.location_data[loc_idx].n_columns
        if expected_cols > 0 and data.shape[1] != expected_cols:
            data = data.T

        n_timesteps = data.shape[0]

        # Generate time array
        ts = self.header.timestep
        if ts.start_datetime:
            start = ts.start_datetime
            use_months = "MON" in ts.unit.upper() if ts.unit else False
            if use_months:
                from dateutil.relativedelta import relativedelta

                times = np.array(
                    [(start + relativedelta(months=i)).timestamp() for i in range(n_timesteps)]
                )
            else:
                delta = timedelta(minutes=ts.delta_t_minutes)
                times = np.array([(start + i * delta).timestamp() for i in range(n_timesteps)])
        else:
            times = np.arange(n_timesteps) * ts.delta_t + ts.start_time

        # Select columns if specified
        if columns is not None:
            data = data[:, columns]

        return times, data

    def _read_values_binary(
        self,
        loc_idx: int,
        columns: list[int] | None = None,
    ) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
        """Read values from binary file."""
        # Get location data structure
        if len(self.header.location_data) == 1:
            loc_data = self.header.location_data[0]
        else:
            loc_data = self.header.location_data[loc_idx]

        n_cols = loc_data.n_columns
        n_timesteps = self.header.timestep.n_timesteps

        # Calculate total storage units per timestep
        if len(self.header.location_data) == 1:
            total_storage = loc_data.storage_units * self.n_locations
            loc_offset = loc_data.storage_units * loc_idx
        else:
            total_storage = sum(ld.storage_units for ld in self.header.location_data)
            loc_offset = sum(self.header.location_data[i].storage_units for i in range(loc_idx))

        # Read data
        data = np.zeros((n_timesteps, n_cols), dtype=np.float64)

        with open(self.filepath, "rb") as f:
            for t in range(n_timesteps):
                # Calculate file position
                pos = (
                    self.header.file_position
                    + t * total_storage * 8  # 8 bytes per REAL(8)
                    + loc_offset * 8
                )
                f.seek(pos)

                # Read data for this location and timestep
                row = np.frombuffer(f.read(n_cols * 8), dtype=np.float64)
                data[t, :] = row

        # Generate time array
        ts = self.header.timestep
        if ts.start_datetime:
            start = ts.start_datetime
            use_months = "MON" in ts.unit.upper() if ts.unit else False
            if use_months:
                from dateutil.relativedelta import relativedelta

                times = np.array(
                    [(start + relativedelta(months=i)).timestamp() for i in range(n_timesteps)]
                )
            else:
                delta = timedelta(minutes=ts.delta_t_minutes)
                times = np.array([(start + i * delta).timestamp() for i in range(n_timesteps)])
        else:
            times = np.arange(n_timesteps) * ts.delta_t + ts.start_time

        # Select columns if specified
        if columns is not None:
            data = data[:, columns]

        return times, data

    def get_dataframe(
        self,
        location: str | int = 0,
        columns: list[int] | list[str] | None = None,
        *,
        length_factor: float = 1.0,
        area_factor: float = 1.0,
        volume_factor: float = 1.0,
    ) -> pd.DataFrame:
        """
        Read budget values as a pandas DataFrame.

        When *all* conversion factors are left at their default of ``1.0``,
        the raw simulation values are returned unchanged. Pass non-unity
        factors (FACTLTOU / FACTAROU / FACTVLOU) to get unit-converted
        output: each column is multiplied by the factor that matches its
        IWFM data-type code (volume, area, or length).

        Parameters
        ----------
        location : str or int
            Location name or index.
        columns : list, optional
            Column indices or names to read. If None, reads all columns.
        length_factor : float
            Multiplier for length columns (type 5). Default ``1.0``.
        area_factor : float
            Multiplier for area columns (type 4). Default ``1.0``.
        volume_factor : float
            Multiplier for volume columns (types 1-3, 6-11). Default ``1.0``.

        Returns
        -------
        pd.DataFrame
            DataFrame with datetime index and budget columns.
        """
        # Get column headers
        all_headers = self.get_column_headers(location)

        # Convert column names to indices if needed
        col_indices = None
        if columns is not None:
            # Pre-compute lookup dicts for O(1) matching
            exact_map = {h: i for i, h in enumerate(all_headers)}
            lower_map = {h.lower(): i for i, h in enumerate(all_headers)}
            col_indices = []
            for c in columns:
                if isinstance(c, str):
                    idx = exact_map.get(c)
                    if idx is None:
                        idx = lower_map.get(c.lower())
                    if idx is None:
                        raise KeyError(f"Column '{c}' not found")
                    col_indices.append(idx)
                else:
                    col_indices.append(c)

        # Read data
        times, values = self.get_values(location, col_indices)

        # Create datetime index
        ts = self.header.timestep
        index: pd.DatetimeIndex | pd.Index[Any]
        if ts.start_datetime:
            index = pd.to_datetime(times, unit="s")
        else:
            index = pd.Index(times, name="Time")

        # Get column names
        if col_indices is not None:
            col_names = [all_headers[i] for i in col_indices]
        else:
            col_names = all_headers

        # Apply unit conversion when any factor is non-unity
        needs_conversion = length_factor != 1.0 or area_factor != 1.0 or volume_factor != 1.0
        if needs_conversion:
            from pyiwfm.io.budget_utils import apply_unit_conversion

            loc_idx = self.get_location_index(location)
            if len(self.header.location_data) == 1:
                loc_data = self.header.location_data[0]
            else:
                loc_data = self.header.location_data[loc_idx]

            if loc_data.column_types:
                # Build column-type list for the selected columns
                if col_indices is not None:
                    col_type_list = [
                        loc_data.column_types[i] if i < len(loc_data.column_types) else 1
                        for i in col_indices
                    ]
                else:
                    col_type_list = loc_data.column_types

                values = apply_unit_conversion(
                    values,
                    col_type_list,
                    length_factor=length_factor,
                    area_factor=area_factor,
                    volume_factor=volume_factor,
                )
            else:
                # No column type info: apply volume factor to everything
                values = values * volume_factor

        return pd.DataFrame(values, index=index, columns=col_names)

    def get_all_dataframes(self) -> dict[str, pd.DataFrame]:
        """
        Read budget data for all locations as DataFrames.

        Returns
        -------
        dict[str, pd.DataFrame]
            Dictionary mapping location name to DataFrame.
        """
        return {loc: self.get_dataframe(loc) for loc in self.locations}

    def get_monthly_averages(
        self,
        location: str | int = 0,
        columns: list[int] | list[str] | None = None,
    ) -> pd.DataFrame:
        """
        Calculate monthly averages for budget data.

        Parameters
        ----------
        location : str or int
            Location name or index.
        columns : list, optional
            Column indices or names to include.

        Returns
        -------
        pd.DataFrame
            DataFrame with monthly averaged values.
        """
        df = self.get_dataframe(location, columns)

        if isinstance(df.index, pd.DatetimeIndex):
            result: pd.DataFrame = df.resample("MS").mean()
            return result
        else:
            # Assume monthly time steps and return as-is
            return df

    def get_annual_totals(
        self,
        location: str | int = 0,
        columns: list[int] | list[str] | None = None,
    ) -> pd.DataFrame:
        """
        Calculate annual totals for budget data.

        Parameters
        ----------
        location : str or int
            Location name or index.
        columns : list, optional
            Column indices or names to include.

        Returns
        -------
        pd.DataFrame
            DataFrame with annual total values.
        """
        df = self.get_dataframe(location, columns)

        if isinstance(df.index, pd.DatetimeIndex):
            annual_result: pd.DataFrame = df.resample("YS").sum()
            return annual_result
        else:
            # Group by year (assume 12 time steps per year)
            n_years = len(df) // 12
            if n_years == 0:
                single_row: pd.DataFrame = df.sum().to_frame().T
                return single_row
            annual = []
            for i in range(n_years):
                annual.append(df.iloc[i * 12 : (i + 1) * 12].sum())
            return pd.DataFrame(annual)

    def get_cumulative(
        self,
        location: str | int = 0,
        columns: list[int] | list[str] | None = None,
    ) -> pd.DataFrame:
        """
        Calculate cumulative sums for budget data.

        Parameters
        ----------
        location : str or int
            Location name or index.
        columns : list, optional
            Column indices or names to include.

        Returns
        -------
        pd.DataFrame
            DataFrame with cumulative values.
        """
        df = self.get_dataframe(location, columns)
        result: pd.DataFrame = df.cumsum()
        return result

    def __repr__(self) -> str:
        return (
            f"BudgetReader('{self.filepath.name}', "
            f"format='{self.format}', "
            f"descriptor='{self.descriptor}', "
            f"n_locations={self.n_locations}, "
            f"n_timesteps={self.n_timesteps})"
        )
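
# Example usage (illustrative sketch; the file name, location name, and the
# assumption that raw volumes are in cubic feet are hypothetical and depend on
# the model that produced the budget file):
#
#     reader = BudgetReader("GW_Budget.hdf")
#
#     # Raw values in simulation units
#     df = reader.get_dataframe("Subregion 1")
#
#     # Unit-converted values, e.g. cubic feet to acre-feet for volume columns
#     # (1 acre-foot = 43560 cubic feet, so the factor is 1 / 43560)
#     df_af = reader.get_dataframe("Subregion 1", volume_factor=1 / 43560)
#
#     # Calendar-year totals and running cumulative sums
#     annual = reader.get_annual_totals("Subregion 1")
#     cumulative = reader.get_cumulative("Subregion 1")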