"""
Budget file reader for IWFM simulation output.
This module provides classes for reading IWFM budget binary and HDF5 files,
which contain water balance data for groundwater, streams, lakes, root zone,
and other model components.
Example
-------
Read a groundwater budget file:
>>> from pyiwfm.io.budget import BudgetReader
>>> reader = BudgetReader("GW_Budget.hdf")
>>> print(reader.descriptor)
'GROUNDWATER BUDGET'
>>> print(reader.locations)
['Subregion 1', 'Subregion 2', ...]
>>> df = reader.get_dataframe(location="Subregion 1")
"""
from __future__ import annotations
import struct
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, BinaryIO, Literal, cast
import h5py
import numpy as np
import pandas as pd
from numpy.typing import NDArray
from pyiwfm.io.timeseries_ascii import iwfm_date_to_iso as iwfm_date_to_iso
from pyiwfm.io.timeseries_ascii import parse_iwfm_datetime as parse_iwfm_datetime
# Budget data type codes (from Budget_Parameters.f90).
# These integer codes tag each output column so unit conversion can pick
# the matching factor (volume / area / length); see BudgetReader.get_dataframe().
BUDGET_DATA_TYPES = {
    1: "VR",  # Volumetric rate
    2: "VLB",  # Volume at beginning
    3: "VLE",  # Volume at end
    4: "AR",  # Area
    5: "LT",  # Length
    6: "VR_PotCUAW",  # Potential CUAW
    7: "VR_AgSupplyReq",  # Agricultural supply requirement
    8: "VR_AgShort",  # Agricultural shortage
    9: "VR_AgPump",  # Agricultural pumping
    10: "VR_AgDiv",  # Agricultural diversions
    11: "VR_AgOthIn",  # Agricultural other inflows
}
# DSS data type codes
DSS_DATA_TYPES = {
    1: "PER-CUM",  # Period cumulative
    2: "PER-AVER",  # Period average
}
# Unit markers embedded in raw column headers; each is replaced with a
# parenthesized label by BudgetReader.get_column_headers().
UNIT_MARKERS = {
    "@UNITVL@": "volume",
    "@UNITAR@": "area",
    "@UNITLT@": "length",
    "@LOCNAME@": "location",
    "@AREA@": "area_value",
}
@dataclass
class TimeStepInfo:
    """Time step information from budget file."""

    # True when the file tracks calendar time (a begin date is stored);
    # False when time is a plain numeric axis.
    track_time: bool = True
    # Time step length, expressed in `unit`.
    delta_t: float = 1.0
    # Time step length in minutes.
    delta_t_minutes: int = 1440  # 1 day in minutes
    # IWFM time unit string (e.g. "1DAY", "1MON").
    unit: str = "1DAY"
    # Simulation start as a datetime when track_time is True, else None.
    start_datetime: datetime | None = None
    # Numeric start time used when no calendar datetime is available.
    start_time: float = 0.0
    # Number of time steps stored in the file.
    n_timesteps: int = 0
@dataclass
class ASCIIOutputInfo:
    """ASCII output formatting information."""

    # Fixed character length of each title record.
    title_len: int = 160
    # Number of title lines.
    n_titles: int = 0
    # Title line text.
    titles: list[str] = field(default_factory=list)
    # Per-title flag read alongside the titles (not interpreted by this reader).
    title_persist: list[bool] = field(default_factory=list)
    # Format specification string for ASCII output.
    format_spec: str = ""
    # Number of column-header lines in the ASCII layout.
    n_column_header_lines: int = 3
@dataclass
class LocationData:
    """Data structure for a single budget location."""

    # Number of data columns for this location.
    n_columns: int = 0
    # Number of REAL(8) storage slots the location occupies per timestep
    # (used to compute file offsets when reading binary data).
    storage_units: int = 0
    # Full column header strings (may embed @UNIT...@ markers).
    column_headers: list[str] = field(default_factory=list)
    # Per-column data-type codes (see BUDGET_DATA_TYPES).
    column_types: list[int] = field(default_factory=list)
    # Per-column widths (iColWidth) for ASCII output.
    column_widths: list[int] = field(default_factory=list)
    # DSS output pathnames, when DSS output is configured.
    dss_pathnames: list[str] = field(default_factory=list)
    # DSS data-type codes (see DSS_DATA_TYPES).
    dss_data_types: list[int] = field(default_factory=list)
def julian_to_datetime(julian_day: float) -> datetime:
    """Convert an astronomical Julian day number to a Python datetime.

    Julian day 2440587.5 corresponds to the Unix epoch
    (1970-01-01 00:00). Note that Excel-style serial dates differ from
    astronomical Julian dates by an offset of 2415020 days; see
    :func:`excel_julian_to_datetime` for those.
    """
    # Anchor the conversion at the Unix epoch rather than Julian day 0
    # (January 1, 4713 BC), which datetime cannot represent.
    unix_epoch_jd = 2440587.5
    offset_days = julian_day - unix_epoch_jd
    return datetime(1970, 1, 1) + timedelta(days=offset_days)
