import logging
from pathlib import Path
from typing import Optional, Tuple, Union
import numpy as np
import pandas as pd
from .._post import resample_output
# Module-level logger, named after this module; used by `read_epw` to warn
# when a wind-speed height correction is applied.
logger = logging.getLogger(__name__)
#################################################################
# Generate a TMY (typical meteorological year) dataframe from SuPy results.
# The `Const` class below is a write-once container for the weighting
# constants used during TMY generation.
class Const:
    """Write-once namespace for upper-case constants.

    An attribute can be bound exactly once. Binding a name that already
    exists raises ``ConstError``; binding a name that is not entirely
    upper case raises ``ConstCaseError``.
    """

    class ConstError(TypeError):
        """Raised when an existing constant is assigned a second time."""

    class ConstCaseError(ConstError):
        """Raised when a constant name is not all upper case."""

    def __setattr__(self, name, value):
        namespace = vars(self)
        # rebinding is checked first, then the naming rule
        if name in namespace:
            raise self.ConstError("can't change const %s" % name)
        if name.isupper():
            namespace[name] = value
            return
        raise self.ConstCaseError('const name "%s" is not all uppercase' % name)
def gen_score_list(length):
    """Return the scores ``(i + 0.5) / length`` for ``i = 0 .. length - 1``.

    Parameters
    ----------
    length : int
        Number of scores to generate.

    Returns
    -------
    numpy.ndarray
        Evenly spaced values centred within ``length`` equal bins of [0, 1].
    """
    ranks = np.arange(length)
    return (ranks + 0.5) / length
def gen_score_ser(ser_test):
    """Replace the values of a Series by their rank-based scores.

    The Series is sorted in ascending order and the i-th smallest value
    (0-based) is given the score ``(i + 0.5) / n``, where ``n`` is the
    Series size. The input is not modified.

    Parameters
    ----------
    ser_test : pandas.Series
        Values to be ranked.

    Returns
    -------
    pandas.Series
        Float scores carrying the index labels of ``ser_test`` in
        ascending-value order, and the same name as ``ser_test``.
    """
    ser_sorted = ser_test.sort_values(ascending=True)
    length = ser_sorted.size
    scores = (np.arange(length) + 0.5) / length
    # Build a new float Series rather than writing the scores into the sorted
    # copy in place: the in-place write depends on the dtype of the input
    # (e.g. integer data would have to be upcast to hold fractional scores).
    return pd.Series(scores, index=ser_sorted.index, name=ser_sorted.name)
def gen_FS_DF(df_output):
    """Generate a DataFrame of per-month score differences.

    Daily ``min``, ``max`` and ``mean`` of ``T2``, ``U10``, ``Kdown`` and
    ``RH2`` are computed twice: once per calendar day of every year and once
    per (month, day) pooled over all years. For each (year, month) the daily
    values of both sets are converted to rank scores with `gen_score_ser`,
    and the mean absolute difference between the two sets of scores is
    taken for every (statistic, variable) pair.

    Parameters
    ----------
    df_output : pandas.DataFrame
        Must contain the columns ``T2``, ``U10``, ``Kdown``, ``RH2``,
        ``Year``, ``Month`` and ``Day``.

    Returns
    -------
    pandas.DataFrame
        Rows are (statistic, variable) pairs, e.g. ``("mean", "T2")``;
        columns are (year, month) pairs.
    """
    list_var = ["T2", "U10", "Kdown", "RH2"]
    # String aliases instead of the builtin/NumPy callables: pandas deprecates
    # passing `min`, `max` and `np.mean` as `aggfunc`. The resulting column
    # labels ("min", "max", "mean") are the ones `gen_WS_DF` looks up.
    list_stat = ["min", "max", "mean"]
    df_day = pd.pivot_table(
        df_output,
        values=list_var,
        index=["Year", "Month", "Day"],
        aggfunc=list_stat,
    )
    df_day_all_year = pd.pivot_table(
        df_output,
        values=list_var,
        index=["Month", "Day"],
        aggfunc=list_stat,
    )
    # unique (year, month) pairs in order of appearance
    idx_yr_mon = df_day.index.droplevel("Day").unique()
    df_fs = pd.DataFrame({
        (yr, mon): (
            df_day.loc[(yr, mon)].apply(gen_score_ser)
            - df_day_all_year.loc[mon].apply(gen_score_ser)
        )
        .abs()
        .mean()
        for yr, mon in idx_yr_mon
    })
    return df_fs
