Skip to content
Merged
158 changes: 126 additions & 32 deletions bluemath_tk/topo_bathy/swan_grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,141 @@
import xarray as xr


def generate_grid_parameters(bathy_data: xr.DataArray) -> dict:
def generate_grid_parameters(
bathy_data: xr.DataArray,
alpc: float = 0,
xpc: float = None,
ypc: float = None,
xlenc: float = None,
ylenc: float = None,
buffer_distance: float = None,
) -> dict:
"""
Generate the grid parameters for the SWAN model.
Generate grid parameters for the SWAN model based on bathymetry.

Parameters
----------
bathy_data : xr.DataArray
Bathymetry data.
Bathymetry data.
Must have the following dimensions:
- lon: longitude
- lat: latitude
- lon/x: longitude or x coordinate
- lat/y: latitude or y coordinate
alpc: float
Computational Grid Rotation angle in degrees.
xpc: float
X origin.
ypc: float
Y origin.

Returns
-------
dict
Grid parameters for the SWAN model.

Contact @bellidog on GitHub for more information.
Dictionary with grid configuration for SWAN input.
"""

return {
"xpc": int(np.nanmin(bathy_data.lon)), # x origin
"ypc": int(np.nanmin(bathy_data.lat)), # y origin
"alpc": 0, # x-axis direction
"xlenc": int(
np.nanmax(bathy_data.lon) - np.nanmin(bathy_data.lon)
), # grid length in x
"ylenc": int(
np.nanmax(bathy_data.lat) - np.nanmin(bathy_data.lat)
), # grid length in y
"mxc": len(bathy_data.lon) - 1, # number mesh x, una menos pq si no SWAN peta
"myc": len(bathy_data.lat) - 1, # number mesh y, una menos pq si no SWAN peta
"xpinp": np.nanmin(bathy_data.lon), # x origin
"ypinp": np.nanmin(bathy_data.lat), # y origin
"alpinp": 0, # x-axis direction
"mxinp": len(bathy_data.lon) - 1, # number mesh x
"myinp": len(bathy_data.lat) - 1, # number mesh y
"dxinp": abs(
bathy_data.lon[1].values - bathy_data.lon[0].values
), # size mesh x (resolution in x)
"dyinp": abs(
bathy_data.lat[1].values - bathy_data.lat[0].values
), # size mesh y (resolution in y)
}
# Determine coordinate system based on coordinate names
coord_names = list(bathy_data.coords)

# Get coordinate variables
if any(name in ["lon", "longitude"] for name in coord_names):
x_coord = next(name for name in coord_names if name in ["lon", "longitude"])
y_coord = next(name for name in coord_names if name in ["lat", "latitude"])
else:
x_coord = next(
name for name in coord_names if name in ["x", "X", "cx", "easting"]
)
y_coord = next(
name for name in coord_names if name in ["y", "Y", "cy", "northing"]
)

# Get resolution from cropped data
grid_resolution_x = abs(
bathy_data[x_coord][1].values - bathy_data[x_coord][0].values
)
grid_resolution_y = abs(
bathy_data[y_coord][1].values - bathy_data[y_coord][0].values
)

if alpc != 0:
angle_rad = np.radians(alpc)
# Create rotation matrix
R = np.array(
[
[np.cos(angle_rad), -np.sin(angle_rad)],
[np.sin(angle_rad), np.cos(angle_rad)],
]
)

# Create unrotated rectangle corners
dx = np.array([0, xlenc, xlenc, 0, 0])
dy = np.array([0, 0, ylenc, ylenc, 0])
points = np.column_stack([dx, dy])

# Rotate points
rotated = np.dot(points, R.T)

# Translate to corner position
x = rotated[:, 0] + xpc
y = rotated[:, 1] + ypc
corners = np.column_stack([x, y])

x_min = np.min(x) - buffer_distance
x_max = np.max(x) + buffer_distance
y_min = np.min(y) - buffer_distance
y_max = np.max(y) + buffer_distance

print(f"Cropping bounds: x=[{x_min}, {x_max}], y=[{y_min}, {y_max}]")

# Crop bathymetry
cropped = bathy_data.sel(
{
x_coord: slice(x_min, x_max),
y_coord: slice(y_max, y_min),
} # Note: slice from max to min for descending coordinates
)