def excel_julian_to_datetime(excel_julian: float) -> datetime:
    """Convert an Excel-style serial date to a Python datetime.

    Excel serial day 1 nominally represents January 1, 1900. The
    conventional epoch of December 30, 1899 (serial 0) absorbs Excel's
    fictitious 1900-02-29, so serials from March 1900 onward map to the
    correct calendar dates.
    """
    excel_serial_zero = datetime(1899, 12, 30)
    return excel_serial_zero + timedelta(days=excel_julian)
class BudgetReader:
    """
    Reader for IWFM budget files (binary or HDF5 format).

    This class reads budget output files generated by IWFM simulation,
    including groundwater, stream, lake, and root zone budgets.

    Parameters
    ----------
    filepath : str or Path
        Path to the budget file (.bin or .hdf/.h5).

    Attributes
    ----------
    filepath : Path
        Path to the budget file.
    format : str
        File format ('binary' or 'hdf5').
    header : BudgetHeader
        Parsed header information.
    descriptor : str
        Budget type descriptor (e.g., 'GROUNDWATER BUDGET').
    locations : list[str]
        List of location names.
    n_timesteps : int
        Number of time steps in the file.

    Examples
    --------
    >>> reader = BudgetReader("GW_Budget.hdf")
    >>> print(reader.descriptor)
    'GROUNDWATER BUDGET'
    >>> print(reader.locations)
    ['Subregion 1', 'Subregion 2', 'Subregion 3']
    >>>
    >>> # Get column headers for a location
    >>> headers = reader.get_column_headers("Subregion 1")
    >>> print(headers)
    ['Deep Percolation', 'Stream-GW Interaction', ...]
    >>>
    >>> # Read data as numpy array
    >>> times, data = reader.get_values("Subregion 1")
    >>>
    >>> # Read data as pandas DataFrame
    >>> df = reader.get_dataframe("Subregion 1")
    """
def __init__(self, filepath: str | Path) -> None:
    """Open a budget file, detect its format, and parse the header.

    Parameters
    ----------
    filepath : str or Path
        Path to the budget file (.bin or .hdf/.h5).

    Raises
    ------
    FileNotFoundError
        If *filepath* does not exist.
    """
    self.filepath = Path(filepath)
    if not self.filepath.exists():
        raise FileNotFoundError(f"Budget file not found: {self.filepath}")
    # Detect format (extension first, content sniffing as fallback)
    self.format = self._detect_format()
    # Read header (locations, column metadata, timestep info)
    self.header = self._read_header()
def _detect_format(self) -> Literal["binary", "hdf5"]:
"""Detect file format from extension and content."""
suffix = self.filepath.suffix.lower()
if suffix in (".hdf", ".h5", ".hdf5"):
return "hdf5"
elif suffix in (".bin", ".out"):
return "binary"
else:
# Try to detect from content
try:
with h5py.File(self.filepath, "r"):
return "hdf5"
except Exception:
pass
return "binary"
def _read_header(self) -> BudgetHeader:
    """Parse the file header, dispatching on the detected format."""
    parser = self._read_header_hdf5 if self.format == "hdf5" else self._read_header_binary
    return parser()
@staticmethod
def _h5_get(group: h5py.File | h5py.Group, key: str) -> Any:
    """Read a value from an HDF5 group.

    Checks child datasets/groups first, then HDF5 group attributes.
    IWFM stores most budget metadata as HDF5 attrs, not datasets.

    Returns the raw value (scalar or array) or None if not found.
    """
    # Check child datasets first
    if key in group:
        obj = group[key]
        if isinstance(obj, h5py.Dataset):
            if obj.ndim == 0:
                # Scalar dataset: extract the bare value.
                return obj[()]
            # Array dataset: materialize as an in-memory array.
            return obj[:]
        # Sub-group: return the group object itself.
        return obj
    # Check HDF5 group attributes
    if key in group.attrs:
        return group.attrs[key]
    return None
