Source code for supy.util._tmy

import logging
from pathlib import Path
from typing import Optional, Tuple, Union

import numpy as np
import pandas as pd

from .._post import resample_output

# Logger for this module
logger = logging.getLogger(__name__)


#################################################################
# generate TMY dataframe from supy results
# weight class to determine constants for TMY generation
class Const:
    class ConstError(TypeError):
        pass

    class ConstCaseError(ConstError):
        pass

    def __setattr__(self, name, value):
        if name in self.__dict__:
            raise self.ConstError("can't change const %s" % name)
        if not name.isupper():
            raise self.ConstCaseError('const name "%s" is not all uppercase' % name)
        self.__dict__[name] = value


def gen_score_list(length):
    list_score = (np.arange(length) + 0.5) / length
    return list_score


def gen_score_ser(ser_test):
    ser_score = ser_test.sort_values(ascending=True)
    length = ser_score.size
    list_score = (np.arange(length) + 0.5) / length
    ser_score.loc[:] = list_score
    return ser_score


def gen_FS_DF(df_output):
    """generate DataFrame of scores.

    Parameters
    ----------
    df_output

    Returns
    -------
    type
        Description of returned object.

    """
    df_day = pd.pivot_table(
        df_output,
        values=["T2", "U10", "Kdown", "RH2"],
        index=["Year", "Month", "Day"],
        aggfunc=[
            min,
            max,
            np.mean,
        ],
    )
    df_day_all_year = pd.pivot_table(
        df_output,
        values=["T2", "U10", "Kdown", "RH2"],
        index=["Month", "Day"],
        aggfunc=[
            min,
            max,
            np.mean,
        ],
    )

    array_yr_mon = df_day.index.droplevel("Day").to_frame().drop_duplicates().values

    df_fs = pd.DataFrame({
        (yr, mon): (
            df_day.loc[(yr, mon)].apply(gen_score_ser)
            - df_day_all_year.loc[mon].apply(gen_score_ser)
        )
        .abs()
        .mean()
        for yr, mon in array_yr_mon
    })

    return df_fs


def gen_WS_DF(df_met):
    """generate DataFrame of weighted sums of F-score.

    Parameters
    ----------
    df_met : pd.DataFrame
        A dataframe of meterological info that must include these columns/variables:
        - T2: near surface air temperature at 2 m agl
        - RH2: near surface relative humidity at 2 m agl
        - U10: near surface wind speed at 10 m agl
        - Kdown: incomidng solar radiation
        - Year: calendar year
        - Month: calendar month
        - Day: calendar day

    Returns
    -------
    pd.DataFrame
        Converted dataframe with calculated metrics for TMY generation.

    """
    df_fs = gen_FS_DF(df_met)

    list_index = [
        ("mean", "T2"),
        ("max", "T2"),
        ("min", "T2"),
        ("mean", "U10"),
        ("max", "U10"),
        ("min", "U10"),
        ("mean", "RH2"),
        ("max", "RH2"),
        ("min", "RH2"),
        ("mean", "Kdown"),
    ]

    # generate weights: Sandia method
    const = Const()

    const.T_MEAN = 2 / 24
    const.T_MAX = 1 / 24
    const.T_MIN = 1 / 24
    const.T_RANGE = 0  # todo
    const.RH_MEAN = 2 / 24
    const.RH_MAX = 1 / 24
    const.RH_MIN = 1 / 24
    const.RH_RANGE = 0  # todo
    const.WIND_MEAN = 2 / 24
    const.WIND_MAX = 2 / 24
    const.WIND_MIN = 0
    const.WIND_RANGE = 0  # todo
    const.WIND_DIRECTION = 0  # todo
    const.SOLAR_RADIATION_GLOBAL = 12 / 24
    const.SOLAR_RADIATION_DIRECT = 0  # todo

    list_const = [
        getattr(const, attr)
        for attr in [
            "T_MEAN",
            "T_MAX",
            "T_MIN",
            "WIND_MEAN",
            "WIND_MAX",
            "WIND_MIN",
            "RH_MEAN",
            "RH_MAX",
            "RH_MIN",
            "SOLAR_RADIATION_GLOBAL",
        ]
    ]
    list_ws = [df_fs.loc[idx] * cst for idx, cst in zip(list_index, list_const)]
    df_ws = pd.concat(list_ws, axis=1).sum(axis=1).unstack().dropna()

    return df_ws


