Source code for supy.util.converter.table.table

#!/usr/bin/env python
########################################################
# Table Converter for SUEWS
# Ting Sun, ting.sun@reading.ac.uk
# Yihao Tang, Yihao.Tang@student.reading.ac.uk
# history:
# TS, 13 Oct 2017: initial version
# YT, 01 Jun 2018: added the chained conversion
# TS, 21 May 2019: integrated into supy
########################################################
# %%
from collections import defaultdict
from contextlib import nullcontext
from fnmatch import fnmatch
from heapq import heappop, heappush
import os
import os.path
from pathlib import Path
import re
import shutil
from shutil import copyfile, move, rmtree
import sys
import textwrap
from tempfile import TemporaryDirectory

# ignore warnings raised by numpy when reading in the "-9" footer lines
import warnings

from chardet import detect
import f90nml
import numpy as np
import pandas as pd

from ...._env import logger_supy, trv_supy_module
from ...._load import load_SUEWS_nml_simple
from .profile_manager import ProfileManager

warnings.filterwarnings("ignore")
########################################################
# %%
# load the rule file
rules = pd.read_csv(trv_supy_module / "util" / "converter" / "table" / "rules.csv")
list_ver_from = rules["From"].unique().tolist()
list_ver_to = rules["To"].unique().tolist()
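
# A minimal sketch of the assumed rules.csv layout: the From/To/File columns
# are read here and below, while Action/Variable/Column/Value are consumed by
# SUEWS_Converter_file (see its actionList handling). The example rows are
# illustrative placeholders, not entries from the shipped file:
#
#   From,To,Action,File,Variable,Column,Value
#   2016a,2017a,Rename,SUEWS_Conductance.txt,gsChoice,12,gsModel
#   2016a,2017a,Add,SUEWS_Conductance.txt,Kmax,13,1200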

# Canonical ordering from oldest → newest to support version comparisons
VERSION_SEQUENCE = [
    "2016a",
    "2017a",
    "2018a",
    "2018b",
    "2018c",
    "2019a",
    "2019b",
    "2020a",
    "2021a",
    "2023a",
    "2024a",
    "2025a",
]


PLACEHOLDER_GRIDLAYOUT = textwrap.dedent(
    """\
    ! Placeholder GridLayout generated by suews-convert for legacy datasets
    &dim
    nlayer = 3
    /
    &geom

    height = 0., 11., 15., 22.

    building_frac = 0.43, 0.38, .2
    veg_frac       = 0.01, 0.02, .01
    building_scale = 50.,  50.,  50
    veg_scale       = 10.,  10.,  10
    /
    &roof
    sfr_roof          = .3,  .3,  .4
    tin_roof          = 5,   5,   6
    alb_roof          = .5,  .5,  .2
    emis_roof         = .95, .95, .95
    state_roof        = .0,  .0,  .0
    statelimit_roof   = 5,   5,   5
    wetthresh_roof    = 5,   5,   5
    soilstore_roof    = 20,  20,  20
    soilstorecap_roof = 120, 120, 120

    roof_albedo_dir_mult_fact(1,:) = 1.,1.,1.

    dz_roof(1,:)  = .2,  .1,  .1,  .01, .01
    k_roof(1,:) = 1.2, 1.2, 1.2, 1.2, 1.2
    cp_roof(1,:)  = 2e6, 2e6, 2e6, 2e6, 2e6

    dz_roof(2,:)  = .2,  .1,  .1,  .01, .01
    k_roof(2,:) = 2.2, 1.2, 1.2, 1.2, 1.2
    cp_roof(2,:)  = 2e6, 3e6, 2e6, 2e6, 2e6

    dz_roof(3,:)  = .2,  .1,  .1,  .01, .01
    k_roof(3,:) = 2.2, 1.2, 1.2, 1.2, 1.2
    cp_roof(3,:)  = 2e6, 3e6, 2e6, 2e6, 2e6
    /

    &wall
    sfr_wall  = .3,  .3,  .4
    tin_wall  = 5,   5,   5
    alb_wall  = .5,  .5,  .5
    emis_wall = .95, .95, .95

    state_wall        = .0,  .0,  .0
    statelimit_wall   = 5,   5,   5
    wetthresh_wall    = 5,   5,   5
    soilstore_wall    = 20,  20,  20
    soilstorecap_wall = 120, 120, 120

    wall_specular_frac(1,:) = 0.,0.,0.

    dz_wall(1,:)  = .2,  .1,  .1,  .01, .01
    k_wall(1,:) = 1.2, 1.2, 1.2, 1.2, 1.2
    cp_wall(1,:)  = 3e6, 2e6, 2e6, 2e6, 2e6

    dz_wall(2,:)  = .2,  .1,  .1,  .01, .01
    k_wall(2,:) = 1.2, 1.2, 1.2, 1.2, 1.2
    cp_wall(2,:)  = 2e6, 3e6, 2e6, 2e6, 2e6

    dz_wall(3,:)  = .2,  .1,  .1,  .01, .01
    k_wall(3,:) = 1.2, 1.2, 1.2, 1.2, 1.2
    cp_wall(3,:)  = 2e6, 3e6, 2e6, 2e6, 2e6
    /
    &surf
    tin_surf = 2, 2, 2, 2, 2, 2, 2

    dz_surf(1,:)  = .2,    .15,   .01,   .01,   .01
    k_surf(1,:) = 1.1,   1.1,   1.1,   1.1,   1.1
    cp_surf(1,:)  = 2.2e6, 2.2e6, 2.2e6, 2.2e6, 2.6e6

    dz_surf(2,:)  = .2,    .1,    .1,    .5,    1.6
    k_surf(2,:) = 1.2,   1.1,   1.1,   1.5,   1.6
    cp_surf(2,:)  = 1.2e6, 1.1e6, 1.1e6, 1.5e6, 1.6e6

    dz_surf(3,:)  = .2,    .1,    .1,    .5,    1.6
    k_surf(3,:) = 1.2,   1.1,   1.1,   1.5,   1.6
    cp_surf(3,:)  = 3.2e6, 1.1e6, 1.1e6, 1.5e6, 1.6e6

    dz_surf(4,:)  = .2,    .1,    .1,    .1,    2.2
    k_surf(4,:) = 1.2,   1.1,   1.1,   1.5,   1.6
    cp_surf(4,:)  = 3.2e6, 1.1e6, 1.1e6, 1.5e6, 1.6e6

    dz_surf(5,:)  = .2,    .05,   .1,    .1,    2.2
    k_surf(5,:) = 1.2,   1.1,   1.1,   1.5,   1.6
    cp_surf(5,:)  = 1.6e6, 1.1e6, 1.1e6, 1.5e6, 1.6e6

    dz_surf(6,:)  = .2,    .05,   .1,    .1,    2.2
    k_surf(6,:) = 1.2,   1.1,   1.1,   1.5,   1.6
    cp_surf(6,:)  = 1.9e6, 1.1e6, 1.1e6, 1.5e6, 1.6e6


    dz_surf(7,:)  = .2,    .05,   .1,    .1,    2.2
    k_surf(7,:) = 1.2,   1.1,   1.1,   1.5,   1.6
    cp_surf(7,:)  = 1.9e6, 1.1e6, 1.1e6, 1.5e6, 1.6e6

    /
    """
)
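
# PLACEHOLDER_GRIDLAYOUT mirrors the GridLayoutKc.nml content created during
# the 2023a -> 2024a step; convert_table() falls back to it when a legacy
# dataset ships no GridLayout*.nml matching the RunControl filecode, roughly:
#   (path_input / f"GridLayout{filecode}.nml").write_text(
#       PLACEHOLDER_GRIDLAYOUT, encoding="utf-8"
#   )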


def _version_index(version):
    """Return the index of a version string within VERSION_SEQUENCE."""
    if version is None:
        return None
    try:
        return VERSION_SEQUENCE.index(version)
    except ValueError:
        return None


def _requires_grid_layout(version):
    """Determine if the target version needs GridLayout*.nml support."""
    version_idx = _version_index(version)
    grid_idx = _version_index("2024a")
    if version_idx is None or grid_idx is None:
        return False
    return version_idx >= grid_idx
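
# With the sequence above, for example:
#   _requires_grid_layout("2024a")  # -> True  (GridLayout support starts here)
#   _requires_grid_layout("2023a")  # -> False
#   _requires_grid_layout("1999x")  # -> False (unknown versions map to None)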


def _check_required_files(input_path, required_files):
    """Check if all required files exist."""
    return all((input_path / f).exists() for f in required_files)


def _check_specific_files(input_path, specific_files):
    """Check if specific files exist based on RunControl.nml paths."""
    # Try to read RunControl.nml to get actual input path
    runcontrol_path = input_path / "RunControl.nml"

    if runcontrol_path.exists():
        try:
            ser_nml = load_SUEWS_nml_simple(str(runcontrol_path)).runcontrol
            fileinputpath = ser_nml.get("fileinputpath", "./input/")

            if os.path.isabs(fileinputpath):
                actual_input_dir = Path(fileinputpath)
            else:
                actual_input_dir = (input_path / fileinputpath).resolve()

            # Check in the actual input directory
            for f in specific_files:
                if not ((input_path / f).exists() or (actual_input_dir / f).exists()):
                    return False
            return True
        except Exception:
            pass

    # Fallback: check root and Input/ subdirectory
    for f in specific_files:
        if not ((input_path / f).exists() or (input_path / "Input" / f).exists()):
            return False
    return True


def _check_columns_in_file(file_path, columns_to_check):
    """Check if specific columns exist in a file's header."""
    if not file_path.exists():
        return False

    try:
        with open(file_path, encoding="utf-8") as f:
            lines = f.readlines()
            if len(lines) > 1:
                headers = lines[1].strip().split()
                return all(col in headers for col in columns_to_check)
    except Exception:
        return False

    return False
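
# The check above relies on the SUEWS table convention of two header lines --
# column indices on line 1 and column names on line 2 -- followed by data,
# e.g. (values illustrative only):
#
#   1    2
#   Code BaseT_HC
#   801  18.2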


def _check_columns(input_path, check_columns):
    """Check if required columns exist in specified files."""
    # Try to read RunControl.nml to get actual input path
    runcontrol_path = input_path / "RunControl.nml"
    actual_input_dir = None

    if runcontrol_path.exists():
        try:
            ser_nml = load_SUEWS_nml_simple(str(runcontrol_path)).runcontrol
            fileinputpath = ser_nml.get("fileinputpath", "./input/")

            if os.path.isabs(fileinputpath):
                actual_input_dir = Path(fileinputpath)
            else:
                actual_input_dir = (input_path / fileinputpath).resolve()
        except Exception:
            pass

    for file, columns in check_columns.items():
        # Check root first
        file_path = input_path / file

        # Then check actual input directory from RunControl
        if not file_path.exists() and actual_input_dir:
            file_path = actual_input_dir / file

        # Fallback to Input/ subdirectory
        if not file_path.exists():
            file_path = input_path / "Input" / file

        if not _check_columns_in_file(file_path, columns):
            return False

    return True


def _check_negative_columns(input_path, negative_columns):
    """Check that specified columns do NOT exist in files."""
    # Try to read RunControl.nml to get actual input path
    runcontrol_path = input_path / "RunControl.nml"
    actual_input_dir = None

    if runcontrol_path.exists():
        try:
            ser_nml = load_SUEWS_nml_simple(str(runcontrol_path)).runcontrol
            fileinputpath = ser_nml.get("fileinputpath", "./input/")

            if os.path.isabs(fileinputpath):
                actual_input_dir = Path(fileinputpath)
            else:
                actual_input_dir = (input_path / fileinputpath).resolve()
        except Exception:
            pass

    for file, columns in negative_columns.items():
        # Check root first
        file_path = input_path / file

        # Then check actual input directory from RunControl
        if not file_path.exists() and actual_input_dir:
            file_path = actual_input_dir / file

        # Fallback to Input/ subdirectory
        if not file_path.exists():
            file_path = input_path / "Input" / file

        if not file_path.exists():
            # If file doesn't exist, that's fine for negative check
            continue

        try:
            with open(file_path, encoding="utf-8") as f:
                lines = f.readlines()
                if len(lines) > 1:
                    headers = lines[1].strip().split()
                    for col in columns:
                        if col in headers:  # Should NOT be present
                            return False
        except Exception:
            return False

    return True


def _check_nml_parameters(input_path, check_nml):
    """Check if required parameters exist in .nml files."""
    for nml_file, params in check_nml.items():
        nml_path = input_path / nml_file
        if not nml_path.exists():
            return False

        try:
            nml = f90nml.read(str(nml_path))
            # Get the first (and usually only) section
            section = next(iter(nml.values())) if nml else {}
            # Check if ALL required parameters exist
            for param in params:
                if param.lower() not in [k.lower() for k in section]:
                    return False
        except Exception:
            return False

    return True
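
# Example (sketch): the 2024a detection below passes a mapping like
#   {"RunControl.nml": ["rslmethod", "rsllevel", "faimethod"]}
# and this helper returns True only when every listed parameter is present
# (case-insensitively) in the file's first namelist group.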


def detect_table_version(input_dir):
    """Auto-detect the version of SUEWS table files.

    Detection is based on:
    - File existence (e.g., AnthropogenicEmission vs AnthropogenicHeat)
    - Column presence/absence in specific tables
    - Parameters in RunControl.nml (for 2024a+)
    - Optional files like SPARTACUS.nml

    Each version has unique characteristics that allow precise identification.

    Args:
        input_dir: Path to the directory containing SUEWS table files

    Returns
    -------
    str:
        Detected version (e.g., '2016a', '2024a') or None if unable to detect

    Note
    ----
    Detection checks versions from newest to oldest using unique
    characteristics of each version. Some versions (e.g., 2018a/b/c,
    2020a/2021a) are identical in structure; any detection among them is
    acceptable.
    """
    input_path = Path(input_dir)

    # Key indicators for different versions based on actual conversion rules
    # Structure of indicators:
    # - required_files: Must exist in root directory
    # - file_exists: Must exist in root or Input/ subdirectory
    # - check_columns: Columns that MUST exist in specified files
    # - negative_columns: Columns that must NOT exist (for differentiation)
    # - check_nml: Parameters that MUST exist in .nml files
    # - optional_files: Files that may exist and support identification
    # - fallback: Use this version if no other matches
    version_indicators = {
        # 2025a: Added building statistics columns
        "2025a": {
            "required_files": ["RunControl.nml"],
            "check_columns": {
                "SUEWS_SiteSelect.txt": ["h_std", "n_buildings"]  # Added in 2025a
            },
        },
        # 2024a: Added diagnostic methods and SPARTACUS radiation scheme support
        "2024a": {
            "required_files": ["RunControl.nml"],
            # SPARTACUS files are part of the 2024a specification
            "file_exists": ["SUEWS_SPARTACUS.nml", "GridLayoutKc.nml"],
            # Also has new parameters in RunControl
            "check_nml": {"RunControl.nml": ["rslmethod", "rsllevel", "faimethod"]},
        },
        # 2023a: Removed DiagQS/DiagQN from RunControl, removed BaseT_HC from AnthropogenicEmission
        "2023a": {
            "required_files": ["RunControl.nml"],
            # 2023a has H_maintain but NOT BaseT_HC (which was deleted in 2021a->2023a)
            "check_columns": {
                "SUEWS_Irrigation.txt": ["H_maintain"],
            },
            "negative_columns": {
                "SUEWS_AnthropogenicEmission.txt": ["BaseT_HC"]  # Removed in 2023a
            },
        },
        # 2021a: No changes from 2020a (Keep action only)
        "2021a": {
            "required_files": ["RunControl.nml"],
            # Has both H_maintain and BaseT_HC
            "check_columns": {
                "SUEWS_Irrigation.txt": ["H_maintain"],
                "SUEWS_AnthropogenicEmission.txt": [
                    "BaseT_HC"
                ],  # Still present in 2021a
            },
        },
        # 2020a: Added H_maintain and irrigation fractions
        "2020a": {
            "required_files": ["RunControl.nml"],
            "check_columns": {
                "SUEWS_Irrigation.txt": ["H_maintain"],  # Added in 2020a
                "SUEWS_SiteSelect.txt": [
                    "IrrFr_Paved",
                    "IrrFr_Bldgs",
                ],  # Added in 2020a
            },
        },
        "2019b": {
            "required_files": ["RunControl.nml", "SUEWS_AnthropogenicEmission.txt"],
            "check_columns": {
                "SUEWS_AnthropogenicEmission.txt": ["BaseT_HC"]  # Renamed from BaseTHDD
            },
        },
        "2019a": {
            "required_files": ["RunControl.nml"],
            "file_exists": [
                "SUEWS_AnthropogenicEmission.txt"
            ],  # Renamed from AnthropogenicHeat
            # Check for BaseTHDD column (renamed to BaseT_HC in 2019b/2020a)
            "check_columns": {
                "SUEWS_AnthropogenicEmission.txt": [
                    "BaseTHDD"
                ]  # Original name before 2019b
            },
        },
        # 2018c: Added FcEF_v columns and CO2PointSource (converted to 2019a)
        "2018c": {
            "required_files": ["RunControl.nml"],
            "file_exists": ["SUEWS_AnthropogenicHeat.txt"],  # Old name before 2019a
            "check_columns": {
                # These columns were added when converting 2018c->2019a
                "SUEWS_AnthropogenicHeat.txt": [
                    "FcEF_v_kgkmWE",
                    "FcEF_v_kgkmWD",
                    "CO2PointSource",
                ]
            },
        },
        # 2018b: No changes from 2018a (Keep action only)
        "2018b": {
            "required_files": ["RunControl.nml"],
            "file_exists": ["SUEWS_AnthropogenicHeat.txt"],
            # Same structure as 2018a - differentiate by NOT having 2018c columns
            "negative_columns": {
                "SUEWS_AnthropogenicHeat.txt": [
                    "FcEF_v_kgkmWE",
                    "CO2PointSource",
                ]  # Not in 2018b
            },
            "check_columns": {
                "SUEWS_BiogenCO2.txt": ["alpha", "beta", "theta"],  # Has 2018a features
            },
        },
        # 2018a: Major restructuring from 2017a
        "2018a": {
            "required_files": ["RunControl.nml"],
            "file_exists": ["SUEWS_AnthropogenicHeat.txt"],
            "check_columns": {
                "SUEWS_BiogenCO2.txt": ["alpha", "beta", "theta"],  # Added in 2018a
                "SUEWS_SiteSelect.txt": [
                    "TrafficRate_WD",
                    "TrafficRate_WE",
                ],  # Added in 2018a
                "SUEWS_AnthropogenicHeat.txt": [
                    "AHMin_WD",
                    "AHMin_WE",
                ],  # Added in 2018a
            },
        },
        "2017a": {
            "required_files": ["RunControl.nml"],
            "file_exists": ["SUEWS_AnthropogenicHeat.txt"],
            # 2017a has ESTMCoefficients but different structure than 2018a
            "check_columns": {
                "SUEWS_Conductance.txt": ["gsModel"],  # Added in 2017a
            },
        },
        "2016a": {
            "required_files": ["RunControl.nml"],
            # 2016a has old parameter names and lacks ESTM/gsModel features
            "negative_columns": {
                "SUEWS_Conductance.txt": ["gsModel"],  # Not in 2016a
                "SUEWS_NonVeg.txt": ["OHMThresh_SW", "ESTMCode"],  # Not in 2016a
                "SUEWS_ESTMCoefficients.txt": [
                    "Surf_thick1",
                    "Wall_thick1",
                ],  # Not in 2016a
            },
            # Has old RunControl parameter names
            "check_nml": {
                "RunControl.nml": [
                    "AnthropHeatChoice",
                    "QSChoice",
                ]  # Old names in 2016a
            },
            "fallback": True,  # Still use as fallback if no other matches
        },
    }

    # Check versions from newest to oldest - ORDER IS CRITICAL!
    # Newer versions often contain all features of older versions plus additions.
    # By checking newest first with negative checks, we avoid false positives.
    # Example: 2025a has H_maintain (like 2020a) but also has h_std/n_buildings.
    # If we checked 2020a first, it would incorrectly match 2025a files.
    for version in [
        "2025a",  # Has h_std and n_buildings columns (unique to 2025a)
        "2024a",  # Has SPARTACUS files and new RunControl parameters
        "2023a",  # Has H_maintain but NOT BaseT_HC (removed in this version)
        "2021a",  # Has both H_maintain and BaseT_HC
        "2020a",  # Has H_maintain and IrrFr_ columns (same as 2021a)
        "2019b",  # Has BaseT_HC in AnthropogenicEmission (renamed from BaseTHDD)
        "2019a",  # Has BaseTHDD and AnthropogenicEmission.txt file
        "2018c",  # Same as 2018a/b (FcEF columns are added when converting to 2019a)
        "2018b",  # Same as 2018a (no structural differences)
        "2018a",  # Has BiogenCO2 with alpha/beta, TrafficRate_WD
        "2017a",  # Has gsModel in Conductance, ESTM features
        "2016a",  # Oldest version with old parameter names
    ]:
        indicators = version_indicators.get(version, {})

        # Check required files exist
        required_files = indicators.get("required_files", [])
        if required_files and not _check_required_files(input_path, required_files):
            continue

        # Check for specific file existence (version-specific files)
        specific_files = indicators.get("file_exists", [])
        if specific_files and not _check_specific_files(input_path, specific_files):
            continue

        # Check for optional files (these can help identify version but aren't required)
        optional_files = indicators.get("optional_files", [])
        if optional_files:
            # If any optional file exists, it's a positive indicator
            for f in optional_files:
                if (input_path / f).exists() or (input_path / "Input" / f).exists():
                    # Found an optional file that helps identify this version
                    break

        # Check columns in text files
        check_columns = indicators.get("check_columns", {})
        if check_columns and not _check_columns(input_path, check_columns):
            continue

        # Check for columns that should NOT exist (negative check)
        negative_columns = indicators.get("negative_columns", {})
        if negative_columns and not _check_negative_columns(
            input_path, negative_columns
        ):
            continue

        # Check nml parameters for versions that need it (e.g., 2024a)
        check_nml = indicators.get("check_nml", {})
        if check_nml and not _check_nml_parameters(input_path, check_nml):
            continue

        # If this is a fallback version, only use if nothing else matched
        if indicators.get("fallback", False):
            logger_supy.warning(
                f"Could not determine exact version, assuming {version}"
            )

        # For versions without distinct table changes (e.g., 2023a, 2024a have same
        # structure as 2021a/2020a), we may detect an earlier version. This is fine
        # since the conversion rules are identical for these versions.
        logger_supy.info(f"Auto-detected table version: {version}")
        return version

    logger_supy.warning("Could not auto-detect table version")
    return None
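
# A minimal usage sketch (the path is hypothetical):
#   >>> detect_table_version("path/to/suews_run")  # dir with RunControl.nml etc.
#   '2020a'
# A None return means no indicator set matched; callers should then ask for an
# explicit source version.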
# %%
########################################################
# define action functions:
# the current supported actions:
# rename, delete, add, move


# rename:
# rename file
def rename_file(toFile, _toVar, _toCol, toVal):
    # _toVar, _toCol are ignored
    if not Path(toFile).exists():
        logger_supy.error(f"{toFile} does not exist")
        sys.exit()
    else:
        dir_file = Path(toFile).resolve().parent
        path_toFile_renamed = dir_file / toVal
        os.rename(toFile, path_toFile_renamed)


# rename variable
def rename_var(toFile, toVar, _toCol, toVal):
    # if namelist:
    if toFile.endswith(".nml"):
        logger_supy.info(f"{toFile} {toVar} {toVal}")
        rename_var_nml(toFile, toVar, toVal)
    else:
        # First, read the file to find where data ends (before -9 lines)
        with open(toFile, encoding="utf-8") as f:
            lines = f.readlines()

        # Find where data ends (first line starting with -9)
        data_end_idx = len(lines)
        for i, line in enumerate(lines):
            if line.strip().startswith("-9"):
                data_end_idx = i
                break

        # Read only the data portion
        try:
            dataX = pd.read_csv(
                toFile,
                sep=r"\s+",
                comment="!",
                encoding="UTF8",
                skiprows=2,  # Skip both header lines
                nrows=data_end_idx - 2 if data_end_idx > 2 else None,
                header=None,
            )
            # Get the header from the second line
            if len(lines) > 1:
                headers = lines[1].strip().split()
                dataX.columns = headers
        except Exception as e:
            logger_supy.error(f"Could not read {toFile}: {e}")
            return

        # Rename the column
        if toVar in dataX.columns:
            dataX = dataX.rename(columns={toVar: toVal})
        else:
            logger_supy.warning(f"Column {toVar} not found in {toFile}")

        # Get headers
        headers = list(dataX.columns)

        # Create header line
        headerLine = (
            " ".join(str(i + 1) for i in range(len(headers)))
            + "\n"
            + " ".join(headers)
        )

        # Convert to string
        dataX = dataX.astype(str)

        # Write the file
        with open(toFile, "w", encoding="utf-8") as f:
            f.write(headerLine + "\n")
            dataX.to_csv(f, sep=" ", index=False, header=False)
            # NO footer lines - these are legacy and should not be added

        logger_supy.debug(f"Renamed {toVar} to {toVal} in {toFile}")
    return


def rename_var_nml(to_file, to_var, to_val):
    """Rename a variable in a .nml file, using lower case for consistency."""
    nml = f90nml.read(to_file)
    title = next(iter(nml.keys()))
    to_var_lower = to_var.lower()
    to_val_lower = to_val.lower()
    if to_var_lower in nml[title]:
        nml[title][to_val_lower] = nml[title].pop(to_var_lower)
    else:
        logger_supy.warning(f"{to_var} does not exist!")
    nml.write(to_file, force=True)


# delete:
# delete variable
def delete_var(toFile, toVar, _toCol, toVal):
    if toFile.endswith(".nml"):
        delete_var_nml(toFile, toVar, toVal)
    else:
        # First, read the file to find where data ends (before -9 lines)
        with open(toFile, encoding="utf-8") as f:
            lines = f.readlines()

        # Find where data ends (first line starting with -9)
        data_end_idx = len(lines)
        for i, line in enumerate(lines):
            if line.strip().startswith("-9"):
                data_end_idx = i
                break

        # Read only the data portion
        try:
            dataX = pd.read_csv(
                toFile,
                sep=r"\s+",
                comment="!",
                encoding="UTF8",
                skiprows=2,  # Skip both header lines
                nrows=data_end_idx - 2 if data_end_idx > 2 else None,
                header=None,
            )
            # Get the header from the second line
            if len(lines) > 1:
                headers = lines[1].strip().split()
                dataX.columns = headers
        except Exception as e:
            logger_supy.error(f"Could not read {toFile}: {e}")
            return

        # Delete the column
        if toVar in dataX.columns:
            dataX = dataX.drop(columns=[toVar])
        else:
            logger_supy.warning(f"Column {toVar} not found in {toFile}")
            return

        # Get headers after deletion
        headers = list(dataX.columns)

        # Create header line
        headerLine = (
            " ".join(str(i + 1) for i in range(len(headers)))
            + "\n"
            + " ".join(headers)
        )

        # Convert to string
        dataX = dataX.astype(str)

        # Write the file
        with open(toFile, "w", encoding="utf-8") as f:
            f.write(headerLine + "\n")
            dataX.to_csv(f, sep=" ", index=False, header=False)
            # NO footer lines - these are legacy and should not be added

        logger_supy.debug(f"Deleted column {toVar} from {toFile}")
    return


def delete_var_nml(toFile, toVar, _toVal):
    nml = f90nml.read(toFile)
    toVarX = toVar.lower()
    title = next(iter(nml.keys()))
    if toVarX in nml[title]:
        nml[title].pop(toVarX)
    else:
        logger_supy.warning(f"{toVar} does not exist!")
    nml.write(toFile, force=True)


def _should_skip_line(line):
    """Check if a line should be skipped during cleaning."""
    stripped = line.strip()
    # Skip empty lines and full-line comments
    if not stripped or stripped.startswith("#"):
        return True
    # Detect whether this looks like a data line (starts with numeric code)
    first_token = stripped.split()[0]
    is_data_line = first_token.lstrip("-").isdigit()
    # Skip lines that contain triple quotes or problematic quoted comments
    if '"""' in line:
        return True
    if (
        '"' in line
        and not is_data_line
        and ("Vegetation (average)" in line or "used for" in line)
    ):
        return True
    # Skip lines starting with -9 (legacy footers)
    return stripped.startswith("-9")


def _process_line(line):
    """Process a single line: remove comments and tabs."""
    # Replace tabs with spaces
    line = line.replace("\t", " ")
    # Remove inline comments (everything after !)
    if "!" in line:
        line = line[: line.index("!")].rstrip()
    return line


def _ensure_consistent_columns(fields, header_col_count):
    """Ensure field count matches header column count."""
    if not header_col_count:
        return fields
    if len(fields) == header_col_count:
        return fields
    # Truncate extra fields or pad with -999
    if len(fields) > header_col_count:
        return fields[:header_col_count]
    else:
        while len(fields) < header_col_count:
            fields.append("-999")
        return fields


def clean_legacy_table(file_path, output_path=None):
    r"""
    Clean legacy SUEWS table files for pandas compatibility.

    This function:
    - Removes inline comments (text after ! character)
    - Standardizes line endings (removes \r)
    - Removes empty trailing columns
    - Ensures consistent column counts
    - Handles tab-separated values
    - Removes ALL lines that start with -9 (legacy footers)

    Args:
        file_path: Path to the input file
        output_path: Optional path for cleaned output (if None, overwrites input)

    Returns
    -------
    Path to the cleaned file
    """
    if output_path is None:
        output_path = file_path

    logger_supy.debug(f"Cleaning legacy file: {file_path}")

    # Track what was cleaned for reporting
    cleaning_actions = []

    with open(file_path, encoding="utf-8", errors="replace") as f:
        lines = f.readlines()

    if len(lines) < 2:
        logger_supy.warning(
            f"File {file_path} has less than 2 lines, skipping cleaning"
        )
        return file_path

    header_lines = []  # Store header lines (first 2 lines)
    data_lines = []  # Store data lines
    header_col_count = None
    line_count = 0  # Track non-empty lines

    # Track cleaning statistics
    comments_removed = 0
    tabs_replaced = 0
    footer_removed = False
    columns_adjusted = 0

    for i, raw_line in enumerate(lines):
        # Remove carriage returns and trailing whitespace
        line = raw_line.replace("\r", "").rstrip()

        # Track tabs for reporting
        if "\t" in line:
            tabs_replaced += 1

        # Check if line should be skipped
        if _should_skip_line(line):
            if line.strip().startswith("-9"):
                footer_removed = True
                logger_supy.debug(
                    f"Removing legacy footer line {i + 1}: {line[:50]}... Stopping read after footer."
                )
                break  # Stop processing after footer
            elif '"""' in line or (
                '"' in line
                and ("Vegetation (average)" in line or "used for" in line)
            ):
                logger_supy.debug(
                    f"Skipping line {i + 1} with problematic quoted comments: {line[:50]}..."
                )
                cleaning_actions.append(f"Removed metadata line {i + 1}")
            continue

        # Process the line (remove comments and tabs)
        original_line = line
        line = _process_line(line)
        if "!" in original_line:
            comments_removed += 1

        # Split by spaces (tabs have been replaced with spaces)
        fields = line.split()

        # Skip empty lines after processing
        if not fields:
            continue

        # For the header rows (first 2 non-empty lines), establish column count
        if line_count < 2:
            # Store header line
            header_lines.append(" ".join(fields))
            line_count += 1
            # Set column count from the SECOND line (column names), not first
            # First line may have trailing empty fields from tabs
            if line_count == 2:
                header_col_count = len(fields)
                logger_supy.debug(
                    f"Header column count set to {header_col_count} from column names line"
                )
                # Adjust first header line if needed
                if len(header_lines[0].split()) != header_col_count:
                    first_line_fields = header_lines[0].split()
                    if len(first_line_fields) > header_col_count:
                        header_lines[0] = " ".join(
                            first_line_fields[:header_col_count]
                        )
                        logger_supy.debug(
                            f"Adjusted first header line from {len(first_line_fields)} to {header_col_count} fields"
                        )
            continue

        # For data lines
        line_count += 1

        # Ensure consistent column count
        original_field_count = len(fields)
        fields = _ensure_consistent_columns(fields, header_col_count)
        if len(fields) != original_field_count:
            columns_adjusted += 1
            if original_field_count > header_col_count:
                logger_supy.debug(
                    f"Line {i + 1}: Truncating from {original_field_count} to {header_col_count} fields"
                )

        # Store processed data line
        data_lines.append(" ".join(fields))

    # Combine header and data lines
    cleaned_lines = header_lines + data_lines
    # Note: We do NOT add footer lines - the -9 lines are removed entirely

    # Write cleaned content
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(cleaned_lines))
        if cleaned_lines and not cleaned_lines[-1].endswith("\n"):
            f.write("\n")

    # Report what was cleaned
    if (
        comments_removed > 0
        or tabs_replaced > 0
        or footer_removed
        or columns_adjusted > 0
    ):
        clean_summary = []
        if comments_removed > 0:
            clean_summary.append(f"{comments_removed} inline comments")
        if tabs_replaced > 0:
            clean_summary.append(f"{tabs_replaced} tabs replaced")
        if footer_removed:
            clean_summary.append("legacy footer removed")
        if columns_adjusted > 0:
            clean_summary.append(
                f"{columns_adjusted} lines adjusted for column consistency"
            )
        if cleaning_actions:
            clean_summary.append(f"{len(cleaning_actions)} metadata lines removed")
        logger_supy.info(
            f"[OK] Cleaned {Path(file_path).name}: {', '.join(clean_summary)}"
        )
    else:
        logger_supy.debug(f"File {Path(file_path).name} was already clean")

    return output_path


# Helper function to read SUEWS files robustly
# (kept for backward compatibility but simplified)
def read_suews_table(toFile):
    """Read SUEWS table file using numpy - simpler approach."""
    try:
        dataX = np.genfromtxt(
            toFile,
            dtype=str,
            skip_header=1,
            comments="!",
            names=True,
            invalid_raise=False,
            encoding="UTF8",
        )
        # Convert to pandas DataFrame for compatibility
        if dataX.size == 0:
            return pd.DataFrame(columns=list(dataX.dtype.names))
        else:
            return pd.DataFrame(dataX.tolist(), columns=list(dataX.dtype.names))
    except Exception as e:
        logger_supy.error(f"Failed to read {toFile}: {e!s}")
        raise


# add:
# add variable(s) to a file
def add_var(toFile, toVar, toCol, toVal):
    # if namelist:
    if toFile.endswith(".nml"):
        add_var_nml(toFile, toVar, toVal)
    else:
        # First, read the file to find where data ends (before -9 lines)
        with open(toFile, encoding="utf-8") as f:
            lines = f.readlines()

        # Find where data ends (first line starting with -9)
        data_end_idx = len(lines)
        for i, line in enumerate(lines):
            if line.strip().startswith("-9"):
                data_end_idx = i
                break

        # Read only the data portion (skip headers and footers)
        try:
            # Use pandas to read only the data lines
            dataX = pd.read_csv(
                toFile,
                sep=r"\s+",  # Use regex for whitespace separation
                comment="!",
                encoding="UTF8",
                skiprows=2,  # Skip both header lines
                nrows=data_end_idx - 2 if data_end_idx > 2 else None,  # Read only data rows
                header=None,  # No header in data
            )
            # Get the header from the second line
            if len(lines) > 1:
                headers = lines[1].strip().split()
                dataX.columns = headers
            else:
                headers = []
        except Exception as e:
            logger_supy.debug(f"Could not read {toFile} with pandas: {e}")
            # If file doesn't exist or is empty, create minimal structure
            dataX = pd.DataFrame()
            headers = []

        # Check if column already exists
        if toVar in headers:
            logger_supy.warning(
                f"{toVar} already exists in {toFile}, skipping add operation"
            )
            return

        # Calculate target position (convert from 1-based to 0-based)
        target_col = int(toCol) - 1

        # Insert the new column at the specified position
        if target_col <= len(headers):
            headers.insert(target_col, toVar)
            # Add the new column to dataX with the default value
            if not dataX.empty:
                # Insert column with the same value for all rows
                dataX.insert(target_col, toVar, toVal)
            else:
                # Create a new dataframe with just the header
                dataX = pd.DataFrame(columns=headers)

        # Create header line with column indices
        headerLine = (
            " ".join(str(i + 1) for i in range(len(headers)))
            + "\n"
            + " ".join(headers)
        )

        # Save the dataframe to file
        # Convert to string to ensure all values are saved as text
        if not dataX.empty:
            dataX = dataX.astype(str)

        # Write the file with headers
        with open(toFile, "w", encoding="utf-8") as f:
            # Write header lines
            f.write(headerLine + "\n")
            # Write data without index (only if there's data)
            if not dataX.empty:
                dataX.to_csv(f, sep=" ", index=False, header=False)
            # NO footer lines - these are legacy and should not be added


def add_var_nml(toFile, toVar, toVal):
    nml = f90nml.read(toFile)
    toVarX = toVar.lower()
    title = next(iter(nml.keys()))
    if toVarX not in nml[title]:
        # Convert string values to appropriate types for .nml files
        # Try to convert to int or float if possible
        try:
            # First try integer
            if "." not in str(toVal):
                toVal = int(toVal)
            else:
                # If it has a decimal point, use float
                toVal = float(toVal)
        except (ValueError, TypeError):
            # Keep as string if conversion fails
            pass
        nml[title][toVarX] = toVal
    else:
        logger_supy.warning(f"{toVar} exists!")
    nml.write(toFile, force=True)


def change_var_nml(toFile, toVar, toVal):
    nml = f90nml.read(toFile)
    nml[toVar] = toVal
    nml.write(toFile)


def _copy_and_clean_files(fromDir, toDir, file_patterns, clean_txt=True):
    """Copy files matching patterns and optionally clean text files."""
    for fileX in os.listdir(fromDir):
        if any(fnmatch(fileX, p) for p in file_patterns):
            file_src = os.path.join(fromDir, fileX)
            file_dst = os.path.join(toDir, fileX)
            copyfile(file_src, file_dst)
            convert_utf8(file_dst)
            if clean_txt and fnmatch(fileX, "*.txt"):
                clean_legacy_table(file_dst)


def _handle_same_version_copy(fromDir, toDir, fromVer):
    """Handle the special case where source and target versions are the same."""
    logger_supy.info(
        f"Source and target versions are the same ({fromVer}). Only cleaning files..."
    )

    # Read RunControl.nml to determine file structure
    runcontrol_path = Path(fromDir) / "RunControl.nml"
    if not runcontrol_path.exists():
        raise FileNotFoundError(f"RunControl.nml not found in {fromDir}")

    # Load RunControl to get file paths
    ser_nml = load_SUEWS_nml_simple(str(runcontrol_path)).runcontrol

    # Resolve input path from RunControl
    fileinputpath = ser_nml.get("fileinputpath", "./input/")
    if os.path.isabs(fileinputpath):
        # Absolute path
        input_dir = Path(fileinputpath)
    else:
        # Relative path from fromDir
        input_dir = (Path(fromDir) / fileinputpath).resolve()

    # Copy files from the actual input directory
    if input_dir.exists():
        _copy_and_clean_files(
            str(input_dir), toDir, ["SUEWS_*.txt", "*.nml"], clean_txt=True
        )

    # Also copy RunControl.nml and any other .nml files from root
    _copy_and_clean_files(fromDir, toDir, ["*.nml"], clean_txt=False)

    # Create the standard directory structure
    ser_nml = load_SUEWS_nml_simple(str(Path(toDir) / "RunControl.nml")).runcontrol
    path_input = (Path(toDir) / ser_nml["fileinputpath"]).resolve()
    path_output = (Path(toDir) / ser_nml["fileoutputpath"]).resolve()
    path_input.mkdir(exist_ok=True)
    path_output.mkdir(exist_ok=True)

    # Move table files to Input directory
    list_table_input = list(Path(toDir).glob("SUEWS*.txt")) + [
        x for x in Path(toDir).glob("*.nml") if "RunControl" not in str(x)
    ]
    for fileX in list_table_input:
        move(fileX.resolve(), path_input / fileX.name)

    logger_supy.info(f"Files cleaned and copied to {toDir}")


def _build_file_list(fromDir, fromVer):
    """Build list of files to process based on RunControl.nml structure."""
    fileList = []

    # Read RunControl.nml to determine file structure
    runcontrol_path = Path(fromDir) / "RunControl.nml"
    if not runcontrol_path.exists():
        # If no RunControl.nml, fall back to checking root
        logger_supy.warning(
            f"RunControl.nml not found in {fromDir}, checking root directory"
        )
        for fileX in os.listdir(fromDir):
            if any(fnmatch(fileX, p) for p in ["SUEWS*.txt", "*.nml", "*.txt"]):
                fileList.append(("", fileX))
        return fileList

    # Load RunControl to get file paths
    ser_nml = load_SUEWS_nml_simple(str(runcontrol_path)).runcontrol

    # Resolve input path from RunControl
    fileinputpath = ser_nml.get("fileinputpath", "./input/")
    if os.path.isabs(fileinputpath):
        # Absolute path
        input_dir = Path(fileinputpath)
    else:
        # Relative path from fromDir
        input_dir = (Path(fromDir) / fileinputpath).resolve()

    # Check for files in the input directory specified by RunControl
    if input_dir.exists():
        logger_supy.debug(
            f"Found input directory at {input_dir}, scanning for SUEWS_*.txt files"
        )
        # Get relative path from fromDir to input_dir for the subdir part
        try:
            rel_path = input_dir.relative_to(Path(fromDir).resolve())
            subdir = str(rel_path)
        except ValueError:
            # If not relative, use empty string
            subdir = ""
        for fileX in os.listdir(input_dir):
            if fnmatch(fileX, "SUEWS_*.txt") or fnmatch(fileX, "*.nml"):
                fileList.append((subdir, fileX))
                logger_supy.debug(f"Found file in {subdir}: {fileX}")

    # Also check root for .nml files and txt files
    for fileX in os.listdir(fromDir):
        if fnmatch(fileX, "*.nml") or fnmatch(fileX, "*.txt"):
            fileList.append(("", fileX))
            logger_supy.debug(f"Found file in root: {fileX}")

    return fileList


# a single conversion between two versions
def SUEWS_Converter_single(fromDir, toDir, fromVer, toVer):
    # copy files in fromDir to toDir, only: *.nml, SUEWS_*.txt
    if not os.path.exists(toDir):
        os.mkdir(toDir)

    # Special case: if fromVer == toVer, just copy and clean without conversion
    if fromVer == toVer:
        _handle_same_version_copy(fromDir, toDir, fromVer)
        return

    # Normal conversion process continues below
    fileList = _build_file_list(fromDir, fromVer)

    for subdir, fileX in fileList:
        file_src = (
            os.path.join(fromDir, subdir, fileX)
            if subdir
            else os.path.join(fromDir, fileX)
        )
        # Always copy to root of toDir (flattening the structure)
        file_dst = os.path.join(toDir, fileX)
        logger_supy.debug(f"Copying {file_src} to {file_dst}")
        copyfile(file_src, file_dst)
        convert_utf8(file_dst)

    # Note: File cleaning is now done once in convert_table() when files are
    # first copied. This avoids redundant cleaning during chained conversions.

    # Special handling: Create SPARTACUS.nml and GridLayoutKc.nml when converting 2023a→2024a
    # These files are introduced in 2024a and should only be created at this specific step
    # In a chained conversion, this ensures they're created at the right point
    if fromVer == "2023a" and toVer == "2024a":
        spartacus_path = os.path.join(toDir, "SUEWS_SPARTACUS.nml")
        if not os.path.exists(spartacus_path):
            # Create a minimal SPARTACUS.nml file with default values
            spartacus_content = """&Spartacus_Settings
use_sw_direct_albedo = false
n_vegetation_region_urban = 1
n_stream_sw_urban = 4
n_stream_lw_urban = 4
/
&Spartacus_Constant_Parameters
sw_dn_direct_frac = 0.45
air_ext_sw = 0.0
air_ssa_sw = 0.95
veg_ssa_sw = 0.46
air_ext_lw = 0.0
air_ssa_lw = 0.0
veg_ssa_lw = 0.06
veg_fsd_const = 0.75
veg_contact_fraction_const = 0.
ground_albedo_dir_mult_fact = 1.
/
&radsurf_driver
/
&radsurf
/
"""
            with open(spartacus_path, "w", encoding="utf-8") as f:
                f.write(spartacus_content)
            logger_supy.info(f"Created placeholder SUEWS_SPARTACUS.nml for {toVer}")

        # Also create GridLayoutKc.nml for 2024a+
        gridlayout_path = os.path.join(toDir, "GridLayoutKc.nml")
        if not os.path.exists(gridlayout_path):
            # Create a complete GridLayoutKc.nml file with thermal layer data
            gridlayout_content = """&dim
nlayer = 3
/
&geom
height = 0., 11., 15., 22.
building_frac = 0.43, 0.38, .2
veg_frac = 0.01, 0.02, .01
building_scale = 50., 50., 50
veg_scale = 10., 10., 10
/
&roof
sfr_roof = .3, .3, .4
tin_roof = 5, 5, 6
alb_roof = .5, .5, .2
emis_roof = .95, .95, .95
state_roof = .0, .0, .0
statelimit_roof = 5, 5, 5
wetthresh_roof = 5, 5, 5
soilstore_roof = 20, 20, 20
soilstorecap_roof = 120, 120, 120
roof_albedo_dir_mult_fact(1,:) = 1., 1., 1.
dz_roof(1,:) = .2, .1, .1, .01, .01
k_roof(1,:) = 1.2, 1.2, 1.2, 1.2, 1.2
cp_roof(1,:) = 2e6, 2e6, 2e6, 2e6, 2e6
dz_roof(2,:) = .2, .1, .1, .01, .01
k_roof(2,:) = 2.2, 1.2, 1.2, 1.2, 1.2
cp_roof(2,:) = 2e6, 3e6, 2e6, 2e6, 2e6
dz_roof(3,:) = .2, .1, .1, .01, .01
k_roof(3,:) = 2.2, 1.2, 1.2, 1.2, 1.2
cp_roof(3,:) = 2e6, 3e6, 2e6, 2e6, 2e6
/
&wall
sfr_wall = .3, .3, .4
tin_wall = 5, 5, 5
alb_wall = .5, .5, .5
emis_wall = .95, .95, .95
state_wall = .0, .0, .0
statelimit_wall = 5, 5, 5
wetthresh_wall = 5, 5, 5
soilstore_wall = 20, 20, 20
soilstorecap_wall = 120, 120, 120
wall_specular_frac(1,:) = 0., 0., 0.
dz_wall(1,:) = .2, .1, .1, .01, .01
k_wall(1,:) = 1.2, 1.2, 1.2, 1.2, 1.2
cp_wall(1,:) = 3e6, 2e6, 2e6, 2e6, 2e6
dz_wall(2,:) = .2, .1, .1, .01, .01
k_wall(2,:) = 1.2, 1.2, 1.2, 1.2, 1.2
cp_wall(2,:) = 2e6, 3e6, 2e6, 2e6, 2e6
dz_wall(3,:) = .2, .1, .1, .01, .01
k_wall(3,:) = 1.2, 1.2, 1.2, 1.2, 1.2
cp_wall(3,:) = 2e6, 3e6, 2e6, 2e6, 2e6
/
&surf
tin_surf = 2, 2, 2, 2, 2, 2, 2
dz_surf(1,:) = .2, .15, .01, .01, .01
k_surf(1,:) = 1.1, 1.1, 1.1, 1.1, 1.1
cp_surf(1,:) = 2.2e6, 2.2e6, 2.2e6, 2.2e6, 2.6e6
dz_surf(2,:) = .2, .1, .1, .5, 1.6
k_surf(2,:) = 1.2, 1.1, 1.1, 1.5, 1.6
cp_surf(2,:) = 1.2e6, 1.1e6, 1.1e6, 1.5e6, 1.6e6
dz_surf(3,:) = .2, .1, .1, .5, 1.6
k_surf(3,:) = 1.2, 1.1, 1.1, 1.5, 1.6
cp_surf(3,:) = 3.2e6, 1.1e6, 1.1e6, 1.5e6, 1.6e6
dz_surf(4,:) = .2, .1, .1, .1, 2.2
k_surf(4,:) = 1.2, 1.1, 1.1, 1.5, 1.6
cp_surf(4,:) = 3.2e6, 1.1e6, 1.1e6, 1.5e6, 1.6e6
dz_surf(5,:) = .2, .05, .1, .1, 2.2
k_surf(5,:) = 1.2, 1.1, 1.1, 1.5, 1.6
cp_surf(5,:) = 1.6e6, 1.1e6, 1.1e6, 1.5e6, 1.6e6
dz_surf(6,:) = .2, .05, .1, .1, 2.2
k_surf(6,:) = 1.2, 1.1, 1.1, 1.5, 1.6
cp_surf(6,:) = 1.9e6, 1.1e6, 1.1e6, 1.5e6, 1.6e6
dz_surf(7,:) = .2, .05, .1, .1, 2.2
k_surf(7,:) = 1.2, 1.1, 1.1, 1.5, 1.6
cp_surf(7,:) = 1.9e6, 1.1e6, 1.1e6, 1.5e6, 1.6e6
/
"""
            with open(gridlayout_path, "w", encoding="utf-8") as f:
                f.write(gridlayout_content)
            logger_supy.info(f"Created placeholder GridLayoutKc.nml for {toVer}")

    # list all files involved in the given conversion
    posRules = np.unique(
        np.where(
            np.array(rules.loc[:, ["From", "To"]].values.tolist()) == [fromVer, toVer]
        )[0]
    )
    filesToConvert = set(rules["File"][posRules]) - {"-999"}

    # Also include SUEWS_*.txt files that exist in source but aren't in rules
    # This ensures files like OHMCoefficients, Profiles, Soil, WithinGridWaterDist are preserved
    existing_files = set()
    for fileX in os.listdir(toDir):
        if fnmatch(fileX, "SUEWS_*.txt"):
            existing_files.add(fileX)

    # Add existing files not in rules to the conversion list
    # These will just be copied without modifications
    files_without_rules = existing_files - filesToConvert
    if files_without_rules:
        logger_supy.info(
            f"Files without rules (will be preserved): {list(files_without_rules)}"
        )
        # Combine both sets
        filesToConvert |= files_without_rules

    logger_supy.info(f"filesToConvert: {list(filesToConvert)}")

    for fileX in filesToConvert:
        logger_supy.info(f"working on file: {fileX}")
        # Special debugging for ESTM file
        if "ESTM" in fileX:
            full_path = os.path.join(toDir, fileX)
            if Path(full_path).exists():
                logger_supy.warning(
                    f"ESTM file already exists at start of processing: {full_path}, size={Path(full_path).stat().st_size}"
                )
        try:
            actionList = rules.values[posRules].compress(
                rules["File"].values[posRules] == fileX, axis=0
            )
            # If no rules exist for this file, it will just be copied as-is
            # (already done in SUEWS_Converter_single)
            if len(actionList) == 0:
                logger_supy.info(
                    f"No conversion rules for {fileX}, file preserved as-is"
                )
                continue
            actionList = actionList[:, 2:]
            # actionList = np.array(actionList.tolist())[:, 2:].astype('S140')
            # prepend toDir to fileX
            actionList[:, 1] = os.path.join(toDir, fileX)
            # print('actionList:', actionList)
            SUEWS_Converter_file(os.path.join(toDir, fileX), actionList)
        except Exception as e:
            logger_supy.error(
                f"Failed to convert {fileX} from {fromVer} to {toVer}: {e!s}"
            )
            # Don't continue with a broken conversion - fail fast
            raise RuntimeError(f"Conversion stopped at {fileX}: {e!s}") from e


def SUEWS_Converter_file(fileX, actionList):
    # actionList: [Action, File, Variable, Column, Value]
    # for a given fileX, action order:
    # 1. rename
    # 2. delete
    # 3. move
    # 4. add
    # 5. rename file
    order = {
        "Keep": 0,
        "Rename": 1,
        "Delete": 2,
        "Move": 3,
        "Add": 4,
        "Rename_File": 5,
    }
    todoList = np.array([
        np.concatenate(([order[x[0]]], x)).tolist() for x in actionList
    ])
    # sort by Column number, then by Action order in actionList; also expand
    # dtype size
    todoList = todoList[np.lexsort((todoList[:, 4].astype(int), todoList[:, 0]))][:, 1:]

    # Check if file exists before processing
    if "ESTM" in fileX and Path(fileX).exists():
        file_size = Path(fileX).stat().st_size
        logger_supy.warning(
            f"ESTM file already exists before placeholder creation: {fileX}, size={file_size} bytes"
        )
        # Read first few lines to see what's in it
        with open(fileX, encoding="utf-8") as f:
            first_lines = f.readlines()[:3]
        logger_supy.warning(f"ESTM file first lines: {first_lines}")

    if not Path(fileX).exists():
        # Only create placeholder for .txt files, not .nml files
        if fileX.endswith(".txt"):
            # Create appropriate placeholder based on file type
            if "BiogenCO2" in fileX:
                # Create minimal BiogenCO2 file - columns will be added by conversion rules
                # Just create the basic structure with Code column only
                placeholder = "1\nCode\n"
                placeholder += "31\n"  # Code 31 is commonly referenced
            elif "ESTMCoefficients" in fileX:
                # Create minimal ESTM file - columns will be added by conversion rules
                # Just create the basic structure with Code column only
                placeholder = "1\nCode\n"
                placeholder += (
                    "800\n801\n802\n803\n804\n805\n806\n807\n808\n60\n61\n200\n"
                )
                logger_supy.warning(
                    f"Creating ESTM placeholder with minimal structure: {len(placeholder)} bytes"
                )
            else:
                # Default placeholder
                placeholder = "1\nCode\n800\n"
            Path(fileX).write_text(placeholder, encoding="UTF8")
            logger_supy.debug(f"Created placeholder for missing file: {fileX}")
        elif fileX.endswith(".nml"):
            # For missing .nml files, skip processing
            logger_supy.warning(f"Namelist file {fileX} does not exist, skipping")
            return  # Skip processing this file
        else:
            logger_supy.warning(f"Unknown file type {fileX} does not exist, skipping")
            return

    if not fileX.endswith("-999"):
        logger_supy.info(f"working on {fileX} in {get_encoding_type(fileX)}")

    # correct file names with proper path
    todoList[:, 1] = fileX
    # print todoList, fileX
    for action in todoList:
        # print(action)
        try:
            SUEWS_Converter_action(*action)
        except Exception as e:
            logger_supy.error(
                f"Failed to perform action {action[0]} on {fileX}: {e!s}"
            )
            raise RuntimeError(
                f"Conversion failed at {action[0]} for {fileX}: {e!s}"
            ) from e


def keep_file(_toFile, _var, _col, _val):
    pass


def SUEWS_Converter_action(action, toFile, var, col, val):
    logger_supy.info(f"{action}, {toFile}, {var}, {col}, {val}")
    actionFunc = {
        "Rename": rename_var,
        "Delete": delete_var,
        "Add": add_var,
        "Rename_File": rename_file,
        "Keep": keep_file,
    }
    actionFunc[action](toFile, var, col, val)
    logger_supy.info(f"{action} {var} for {toFile} done!")


def dijkstra(edges, f, t):
    g = defaultdict(list)
    for src, dst, weight in edges:
        g[src].append((weight, dst))
    q, seen = [(0, f, ())], set()
    while q:
        (cost, v1, path) = heappop(q)
        if v1 not in seen:
            seen.add(v1)
            path = (v1, path)
            if v1 == t:
                return cost, path
            for c, v2 in g.get(v1, ()):
                if v2 not in seen:
                    heappush(q, (cost + c, v2, path))
    return float("inf")


def version_list(fromVer, toVer):
    edges = []
    # a = pd.read_csv('rules.csv')
    a = rules
    v_from = np.unique(a["From"])
    for i in v_from:
        df = a[a["From"] == i]
        for k in np.unique(df["To"]):
            edges.append((i, k, 1))
    s = dijkstra(edges, fromVer, toVer)
    chain_ver = []
    while s:
        chain_ver.append(s[0])
        s = s[1]
    return chain_ver
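
# A worked sketch of the chaining machinery (the exact chain depends on the
# shipped rules.csv; this only illustrates the shape of the return value):
#   version_list("2018a", "2020a")
#   # -> e.g. [3, "2020a", "2019b", "2019a", "2018a"]
# i.e. element 0 is the number of conversion steps and the rest is the version
# chain from target back to source, as consumed by convert_table() below.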
# a chained conversion across multiple versions
def convert_table(
    fromDir, toDir, fromVer, toVer, debug_dir=None, validate_profiles=True
):
    """Convert SUEWS table files between versions.

    This function performs chained conversion between SUEWS table versions,
    automatically handling intermediate version transitions when needed.

    Args:
        fromDir: Path to directory containing source SUEWS table files
        toDir: Path to directory where converted tables will be saved
        fromVer: Source version (e.g., '2016a', '2020a', '2024a')
        toVer: Target version (e.g., '2024a', '2025a')
        debug_dir: Optional directory to save intermediate conversion files
        validate_profiles: Whether to validate and auto-create missing profile entries

    Returns
    -------
    None

    Note
    ----
    If fromVer == toVer, the function only cleans/reformats files without conversion.

    The conversion process:
    1. Reads input files from fromDir (using paths in RunControl.nml)
    2. Performs chained conversion through intermediate versions if needed
    3. Writes converted files to toDir in the target version format

    With debug_dir specified, intermediate conversion steps are preserved
    for inspection.

    Examples
    --------
    >>> from supy.util.converter import convert_table
    >>>
    >>> # Convert from 2016a to 2024a
    >>> convert_table(
    ...     fromDir="path/to/old_data",
    ...     toDir="path/to/new_data",
    ...     fromVer="2016a",
    ...     toVer="2024a",
    ... )
    >>>
    >>> # Convert with debug output
    >>> convert_table(
    ...     fromDir="path/to/old_data",
    ...     toDir="path/to/new_data",
    ...     fromVer="2020a",
    ...     toVer="2024a",
    ...     debug_dir="debug_output",
    ... )
    """
    # Special case: if fromVer == toVer, just clean without conversion
    if fromVer == toVer:
        logger_supy.info(
            f"Source and target versions are the same ({fromVer}). Only cleaning files..."
        )
        SUEWS_Converter_single(fromDir, toDir, fromVer, toVer)
        return

    chain_ver = version_list(fromVer, toVer)
    len_chain = chain_ver[0]
    logger_supy.info(f"working on chained conversion {len_chain} actions to take")
    logger_supy.info(f"chained list: {chain_ver[1:]} \n")

    # Create debug directory if specified
    if debug_dir is not None:
        debug_path = Path(debug_dir)
        debug_path.mkdir(parents=True, exist_ok=True)
        logger_supy.info(
            f"Debug mode: intermediate files will be saved in {debug_path}"
        )

    # use a persistent directory when debug_dir is provided
    temp_ctx = (
        TemporaryDirectory()
        if debug_dir is None
        else nullcontext(str(debug_path) if debug_dir else None)
    )
    with temp_ctx as dir_temp:
        tempDir_1 = Path(dir_temp) / "temp1"
        tempDir_2 = Path(dir_temp) / "temp2"
        i = chain_ver[0]

        # Create temporary folders
        if not os.path.exists(tempDir_1):
            os.mkdir(tempDir_1)
        if not os.path.exists(tempDir_2):
            os.mkdir(tempDir_2)

        # flatten all file structures in tempDir_1
        # locate input folder
        ser_nml = load_SUEWS_nml_simple(
            str(Path(fromDir) / "RunControl.nml")
        ).runcontrol
        path_input = (Path(fromDir) / ser_nml["fileinputpath"]).resolve()
        list_table_input = (
            list(
                path_input.glob("SUEWS_*.txt")
            )  # Fixed: Added underscore to match SUEWS_*.txt files
            + list(path_input.glob("*.nml"))
            + list(Path(fromDir).resolve().glob("*.nml"))
            + list(
                Path(fromDir).resolve().glob("SUEWS_*.txt")
            )  # Also check root for SUEWS_*.txt files
        )
        # copy flattened files into tempDir_1 for later processing
        # also convert all files to UTF-8 encoding in case inconsistent encoding exists
        for fileX in list_table_input:
            path_dst = Path(tempDir_1) / fileX.name
            copyfile(fileX.resolve(), path_dst)
            convert_utf8(path_dst)
            # Clean legacy table files once at the beginning
            if path_dst.suffix == ".txt":
                logger_supy.debug(f"Cleaning original file: {fileX.name}")
                clean_legacy_table(path_dst)

        # Indirect version conversion process
        # The alternation logic needs to account for starting position
        # Files start in tempDir_1, so first conversion should read from tempDir_1
        while i > 1:
            logger_supy.info("**************************************************")
            logger_supy.info(f"working on: {chain_ver[i + 1]} --> {chain_ver[i]}")

            # Create snapshot directory for this step if in debug mode
            if debug_dir is not None:
                snapshot_dir = (
                    Path(dir_temp) / f"step_{chain_ver[i + 1]}_to_{chain_ver[i]}"
                )
                snapshot_dir.mkdir(exist_ok=True)

            # Fix the alternation logic: if chain starts with even length, first step should be from temp1
            # Original length is chain_ver[0], current step is i
            # If (original_length - i) is even, use temp1 -> temp2, else temp2 -> temp1
            steps_completed = chain_ver[0] - i
            if steps_completed % 2 == 0:
                # Even number of steps completed (including 0), so temp1 -> temp2
                SUEWS_Converter_single(
                    tempDir_1, tempDir_2, chain_ver[i + 1], chain_ver[i]
                )
                # Validate and fix profiles after conversion if enabled
                if validate_profiles:
                    try:
                        profile_manager = ProfileManager(
                            tempDir_2 / "SUEWS_Profiles.txt"
                        )
                        profile_manager.ensure_required_profiles(tempDir_2)
                        if profile_manager.missing_profiles:
                            logger_supy.info(
                                f"Fixed {len(profile_manager.missing_profiles)} missing profile references: {sorted(profile_manager.missing_profiles)}"
                            )
                    except Exception as e:
                        logger_supy.warning(f"Profile validation skipped: {e}")
                # Save snapshot in debug mode
                if debug_dir is not None:
                    for file in Path(tempDir_2).glob("*"):
                        copyfile(file, snapshot_dir / file.name)
                    logger_supy.info(
                        f"Debug: Saved snapshot of {chain_ver[i]} in {snapshot_dir}"
                    )
                # Remove input temporary folders only if not in debug mode
                if debug_dir is None:
                    rmtree(tempDir_1, ignore_errors=True)
                else:
                    # In debug mode, preserve intermediate results
                    logger_supy.info(
                        f"Debug: Preserved intermediate files in {tempDir_2}"
                    )
            else:
                # Odd number of steps completed, so temp2 -> temp1
                SUEWS_Converter_single(
                    tempDir_2, tempDir_1, chain_ver[i + 1], chain_ver[i]
                )
                # Validate and fix profiles after conversion if enabled
                if validate_profiles:
                    try:
                        profile_manager = ProfileManager(
                            tempDir_1 / "SUEWS_Profiles.txt"
                        )
                        profile_manager.ensure_required_profiles(tempDir_1)
                        if profile_manager.missing_profiles:
                            logger_supy.info(
                                f"Fixed {len(profile_manager.missing_profiles)} missing profile references: {sorted(profile_manager.missing_profiles)}"
                            )
                    except Exception as e:
                        logger_supy.warning(f"Profile validation skipped: {e}")
                # Save snapshot in debug mode
                if debug_dir is not None:
                    for file in Path(tempDir_1).glob("*"):
                        copyfile(file, snapshot_dir / file.name)
                    logger_supy.info(
                        f"Debug: Saved snapshot of {chain_ver[i]} in {snapshot_dir}"
                    )
                # Remove input temporary folders only if not in debug mode
                if debug_dir is None:
                    rmtree(tempDir_2, ignore_errors=True)
                else:
                    # In debug mode, preserve intermediate results
                    logger_supy.info(
                        f"Debug: Preserved intermediate files in {tempDir_1}"
                    )

            logger_supy.info("**************************************************")
            i -= 1

        logger_supy.info("**************************************************")
        logger_supy.info(f"working on: {chain_ver[i + 1]} --> {chain_ver[i]}")

        # Determine which temp directory has the final results
        # After the loop, we've completed (chain_ver[0] - 1) steps
        total_steps = chain_ver[0] - 1
        if total_steps % 2 == 0:
            # Even number of steps means files are in tempDir_1
            final_source = tempDir_1
        else:
            # Odd number of steps means files are in tempDir_2
            final_source = tempDir_2

        SUEWS_Converter_single(final_source, toDir, chain_ver[2], chain_ver[1])

        # Final profile validation
        if validate_profiles:
            try:
                profile_manager = ProfileManager(
                    Path(toDir) / "input" / "SUEWS_Profiles.txt"
                )
                profile_manager.ensure_required_profiles(Path(toDir) / "input")
                if profile_manager.missing_profiles:
                    logger_supy.info(
                        f"Final profile validation: Fixed {len(profile_manager.missing_profiles)} missing profiles"
                    )
                    logger_supy.info(
                        f"Missing profile codes: {sorted(profile_manager.missing_profiles)}"
                    )
            except Exception:
                # Try the toDir directly if input dir doesn't exist yet
                try:
                    profile_manager = ProfileManager(
                        Path(toDir) / "SUEWS_Profiles.txt"
                    )
                    profile_manager.ensure_required_profiles(Path(toDir))
                    if profile_manager.missing_profiles:
                        logger_supy.info(
                            f"Final profile validation: Fixed {len(profile_manager.missing_profiles)} missing profiles"
                        )
                except Exception as e2:
                    logger_supy.warning(f"Final profile validation skipped: {e2}")

        # Save final snapshot in debug mode
        if debug_dir is not None:
            snapshot_dir = (
                Path(dir_temp) / f"step_{chain_ver[2]}_to_{chain_ver[1]}_final"
            )
            snapshot_dir.mkdir(exist_ok=True)
            for file in Path(toDir).glob("*"):
                if file.is_file():
                    copyfile(file, snapshot_dir / file.name)
            logger_supy.info(f"Debug: Saved final snapshot in {snapshot_dir}")

        logger_supy.info("**************************************************")

        # Remove temporary folders unless in debug mode
        if debug_dir is None:
            rmtree(tempDir_1, ignore_errors=True)
            rmtree(tempDir_2, ignore_errors=True)

    # cleaning and move input tables into the `input` folder
    ser_nml = load_SUEWS_nml_simple(str(Path(toDir) / "RunControl.nml")).runcontrol
    path_input = (Path(toDir) / ser_nml["fileinputpath"]).resolve()
    path_output = (Path(toDir) / ser_nml["fileoutputpath"]).resolve()
    path_input.mkdir(exist_ok=True)
    path_output.mkdir(exist_ok=True)
    list_table_input = list(Path(toDir).glob("SUEWS*.txt")) + [
        x for x in Path(toDir).glob("*.nml") if "RunControl" not in str(x)
    ]
    for fileX in list_table_input:
        # Check if we need to rename InitialConditions files when multipleinitfiles == 0
        target_name = fileX.name
        if (
            "InitialConditions" in fileX.name
            and ser_nml.get("multipleinitfiles", 0) == 0
        ):
            # Remove grid number from filename
            # (e.g., InitialConditionsKc1_2011.nml -> InitialConditionsKc_2011.nml)
            # Pattern matches InitialConditionsXXX#_YYYY.nml where XXX is the
            # filecode, # is the grid number, and YYYY is the year
            pattern = r"(InitialConditions[A-Za-z]+)\d+(_\d{4}\.nml)"
            new_name = re.sub(pattern, r"\1\2", fileX.name)
            if new_name != fileX.name:
                target_name = new_name
                logger_supy.debug(
                    f"Renaming {fileX.name} to {target_name} (multipleinitfiles=0)"
                )
        move(fileX.resolve(), path_input / target_name)

    # Ensure expected grid layout file exists (legacy datasets may ship with mismatched names)
    if _requires_grid_layout(toVer):
        expected_grid = f"GridLayout{ser_nml['filecode']}.nml"
        path_expected_grid = path_input / expected_grid
        if not path_expected_grid.exists():
            grid_candidates = sorted(path_input.glob("GridLayout*.nml"))
            normalized = expected_grid.lower()
            matched_candidate = next(
                (
                    candidate
                    for candidate in grid_candidates
                    if candidate.name.lower() == normalized
                ),
                None,
            )
            if matched_candidate is not None:
                shutil.copy2(matched_candidate, path_expected_grid)
                logger_supy.info(
                    f"Created {expected_grid} from {matched_candidate.name} to match RunControl filecode"
                )
            elif grid_candidates:
                candidate_names = ", ".join(
                    candidate.name for candidate in grid_candidates
                )
                path_expected_grid.write_text(PLACEHOLDER_GRIDLAYOUT, encoding="utf-8")
                logger_supy.warning(
                    f"Expected {expected_grid} but found non-matching GridLayout files ({candidate_names}); "
                    f"created placeholder {expected_grid} instead of copying unrelated layout."
                )
            else:
                path_expected_grid.write_text(PLACEHOLDER_GRIDLAYOUT, encoding="utf-8")
                logger_supy.info(
                    f"No GridLayout*.nml files found for filecode {ser_nml['filecode']}; "
                    f"created placeholder {expected_grid} with default geometry."
                )
# get file encoding type
def get_encoding_type(file):
    with open(file, "rb") as f:
        rawdata = f.read()
    return detect(rawdata)["encoding"]


def convert_utf8(file_src):
    path_src = Path(file_src).resolve()
    from_codec = get_encoding_type(path_src)
    logger_supy.debug(f"encoding {from_codec} detected in {path_src.name}")
    with TemporaryDirectory() as dir_temp:
        path_dst = Path(dir_temp) / "out-UTF8.txt"
        path_dst.touch()

        # add try/except block for reliability
        try:
            with (
                open(path_src, encoding=from_codec) as f,
                open(path_dst, "w", encoding="utf-8") as f_out,
            ):
                text = f.read()  # read whole file; for big files, use chunks
                f_out.write(text)

            os.remove(path_src)  # remove the file in the old encoding
            try:
                path_dst.rename(path_src)
            except OSError as e:
                if e.errno == 18:
                    # EXDEV: cross-device link not permitted; fall back to copy
                    logger_supy.error("Invalid cross-device link")
                    shutil.copy2(path_dst, path_src)
                    os.remove(path_dst)
                else:
                    raise e
        except UnicodeDecodeError:
            logger_supy.error("Decode Error")
        except UnicodeEncodeError:
            logger_supy.error("Encode Error")