def _read_header_hdf5(self) -> BudgetHeader:
    """Read header metadata from an HDF5 budget file.

    IWFM stores most metadata as HDF5 attributes, either on an
    ``/Attributes`` group or on the file root, and several keys appear
    under alternate spellings (with or without a ``TimeStep%`` /
    ``ASCIIOutput%`` / ``LocationData<i>%`` prefix) — each lookup
    therefore tries a list of candidate keys in order. Values missing
    from the attributes are inferred from the data layout (dataset
    shapes, DSS pathnames) where possible.

    Returns
    -------
    BudgetHeader
        Populated header with descriptor, timestep, ASCII-output,
        location, and per-location column metadata.
    """
    header = BudgetHeader()
    _get = self._h5_get
    with h5py.File(self.filepath, "r") as f:
        # Find attributes group (try /Attributes/ first, then root)
        if "Attributes" in f:
            attrs_group = f["Attributes"]
        else:
            attrs_group = f
        # Read descriptor
        val = _get(attrs_group, "Descriptor")
        if val is not None:
            header.descriptor = val.decode() if isinstance(val, bytes) else str(val)
        # Read areas
        val = _get(attrs_group, "NAreas")
        if val is not None:
            header.n_areas = int(val)
        val = _get(attrs_group, "Areas")
        if val is not None:
            header.areas = np.array(val)
        # Read timestep info
        ts = TimeStepInfo()
        val = _get(attrs_group, "NTimeSteps")
        if val is not None:
            ts.n_timesteps = int(val)
        # Timestep metadata (may be stored with TimeStep% prefix)
        for key in ["TimeStep%TrackTime", "TrackTime"]:
            val = _get(attrs_group, key)
            if val is not None:
                ts.track_time = bool(int(val))
                break
        for key in ["TimeStep%DeltaT", "DeltaT"]:
            val = _get(attrs_group, key)
            if val is not None:
                ts.delta_t = float(val)
                break
        for key in ["TimeStep%DeltaT_InMinutes", "DeltaT_InMinutes"]:
            val = _get(attrs_group, key)
            if val is not None:
                ts.delta_t_minutes = int(val)
                break
        for key in ["TimeStep%Unit", "Unit"]:
            val = _get(attrs_group, key)
            if val is not None:
                ts.unit = val.decode().strip() if isinstance(val, bytes) else str(val).strip()
                break
        for key in ["TimeStep%BeginTime", "BeginTime"]:
            val = _get(attrs_group, key)
            if val is not None:
                ts.start_time = float(val)
                break
        for key in ["TimeStep%BeginDateAndTime", "BeginDateAndTime"]:
            val = _get(attrs_group, key)
            if val is not None:
                date_str = val.decode().strip() if isinstance(val, bytes) else str(val).strip()
                ts.start_datetime = parse_iwfm_datetime(date_str)
                break
        header.timestep = ts
        # Read ASCII output info
        ascii_out = ASCIIOutputInfo()
        for key in ["TitleLen", "ASCIIOutput%TitleLen"]:
            val = _get(attrs_group, key)
            if val is not None:
                ascii_out.title_len = int(val)
                break
        for key in ["NTitles", "ASCIIOutput%NTitles"]:
            val = _get(attrs_group, key)
            if val is not None:
                ascii_out.n_titles = int(val)
                break
        for key in ["cTitles", "ASCIIOutput%cTitles"]:
            val = _get(attrs_group, key)
            if val is not None:
                ascii_out.titles = [t.decode() if isinstance(t, bytes) else str(t) for t in val]
                break
        header.ascii_output = ascii_out
        # Read location info
        for key in ["nLocations", "NLocations"]:
            val = _get(attrs_group, key)
            if val is not None:
                header.n_locations = int(val)
                break
        val = _get(attrs_group, "cLocationNames")
        if val is not None:
            header.location_names = [
                n.decode().strip() if isinstance(n, bytes) else str(n).strip() for n in val
            ]
        # Infer n_locations from location names when attribute is missing
        if header.n_locations == 0 and header.location_names:
            header.n_locations = len(header.location_names)
        # Infer n_areas from areas array when attribute is missing
        if header.n_areas == 0 and header.areas is not None:
            header.n_areas = len(header.areas)
        # Read location data (column info)
        header.location_data = []
        val = _get(attrs_group, "NLocationData")
        n_loc_data = int(val) if val is not None else 1
        for i in range(n_loc_data):
            loc_data = LocationData()
            # Attribute names are prefixed only when there are multiple
            # LocationData entries (e.g. "LocationData2%NDataColumns").
            prefix = f"LocationData{i + 1}%" if n_loc_data > 1 else ""
            # Number of columns
            for key in [f"{prefix}NDataColumns", "NDataColumns"]:
                val = _get(attrs_group, key)
                if val is not None:
                    loc_data.n_columns = int(val)
                    break
            # Column headers
            for key in [f"{prefix}cFullColumnHeaders", "cFullColumnHeaders"]:
                val = _get(attrs_group, key)
                if val is not None:
                    loc_data.column_headers = [
                        h.decode().strip() if isinstance(h, bytes) else str(h).strip()
                        for h in val
                    ]
                    break
            # Column types
            for key in [f"{prefix}iDataColumnTypes", "iDataColumnTypes"]:
                val = _get(attrs_group, key)
                if val is not None:
                    loc_data.column_types = list(val)
                    break
            # Column widths
            for key in [f"{prefix}iColWidth", "iColWidth"]:
                val = _get(attrs_group, key)
                if val is not None:
                    loc_data.column_widths = list(val)
                    break
            header.location_data.append(loc_data)
        # Fallback: derive column headers from DSS path names if still empty.
        # Extract column names (parts[5]) from the first group of DSS paths
        # that share the same location part (parts[1]). This is location-
        # agnostic — it does NOT require the DSS path location to match
        # cLocationNames[0], which fixes small watershed budgets where the
        # HDF location is "WATERSHED 1" but the DSS path uses "WSHED_1".
        if (
            header.location_data
            and not header.location_data[0].column_headers
            and header.location_names
        ):
            val = _get(attrs_group, "DSSOutput%cPathNames")
            if val is not None:
                first_loc_part: str | None = None
                col_names: list[str] = []
                for p in val:
                    s = p.decode().strip() if isinstance(p, bytes) else str(p).strip()
                    # DSS path: /A/B(location)/C/D/E/F(column)/
                    parts = s.strip("/").split("/")
                    if len(parts) >= 6:
                        loc_part = parts[1].upper()
                        if first_loc_part is None:
                            first_loc_part = loc_part
                        if loc_part == first_loc_part:
                            col_names.append(parts[5])
                        else:
                            break  # moved to next location's paths
                if col_names:
                    header.location_data[0].column_headers = col_names
                    if header.location_data[0].n_columns == 0:
                        header.location_data[0].n_columns = len(col_names)
        # Fallback: infer n_columns from data shape
        if header.location_data and header.location_data[0].n_columns == 0:
            for name in header.location_names:
                if name in f:
                    obj = f[name]
                    if isinstance(obj, h5py.Dataset) and obj.ndim == 2:
                        header.location_data[0].n_columns = obj.shape[1]
                        break
        # Generate generic column headers if still empty
        if (
            header.location_data
            and not header.location_data[0].column_headers
            and header.location_data[0].n_columns > 0
        ):
            n = header.location_data[0].n_columns
            header.location_data[0].column_headers = [f"Column {j + 1}" for j in range(n)]
        # If only one location data, replicate for all locations
        if len(header.location_data) == 1 and header.n_locations > 1:
            header.location_data = [header.location_data[0]] * header.n_locations
        # Try to get timestep count from data if not in attributes
        if ts.n_timesteps == 0 and header.location_names:
            expected_cols = header.location_data[0].n_columns if header.location_data else 0
            for name in header.location_names:
                if name in f:
                    loc_obj = f[name]
                    if isinstance(loc_obj, h5py.Dataset) and loc_obj.ndim == 2:
                        # Determine orientation using known n_columns
                        if expected_cols > 0 and loc_obj.shape[0] == expected_cols:
                            ts.n_timesteps = loc_obj.shape[1]
                        else:
                            ts.n_timesteps = loc_obj.shape[0]
                    elif isinstance(loc_obj, h5py.Group):
                        # Location stored as a group: use its first 2-D dataset.
                        for key in loc_obj.keys():
                            ds = loc_obj[key]
                            if isinstance(ds, h5py.Dataset) and ds.ndim == 2:
                                if expected_cols > 0 and ds.shape[0] == expected_cols:
                                    ts.n_timesteps = ds.shape[1]
                                else:
                                    ts.n_timesteps = ds.shape[0]
                                break
                    break
    return header