def pick_year(df_output, n=1):
    # root mean square differences
    df_rmsd_mon = cal_rmsd_mon(df_output)

    # WS: weighted FS metric
    df_ws = gen_WS_DF(df_output)

    # years with smallest WS
    year_nsmallest = df_ws.apply(lambda ser: ser.nsmallest(n).index)

    # best candidate years for each month
    year_sel = df_rmsd_mon.apply(lambda ser: ser.loc[year_nsmallest[ser.name]]).idxmin()

    return year_sel


def cal_rmsd_mon(df_output):
    df_day = pd.pivot_table(
        df_output,
        values="Kdown",
        index=["Year", "Month", "Day"],
        aggfunc=[
            np.mean,
        ],
    )

    df_day_all_year = pd.pivot_table(
        df_output,
        values="Kdown",
        index=["Month", "Day"],
        aggfunc=[
            np.mean,
        ],
    )

    array_yr_mon = df_day.index.droplevel("Day").to_frame().drop_duplicates().values

    df_rmse = (
        pd.DataFrame({
            (yr, mon): np.sqrt(
                np.square(df_day.loc[(yr, mon)] - df_day_all_year.loc[mon]).mean()
            )
            for yr, mon in array_yr_mon
        })
        .stack()
        .T.dropna()
    )
    df_rmse.columns = df_rmse.columns.droplevel([0, 1])
    return df_rmse


# headers of standard EPW files
header_EPW = """
Year
Month
Day
Hour
Minute
Data Source and Uncertainty Flags
Dry Bulb Temperature
Dew Point Temperature
Relative Humidity
Atmospheric Station Pressure
Extraterrestrial Horizontal Radiation
Extraterrestrial Direct Normal Radiation
Horizontal Infrared Radiation Intensity
Global Horizontal Radiation
Direct Normal Radiation
Diffuse Horizontal Radiation
Global Horizontal Illuminance
Direct Normal Illuminance
Diffuse Horizontal Illuminance
Zenith Luminance
Wind Direction
Wind Speed
Total Sky Cover
Opaque Sky Cover
Visibility
Ceiling Height
Present Weather Observation
Present Weather Codes
Precipitable Water
Aerosol Optical Depth
Snow Depth
Days Since Last Snowfall
Albedo
Liquid Precipitation Depth
Liquid Precipitation Quantity
    """

# list of variables in EPW
list_var_EPW = header_EPW.split("\n")[1:-1]

# dict: SuPy variables -> EPW standard names
dict_supy_epw = {
    "Kdown": "Global Horizontal Radiation",
    "T2": "Dry Bulb Temperature",
    "RH2": "Relative Humidity",
    "U10": "Wind Speed",
}
dict_epw_supy = {v: k for k, v in dict_supy_epw.items()}


def gen_TMY(df_output):
    """generate TMY (typical meteorological year) from SuPy output.

    Parameters
    ----------
    df_output : pandas.DataFrame
        Output from `run_supy`: longterm (e.g., >10 years) simulation results, otherwise not very useful.

    """

    # calculate weighted score
    df_output_x = df_output.assign(
        Year=lambda df: df.index.year,
        Month=lambda df: df.index.month,
        Day=lambda df: df.index.day,
        Hour=lambda df: df.index.hour,
        Minute=lambda df: df.index.minute,
    )
    # df_ws = gen_WS_DF(df_output_x)

    # select year
    year_sel = pick_year(df_output_x, n=5)

    # convert `0h` to `24h` and take care of `day`: to follow EPW convention
    df_output_x = conv_0to24(df_output_x)

    # generate TMY data
    df_TMY = pd.concat([
        df_output_x.groupby(["Month", "Year"]).get_group(grp)
        for grp in year_sel.items()
    ])

    return df_TMY