def gen_WS_DF(df_met):
    """Generate a DataFrame of weighted sums of the scores from `gen_FS_DF`.

    Parameters
    ----------
    df_met : pd.DataFrame
        A dataframe of meteorological info that must include these columns/variables:
        - T2: near surface air temperature at 2 m agl
        - RH2: near surface relative humidity at 2 m agl
        - U10: near surface wind speed at 10 m agl
        - Kdown: incoming solar radiation
        - Year: calendar year
        - Month: calendar month
        - Day: calendar day

    Returns
    -------
    pd.DataFrame
        Weighted score sums with years as rows and months as columns;
        rows containing any missing value are dropped.
    """
    df_fs = gen_FS_DF(df_met)

    # Weights of the Sandia method, keyed by the (statistic, variable) row of
    # `df_fs` they apply to. Ranges, wind direction and direct solar radiation
    # carry no weight here (not implemented yet).
    weight_table = [
        (("mean", "T2"), 2 / 24),
        (("max", "T2"), 1 / 24),
        (("min", "T2"), 1 / 24),
        (("mean", "U10"), 2 / 24),
        (("max", "U10"), 2 / 24),
        (("min", "U10"), 0),
        (("mean", "RH2"), 2 / 24),
        (("max", "RH2"), 1 / 24),
        (("min", "RH2"), 1 / 24),
        (("mean", "Kdown"), 12 / 24),
    ]

    weighted_rows = [df_fs.loc[row] * weight for row, weight in weight_table]
    # sum the weighted components for every (year, month), then spread the
    # months into columns
    ser_total = pd.concat(weighted_rows, axis=1).sum(axis=1)
    return ser_total.unstack().dropna()
def pick_year(df_output, n=1):
    """Select one representative year for every month.

    For each month, the ``n`` years with the smallest weighted score
    (`gen_WS_DF`) are shortlisted; among them the year with the smallest
    root-mean-square difference of ``Kdown`` (`cal_rmsd_mon`) is chosen.

    Parameters
    ----------
    df_output : pandas.DataFrame
        Input accepted by `gen_WS_DF` and `cal_rmsd_mon`.
    n : int, optional
        Number of candidate years shortlisted per month, by default 1.

    Returns
    -------
    pandas.Series
        Selected year for each month (indexed by month).
    """
    # root mean square differences
    df_rmsd_mon = cal_rmsd_mon(df_output)
    # weighted score metric
    df_ws = gen_WS_DF(df_output)

    def _candidate_years(ser_ws):
        # labels (years) of the n smallest weighted scores of one month
        return ser_ws.nsmallest(n).index

    df_candidates = df_ws.apply(_candidate_years)

    def _rmsd_of_candidates(ser_rmsd):
        # restrict one month's RMSD values to its shortlisted years
        return ser_rmsd.loc[df_candidates[ser_rmsd.name]]

    # best candidate year for each month
    return df_rmsd_mon.apply(_rmsd_of_candidates).idxmin()
def cal_rmsd_mon(df_output):
    """Calculate monthly root-mean-square differences of daily mean ``Kdown``.

    For every (year, month), the daily means of ``Kdown`` are compared with
    the daily means of the same calendar days pooled over all years.

    Parameters
    ----------
    df_output : pandas.DataFrame
        Must contain the columns ``Kdown``, ``Year``, ``Month`` and ``Day``.

    Returns
    -------
    pandas.DataFrame
        RMSD values with years as rows and months as columns; years with
        any missing month are dropped.
    """
    # The string alias "mean" replaces the `np.mean` callable, whose use as
    # `aggfunc` is deprecated by pandas. It is kept inside a list so the
    # columns remain a two-level ("mean", "Kdown") index, which the
    # `droplevel([0, 1])` below relies on.
    df_day = pd.pivot_table(
        df_output,
        values="Kdown",
        index=["Year", "Month", "Day"],
        aggfunc=["mean"],
    )
    df_day_all_year = pd.pivot_table(
        df_output,
        values="Kdown",
        index=["Month", "Day"],
        aggfunc=["mean"],
    )
    # unique (year, month) pairs in order of appearance
    idx_yr_mon = df_day.index.droplevel("Day").unique()
    df_rmse = (
        pd.DataFrame({
            (yr, mon): np.sqrt(
                np.square(df_day.loc[(yr, mon)] - df_day_all_year.loc[mon]).mean()
            )
            for yr, mon in idx_yr_mon
        })
        # move the month level of the columns into the rows, then transpose
        # so that years index the rows
        .stack()
        .T.dropna()
    )
    # keep only the month level as column labels
    df_rmse.columns = df_rmse.columns.droplevel([0, 1])
    return df_rmse