def _read_header_binary(self) -> BudgetHeader:
    """Read header from a Fortran unformatted binary budget file.

    The reads below must mirror, record for record, the order in which
    the IWFM budget writer emits its header; do not reorder them. The
    file position after the last header record is saved on the header
    so the data section can be addressed later.
    """
    header = BudgetHeader()
    with open(self.filepath, "rb") as f:
        # Read descriptor (100 chars)
        header.descriptor = self._read_fortran_string(f, 100)
        # Read timestep info
        ts = TimeStepInfo()
        ts.n_timesteps = self._read_fortran_int(f)
        ts.track_time = self._read_fortran_logical(f)
        ts.delta_t = self._read_fortran_real8(f)
        ts.delta_t_minutes = self._read_fortran_int(f)
        ts.unit = self._read_fortran_string(f, 10)
        if ts.track_time:
            # Calendar runs store a begin date string (21 chars).
            date_str = self._read_fortran_string(f, 21)
            ts.start_datetime = parse_iwfm_datetime(date_str)
        # NOTE(review): start_time is read unconditionally here, i.e. the
        # writer is assumed to emit BeginTime for both calendar and
        # non-calendar runs — confirm against the IWFM budget writer.
        ts.start_time = self._read_fortran_real8(f)
        header.timestep = ts
        # Read areas
        header.n_areas = self._read_fortran_int(f)
        if header.n_areas > 0:
            header.areas = self._read_fortran_real8_array(f, header.n_areas)
        # Read ASCII output info
        ascii_out = ASCIIOutputInfo()
        ascii_out.title_len = self._read_fortran_int(f)
        ascii_out.n_titles = self._read_fortran_int(f)
        ascii_out.titles = []
        for _ in range(ascii_out.n_titles):
            ascii_out.titles.append(self._read_fortran_string(f, 1000))
        ascii_out.title_persist = []
        for _ in range(ascii_out.n_titles):
            ascii_out.title_persist.append(self._read_fortran_logical(f))
        ascii_out.format_spec = self._read_fortran_string(f, 500)
        ascii_out.n_column_header_lines = self._read_fortran_int(f)
        header.ascii_output = ascii_out
        # Read locations
        header.n_locations = self._read_fortran_int(f)
        header.location_names = []
        for _ in range(header.n_locations):
            header.location_names.append(self._read_fortran_string(f, 100))
        # Read location data
        n_loc_data = self._read_fortran_int(f)
        header.location_data = []
        for _ in range(n_loc_data):
            loc_data = LocationData()
            loc_data.n_columns = self._read_fortran_int(f)
            loc_data.storage_units = self._read_fortran_int(f)
            # Column headers
            loc_data.column_headers = []
            for _ in range(loc_data.n_columns):
                loc_data.column_headers.append(self._read_fortran_string(f, 100))
            # Column types
            loc_data.column_types = list(self._read_fortran_int_array(f, loc_data.n_columns))
            # Column widths
            loc_data.column_widths = list(self._read_fortran_int_array(f, loc_data.n_columns))
            # Multi-line column headers (skip for now) — records must still
            # be consumed to keep the stream position in sync.
            for _ in range(ascii_out.n_column_header_lines):
                for _ in range(loc_data.n_columns):
                    self._read_fortran_string(f, 100)
            # Column header format specs (skip)
            for _ in range(ascii_out.n_column_header_lines):
                self._read_fortran_string(f, 500)
            header.location_data.append(loc_data)
        # DSS output info (skip for now)
        n_dss_pathnames = self._read_fortran_int(f)
        for _ in range(n_dss_pathnames):
            self._read_fortran_string(f, 80)
        n_dss_types = self._read_fortran_int(f)
        self._read_fortran_int_array(f, n_dss_types)
        # Record file position after header
        header.file_position = f.tell()
    return header
