Source code for windkit.wind

# (c) 2022 DTU Wind Energy
"""
Utility functions for working with wind data
"""
import numpy as np
import pandas as pd
import xarray as xr
from scipy.integrate import trapezoid

from windkit.sector import create_sector_coords, create_sector_coords_from_edges


def wind_speed(u, v):
    """
    Compute the wind speed (vector magnitude) from the wind components.

    Parameters
    ----------
    u, v : numpy.ndarray, xarray.DataArray
        U and V wind vector components.

    Returns
    -------
    ws : numpy.ndarray, xarray.DataArray
        Wind speed, computed as ``sqrt(u**2 + v**2)``.
    """
    squared_magnitude = u**2 + v**2
    return np.sqrt(squared_magnitude)
def wind_direction(u, v):
    """
    Compute the wind direction in degrees from the wind components.

    The angle ``arctan2(u, v)`` is converted to degrees and offset by 180,
    so the returned values lie in the interval (0, 360].

    Parameters
    ----------
    u, v : np.ndarray, xr.DataArray
        U and V wind vector components.

    Returns
    -------
    wd : np.ndarray, xr.DataArray
        Wind direction in degrees.
    """
    angle_rad = np.arctan2(u, v)
    angle_deg = angle_rad * 180.0 / np.pi
    return 180.0 + angle_deg
def wind_speed_and_direction(u, v):
    """
    Compute both wind speed and wind direction from the wind components.

    Parameters
    ----------
    u, v : numpy.ndarray, xarray.DataArray
        U and V wind vector components.

    Returns
    -------
    speed : numpy.ndarray, xarray.DataArray
        Wind speed, as returned by ``wind_speed``.
    direction : numpy.ndarray, xarray.DataArray
        Wind direction, as returned by ``wind_direction``.
    """
    speed = wind_speed(u, v)
    direction = wind_direction(u, v)
    return speed, direction
def wind_vectors(ws, wd):
    """
    Compute the wind components u and v from wind speed and direction.

    The absolute value of the speed is used, so the sign of ``ws`` does not
    affect the result.

    Parameters
    ----------
    ws : numpy.ndarray, xarray.DataArray
        Wind speed.
    wd : numpy.ndarray, xarray.DataArray
        Wind direction in degrees.

    Returns
    -------
    u, v : numpy.ndarray, xarray.DataArray
        Wind vector components u and v.
    """
    direction_rad = wd * (np.pi / 180.0)
    magnitude = np.abs(ws)
    u = -magnitude * np.sin(direction_rad)
    v = -magnitude * np.cos(direction_rad)
    return u, v
def wind_direction_difference(wd_obs, wd_mod):
    """
    Compute the circular (minimum) difference between modelled and
    observed wind directions.

    The raw difference ``wd_mod - wd_obs`` is wrapped by one full turn
    (360 degrees) in either direction where needed.

    Parameters
    ----------
    wd_obs : xarray.DataArray
        Observed direction array.
    wd_mod : xarray.DataArray
        Modelled direction array.

    Returns
    -------
    xarray.DataArray
        Circular (minimum) differences.

    Examples
    --------
    >>> wd_obs = xr.DataArray([15.0, 345.0, 355.0], dims=('time',))
    >>> wd_mod = xr.DataArray([345.0, 300.0, 5.0], dims=('time',))
    >>> wind_direction_difference(wd_obs, wd_mod)
    <xarray.DataArray (time: 3)>
    array([-30., -45.,  10.])
    Dimensions without coordinates: time
    """
    raw = wd_mod - wd_obs
    # Values of 180 or more are shifted down by a full turn ...
    wrapped = raw.where(raw < 180.0, raw - 360.0)
    # ... and values of -180 or less are shifted up by a full turn.
    return wrapped.where(wrapped > -180.0, wrapped + 360.0)