# Column headers of a standard EPW file, one name per line, in file order.
# The literal starts and ends with a newline, so splitting on "\n" yields an
# empty first and last element (dropped by the `[1:-1]` slices that use it).
header_EPW = """
Year
Month
Day
Hour
Minute
Data Source and Uncertainty Flags
Dry Bulb Temperature
Dew Point Temperature
Relative Humidity
Atmospheric Station Pressure
Extraterrestrial Horizontal Radiation
Extraterrestrial Direct Normal Radiation
Horizontal Infrared Radiation Intensity
Global Horizontal Radiation
Direct Normal Radiation
Diffuse Horizontal Radiation
Global Horizontal Illuminance
Direct Normal Illuminance
Diffuse Horizontal Illuminance
Zenith Luminance
Wind Direction
Wind Speed
Total Sky Cover
Opaque Sky Cover
Visibility
Ceiling Height
Present Weather Observation
Present Weather Codes
Precipitable Water
Aerosol Optical Depth
Snow Depth
Days Since Last Snowfall
Albedo
Liquid Precipitation Depth
Liquid Precipitation Quantity
"""
# list of EPW variable names, in column order (used by `gen_epw`)
list_var_EPW = header_EPW.split("\n")[1:-1]
# dict: SuPy variable names -> EPW standard names
dict_supy_epw = {
    "Kdown": "Global Horizontal Radiation",
    "T2": "Dry Bulb Temperature",
    "RH2": "Relative Humidity",
    "U10": "Wind Speed",
}
# inverse mapping: EPW standard names -> SuPy variable names
dict_epw_supy = {v: k for k, v in dict_supy_epw.items()}
def gen_TMY(df_output):
    """Generate TMY (typical meteorological year) from SuPy output.

    Parameters
    ----------
    df_output : pandas.DataFrame
        Output from `run_supy`: longterm (e.g., >10 years) simulation results,
        otherwise not very useful. Must have a datetime index.

    Returns
    -------
    pandas.DataFrame
        Rows of ``df_output`` belonging to the year selected for each month
        by `pick_year`, concatenated month by month, with the added columns
        ``Year``, ``Month``, ``Day``, ``Hour`` and ``Minute``. Midnight
        records are labelled as hour 24 of the previous day (see
        `conv_0to24`).
    """
    # attach calendar columns derived from the datetime index
    df_output_x = df_output.assign(
        Year=lambda df: df.index.year,
        Month=lambda df: df.index.month,
        Day=lambda df: df.index.day,
        Hour=lambda df: df.index.hour,
        Minute=lambda df: df.index.minute,
    )
    # select the representative year of every month (5 candidates per month)
    year_sel = pick_year(df_output_x, n=5)
    # convert `0h` to `24h` and take care of `day`: to follow EPW convention
    df_output_x = conv_0to24(df_output_x)
    # group once, outside the loop: the grouping does not depend on the month
    grouped = df_output_x.groupby(["Month", "Year"])
    # `year_sel.items()` yields (month, year) pairs, i.e. the group keys
    df_TMY = pd.concat([grouped.get_group(grp) for grp in year_sel.items()])
    return df_TMY
def conv_0to24(df_TMY):
    """Relabel midnight records as hour 24 of the previous day.

    Rows whose timestamp falls exactly on midnight get their ``Year``,
    ``Month`` and ``Day`` columns set to those of the timestamp one hour
    earlier, and their ``Hour`` column set to 24. The index is unchanged.

    Parameters
    ----------
    df_TMY : pandas.DataFrame
        Frame with a datetime index; it is modified in place.

    Returns
    -------
    pandas.DataFrame
        The same object that was passed in.
    """
    stamps = df_TMY.index
    is_midnight = stamps == stamps.normalize()
    # stepping back one hour lands on the previous calendar day
    previous = stamps[is_midnight] - pd.Timedelta("1h")
    calendar_fields = (
        ("Year", previous.year),
        ("Month", previous.month),
        ("Day", previous.day),
    )
    for column, values in calendar_fields:
        df_TMY.loc[is_midnight, column] = values
    df_TMY.loc[is_midnight, "Hour"] = 24
    return df_TMY