def _read_fortran_string(self, f: BinaryIO, length: int) -> str:
"""Read Fortran string from binary file."""
# Fortran unformatted record: 4-byte length, data, 4-byte length
rec_len = struct.unpack("i", f.read(4))[0]
data = f.read(rec_len)
f.read(4) # trailing length
return data[:length].decode("ascii", errors="replace").strip()
def _read_fortran_int(self, f: BinaryIO) -> int:
"""Read single Fortran integer from binary file."""
f.read(4) # record length
val = struct.unpack("i", f.read(4))[0]
f.read(4) # trailing length
return cast(int, val)
def _read_fortran_real8(self, f: BinaryIO) -> float:
"""Read single Fortran REAL(8) from binary file."""
f.read(4) # record length
val = struct.unpack("d", f.read(8))[0]
f.read(4) # trailing length
return cast(float, val)
def _read_fortran_logical(self, f: BinaryIO) -> bool:
"""Read single Fortran logical from binary file."""
f.read(4) # record length
val = struct.unpack("i", f.read(4))[0]
f.read(4) # trailing length
return bool(val != 0)
def _read_fortran_int_array(self, f: BinaryIO, n: int) -> NDArray[np.int32]:
"""Read Fortran integer array from binary file."""
f.read(4) # record length
data = np.frombuffer(f.read(4 * n), dtype=np.int32)
f.read(4) # trailing length
return data
def _read_fortran_real8_array(self, f: BinaryIO, n: int) -> NDArray[np.float64]:
"""Read Fortran REAL(8) array from binary file."""
f.read(4) # record length
data = np.frombuffer(f.read(8 * n), dtype=np.float64)
f.read(4) # trailing length
return data
@property
def descriptor(self) -> str:
    """Budget type descriptor from the header (e.g. 'GROUNDWATER BUDGET')."""
    return self.header.descriptor

@property
def locations(self) -> list[str]:
    """Location names parsed from the header."""
    return self.header.location_names

@property
def n_locations(self) -> int:
    """Number of budget locations in the file."""
    return self.header.n_locations

