"""
T2 (2m air temperature) attribution functions.
Provides functions to decompose T2 differences between scenarios
into physically attributable components.
"""
from typing import Literal
import numpy as np
import pandas as pd
from ._core import shapley_triple_product
from ._helpers import (
align_scenarios,
detect_anomalies,
extract_suews_group,
_group_means,
unwrap_forcing,
)
from ._physics import cal_gamma_heat, cal_r_eff_heat, decompose_flux_budget
from ._result import AttributionResult
def _extract_flux_components(
df: pd.DataFrame,
flux_like: np.ndarray,
aggregate: bool = False,
) -> dict[str, np.ndarray]:
"""Extract signed energy-budget components with zero-fill for missing columns."""
def _extract(col: str, sign: float = 1.0) -> np.ndarray:
if col not in df.columns:
return np.zeros_like(flux_like, dtype=float)
values = np.array([df[col].mean()]) if aggregate else df[col].values
return sign * values
return {
"Qstar": _extract("QN"),
"QE": _extract("QE", sign=-1.0),
"dQS": _extract("QS", sign=-1.0),
"QF": _extract("QF"),
}
[docs]
def attribute_t2(
df_output_A: pd.DataFrame,
df_output_B: pd.DataFrame,
df_forcing_A: pd.DataFrame,
df_forcing_B: pd.DataFrame,
hierarchical: bool = True,
min_flux: float = 0.1,
) -> AttributionResult:
"""
Decompose T2 differences between two SUEWS scenarios.
The difference in 2m air temperature is attributed to:
- Flux changes (Q_H, with optional breakdown into Q*, Q_E, dQ_S, Q_F)
- Resistance changes (turbulent exchange efficiency)
- Air property changes (rho * c_p)
Parameters
----------
df_output_A : pd.DataFrame
SUEWS output DataFrame for scenario A (reference/baseline)
df_output_B : pd.DataFrame
SUEWS output DataFrame for scenario B (modified/test)
df_forcing_A : pd.DataFrame
Forcing DataFrame for scenario A. Must contain 'Tair', 'RH', 'pres' columns.
df_forcing_B : pd.DataFrame
Forcing DataFrame for scenario B. Must contain 'Tair', 'RH', 'pres' columns.
hierarchical : bool, optional
If True, decompose flux contribution into budget components
(Q*, Q_E, dQ_S, Q_F). Default True.
min_flux : float, optional
Minimum flux threshold (W/m2) for resistance calculation.
Timesteps with abs(Q_H) < min_flux are flagged. Default 0.1.
Returns
-------
AttributionResult
Container with contributions timeseries, summary statistics, and metadata.
Examples
--------
Compare baseline vs. green infrastructure scenario:
>>> result = attribute_t2(df_output_baseline, df_output_green,
... df_forcing_A=df_forcing, df_forcing_B=df_forcing)
>>> print(result)
T2 Attribution Results
========================================
Mean delta_T2: -1.47 degC
Component Breakdown:
----------------------------------------
flux_total : -0.89 degC (60.5%)
resistance : -0.42 degC (28.6%)
air_props : -0.16 degC (10.9%)
>>> result.plot(kind="bar") # Visualise contributions
"""
# Unwrap OOP wrappers to raw DataFrames
df_forcing_A = unwrap_forcing(df_forcing_A)
df_forcing_B = unwrap_forcing(df_forcing_B)
# Extract SUEWS output group with column validation
required_cols = {"T2", "QH"}
df_A = extract_suews_group(df_output_A, required_cols=required_cols)
df_B = extract_suews_group(df_output_B, required_cols=required_cols)
# Align indices
df_A, df_B, common_idx = align_scenarios(df_A, df_B)
# Extract required variables
T2_A = df_A["T2"].values
T2_B = df_B["T2"].values
QH_A = df_A["QH"].values
QH_B = df_B["QH"].values
# Extract forcing data for reference temperature and air properties
T_ref_A = df_forcing_A.loc[common_idx, "Tair"].values
T_ref_B = df_forcing_B.loc[common_idx, "Tair"].values
RH_A = df_forcing_A.loc[common_idx, "RH"].values
RH_B = df_forcing_B.loc[common_idx, "RH"].values
P_A = df_forcing_A.loc[common_idx, "pres"].values
P_B = df_forcing_B.loc[common_idx, "pres"].values
# Calculate gamma = 1/(rho*cp)
# Ensure numpy arrays (cal_gamma_heat may return pandas Series)
gamma_A = np.asarray(cal_gamma_heat(T_ref_A, RH_A, P_A))
gamma_B = np.asarray(cal_gamma_heat(T_ref_B, RH_B, P_B))
# Back-calculate effective resistance
r_A = cal_r_eff_heat(T2_A, T_ref_A, QH_A, gamma_A, min_flux)
r_B = cal_r_eff_heat(T2_B, T_ref_B, QH_B, gamma_B, min_flux)
# Shapley decomposition: delta(T2 - T_ref) = delta(r * QH * gamma)
Phi_r, Phi_QH, Phi_gamma = shapley_triple_product(
r_A, r_B, QH_A, QH_B, gamma_A, gamma_B
)
# The total T2 change includes reference temperature change:
# delta_T2 = delta_T_ref + delta(r * QH * gamma)
Phi_T_ref = T_ref_B - T_ref_A
# Build contributions DataFrame
contributions = pd.DataFrame(
{
"delta_total": T2_B - T2_A,
"T_ref": Phi_T_ref,
"flux_total": Phi_QH,
"resistance": Phi_r,
"air_props": Phi_gamma,
},
index=common_idx,
)
# Hierarchical flux decomposition
if hierarchical:
# Q_H = Q* - Q_E - dQ_S + Q_F (energy balance)
flux_comps_A = _extract_flux_components(df_A, QH_A)
flux_comps_B = _extract_flux_components(df_B, QH_B)
flux_breakdown = decompose_flux_budget(
QH_A, QH_B, flux_comps_A, flux_comps_B, Phi_QH
)
for name, values in flux_breakdown.items():
contributions[f"flux_{name}"] = values
# Add flags for low-flux timesteps
contributions["flag_low_flux"] = np.abs(QH_A) < min_flux
# Calculate summary statistics
summary = contributions.describe().T[["mean", "std", "min", "max"]]
# Metadata
metadata = {
"n_timesteps": len(common_idx),
"period_start": str(common_idx.min()),
"period_end": str(common_idx.max()),
"hierarchical": hierarchical,
"min_flux_threshold": min_flux,
"n_low_flux_flagged": int(contributions["flag_low_flux"].sum()),
}
return AttributionResult(
variable="T2",
contributions=contributions,
summary=summary,
metadata=metadata,
)
[docs]
def diagnose_t2(
df_output: pd.DataFrame,
df_forcing: pd.DataFrame,
method: Literal["anomaly", "extreme", "diurnal"] = "anomaly",
threshold: float = 2.0,
hierarchical: bool = True,
min_flux: float = 0.1,
) -> AttributionResult:
"""
Automatically identify anomalous T2 values and attribute the causes.
This convenience function identifies unusual T2 behaviour within a single
simulation run and diagnoses the driving factors by comparing aggregate
statistics between reference (normal) and target (anomaly) periods.
Parameters
----------
df_output : pd.DataFrame
SUEWS output DataFrame
df_forcing : pd.DataFrame
Forcing DataFrame. Must contain 'Tair', 'RH', 'pres' columns.
method : str, optional
Detection method:
- 'anomaly': Compare timesteps > threshold sigma from daily mean
- 'extreme': Compare top/bottom 5% of T2 vs. middle 50%
- 'diurnal': Compare afternoon peak (12:00-15:00) vs. morning (06:00-10:00)
Default 'anomaly'.
threshold : float, optional
Standard deviation threshold for anomaly detection. Default 2.0.
hierarchical : bool, optional
Include flux budget breakdown. Default True.
min_flux : float, optional
Minimum flux threshold (W/m2) for resistance calculation.
Timesteps with abs(Q_H) < min_flux are excluded. Default 0.1.
Returns
-------
AttributionResult
Attribution decomposition with diagnostic interpretation.
Examples
--------
Quick anomaly diagnosis:
>>> result = diagnose_t2(df_output, df_forcing, method="anomaly")
>>> print(result)
T2 Attribution Results
========================================
Mean delta_T2: +2.80 degC
Component Breakdown:
----------------------------------------
T_ref : +2.10 degC (75.0%)
flux_total : +0.50 degC (17.9%)
resistance : +0.18 degC ( 6.4%)
air_props : +0.02 degC ( 0.7%)
Closure residual: 1.23e-15 degC
>>> result.plot() # Visualise decomposition
"""
# Unwrap OOP wrappers to raw DataFrames
df_forcing = unwrap_forcing(df_forcing)
# Extract SUEWS output
df = extract_suews_group(df_output)
t2 = df["T2"]
anomaly_mask, normal_mask = detect_anomalies(
t2, method=method, threshold=threshold
)
# Store group sizes for metadata
n_anomaly = anomaly_mask.sum()
n_normal = normal_mask.sum()
# Extract data for each group
df_normal = df.loc[normal_mask]
df_anomaly = df.loc[anomaly_mask]
# Get mean values for each group
output_means = _group_means(df_normal, df_anomaly, ["T2", "QH"])
T2_A, T2_B = output_means["T2"]
QH_A, QH_B = output_means["QH"]
# Extract forcing data for each group
df_forcing_normal = df_forcing.loc[normal_mask]
df_forcing_anomaly = df_forcing.loc[anomaly_mask]
forcing_means = _group_means(
df_forcing_normal, df_forcing_anomaly, ["Tair", "RH", "pres"]
)
T_ref_A, T_ref_B = forcing_means["Tair"]
RH_A, RH_B = forcing_means["RH"]
P_A, P_B = forcing_means["pres"]
# Calculate gamma = 1/(rho*cp)
# Ensure numpy arrays (cal_gamma_heat may return pandas Series)
gamma_A = np.asarray(cal_gamma_heat(T_ref_A, RH_A, P_A))
gamma_B = np.asarray(cal_gamma_heat(T_ref_B, RH_B, P_B))
# Back-calculate effective resistance from mean values
r_A = cal_r_eff_heat(T2_A, T_ref_A, QH_A, gamma_A, min_flux)
r_B = cal_r_eff_heat(T2_B, T_ref_B, QH_B, gamma_B, min_flux)
# Shapley decomposition: delta(T2 - T_ref) = delta(r * QH * gamma)
Phi_r, Phi_QH, Phi_gamma = shapley_triple_product(
r_A, r_B, QH_A, QH_B, gamma_A, gamma_B
)
# The total T2 change includes reference temperature change:
# delta_T2 = delta_T_ref + delta(r * QH * gamma)
Phi_T_ref = T_ref_B - T_ref_A
# Build single-row contributions DataFrame
contributions = pd.DataFrame(
{
"delta_total": T2_B - T2_A,
"T_ref": Phi_T_ref, # Reference temperature contribution
"flux_total": Phi_QH,
"resistance": Phi_r,
"air_props": Phi_gamma,
},
index=pd.Index(["aggregate"], name="type"),
)
# Hierarchical flux decomposition
if hierarchical:
flux_comps_A = _extract_flux_components(df_normal, QH_A, aggregate=True)
flux_comps_B = _extract_flux_components(df_anomaly, QH_B, aggregate=True)
flux_breakdown = decompose_flux_budget(
QH_A, QH_B, flux_comps_A, flux_comps_B, Phi_QH
)
for name, values in flux_breakdown.items():
contributions[f"flux_{name}"] = values
# Calculate summary (same as contributions for single-row)
summary = contributions.describe().T[["mean", "std", "min", "max"]]
# Metadata
metadata = {
"n_timesteps": 1, # Aggregate comparison
"method": method,
"threshold": threshold if method == "anomaly" else None,
"n_anomaly": int(n_anomaly),
"n_reference": int(n_normal),
"pct_anomaly": 100 * n_anomaly / len(t2),
"hierarchical": hierarchical,
"aggregate_comparison": True,
}
return AttributionResult(
variable="T2",
contributions=contributions,
summary=summary,
metadata=metadata,
)