Source code for pyiwfm.io.supply_adjust
"""
Supply adjustment file reader and writer for IWFM.
IWFM supply adjustment files use the IntTSDataInFileType format:
- Header comments
- NCOLADJ (number of columns)
- NSPADJ (time step update frequency)
- NFQADJ (data repetition frequency)
- DSSFL (DSS filename, blank for inline)
- Data lines: timestamp + integer adjustment codes (00, 01, 10)
The adjustment codes are two-digit integers:
- 1st digit: 0 = no agriculture adjustment, 1 = adjust agriculture
- 2nd digit: 0 = no urban adjustment, 1 = adjust urban
- Combined: 00=none, 01=urban only, 10=ag only, 11=both (deprecated)
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from pyiwfm.io.timeseries_ascii import (
_strip_inline_comment,
format_iwfm_timestamp,
parse_iwfm_timestamp,
)
logger = logging.getLogger(__name__)
# IWFM line-comment characters (must appear in column 1).
_LINE_COMMENT_CHARS = ("C", "c", "*")
def _is_fortran_comment(line: str) -> bool:
"""Check if a line is a Fortran-style IWFM comment.
Only C, c, or * in column 1 count as line comments.
This distinction matters for blank DSSFL lines like
``' / DSSFL'`` which are data lines (empty value).
"""
if not line or not line.strip():
return False # blank lines are not comments — caller decides handling
return line[0] in _LINE_COMMENT_CHARS
[docs]
@dataclass
class SupplyAdjustment:
"""Parsed supply adjustment specification data.
Attributes:
n_columns: Number of adjustment columns (NCOLADJ).
nsp: Time step update frequency (NSPADJ).
nfq: Data repetition frequency (NFQADJ).
dss_file: DSS filename (empty string if inline data).
times: List of timestamps for each data row.
values: List of rows, each row is a list of integer adjustment codes.
header_lines: Original header comment lines.
"""
n_columns: int = 0
nsp: int = 1
nfq: int = 0
dss_file: str = ""
times: list[datetime] = field(default_factory=list)
values: list[list[int]] = field(default_factory=list)
header_lines: list[str] = field(default_factory=list)
[docs]
def read_supply_adjustment(filepath: Path | str) -> SupplyAdjustment:
"""Read a supply adjustment file.
Parses the IWFM integer time series format (NCOL, NSP, NFQ, DSSFL)
followed by timestamp + integer data rows.
Args:
filepath: Path to the supply adjustment file.
Returns:
SupplyAdjustment with parsed data.
Raises:
FileNotFoundError: If file does not exist.
ValueError: If file format is invalid.
"""
filepath = Path(filepath)
if not filepath.exists():
raise FileNotFoundError(f"Supply adjustment file not found: {filepath}")
result = SupplyAdjustment()
with open(filepath, errors="replace") as f:
# Phase 1: Read NCOLADJ (first non-comment value)
for line in f:
if _is_fortran_comment(line):
result.header_lines.append(line.rstrip("\n"))
continue
value_str = _strip_inline_comment(line)
if not value_str:
continue
result.n_columns = int(value_str)
break
# Phase 2: Read NSPADJ (second non-comment value)
for line in f:
if _is_fortran_comment(line):
continue
value_str = _strip_inline_comment(line)
if not value_str:
continue
result.nsp = int(value_str)
break
# Phase 3: Read NFQADJ (third non-comment value)
for line in f:
if _is_fortran_comment(line):
continue
value_str = _strip_inline_comment(line)
if not value_str:
continue
result.nfq = int(value_str)
break
# Phase 4: Read DSSFL (may be blank)
# Use _is_fortran_comment (not _is_comment_line) because a blank
# DSSFL line like " / DSSFL" must NOT be treated as a comment.
for line in f:
if _is_fortran_comment(line):
continue
if not line.strip():
result.dss_file = ""
break
value_str = _strip_inline_comment(line)
result.dss_file = value_str # May be empty string
break
# Phase 5: Read data lines (timestamp + integer codes)
# Use token-based parsing (split on whitespace) instead of
# fixed-width slicing.
for line in f:
if _is_fortran_comment(line):
continue
stripped = line.strip()
if not stripped:
continue
# Try to parse as a timestamp data line
try:
tokens = stripped.split()
if not tokens:
continue
dt = parse_iwfm_timestamp(tokens[0])
# Remaining tokens are integer adjustment codes
int_values = [int(v) for v in tokens[1:]]
result.times.append(dt)
result.values.append(int_values)
except ValueError:
# Skip non-data lines (DSS pathnames, extra comments, etc.)
continue
logger.info(
"Read supply adjustment: %d columns, %d rows from %s",
result.n_columns,
len(result.times),
filepath,
)
return result
[docs]
def write_supply_adjustment(
data: SupplyAdjustment,
filepath: Path | str,
) -> Path:
"""Write a supply adjustment file.
Writes the IWFM integer time series format: NCOL, NSP, NFQ, DSSFL,
followed by timestamp + integer data rows.
Args:
data: SupplyAdjustment data to write.
filepath: Output file path.
Returns:
Path to the written file.
"""
filepath = Path(filepath)
filepath.parent.mkdir(parents=True, exist_ok=True)
with open(filepath, "w") as f:
# Header
f.write(
"C*******************************************************************************\n"
)
f.write("C\n")
f.write("C SUPPLY ADJUSTMENT SPECIFICATIONS\n")
f.write("C for IWFM Simulation\n")
f.write("C\n")
f.write("C Generated by pyiwfm\n")
f.write(f"C {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(
"C*******************************************************************************\n"
)
f.write("C\n")
f.write("C NCOLADJ: Number of columns in the supply adjustment specifications data\n")
f.write(
"C NSPADJ : Number of time steps to update the supply adjustment specifications\n"
)
f.write("C NFQADJ : Repetition frequency of the supply adjustment specifications data\n")
f.write("C DSSFL : DSS filename (blank = inline data)\n")
f.write("C\n")
f.write(
"C-------------------------------------------------------------------------------\n"
)
f.write("C VALUE DESCRIPTION\n")
f.write(
"C-------------------------------------------------------------------------------\n"
)
# Parameters (no FACTOR for integer TS)
f.write(f" {data.n_columns:<38}/ NCOLADJ\n")
f.write(f" {data.nsp:<38}/ NSPADJ\n")
f.write(f" {data.nfq:<38}/ NFQADJ\n")
dss_str = data.dss_file if data.dss_file else ""
f.write(f" {dss_str:<44}/ DSSFL\n")
# Data section header
f.write(
"C*******************************************************************************\n"
)
f.write("C Supply Adjustment Specifications Data\n")
f.write("C\n")
f.write("C ITADJ: Time\n")
f.write("C KADJ : Supply adjustment code (2-digit: 1st=ag, 2nd=urban)\n")
f.write("C 00=None, 01=Urban, 10=Ag\n")
f.write("C\n")
f.write(
"C-------------------------------------------------------------------------------\n"
)
# Data rows
for i, dt in enumerate(data.times):
ts_str = format_iwfm_timestamp(dt)
vals = data.values[i] if i < len(data.values) else []
val_strs = [f"\t{v:02d}" for v in vals]
f.write(f" {ts_str}{''.join(val_strs)}\n")
logger.info("Wrote supply adjustment: %s", filepath)
return filepath