@property
def n_timesteps(self) -> int:
    """Number of time steps stored in the file."""
    return self.header.timestep.n_timesteps
def get_location_index(self, location: str | int) -> int:
"""Get location index from name or index."""
if isinstance(location, int):
if 0 <= location < self.n_locations:
return location
raise IndexError(f"Location index {location} out of range [0, {self.n_locations})")
# Search by name
for i, name in enumerate(self.locations):
if name == location or name.lower() == location.lower():
return i
raise KeyError(f"Location '{location}' not found. Available: {self.locations}")
def get_column_headers(self, location: str | int = 0) -> list[str]:
    """Return cleaned column header names for a location.

    Unit placeholder markers (e.g. ``@UNITVL@``) are replaced with a
    parenthesized label, whitespace is stripped, and a spurious leading
    "Time" header is dropped when it does not correspond to a data
    column.

    Parameters
    ----------
    location : str or int
        Location name or index.

    Returns
    -------
    list[str]
        Column header names.
    """
    loc_idx = self.get_location_index(location)
    # A single shared LocationData entry applies to every location.
    if len(self.header.location_data) == 1:
        loc_data = self.header.location_data[0]
    else:
        loc_data = self.header.location_data[loc_idx]
    headers: list[str] = []
    for raw in loc_data.column_headers:
        cleaned = raw
        for marker, label in UNIT_MARKERS.items():
            cleaned = cleaned.replace(marker, f"({label})")
        headers.append(cleaned.strip())
    # Some IWFM HDF5 files list "Time" first even though it is not a
    # data column; drop it when the header/column counts disagree.
    if len(headers) > loc_data.n_columns > 0 and headers[0].lower() == "time":
        del headers[0]
    return headers