def conv_0to24(df_TMY):
    # convert `0h` to `24h` and take care of `day`
    loc_24h = df_TMY.index == df_TMY.index.normalize()
    ser_24h = df_TMY.loc[loc_24h].index - pd.Timedelta("1h")
    df_TMY.loc[loc_24h, "Year"] = ser_24h.year
    df_TMY.loc[loc_24h, "Month"] = ser_24h.month
    df_TMY.loc[loc_24h, "Day"] = ser_24h.day
    df_TMY.loc[loc_24h, "Hour"] = 24
    return df_TMY


# function to read in EPW file
[docs] def read_epw( path_epw: Path, target_height: float = 10.0, z0m: float = 0.1, ) -> pd.DataFrame: """Read in EPW (EnergyPlus Weather) file as a DataFrame. Parameters ---------- path_epw : Path Path to EPW file. target_height : float, optional Target height for wind speed extrapolation [m]. EPW files contain wind speed at 10 m agl. If target_height differs from 10 m, the wind speed will be adjusted using a logarithmic wind profile. Default is 10.0 (no correction applied). z0m : float, optional Roughness length for momentum [m], used in wind profile correction. Typical values: 0.01 (open water), 0.1 (grassland), 0.5-2.0 (urban). Default is 0.1. Returns ------- df_tmy : pd.DataFrame DataFrame containing weather data with columns named according to EPW standard variable names. Notes ----- **Measurement Height Assumptions** EPW files follow standard meteorological station conventions: - **Wind Speed**: 10 m above ground level (agl) - **Temperature and Humidity**: 2 m agl (screen height) When using EPW data with SUEWS, ensure the forcing height parameter ``z`` in your site configuration matches these heights. For EPW data, set ``z = 10`` to match the wind speed measurement height. **Wind Speed Height Correction** If ``target_height != 10.0``, the wind speed is adjusted using the logarithmic wind profile (assuming neutral atmospheric conditions): .. math:: U(z_2) = U(z_1) \\frac{\\ln((z_2 + z_0) / z_0)}{\\ln((z_1 + z_0) / z_0)} where :math:`z_1 = 10` m (EPW height), :math:`z_2` is the target height, and :math:`z_0` is the roughness length. .. warning:: The log-law profile assumes neutral atmospheric stability. Under strongly stable or unstable conditions, actual wind profiles may differ significantly from this approximation. See Also -------- gen_epw : Generate EPW file from SUEWS simulation output. supy.util.gen_forcing_era5 : Generate forcing from ERA5 (extrapolated to user-specified height). Examples -------- >>> import supy as sp >>> from pathlib import Path >>> >>> # Read EPW file without height correction (default) >>> df_epw = sp.util.read_epw(Path("weather.epw")) >>> >>> # Read EPW file and extrapolate wind speed to 50 m >>> df_epw = sp.util.read_epw( ... Path("weather.epw"), ... target_height=50.0, ... z0m=0.5, # urban roughness length ... ) """ # Input validation if z0m <= 0: raise ValueError(f"z0m must be positive, got {z0m}") if target_height <= 0: raise ValueError(f"target_height must be positive, got {target_height}") df_tmy = pd.read_csv(path_epw, skiprows=8, sep=",", header=None) df_tmy.columns = [x.strip() for x in header_EPW.split("\n")[1:-1]] df_tmy["DateTime"] = pd.to_datetime( pd.to_datetime( df_tmy["Year"] * 10000 + df_tmy["Month"] * 100 + df_tmy["Day"], format="%Y%m%d", ) + pd.to_timedelta(df_tmy["Hour"], unit="h") ) df_tmy = df_tmy.set_index("DateTime") # Apply wind speed height correction if target height differs from EPW standard (10 m) epw_height = 10.0 if not np.isclose(target_height, epw_height): logger.warning( f"Applying wind speed height correction from {epw_height}m to {target_height}m " f"using log-law profile (assumes neutral atmospheric conditions). " f"This approximation may be less accurate under strongly stable or unstable conditions." ) # Log-law wind profile correction (neutral conditions) correction_factor = np.log((target_height + z0m) / z0m) / np.log( (epw_height + z0m) / z0m ) df_tmy["Wind Speed"] = df_tmy["Wind Speed"] * correction_factor return df_tmy
# generate EPW file from `df_TMY`
[docs] def gen_epw( df_output: pd.DataFrame, lat: float, lon: float, tz: float = 0, path_epw: Union[str, Path] = Path("./uTMY.epw"), freq: Optional[str] = None, grid: Optional[int] = None, ) -> Tuple[pd.DataFrame, str, Path]: """Generate an ``epw`` file of uTMY (urbanised Typical Meteorological Year) using SUEWS simulation results. Parameters ---------- df_output : pandas.DataFrame SUEWS simulation results. Can be either: - Full MultiIndex output from `run_supy` (grid, datetime) x (group, var) - Pre-extracted single-grid SUEWS output (datetime) x (var) lat : float Latitude of the site, used for calculating solar angle. lon : float Longitude of the site, used for calculating solar angle. tz : float, optional Time zone represented by time difference from UTC+0 (e.g., 8 for UTC+8), by default 0 (i.e., UTC+0). path_epw : pathlib.Path or str, optional Path to store generated epw file, by default Path('./uTMY.epw'). freq : str, optional Target frequency for resampling (e.g., 'h', '60min', '1h'). If provided, the output is resampled before EPW generation using variable-appropriate aggregation methods. Recommended for sub-hourly simulation output. Default is None (no resampling). grid : int, optional Grid number to extract if df_output has MultiIndex (grid, datetime). If not provided and MultiIndex detected, uses the first grid. Returns ------- tuple - df_epw: pandas.DataFrame - uTMY result - text_meta: str - meta-info text - path_epw: pathlib.Path - path to generated ``epw`` file Raises ------ ImportError If pvlib is not installed. Install with: pip install pvlib Notes ----- This function requires pvlib for solar position and irradiance calculations. pvlib is not included as a required dependency due to its h5py requirement which can cause build issues on some platforms. Examples -------- Basic usage with pre-extracted data: >>> df_epw, meta, path = sp.util.gen_epw( ... df_output.loc[grid, "SUEWS"], lat=51.5, lon=-0.1 ... ) With automatic resampling and grid extraction: >>> df_epw, meta, path = sp.util.gen_epw( ... df_output, # Full MultiIndex output from run_supy ... lat=51.5, ... lon=-0.1, ... freq="h", # Resample to hourly ... ) See Also -------- supy.resample_output : Resample output with variable-appropriate aggregation """ import atmosp from pathlib import Path try: import pvlib except ImportError: raise ImportError( "TMY/EPW generation requires pvlib. Install it with: pip install pvlib\n" "Note: pvlib requires h5py which may need compilation on some systems." ) # Handle MultiIndex input from run_supy if isinstance(df_output.index, pd.MultiIndex): # Extract grid if needed if grid is None: grid = df_output.index.get_level_values("grid").unique()[0] # Resample if frequency specified (before extracting to single grid) if freq is not None: df_output = resample_output(df_output, freq=freq, _internal=True) # Extract SUEWS group for the specified grid if isinstance(df_output.columns, pd.MultiIndex): groups = df_output.columns.get_level_values("group").unique() if "SUEWS" in groups: df_output = df_output.loc[grid, "SUEWS"] else: df_output = df_output.loc[grid] else: df_output = df_output.loc[grid] elif freq is not None: # Single-grid input with freq specified - use simple resampling # For single-grid SUEWS output, variables are typically averaged df_output = df_output.resample(freq, closed="right", label="right").mean() # select months from representative years df_tmy = gen_TMY(df_output.copy()) # assign timezone info df_tmy.index = df_tmy.index.tz_localize(tz * 3600) # adding necessary variables that can be derive from supy output df_tmy["Dew Point Temperature"] = ( atmosp.calculate( "Td", T=df_tmy["T2"].values + 273.15, qv=df_tmy["Q2"].values, qv_unit="g/kg", RH=df_tmy["RH2"].values, rho=1.23, ) - 273.15 ) df_tmy["Atmospheric Station Pressure"] = atmosp.calculate( "p", T=df_tmy["T2"].values + 273.15, qv=df_tmy["Q2"].values, qv_unit="g/kg", RH=df_tmy["RH2"].values, rho=1.23, ) # processing solar radiation components df_tmy.loc[df_tmy["Kdown"] < 0.001, "Kdown"] = 0 # =================================================================== # relationship of solar radiation components: # GHI = DHI + DNI * cos (Z) # GHI: global horizontal irridiance # DHI: diffuse horizontal irridiance # DNI: direct normal irridiance # cos(Z): cosine of solar zenith angle # transfer simulated Kdown to GHI GHI = df_tmy["Kdown"] # global horizontal radiation df_tmy["Global Horizontal Radiation"] = GHI # solar zenith angle solar_zenith_deg = pvlib.solarposition.get_solarposition( df_tmy.index, lat, lon ).zenith # direct normal radaition # Determine DNI from GHI using the DIRINT modification of the DISC model. DNI = pvlib.irradiance.dirint( ghi=df_tmy["Global Horizontal Radiation"], times=df_tmy.index, solar_zenith=solar_zenith_deg, pressure=df_tmy["Atmospheric Station Pressure"], temp_dew=df_tmy["Dew Point Temperature"], use_delta_kt_prime=True, ).replace(np.nan, 0) df_tmy["Direct Normal Radiation"] = DNI.values # diffuse horizontal radiation # DHI = GHI - DNI * cos (Z) df_tmy["Diffuse Horizontal Radiation"] = GHI - DNI * np.cos( solar_zenith_deg * np.pi / 180 ) # end: solar radiation processing # =================================================================== # horizontal infrared radiation df_tmy["Horizontal Infrared Radiation Intensity"] = df_tmy["Ldown"] # conform column names to EPW standard df_TMY_x = df_tmy.rename(columns=dict_supy_epw) # initialise df_epw for EPW output df_epw = pd.DataFrame(columns=list_var_EPW, index=df_tmy.index) # dict of default values dict_var_dft = { "Data Source and Uncertainty Flags": -9992, "Extraterrestrial Horizontal Radiation": 9999, "Extraterrestrial Direct Normal Radiation": 9999, "Horizontal Infrared Radiation Intensity": 9999, "Direct Normal Radiation": 9999, "Global Horizontal Illuminance": 9999999, "Direct Normal Illuminance": 9999999, "Diffuse Horizontal Illuminance": 9999999, "Zenith Luminance": 9999, "Wind Direction": 999, "Total Sky Cover": 99, "Opaque Sky Cover": 99, "Visibility": 9999, "Ceiling Height": 99999, "Present Weather Observation": 9999, "Present Weather Codes": 9999, "Precipitable Water": 999, "Aerosol Optical Depth": 999, "Snow Depth": 999, "Days Since Last Snowfall": 99, "Albedo": 999, "Liquid Precipitation Depth": 999, "Liquid Precipitation Quantity": 999, } for var in list_var_EPW: try: df_epw[var] = df_TMY_x[var].values except: # print(f'{var} not existing! This variable will be filled with default value {dict_var_dft[var]}') try: df_epw[var] = np.ones(len(df_epw)) * dict_var_dft[var] except: df_epw[var] = np.nan # fill 'Data Source and Uncertainty Flags' df_epw["Data Source and Uncertainty Flags"] = ( "?9?9?9?9E0?9?9?9*9*9?9*9*9?9*9*9?9?9*9*_*9*9*9*9*9" ) # df_epw["Global Horizontal Radiation"] = np.ones(len(df_epw)) * 9999 df_epw.index = df_TMY_x.index df_epw = df_epw.sort_values(["Month", "Day", "Hour"], axis=0) # save pure data to a csv for formatting path_epw = Path(path_epw) if not path_epw.parent.exists(): path_epw.parent.mkdir(parents=True) path_epw.touch(exist_ok=True) df_epw.to_csv(path_epw, index=None, header=None) text_data = path_epw.read_text().split("\n") # delete the csv file path_epw.unlink() text_meta = """ LOCATION,Chongqing Shapingba,Chongqing,CHN,CSWD,575160,lat,lon,tz,259.1 DESIGN CONDITIONS,1,Climate Design Data 2009 ASHRAE Handbook,,Heating,1,3.2,4.2,-0.2,3.8,6.5,1.3,4.3,6.2,4.9,7.6,4.3,7.5,1.4,0,Cooling,7,7.4,36.9,25.6,35.5,25.6,34.2,25.4,27.4,32.7,26.9,32.2,26.4,31.6,2.5,110,26.1,22.2,30.2,25.6,21.5,29.8,25.1,20.8,29.3,89.3,32.7,86.9,32.5,84.7,31.7,909,Extremes,5.1,4.3,3.6,35.4,1.1,38.8,1.3,1.6,0.1,40,-0.6,40.9,-1.4,41.8,-2.3,43 TYPICAL/EXTREME PERIODS,6,Summer - Week Nearest Max Temperature For Period,Extreme,7/27,8/ 2,Summer - Week Nearest Average Temperature For Period,Typical,7/ 6,7/12,Winter - Week Nearest Min Temperature For Period,Extreme,12/22,1/ 5,Winter - Week Nearest Average Temperature For Period,Typical,1/13,1/19,Autumn - Week Nearest Average Temperature For Period,Typical,10/13,10/19,Spring - Week Nearest Average Temperature For Period,Typical,4/12,4/18 GROUND TEMPERATURES,3,.5,,,,13.31,10.23,9.39,10.12,14.28,18.95,23.34,26.51,27.44,25.95,22.35,17.82,2,,,,16.09,13.20,11.82,11.77,13.97,17.16,20.59,23.54,25.06,24.77,22.74,19.63,4,,,,17.90,15.65,14.27,13.85,14.66,16.52,18.83,21.09,22.62,22.98,22.11,20.29 HOLIDAYS/DAYLIGHT SAVINGS,No,0,0,0 COMMENTS 1, generated by SuPy COMMENTS 2, none DATA PERIODS,1,1,Data,Sunday,1/1,12/31 """ text_meta = text_meta.split("\n")[1:-1] # change the default latitude, longuitude and timezone from args, which will be used to calculate solar position and received direct solar radiation at each surface in EnergyPlus text_meta[0] = text_meta[0].replace("lat", str(lat)) text_meta[0] = text_meta[0].replace("lon", str(lon)) text_meta[0] = text_meta[0].replace("tz", str(tz)) # lines = [] text_epw = "\n".join(text_meta + text_data) # with open(path_epw, 'r') as f: # for line in f: # lines.append(line) # lines.insert(0, text_meta[1:]) # s = ''.join(lines) # write out the actual EPW file path_epw.write_text(text_epw) # with open(path_epw, "w") as fp: # fp.write(text_epw) return df_epw, text_meta, path_epw