"""
Unified Time Series I/O Infrastructure for IWFM.
This module provides a unified interface for reading and writing time series data
across all supported formats (ASCII, DSS, HDF5). It enables format-agnostic
handling of IWFM time series data with automatic format detection and conversion.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any
import h5py
import numpy as np
from numpy.typing import NDArray
from pyiwfm.core.timeseries import TimeSeries, TimeSeriesCollection
[docs]
class TimeSeriesFileType(Enum):
"""Supported time series file formats."""
ASCII = "ascii"
DSS = "dss"
HDF5 = "hdf5"
BINARY = "binary"
[docs]
class TimeUnit(Enum):
"""Standard time units for IWFM."""
SECOND = "second"
MINUTE = "minute"
HOUR = "hour"
DAY = "day"
WEEK = "week"
MONTH = "month"
YEAR = "year"
[docs]
@dataclass
class UnifiedTimeSeriesConfig:
"""
Configuration for unified time series reading.
Attributes:
filepath: Path to the time series file
file_type: Type of file (auto-detected if None)
n_columns: Expected number of columns (for validation)
column_ids: Entity IDs for each column
data_unit: Expected data unit
time_unit: Expected time unit
conversion_factor: Factor to apply to values
is_rate_data: Whether to normalize by timestep
recycling_interval: Recycle period (0 = no recycling)
start_filter: Only read data after this time
end_filter: Only read data before this time
dss_pathname: DSS pathname pattern (for DSS files)
hdf5_dataset: HDF5 dataset path (for HDF5 files)
"""
filepath: Path
file_type: TimeSeriesFileType | None = None
n_columns: int | None = None
column_ids: list[int] | None = None
data_unit: str = ""
time_unit: str = ""
conversion_factor: float = 1.0
is_rate_data: bool = False
recycling_interval: int = 0
start_filter: datetime | None = None
end_filter: datetime | None = None
# Format-specific options
dss_pathname: str | None = None # For DSS files
hdf5_dataset: str | None = None # For HDF5 files
[docs]
class BaseTimeSeriesReader(ABC):
"""Abstract base class for time series readers."""
[docs]
@abstractmethod
def read(
self,
filepath: Path | str,
**kwargs: Any,
) -> tuple[NDArray[np.datetime64], NDArray[np.float64], TimeSeriesMetadata]:
"""
Read time series data from file.
Args:
filepath: Path to the file
**kwargs: Format-specific options
Returns:
Tuple of (times, values, metadata)
"""
...
[docs]
class AsciiTimeSeriesAdapter(BaseTimeSeriesReader):
"""Adapter for ASCII time series files."""
[docs]
def read(
self,
filepath: Path | str,
**kwargs: Any,
) -> tuple[NDArray[np.datetime64], NDArray[np.float64], TimeSeriesMetadata]:
"""Read ASCII time series file."""
from pyiwfm.io.timeseries_ascii import TimeSeriesReader
reader = TimeSeriesReader()
times, values, config = reader.read(filepath)
# Convert to numpy datetime64
np_times = np.array(times, dtype="datetime64[s]")
metadata = TimeSeriesMetadata(
file_type=TimeSeriesFileType.ASCII,
n_columns=config.n_columns,
column_ids=config.column_ids,
conversion_factor=config.factor,
source_path=Path(filepath),
n_timesteps=len(times),
start_time=times[0] if times else None,
end_time=times[-1] if times else None,
)
return np_times, values, metadata
[docs]
class DssTimeSeriesAdapter(BaseTimeSeriesReader):
"""Adapter for HEC-DSS time series files."""
[docs]
def read(
self,
filepath: Path | str,
**kwargs: Any,
) -> tuple[NDArray[np.datetime64], NDArray[np.float64], TimeSeriesMetadata]:
"""Read DSS time series file."""
try:
from pyiwfm.io.dss import DSSTimeSeriesReader
except ImportError as e:
raise ImportError(
"DSS support requires the bundled HEC-DSS library. "
"Check that pyiwfm.io.dss is importable."
) from e
pathname = kwargs.get("pathname", "")
reader = DSSTimeSeriesReader() # type: ignore[call-arg]
times, values = reader.read(filepath, pathname) # type: ignore[attr-defined]
metadata = TimeSeriesMetadata(
file_type=TimeSeriesFileType.DSS,
n_columns=values.shape[1] if values.ndim > 1 else 1,
source_path=Path(filepath),
n_timesteps=len(times),
start_time=times[0].astype(datetime) if len(times) > 0 else None,
end_time=times[-1].astype(datetime) if len(times) > 0 else None,
)
return times, values, metadata
[docs]
class Hdf5TimeSeriesAdapter(BaseTimeSeriesReader):
"""Adapter for HDF5 time series files."""
[docs]
def read(
self,
filepath: Path | str,
**kwargs: Any,
) -> tuple[NDArray[np.datetime64], NDArray[np.float64], TimeSeriesMetadata]:
"""Read HDF5 time series file."""
dataset_path = kwargs.get("dataset", "/timeseries/data")
time_path = kwargs.get("time_dataset", "/timeseries/time")
with h5py.File(filepath, "r") as f:
if time_path in f:
times = np.array(f[time_path][:])
# Convert to datetime64 if necessary
if times.dtype.kind in ("i", "f"):
# Assume Julian dates or epoch seconds
times = np.array(times, dtype="datetime64[s]")
else:
times = np.array([], dtype="datetime64[s]")
if dataset_path in f:
values = np.array(f[dataset_path][:])
else:
values = np.array([])
metadata = TimeSeriesMetadata(
file_type=TimeSeriesFileType.HDF5,
n_columns=values.shape[1] if values.ndim > 1 else 1,
source_path=Path(filepath),
n_timesteps=len(times),
)
return times, values, metadata
[docs]
class UnifiedTimeSeriesReader:
"""
Unified reader for all time series formats.
This class provides a single interface for reading time series data
from ASCII, DSS, and HDF5 files with automatic format detection and
consistent output format.
Example:
>>> reader = UnifiedTimeSeriesReader()
>>> config = UnifiedTimeSeriesConfig(filepath=Path("pumping.dat"))
>>> times, values, metadata = reader.read(config)
>>> # Or with auto-detection
>>> times, values, metadata = reader.read_file("pumping.dat")
"""
[docs]
def __init__(self) -> None:
"""Initialize the unified reader with format adapters."""
self._adapters: dict[TimeSeriesFileType, BaseTimeSeriesReader] = {
TimeSeriesFileType.ASCII: AsciiTimeSeriesAdapter(),
}
# HDF5 is always available (h5py is a core dependency)
self._adapters[TimeSeriesFileType.HDF5] = Hdf5TimeSeriesAdapter()
# DSS adapter: bundled ctypes library may fail to load on some platforms
try:
from pyiwfm.io.dss import HAS_DSS_LIBRARY
if HAS_DSS_LIBRARY:
self._adapters[TimeSeriesFileType.DSS] = DssTimeSeriesAdapter()
except ImportError:
pass
[docs]
def read(
self, config: UnifiedTimeSeriesConfig
) -> tuple[NDArray[np.datetime64], NDArray[np.float64], TimeSeriesMetadata]:
"""
Read time series data using configuration.
Args:
config: UnifiedTimeSeriesConfig with file path and options
Returns:
Tuple of (times, values, metadata)
"""
file_type = config.file_type or self._detect_format(config.filepath)
adapter = self._get_adapter(file_type)
# Build kwargs for adapter
kwargs: dict[str, Any] = {}
if config.dss_pathname:
kwargs["pathname"] = config.dss_pathname
if config.hdf5_dataset:
kwargs["dataset"] = config.hdf5_dataset
times, values, metadata = adapter.read(config.filepath, **kwargs)
# Apply conversion factor
if config.conversion_factor != 1.0:
values = values * config.conversion_factor
metadata.conversion_factor = config.conversion_factor
# Apply time filter
if config.start_filter or config.end_filter:
times, values = self._apply_time_filter(
times, values, config.start_filter, config.end_filter
)
metadata.n_timesteps = len(times)
# Handle recycling
if config.recycling_interval > 0:
metadata.recycling_interval = config.recycling_interval
return times, values, metadata
[docs]
def read_file(
self,
filepath: Path | str,
file_type: TimeSeriesFileType | None = None,
**kwargs: Any,
) -> tuple[NDArray[np.datetime64], NDArray[np.float64], TimeSeriesMetadata]:
"""
Convenience method to read a time series file.
Args:
filepath: Path to the file
file_type: File type (auto-detected if None)
**kwargs: Additional options passed to adapter
Returns:
Tuple of (times, values, metadata)
"""
filepath = Path(filepath)
file_type = file_type or self._detect_format(filepath)
adapter = self._get_adapter(file_type)
return adapter.read(filepath, **kwargs)
[docs]
def read_to_collection(
self,
filepath: Path | str,
column_ids: list[str] | None = None,
variable: str = "",
file_type: TimeSeriesFileType | None = None,
**kwargs: Any,
) -> TimeSeriesCollection:
"""
Read file and return as TimeSeriesCollection.
Args:
filepath: Path to the file
column_ids: Optional column identifiers
variable: Variable name for the collection
file_type: File type (auto-detected if None)
**kwargs: Additional options
Returns:
TimeSeriesCollection
"""
times, values, metadata = self.read_file(filepath, file_type, **kwargs)
if column_ids is None:
column_ids = [str(cid) for cid in metadata.column_ids]
# Ensure values is 2D
if values.ndim == 1:
values = values.reshape(-1, 1)
collection = TimeSeriesCollection(variable=variable)
for i, col_id in enumerate(column_ids):
if i < values.shape[1]:
ts = TimeSeries(
times=times,
values=values[:, i],
location=col_id,
)
collection.add(ts)
return collection
def _detect_format(self, filepath: Path) -> TimeSeriesFileType:
"""Detect file format from extension."""
suffix = filepath.suffix.lower()
if suffix in (".dss",):
return TimeSeriesFileType.DSS
elif suffix in (".hdf", ".h5", ".hdf5"):
return TimeSeriesFileType.HDF5
elif suffix in (".bin",):
return TimeSeriesFileType.BINARY
else:
# Default to ASCII for .dat, .txt, .in, etc.
return TimeSeriesFileType.ASCII
def _get_adapter(self, file_type: TimeSeriesFileType) -> BaseTimeSeriesReader:
"""Get adapter for file type."""
if file_type not in self._adapters:
raise ValueError(
f"No adapter available for {file_type.value}. "
f"Available formats: {list(self._adapters.keys())}"
)
return self._adapters[file_type]
def _apply_time_filter(
self,
times: NDArray[np.datetime64],
values: NDArray[np.float64],
start: datetime | None,
end: datetime | None,
) -> tuple[NDArray[np.datetime64], NDArray[np.float64]]:
"""Apply time filter to data."""
mask = np.ones(len(times), dtype=bool)
if start:
start_np = np.datetime64(start)
mask &= times >= start_np
if end:
end_np = np.datetime64(end)
mask &= times <= end_np
return times[mask], values[mask] if values.ndim == 1 else values[mask, :]
[docs]
class RecyclingTimeSeriesReader:
"""
Reader that handles IWFM time series recycling.
IWFM supports repeating time series patterns. For example, monthly
data for a single year can be recycled across the entire simulation period.
"""
[docs]
def __init__(self, base_reader: UnifiedTimeSeriesReader | None = None) -> None:
"""Initialize with optional base reader."""
self._reader = base_reader or UnifiedTimeSeriesReader()
[docs]
def read_with_recycling(
self,
filepath: Path | str,
target_times: NDArray[np.datetime64],
recycling_interval: int = 12,
**kwargs: Any,
) -> NDArray[np.float64]:
"""
Read time series and recycle to match target times.
Args:
filepath: Source file path
target_times: Target timestamps to generate values for
recycling_interval: Number of months to recycle (12 = yearly)
**kwargs: Additional read options
Returns:
Values array matching target_times
"""
times, values, _ = self._reader.read_file(filepath, **kwargs)
if len(times) == 0:
return np.zeros((len(target_times), values.shape[1] if values.ndim > 1 else 1))
# Build recycled values
result = np.zeros((len(target_times), values.shape[1] if values.ndim > 1 else 1))
# Convert times to month indices
source_months = np.array(
[(t.astype("datetime64[M]") - t.astype("datetime64[Y]")).astype(int) for t in times]
)
for i, target_time in enumerate(target_times):
target_month = (
target_time.astype("datetime64[M]") - target_time.astype("datetime64[Y]")
).astype(int)
# Find matching source index
source_idx = np.where(source_months == target_month % recycling_interval)[0]
if len(source_idx) > 0:
result[i] = values[source_idx[0]] if values.ndim == 1 else values[source_idx[0], :]
return result
# Convenience functions
[docs]
def read_timeseries_unified(
filepath: Path | str,
file_type: TimeSeriesFileType | None = None,
**kwargs: Any,
) -> tuple[NDArray[np.datetime64], NDArray[np.float64], TimeSeriesMetadata]:
"""
Read time series from any supported format.
Args:
filepath: Path to file
file_type: Optional explicit file type
**kwargs: Format-specific options
Returns:
Tuple of (times, values, metadata)
"""
reader = UnifiedTimeSeriesReader()
return reader.read_file(filepath, file_type, **kwargs)