# function to read in EPW file
def read_epw(
    path_epw: Path,
    target_height: float = 10.0,
    z0m: float = 0.1,
) -> pd.DataFrame:
    """Load an EPW (EnergyPlus Weather) file into a time-indexed DataFrame.

    Parameters
    ----------
    path_epw : Path
        Location of the EPW file.
    target_height : float, optional
        Height [m] to which the wind speed is extrapolated. EPW wind speed
        refers to 10 m agl; when ``target_height`` is not 10 m the wind
        speed is rescaled with a logarithmic wind profile.
        Default is 10.0 (wind speed left untouched).
    z0m : float, optional
        Roughness length for momentum [m] entering the wind profile.
        Typical values: 0.01 (open water), 0.1 (grassland), 0.5-2.0 (urban).
        Default is 0.1.

    Returns
    -------
    df_tmy : pd.DataFrame
        Weather records indexed by ``DateTime``, with columns labelled by
        the EPW standard variable names.

    Raises
    ------
    ValueError
        If ``z0m`` or ``target_height`` is not positive.

    Notes
    -----
    **Measurement Height Assumptions**

    EPW files follow standard meteorological station conventions:

    - **Wind Speed**: 10 m above ground level (agl)
    - **Temperature and Humidity**: 2 m agl (screen height)

    When using EPW data with SUEWS, ensure the forcing height parameter
    ``z`` in your site configuration matches these heights. For EPW data,
    set ``z = 10`` to match the wind speed measurement height.

    **Wind Speed Height Correction**

    If ``target_height != 10.0``, the wind speed is adjusted using the
    logarithmic wind profile (assuming neutral atmospheric conditions):

    .. math::

        U(z_2) = U(z_1) \\frac{\\ln((z_2 + z_0) / z_0)}{\\ln((z_1 + z_0) / z_0)}

    where :math:`z_1 = 10` m (EPW height), :math:`z_2` is the target height,
    and :math:`z_0` is the roughness length.

    .. warning::

        The log-law profile assumes neutral atmospheric stability. Under
        strongly stable or unstable conditions, actual wind profiles may
        differ significantly from this approximation.

    See Also
    --------
    gen_epw : Generate EPW file from SUEWS simulation output.
    supy.util.gen_forcing_era5 : Generate forcing from ERA5 (extrapolated
        to user-specified height).

    Examples
    --------
    >>> import supy as sp
    >>> from pathlib import Path
    >>>
    >>> # Read EPW file without height correction (default)
    >>> df_epw = sp.util.read_epw(Path("weather.epw"))
    >>>
    >>> # Read EPW file and extrapolate wind speed to 50 m
    >>> df_epw = sp.util.read_epw(
    ...     Path("weather.epw"),
    ...     target_height=50.0,
    ...     z0m=0.5,  # urban roughness length
    ... )
    """
    # reject non-physical inputs before touching the file
    if z0m <= 0:
        raise ValueError(f"z0m must be positive, got {z0m}")
    if target_height <= 0:
        raise ValueError(f"target_height must be positive, got {target_height}")

    # the first 8 lines of the file are skipped; the remaining rows are
    # headerless and get the standard EPW column names
    column_names = [name.strip() for name in header_EPW.split("\n")[1:-1]]
    df_tmy = pd.read_csv(path_epw, skiprows=8, sep=",", header=None)
    df_tmy.columns = column_names

    # date from YYYYMMDD, then the hour added as an offset so that an hour
    # value of 24 rolls over into the next day
    yyyymmdd = df_tmy["Year"] * 10000 + df_tmy["Month"] * 100 + df_tmy["Day"]
    day_start = pd.to_datetime(yyyymmdd, format="%Y%m%d")
    hour_offset = pd.to_timedelta(df_tmy["Hour"], unit="h")
    df_tmy["DateTime"] = pd.to_datetime(day_start + hour_offset)
    df_tmy = df_tmy.set_index("DateTime")

    # wind speed is only touched when the target height differs from the
    # EPW standard height (10 m)
    epw_height = 10.0
    if np.isclose(target_height, epw_height):
        return df_tmy

    logger.warning(
        f"Applying wind speed height correction from {epw_height}m to {target_height}m "
        f"using log-law profile (assumes neutral atmospheric conditions). "
        f"This approximation may be less accurate under strongly stable or unstable conditions."
    )
    # log-law wind profile correction (neutral conditions)
    log_target = np.log((target_height + z0m) / z0m)
    log_reference = np.log((epw_height + z0m) / z0m)
    correction_factor = log_target / log_reference
    df_tmy["Wind Speed"] = df_tmy["Wind Speed"] * correction_factor
    return df_tmy