def wd_to_sector(wd, bins=12, output_type="centers", quantiles=False):
    """
    Convert wind directions to sector centers or 0-based sector indices.

    Parameters
    ----------
    wd : xarray.DataArray, numpy.array
        Wind directions. For fixed width sectors the function uses
        xarray.apply_ufunc, so the return value keeps the shape of the
        input value.
    bins : int
        Number of bins / sectors. Defaults to 12.
    output_type : str
        If set to 'centers', the returned values are the sector centers.
        If set to 'indices', the returned values are the sector indices.
        Defaults to 'centers'.
    quantiles : bool
        Allows to use equal probability sectors (quantiles=True) instead of
        fixed width sectors. Note that this is an experimental feature to be
        used only together with the `windkit.mcp` module for now. Other
        `windkit` modules may not be compatible with non fixed width sectors.
        Defaults to False.

    Returns
    -------
    sectors : xarray.DataArray, np.array
        Wind direction sector centers or sector indices, depending on
        ``output_type``. For fixed width sectors, NaN directions yield NaN.
    sector_coords : xarray.DataArray
        Data array with sector coordinates including center, ceiling and
        floor.

    Raises
    ------
    ValueError
        If ``output_type`` is not 'centers' or 'indices', or if
        ``quantiles=True`` and ``wd`` is not an xarray.DataArray with a
        'point' dimension of length 1.

    Examples
    --------
    >>> wd = xr.DataArray([355.0, 14.0, 25.0, 270.0,], dims=('time',))
    >>> wd_to_sector(wd)
    (<xarray.DataArray (time: 4)>
    array([  0.,   0.,  30., 270.])
    Dimensions without coordinates: time,
    <xarray.DataArray (sector: 12)>
    array([  0.,  30.,  60.,  90., 120., 150., 180., 210., 240., 270., 300., 330.])
    Coordinates:
      * sector        (sector) float64 0.0 30.0 60.0 90.0 ... 270.0 300.0 330.0
        sector_ceil   (sector) float64 15.0 45.0 75.0 105.0 ... 285.0 315.0 345.0
        sector_floor  (sector) float64 345.0 15.0 45.0 75.0 ... 255.0 285.0 315.0)
    """

    def _wd_to_sector_constant(wd, bins=12):
        # Fixed width sectors, with the first sector centered on 0 degrees.
        width = 360.0 / bins
        edges = np.linspace(0.0, 360.0, bins + 1)
        # Widen the outermost edges slightly so values on the 0/360 boundary
        # always fall inside the first/last bin.
        edges[0] = -0.1
        edges[-1] = 360.1
        # Shift by half a sector so each bin spans center +/- width / 2.
        shifted = np.mod(wd + width / 2.0, 360.0)
        sector = np.digitize(shifted, edges) - 1
        # np.digitize puts NaN past the last edge (index == bins); report
        # those as NaN. np.where (instead of item assignment) also works
        # when the input is a scalar.
        return np.where(sector >= bins, np.nan, sector.astype(np.float64))

    def _wd_to_sector_quantiles(wd, bins=12):
        # TODO move this to xarray or numpy so we can use apply_ufunc
        sector_da = wd.copy()
        sector_cat, edges = pd.qcut(wd.values.flatten(), bins, retbins=True)
        # Make the quantile edges cover the full circle.
        edges[0] = 0.0
        edges[-1] = 360.0
        sector_da.data = sector_cat.codes.reshape(wd.shape)
        sector_coords_da = create_sector_coords_from_edges(edges)
        return sector_da, sector_coords_da

    if output_type not in ["centers", "indices"]:
        raise ValueError("unkown output type. Possible values are 'centers','indices'")

    if not quantiles:
        sectors = xr.apply_ufunc(_wd_to_sector_constant, wd, kwargs={"bins": bins})
        sector_coords = create_sector_coords(bins)
        if output_type == "indices":
            return sectors, sector_coords
        return sectors * 360.0 / bins, sector_coords

    # isinstance (rather than an exact type comparison) also accepts
    # DataArray subclasses.
    if (
        not isinstance(wd, xr.DataArray)
        or ("point" not in wd.dims)
        or (len(wd["point"]) > 1)
    ):
        raise ValueError(
            "For quantiles=True, only xarray.DataArray with point dimensions of length 1 are supported"
        )

    sectors, sector_coords = _wd_to_sector_quantiles(wd, bins)
    if output_type == "indices":
        return sectors, sector_coords

    centers_values = sector_coords.isel(sector=sectors.values.flatten()).values
    centers = wd.copy()
    # Restore the layout of the input instead of assuming an (n, 1) shape.
    centers.values = centers_values.reshape(wd.shape)
    return centers, sector_coords