grid_parameters = {
"xpc": xpc,
"ypc": ypc,
"alpc": alpc,
"xlenc": xlenc,
"ylenc": ylenc,
"mxc": int(np.round(xlenc / grid_resolution_x) - 1),
"myc": int(np.round(ylenc / grid_resolution_y) - 1),
"xpinp": np.nanmin(cropped[x_coord]), # x origin from cropped data
"ypinp": np.nanmin(cropped[y_coord]), # y origin from cropped data
"alpinp": 0, # x-axis direction
"mxinp": len(cropped[x_coord]) - 1, # number mesh x from cropped data
"myinp": len(cropped[y_coord]) - 1, # number mesh y from cropped data
"dxinp": grid_resolution_x, # resolution from cropped data
"dyinp": grid_resolution_y, # resolution from cropped data
}
return grid_parameters, cropped, corners

else:
# Compute parameters from full bathymetry
grid_parameters = {
"xpc": float(np.nanmin(bathy_data[x_coord])), # origin x
"ypc": float(np.nanmin(bathy_data[y_coord])), # origin y
"alpc": alpc, # x-axis direction
"xlenc": float(
np.nanmax(bathy_data[x_coord]) - np.nanmin(bathy_data[x_coord])
), # grid length x
"ylenc": float(
np.nanmax(bathy_data[y_coord]) - np.nanmin(bathy_data[y_coord])
), # grid length y
"mxc": len(bathy_data[x_coord]) - 1, # num mesh x
"myc": len(bathy_data[y_coord]) - 1, # num mesh y
"xpinp": float(np.nanmin(bathy_data[x_coord])), # origin x
"ypinp": float(np.nanmin(bathy_data[y_coord])), # origin y
"alpinp": 0, # x-axis direction
"mxinp": len(bathy_data[x_coord]) - 1, # num mesh x
"myinp": len(bathy_data[y_coord]) - 1, # num mesh y
"dxinp": float(
abs(bathy_data[x_coord][1].values - bathy_data[x_coord][0].values)
), # resolution x
"dyinp": float(
abs(bathy_data[y_coord][1].values - bathy_data[y_coord][0].values)
), # resolution y
}
return grid_parameters
68 changes: 54 additions & 14 deletions bluemath_tk/waves/binwaves.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,67 @@


def generate_swan_cases(
frequencies_array: np.ndarray,
directions_array: np.ndarray,
frequencies_array: np.ndarray = None,
directions_array: np.ndarray = None,
direction_range: tuple = (0, 360),
direction_divisions: int = 24,
direction_sector: tuple = None,
frequency_range: tuple = (0.035, 0.5),
frequency_divisions: int = 29,
gamma: float = 50,
spr: float = 2,
) -> xr.Dataset:
"""
Generate the SWAN cases monocromatic wave parameters.

Parameters
----------
directions_array : np.ndarray
The directions array.
frequencies_array : np.ndarray
The frequencies array.
frequencies_array : np.ndarray, optional
The frequencies array. If None, it is generated using frequency_range and frequency_divisions.
directions_array : np.ndarray, optional
The directions array. If None, it is generated using direction_range and direction_divisions.
direction_range : tuple
(min, max) range for directions in degrees.
direction_divisions : int
Number of directional divisions.
frequency_range : tuple
(min, max) range for frequencies in Hz.
frequency_divisions : int
Number of frequency divisions.

Returns
-------
xr.Dataset
The SWAN monocromatic cases Dataset with coordinates freq and dir.
"""

# Wave parameters
gamma = 50 # waves gamma
spr = 2 # waves directional spread
# Auto-generate directions if not provided
if directions_array is None:
step = (direction_range[1] - direction_range[0]) / direction_divisions
directions_array = np.arange(
direction_range[0] + step / 2, direction_range[1], step
)

if direction_sector is not None:
start, end = direction_sector
if start < end:
directions_array = directions_array[
(directions_array >= start) & (directions_array <= end)
]
else: # caso circular, ej. 270–90
directions_array = directions_array[
(directions_array >= start) | (directions_array <= end)
]

# Auto-generate frequencies if not provided
if frequencies_array is None:
frequencies_array = np.geomspace(
frequency_range[0], frequency_range[1], frequency_divisions
)

# Constants for SWAN
gamma = gamma # waves gamma
spr = spr # waves directional spread