# generate EPW file from `df_TMY`
def gen_epw(
    df_output: pd.DataFrame,
    lat: float,
    lon: float,
    tz: float = 0,
    path_epw: Union[str, Path] = Path("./uTMY.epw"),
    freq: Optional[str] = None,
    grid: Optional[int] = None,
) -> Tuple[pd.DataFrame, list, Path]:
    """Generate an ``epw`` file of uTMY (urbanised Typical Meteorological Year) using SUEWS simulation results.

    Parameters
    ----------
    df_output : pandas.DataFrame
        SUEWS simulation results. Can be either:

        - Full MultiIndex output from `run_supy` (grid, datetime) x (group, var)
        - Pre-extracted single-grid SUEWS output (datetime) x (var)
    lat : float
        Latitude of the site, used for calculating solar angle.
    lon : float
        Longitude of the site, used for calculating solar angle.
    tz : float, optional
        Time zone represented by time difference from UTC+0 (e.g., 8 for UTC+8),
        by default 0 (i.e., UTC+0).
    path_epw : pathlib.Path or str, optional
        Path to store generated epw file, by default Path('./uTMY.epw').
    freq : str, optional
        Target frequency for resampling (e.g., 'h', '60min', '1h').
        If provided, the output is resampled before EPW generation using
        variable-appropriate aggregation methods (MultiIndex input) or a
        plain mean (single-grid input).
        Recommended for sub-hourly simulation output. Default is None (no resampling).
    grid : int, optional
        Grid number to extract if df_output has MultiIndex (grid, datetime).
        If not provided and MultiIndex detected, uses the first grid.

    Returns
    -------
    tuple
        - df_epw: pandas.DataFrame - uTMY result
        - text_meta: list of str - header (meta-info) lines written to the file
        - path_epw: pathlib.Path - path to generated ``epw`` file

    Raises
    ------
    ImportError
        If pvlib is not installed. Install with: pip install pvlib

    Notes
    -----
    This function requires pvlib for solar position and irradiance calculations.
    pvlib is not included as a required dependency due to its h5py requirement
    which can cause build issues on some platforms.

    Apart from latitude, longitude and time zone in the ``LOCATION`` line,
    the header lines written to the file are a fixed template and are not
    derived from the simulation.

    Examples
    --------
    Basic usage with pre-extracted data:

    >>> df_epw, meta, path = sp.util.gen_epw(
    ...     df_output.loc[grid, "SUEWS"], lat=51.5, lon=-0.1
    ... )

    With automatic resampling and grid extraction:

    >>> df_epw, meta, path = sp.util.gen_epw(
    ...     df_output,  # Full MultiIndex output from run_supy
    ...     lat=51.5,
    ...     lon=-0.1,
    ...     freq="h",  # Resample to hourly
    ... )

    See Also
    --------
    supy.resample_output : Resample output with variable-appropriate aggregation
    """
    import atmosp

    try:
        import pvlib
    except ImportError as err:
        # chain the original error so the real import failure stays visible
        raise ImportError(
            "TMY/EPW generation requires pvlib. Install it with: pip install pvlib\n"
            "Note: pvlib requires h5py which may need compilation on some systems."
        ) from err

    # Handle MultiIndex input from run_supy
    if isinstance(df_output.index, pd.MultiIndex):
        # Extract grid if needed
        if grid is None:
            grid = df_output.index.get_level_values("grid").unique()[0]
        # Resample if frequency specified (before extracting to single grid)
        if freq is not None:
            df_output = resample_output(df_output, freq=freq, _internal=True)
        # Extract SUEWS group for the specified grid
        if isinstance(df_output.columns, pd.MultiIndex):
            groups = df_output.columns.get_level_values("group").unique()
            if "SUEWS" in groups:
                df_output = df_output.loc[grid, "SUEWS"]
            else:
                df_output = df_output.loc[grid]
        else:
            df_output = df_output.loc[grid]
    elif freq is not None:
        # Single-grid input with freq specified - use simple resampling
        # For single-grid SUEWS output, variables are typically averaged
        df_output = df_output.resample(freq, closed="right", label="right").mean()

    # select months from representative years
    df_tmy = gen_TMY(df_output.copy())

    # Assign timezone info as a fixed UTC offset in whole seconds. The offset
    # is converted to `int` so that a float `tz` (e.g. 8.0 or 5.5) follows the
    # same integer path as the default `tz=0`.
    df_tmy.index = df_tmy.index.tz_localize(int(round(tz * 3600)))

    # adding necessary variables that can be derived from supy output
    df_tmy["Dew Point Temperature"] = (
        atmosp.calculate(
            "Td",
            T=df_tmy["T2"].values + 273.15,
            qv=df_tmy["Q2"].values,
            qv_unit="g/kg",
            RH=df_tmy["RH2"].values,
            rho=1.23,
        )
        - 273.15
    )
    df_tmy["Atmospheric Station Pressure"] = atmosp.calculate(
        "p",
        T=df_tmy["T2"].values + 273.15,
        qv=df_tmy["Q2"].values,
        qv_unit="g/kg",
        RH=df_tmy["RH2"].values,
        rho=1.23,
    )

    # processing solar radiation components
    # tiny values of Kdown are treated as zero
    df_tmy.loc[df_tmy["Kdown"] < 0.001, "Kdown"] = 0
    # ===================================================================
    # relationship of solar radiation components:
    # GHI = DHI + DNI * cos (Z)
    # GHI: global horizontal irradiance
    # DHI: diffuse horizontal irradiance
    # DNI: direct normal irradiance
    # cos(Z): cosine of solar zenith angle
    # transfer simulated Kdown to GHI
    GHI = df_tmy["Kdown"]
    # global horizontal radiation
    df_tmy["Global Horizontal Radiation"] = GHI
    # solar zenith angle
    solar_zenith_deg = pvlib.solarposition.get_solarposition(
        df_tmy.index, lat, lon
    ).zenith
    # direct normal radiation
    # Determine DNI from GHI using the DIRINT modification of the DISC model.
    DNI = pvlib.irradiance.dirint(
        ghi=df_tmy["Global Horizontal Radiation"],
        times=df_tmy.index,
        solar_zenith=solar_zenith_deg,
        pressure=df_tmy["Atmospheric Station Pressure"],
        temp_dew=df_tmy["Dew Point Temperature"],
        use_delta_kt_prime=True,
    ).replace(np.nan, 0)
    df_tmy["Direct Normal Radiation"] = DNI.values
    # diffuse horizontal radiation
    # DHI = GHI - DNI * cos (Z)
    df_tmy["Diffuse Horizontal Radiation"] = GHI - DNI * np.cos(
        solar_zenith_deg * np.pi / 180
    )
    # end: solar radiation processing
    # ===================================================================

    # horizontal infrared radiation
    df_tmy["Horizontal Infrared Radiation Intensity"] = df_tmy["Ldown"]

    # Conform column names to EPW standard. A SuPy column is only renamed when
    # its EPW name is not already present: "Global Horizontal Radiation" was
    # created above from Kdown, and renaming Kdown onto it as well would give
    # two columns with the same label, so that selecting the label would
    # return a 2-D frame instead of a single column.
    rename_map = {
        name_supy: name_epw
        for name_supy, name_epw in dict_supy_epw.items()
        if name_epw not in df_tmy.columns
    }
    df_TMY_x = df_tmy.rename(columns=rename_map)

    # initialise df_epw for EPW output
    df_epw = pd.DataFrame(columns=list_var_EPW, index=df_tmy.index)
    # dict of default values
    dict_var_dft = {
        "Data Source and Uncertainty Flags": -9992,
        "Extraterrestrial Horizontal Radiation": 9999,
        "Extraterrestrial Direct Normal Radiation": 9999,
        "Horizontal Infrared Radiation Intensity": 9999,
        "Direct Normal Radiation": 9999,
        "Global Horizontal Illuminance": 9999999,
        "Direct Normal Illuminance": 9999999,
        "Diffuse Horizontal Illuminance": 9999999,
        "Zenith Luminance": 9999,
        "Wind Direction": 999,
        "Total Sky Cover": 99,
        "Opaque Sky Cover": 99,
        "Visibility": 9999,
        "Ceiling Height": 99999,
        "Present Weather Observation": 9999,
        "Present Weather Codes": 9999,
        "Precipitable Water": 999,
        "Aerosol Optical Depth": 999,
        "Snow Depth": 999,
        "Days Since Last Snowfall": 99,
        "Albedo": 999,
        "Liquid Precipitation Depth": 999,
        "Liquid Precipitation Quantity": 999,
    }
    # Fill every EPW column: simulated/derived data when available, otherwise
    # the default value, otherwise NaN. Explicit membership tests are used so
    # that an unexpected error is raised rather than silently replaced by a
    # default value.
    for var in list_var_EPW:
        if var in df_TMY_x.columns:
            df_epw[var] = df_TMY_x[var].values
        elif var in dict_var_dft:
            df_epw[var] = np.ones(len(df_epw)) * dict_var_dft[var]
        else:
            df_epw[var] = np.nan

    # fill 'Data Source and Uncertainty Flags'
    df_epw["Data Source and Uncertainty Flags"] = (
        "?9?9?9?9E0?9?9?9*9*9?9*9*9?9*9*9?9?9*9*_*9*9*9*9*9"
    )
    # order records by calendar position, regardless of their source year
    df_epw = df_epw.sort_values(["Month", "Day", "Hour"], axis=0)

    # save pure data to a csv for formatting
    path_epw = Path(path_epw)
    path_epw.parent.mkdir(parents=True, exist_ok=True)
    path_epw.touch(exist_ok=True)
    df_epw.to_csv(path_epw, index=None, header=None)
    text_data = path_epw.read_text().split("\n")
    # delete the csv file; the final EPW file is written to the same path below
    path_epw.unlink()

    # NOTE: fixed header template; only `lat`, `lon` and `tz` in the first
    # line are substituted below.
    text_meta = """
LOCATION,Chongqing Shapingba,Chongqing,CHN,CSWD,575160,lat,lon,tz,259.1
DESIGN CONDITIONS,1,Climate Design Data 2009 ASHRAE Handbook,,Heating,1,3.2,4.2,-0.2,3.8,6.5,1.3,4.3,6.2,4.9,7.6,4.3,7.5,1.4,0,Cooling,7,7.4,36.9,25.6,35.5,25.6,34.2,25.4,27.4,32.7,26.9,32.2,26.4,31.6,2.5,110,26.1,22.2,30.2,25.6,21.5,29.8,25.1,20.8,29.3,89.3,32.7,86.9,32.5,84.7,31.7,909,Extremes,5.1,4.3,3.6,35.4,1.1,38.8,1.3,1.6,0.1,40,-0.6,40.9,-1.4,41.8,-2.3,43
TYPICAL/EXTREME PERIODS,6,Summer - Week Nearest Max Temperature For Period,Extreme,7/27,8/ 2,Summer - Week Nearest Average Temperature For Period,Typical,7/ 6,7/12,Winter - Week Nearest Min Temperature For Period,Extreme,12/22,1/ 5,Winter - Week Nearest Average Temperature For Period,Typical,1/13,1/19,Autumn - Week Nearest Average Temperature For Period,Typical,10/13,10/19,Spring - Week Nearest Average Temperature For Period,Typical,4/12,4/18
GROUND TEMPERATURES,3,.5,,,,13.31,10.23,9.39,10.12,14.28,18.95,23.34,26.51,27.44,25.95,22.35,17.82,2,,,,16.09,13.20,11.82,11.77,13.97,17.16,20.59,23.54,25.06,24.77,22.74,19.63,4,,,,17.90,15.65,14.27,13.85,14.66,16.52,18.83,21.09,22.62,22.98,22.11,20.29
HOLIDAYS/DAYLIGHT SAVINGS,No,0,0,0
COMMENTS 1, generated by SuPy
COMMENTS 2, none
DATA PERIODS,1,1,Data,Sunday,1/1,12/31
"""
    text_meta = text_meta.split("\n")[1:-1]
    # change the default latitude, longitude and timezone from args, which will be used to calculate solar position and received direct solar radiation at each surface in EnergyPlus
    text_meta[0] = text_meta[0].replace("lat", str(lat))
    text_meta[0] = text_meta[0].replace("lon", str(lon))
    text_meta[0] = text_meta[0].replace("tz", str(tz))

    # write out the actual EPW file: header lines followed by the data lines
    text_epw = "\n".join(text_meta + text_data)
    path_epw.write_text(text_epw)
    return df_epw, text_meta, path_epw