def vinterp_wind_direction(wind_direction, height, **kwargs):
    """
    Vertically interpolate wind direction to a given height.

    The circular difference between every level and the first height level
    is interpolated, and the result is mapped back onto [0, 360).

    Parameters
    ----------
    wind_direction : xarray.DataArray
        Wind direction. Must have a height dimension.
    height : float, xarray.DataArray
        Height to interpolate wind direction to.
    **kwargs : dict, optional
        Additional keyword arguments passed to xarray.interp.

    Returns
    -------
    wind_direction : xarray.DataArray
        Interpolated wind direction.

    Raises
    ------
    TypeError
        If ``wind_direction`` is not an xarray.DataArray, or ``height`` is
        neither a scalar nor an xarray.DataArray.
    ValueError
        If ``wind_direction`` has no height dimension.
    """
    if not isinstance(wind_direction, xr.DataArray):
        raise TypeError("wind_direction must be a xarray.DataArray")
    if "height" not in wind_direction.dims:
        raise ValueError("wind_direction must have a height dimension")

    accepted_height_types = (*np.ScalarType, xr.DataArray)
    if not isinstance(height, accepted_height_types):
        raise TypeError("height must be a scalar or xarray.DataArray")

    # Use the first height level as the reference direction.
    reference = wind_direction.isel(height=0)
    # Circular offset of the reference relative to every level.
    offset = wind_direction_difference(wind_direction, reference)
    offset_at_height = offset.interp(height=height, **kwargs)
    return np.mod(reference - offset_at_height, 360.0)
def vinterp_wind_speed(wind_speed, height, log_height=True, **kwargs):
    """
    Vertically interpolate wind speed to a given height from other height
    levels.

    Parameters
    ----------
    wind_speed : xarray.DataArray
        Wind speed. Must have a height dimension.
    height : float, xarray.DataArray
        Height to interpolate wind speed to.
    log_height : bool, optional
        If True, interpolate in log-height space (``log1p`` of the heights).
        Defaults to True.
    **kwargs : dict, optional
        Additional keyword arguments passed to xarray.interp.

    Returns
    -------
    wind_speed : xarray.DataArray
        Interpolated wind speed. Its height coordinate holds the requested
        height(s), also when the interpolation is done in log-height space.

    Raises
    ------
    TypeError
        If ``wind_speed`` is not an xarray.DataArray, or ``height`` is
        neither a scalar nor an xarray.DataArray.
    ValueError
        If ``wind_speed`` has no height dimension.
    """
    if not isinstance(wind_speed, xr.DataArray):
        raise TypeError("wind_speed must be a xarray.DataArray")
    if "height" not in wind_speed.dims:
        raise ValueError("wind_speed must have a height dimension")
    if not isinstance(height, (np.ScalarType, xr.DataArray)):
        raise TypeError("height must be a scalar or xarray.DataArray")

    # Remember the requested height(s) so they can be restored as the
    # coordinate after interpolating in transformed space. This is done for
    # scalars as well as DataArrays; previously a scalar height combined
    # with log_height=True hit an unassigned local variable.
    requested_height = height

    if log_height:
        wind_speed = wind_speed.assign_coords(height=np.log1p(wind_speed.height))
        height = np.log1p(height)

    result = wind_speed.interp(height=height, **kwargs)

    if log_height:
        # Replace the log-transformed coordinate by the real height(s).
        result = result.assign_coords(height=requested_height)

    return result