def get_values(
    self,
    location: str | int = 0,
    columns: list[int] | None = None,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Read budget values for a location.

    Parameters
    ----------
    location : str or int
        Location name or index.
    columns : list[int], optional
        0-based column indices to read; all columns when None.

    Returns
    -------
    tuple[NDArray[np.float64], NDArray[np.float64]]
        ``(times, values)``: a 1D array of time values (POSIX
        timestamps when a start datetime is known, otherwise numeric
        time) and a 2D array of shape (n_timesteps, n_columns).
    """
    loc_idx = self.get_location_index(location)
    reader = self._read_values_hdf5 if self.format == "hdf5" else self._read_values_binary
    return reader(loc_idx, columns)
def _read_values_hdf5(
    self,
    loc_idx: int,
    columns: list[int] | None = None,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Read values for one location from an HDF5 budget file.

    The location may be stored either as a dataset named after the
    location or as a group whose first dataset holds the values. The
    array is normalized to shape (n_timesteps, n_columns) using the
    column count from the header, and a matching time axis is built
    from the header's timestep metadata.
    """
    loc_name = self.locations[loc_idx]
    with h5py.File(self.filepath, "r") as f:
        if loc_name not in f:
            raise KeyError(f"Location '{loc_name}' not found in HDF5 file")
        loc_obj = f[loc_name]
        # Find the data — location may be a Dataset directly or a Group
        data = None
        if isinstance(loc_obj, h5py.Dataset):
            data = loc_obj[:]
        elif isinstance(loc_obj, h5py.Group):
            for key in loc_obj.keys():
                if isinstance(loc_obj[key], h5py.Dataset):
                    data = loc_obj[key][:]
                    break
        if data is None:
            raise ValueError(f"No data found for location '{loc_name}'")
    # Data may be (n_columns, n_timesteps) or (n_timesteps, n_columns)
    if data.ndim == 1:
        data = data.reshape(1, -1)
    # Ensure shape is (n_timesteps, n_columns).
    # Use expected n_columns from header to determine orientation
    # instead of comparing shape dimensions (which fails when
    # n_timesteps < n_columns).
    if len(self.header.location_data) == 1:
        expected_cols = self.header.location_data[0].n_columns
    else:
        expected_cols = self.header.location_data[loc_idx].n_columns
    if expected_cols > 0 and data.shape[1] != expected_cols:
        data = data.T
    n_timesteps = data.shape[0]
    # Generate time array
    ts = self.header.timestep
    if ts.start_datetime:
        start = ts.start_datetime
        # Monthly steps have irregular lengths, so they need calendar
        # arithmetic rather than a fixed timedelta.
        use_months = "MON" in ts.unit.upper() if ts.unit else False
        if use_months:
            from dateutil.relativedelta import relativedelta
            times = np.array(
                [(start + relativedelta(months=i)).timestamp() for i in range(n_timesteps)]
            )
        else:
            delta = timedelta(minutes=ts.delta_t_minutes)
            times = np.array([(start + i * delta).timestamp() for i in range(n_timesteps)])
    else:
        # No calendar info: produce a numeric time axis.
        times = np.arange(n_timesteps) * ts.delta_t + ts.start_time
    # Select columns if specified
    if columns is not None:
        data = data[:, columns]
    return times, data
def _read_values_binary(
    self,
    loc_idx: int,
    columns: list[int] | None = None,
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Read values for one location from a binary budget file.

    The data section is addressed by offset arithmetic: each timestep
    occupies ``total_storage`` REAL(8) slots covering all locations,
    and this location's values start ``loc_offset`` slots into it.
    """
    # Get location data structure
    if len(self.header.location_data) == 1:
        loc_data = self.header.location_data[0]
    else:
        loc_data = self.header.location_data[loc_idx]
    n_cols = loc_data.n_columns
    n_timesteps = self.header.timestep.n_timesteps
    # Calculate total storage units per timestep
    if len(self.header.location_data) == 1:
        # One shared layout: every location is the same size.
        total_storage = loc_data.storage_units * self.n_locations
        loc_offset = loc_data.storage_units * loc_idx
    else:
        total_storage = sum(ld.storage_units for ld in self.header.location_data)
        loc_offset = sum(self.header.location_data[i].storage_units for i in range(loc_idx))
    # Read data
    data = np.zeros((n_timesteps, n_cols), dtype=np.float64)
    with open(self.filepath, "rb") as f:
        for t in range(n_timesteps):
            # Calculate file position.
            # NOTE(review): this assumes the data section is a flat run of
            # REAL(8) values with no Fortran record markers between
            # timesteps — confirm against the IWFM budget writer.
            pos = (
                self.header.file_position
                + t * total_storage * 8  # 8 bytes per REAL(8)
                + loc_offset * 8
            )
            f.seek(pos)
            # Read data for this location and timestep
            row = np.frombuffer(f.read(n_cols * 8), dtype=np.float64)
            data[t, :] = row
    # Generate time array
    ts = self.header.timestep
    if ts.start_datetime:
        start = ts.start_datetime
        # Monthly steps need calendar arithmetic (irregular lengths).
        use_months = "MON" in ts.unit.upper() if ts.unit else False
        if use_months:
            from dateutil.relativedelta import relativedelta
            times = np.array(
                [(start + relativedelta(months=i)).timestamp() for i in range(n_timesteps)]
            )
        else:
            delta = timedelta(minutes=ts.delta_t_minutes)
            times = np.array([(start + i * delta).timestamp() for i in range(n_timesteps)])
    else:
        # No calendar info: produce a numeric time axis.
        times = np.arange(n_timesteps) * ts.delta_t + ts.start_time
    # Select columns if specified
    if columns is not None:
        data = data[:, columns]
    return times, data
def get_dataframe(
    self,
    location: str | int = 0,
    columns: list[int] | list[str] | None = None,
    *,
    length_factor: float = 1.0,
    area_factor: float = 1.0,
    volume_factor: float = 1.0,
) -> pd.DataFrame:
    """
    Read budget values as a pandas DataFrame.

    When *all* conversion factors are left at their default of ``1.0``
    the raw simulation values are returned unchanged. Pass non-unity
    factors (FACTLTOU / FACTAROU / FACTVLOU) to get unit-converted
    output — each column is multiplied by the factor that matches its
    IWFM data-type code (volume, area, or length).

    Parameters
    ----------
    location : str or int
        Location name or index.
    columns : list, optional
        Column indices or names to read. If None, reads all columns.
    length_factor : float
        Multiplier for length columns (type 5). Default ``1.0``.
    area_factor : float
        Multiplier for area columns (type 4). Default ``1.0``.
    volume_factor : float
        Multiplier for volume columns (types 1-3, 6-11). Default ``1.0``.

    Returns
    -------
    pd.DataFrame
        DataFrame with datetime index and budget columns.

    Raises
    ------
    KeyError
        If a column name in *columns* matches no header.
    """
    # Get column headers
    all_headers = self.get_column_headers(location)
    # Convert column names to indices if needed
    col_indices = None
    if columns is not None:
        # Pre-compute lookup dicts for O(1) matching
        exact_map = {h: i for i, h in enumerate(all_headers)}
        lower_map = {h.lower(): i for i, h in enumerate(all_headers)}
        col_indices = []
        for c in columns:
            if isinstance(c, str):
                # Exact match wins; fall back to case-insensitive.
                idx = exact_map.get(c)
                if idx is None:
                    idx = lower_map.get(c.lower())
                if idx is None:
                    raise KeyError(f"Column '{c}' not found")
                col_indices.append(idx)
            else:
                col_indices.append(c)
    # Read data
    times, values = self.get_values(location, col_indices)
    # Create datetime index
    ts = self.header.timestep
    index: pd.DatetimeIndex | pd.Index[Any]
    if ts.start_datetime:
        # times holds POSIX timestamps (seconds) in this case.
        index = pd.to_datetime(times, unit="s")
    else:
        index = pd.Index(times, name="Time")
    # Get column names
    if col_indices is not None:
        col_names = [all_headers[i] for i in col_indices]
    else:
        col_names = all_headers
    # Apply unit conversion when any factor is non-unity
    needs_conversion = length_factor != 1.0 or area_factor != 1.0 or volume_factor != 1.0
    if needs_conversion:
        from pyiwfm.io.budget_utils import apply_unit_conversion
        loc_idx = self.get_location_index(location)
        if len(self.header.location_data) == 1:
            loc_data = self.header.location_data[0]
        else:
            loc_data = self.header.location_data[loc_idx]
        if loc_data.column_types:
            # Build column-type list for the selected columns; columns
            # beyond the known type list default to type 1 (volume rate).
            if col_indices is not None:
                col_type_list = [
                    loc_data.column_types[i] if i < len(loc_data.column_types) else 1
                    for i in col_indices
                ]
            else:
                col_type_list = loc_data.column_types
            values = apply_unit_conversion(
                values,
                col_type_list,
                length_factor=length_factor,
                area_factor=area_factor,
                volume_factor=volume_factor,
            )
        else:
            # No column type info — apply volume factor to everything
            values = values * volume_factor
    return pd.DataFrame(values, index=index, columns=col_names)
def get_all_dataframes(self) -> dict[str, pd.DataFrame]:
    """
    Read budget data for every location.

    Returns
    -------
    dict[str, pd.DataFrame]
        Mapping of location name to its budget DataFrame.
    """
    frames: dict[str, pd.DataFrame] = {}
    for name in self.locations:
        frames[name] = self.get_dataframe(name)
    return frames
def get_monthly_averages(
    self,
    location: str | int = 0,
    columns: list[int] | list[str] | None = None,
) -> pd.DataFrame:
    """
    Calculate monthly averages for budget data.

    Parameters
    ----------
    location : str or int
        Location name or index.
    columns : list, optional
        Column indices or names to include.

    Returns
    -------
    pd.DataFrame
        Monthly-averaged values (month-start labels); returned
        unchanged when the index is not a DatetimeIndex, on the
        assumption that the steps are already monthly.
    """
    df = self.get_dataframe(location, columns)
    if not isinstance(df.index, pd.DatetimeIndex):
        # Non-calendar index: assume monthly time steps already.
        return df
    averaged: pd.DataFrame = df.resample("MS").mean()
    return averaged
def get_annual_totals(
    self,
    location: str | int = 0,
    columns: list[int] | list[str] | None = None,
) -> pd.DataFrame:
    """
    Calculate annual totals for budget data.

    Parameters
    ----------
    location : str or int
        Location name or index.
    columns : list, optional
        Column indices or names to include.

    Returns
    -------
    pd.DataFrame
        Annual totals. With a DatetimeIndex the data is resampled by
        year-start; otherwise 12 steps per year are assumed and only
        complete years are summed (a single grand total is returned
        when fewer than 12 rows exist).
    """
    df = self.get_dataframe(location, columns)
    if isinstance(df.index, pd.DatetimeIndex):
        totals: pd.DataFrame = df.resample("YS").sum()
        return totals
    # No datetime index: group by assumed 12 time steps per year.
    n_years = len(df) // 12
    if n_years == 0:
        # Fewer than 12 rows: return one row holding the overall sum.
        return df.sum().to_frame().T
    yearly = [df.iloc[y * 12 : (y + 1) * 12].sum() for y in range(n_years)]
    return pd.DataFrame(yearly)
def get_cumulative(
    self,
    location: str | int = 0,
    columns: list[int] | list[str] | None = None,
) -> pd.DataFrame:
    """
    Calculate cumulative sums for budget data.

    Parameters
    ----------
    location : str or int
        Location name or index.
    columns : list, optional
        Column indices or names to include.

    Returns
    -------
    pd.DataFrame
        Running column-wise totals over the time axis.
    """
    frame = self.get_dataframe(location, columns)
    running: pd.DataFrame = frame.cumsum()
    return running
def __repr__(self) -> str:
    """Concise one-line summary for debugging."""
    fields = ", ".join(
        [
            f"format='{self.format}'",
            f"descriptor='{self.descriptor}'",
            f"n_locations={self.n_locations}",
            f"n_timesteps={self.n_timesteps}",
        ]
    )
    return f"BudgetReader('{self.filepath.name}', {fields})"