# suppress pandas warnings
import logging
import os
import shutil
import time
import warnings
import contextlib
from pathlib import Path
import numpy as np
import pandas as pd
from numpy import cos, deg2rad, sin, sqrt
from .._env import logger_supy
warnings.simplefilter(action="ignore", category=FutureWarning)
################################################
# ERA-5 related functions
################################################
# utility functions
def roundPartial(value, resolution):
    """Round `value` to the nearest multiple of `resolution` (e.g. to snap coordinates to the ERA-5 grid)."""
    return round(value / resolution) * resolution
# Geopotential functions on the WGS84 reference ellipsoid: conversions between
# geopotential and geometric height.
# Adapted from Chris Roth's ERA-5 utilities: https://pypi.org/project/eratools/
Rmax_WGS84 = 6378137
Rmin_WGS84 = Rmax_WGS84 * (1 - 1 / 298.257223563)
def _geoid_radius(latitude: float) -> float:
"""Calculates the GEOID radius at a given latitude
Parameters
----------
latitude : float
Latitude (degrees)
Returns
-------
R : float
GEOID Radius (meters)
"""
lat = deg2rad(latitude)
return sqrt(1 / (cos(lat) ** 2 / Rmax_WGS84**2 + sin(lat) ** 2 / Rmin_WGS84**2))
def geometric2geopotential(z: float, latitude: float) -> float:
"""Converts geometric height to geopotential height
Parameters
----------
z : float
Geometric height (meters)
latitude : float
Latitude (degrees)
Returns
-------
    h : float
        Geopotential (m2 s-2) relative to the reference ellipsoid
"""
twolat = deg2rad(2 * latitude)
g = 9.80616 * (1 - 0.002637 * cos(twolat) + 0.0000059 * cos(twolat) ** 2)
re = _geoid_radius(latitude)
return z * g * re / (re + z)
def geopotential2geometric(h: float, latitude: float) -> float:
"""Converts geopotential height to geometric height
Parameters
----------
    h : float
        Geopotential (m2 s-2)
latitude : float
Latitude (degrees)
Returns
-------
z : float
Geometric Height (meters) above the reference ellipsoid
"""
twolat = deg2rad(2 * latitude)
g = 9.80616 * (1 - 0.002637 * cos(twolat) + 0.0000059 * cos(twolat) ** 2)
re = _geoid_radius(latitude)
return h * re / (g * re - h)
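# Illustrative round-trip check (values assumed for demonstration, not part of the module):
# the two conversions are exact inverses, so converting a geometric height to
# geopotential and back recovers the input, e.g.
#   z0 = 100.0  # geometric height [m]
#   assert abs(geopotential2geometric(geometric2geopotential(z0, 51.5), 51.5) - z0) < 1e-6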
# functions to interpolate the atmospheric variables to a specified height/altitude
def get_ser_val_alt(
lat: float,
lon: float,
da_alt_x,
da_alt,
da_val,
) -> pd.Series:
"""interpolate atmospheric variable to a specified altitude
Parameters
----------
lat : float
latitude of specified site
lon : float
longitude of specified site
da_alt_x
desired altitude to interpolate variable at
da_alt
altitude associated with `da_val`: variable array to interpolate
da_val
atmospheric variable to interpolate
Returns
-------
pd.Series
        interpolated values at the specified altitude for the site located at (`lat`, `lon`)
"""
from scipy.interpolate import interp1d
alt_t_1d = da_alt.sel(latitude=lat, longitude=lon, method="nearest")
val_t_1d = da_val.sel(latitude=lat, longitude=lon, method="nearest")
alt_x = da_alt_x.sel(latitude=lat, longitude=lon, method="nearest")[0]
val_alt = np.array([
interp1d(alt_1d, val_1d)(alt_x) for alt_1d, val_1d in zip(alt_t_1d, val_t_1d)
])
ser_alt = pd.Series(
val_alt,
index=da_val.time.values,
name=da_val.name,
)
return ser_alt
def get_df_val_alt(lat: float, lon: float, da_alt_meas, ds_val):
"""interpolate atmospheric variables to a specified altitude
Parameters
----------
lat : float
latitude of specified site
lon : float
longitude of specified site
    da_alt_meas
        desired altitude (measurement height) to interpolate variables at
    ds_val
        dataset of atmospheric variables to interpolate
Returns
-------
pd.DataFrame
        interpolated values at the specified altitude for the site located at (`lat`, `lon`)
"""
from scipy.interpolate import interp1d
da_alt = geopotential2geometric(ds_val.z, ds_val.latitude)
# generate pressure series for grid x
da_alt_x = da_alt.sel(latitude=lat, longitude=lon, method="nearest")
alt_meas_x = da_alt_meas.sel(latitude=lat, longitude=lon, method="nearest")[0]
val_pres = np.array([interp1d(alt, da_alt_x.level)(alt_meas_x) for alt in da_alt_x])
df_val_alt = pd.concat(
[
get_ser_val_alt(lat, lon, da_alt_meas, da_alt, ds_val[var])
for var in ds_val.data_vars
],
axis=1,
)
# add pressure
df_val_alt["p"] = val_pres
df_val_alt.index = df_val_alt.index.set_names("time")
df_val_alt.columns = df_val_alt.columns.set_names("var")
return df_val_alt
# cds download related functions
def gen_dict_dt(dt_index):
list_key = ["year", "month", "day", "time"]
list_fmt = ["%Y", "%m", "%d", "%H:%M"]
dict_dt = {
k: dt_index.dt.strftime(fmt).unique().tolist()
for k, fmt in zip(list_key, list_fmt)
}
return dict_dt
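# Illustrative result (hypothetical input): for a pandas Series of hourly timestamps
# covering 2020-01-01 00:00 to 02:00, gen_dict_dt returns CDS-style request fragments:
#   {"year": ["2020"], "month": ["01"], "day": ["01"], "time": ["00:00", "01:00", "02:00"]}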
def gen_dict_dt_sub(dt_index):
# divide by [year, month] for surface level data
ser_dict_sub = dt_index.groupby(dt_index.dt.strftime("%Y%m")).apply(gen_dict_dt)
dict_sub = ser_dict_sub.unstack().T.to_dict()
return dict_sub
# generate filename
def gen_fn(dict_x):
# centre coordinates
ar_coords = np.array(dict_x["area"]).reshape(2, -1)
lat_c, lon_c = ar_coords.T.mean(axis=1)
# starting year
yr_x = dict_x["year"][0]
# starting month
mon_x = dict_x["month"][0]
# type of dataset: `sfc` for surface level while `ml` for atmospheric multi-level
type_x = "sfc" if "2m_temperature" in dict_x["variable"] else "ml"
# format location coordinates in filename
lat_c = f"{lat_c}N" if lat_c > 0 else f"{-lat_c}S"
lon_c = f"{lon_c}E" if lon_c > 0 else f"{-lon_c}W"
# construct filename
fn_x = f"{lat_c}{lon_c}-{yr_x}{mon_x}-{type_x}.nc"
return fn_x
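# Illustrative filename (hypothetical request dict): with
#   dict_x = {"area": [57.75, -4.5, 57.25, -4.0], "year": ["2020"],
#             "month": ["01"], "variable": ["2m_temperature"]}
# gen_fn(dict_x) returns "57.5N4.25W-202001-sfc.nc".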
# dict_x: a dict describing download elements
def gen_dict_proc(dict_x):
type_x = "sfc" if "2m_temperature" in dict_x["variable"] else "ml"
dict_feed = {
"sfc": "reanalysis-era5-single-levels",
"ml": "reanalysis-era5-pressure-levels",
}
feed_x = dict_feed[type_x]
dict_proc = dict(
name=feed_x,
request=dict_x,
target=gen_fn(dict_x),
)
return dict_proc
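# The returned dict can be unpacked directly into the CDS API client,
# e.g. cdsapi.Client().retrieve(**gen_dict_proc(dict_x)), as done in download_cds below.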
@contextlib.contextmanager
def chdir(path):
orig = os.getcwd()
os.chdir(path)
try:
yield
finally:
os.chdir(orig)
@contextlib.contextmanager
def safe_stdout_stderr():
"""Ensure stdout/stderr are valid file objects for libraries that assume they exist.
Some GUI environments (e.g., QGIS Processing toolbox) set stdout/stderr to None,
which causes libraries like tqdm to crash with:
AttributeError: 'NoneType' object has no attribute 'write'
This context manager temporarily replaces None streams with os.devnull, which is
more robust than setting TQDM_DISABLE as it works for any library that writes to
stdout/stderr.
See: https://github.com/UMEP-dev/SUEWS/issues/902
"""
import sys
stdout_was_none = sys.stdout is None
stderr_was_none = sys.stderr is None
# Only act if streams are None
if not stdout_was_none and not stderr_was_none:
yield
return
# Replace None streams with devnull
devnull = open(os.devnull, "w")
try:
if stdout_was_none:
sys.stdout = devnull
if stderr_was_none:
sys.stderr = devnull
yield
finally:
if stdout_was_none:
sys.stdout = None
if stderr_was_none:
sys.stderr = None
devnull.close()
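# Illustrative usage: wrap any call that may write progress bars to stdout/stderr,
# e.g. the CDS API downloads below:
#   with safe_stdout_stderr():
#       cdsapi.Client().retrieve(...)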
def download_cds(fn, dict_req):
import cdsapi
import tempfile
c = cdsapi.Client()
path_fn = Path(fn)
if path_fn.exists():
logger_supy.warning(f"{fn} exists!")
else:
logger_supy.info(f"To download: {fn}")
# this will download the file to a secure temporary directory without requirement for extra permission
with tempfile.TemporaryDirectory() as td:
with chdir(td):
# Ensure stdout/stderr exist for tqdm in headless environments
with safe_stdout_stderr():
c.retrieve(**dict_req)
# move the downloaded file to desired location
shutil.move(path_fn.name, fn)
# hold on a bit for the next request
time.sleep(0.0100)
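# Illustrative call (hypothetical request dict `dict_x` and directory `dir_save`; see gen_dict_proc above):
#   dict_req = gen_dict_proc(dict_x)
#   download_cds(Path(dir_save) / dict_req["target"], dict_req)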
def download_era5_timeseries(
lat_x: float,
lon_x: float,
start: str,
end: str,
dir_save=Path("."),
logging_level=logging.INFO,
) -> str:
"""Download ERA5 data using CDS API timeseries dataset.
This function uses the ERA5 timeseries dataset which is optimised for
fast retrieval of point location data. Only provides surface-level variables.
Parameters
----------
lat_x : float
Latitude of the point of interest.
lon_x : float
Longitude of the point of interest.
start : str
Start date in format 'YYYY-MM-DD'.
end : str
End date in format 'YYYY-MM-DD'.
dir_save : Path or str, optional
Directory to save downloaded file, by default Path(".").
logging_level : int, optional
Logging level, by default logging.INFO.
Returns
-------
str
Path to the downloaded CSV file.
Note
----
1. This method only provides surface-level variables and should be used with simple_mode=True.
2. Uses CDS API timeseries dataset for fast point location downloads.
3. Much faster than traditional gridded ERA5 (~26s for 30 years vs several minutes).
4. Only works for point locations (no spatial grid).
5. Downloads CSV format directly (no netCDF dependencies needed).
6. Requires CDS API credentials configured: https://cds.climate.copernicus.eu/api-how-to
Reference
---------
ERA5 timeseries documentation: https://cds.climate.copernicus.eu/datasets/reanalysis-era5-single-levels-timeseries
"""
import cdsapi
import zipfile
import tempfile
# adjust logging level
logger_supy.setLevel(logging_level)
# parse and create (if needed) the saving directory
path_dir_save = Path(dir_save).expanduser().resolve()
if not path_dir_save.exists():
path_dir_save.mkdir(parents=True)
    # define variables to request from the ERA5 timeseries dataset
# NOTE: This dataset only provides basic surface variables (16 total).
# Advanced variables (geopotential, heat fluxes, surface properties) are NOT available.
# These must be obtained from the gridded ERA5 dataset if needed.
variables = [
"2m_dewpoint_temperature",
"2m_temperature",
"10m_u_component_of_wind",
"10m_v_component_of_wind",
"surface_pressure",
"surface_solar_radiation_downwards",
"surface_thermal_radiation_downwards",
"total_precipitation",
]
# format location coordinates in filename
lat_c = f"{lat_x}N" if lat_x > 0 else f"{-lat_x}S"
lon_c = f"{lon_x}E" if lon_x > 0 else f"{-lon_x}W"
# parse dates
date_start = pd.to_datetime(start).strftime("%Y-%m-%d")
date_end = pd.to_datetime(end).strftime("%Y-%m-%d")
year_start = pd.to_datetime(start).year
# construct filename (CSV format)
fn = f"{lat_c}{lon_c}-{year_start}-sfc.csv"
path_fn = path_dir_save / fn
if path_fn.exists():
logger_supy.warning(f"{fn} exists!")
else:
logger_supy.info(f"Downloading ERA5 timeseries data using CDS API...")
logger_supy.info(f"Location: ({lat_x}, {lon_x})")
logger_supy.info(f"Period: {date_start} to {date_end}")
# define request parameters (CSV - no netCDF dependencies needed)
dataset = "reanalysis-era5-single-levels-timeseries"
request = {
"variable": variables,
"location": {"longitude": lon_x, "latitude": lat_x},
"date": [f"{date_start}/{date_end}"],
"data_format": "csv",
}
# retrieve data to temporary file (CDS API returns zip archives)
t0 = time.time()
client = cdsapi.Client()
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_path = Path(tmp_dir) / "download.zip"
# Ensure stdout/stderr exist for tqdm in headless environments
with safe_stdout_stderr():
client.retrieve(dataset, request).download(str(tmp_path))
# extract CSV from zip archive
with zipfile.ZipFile(tmp_path, "r") as zip_ref:
# find the CSV file in the archive
csv_files = [f for f in zip_ref.namelist() if f.endswith(".csv")]
if not csv_files:
raise ValueError(f"No CSV file found in downloaded archive")
# extract the first CSV file
zip_ref.extract(csv_files[0], path_dir_save)
extracted_path = path_dir_save / csv_files[0]
# rename to expected filename
extracted_path.rename(path_fn)
# cleanup happens automatically when tmp_dir context exits
t1 = time.time()
logger_supy.info(f"Download completed in {t1 - t0:.1f} seconds")
return str(path_fn)
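# Illustrative usage (requires configured CDS API credentials; location and path are hypothetical):
#   fn_csv = download_era5_timeseries(51.5, -0.12, "2020-01-01", "2020-01-31", dir_save="~/era5")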
# generate supy forcing using ERA-5 data
def gen_forcing_era5(
lat_x: float,
lon_x: float,
start: str,
end: str,
dir_save=Path("."),
hgt_agl_diag=100.0,
logging_level=logging.INFO,
) -> list:
"""Generate SUEWS forcing files using ERA-5 timeseries data.
Downloads ERA5 surface timeseries data from CDS API and converts to SUEWS forcing format.
Uses simple diagnostics: environmental lapse rate (6.5 K/km) for temperature and
neutral MOST for wind speed.
Parameters
----------
lat_x : float
Latitude of the location.
lon_x : float
Longitude of the location.
start : str
Start date (e.g., "2020-01-01").
end : str
End date (e.g., "2020-12-31").
dir_save : pathlib.Path or str, optional
Directory for saving ERA5 CSV files, by default Path(".").
hgt_agl_diag : float, optional
Height above ground level for diagnostics (m), by default 100.
logging_level : int, optional
Logging level [50=CRITICAL, 40=ERROR, 30=WARNING, 20=INFO, 10=DEBUG].
Returns
-------
list
List of SUEWS forcing file paths.
Note
----
1. Requires CDS API configuration: https://cds.climate.copernicus.eu/api-how-to
2. Uses ERA5 timeseries dataset (surface data, ~26s for 30 years)
3. Point location only (no spatial grids)
4. Use supy.util.read_forcing() to import generated files
Examples
--------
>>> list_fn = gen_forcing_era5(50.86, 4.35, "2020-01-01", "2020-01-31")
Reference
---------
    ECMWF (2016). IFS Documentation CY41R2, Part IV: Physical Processes.
    ECMWF: Reading, UK, 111-113. https://www.ecmwf.int/en/elibrary/16648-part-iv-physical-processes
"""
# adjust logging level
logger_supy.setLevel(logging_level)
# download ERA5 timeseries CSV
fn_sfc = download_era5_timeseries(lat_x, lon_x, start, end, dir_save, logging_level)
# generate diagnostics from CSV
df_forcing_raw = gen_df_diag_era5_csv(fn_sfc, hgt_agl_diag)
# extract coordinates
lat = df_forcing_raw.attrs["latitude"]
lon = df_forcing_raw.attrs["longitude"]
# format to SUEWS convention
df_forcing = format_df_forcing(df_forcing_raw)
# add coordinates to match expected structure
df_forcing["latitude"] = lat
df_forcing["longitude"] = lon
df_forcing = df_forcing.reset_index().set_index(["latitude", "longitude", "time"])
# save results as SUEWS met input files
list_fn = save_forcing_era5(df_forcing, dir_save)
return list_fn
# format dataframe to SUEWS convention
def format_df_forcing(df_forcing_raw):
from .._load import dict_var_type_forcing
df_forcing_grid = df_forcing_raw.copy()
    # convert energy fluxes from hourly accumulations [J m-2] to mean fluxes [W m-2]
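    # (illustration: an hourly accumulation of 1.8e6 J m-2 corresponds to 1.8e6 / 3600 = 500 W m-2)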
df_forcing_grid.loc[:, ["ssrd", "strd", "sshf", "slhf"]] /= 3600
# reverse the sign of qh and qe
df_forcing_grid.loc[:, ["sshf", "slhf"]] *= -1
# convert rainfall: from [m] to [mm]
df_forcing_grid.loc[:, "tp"] *= 1000
    # convert air temperature from K to degC
df_forcing_grid = df_forcing_grid.assign(Tair=df_forcing_grid.t_z - 273.15)
# convert atmospheric pressure: [Pa] to [kPa]
df_forcing_grid.loc[:, "p_z"] /= 1000
# renaming for consistency with SUEWS
df_forcing_grid = df_forcing_grid.rename(
{
"ssrd": "kdown",
"strd": "ldown",
"sshf": "qh",
"slhf": "qe",
"tp": "rain",
"uv_z": "U",
"RH_z": "RH",
"p_z": "pres",
},
axis=1,
)
col_suews = list(dict_var_type_forcing.keys())[:-1] + ["alt_z"]
df_forcing_grid = df_forcing_grid.reindex(col_suews, axis=1)
df_forcing_grid = df_forcing_grid.assign(
iy=df_forcing_grid.index.year,
id=df_forcing_grid.index.dayofyear,
it=df_forcing_grid.index.hour,
imin=df_forcing_grid.index.minute,
)
# corrections
df_forcing_grid.loc[:, "RH"] = df_forcing_grid.loc[:, "RH"].where(
df_forcing_grid.loc[:, "RH"].between(0.001, 105), 105
)
df_forcing_grid.loc[:, "kdown"] = df_forcing_grid.loc[:, "kdown"].where(
df_forcing_grid.loc[:, "kdown"] > 0, 0
)
# trim decimals
df_forcing_grid.iloc[:, 4:] = df_forcing_grid.iloc[:, 4:].round(2)
# coerce integer
df_forcing_grid = df_forcing_grid.astype({
"iy": "int32",
"id": "int32",
"it": "int32",
"imin": "int32",
})
# replace nan with -999
df_forcing_grid = df_forcing_grid.replace(np.nan, -999).asfreq("1h")
return df_forcing_grid
# pandas-only version for CSV timeseries (no xarray dependency)
def gen_df_diag_era5_csv(fn_csv, hgt_agl_diag=100):
"""Generate diagnostics from CSV timeseries data using pandas only.
This function avoids xarray dependency for the common timeseries use case.
Only supports simple_mode=True (log law + lapse rate).
"""
from atmosp import calculate as ac
# load CSV timeseries data
df = pd.read_csv(fn_csv)
df["valid_time"] = pd.to_datetime(df["valid_time"])
df = df.set_index("valid_time")
df.index.name = "time"
# extract coordinates
lat = df["latitude"].iloc[0]
lon = df["longitude"].iloc[0]
# extract variables (using abbreviated CSV column names)
pres_z0 = df["sp"] # surface pressure [Pa]
u10 = df["u10"] # 10m u-wind [m/s]
v10 = df["v10"] # 10m v-wind [m/s]
t2 = df["t2m"] # 2m temperature [K]
d2 = df["d2m"] # 2m dewpoint [K]
# computed variables
uv10 = np.sqrt(u10**2 + v10**2) # wind speed
q2 = ac("qv", Td=d2, T=t2, p=pres_z0) # specific humidity
# default values for missing variables (timeseries doesn't provide these)
alt_z0 = 0.0 # assume sea level
z0m = 0.1 # default roughness [m]
# constants for diagnostics
env_lapse = 6.5 / 1000.0 # environmental lapse rate [K m^-1]
grav = 9.80616 # gravity [m s^-2]
rd = 287.04 # gas constant for dry air [J K^-1 kg^-1]
z = hgt_agl_diag # diagnostic height [m]
# --- Compute diagnostics at height z using simple mode ---
# temperature correction using lapse rate
t_z = t2 - (z - 2) * env_lapse
# barometric equation with varying temperature
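    # (hypsometric form with constant lapse rate Gamma: p(z) = p0 * (T(z)/T0) ** (g / (Rd * Gamma)))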
p_z = pres_z0 * (t_z / t2) ** (grav / (rd * env_lapse))
# humidity assuming invariable relative humidity
RH_z = ac("RH", qv=q2, p=pres_z0, T=t2)
q_z = ac("qv", RH=RH_z, p=p_z, T=t_z)
# wind speed using log law (neutral condition)
uv_z = uv10 * (np.log((z + z0m) / z0m) / np.log((10 + z0m) / z0m))
# altitude at diagnostic height
alt_z = alt_z0 + z
# create DataFrame with diagnostics
df_diag = pd.DataFrame(
{
"uv_z": uv_z,
"t_z": t_z,
"q_z": q_z,
"RH_z": RH_z,
"p_z": p_z,
"alt_z": alt_z,
# also include surface variables needed downstream
"ssrd": df["ssrd"],
"strd": df["strd"],
"tp": df["tp"],
"sshf": 0.0, # not available in timeseries
"slhf": 0.0, # not available in timeseries
},
index=df.index,
)
# add coordinates as attributes (will be used for filename)
df_diag.attrs = {"latitude": lat, "longitude": lon}
return df_diag
# save ERA5 forcing dataframe to SUEWS-simulation ready txt files
def save_forcing_era5(df_forcing_era5, dir_save):
gpb = df_forcing_era5.groupby(["latitude", "longitude"])
list_grid = list(gpb.groups.keys())
list_fn = []
path_dir_save = Path(dir_save)
# split into grids
for lat, lon in list_grid:
df_grid = df_forcing_era5.loc[lat, lon]
s_lat = f"{lat}N" if lat >= 0 else f"{-lat}S"
s_lon = f"{lon}E" if lon >= 0 else f"{-lon}W"
alt_z = df_grid.alt_z.iloc[0]
df_grid = df_grid.drop("alt_z", axis=1)
s_alt = f"{alt_z:.1f}A"
idx_grid = df_grid.index
# split into years
grp_year = df_grid.groupby(idx_grid.year)
for year in grp_year.groups:
df_year = grp_year.get_group(year)
idx_year = df_year.index
s_year = idx_year[0].year
# Calculate frequency from actual time differences (groupby loses freq metadata)
time_diff = (
(idx_year[1] - idx_year[0])
if len(idx_year) > 1
else pd.Timedelta("60min")
)
s_freq = time_diff / pd.Timedelta("1min")
s_fn = f"ERA5_UTC-{s_lat}-{s_lon}-{s_alt}_{s_year}_data_{s_freq:.0f}.txt"
path_fn = path_dir_save / s_fn
df_year.to_csv(path_fn, sep=" ", index=False)
# collect file names
list_fn.append(str(path_fn))
return list_fn
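# Illustrative output filename (hypothetical grid at 51.5N, 0.12W, altitude 25.0 m, hourly data for 2020):
#   "ERA5_UTC-51.5N-0.12W-25.0A_2020_data_60.txt"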