def rotor_equivalent_wind_speed(
    wind_speed,
    wind_direction,
    hub_height,
    rotor_diameter,
    delta_z=1.0,
    n_integrate=1001,
):
    """
    Calculate the rotor equivalent wind speed (REWS) from given wind speed
    and directions on height levels.

    The procedure is as follows:

    1. Find the area of each segment of the rotor spanned area.
    2. Calculate the wind speed at the center of each segment by linearly
       interpolating the wind speed to the height of the segment center in
       log-height.
    3. Calculate the wind direction at the center of each segment by
       linearly interpolating the wind direction to the height of the
       segment center. Circularity is taken into account here.
    4. Calculate the wind direction at hub height by linearly interpolating
       the wind direction to the hub height.
    5. Calculate the REWS as the cube root of the sum of the cubed wind
       speed at each segment center multiplied by the area-weight
       (area/total) of the segment and the cosine of the difference between
       the wind direction at the segment center and the wind direction at
       hub height.

    Parameters
    ----------
    wind_speed : xarray.DataArray
        Wind speed on height levels.
    wind_direction : xarray.DataArray
        Wind direction on height levels.
    hub_height : float
        Turbine Hub height.
    rotor_diameter : float
        Turbine rotor diameter. Must be positive.
    delta_z : float, optional
        Height difference between segments of turbine spanned rotor area
        (default: 1.0). Must be positive.
    n_integrate : int, optional
        Number of points to use for integration (default: 1001) of the area
        of each segment.

    Returns
    -------
    rews : xarray.DataArray
        Rotor equivalent wind speed, with a length-1 height dimension at
        hub height.

    Raises
    ------
    TypeError
        If ``wind_speed`` or ``wind_direction`` is not an xarray.DataArray.
    ValueError
        If ``wind_speed`` or ``wind_direction`` has no height dimension, or
        if ``rotor_diameter`` or ``delta_z`` is not positive.
    """
    if not isinstance(wind_speed, xr.DataArray):
        raise TypeError("wind_speed must be a xarray.DataArray")
    if not isinstance(wind_direction, xr.DataArray):
        raise TypeError("wind_direction must be a xarray.DataArray")
    if "height" not in wind_speed.dims:
        raise ValueError("wind_speed must have a height dimension")
    if "height" not in wind_direction.dims:
        raise ValueError("wind_direction must have a height dimension")

    hub_height = float(hub_height)
    rotor_diameter = float(rotor_diameter)
    delta_z = float(delta_z)
    n_integrate = int(n_integrate)

    if rotor_diameter <= 0.0:
        raise ValueError("rotor_diameter must be positive")
    if delta_z <= 0.0:
        raise ValueError("delta_z must be positive")

    rotor_radius = rotor_diameter / 2.0

    # Always use at least one segment, so a delta_z larger than the rotor
    # does not produce an empty sum.
    n_segments = max(int(np.round(rotor_diameter / delta_z)), 1)

    # Segment interfaces and segment centers across the rotor.
    zi = np.linspace(
        hub_height - rotor_radius,
        hub_height + rotor_radius,
        n_segments + 1,
    )
    zc = (zi[1:] + zi[:-1]) / 2

    # Area of each horizontal rotor slice: integrate the chord length
    # 2 * sqrt(r^2 - (z - hub)^2) over the slice.
    Ai = np.zeros_like(zc)
    for i, (z_low, z_high) in enumerate(zip(zi[:-1], zi[1:])):
        zs = np.linspace(z_low, z_high, n_integrate)
        # Round-off at the very top/bottom of the rotor can make the
        # radicand slightly negative; clip it so the outer slices do not
        # turn into NaN and get skipped by the sum below.
        radicand = np.maximum(rotor_radius**2 - (zs - hub_height) ** 2, 0.0)
        Ai[i] = trapezoid(2 * np.sqrt(radicand), zs)

    zc = xr.DataArray(zc, dims=("height",), coords={"height": zc})

    # Area of rotor
    A = np.pi * rotor_radius**2

    # Area of rotor segment
    Ai = xr.DataArray(Ai, dims=("height",), coords={"height": zc})

    # wind speed and direction at segment center
    ui = vinterp_wind_speed(
        wind_speed, zc, method="linear", kwargs={"fill_value": "extrapolate"}
    )
    di = vinterp_wind_direction(
        wind_direction, zc, method="linear", kwargs={"fill_value": "extrapolate"}
    )

    # wind direction at hub height
    dh = vinterp_wind_direction(
        wind_direction,
        hub_height,
        method="linear",
        kwargs={"fill_value": "extrapolate"},
    )

    rews = ((1 / A) * Ai * (ui**3) * np.cos(np.deg2rad(di - dh))).sum(dim="height")
    # Negative sums are set to zero before taking the cube root.
    rews = xr.where(rews < 0, 0, rews)
    rews = np.power(rews, 1.0 / 3.0)
    rews = rews.expand_dims(height=[hub_height])

    return rews
def resample_wind_and_direction(
    ds,
    freq,
    var_ws="wind_speed",
    var_wd="wind_direction",
    min_availability=0.5,
    **kwargs,
):
    """Resample wind speed and direction to a given frequency.

    Speed and direction are converted to wind vector components, the
    components (and all other variables) are averaged per resampling
    window, and speed and direction are recomputed from the averaged
    components.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset with wind speed and direction.
    freq : str
        Resampling frequency.
    var_ws : str, optional
        Name of wind speed variable, by default "wind_speed".
    var_wd : str, optional
        Name of wind direction variable, by default "wind_direction".
    min_availability : float, optional
        Fraction of the time steps in a resampling window that must be
        non-null; windows with fewer valid values are masked. By default
        0.5.
    **kwargs : dict, optional
        Additional keyword arguments passed to ``ds.resample``.

    Returns
    -------
    xarray.Dataset
        Resampled dataset.
    """

    def _mean_if_available(group):
        # Mean over the window, masked where too few values are valid.
        required_count = len(group.time) * min_availability
        valid_count = group.notnull().sum(dim="time")
        return group.mean(dim="time").where(valid_count >= required_count)

    work = ds.copy()
    work["__U__"], work["__V__"] = wind_vectors(work[var_ws], work[var_wd])
    work = work.drop_vars([var_ws, var_wd])

    resampled = work.resample(time=freq, **kwargs).map(_mean_if_available)

    speed, direction = wind_speed_and_direction(
        resampled["__U__"], resampled["__V__"]
    )
    resampled[var_ws] = speed
    resampled[var_wd] = direction

    return resampled.drop_vars(["__U__", "__V__"])