Source code for windkit.wind_climate

# (c) 2022 DTU Wind Energy
"""
Wind climate functions.
"""
import random
from itertools import product

import numpy as np

from windkit import create_dataset
from windkit.spatial import get_crs, to_point

from .binned_wind_climate import _is_bwc, bwc_power_density, bwc_ws_moment
from .data_structures import DataStructures
from .generalized_wind_climate import _is_gwc
from .geostrophic_wind_climate import _is_geowc
from .metadata import _MET_ATTRS, update_var_attrs
from .time_series_wind_climate import _is_ts
from .weibull_wind_climate import _is_wwc, wwc_mean_windspeed, wwc_power_density


[docs] def get_wc_struct(obj): """Get the type of a WindKit wind climate object. Parameters ---------- obj : xarray.Dataset, xarray.DataArray Object to determine the wind climate structure from. Returns ------- string : Spatial structure name. Can be on of: - 'ts' - 'bwc' - 'wwc' - 'genwc' - 'geowc' If no spatial structure is found None is returned. """ if _is_bwc(obj): return DataStructures.BWC elif _is_wwc(obj): return DataStructures.WWC elif _is_geowc(obj): return DataStructures.GEOWC elif _is_gwc(obj): return DataStructures.GWC elif _is_ts(obj): return DataStructures.TS else: return None
[docs] def mean_windspeed(wco, bysector=False, emergent=True): """Calculate the mean wind speed from a wind climate. Parameters ---------- wco: xarray.Dataset Weibull Wind Climate or Binned Wind Climate Object. bysector: bool Return results by sector or as an all-sector value. Defaults to False emergent: bool Calculate the all-sector mean using the emergent (True) or the combined Weibull distribution (False). Defaults to True. Returns ------- xarray.DataArray DataArray with the wind speed. """ # TODO: add mean wind speed of time series object if bysector and emergent: raise ValueError( "Emergent wind speed cannot be calculated for sectorwise wind speed." ) if _is_bwc(wco): ws = bwc_ws_moment(wco, 1.0, bysector) elif emergent: ws = wwc_mean_windspeed(wco, bysector, emergent=True) ws.name = "wspd" return update_var_attrs(ws, _MET_ATTRS) else: ws = wwc_mean_windspeed(wco, bysector, emergent=False) if bysector: ws.name = "wspd_sector" else: ws.name = "wspd_combined" return update_var_attrs(ws, _MET_ATTRS)
[docs] def power_density(wco, bysector=False, emergent=True, air_density=1.225): """Calculate the power density of a bwc or wwc Parameters ---------- wco: xarray.Dataset Weibull Wind Climate or Binned Wind Climate Object. bysector: bool Return results by sector or as an all-sector value. Defaults to False. emergent: bool Calculate the all-sector mean using the emergent (True) or the combined Weibull distribution (False). Defaults to True. air_dens: xarray.DataArray or float xarray.DataArray with air density with the same dimensions as wco, by default use US standard atmosphere air density of 1.225 kg/m^3 Returns ------- xarray.DataArray DataArray with the power density. """ if bysector and emergent: raise ValueError( "Emergent power density cannot be calculated for sectorwise wind speed." ) # calculate power density if _is_bwc(wco): pd = bwc_power_density(wco, bysector, air_density) elif emergent: pd = wwc_power_density(wco, air_density, bysector=False, emergent=True) pd.name = "power_density" return update_var_attrs(pd, _MET_ATTRS) else: pd = wwc_power_density(wco, air_density, bysector, emergent=False) if bysector: pd.name = "power_density_sector" else: pd.name = "power_density_combined" return update_var_attrs(pd, _MET_ATTRS)
def _not_equal(points): """Return if not a self-prediction""" for x, y in points: if x != y: yield (x, y) def _higher(points): """Return if the target observation higher or equal than the source""" for x, y in points: if y[2] >= x[2]: yield (x, y) def _in_range(points, filter_range): """Return if the observation is within the chosen range""" for x, y in points: bottom, top = filter_range if x[2] >= bottom and x[2] < top and y[2] >= bottom and y[2] < top: yield (x, y)
[docs] def get_cross_predictions( wcs, wcs_src=None, include_self_predictions=True, only_upward_extrapolations=True, filter_range=None, sample_size=None, seed=4, ): """Get cross predictions from a dataset Given the filtering options, return a dataset with the points where we want to predict from and where we want to predict to. Parameters ---------- wcs: :any:`xarray.Dataset` wind climate `xarray.Dataset` for which we want to do cross predictions wcs_src: `xarray.Dataset` wind climate `xarray.Dataset` used as source for the cross predictions. If None, `wcs` is used as source and as target. Defaults to None. include_self_predictions: bool A self prediction is a pair of points where the input point is the exact same as the output point. Keep self predictions in the dataset? only_upward_extrapolations: bool Keep only the cross predictions where the height of point_in >= the height of point_out? filter_range: list height range that we want to retain from the input dataset sample_size: int Number of samples to take from the input dataset seed: int Seed number for the random sampling, if applied Returns ------- from_locs: :any:`xr.Dataset` xarray dataset with input locations to_locs: :any:`xr.Dataset` xarray dataset with target locations """ def point_to_multiindex(ds): """Convert to windkit `point` structure Parameters ---------- ds: :any:`xr.Dataset` xarray dataset with windkit `point` structure Returns ------- points: :any:`xr.Dataset` xarray dataset with multiindex point coordinate """ pp = to_point(ds) return pp.set_index(point=["west_east", "south_north", "height"]) def reindex_points(ds, locs): """Reindex ds by using point indices from locs Returns the dataset `ds` but on the points in `locs`. Parameters ---------- ds: :any:`xr.Dataset` xarray dataset with windkit `point` structure locs: :any:`xr.Dataset` xarray dataset with the desired windkit `point` structure Returns ------- points: :any:`xr.Dataset` xarray dataset with the variables from `ds` but at points from `locs` """ points = ds.sel( point=locs.set_index(point=["west_east", "south_north", "height"])["point"] ) return points.reset_index("point") point_bwc = point_to_multiindex(wcs) index = point_bwc.point.values points = ((i[0], i[1]) for i in product(index, index)) if not include_self_predictions: points = _not_equal(points) if only_upward_extrapolations: points = _higher(points) if filter_range is not None: points = _in_range(points, filter_range) # if lp is an empty list we have filtered out all measurements # so we return None lp = list(points) if lp: if sample_size is not None: # take sample_size from the points, if the number of samples exceeds # the length of the list, take all points random.seed(seed) point_from, point_to = zip(*random.sample(lp, min(sample_size, len(lp)))) else: point_from, point_to = zip(*lp) dataset_from = create_dataset( np.array(point_from).T[0], np.array(point_from).T[1], np.array(point_from).T[2], crs=get_crs(point_bwc), ) dataset_to = create_dataset( np.array(point_to).T[0], np.array(point_to).T[1], np.array(point_to).T[2], crs=get_crs(point_bwc), ) to_locs = reindex_points(point_bwc, dataset_to) if wcs_src is None: from_locs = reindex_points(point_bwc, dataset_from) else: point_wcs_src = point_to_multiindex(wcs_src) from_locs = reindex_points(point_wcs_src, dataset_from) return (from_locs, to_locs) else: return (None, None)