From c83989eaec4bd64730b80e72c4df7eddbad28257 Mon Sep 17 00:00:00 2001 From: Colladito Date: Tue, 14 Oct 2025 09:39:13 +0200 Subject: [PATCH 01/14] [VCF] Add test for block_maxima in pot.py --- tests/distributions/test_pot.py | 65 +++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 tests/distributions/test_pot.py diff --git a/tests/distributions/test_pot.py b/tests/distributions/test_pot.py new file mode 100644 index 0000000..2766e87 --- /dev/null +++ b/tests/distributions/test_pot.py @@ -0,0 +1,65 @@ +import unittest +import numpy as np + +from bluemath_tk.distributions.pot import block_maxima + +class TestPOT(unittest.TestCase): + def setUp(self): + # Create synthetic data for testing + np.random.seed(42) + self.x = np.random.lognormal(1, 1.2, size=int(365*5)) # 5-year of daily values + + def test_block_maxima_basic(self): + # Test with default parameters + idx, bmaxs = block_maxima(self.x) + self.assertIsInstance(idx, np.ndarray) + self.assertIsInstance(bmaxs, np.ndarray) + self.assertEqual(idx.size, bmaxs.size) + + def test_block_maxima_custom_block(self): + # Test with custom block size + block_size = 5 + idx, bmaxs = block_maxima(self.x, block_size=block_size) + expected_blocks = int(np.ceil(len(self.x) / block_size)) + self.assertEqual(bmaxs.size, expected_blocks) + + def test_block_maxima_independence(self): + # Test minimum separation between peaks + block_size = 5 + min_sep = 2 + idx, bmaxs = block_maxima(self.x, block_size=block_size, min_sep=min_sep) + + # Check that peaks are separated by at least min_sep + differences = np.diff(idx) + self.assertTrue(np.all(differences >= min_sep)) + + def test_invalid_min_sep(self): + # Test error raising for invalid min_sep + block_size = 5 + min_sep = 4 # > (block_size + 1) / 2 + + with self.assertRaises(ValueError): + block_maxima(self.x, block_size=block_size, min_sep=min_sep) + + def test_block_maxima_types(self): + # Test with different input types + block_size = 5.0 # float instead of int + idx, bmaxs = block_maxima(self.x, block_size=block_size) + self.assertIsInstance(idx, np.ndarray) + self.assertIsInstance(bmaxs, np.ndarray) + + # Test with list input + x_list = self.x.tolist() + idx, bmaxs = block_maxima(x_list, block_size=5) + self.assertIsInstance(idx, np.ndarray) + self.assertIsInstance(bmaxs, np.ndarray) + + def test_empty_input(self): + # Test with empty input + x_empty = np.array([]) + idx, bmaxs = block_maxima(x_empty) + self.assertEqual(idx.size, 0) + self.assertEqual(bmaxs.size, 0) + +if __name__ == "__main__": + unittest.main() \ No newline at end of file From a6a5198840444f2c46c351c4d0dfb9fee27b1aea Mon Sep 17 00:00:00 2001 From: Colladito Date: Tue, 14 Oct 2025 12:44:20 +0200 Subject: [PATCH 02/14] [VCF] Add pot functions (pot, OptimalThreshold, mrlp) --- bluemath_tk/distributions/pot.py | 430 +++++++++++++++++++ bluemath_tk/distributions/utils/pot_utils.py | 265 ++++++++++++ 2 files changed, 695 insertions(+) create mode 100644 bluemath_tk/distributions/utils/pot_utils.py diff --git a/bluemath_tk/distributions/pot.py b/bluemath_tk/distributions/pot.py index 89de828..992c92f 100644 --- a/bluemath_tk/distributions/pot.py +++ b/bluemath_tk/distributions/pot.py @@ -1,4 +1,11 @@ +import matplotlib.pyplot as plt import numpy as np +import pandas as pd +from scipy.signal import find_peaks +from scipy.stats import chi2, norm, pearsonr + +from .utils.pot_utils import RWLSfit, threshold_search +from ..core.io import BlueMathModel def block_maxima( @@ -98,3 +105,426 @@ def violates(i_left, 
i_right): idx = np.asarray([segments_idx[b][blocks[b][0]] for b in range(n_blocks)]) return idx, bmaxs + + +def pot( + data: np.ndarray, + threshold: float = 0.0, + n0: int = 10, + min_peak_distance: int = 2, + conf_level: float = 0.05, +): + """ + Function to identiy POT + This function identifies peaks in a dataset that exceed a specified + threshold and computes statistics such as mean exceedances, variances, + and weights for valid unique peaks. Peaks are considered independent if + they are separated by a minimum distance. + + Parameters + ---------- + data : np.ndarray(n,) + Input time series or data array + threshold : float, default=0 + Threshold above which peaks are extracted + n0 : int, default=10 + Minimum number of exceedances required for valid computation + min_peak_distance : int, default = 2 + Minimum distance between two peaks (in data points) + + Returns + ------- + pks_unicos_valid : np.ndarray + Valid unique peaks after removing NaN values + excedencias_mean_valid : np.ndarray + Mean exceedances for valid peaks + excedencias_weight_valid : np.ndarray + Weights based on exceedance variance for valid peaks + pks : np.ndarray + All detected peaks + locs : np.ndarray + Indices of the detected peaks in the data + autocorrelations : np.ndarray(n, 3) + Lags, correlations and pvalues to check the independence assumption + """ + # Find peaks exceeding the threshold with specified min distance + adjusted_data = np.maximum(data - threshold, 0) + + # Usamos la librería detecta que tiene el mismo funcionamiento que la función de matlab findpeaks + locs, _ = find_peaks(adjusted_data, distance=min_peak_distance+1) + # Con scipy + # locs, _ = find_peaks(adjusted_data, distance=min_peak_distance) + + pks = data[locs] + + # Calculate autocorrelation for lags 1 to 5 (if enough peaks) + num_lags = 5 + if len(pks) > num_lags: + autocorrelations = np.zeros((num_lags, 3), dtype=float) + for i in range(num_lags): + lag = i + 1 + r, p_value = pearsonr(pks[:-lag], pks[lag:]) # Test corr != 0 + autocorrelations[i, 0] = int(lag) + autocorrelations[i, 1] = r + autocorrelations[i, 2] = p_value + + if p_value < conf_level: + Warning( + f"Lag {int(lag)} significant, consider increase the number of min_peak_distance" + ) + else: + # Not enough peaks for autocorrelation analysis + autocorrelations = np.array([]) + + # Unique peaks (pks_unicos), ignoring duplicates + pks_unicos = np.unique(pks) + + # Allocate arrays to store mean exceedances, variances, and weights + excedencias_mean = np.zeros(len(pks_unicos), dtype=float) + excedencias_var = np.zeros(len(pks_unicos), dtype=float) + excedencias_weight = np.zeros(len(pks_unicos), dtype=float) + + # Loop through each unique peak and calculate mean exceedances, variances, and weights + for i in range(len(pks_unicos)): + # Define the current unique peak + pico_actual = pks_unicos[i] + + # Calculate the exceedances for peaks greater than the current unique peak + excedencias = pks[pks > pico_actual] + + # If there are enough exceedances (greater than or equal to n0) + if len(excedencias) >= n0: + # Compute the mean exceedance (adjusted by the current peak) + excedencias_mean[i] = np.mean(excedencias) - pico_actual + # Compute the variance of the exceedances (ddof=1 to use the same variance as matlab) + excedencias_var[i] = np.var(excedencias, ddof=1) + # Compute the weight as the number of exceedances divided by the variance + # Weight = number of exceedances / variance + # (Guard against division by zero) + if excedencias_var[i] != 0: + 
excedencias_weight[i] = len(excedencias) / excedencias_var[i] + else: + excedencias_weight[i] = np.nan + else: + # If fewer than n0 exceedances, truncate arrays and stop + excedencias_mean = excedencias_mean[:i] + excedencias_var = excedencias_var[:i] + excedencias_weight = excedencias_weight[:i] + break + + # Trim the list of unique peaks to match the number of valid exceedances + pks_unicos = pks_unicos[: len(excedencias_weight)] + + # Remove any NaN values from the peak and exceedance data to avoid issues in regression + valid_indices = ( + ~np.isnan(pks_unicos) + & ~np.isnan(excedencias_mean) + & ~np.isnan(excedencias_weight) + ) + pks_unicos_valid = pks_unicos[valid_indices] + excedencias_mean_valid = excedencias_mean[valid_indices] + excedencias_weight_valid = excedencias_weight[valid_indices] + + return ( + pks_unicos_valid, + excedencias_mean_valid, + excedencias_weight_valid, + pks, + locs, + autocorrelations, + ) + + +class OptimalThreshold(BlueMathModel): + """ + Class to compute the optimal threshold using different algorithms. + + Methods + ------- + studentidez_residuals : + Function to compute the threshold using the studentidez resiudals + + Notes + ----- + The list of methods implemented to select the optimal threshold are: + - Studentidez residuals method Mínguez (2025) [1]. + + + [1] Mínguez, R. (2025). Automatic Threshold Selection for Generalized + Pareto and Pareto–Poisson Distributions in Rainfall Analysis: A Case + Study Using the NOAA NCDC Daily Rainfall Database. Atmosphere, 16(1), + 61. https://doi.org/10.3390/atmos16010061 + """ + + def __init__( + self, + data, + threshold: float = 0.0, + n0: int = 10, + min_peak_distance: int = 2, + conf_level: float = 0.05, + method: str = "studentized", + plot: bool = False, + filename: str = None, + display_flag: bool = False, + ): + self.data = data + self.threshold = threshold + self.n0 = n0 + self.min_peak_distance = min_peak_distance + ( + self.pks_unicos_valid, + self.excedencias_mean_valid, + self.excedencias_weight_valid, + self.pks, + self.locs, + self.autocorrelations, + ) = pot(self.data, threshold, n0, min_peak_distance) + self.method = method + self.conf_level = conf_level + self.plot = plot + self.filename = filename + self.display_flag = display_flag + + def fit( + self, + ): + """ + Obtain the optimal threshold and POTs given the selected method + + Returns + ------- + threshold : float + Optimal threshold + pks : np.ndarray + POT + pks_idx : np.ndarray + Indices of POT + """ + if self.method == "studentized": + self.threshold, beta, fobj, r = self.studentized_residuals( + self.pks_unicos_valid, + self.excedencias_mean_valid, + self.excedencias_weight_valid, + self.conf_level, + self.plot, + self.filename, + self.display_flag, + ) + + # TODO: Añadir más metodos + + _, _, _, pks, pks_idx, _ = pot( + self.data, self.threshold, self.n0, self.min_peak_distance + ) + return self.threshold, pks, pks_idx + + def studentized_residuals( + self, + pks_unicos_valid: np.ndarray, + exceedances_mean_valid: np.ndarray, + exceedances_weight_valid: np.ndarray, + conf_level: float = 0.05, + plot: bool = False, + filename: str = None, + display_flag: bool = False, + ): + """ + Function to compute the optimal threshold based on Chi-Squared + and studentized residuals. Optionally plot the results if plot_flag is true and + displays messages if display_flag is true. 
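+        The search starts at the minimum unique peak and, while the
+        Chi-squared test or the first internally studentized residual flags
+        anomalies (up to 10 iterations), moves the threshold to the
+        candidate location returned by threshold_search, as implemented
+        below.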
+ + Parameters + ---------- + pks_unicos_valid : np.ndarray(n,) + Vector of unique peaks (potential thresholds) + exceedances_mean_valid : np.ndarray(n,) + Vector of exceedance means + exceedances_weight_valid : np.ndarray(n,) + Vector of exceedance weights + conf_level : bool, default=0.05 + Significance level for Chi-squared test (default 0.05) + plot_flag : bool, default=False + Boolean flag, true to plot the graphs, false otherwise + filename : str, default=None + Path and name for making graphs + display_flag : bool, default=False + Boolean flag, true to display messages, false otherwise + + Returns + ------- + threshold : + Optimal threshold found + beta : np.ndarray + Optimal regression coefficients + fobj : + Optimal objective function (weighted leats squares) + r : np.ndarray + Optimal residuals + """ + + stop_search = 0 + it = 1 + threshold = pks_unicos_valid[0] # Initial threshold + + while stop_search == 0 and it <= 10: + # Find the current threshold in the pks_unicos_valid array + pos = np.argwhere(pks_unicos_valid == threshold).item() + u_values = pks_unicos_valid[pos:] # Threshold starting from the current one + e_values = exceedances_mean_valid[pos:] # Exceedances + w_values = exceedances_weight_valid[pos:] # Weights + + # Perform the RWLS fitting and calculate studentidez residuals + beta, fobj, r, rN = RWLSfit(u_values, e_values, w_values) + + # Plot resudals if required + if plot: + fig = plt.figure(figsize=(10, 6)) + ax = fig.add_subplot(111) + ax.plot( + u_values, + rN, + "k", + linewidth=1.5, + label=f"Internally Studentized Residuals\nMin threshold = {threshold.item()}", + ) + ax.set_xlabel(r"Threshold $u$") + ax.set_ylabel(r"$r$") + ax.set_title(f"Studentized Residuals Iteration {it}") + # ax.text(threshold + 0.5, min(rN) + 0.1 * (max(rN) - min(rN)), f'Min threshold = {threshold}') + ax.grid() + ax.legend(loc="upper right") + if filename is not None: + plt.savefig(f"{filename}_StudenRes{it}.png", dpi=300) + plt.show() + plt.close() + + if fobj > chi2.ppf(1 - conf_level, df=u_values.size - 2) or np.abs( + rN[0] + ) > norm.ppf(1 - conf_level / 2, 0, 1): + if display_flag: + if fobj > chi2.ppf(1 - conf_level, df=u_values.size - 2): + print("Chi-squared test detects anomalies") + if np.abs(rN[0]) > norm.ppf(1 - conf_level / 2, 0, 1): + print( + "The maximum studentized residual of the first record detects anomalies" + ) + + thresholdsearch = 1 + + else: + thresholdsearch = 0 + stop_search = 1 # If criteria met, stop the loop + + if thresholdsearch: + if display_flag: + print( + f"Maximum sensitivity = {np.max(np.abs(rN))} and thus the optimal threshold seems to be on the right side of the minimum sample value, looking for the location" + ) + + _, threshold = threshold_search( + u_values, + rN, + w_values, + plot, + f"{filename}_thresholdlocation{it}", + ) + if display_flag: + print(f"New threshold found: {threshold}") + + it += 1 + + return threshold, beta, fobj, r + + + +def mrlp( + data: np.ndarray, + threshold: float=None, + conf_level: float = 0.95, + ax: plt.Axes = None, + figsize: tuple = (8, 5), +) -> plt.Axes: + """ + Plot mean residual life for given threshold value. + + The mean residual life plot should be approximately linear above a threshold + for which the Generalized Pareto Distribution model is valid. + The strategy is to select the smallest threshold value immediately above + which the plot is approximately linear. + + Parameters + ---------- + data : np.ndarray + Time series of the signal. 
+ threshold : float, optional + An array of thresholds for which the mean residual life plot is plotted. + If None (default), starting in the 90th quantile + conf_level : float, default=0.95 + Confidence interval width in the range (0, 1), by default it is 0.95. + If None, then confidence interval is not shown. + ax : matplotlib.axes._axes.Axes, optional + If provided, then the plot is drawn on this axes. + If None (default), new figure and axes are created + figsize : tuple, optional + Figure size in inches in format (width, height). + By default it is (8, 5). + + Returns + ------- + matplotlib.axes._axes.Axes + Axes object. + + """ + if threshold is None: + threshold = np.nanquantile(data, q=0.9) + + ( + pks_unicos_valid, + excedencias_mean_valid, + excedencias_weight_valid, + pks, + locs, + autocorrelations, + ) = pot(data, threshold) + + if conf_level is not None: + interlow, interup = norm.interval( + 0.95, + loc=excedencias_mean_valid, + scale=np.sqrt(1/excedencias_weight_valid), + ) + + if ax is None: + _, ax = plt.subplots(figsize=figsize, dpi=100) + ax.grid(False) + + # Plotting central estimates of mean re sidual life + ax.plot( + pks_unicos_valid, + excedencias_mean_valid, + color="#F85C50", + lw=2, + ls="-", + zorder=15, + ) + + + if conf_level is not None: + ax.plot(pks_unicos_valid, interlow, color="#5199FF", lw=1, ls="--", zorder=10) + ax.plot(pks_unicos_valid, interup, color="#5199FF", lw=1, ls="--", zorder=10) + + ax.fill_between( + pks_unicos_valid, + interlow, + interup, + facecolor="#5199FF", + edgecolor="None", + alpha=0.25, + zorder=5, + ) + + ax.set_xlabel("Threshold") + ax.set_ylabel("Mean excess") + + return ax \ No newline at end of file diff --git a/bluemath_tk/distributions/utils/pot_utils.py b/bluemath_tk/distributions/utils/pot_utils.py new file mode 100644 index 0000000..e86bce7 --- /dev/null +++ b/bluemath_tk/distributions/utils/pot_utils.py @@ -0,0 +1,265 @@ +import numpy as np +from scipy.optimize import fminbound +import matplotlib.pyplot as plt +from sklearn.metrics import r2_score +from csaps import csaps + + +def threshold_search( + u_data: np.ndarray, + e_data: np.ndarray, + W_data: np.ndarray, + plot: bool = False, + filename: str = None, +): + """ + Auxiliar function used in the studentidez_residuals method. 
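+    It fits a weighted smoothing spline to the residual curve and returns
+    the extreme point of the first sign-change interval of the fitted
+    spline as the candidate threshold.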
+ + Parameters + ---------- + u_data : np.ndarray + Threshold values + e_data : np.ndarray + Exceedances + W_data : np.ndarray + Weights vector + plot : bool, default=False + Flag for plotting + filename : str, default=None + File name to save plots + + Returns + ------- + fitresult : ISmoothingSpline + Fit object representing the smoothing spline fit + threshold : + Threshold value determined from the fit + """ + + if W_data is None: + W_data = np.ones(u_data.size) + if filename is None: + filename = "" + + orden = np.argsort(u_data) + u_data = u_data[orden] + e_data = e_data[orden] + W_data = W_data[orden] + + # Fit: Smoothing spline + u_mean = np.mean(u_data) + u_std = np.std(u_data, ddof=1) + + def objective_function(x): + return (smoothingspline(u_data, e_data, W_data, u_mean, u_std, x)[0] - 0.9) ** 2 + + SmoothingParam = fminbound(objective_function, 0.5, 0.99) + _, fitresult, _ = smoothingspline( + u_data, e_data, W_data, u_mean, u_std, SmoothingParam + ) + + # Find the first zero from the left + + uc = np.linspace(u_data[0], u_data[-1], 2 * len(u_data)) + # uc = np.linspace(u_data[0], u_data[-1], 1000) + ec = fitresult((uc - u_mean) / u_std) + currentsign = np.sign(ec[0]) + + ## If we want to show the fitted smoothing spline + # if ploteat: + # plt.figure(figsize=(10,6)) + # plt.plot(u_data,e_data, label="Data") + # plt.plot(uc, ec, label="Fitted") + # plt.title("Smoothing Spline Plot") + # plt.xlabel("Threshold Values (u)") + # plt.ylabel("Excedaances (e)") + # plt.grid() + # plt.show() + + zeroloc = [0, 0] + cont = 0 + for i in range(1, len(ec)): + if currentsign != np.sign(ec[i]): + # Place midpoint into zeroloc[cont] + zeroloc[cont] = (uc[i] + uc[i - 1]) / 2 + cont += 1 + currentsign = -currentsign + if cont == 2: + break + + pos1 = np.argwhere((u_data >= zeroloc[0]) & (u_data <= zeroloc[1])) + posi = np.argmax(np.abs(e_data[pos1])) + posi = pos1[0] + posi + threshold = u_data[posi] + mini = e_data[posi] + + if plot: + fig = plt.figure(figsize=(10, 6)) + ax = fig.add_subplot(111) + ax.plot(u_data, e_data, ".k", markersize=1.5, label="Data") + ax.plot( + [threshold] * 100, + np.linspace(min(e_data), max(e_data), 100), + "--", + color=[0.5, 0.5, 0.5], + linewidth=1.5, + ) + ax.plot( + threshold, + mini, + "ok", + markersize=5, + markerfacecolor="w", + linewidth=2, + label=f"Local optimum = {threshold.item()}", + ) + ax.set_xlabel(r"Threshold $u$") + ax.set_ylabel(r"$r^N$") + ax.legend(loc="upper right") + ax.grid() + plt.xticks(fontsize=14) + plt.yticks(fontsize=14) + plt.tight_layout() + if filename is not None: + plt.savefig(f"{filename}.png", dpi=300) + plt.show() + plt.close() + + return fitresult, threshold + + +def smoothingspline( + x_data: np.ndarray, + y_data: np.ndarray, + w_data: np.ndarray, + x_mean: float, + x_std: float, + lam: float, +): + """ + Fits a smoothing spline to weighted data and calculates the goodness-of-fit (R^2). + + Parameters + ---------- + x_data : np.ndarray + Independent variable. + y_data : np.ndarray + Dependent variable. + w_data : np.ndarray + Weights for the fit. + x_mean : float + Mean of independent variable + x_std : float + Standard deviation of independent variable + lam : float + Smoothing parameter (controls the tradeoff between smoothness and fit). + + Returns + ------- + fitresult : ISmoothingSpline + The fitted spline model. + r2 : float + R-squared value of the fit. + gof : dict + Goodness-of-fit metrics containing R-squared. 
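+
+    Examples
+    --------
+    A minimal sketch with synthetic inputs (illustrative values only, not
+    taken from the original code):
+
+    >>> x = np.linspace(0.0, 1.0, 50)
+    >>> y = x**2 + 0.01 * np.random.default_rng(0).standard_normal(50)
+    >>> r2, spline, gof = smoothingspline(
+    ...     x, y, np.ones(50), np.mean(x), np.std(x, ddof=1), 0.9
+    ... )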
+ """ + # Normalize data + x_norm = (x_data - x_mean) / x_std + + # Ensure strict increase in x for csaps by deduplicating normalized x + x_unique, idx = np.unique(x_norm, return_index=True) + y_use = y_data[idx] + w_use = w_data[idx] + + # Final safety: if any nonpositive step remains (shouldn't after unique), nudge by eps + dx = np.diff(x_unique) + if np.any(dx <= 0): + eps = np.finfo(float).eps + bumps = np.maximum.accumulate((dx <= 0).astype(float)) + x_unique = x_unique + np.concatenate([[0.0], bumps]) * eps + + # Usando paquete CSAPS + spline = csaps(x_unique, y_use, smooth=lam, weights=w_use) + + # Usando paquete de SCIPY + # Fit smoothing spline (smoothing parameter scaled by data length) + # s_value = SmoothingParam * len(x_data) + # spline = UnivariateSpline(x_norm, y_data, w=w_data, s=s_value) + + # Compute fitted values + y_fit = spline(x_norm) + + # Compute R-squared + r2 = r2_score(y_data, y_fit) + + # Store goodness-of-fit metrics + gof = {"rsquare": r2} + + return r2, spline, gof + + # @staticmethod + + +def RWLSfit(u, e, w): + """ + Robust Weighted Least Squares (RWLS) regression. + + Parameters + ---------- + u : np.ndarray + Independent variable (predictor). + e : np.ndarray + Dependent variable (response). + w : np.ndarray + Weights for the weighted least squares fit. + + Returns + ------- + beta : np.ndarray + Estimated regression coefficients [intercept, slope]. + fobj : float + Objective function value (weighted residual sum of squares). + r : np.ndarray + Residuals. + rN : np.ndarray + Internally studentized residuals. + """ + if len(u) != len(e) or len(u) != len(w): + raise ValueError( + f"Error in the number of parameters (RWLSfit): input arrays must have the same length (u={len(u)}, e = {len(e)}, w={len(w)})." + ) + + # Data size + n = len(u) + + # Design matrix X with intercept term + X = np.column_stack((np.ones(n), u)) + Y = np.array(e) + + # Convert weights to diagonal matrix + W = np.diag(w, 0) + + # Compute optimal estimates (beta) + beta = np.linalg.inv(X.T @ W @ X) @ ( + X.T @ W @ Y + ) # Equivalent to MATLAB: (X'*W*X)\(X'*W*Y) + + # Compute residuals + r = Y - X @ beta + + # Objective function value (weighted residual sum of squares) + fobj = r.T @ W @ r + + # Standard deviation of residuals + sigres = np.sqrt(fobj / (n - 2)) + + # Residual variance-covariance matrix + # Hat or projection matrix + P = X @ np.linalg.inv(X.T @ W @ X) @ X.T + # Sensitivity matrix S = I - P * W + S = np.eye(n) - P @ W + + # Internally studentized residual + rN = (np.sqrt(np.diag(W)) * r) / (sigres * np.sqrt(1 - np.diag(W) * np.diag(P))) + + return beta, fobj, r, rN \ No newline at end of file From bb5d623f83031fcfc45911ddd411fc034fcf3796 Mon Sep 17 00:00:00 2001 From: Colladito Date: Tue, 14 Oct 2025 13:52:11 +0200 Subject: [PATCH 03/14] [VCF] Change filename of plots --- bluemath_tk/distributions/pot.py | 7 ++++--- bluemath_tk/distributions/utils/pot_utils.py | 4 +--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/bluemath_tk/distributions/pot.py b/bluemath_tk/distributions/pot.py index 992c92f..9705244 100644 --- a/bluemath_tk/distributions/pot.py +++ b/bluemath_tk/distributions/pot.py @@ -427,7 +427,7 @@ def studentized_residuals( rN, w_values, plot, - f"{filename}_thresholdlocation{it}", + filename, ) if display_flag: print(f"New threshold found: {threshold}") @@ -496,7 +496,8 @@ def mrlp( ) if ax is None: - _, ax = plt.subplots(figsize=figsize, dpi=100) + fig = plt.figure(figsize=figsize) + ax = fig.add_subplot() ax.grid(False) # Plotting central 
estimates of mean re sidual life @@ -526,5 +527,5 @@ def mrlp( ax.set_xlabel("Threshold") ax.set_ylabel("Mean excess") - + return ax \ No newline at end of file diff --git a/bluemath_tk/distributions/utils/pot_utils.py b/bluemath_tk/distributions/utils/pot_utils.py index e86bce7..ff9274e 100644 --- a/bluemath_tk/distributions/utils/pot_utils.py +++ b/bluemath_tk/distributions/utils/pot_utils.py @@ -38,8 +38,6 @@ def threshold_search( if W_data is None: W_data = np.ones(u_data.size) - if filename is None: - filename = "" orden = np.argsort(u_data) u_data = u_data[orden] @@ -121,7 +119,7 @@ def objective_function(x): plt.yticks(fontsize=14) plt.tight_layout() if filename is not None: - plt.savefig(f"{filename}.png", dpi=300) + plt.savefig(f"{filename}_thresholdlocation.png", dpi=300) plt.show() plt.close() From 6a5bdb56bf5511e2e263b2b7fc71acaddad358b7 Mon Sep 17 00:00:00 2001 From: Colladito Date: Tue, 14 Oct 2025 14:17:32 +0200 Subject: [PATCH 04/14] [VCF] Fix month initials in first year plot --- bluemath_tk/distributions/nonstat_gev.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/bluemath_tk/distributions/nonstat_gev.py b/bluemath_tk/distributions/nonstat_gev.py index a1484bc..468a638 100644 --- a/bluemath_tk/distributions/nonstat_gev.py +++ b/bluemath_tk/distributions/nonstat_gev.py @@ -7472,6 +7472,20 @@ def plot(self, return_plot: bool = False, save: bool = False, init_year: int = 0 and self.nind_sc == 0 and self.nind_sh == 0 ): + month_initials = [ + "Jul", + "Aug", + "Sep", + "Oct", + "Nov", + "Dec", + "Jan", + "Feb", + "Mar", + "Apr", + "May", + "Jun", + ] mu_t = mut[t_ord] psi_t = psit[t_ord] xi_t = epst[t_ord] From 7d8e2db4f9f7235239d8faf4f2121359dba6e43c Mon Sep 17 00:00:00 2001 From: Colladito Date: Tue, 14 Oct 2025 14:17:53 +0200 Subject: [PATCH 05/14] [VCF] Fix month initials in first year plot --- bluemath_tk/distributions/nonstat_gev.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/bluemath_tk/distributions/nonstat_gev.py b/bluemath_tk/distributions/nonstat_gev.py index 468a638..366bb62 100644 --- a/bluemath_tk/distributions/nonstat_gev.py +++ b/bluemath_tk/distributions/nonstat_gev.py @@ -7138,18 +7138,18 @@ def plot(self, return_plot: bool = False, save: bool = False, init_year: int = 0 ###### 1st Year PLOT month_initials = [ - "Jul", - "Aug", - "Sep", - "Oct", - "Nov", - "Dec", "Jan", "Feb", "Mar", "Apr", "May", "Jun", + "Jul", + "Aug", + "Sep", + "Oct", + "Nov", + "Dec", ] month_positions = [i / 12 for i in range(12)] From 9fca3dde9e5a03c6c8868411906fec668203f667 Mon Sep 17 00:00:00 2001 From: Colladito Date: Tue, 14 Oct 2025 17:22:11 +0200 Subject: [PATCH 06/14] [VCF] Chage potplot to make it universal --- bluemath_tk/distributions/pot.py | 78 +++++++++++++++++++++++++++----- 1 file changed, 67 insertions(+), 11 deletions(-) diff --git a/bluemath_tk/distributions/pot.py b/bluemath_tk/distributions/pot.py index 9705244..65b2b68 100644 --- a/bluemath_tk/distributions/pot.py +++ b/bluemath_tk/distributions/pot.py @@ -4,8 +4,8 @@ from scipy.signal import find_peaks from scipy.stats import chi2, norm, pearsonr -from .utils.pot_utils import RWLSfit, threshold_search from ..core.io import BlueMathModel +from .utils.pot_utils import RWLSfit, threshold_search def block_maxima( @@ -151,7 +151,7 @@ def pot( adjusted_data = np.maximum(data - threshold, 0) # Usamos la librería detecta que tiene el mismo funcionamiento que la función de matlab findpeaks - locs, _ = find_peaks(adjusted_data, distance=min_peak_distance+1) + locs, _ 
= find_peaks(adjusted_data, distance=min_peak_distance + 1) # Con scipy # locs, _ = find_peaks(adjusted_data, distance=min_peak_distance) @@ -314,10 +314,10 @@ def fit( # TODO: Añadir más metodos - _, _, _, pks, pks_idx, _ = pot( + _, _, _, self.pks, self.pks_idx, _ = pot( self.data, self.threshold, self.n0, self.min_peak_distance ) - return self.threshold, pks, pks_idx + return self.threshold, self.pks, self.pks_idx def studentized_residuals( self, @@ -436,11 +436,68 @@ def studentized_residuals( return threshold, beta, fobj, r + def potplot( + self, + time: np.ndarray = None, + ax: plt.Axes = None, + figsize: tuple = (8, 5), + ): + fig, ax = potplot( + self.data, + self.threshold, + time, + self.n0, + self.min_peak_distance, + self.conf_level, + ax, + figsize, + ) + + return fig, ax + + +def potplot( + data: np.ndarray, + threshold: float = 0.0, + time: np.ndarray = None, + n0: int = 10, + min_peak_distance: int = 2, + conf_level: float = 0.95, + ax: plt.Axes = None, + figsize: tuple = (8, 5), +): + _, _, _, _, pks_idx, _ = pot(data, threshold, n0, min_peak_distance, conf_level) + + if time is None: + time = np.arange(data.size) + + if ax is None: + fig = plt.figure(figsize=figsize) + ax = fig.add_subplot() + + # Plot complete series + ax.plot(time, data, ls="-", color="#5199FF", lw=0.25, zorder=10) + + # Plot extremes + ax.scatter( + time[pks_idx], + data[pks_idx], + s=20, + lw=0.5, + edgecolor="w", + facecolor="#F85C50", + zorder=20, + ) + + ax.axhline(threshold, ls="--", lw=1, color="#FF756B", zorder=15) + ax.set_xlabel("Time") + + return fig, ax def mrlp( data: np.ndarray, - threshold: float=None, + threshold: float = None, conf_level: float = 0.95, ax: plt.Axes = None, figsize: tuple = (8, 5), @@ -450,7 +507,7 @@ def mrlp( The mean residual life plot should be approximately linear above a threshold for which the Generalized Pareto Distribution model is valid. - The strategy is to select the smallest threshold value immediately above + The strategy is to select the smallest threshold value immediately above which the plot is approximately linear. 
Parameters @@ -492,8 +549,8 @@ def mrlp( interlow, interup = norm.interval( 0.95, loc=excedencias_mean_valid, - scale=np.sqrt(1/excedencias_weight_valid), - ) + scale=np.sqrt(1 / excedencias_weight_valid), + ) if ax is None: fig = plt.figure(figsize=figsize) @@ -510,7 +567,6 @@ def mrlp( zorder=15, ) - if conf_level is not None: ax.plot(pks_unicos_valid, interlow, color="#5199FF", lw=1, ls="--", zorder=10) ax.plot(pks_unicos_valid, interup, color="#5199FF", lw=1, ls="--", zorder=10) @@ -527,5 +583,5 @@ def mrlp( ax.set_xlabel("Threshold") ax.set_ylabel("Mean excess") - - return ax \ No newline at end of file + + return ax From dff924e7a8668fe57a46232bc8f37967e85493eb Mon Sep 17 00:00:00 2001 From: Colladito Date: Tue, 14 Oct 2025 17:40:57 +0200 Subject: [PATCH 07/14] [VCF] Update documentation --- bluemath_tk/distributions/pot.py | 80 ++++++++++++++++++++++++++------ 1 file changed, 65 insertions(+), 15 deletions(-) diff --git a/bluemath_tk/distributions/pot.py b/bluemath_tk/distributions/pot.py index 65b2b68..7b3018d 100644 --- a/bluemath_tk/distributions/pot.py +++ b/bluemath_tk/distributions/pot.py @@ -112,7 +112,7 @@ def pot( threshold: float = 0.0, n0: int = 10, min_peak_distance: int = 2, - conf_level: float = 0.05, + sig_level: float = 0.05, ): """ Function to identiy POT @@ -131,6 +131,8 @@ def pot( Minimum number of exceedances required for valid computation min_peak_distance : int, default = 2 Minimum distance between two peaks (in data points) + sig_level : float, default=0.05 + Significance level for Chi-squared test Returns ------- @@ -168,7 +170,7 @@ def pot( autocorrelations[i, 1] = r autocorrelations[i, 2] = p_value - if p_value < conf_level: + if p_value < sig_level: Warning( f"Lag {int(lag)} significant, consider increase the number of min_peak_distance" ) @@ -262,7 +264,7 @@ def __init__( threshold: float = 0.0, n0: int = 10, min_peak_distance: int = 2, - conf_level: float = 0.05, + sig_level: float = 0.05, method: str = "studentized", plot: bool = False, filename: str = None, @@ -281,7 +283,7 @@ def __init__( self.autocorrelations, ) = pot(self.data, threshold, n0, min_peak_distance) self.method = method - self.conf_level = conf_level + self.sig_level = sig_level self.plot = plot self.filename = filename self.display_flag = display_flag @@ -306,7 +308,7 @@ def fit( self.pks_unicos_valid, self.excedencias_mean_valid, self.excedencias_weight_valid, - self.conf_level, + self.sig_level, self.plot, self.filename, self.display_flag, @@ -324,7 +326,7 @@ def studentized_residuals( pks_unicos_valid: np.ndarray, exceedances_mean_valid: np.ndarray, exceedances_weight_valid: np.ndarray, - conf_level: float = 0.05, + sig_level: float = 0.05, plot: bool = False, filename: str = None, display_flag: bool = False, @@ -342,8 +344,8 @@ def studentized_residuals( Vector of exceedance means exceedances_weight_valid : np.ndarray(n,) Vector of exceedance weights - conf_level : bool, default=0.05 - Significance level for Chi-squared test (default 0.05) + sig_level : bool, default=0.05 + Significance level for Chi-squared test plot_flag : bool, default=False Boolean flag, true to plot the graphs, false otherwise filename : str, default=None @@ -399,13 +401,13 @@ def studentized_residuals( plt.show() plt.close() - if fobj > chi2.ppf(1 - conf_level, df=u_values.size - 2) or np.abs( + if fobj > chi2.ppf(1 - sig_level, df=u_values.size - 2) or np.abs( rN[0] - ) > norm.ppf(1 - conf_level / 2, 0, 1): + ) > norm.ppf(1 - sig_level / 2, 0, 1): if display_flag: - if fobj > chi2.ppf(1 - conf_level, 
df=u_values.size - 2): + if fobj > chi2.ppf(1 - sig_level, df=u_values.size - 2): print("Chi-squared test detects anomalies") - if np.abs(rN[0]) > norm.ppf(1 - conf_level / 2, 0, 1): + if np.abs(rN[0]) > norm.ppf(1 - sig_level / 2, 0, 1): print( "The maximum studentized residual of the first record detects anomalies" ) @@ -442,13 +444,32 @@ def potplot( ax: plt.Axes = None, figsize: tuple = (8, 5), ): + """ + Auxiliar function which call generic potplot to plot the POT usign the optimal threshold obtained + + Parameters + ---------- + time : np.ndarray, default=None + Time of data + ax : plt.Axes, default=None + Axes + figsize : tuple, default=(8,5) + Figure size, by default (8, 5) + + Returns + ------- + fig : plt.Figure + Figure + ax : plt.Axes + Axes + """ fig, ax = potplot( self.data, self.threshold, time, self.n0, self.min_peak_distance, - self.conf_level, + self.sig_level, ax, figsize, ) @@ -462,11 +483,40 @@ def potplot( time: np.ndarray = None, n0: int = 10, min_peak_distance: int = 2, - conf_level: float = 0.95, + sig_level: float = 0.05, ax: plt.Axes = None, figsize: tuple = (8, 5), ): - _, _, _, _, pks_idx, _ = pot(data, threshold, n0, min_peak_distance, conf_level) + """ + Plot the POT for data given a threshold. + + Parameters + ---------- + data : np.ndarray + Data + threshold : float, default=0.0 + Threshold used to plot the peaks + time : np.ndarray, default=None + Time of data + n0 : int, default=10 + Minimum number of data to compute the POT given a threshold + min_peak_distance : int, default=2 + Minimum peak distance between POT (in index size) + sig_level : float, default=0.05 + Significance level for Chi-squared test + ax : plt.Axes, default=None + Axes + figsize : tuple, default=(8,5) + Figure figsize + + Returns + ------- + fig : plt.Figure + Figure + ax : plt.Axes + Axes + """ + _, _, _, _, pks_idx, _ = pot(data, threshold, n0, min_peak_distance, sig_level) if time is None: time = np.arange(data.size) From 38fcc53c7fee5159a4fc23a3f4fe04a4d3ecaef4 Mon Sep 17 00:00:00 2001 From: Colladito Date: Mon, 20 Oct 2025 12:24:14 +0200 Subject: [PATCH 08/14] [VCF] Small changes in pot and add pot_utils.py --- bluemath_tk/distributions/pot.py | 17 ++++++++--------- bluemath_tk/distributions/utils/pot_utils.py | 8 ++++---- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/bluemath_tk/distributions/pot.py b/bluemath_tk/distributions/pot.py index 7b3018d..933f103 100644 --- a/bluemath_tk/distributions/pot.py +++ b/bluemath_tk/distributions/pot.py @@ -1,6 +1,5 @@ import matplotlib.pyplot as plt import numpy as np -import pandas as pd from scipy.signal import find_peaks from scipy.stats import chi2, norm, pearsonr @@ -267,7 +266,7 @@ def __init__( sig_level: float = 0.05, method: str = "studentized", plot: bool = False, - filename: str = None, + folder: str = None, display_flag: bool = False, ): self.data = data @@ -285,7 +284,7 @@ def __init__( self.method = method self.sig_level = sig_level self.plot = plot - self.filename = filename + self.folder = folder self.display_flag = display_flag def fit( @@ -310,7 +309,7 @@ def fit( self.excedencias_weight_valid, self.sig_level, self.plot, - self.filename, + self.folder, self.display_flag, ) @@ -328,7 +327,7 @@ def studentized_residuals( exceedances_weight_valid: np.ndarray, sig_level: float = 0.05, plot: bool = False, - filename: str = None, + folder: str = None, display_flag: bool = False, ): """ @@ -348,7 +347,7 @@ def studentized_residuals( Significance level for Chi-squared test plot_flag : bool, default=False Boolean 
flag, true to plot the graphs, false otherwise - filename : str, default=None + folder : str, default=None Path and name for making graphs display_flag : bool, default=False Boolean flag, true to display messages, false otherwise @@ -396,8 +395,8 @@ def studentized_residuals( # ax.text(threshold + 0.5, min(rN) + 0.1 * (max(rN) - min(rN)), f'Min threshold = {threshold}') ax.grid() ax.legend(loc="upper right") - if filename is not None: - plt.savefig(f"{filename}_StudenRes{it}.png", dpi=300) + if folder is not None: + plt.savefig(f"{folder}/StudenRes{it}.png", dpi=300) plt.show() plt.close() @@ -429,7 +428,7 @@ def studentized_residuals( rN, w_values, plot, - filename, + folder, ) if display_flag: print(f"New threshold found: {threshold}") diff --git a/bluemath_tk/distributions/utils/pot_utils.py b/bluemath_tk/distributions/utils/pot_utils.py index ff9274e..a8c75a0 100644 --- a/bluemath_tk/distributions/utils/pot_utils.py +++ b/bluemath_tk/distributions/utils/pot_utils.py @@ -10,7 +10,7 @@ def threshold_search( e_data: np.ndarray, W_data: np.ndarray, plot: bool = False, - filename: str = None, + folder: str = None, ): """ Auxiliar function used in the studentidez_residuals method. @@ -25,7 +25,7 @@ def threshold_search( Weights vector plot : bool, default=False Flag for plotting - filename : str, default=None + folder : str, default=None File name to save plots Returns @@ -118,8 +118,8 @@ def objective_function(x): plt.xticks(fontsize=14) plt.yticks(fontsize=14) plt.tight_layout() - if filename is not None: - plt.savefig(f"{filename}_thresholdlocation.png", dpi=300) + if folder is not None: + plt.savefig(f"{folder}/thresholdlocation.png", dpi=300) plt.show() plt.close() From e6c74bbf25f9f9f8a7c46f6824539d0af455a78a Mon Sep 17 00:00:00 2001 From: Colladito Date: Mon, 20 Oct 2025 12:24:41 +0200 Subject: [PATCH 09/14] [VCF] Add first functions in pareto_poisson.py with GPD-Poisson model --- bluemath_tk/distributions/pareto_poisson.py | 137 ++++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 bluemath_tk/distributions/pareto_poisson.py diff --git a/bluemath_tk/distributions/pareto_poisson.py b/bluemath_tk/distributions/pareto_poisson.py new file mode 100644 index 0000000..a63104a --- /dev/null +++ b/bluemath_tk/distributions/pareto_poisson.py @@ -0,0 +1,137 @@ +import numpy as np + +from ._base_distributions import BaseDistribution + +class GPDPoiss(BaseDistribution): + def __init__(self) -> None: + """ + Initialize the GPD-Poisson mode + """ + super().__init__() + + @property + def name() -> str: + return "GPD-Poisson model" + + @property + def nparams() -> int: + """ + Number of parameters of GPD-Poisson model + """ + return int(4) + + @property + def param_names() -> list[str]: + """ + Name of parameters of GPD-Poisson + """ + return ["threshold", "scale", "shape", "poisson"] + + @staticmethod + def pdf( + x: np.ndarray, threshold: float = 0.0, scale: float = 1.0, shape: float = 0.0, poisson: float = 1.0 + ) -> np.ndarray: + """ + Probability density function + + Parameters + ---------- + x : np.ndarray + Values to compute the probability density value + threshold : float, default=0.0 + Threshold parameter + scale : float, default = 1.0 + Scale parameter. + Must be greater than 0. + shape : float, default = 0.0 + Shape parameter. + poisson : float, default = 1.0 + + Returns + ---------- + pdf : np.ndarray + Probability density function values + + Raises + ------ + ValueError + If scale is not greater than 0. + ValueError + If poisson is not greater than 0. 
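+
+        Notes
+        -----
+        For the annual-maximum model implied by Poisson peak arrivals, the
+        CDF is F(x) = exp(-poisson * z(x)) with
+        z(x) = (1 + shape * (x - threshold) / scale) ** (-1 / shape)
+        (z(x) = exp(-(x - threshold) / scale) when shape = 0), which is
+        consistent with the quantile function below. The density returned
+        here is its derivative,
+        f(x) = (poisson / scale)
+        * (1 + shape * (x - threshold) / scale) ** (-1 - 1 / shape)
+        * exp(-poisson * z(x)).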
+ """ + + if scale <= 0: + raise ValueError("Scale parameter must be > 0") + + if poisson <= 0: + raise ValueError("Poisson parameter must be > 0") + + + y = np.maximum(x - threshold, 0) / scale + + # # Gumbel case (shape = 0) + # if shape == 0.0: + # pdf = (1 / scale) * (np.exp(-y) * np.exp(-np.exp(-y))) + + # # General case (Weibull and Frechet, shape != 0) + # else: + # pdf = np.full_like(x, 0, dtype=float) # 0 + # yy = 1 + shape * y + # yymask = yy > 0 + # pdf[yymask] = (1 / scale) * ( + # yy[yymask] ** (-1 - (1 / shape)) * np.exp(-(yy[yymask] ** (-1 / shape))) + # ) + + # return pdf + + @staticmethod + def qf( + p: np.ndarray, threshold: float = 0.0, scale: float = 1.0, shape: float = 0.0, poisson: float = 1.0 + ) -> np.ndarray: + """ + Quantile function (Inverse of Cumulative Distribution Function) + + Parameters + ---------- + p : np.ndarray + Probabilities to compute their quantile + loc : float, default=0.0 + Location parameter + scale : float, default = 1.0 + Scale parameter. + Must be greater than 0. + shape : float, default = 0.0 + Shape parameter. + + Returns + ---------- + q : np.ndarray + Quantile value + + Raises + ------ + ValueError + If probabilities are not in the range (0, 1). + + ValueError + If scale is not greater than 0. + """ + + if np.min(p) <= 0 or np.max(p) >= 1: + raise ValueError("Probabilities must be in the range (0, 1)") + + if scale <= 0: + raise ValueError("Scale parameter must be > 0") + + if poisson <= 0: + raise ValueError("Poisson parameter must be > 0") + + # Gumbel case (shape = 0) + if shape == 0.0: + q = threshold - scale * np.log(-np.log(p)/poisson) + + # General case (Weibull and Frechet, shape != 0) + else: + q = threshold - (1-(-np.log(p)/poisson)**(-shape))*scale/shape + + return q \ No newline at end of file From 8e26e9b1eed794c3d47eb27c5daac3b4c46918d9 Mon Sep 17 00:00:00 2001 From: Colladito Date: Mon, 20 Oct 2025 12:25:39 +0200 Subject: [PATCH 10/14] [VCF] Add first ExtremeCorrection class --- .../distributions/extreme_correction.py | 513 ++++++++++++++++++ 1 file changed, 513 insertions(+) create mode 100644 bluemath_tk/distributions/extreme_correction.py diff --git a/bluemath_tk/distributions/extreme_correction.py b/bluemath_tk/distributions/extreme_correction.py new file mode 100644 index 0000000..5f65510 --- /dev/null +++ b/bluemath_tk/distributions/extreme_correction.py @@ -0,0 +1,513 @@ +import numpy as np +import xarray as xr +import os +import pandas as pd +import scipy.stats as stats + +from ..core.io import BlueMathModel +from ..distributions.gev import GEV +from ..distributions.gpd import GPD +from ..distributions.pot import OptimalThreshold, pot, block_maxima +from ..distributions.pareto_poisson import GPDPoiss + + +class ExtremeCorrection(BlueMathModel): + """ + Extreme Correction class + """ + + def __init__( + self, + corr_config: dict, + pot_config: dict, + method: str = "pot", + conf_level: float = 0.95, + debug: bool = False, + ): + """ + Extreme value correction for sampled datasets using + Generalized Extreme Value (GEV) or Peaks Over Threshold (POT) approaches. + + This class applies upper-tail corrections to sampled datasets + by fitting extreme value distributions to historical observations + and adjusting the sampled extremes accordingly. See V. Collado (2025) [1]. + + Parameters + ---------- + config : dict + Dictionary containing the main configuration of the model. + Required keys: + - var : str + Variable to apply the correction. + - time_var : str + Name of the time variable (datetime or timestamp). 
+ - yyyy_var : str + Name of the year variable. + - freq : float or int + Frequency of observations per year + (e.g., 365.25 for daily data). + Optional keys: + - mm_var : str, default "mm" + Name of the month variable. + - dd_var : str, default "dd" + Name of the day variable. + - folder : str, default None + Path to a folder where diagnostic plots will be saved. + pot_config : dict + Dictionary containing the POT configuration. + Keys: + - n0 : int, default 10 + Minimum number of exceedances required. + - min_peak_distance : int, default 2 + Minimum distance (in data points) between two peaks. + - init_threshold : float, default 0.0 + Initial threshold for peak extraction. + - siglevel : float, default 0.05 + Significance level for the Chi-squared test in + threshold optimization. + - plot_flag : bool, default True + Whether to generate threshold selection plots. + method : {"am", "pot"}, default "pot" + Method for correction. + - "am" : Annual Maxima using GEV distribution. + - "pot" : Peaks Over Threshold using GPD distribution. + conf_level : float, default=0.95 + Confidence level for return period confidence intervals. + """ + super().__init__() + + self.set_logger_name( + name=self.__class__.__name__, level="DEBUG" if debug else "INFO" + ) + self.logger.info("Initializing Extreme Correction Procedure") + + # TODO: CAMBIAR EL CONFIG + # Validate config input + self.config = corr_config + self._validate_config() + + # Method + self.method = method.lower() + if self.method == "pot": + self.pot_config = pot_config + self._validate_pot_config() + + # Initialize fitted parameters + # If GEV (loc, scale, shape) + # If GPD (threshold, scale, shape) + self.parameters = np.empty(3) + + # Confidence level + self.conf = conf_level + + def _validate_config(self) -> None: + """ + Validate the configuration dictionary for extreme correction + + Raise + ----- + KeyError + If any required key is missing + TypeError + If type of any required key is wrong + """ + # Required fields + required_fields = { + "var": str, + # "time_var": str, + # "yyyy_var": str, + # "freq": float | int, + } + + for key, exp_type in required_fields.items(): + if key not in self.config: + if key not in self.config: + raise KeyError( + f"Configuration error: Key '{key}' is missing in the config dictionary." + ) + if not isinstance(self.config[key], exp_type): + raise TypeError( + f"Configuration error: Key '{key}' must be of type {exp_type.__name__}." + ) + + # Optional fields with defaults + optional_fields = { + "mm_var": "mm", + "dd_var": "dd", + "bmus_var": None, + "folder": None, + } + + for key, default_value in optional_fields.items(): + self.config[key] = self.config.get(key, default_value) + + # Define the configuration in the class + self.var = self.config.get("var") + self.time_var = self.config.get("time_var") + self.year_var = self.config.get("yyyy_var") + self.month_var = self.config.get("mm_var") + self.day_var = self.config.get("dd_var") + self.freq = self.config.get("freq") + # Weather Type variable in case we apply the correction by WT + self.bmus_var = self.config["bmus_var"] + + if self.config["folder"] is not None: + self.folder = self.config["folder"] + os.makedirs(self.folder, exist_ok=True) + + # TODO: Corrección por WT + # if self.config.get(self.bmus_var) is not None: + # self.n_wt = np.unique(self.data_hist[self.bmus_var]) + # else: + # self.n_wt = 1 + + def _validate_pot_config(self) -> None: + """ + Validate POT configuration dictionary for peaks extraction. 
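+
+        A minimal pot_config example matching the defaults applied below
+        (illustrative values, not from the original code):
+
+        >>> pot_config = {
+        ...     "n0": 10,
+        ...     "min_peak_distance": 2,
+        ...     "init_threshold": 0.0,
+        ...     "siglevel": 0.05,
+        ...     "plot": False,
+        ... }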
+ """ + + self.pot_config["n0"] = self.pot_config.get("n0", 10) + self.pot_config["min_peak_distance"] = self.pot_config.get( + "min_peak_distance", 2 + ) + self.pot_config["init_threshold"] = self.pot_config.get("init_threshold", 0.0) + self.pot_config["sig_level"] = self.pot_config.get("siglevel", 0.05) + self.pot_config["plot"] = self.pot_config.get("plot", False) + + def fit( + self, + data_hist: xr.Dataset, + plot_diagnostic: bool = False, + ) -> None: + """ + Fit the historical data into GEV or GPD + + Parameters + ---------- + data_hist : xr.Dataset + Dataset with historical data + bmus : list[bool, str], default=[False, ""] + Whether to apply the correction by BMUS, if given the name of bmus variable should be given + plot_diagnostic : bool, default=False + Whether to plot the diagnostics plot of the fitted distribution + """ + self.pit_data, self.am_data = self._preprocess_data( + data_hist, + var=self.config.get("var"), + bmus=self.config.get("bmus", [False, ""]), + sim=False, + join_sims=self.config.get("join_sims", True), + ) + + # If POT used in fitting step + if self.method == "pot": + opt_threshold = OptimalThreshold( + data=self.pit_data, + threshold=self.pot_config.get("init_threshold", 0.0), + n0=self.pot_config.get("n0", 10), + min_peak_distance=self.pot_config.get( + "min_peak_distance", 2 + ), + sig_level=self.pot_config.get("sig_level", 0.05), + method=self.pot_config.get("method", "studentized"), + plot=self.pot_config.get("plot", False), + folder=self.pot_config.get("folder", False), + display_flag=self.pot_config.get("display_flag", False), + ) + self.threshold, pot_data, pot_idx = opt_threshold.fit() + self.poiss_parameter = pot_data.size / self.am_data.size + + fit_result = GPD.fit(pot_data, f0=self.threshold) + + # If Annual Maxima used in fitting step + if self.method == "am": + fit_result = GEV.fit(self.am_data) + self.poiss_parameter = 1 # If GEV only 1 exceedance per year (AM) + + # [loc, scale, shape] if GEV or [threshold, scale, shape] if GPD + self.parameters = fit_result.params + + # TODO: Ver que diagnostic devolver alomejor no hace falta todo + if plot_diagnostic: + fit_result.plot() + + def transform( + self, + data_sim: xr.Dataset, + prob: str = "unif", + random_state: int = 0, + siglevel: float = 0.05, + ) -> xr.Dataset: + """ + Apply the correction in the synthetic dataset + + Parameters + ---------- + data_sim : xr.Dataset + Dataset with synthetic data + prob : str, default="unif" + Type of probabilities consider to random correct the AM + If "unif", a sorted random uniform is considered + If "ecdf", the ECDF is considered + random_state : int, default=0 + Random state to generate the probabilities + siglevel : float, default=0.05 + + Returns + ------- + sim_pit_data_corrected : xr.Dataset + Point-in-time corrected data + """ + np.random.seed(random_state) + + self.sim_pit_data, self.sim_am_data = self._preprocess_data( + data_sim, + var=self.config.get("var"), + bmus=self.config.get("bmus", [False, ""]), + sim=True, + join_sims=self.config.get("join_sims", True), + ) + self.sim_am_data_sorted = np.sort(self.sim_am_data) + self.n_year_sim = self.sim_am_data.shape[0] + + # Avoid correct when AM is 0 + self.am_idx_0 = 0 + for idx, value in enumerate(np.sort(self.am_data)): + if value == 0: + self.am_index_0 += 1 + else: + break + + # Test if the correction has to be applied + test_result = self.test() + self.p_value = test_result.get("P-value") + if self.p_value > siglevel: + self.logger.info( + f"Synthetic data comes from fitted distribution 
(P-value: {self.p_value:.4%})" + ) + self.sim_am_data_corr = self.sim_am_data + self.sim_pit_data_corrected = self.sim_pit_data + + return + else: + self.sim_am_data_corr = np.zeros(self.n_year_sim) + + # Define probs + if prob == "unif": + self.rprob_sim = np.sort( + np.random.uniform(low=0, high=1, size=self.n_year_sim) + ) + else: + self.rprob_sim = np.arange(1, self.n_year_sim + 1) / ( + self.n_year_sim + 1 + ) # ECDF + + # Apply correction on AM + if self.method == "pot": + # TODO: Añadir funciones de POT + self.sim_am_data_corr[self.am_idx_0 :] = GPDPoiss.qf( + self.rprob_sim[self.am_idx_0 :], + threshold=self.parameters[0], + scale=self.parameters[1], + shape=self.parameters[2], + poisson=self.poiss_parameter, + ) + elif self.method == "am": + self.sim_am_data_corr[self.am_idx_0 :] = GEV.qf( + self.rprob_sim[self.am_idx_0 :], + loc=self.parameters[0], + scale=self.parameters[1], + shape=self.parameters[2], + ) + + # Apply correction in pit data + if self.n_year_sim > 1: + self.sim_pit_data_corrected = np.interp( + self.sim_pit_data, # x-coords to interpolate + np.append( + min(self.sim_pit_data), self.sim_am_data_sorted + ), # x-coords of data points + np.append( + min(self.sim_pit_data), self.sim_am_data_corr + ), # y-coords of data points + ) + + return self.sim_pit_data_corrected + + def fit_transform( + self, + data_hist: xr.Dataset, + data_sim: xr.Dataset, + bmus: list[bool, str] = [False, ""], + prob: str = "unif", + plot_diagnostic: bool = False, + random_state: int = 0, + ) -> xr.Dataset: + """ + Fit and apply the correction procedure + + See fit and transform for more information + + Parameters + ---------- + data_hist : xr.Dataset + Dataset with historical data + data_sim : xr.Dataset + Dataset with synthetic data + bmus : list[bool, str], default=[False, ""] + Whether to apply the correction by BMUS, if given the name of bmus variable should be given + prob : str, default="unif" + Type of probabilities consider to random correct the AM + If "unif", a sorted random uniform is considered + If "ecdf", the ECDF is considered + plot_diagnostic : bool, default=False + Whether to plot the diagnostics plot of the fitted distribution + random_state : int, default=0 + Random state to generate the probabilities + + Returns + ------- + sim_pit_data_corrected : xr.Dataset + Point-in-time corrected data + """ + self.fit(data_hist=data_hist, bmus=bmus, plot_diagnostic=plot_diagnostic) + + return self.transform(data_sim=data_sim, prob=prob, random_state=random_state) + + def _preprocess_data( + self, + data: xr.Dataset, + var: list[str], + bmus: list[bool, str] = [False, ""], + sim: bool=True, + join_sims: bool = True, + ) -> tuple[np.ndarray, np.ndarray]: + """ + Preprocess the data + + Parameters + ---------- + data : xr.Dataset + Data to apply correction + var : list[str] + List of variables to apply the correction technique. 
FUTURE WORK: INCLUDE MORE THAN ONE + bmus : list[bool, str], default=[False, ""] + List to decide if the correction must be applied by WT and if so name of the variable + join_sims : bool, default=True + Whether to joint all the simulations in one array + + Return + ------ + pit_data : np.ndarray + Point-in-time data + am_data : np.ndarray + Annual Maxima values + + """ + # dict_allowed_freq = {"D": 1, "h": 24, "m": 1440} + + # self.freqstr = data.indexes["time"].freqstr + # self.freq = dict_allowed_freq.get(self.freqstr) + + if join_sims and sim: + n_sims = data.get("n_sim").values + pit_data = np.array([]) + am_data = np.array([]) + for sim in n_sims: + pit_data = np.append(pit_data, data.get(f"{var}").sel(n_sim=sim).values) + am_data = np.append( + am_data, + data.get(f"{var}").sel(n_sim=sim).groupby("time.year").max().values, + ) + else: + pit_data = data.get(f"{var}").values.T + am_data = data.get(f"{var}").groupby("time.year").max().values.T + + return pit_data, am_data + + def test(self) -> dict: + """ + Cramer Von-Mises test to check the GOF of fitted distribution + + Test to check the Goodness-of-Fit of the historical fitted distribution with the synthetic data. + Null Hypothesis: sampled AM comes from the fitted extreme distribution. + + Returns + ------- + dict + Statistic and p-value of the Cramer Von-Mises test + + Notes + ----- + The test is applied in the AM since the correction procedure is applied in the AM + """ + + if self.method == "pot": + gev_location = ( + self.parameters[0] + + ( + self.parameters[1] + * (1 - self.poiss_parameter ** self.parameters[2]) + ) + / self.parameters[2] + ) + gev_scale = self.parameters[1] * self.poiss_parameter ** self.parameters[2] + + # POT test + # res_test = stats.cramervonmises(self.sim_pot_data, + # cdf=stats.genpareto.cdf, + # args=(self.parameters[2], self.parameters[0], self.parameters[1]) + # ) + + # AM test to derived GEV from GPD-Poisson + res_test = stats.cramervonmises( + self.sim_am_data, + cdf=stats.genextreme.cdf, + args=(self.parameters[2], gev_location, gev_scale), + ) + return {"Statistic": res_test.statistic, "P-value": res_test.pvalue} + + elif self.method == "am": + res_test = stats.cramervonmises( + self.sim_am_data, + cdf=stats.genextreme.cdf, + args=(self.parameters[2], self.parameters[0], self.parameters[1]), + ) + return {"Statistic": res_test.statistic, "P-value": res_test.pvalue} + + def plot(self): + """ + Plot + + TODO: Plots a añadir: + - Añadir Return Period Plot de la serie simulada + """ + pass + + def correlations(self) -> dict: + """ + Rank based correlations between sampled and corrected sampled data + + Returns + ------- + dict : + Dictionary with Spearman, Kendall and Pearson correlation coefficients. 
+ Keys : + - "Spearman" : Spearman correlation coefficient + - "Kendall" : Kendall correlation coefficient + - "Pearson" : Pearson correlation coefficient + """ + + spearman_corr, _ = stats.spearmanr( + self.sim_pit_data, self.sim_pit_data_corrected + ) + kendall_corr, _ = stats.kendalltau( + self.sim_pit_data, self.sim_pit_data_corrected + ) + pearson_corr, _ = stats.pearsonr(self.sim_pit_data, self.sim_pit_data_corrected) + + return { + "Spearman": spearman_corr, + "Kendall": kendall_corr, + "Pearson": pearson_corr, + } From 174ecacc80a3df9e0f1bcc3715e70a5af2b5ce2d Mon Sep 17 00:00:00 2001 From: Colladito Date: Mon, 20 Oct 2025 12:26:18 +0200 Subject: [PATCH 11/14] [VCF] Format imports in extreme_correction.py --- bluemath_tk/distributions/extreme_correction.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/bluemath_tk/distributions/extreme_correction.py b/bluemath_tk/distributions/extreme_correction.py index 5f65510..aa2b93d 100644 --- a/bluemath_tk/distributions/extreme_correction.py +++ b/bluemath_tk/distributions/extreme_correction.py @@ -1,14 +1,14 @@ -import numpy as np -import xarray as xr import os -import pandas as pd + +import numpy as np import scipy.stats as stats +import xarray as xr from ..core.io import BlueMathModel from ..distributions.gev import GEV from ..distributions.gpd import GPD -from ..distributions.pot import OptimalThreshold, pot, block_maxima from ..distributions.pareto_poisson import GPDPoiss +from ..distributions.pot import OptimalThreshold class ExtremeCorrection(BlueMathModel): @@ -205,9 +205,7 @@ def fit( data=self.pit_data, threshold=self.pot_config.get("init_threshold", 0.0), n0=self.pot_config.get("n0", 10), - min_peak_distance=self.pot_config.get( - "min_peak_distance", 2 - ), + min_peak_distance=self.pot_config.get("min_peak_distance", 2), sig_level=self.pot_config.get("sig_level", 0.05), method=self.pot_config.get("method", "studentized"), plot=self.pot_config.get("plot", False), @@ -379,7 +377,7 @@ def _preprocess_data( data: xr.Dataset, var: list[str], bmus: list[bool, str] = [False, ""], - sim: bool=True, + sim: bool = True, join_sims: bool = True, ) -> tuple[np.ndarray, np.ndarray]: """ From 2b3b062929e070a4a80a471e3715be300ab55bb7 Mon Sep 17 00:00:00 2001 From: Colladito Date: Mon, 20 Oct 2025 17:06:01 +0200 Subject: [PATCH 12/14] [VCF] Add Conf Intervals in Historical Return period plot --- .../distributions/extreme_correction.py | 69 +++++++++++++++++-- bluemath_tk/distributions/pareto_poisson.py | 14 +++- bluemath_tk/distributions/utils/pot_utils.py | 33 ++++++++- 3 files changed, 110 insertions(+), 6 deletions(-) diff --git a/bluemath_tk/distributions/extreme_correction.py b/bluemath_tk/distributions/extreme_correction.py index aa2b93d..6b70a12 100644 --- a/bluemath_tk/distributions/extreme_correction.py +++ b/bluemath_tk/distributions/extreme_correction.py @@ -3,12 +3,14 @@ import numpy as np import scipy.stats as stats import xarray as xr +import matplotlib.pyplot as plt from ..core.io import BlueMathModel from ..distributions.gev import GEV from ..distributions.gpd import GPD from ..distributions.pareto_poisson import GPDPoiss from ..distributions.pot import OptimalThreshold +from ..distributions.utils.pot_utils import gpdpoiss_ci_rp_bootstrap class ExtremeCorrection(BlueMathModel): @@ -198,6 +200,7 @@ def fit( sim=False, join_sims=self.config.get("join_sims", True), ) + self.n_year = self.am_data.size # If POT used in fitting step if self.method == "pot": @@ -212,10 +215,10 @@ def fit( 
folder=self.pot_config.get("folder", False),
                 display_flag=self.pot_config.get("display_flag", False),
             )
-            self.threshold, pot_data, pot_idx = opt_threshold.fit()
-            self.poiss_parameter = pot_data.size / self.am_data.size
+            self.threshold, self.pot_data, pot_idx = opt_threshold.fit()
+            self.poiss_parameter = self.pot_data.size / self.am_data.size
 
-            fit_result = GPD.fit(pot_data, f0=self.threshold)
+            fit_result = GPD.fit(self.pot_data, f0=self.threshold)
 
         # If Annual Maxima used in fitting step
         if self.method == "am":
@@ -480,7 +483,65 @@ def plot(self):
         """
         Plot
 
         TODO: plots to add:
             - Add a Return Period plot of the simulated series
         """
-        pass
+        fig1, ax1 = self.hist_retper_plot()
+
+        return fig1, ax1
+
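Editor's note on the return-level computation introduced below: for a return period T (in years), the plotted level is the quantile of the annual-maximum distribution at the non-exceedance probability 1 - 1/T. A minimal sketch using the GPDPoiss.qf signature from this series; the parameter values are purely illustrative:

    import numpy as np

    from bluemath_tk.distributions.pareto_poisson import GPDPoiss

    # Hypothetical fitted parameters: threshold, scale, shape, mean peaks per year
    u, sigma, xi, lam = 2.5, 0.8, 0.1, 5.0

    T = np.array([10.0, 50.0, 100.0])  # return periods in years
    p = 1.0 - 1.0 / T                  # annual non-exceedance probabilities
    levels = GPDPoiss.qf(p, u, sigma, xi, lam)
    # levels[i] is the value exceeded on average once every T[i] years
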
+    def hist_retper_plot(self):
+        """
+        Historical Return Period plot
+
+        Returns
+        -------
+        fig
+            Figure
+        ax
+            Axes
+        """
+
+        ecdf_annmax_probs_hist = np.arange(1, self.n_year + 1) / (self.n_year + 1)
+        self.T_annmax = 1 / (1 - ecdf_annmax_probs_hist)
+
+        # Fitted Return Periods
+        self.T_years = np.array([1.001, 1.01, 1.1, 1.2, 1.4, 1.6, 2, 2.5, 3, 3.5, 4, 4.5, 5, 7.5, 10, 12.5, 15, 17.5, 20, 25, 30, 35, 40, 45, 50, 60, 70, 80, 90, 100, 150, 200, 500, 1000])
+        if self.method == "pot":
+            ret_levels = GPDPoiss.qf(1 - 1 / self.T_years, self.parameters[0], self.parameters[1], self.parameters[2], self.poiss_parameter)
+            lower_ci_rp, upper_ci_rp = gpdpoiss_ci_rp_bootstrap(
+                pot_data=self.pot_data,
+                years=self.T_years,
+                threshold=self.threshold,
+                poisson=self.poiss_parameter,
+                B=1000,
+                conf_level=0.95)
+        else:
+            ret_levels = GEV.qf(1 - 1 / self.T_years, self.parameters[0], self.parameters[1], self.parameters[2], self.poiss_parameter)
+            upper_ci_rp = 1
+            lower_ci_rp = 1
+
+        fig = plt.figure(figsize=(8,5))
+        ax = fig.add_subplot(111)
+
+        # Fitted distribution
+        ax.semilogx(self.T_years, ret_levels, color = 'red',linestyle='dashed', linewidth=2.5, label='Fitted GPD-Poisson')
+        # Confidence interval for fitted Distribution
+        ax.semilogx(self.T_years, upper_ci_rp, color = "tab:gray",linestyle='dotted', label=f'{self.conf} Conf. Band')
+        ax.semilogx(self.T_years, lower_ci_rp, color = "tab:gray",linestyle='dotted')
+
+        # Historical AM values
+        ax.semilogx(self.T_annmax, np.sort(self.am_data), color="tab:blue", linewidth=0, marker='o',markersize=5, label='Historical Annual Maxima')
+
+        ax.set_xlabel("Return Periods (Years)")
+        ax.set_ylabel(f"{self.var}")
+        ax.set_xscale('log')
+        ax.set_xticks([1, 2, 5, 10, 20, 50, 100, 250, 1000, 10000])
+        ax.get_xaxis().set_major_formatter(plt.ScalarFormatter())
+        ax.set_xlim(left=0.9, right=self.n_year + 100)
+        ax.set_ylim(bottom=0)
+        ax.legend(loc='best')
+        ax.grid()
+
+        return fig, ax
+
 
     def correlations(self) -> dict:
         """
diff --git a/bluemath_tk/distributions/pareto_poisson.py b/bluemath_tk/distributions/pareto_poisson.py
index a63104a..3f99959 100644
--- a/bluemath_tk/distributions/pareto_poisson.py
+++ b/bluemath_tk/distributions/pareto_poisson.py
@@ -134,4 +134,16 @@ def qf(
         else:
             q = threshold - (1-(-np.log(p)/poisson)**(-shape))*scale/shape
 
-        return q
\ No newline at end of file
+        return q
+
+    def cdf(
+        x: np.ndarray, threshold: float = 0.0, scale: float = 1.0, shape: float = 0.0, poisson: float = 1.0
+    ) -> np.ndarray:
+
+        if shape == 0.0:
+            expr = (np.maximum(x-threshold, 0))/scale
+            return np.exp(-poisson*np.exp(-expr))
+
+        else:
+            expr = 1+shape*(np.maximum(x-threshold, 0)/scale)
+            return np.exp(-poisson*expr**(-1/shape))
\ No newline at end of file
diff --git a/bluemath_tk/distributions/utils/pot_utils.py b/bluemath_tk/distributions/utils/pot_utils.py
index a8c75a0..6072dc7 100644
--- a/bluemath_tk/distributions/utils/pot_utils.py
+++ b/bluemath_tk/distributions/utils/pot_utils.py
@@ -3,6 +3,8 @@
 import matplotlib.pyplot as plt
 from sklearn.metrics import r2_score
 from csaps import csaps
+from bluemath_tk.distributions.gpd import GPD
+from bluemath_tk.distributions.pareto_poisson import GPDPoiss
 
 
 def threshold_search(
@@ -260,4 +262,33 @@ def RWLSfit(u, e, w):
 
     # Internally studentized residual
     rN = (np.sqrt(np.diag(W)) * r) / (sigres * np.sqrt(1 - np.diag(W) * np.diag(P)))
 
-    return beta, fobj, r, rN
\ No newline at end of file
+    return beta, fobj, r, rN
+
+
+def gpdpoiss_ci_rp_bootstrap(pot_data: np.ndarray, years: np.ndarray, threshold:float, poisson:float, B: int=1000, conf_level:float=0.95):
+    """
+    Compute confidence intervals for the GPD-Poisson return levels at the given return periods, based on the bootstrap method.
+
+    Parameters
+    ----------
+    B : int, default=1000
+        Number of bootstrap samples.
+    """
+    probs_ci = 1 - 1 / years # Annual non-exceedance probabilities for the target return periods
+
+    # Generate all bootstrap samples at once
+    boot_samples = np.random.choice(pot_data, size=(B, pot_data.size), replace=True)
+
+    # Fit the GPD to each bootstrap sample
+    boot_params = np.zeros((B, 3))
+    for i in range(B):
+        fit_result = GPD.fit(boot_samples[i], f0=threshold)
+        boot_params[i,:] = fit_result.params
+
+    # Return levels for each bootstrap sample
+    return_periods = np.array([GPDPoiss.qf(probs_ci, threshold, params[1], params[2], poisson) for params in boot_params])
+
+    lower_ci_rp = np.quantile(return_periods, (1 - conf_level) / 2, axis=0)
+    upper_ci_rp = np.quantile(return_periods, 1 - (1 - conf_level) / 2, axis=0)
+
+    return lower_ci_rp, upper_ci_rp
\ No newline at end of file

From e96bd0e0b23352422ce837cb03474e5f8ce1f626 Mon Sep 17 00:00:00 2001
From: Colladito
Date: Tue, 21 Oct 2025 10:17:48 +0200
Subject: [PATCH 13/14] [VCF] Stable Extreme Correction class and basic plots

---
 .../distributions/extreme_correction.py      | 75 ++++++++++++++++---
 .../distributions/utils/extr_corr_utils.py   | 59 +++++++++++++++
 bluemath_tk/distributions/utils/pot_utils.py | 35 +--------
 3 files changed, 124 insertions(+), 45 deletions(-)
 create mode 100644 bluemath_tk/distributions/utils/extr_corr_utils.py

diff --git a/bluemath_tk/distributions/extreme_correction.py b/bluemath_tk/distributions/extreme_correction.py
index 6b70a12..5f3f630 100644
--- a/bluemath_tk/distributions/extreme_correction.py
+++ b/bluemath_tk/distributions/extreme_correction.py
@@ -10,7 +10,7 @@
 from ..distributions.gpd import GPD
 from ..distributions.pareto_poisson import GPDPoiss
 from ..distributions.pot import OptimalThreshold
-from ..distributions.utils.pot_utils import gpdpoiss_ci_rp_bootstrap
+from ..distributions.utils.extr_corr_utils import gpdpoiss_ci_rp_bootstrap, gev_ci_rp_bootstrap
 
 
 class ExtremeCorrection(BlueMathModel):
@@ -371,7 +371,7 @@ def fit_transform(
         sim_pit_data_corrected : xr.Dataset
             Point-in-time corrected data
         """
-        self.fit(data_hist=data_hist, bmus=bmus, plot_diagnostic=plot_diagnostic)
+        self.fit(data_hist=data_hist, plot_diagnostic=plot_diagnostic)
 
         return self.transform(data_sim=data_sim, prob=prob, random_state=random_state)
 
@@ -483,9 +483,19 @@ def plot(self):
         TODO: plots to add:
             - Add a Return Period plot of the simulated series
         """
+        figs = []
+        axes = []
+
         fig1, ax1 = self.hist_retper_plot()
+        figs.append(fig1)
+        axes.append(ax1)
 
-        return fig1, ax1
+        fig2, ax2 = self.sim_retper_plot()
+
+        figs.append(fig2)
+        axes.append(ax2)
+
+        return figs, axes
 
@@ -503,29 +513,36 @@ def hist_retper_plot(self):
         self.T_annmax = 1 / (1 - ecdf_annmax_probs_hist)
 
         # Fitted Return Periods
-        self.T_years = np.array([1.001, 1.01, 1.1, 1.2, 1.4, 1.6, 2, 2.5, 3, 3.5, 4, 4.5, 5, 7.5, 10, 12.5, 15, 17.5, 20, 25, 30, 35, 40, 45, 50, 60, 70, 80, 90, 100, 150, 200, 500, 1000])
+        self.T_years = np.array([1.001, 1.01, 1.1, 1.2, 1.4, 1.6, 2, 2.5, 3, 3.5, 4, 4.5, 5, 7.5, 10, 12.5, 15, 17.5, 20, 25, 30, 35, 40, 45, 50, 60, 70, 80, 90, 100, 150, 200, 500, 1000, 5000, 10000])
         if self.method == "pot":
-            ret_levels = GPDPoiss.qf(1 - 1 / self.T_years, self.parameters[0], self.parameters[1], self.parameters[2], self.poiss_parameter)
-            lower_ci_rp, upper_ci_rp = gpdpoiss_ci_rp_bootstrap(
+            self.ret_levels = GPDPoiss.qf(1 - 1 / self.T_years, self.parameters[0], self.parameters[1], self.parameters[2], self.poiss_parameter)
+            self.lower_ci_rp, self.upper_ci_rp = gpdpoiss_ci_rp_bootstrap(
                 pot_data=self.pot_data,
                 years=self.T_years,
threshold=self.threshold, poisson=self.poiss_parameter, B=1000, conf_level=0.95) + + self.dist = "GPD-Poisson" else: - ret_levels = GEV.qf(1 - 1 / self.T_years, self.parameters[0], self.parameters[1], self.parameters[2], self.poiss_parameter) - upper_ci_rp = 1 - lower_ci_rp = 1 + self.ret_levels = GEV.qf(1 - 1 / self.T_years, self.parameters[0], self.parameters[1], self.parameters[2]) + self.lower_ci_rp, self.upper_ci_rp = gev_ci_rp_bootstrap( + am_data=self.am_data, + years=self.T_years, + B=1000, + conf_level=0.95) + + self.dist = "GEV" fig = plt.figure(figsize=(8,5)) ax = fig.add_subplot(111) # Fitted distribution - ax.semilogx(self.T_years, ret_levels, color = 'red',linestyle='dashed', linewidth=2.5, label='Fitted GPD-Poisson') + ax.semilogx(self.T_years, self.ret_levels, color = 'red',linestyle='dashed', linewidth=2.5, label=f'Fitted {self.dist}') # Confidence interval for fitted Distribution - ax.semilogx(self.T_years, upper_ci_rp, color = "tab:gray",linestyle='dotted', label=f'{self.conf} Conf. Band') - ax.semilogx(self.T_years, lower_ci_rp, color = "tab:gray",linestyle='dotted') + ax.semilogx(self.T_years, self.upper_ci_rp, color = "tab:gray",linestyle='dotted', label=f'{self.conf} Conf. Band') + ax.semilogx(self.T_years, self.lower_ci_rp, color = "tab:gray",linestyle='dotted') # Historical AM values ax.semilogx(self.T_annmax, np.sort(self.am_data), color="tab:blue", linewidth=0, marker='o',markersize=5, label='Historical Annual Maxima') @@ -541,6 +558,40 @@ def hist_retper_plot(self): ax.grid() return fig, ax + + def sim_retper_plot(self): + + ecdf_annmax_probs_sim = np.arange(1, self.n_year_sim + 1) / (self.n_year_sim + 1) + self.T_annmax_sim = 1 / (1 - ecdf_annmax_probs_sim) + + fig = plt.figure(figsize=(8,5)) + ax = fig.add_subplot(111) + + # Fitted distribution + ax.semilogx(self.T_years, self.ret_levels, color = 'red',linestyle='dashed', linewidth=2.5, label=f'Fitted {self.dist}') + # Confidence interval for fitted Distribution + ax.semilogx(self.T_years, self.upper_ci_rp, color = "tab:gray",linestyle='dotted', label=f'{self.conf} Conf. 
Band')
+        ax.semilogx(self.T_years, self.lower_ci_rp, color = "tab:gray",linestyle='dotted')
+
+        # Corrected Sampled AM values
+        ax.semilogx(self.T_annmax_sim, np.sort(self.sim_am_data_corr), color="tab:red", linewidth=0, marker='D',markersize=5, alpha=0.8, label='Corrected Sampled Annual Maxima')
+        # Sampled AM values
+        ax.semilogx(self.T_annmax_sim, np.sort(self.sim_am_data), color="tab:red", linewidth=0, marker='o',markersize=5, alpha=0.8, label='Sampled Annual Maxima')
+
+        # Historical AM values
+        ax.semilogx(self.T_annmax, np.sort(self.am_data), color="tab:blue", linewidth=0, marker='o',markersize=5, alpha=0.8, label='Historical Annual Maxima')
+
+        ax.set_xlabel("Return Periods (Years)")
+        ax.set_ylabel(f"{self.var}")
+        ax.set_xscale('log')
+        ax.set_xticks([1, 2, 5, 10, 20, 50, 100, 250, 1000, 10000])
+        ax.get_xaxis().set_major_formatter(plt.ScalarFormatter())
+        ax.set_xlim(left=0.9, right=self.n_year_sim + 100)
+        ax.set_ylim(bottom=0)
+        ax.legend(loc='best')
+        ax.grid()
+
+        return fig, ax
 
     def correlations(self) -> dict:
 
diff --git a/bluemath_tk/distributions/utils/extr_corr_utils.py b/bluemath_tk/distributions/utils/extr_corr_utils.py
new file mode 100644
index 0000000..1c13139
--- /dev/null
+++ b/bluemath_tk/distributions/utils/extr_corr_utils.py
@@ -0,0 +1,59 @@
+import numpy as np
+
+from ..pareto_poisson import GPDPoiss
+from ..gpd import GPD
+from ..gev import GEV
+
+def gpdpoiss_ci_rp_bootstrap(pot_data: np.ndarray, years: np.ndarray, threshold:float, poisson:float, B: int=1000, conf_level:float=0.95):
+    """
+    Compute confidence intervals for the GPD-Poisson return levels at the given return periods, based on the bootstrap method.
+
+    Parameters
+    ----------
+    B : int, default=1000
+        Number of bootstrap samples.
+    """
+    probs_ci = 1 - 1 / years # Annual non-exceedance probabilities for the target return periods
+
+    # Generate all bootstrap samples at once
+    boot_samples = np.random.choice(pot_data, size=(B, pot_data.size), replace=True)
+
+    # Fit the GPD to each bootstrap sample
+    boot_params = np.zeros((B, 3))
+    for i in range(B):
+        fit_result = GPD.fit(boot_samples[i], f0=threshold)
+        boot_params[i,:] = fit_result.params
+
+    return_periods = np.array([GPDPoiss.qf(probs_ci, threshold, params[1], params[2], poisson) for params in boot_params])
+
+    lower_ci_rp = np.quantile(return_periods, (1 - conf_level) / 2, axis=0)
+    upper_ci_rp = np.quantile(return_periods, 1 - (1 - conf_level) / 2, axis=0)
+
+    return lower_ci_rp, upper_ci_rp
+
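Editor's note: a usage sketch for the helper defined above. Only the helper's own signature comes from this patch; the synthetic sample and the rate value are illustrative assumptions:

    import numpy as np

    from bluemath_tk.distributions.utils.extr_corr_utils import gpdpoiss_ci_rp_bootstrap

    rng = np.random.default_rng(0)
    u = 2.0                                     # POT threshold
    pot_sample = u + rng.exponential(0.5, 200)  # synthetic exceedances above u
    lam = pot_sample.size / 40                  # ~5 peaks per year over 40 years

    lower, upper = gpdpoiss_ci_rp_bootstrap(
        pot_data=pot_sample,
        years=np.array([10.0, 50.0, 100.0]),
        threshold=u,
        poisson=lam,
        B=500,
        conf_level=0.95,
    )
    # lower/upper bracket the fitted return levels at each requested return period
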
+def gev_ci_rp_bootstrap(am_data: np.ndarray, years: np.ndarray, B: int=1000, conf_level:float=0.95):
+    """
+    Compute confidence intervals for the GEV return levels at the given return periods, based on the bootstrap method.
+
+    Parameters
+    ----------
+    B : int, default=1000
+        Number of bootstrap samples.
+    """
+    probs_ci = 1 - 1 / years # Annual non-exceedance probabilities for the target return periods
+
+    # Generate all bootstrap samples at once
+    boot_samples = np.random.choice(am_data, size=(B, am_data.size), replace=True)
+
+    # Fit the GEV to each bootstrap sample
+    boot_params = np.zeros((B, 3))
+    for i in range(B):
+        fit_result = GEV.fit(boot_samples[i])
+        boot_params[i,:] = fit_result.params
+
+    return_periods = np.array([GEV.qf(probs_ci, params[0], params[1], params[2]) for params in boot_params])
+
+    lower_ci_rp = np.quantile(return_periods, (1 - conf_level) / 2, axis=0)
+    upper_ci_rp = np.quantile(return_periods, 1 - (1 - conf_level) / 2, axis=0)
+
+    return lower_ci_rp, upper_ci_rp
\ No newline at end of file
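Editor's note: both helpers above are instances of the same pointwise percentile-bootstrap pattern — resample, refit, evaluate the quantile curve per replicate, then take quantiles across replicates. A stripped-down sketch of that pattern (all names here are illustrative, not part of the module):

    import numpy as np

    def percentile_band(sample: np.ndarray, curve, B: int = 1000, conf: float = 0.95):
        """Pointwise percentile-bootstrap band for a curve-valued statistic."""
        rng = np.random.default_rng(42)
        # curve() must map a resampled 1-D array to an array of curve values
        boot = np.array(
            [curve(rng.choice(sample, size=sample.size, replace=True)) for _ in range(B)]
        )
        alpha = 1.0 - conf
        return np.quantile(boot, alpha / 2, axis=0), np.quantile(boot, 1 - alpha / 2, axis=0)
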
diff --git a/bluemath_tk/distributions/utils/pot_utils.py b/bluemath_tk/distributions/utils/pot_utils.py
index 6072dc7..8cdb306 100644
--- a/bluemath_tk/distributions/utils/pot_utils.py
+++ b/bluemath_tk/distributions/utils/pot_utils.py
@@ -3,8 +3,6 @@
 import matplotlib.pyplot as plt
 from sklearn.metrics import r2_score
 from csaps import csaps
-from bluemath_tk.distributions.gpd import GPD
-from bluemath_tk.distributions.pareto_poisson import GPDPoiss
 
 
 def threshold_search(
@@ -257,38 +255,9 @@ def RWLSfit(u, e, w):
     # Hat or projection matrix
     P = X @ np.linalg.inv(X.T @ W @ X) @ X.T
     # Sensitivity matrix S = I - P * W
-    S = np.eye(n) - P @ W
+    # S = np.eye(n) - P @ W
 
     # Internally studentized residual
     rN = (np.sqrt(np.diag(W)) * r) / (sigres * np.sqrt(1 - np.diag(W) * np.diag(P)))
 
-    return beta, fobj, r, rN
-
-
-def gpdpoiss_ci_rp_bootstrap(pot_data: np.ndarray, years: np.ndarray, threshold:float, poisson:float, B: int=1000, conf_level:float=0.95):
-    """
-    Compute confidence intervals for the GPD-Poisson return levels at the given return periods, based on the bootstrap method.
-
-    Parameters
-    ----------
-    B : int, default=1000
-        Number of bootstrap samples.
-    """
-    probs_ci = 1 - 1 / years # Annual non-exceedance probabilities for the target return periods
-
-    # Generate all bootstrap samples at once
-    boot_samples = np.random.choice(pot_data, size=(B, pot_data.size), replace=True)
-
-    # Fit the GPD to each bootstrap sample
-    boot_params = np.zeros((B, 3))
-    for i in range(B):
-        fit_result = GPD.fit(boot_samples[i], f0=threshold)
-        boot_params[i,:] = fit_result.params
-
-    # Return levels for each bootstrap sample
-    return_periods = np.array([GPDPoiss.qf(probs_ci, threshold, params[1], params[2], poisson) for params in boot_params])
-
-    lower_ci_rp = np.quantile(return_periods, (1 - conf_level) / 2, axis=0)
-    upper_ci_rp = np.quantile(return_periods, 1 - (1 - conf_level) / 2, axis=0)
-
-    return lower_ci_rp, upper_ci_rp
\ No newline at end of file
+    return beta, fobj, r, rN
\ No newline at end of file

From 9264d41ac8cadbc2e918a965707850a7c87e578a Mon Sep 17 00:00:00 2001
From: Colladito
Date: Tue, 21 Oct 2025 10:48:31 +0200
Subject: [PATCH 14/14] [VCF] Ruff format and add preprocess output

---
 .../distributions/extreme_correction.py      | 238 ++++++++++++++----
 bluemath_tk/distributions/pareto_poisson.py  |  47 ++--
 bluemath_tk/distributions/pot.py             |   2 +-
 .../distributions/utils/extr_corr_utils.py   |  44 +++-
 4 files changed, 255 insertions(+), 76 deletions(-)

diff --git a/bluemath_tk/distributions/extreme_correction.py b/bluemath_tk/distributions/extreme_correction.py
index 5f3f630..b6fcbfb 100644
--- a/bluemath_tk/distributions/extreme_correction.py
+++ b/bluemath_tk/distributions/extreme_correction.py
@@ -1,16 +1,19 @@
 import os
 
+import matplotlib.pyplot as plt
 import numpy as np
 import scipy.stats as stats
 import xarray as xr
-import matplotlib.pyplot as plt
 
 from ..core.io import BlueMathModel
 from ..distributions.gev import GEV
 from ..distributions.gpd import GPD
 from ..distributions.pareto_poisson import GPDPoiss
 from ..distributions.pot import OptimalThreshold
-from ..distributions.utils.extr_corr_utils import gpdpoiss_ci_rp_bootstrap, gev_ci_rp_bootstrap
+from ..distributions.utils.extr_corr_utils import (
+    gev_ci_rp_bootstrap,
+    gpdpoiss_ci_rp_bootstrap,
+)
 
 
 class ExtremeCorrection(BlueMathModel):
@@ -333,7 +336,9 @@ def transform(
             ),  # y-coords of data points
         )
 
-        return self.sim_pit_data_corrected
+        output = self._preprocess_output(data=data_sim)
+
+        return output
 
     def fit_transform(
         self,
@@ -425,6 +430,29 @@ def _preprocess_data(
             am_data = data.get(f"{var}").groupby("time.year").max().values.T
 
         return pit_data, am_data
+
+    def _preprocess_output(self, data: xr.Dataset) -> xr.Dataset:
+        """
+        Preprocess the output dataset
+
+        Parameters
+        ----------
+        data : xr.Dataset
+            Dataset to which the corrected variable will be added
+
+        Returns
+        -------
+        data : xr.Dataset
+            Dataset with the corrected variable added
+        """
+        n_sim = data.get("n_sim").values.shape[0]
+        n_time = data.get("time").values.shape[0]
+        sim_pit_data_corrected_reshaped = self.sim_pit_data_corrected.reshape(n_sim, n_time)
+
+        data[f"{self.var}_corr"] = (data[f"{self.var}"].dims, sim_pit_data_corrected_reshaped)
+
+        return data
+
 
     def test(self) -> dict:
         """
@@ -476,16 +504,13 @@ def test(self) -> dict:
             )
             return {"Statistic": res_test.statistic, "P-value": res_test.pvalue}
 
-    def plot(self):
+    def plot(self) -> tuple[list[plt.Figure], list[plt.Axes]]:
         """
-        Plot
-
-        TODO: plots to add:
-            - Add a Return Period plot of the simulated series
+        Plot return periods
         """
         figs = []
         axes = []
-
+
         fig1, ax1 = self.hist_retper_plot()
         figs.append(fig1)
         axes.append(ax1)
 
         fig2, ax2 = self.sim_retper_plot()
-
         figs.append(fig2)
         axes.append(ax2)
 
         return figs, axes
 
-    def hist_retper_plot(self):
+    
def hist_retper_plot(self) -> tuple[plt.Figure, plt.Axes]: """ Historical Return Period plot Returns ------- fig - Figure - ax - Axes + plt.Figure + ax + plt.Axes """ ecdf_annmax_probs_hist = np.arange(1, self.n_year + 1) / (self.n_year + 1) self.T_annmax = 1 / (1 - ecdf_annmax_probs_hist) # Fitted Return Periods - self.T_years = np.array([1.001, 1.01, 1.1, 1.2, 1.4, 1.6, 2, 2.5, 3, 3.5, 4, 4.5, 5, 7.5, 10, 12.5, 15, 17.5, 20, 25, 30, 35, 40, 45, 50, 60, 70, 80, 90, 100, 150, 200, 500, 1000, 5000, 10000]) + self.T_years = np.array( + [ + 1.001, + 1.01, + 1.1, + 1.2, + 1.4, + 1.6, + 2, + 2.5, + 3, + 3.5, + 4, + 4.5, + 5, + 7.5, + 10, + 12.5, + 15, + 17.5, + 20, + 25, + 30, + 35, + 40, + 45, + 50, + 60, + 70, + 80, + 90, + 100, + 150, + 200, + 500, + 1000, + 5000, + 10000, + ] + ) if self.method == "pot": - self.ret_levels = GPDPoiss.qf(1 - 1 / self.T_years, self.parameters[0], self.parameters[1], self.parameters[2], self.poiss_parameter) + self.ret_levels = GPDPoiss.qf( + 1 - 1 / self.T_years, + self.parameters[0], + self.parameters[1], + self.parameters[2], + self.poiss_parameter, + ) self.lower_ci_rp, self.upper_ci_rp = gpdpoiss_ci_rp_bootstrap( pot_data=self.pot_data, years=self.T_years, threshold=self.threshold, poisson=self.poiss_parameter, B=1000, - conf_level=0.95) - + conf_level=0.95, + ) + self.dist = "GPD-Poisson" else: - self.ret_levels = GEV.qf(1 - 1 / self.T_years, self.parameters[0], self.parameters[1], self.parameters[2]) + self.ret_levels = GEV.qf( + 1 - 1 / self.T_years, + self.parameters[0], + self.parameters[1], + self.parameters[2], + ) self.lower_ci_rp, self.upper_ci_rp = gev_ci_rp_bootstrap( - am_data=self.am_data, - years=self.T_years, - B=1000, - conf_level=0.95) - + am_data=self.am_data, years=self.T_years, B=1000, conf_level=0.95 + ) + self.dist = "GEV" - fig = plt.figure(figsize=(8,5)) + fig = plt.figure(figsize=(8, 5)) ax = fig.add_subplot(111) # Fitted distribution - ax.semilogx(self.T_years, self.ret_levels, color = 'red',linestyle='dashed', linewidth=2.5, label=f'Fitted {self.dist}') + ax.semilogx( + self.T_years, + self.ret_levels, + color="red", + linestyle="dashed", + linewidth=2.5, + label=f"Fitted {self.dist}", + ) # Confidence interval for fitted Distribution - ax.semilogx(self.T_years, self.upper_ci_rp, color = "tab:gray",linestyle='dotted', label=f'{self.conf} Conf. Band') - ax.semilogx(self.T_years, self.lower_ci_rp, color = "tab:gray",linestyle='dotted') + ax.semilogx( + self.T_years, + self.upper_ci_rp, + color="tab:gray", + linestyle="dotted", + label=f"{self.conf} Conf. 
Band", + ) + ax.semilogx( + self.T_years, self.lower_ci_rp, color="tab:gray", linestyle="dotted" + ) # Historical AM values - ax.semilogx(self.T_annmax, np.sort(self.am_data), color="tab:blue", linewidth=0, marker='o',markersize=5, label='Historical Annual Maxima') + ax.semilogx( + self.T_annmax, + np.sort(self.am_data), + color="tab:blue", + linewidth=0, + marker="o", + markersize=5, + label="Historical Annual Maxima", + ) ax.set_xlabel("Return Periods (Years)") ax.set_ylabel(f"{self.var}") - ax.set_xscale('log') + ax.set_xscale("log") ax.set_xticks([1, 2, 5, 10, 20, 50, 100, 250, 1000, 10000]) ax.get_xaxis().set_major_formatter(plt.ScalarFormatter()) ax.set_xlim(left=0.9, right=self.n_year + 100) ax.set_ylim(bottom=0) - ax.legend(loc='best') + ax.legend(loc="best") ax.grid() return fig, ax - - def sim_retper_plot(self): - ecdf_annmax_probs_sim = np.arange(1, self.n_year_sim + 1) / (self.n_year_sim + 1) + def sim_retper_plot(self) -> tuple[plt.Figure, plt.Axes]: + """ + Corrected Sampled and Sampled Return Period plot + + Returns + ------- + fig + plt.Figure + ax + plt.Axes + """ + + ecdf_annmax_probs_sim = np.arange(1, self.n_year_sim + 1) / ( + self.n_year_sim + 1 + ) self.T_annmax_sim = 1 / (1 - ecdf_annmax_probs_sim) - fig = plt.figure(figsize=(8,5)) + fig = plt.figure(figsize=(8, 5)) ax = fig.add_subplot(111) # Fitted distribution - ax.semilogx(self.T_years, self.ret_levels, color = 'red',linestyle='dashed', linewidth=2.5, label=f'Fitted {self.dist}') + ax.semilogx( + self.T_years, + self.ret_levels, + color="red", + linestyle="dashed", + linewidth=2.5, + label=f"Fitted {self.dist}", + ) # Confidence interval for fitted Distribution - ax.semilogx(self.T_years, self.upper_ci_rp, color = "tab:gray",linestyle='dotted', label=f'{self.conf} Conf. Band') - ax.semilogx(self.T_years, self.lower_ci_rp, color = "tab:gray",linestyle='dotted') + ax.semilogx( + self.T_years, + self.upper_ci_rp, + color="tab:gray", + linestyle="dotted", + label=f"{self.conf} Conf. 
Band", + ) + ax.semilogx( + self.T_years, self.lower_ci_rp, color="tab:gray", linestyle="dotted" + ) # Corrected Sampled AM values - ax.semilogx(self.T_annmax_sim, np.sort(self.sim_am_data_corr), color="tab:red", linewidth=0, marker='D',markersize=5, alpha=0.8, label='Corrected Sampled Annual Maxima') + ax.semilogx( + self.T_annmax_sim, + np.sort(self.sim_am_data_corr), + color="tab:red", + linewidth=0, + marker="D", + markersize=5, + alpha=0.8, + label="Corrected Sampled Annual Maxima", + ) # Corrected Sampled AM values - ax.semilogx(self.T_annmax_sim, np.sort(self.sim_am_data), color="tab:red", linewidth=0, marker='o',markersize=5, alpha=0.8, label='Sampled Annual Maxima') - + ax.semilogx( + self.T_annmax_sim, + np.sort(self.sim_am_data), + color="tab:red", + linewidth=0, + marker="o", + markersize=5, + alpha=0.8, + label="Sampled Annual Maxima", + ) + # Historical AM values - ax.semilogx(self.T_annmax, np.sort(self.am_data), color="tab:blue", linewidth=0, marker='o',markersize=5, alpha=0.8, label='Historical Annual Maxima') + ax.semilogx( + self.T_annmax, + np.sort(self.am_data), + color="tab:blue", + linewidth=0, + marker="o", + markersize=5, + alpha=0.8, + label="Historical Annual Maxima", + ) ax.set_xlabel("Return Periods (Years)") ax.set_ylabel(f"{self.var}") - ax.set_xscale('log') + ax.set_xscale("log") ax.set_xticks([1, 2, 5, 10, 20, 50, 100, 250, 1000, 10000]) ax.get_xaxis().set_major_formatter(plt.ScalarFormatter()) ax.set_xlim(left=0.9, right=self.n_year_sim + 100) ax.set_ylim(bottom=0) - ax.legend(loc='best') + ax.legend(loc="best") ax.grid() return fig, ax - def correlations(self) -> dict: """ Rank based correlations between sampled and corrected sampled data diff --git a/bluemath_tk/distributions/pareto_poisson.py b/bluemath_tk/distributions/pareto_poisson.py index 3f99959..1a60f8a 100644 --- a/bluemath_tk/distributions/pareto_poisson.py +++ b/bluemath_tk/distributions/pareto_poisson.py @@ -2,13 +2,14 @@ from ._base_distributions import BaseDistribution + class GPDPoiss(BaseDistribution): def __init__(self) -> None: """ Initialize the GPD-Poisson mode """ super().__init__() - + @property def name() -> str: return "GPD-Poisson model" @@ -19,17 +20,21 @@ def nparams() -> int: Number of parameters of GPD-Poisson model """ return int(4) - + @property def param_names() -> list[str]: """ Name of parameters of GPD-Poisson """ return ["threshold", "scale", "shape", "poisson"] - + @staticmethod def pdf( - x: np.ndarray, threshold: float = 0.0, scale: float = 1.0, shape: float = 0.0, poisson: float = 1.0 + x: np.ndarray, + threshold: float = 0.0, + scale: float = 1.0, + shape: float = 0.0, + poisson: float = 1.0, ) -> np.ndarray: """ Probability density function @@ -62,11 +67,10 @@ def pdf( if scale <= 0: raise ValueError("Scale parameter must be > 0") - + if poisson <= 0: raise ValueError("Poisson parameter must be > 0") - y = np.maximum(x - threshold, 0) / scale # # Gumbel case (shape = 0) @@ -86,7 +90,11 @@ def pdf( @staticmethod def qf( - p: np.ndarray, threshold: float = 0.0, scale: float = 1.0, shape: float = 0.0, poisson: float = 1.0 + p: np.ndarray, + threshold: float = 0.0, + scale: float = 1.0, + shape: float = 0.0, + poisson: float = 1.0, ) -> np.ndarray: """ Quantile function (Inverse of Cumulative Distribution Function) @@ -122,28 +130,31 @@ def qf( if scale <= 0: raise ValueError("Scale parameter must be > 0") - + if poisson <= 0: raise ValueError("Poisson parameter must be > 0") # Gumbel case (shape = 0) if shape == 0.0: - q = threshold - scale * 
diff --git a/bluemath_tk/distributions/pareto_poisson.py b/bluemath_tk/distributions/pareto_poisson.py
index 3f99959..1a60f8a 100644
--- a/bluemath_tk/distributions/pareto_poisson.py
+++ b/bluemath_tk/distributions/pareto_poisson.py
@@ -2,13 +2,14 @@
 
 from ._base_distributions import BaseDistribution
 
+
 class GPDPoiss(BaseDistribution):
     def __init__(self) -> None:
         """
         Initialize the GPD-Poisson model
         """
         super().__init__()
-    
+
     @property
     def name() -> str:
         return "GPD-Poisson model"
@@ -19,17 +20,21 @@ def nparams() -> int:
         Number of parameters of GPD-Poisson model
         """
         return int(4)
-    
+
     @property
     def param_names() -> list[str]:
         """
         Name of parameters of GPD-Poisson
         """
         return ["threshold", "scale", "shape", "poisson"]
-    
+
     @staticmethod
     def pdf(
-        x: np.ndarray, threshold: float = 0.0, scale: float = 1.0, shape: float = 0.0, poisson: float = 1.0
+        x: np.ndarray,
+        threshold: float = 0.0,
+        scale: float = 1.0,
+        shape: float = 0.0,
+        poisson: float = 1.0,
     ) -> np.ndarray:
         """
         Probability density function
@@ -62,11 +67,10 @@ def pdf(
 
         if scale <= 0:
             raise ValueError("Scale parameter must be > 0")
-        
+
         if poisson <= 0:
             raise ValueError("Poisson parameter must be > 0")
 
-
         y = np.maximum(x - threshold, 0) / scale
 
         # # Gumbel case (shape = 0)
@@ -86,7 +90,11 @@ def pdf(
 
     @staticmethod
     def qf(
-        p: np.ndarray, threshold: float = 0.0, scale: float = 1.0, shape: float = 0.0, poisson: float = 1.0
+        p: np.ndarray,
+        threshold: float = 0.0,
+        scale: float = 1.0,
+        shape: float = 0.0,
+        poisson: float = 1.0,
     ) -> np.ndarray:
         """
         Quantile function (Inverse of Cumulative Distribution Function)
@@ -122,28 +130,31 @@ def qf(
 
         if scale <= 0:
             raise ValueError("Scale parameter must be > 0")
-        
+
         if poisson <= 0:
             raise ValueError("Poisson parameter must be > 0")
 
         # Gumbel case (shape = 0)
         if shape == 0.0:
-            q = threshold - scale * np.log(-np.log(p)/poisson)
+            q = threshold - scale * np.log(-np.log(p) / poisson)
 
         # General case (Weibull and Frechet, shape != 0)
         else:
-            q = threshold - (1-(-np.log(p)/poisson)**(-shape))*scale/shape
+            q = threshold - (1 - (-np.log(p) / poisson) ** (-shape)) * scale / shape
 
         return q
-    
+
     def cdf(
-        x: np.ndarray, threshold: float = 0.0, scale: float = 1.0, shape: float = 0.0, poisson: float = 1.0
+        x: np.ndarray,
+        threshold: float = 0.0,
+        scale: float = 1.0,
+        shape: float = 0.0,
+        poisson: float = 1.0,
     ) -> np.ndarray:
-
-        if shape == 0.0:
-            expr = (np.maximum(x-threshold, 0))/scale
-            return np.exp(-poisson*np.exp(-expr))
-
+        if shape == 0.0:
+            expr = (np.maximum(x - threshold, 0)) / scale
+            return np.exp(-poisson * np.exp(-expr))
+
         else:
-            expr = 1+shape*(np.maximum(x-threshold, 0)/scale)
-            return np.exp(-poisson*expr**(-1/shape))
\ No newline at end of file
+            expr = 1 + shape * (np.maximum(x - threshold, 0) / scale)
+            return np.exp(-poisson * expr ** (-1 / shape))
diff --git a/bluemath_tk/distributions/pot.py b/bluemath_tk/distributions/pot.py
index 933f103..6f98058 100644
--- a/bluemath_tk/distributions/pot.py
+++ b/bluemath_tk/distributions/pot.py
@@ -444,7 +444,7 @@ def potplot(
     figsize: tuple = (8, 5),
 ):
     """
-    Auxiliar function which call generic potplot to plot the POT usign the optimal threshold obtained 
+    Auxiliary function that calls the generic potplot to plot the POT using the obtained optimal threshold
 
     Parameters
     ----------
diff --git a/bluemath_tk/distributions/utils/extr_corr_utils.py b/bluemath_tk/distributions/utils/extr_corr_utils.py
index 1c13139..cb6014f 100644
--- a/bluemath_tk/distributions/utils/extr_corr_utils.py
+++ b/bluemath_tk/distributions/utils/extr_corr_utils.py
@@ -1,10 +1,18 @@
 import numpy as np
 
-from ..pareto_poisson import GPDPoiss
-from ..gpd import GPD
 from ..gev import GEV
+from ..gpd import GPD
+from ..pareto_poisson import GPDPoiss
+
 
-def gpdpoiss_ci_rp_bootstrap(pot_data: np.ndarray, years: np.ndarray, threshold:float, poisson:float, B: int=1000, conf_level:float=0.95):
+def gpdpoiss_ci_rp_bootstrap(
+    pot_data: np.ndarray,
+    years: np.ndarray,
+    threshold: float,
+    poisson: float,
+    B: int = 1000,
+    conf_level: float = 0.95,
+):
     """
     Compute confidence intervals for the GPD-Poisson return levels at the given return periods, based on the bootstrap method.
 
@@ -13,25 +21,33 @@ def gpdpoiss_ci_rp_bootstrap(
     B : int, default=1000
         Number of bootstrap samples.
     """
-    probs_ci = 1 - 1 / years # Annual non-exceedance probabilities for the target return periods
+    probs_ci = 1 - 1 / years  # Annual non-exceedance probabilities for the target return periods
 
     # Generate all bootstrap samples at once
    boot_samples = np.random.choice(pot_data, size=(B, pot_data.size), replace=True)
-    
+
     # Fit the GPD to each bootstrap sample
     boot_params = np.zeros((B, 3))
     for i in range(B):
         fit_result = GPD.fit(boot_samples[i], f0=threshold)
-        boot_params[i,:] = fit_result.params
+        boot_params[i, :] = fit_result.params
 
-    return_periods = np.array([GPDPoiss.qf(probs_ci, threshold, params[1], params[2], poisson) for params in boot_params])
+    return_periods = np.array(
+        [
+            GPDPoiss.qf(probs_ci, threshold, params[1], params[2], poisson)
+            for params in boot_params
+        ]
+    )
 
     lower_ci_rp = np.quantile(return_periods, (1 - conf_level) / 2, axis=0)
     upper_ci_rp = np.quantile(return_periods, 1 - (1 - conf_level) / 2, axis=0)
 
     return lower_ci_rp, upper_ci_rp
 
-def gev_ci_rp_bootstrap(am_data: np.ndarray, years: np.ndarray, B: int=1000, conf_level:float=0.95):
+
+def gev_ci_rp_bootstrap(
+    am_data: np.ndarray, years: np.ndarray, B: int = 1000, conf_level: float = 0.95
+):
     """
     Compute confidence intervals for the GEV return levels at the given return periods, based on the bootstrap method.
 
@@ -40,20 +56,22 @@ def gev_ci_rp_bootstrap(
     B : int, default=1000
         Number of bootstrap samples.
     """
-    probs_ci = 1 - 1 / years # Annual non-exceedance probabilities for the target return periods
+    probs_ci = 1 - 1 / years  # Annual non-exceedance probabilities for the target return periods
 
     # Generate all bootstrap samples at once
     boot_samples = np.random.choice(am_data, size=(B, am_data.size), replace=True)
-    
+
     # Fit the GEV to each bootstrap sample
     boot_params = np.zeros((B, 3))
     for i in range(B):
         fit_result = GEV.fit(boot_samples[i])
-        boot_params[i,:] = fit_result.params
+        boot_params[i, :] = fit_result.params
 
-    return_periods = np.array([GEV.qf(probs_ci, params[0], params[1], params[2]) for params in boot_params])
+    return_periods = np.array(
+        [GEV.qf(probs_ci, params[0], params[1], params[2]) for params in boot_params]
+    )
 
     lower_ci_rp = np.quantile(return_periods, (1 - conf_level) / 2, axis=0)
     upper_ci_rp = np.quantile(return_periods, 1 - (1 - conf_level) / 2, axis=0)
 
-    return lower_ci_rp, upper_ci_rp
\ No newline at end of file
+    return lower_ci_rp, upper_ci_rp