# Initialize data arrays for each variable
hs = np.zeros((len(directions_array), len(frequencies_array)))
Expand All @@ -54,11 +93,11 @@ def generate_swan_cases(

# Create xarray Dataset
ds = xr.Dataset(
data_vars={
"hs": (["dir", "freq"], hs),
"tp": (["dir", "freq"], tp),
"spr": (["dir", "freq"], spr_arr),
"gamma": (["dir", "freq"], gamma_arr),
{
"hs": (("dir", "freq"), hs),
"tp": (("dir", "freq"), tp),
"spr": (("dir", "freq"), spr_arr),
"gamma": (("dir", "freq"), gamma_arr),
},
coords={
"dir": directions_array,
Expand Down Expand Up @@ -115,6 +154,7 @@ def process_kp_coefficients(
concatened_kp = output_kp_list[0]
for file in output_kp_list[1:]:
concatened_kp = xr.concat([concatened_kp, file], dim="case_num")

return concatened_kp.fillna(0.0).sortby("freq").sortby("dir")


Expand Down
44 changes: 33 additions & 11 deletions bluemath_tk/waves/calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,11 @@ def _create_vec_direc(self, waves: np.ndarray, direcs: np.ndarray) -> np.ndarray
if direcs[i] < 0:
direcs[i] = direcs[i] + 360
if direcs[i] > 0 and waves[i] > 0:
bin_idx = int(direcs[i] / self.direction_bin_size)
# Handle direction = 360° case by mapping to the first bin (0-22.5°)
if direcs[i] >= 360:
bin_idx = 0
else:
bin_idx = int(direcs[i] / self.direction_bin_size)
data[i, bin_idx] = waves[i]

return data
Expand Down Expand Up @@ -558,6 +562,8 @@ def correct(
[
self.calibration_params["sea_correction"][
int(peak_direction / self.direction_bin_size)
if peak_direction < 360
else 0 # TODO: Check if this with Javi
]
for peak_direction in peak_directions.isel(
part=n_part
Expand All @@ -569,6 +575,8 @@ def correct(
[
self.calibration_params["swell_correction"][
int(peak_direction / self.direction_bin_size)
if peak_direction < 360
else 0 # TODO: Check if this with Javi
]
for peak_direction in peak_directions.isel(
part=n_part
Expand All @@ -595,6 +603,8 @@ def correct(
[
self.calibration_params["sea_correction"][
int(peak_direction / self.direction_bin_size)
if peak_direction < 360
else 0
]
for peak_direction in corrected_data["Dirsea"]
]
Expand All @@ -609,6 +619,8 @@ def correct(
[
self.calibration_params["swell_correction"][
int(peak_direction / self.direction_bin_size)
if peak_direction < 360
else 0
]
for peak_direction in corrected_data[f"Dirswell{n_part}"]
]
Expand Down Expand Up @@ -753,11 +765,16 @@ def plot_calibration_results(self) -> Tuple[Figure, list]:
)

# Plot sea wave climate
x, y, z = density_scatter(
self._data["Dirsea"].iloc[::10] * np.pi / 180,
self._data["Hsea"].iloc[::10],
)
ax5.scatter(x, y, c=z, s=3, cmap="jet")
sea_dirs = self._data["Dirsea"].iloc[::10] * np.pi / 180
sea_heights = self._data["Hsea"].iloc[::10]
# Filter out NaN and infinite values
valid_mask = np.isfinite(sea_dirs) & np.isfinite(sea_heights)
sea_dirs_valid = sea_dirs[valid_mask]
sea_heights_valid = sea_heights[valid_mask]

if len(sea_dirs_valid) > 0:
x, y, z = density_scatter(sea_dirs_valid, sea_heights_valid)
ax5.scatter(x, y, c=z, s=3, cmap="jet")
ax5.set_theta_zero_location("N", offset=0)
ax5.set_xticklabels(["N", "NE", "E", "SE", "S", "SW", "W", "NW"])
ax5.xaxis.grid(True, color="lavender", linestyle="-")
Expand All @@ -768,11 +785,16 @@ def plot_calibration_results(self) -> Tuple[Figure, list]:
ax5.set_title("SEA $Wave$ $Climate$", pad=35, fontweight="bold")

# Plot swell wave climate
x, y, z = density_scatter(
self._data["Dirswell1"].iloc[::10] * np.pi / 180,
self._data["Hswell1"].iloc[::10],
)
ax6.scatter(x, y, c=z, s=3, cmap="jet")
swell_dirs = self._data["Dirswell1"].iloc[::10] * np.pi / 180
swell_heights = self._data["Hswell1"].iloc[::10]
# Filter out NaN and infinite values
valid_mask = np.isfinite(swell_dirs) & np.isfinite(swell_heights)
swell_dirs_valid = swell_dirs[valid_mask]
swell_heights_valid = swell_heights[valid_mask]

if len(swell_dirs_valid) > 0:
x, y, z = density_scatter(swell_dirs_valid, swell_heights_valid)
ax6.scatter(x, y, c=z, s=3, cmap="jet")
ax6.set_theta_zero_location("N", offset=0)
ax6.set_xticklabels(["N", "NE", "E", "SE", "S", "SW", "W", "NW"])
ax6.xaxis.grid(True, color="lavender", linestyle="-")
Expand Down
Loading
Loading