"""Generalized Wind Climate
Extends `pywaspyio.generalized_wind_climate` to include interpolation
"""
import logging
import numpy as np
import xarray as xr
import warnings
from scipy.interpolate import griddata as sp_griddata
from windkit.generalized_wind_climate import (
gwc_validate_wrapper,
REQ_DIMS_GWC,
) # pylint: disable=unused-wildcard-import
from windkit.geostrophic_wind_climate import _is_geowc, REQ_DIMS_GEOWC
from windkit.binned_wind_climate import (
bwc_from_counts,
bwc_ws_moment,
bwc_ws_freq_gt_mean,
)
from windkit.weibull_wind_climate import weibull_moment
from windkit.weibull import fit_weibull_wasp_m1_m3_fgtm, weibull_freq_gt_mean
from windkit.metadata import update_history
from windkit import spatial as wksp
from windkit.empty import empty_gwc
from windkit.spatial import (
get_crs,
reproject,
count_spatial_points,
equal_spatial_shape,
interp_structured_like,
interp_unstructured_like,
is_raster,
)
from ..io import _nat_neighbor_interp as natn
from ..core import Rvea0326
from .binned_wind_climate import bwc_resample_wsbins_like
_GWC_INTERP = Rvea0326.gwc.gwcinterpolate
logger = logging.getLogger(__name__)
def _2D_to_out_locs(gwc, out_locs, exclude_dims=REQ_DIMS_GWC):
# Set structure of 2D point gwc object to match that of out_locs
if wksp.is_raster(out_locs):
gwc = wksp.to_raster(gwc, True)
if wksp.is_vertical(out_locs):
return xr.broadcast(gwc, out_locs, exclude=exclude_dims)[0]
else:
return gwc
[docs]
def interpolate_gwc(
gwc, /, output_locs, method="given", use_moments=True, engine="fortran"
):
"""
Spatially interpolate GWC data to a grid of locations.
The interpolation is done in several steps:
1. Calculate the moments of the Weibull distribution from the A and k parameters.
2. Spatially interpolate the moments to the output locations.
3. Fit new A and k parameters from the interpolated moments.
The first and third moments as well as the frequency of wind speeds greater than the mean
are used, corresponding to WASP methodology.
.. warning::
This function is experimental and its signature may change. The default engine is "fortran", but this
will be deprecated in the future and the default will be "windkit".
Parameters
----------
gwc : xr.Dataset
GWC dataset
output_locs : windkit.spatial.BBox
Output locations
method : str, optional
Interpolation method. By default "given"
Options are:
- "given" for using the given values in the GWC dataset (shapes must be the same)
- "nearest" for nearest neighbor interpolation
- "linear" for bilinear interpolation
- "cubic" for bicubic interpolation
- "natural" for natural neighbor interpolation
use_moments : bool
Only used when the gwc is a geostrophic binned wind climate. Whether to
use the moments in the binned wind climate for interpolation or interpolate
the histogram directly using the specified 'method'.
engine : {'windkit','fortran'}
Backend to use for interpolation.
Returns
-------
xr.Dataset
Interpolated GWC dataset.
Raises
------
ValueError
If the GWC and output locations do not have the same CRS.
ValueError
If the GWC contains any of "__m1__", "__m3__", "__fgtm__". These are reserved for internal use.
ValueError
If the shape of GWC and output locations are not the same when method is "given".
UserWarning
If the GWC is in geographic coordinates and method is "nearest", "linear", "cubic", or "natural".
Interpolation in geographic coordinates can be inaccurate.
"""
if engine == "windkit":
return _interpolate_gwc(
gwc, output_locs, method=method, use_moments=use_moments
)
else:
warnings.warn(
"The 'fortran' engine is deprecated. Use 'windkit' instead!",
FutureWarning,
)
return _interpolate_gwc_pw(gwc, output_locs, method=method)
def _interpolate_gwc(gwc, /, output_locs, method="given", use_moments=True):
"""
Spatially interpolate GWC data to a grid of locations.
The interpolation is done in several steps:
1. Calculate the moments of the Weibull distribution from the A and k parameters.
2. Spatially interpolate the moments to the output locations.
3. Fit new A and k parameters from the interpolated moments.
The first and third moments as well as the frequency of wind speeds greater than the mean
are used, corresponding to WASP methodology.
Parameters
----------
gwc : xr.Dataset
GWC dataset.
output_locs : windkit.spatial.BBox
Output locations.
method : str, optional
Interpolation method. By default "given".
Options are:
- "given" for using the given values in the GWC dataset
in this case the shapes of the output dataset and the
GWC dataset must be the same.
- "nearest" for nearest neighbor interpolation
- "linear" for bilinear interpolation
- "cubic" for bicubic interpolation
- "natural" for natural neighbor interpolation
use_moments: bool
Only used when the gwc is a geostrophic binned wind climate. Whether to
use the moments in the binned wind climate for interpolation or interpolate
the histogram directly using the specified 'method'.
Returns
-------
xr.Dataset
Interpolated GWC dataset.
Raises
------
ValueError
If the GWC contains any of "__m1__", "__m3__", "__fgtm__". These are reserved for internal use.
ValueError
If the shape of GWC and output locations are not the same when method is "given".
UserWarning
If the GWC is in geographic coordinates and method is "nearest", "linear", "cubic", or "natural".
Interpolation in geographic coordinates can be inaccurate.
"""
crs_gwc = get_crs(gwc)
crs_out = get_crs(output_locs)
if crs_out.is_geographic and method in ["nearest", "linear", "cubic", "natural"]:
warnings.warn(
"Interpolation of GWCs in geographic coordinates can be inaccurate. "
)
if not crs_gwc.equals(crs_out):
gwc = reproject(gwc.copy(), crs_out)
is_geowc = _is_geowc(gwc)
REQ_DIMS = REQ_DIMS_GEOWC if is_geowc else REQ_DIMS_GWC
if count_spatial_points(gwc) == 1:
gwc_out = gwc.broadcast_like(output_locs, exclude=REQ_DIMS)
return update_history(gwc_out)
if method == "given":
gwc_pt = wksp.to_point(gwc)
out_locs_pt = wksp.to_point(output_locs)
if not equal_spatial_shape(gwc_pt, out_locs_pt):
raise ValueError(
"""No interpolation specificed, but gwc and out_locs are not spatially
equal. Please specify an interpolation method."""
)
return update_history(gwc)
if any([var in gwc.data_vars for var in ["__m1__", "__m3__", "__fgtm__"]]):
raise ValueError(
"GWC cannot contain any of '__m1__', '__m3__', '__fgtm__'. These are reserved for internal use."
)
if not is_geowc:
gwc["__m1__"] = weibull_moment(gwc.A, gwc.k, 1)
gwc["__m3__"] = weibull_moment(gwc.A, gwc.k, 3)
gwc["__fgtm__"] = weibull_freq_gt_mean(gwc.A, gwc.k)
gwc = gwc.drop_vars(["A", "k"], errors="ignore")
else:
if use_moments and method != "nearest":
gwc[["wsfreq", "wdfreq"]] = bwc_from_counts(gwc["geo_wv_freq"])[
["wsfreq", "wdfreq"]
]
gwc["__m1__"] = bwc_ws_moment(gwc, 1, bysector=True)
gwc["__m3__"] = bwc_ws_moment(gwc, 3, bysector=True)
gwc["__fgtm__"] = bwc_ws_freq_gt_mean(gwc, bysector=True)
gwc = gwc[
["__m1__", "__m3__", "__fgtm__", "wdfreq", "geo_wv_freq", "geo_turn"]
]
else:
gwc = gwc[["geo_wv_freq", "geo_turn"]]
if is_raster(gwc) and method != "natural":
exclude_dims = [k for k, v in gwc.sizes.items() if v == 1]
gwc_out = interp_structured_like(
gwc, output_locs, method=method, exclude_dims=exclude_dims
)
if is_geowc:
gwc_out["geo_wv_freq"] = gwc_out["geo_wv_freq"].where(
gwc_out["geo_wv_freq"] >= 0, 0.0
)
if use_moments and method != "nearest":
# we need the histogram in traditional wsfreq/wdfreq format for bwc_resample_wsbins
geowc_freq = bwc_from_counts(gwc_out["geo_wv_freq"])[
["wsfreq", "wdfreq"]
]
# calculate A and k using interpolated moments
A, k = fit_weibull_wasp_m1_m3_fgtm(
gwc_out["__m1__"], gwc_out["__m3__"], gwc_out["__fgtm__"]
)
geowc_freq["A"] = A
geowc_freq["k"] = k
# resample histogram to the weibull distributions given by the new A and k
geowc_freq = bwc_resample_wsbins_like(
geowc_freq, geowc_freq, fit_weibull=False, errors="ignore"
)
geowc_freq["geo_wv_freq"] = geowc_freq["wsfreq"] * geowc_freq["wdfreq"]
geowc_freq["geo_turn"] = (
gwc_out["geo_turn"].dims,
gwc_out["geo_turn"].values,
)
gwc_out = geowc_freq.drop_vars(["wsfreq", "wdfreq"], errors="ignore")
else:
exclude_dims = [k for k, v in gwc.sizes.items() if v == 1]
gwc_out = interp_unstructured_like(
gwc, output_locs, method=method, exclude_dims=exclude_dims
)
if not _is_geowc(gwc_out):
gwc_out["wdfreq"] = gwc_out["wdfreq"] / gwc_out["wdfreq"].sum(dim="sector")
A, k = fit_weibull_wasp_m1_m3_fgtm(
gwc_out["__m1__"], gwc_out["__m3__"], gwc_out["__fgtm__"]
)
gwc_out["A"] = A
gwc_out["k"] = k
gwc_out = gwc_out.drop_vars(["__m1__", "__m3__", "__fgtm__"])
return update_history(gwc_out)
@gwc_validate_wrapper
def _interpolate_gwc_pw(
gwc, out_locs, method="given"
): # pylist:disable=too-many-locals
"""Interpolate gwc object to new grid with optional reprojection
Parameters
----------
gwc: xarray.Dataset
PyWAsP Generalized Wind Climate dataset
out_locs: xarray.Dataset
Dataset with at least west_east, south_north dimensions
and a crs coordinate.
method: str
"nearest" for nearest neighbor interpolation
"natural" for natural neighbor interpolation
"given" ignores spatial information, validate data has the same spatial size,
after both gwc and out_locs have been converted to the point spatial structure.
Used most often for cross-predictions, where the gwc and the output
have different locations.
Returns
-------
xr.Dataset
PyWAsP Generalized Wind Climate dataset on new grid
"""
if method is None:
warnings.warn(
"The interpolation method None is deprecated and now defaults to given.",
FutureWarning,
)
method = "given"
if (
wksp.get_crs(gwc).is_geographic or wksp.get_crs(out_locs).is_geographic
) and method in ["nearest", "natural"]:
warnings.warn(
"This dataset has geographic coordinates. Querying for nearest neighbour is unreliable, as points that appear far away in latitude/longitude coordinate space may actually be close, particularly near the poles!"
)
# Set variables needed for all methods
out_crs = wksp.get_crs(out_locs)
gwc_is_single_pt = (gwc["south_north"].size == 1) and (gwc["west_east"].size == 1)
if "height" in gwc.coords:
gwc_is_single_pt = gwc_is_single_pt and (gwc["height"].size == 1)
#####################################################################################
# Allow for "given" option, which returns the input dataset if spatially equal
#####################################################################################
if method == "given":
gwc_pt = wksp.to_point(gwc)
out_locs_pt = wksp.to_point(out_locs)
if not equal_spatial_shape(gwc_pt, out_locs_pt):
raise ValueError(
"""No interpolation specificed, but gwc and out_locs are not spatially
equal. Please specify an interpolation method."""
)
return update_history(gwc)
#####################################################################################
# Return single point extended across all of the output points if only one GWC
#
# This is handled by broadcasting the GWC point to all of the spatial dimensions of
# the out_locs array. Therefore, we only have to spatial stack the GWC object, and
# drop the point dimension and all associated coordinates
#####################################################################################
elif gwc_is_single_pt:
logger.info("Applying single point GWC to all output points")
# Drop spatial information from gwc
gwc_pt = wksp.spatial_stack(gwc, out_crs, revertable=False).squeeze(
"point", drop=True
)
# Broadcast single point across entire domain, first output is gwc
res = gwc_pt.broadcast_like(out_locs, exclude=REQ_DIMS_GWC)
# Dimensions of the broadcasted dimensions are not copied over
for d in res.dims:
if d not in res.coords:
res.coords[d] = out_locs.coords[d]
############################################################################
# Nearest Neighbor Interpolation
#
# We use the scipy.interpolate.griddata function to get the indices of each GWC point
# for each associated out_locs point. The GWC data is always spatially stacked, but
# the out_locs can be any spatial structure. We handle the data in two steps, in the
# first step, we process it in 2D depending on if it is a raster structure or a point
# structure for those two dimensions, i.e. does it have dimensions south_north and
# west_east, or only...S4v4n4r4m...
# a "point" or "stacked_point". Then we broadcast that result to all
# heights, if there is a height dimension.
#
############################################################################
elif (not gwc_is_single_pt) and (method == "nearest"):
logger.info("Nearest neighbor interpolation from scipy.interpolate.griddata.")
# Create a 2D tuple of the input points, GWC doesn't have height at this time
gwc_pt = wksp.spatial_stack(gwc, out_crs, remove_height=True, revertable=False)
points = (gwc_pt.west_east, gwc_pt.south_north)
# If raster, we need to stack first before combining
if wksp.is_raster(out_locs):
out_locs_pt = wksp.spatial_stack(out_locs, remove_height=True)
else:
out_locs_pt = out_locs
# Get indices of the nearest point and reshape, this will always have a point
# structure, as it will follow what exists from the gwc_pt object.
xi = (out_locs_pt.west_east, out_locs_pt.south_north)
idx = sp_griddata(points, np.arange(gwc_pt.point.size), xi, method="nearest")
gwc_out_2d = gwc_pt.isel(point=idx.flatten().astype("int"))
# Update coordinates
if wksp.is_stacked_point(out_locs):
gwc_out_2d = gwc_out_2d.rename_dims(point="stacked_point")
gwc_out_2d = gwc_out_2d.assign_coords(
west_east=out_locs_pt["west_east"], south_north=out_locs_pt["south_north"]
)
# Set structure to match that of out_locs
res = _2D_to_out_locs(gwc_out_2d, out_locs)
elif (not gwc_is_single_pt) and (method == "natural"):
#################################################################################
# Natural Neighbors Interpolation
#
# Use a spatial stacked GWC and a stackpoint formatted out_locs to do a 2D
# interpolation.
#################################################################################
gwc_pt = wksp.spatial_stack(gwc, out_crs, revertable=False)
# Locate the natural neighbors
members, delauny_tri, tri_info = natn.find_natural_neighbors(gwc_pt, out_locs)
# Drop members (out_locs) that don't have any neighbors
gen = [(key, val) for (key, val) in members.items() if len(val) != 0]
# Create 2d list of points in output grid [x, y]
out_locs_pt = wksp.to_stacked_point(out_locs)
grid_points = np.vstack([out_locs_pt.west_east, out_locs_pt.south_north]).T
# Create output arrays
natural = empty_gwc(
out_locs_pt,
gwc_pt.sizes["sector"],
gen_heights=gwc_pt["gen_height"],
gen_roughnesses=gwc_pt["gen_roughness"],
not_empty=False,
)
natural = wksp.to_stacked_point(wksp._struct._from_scalar(natural))
if wksp.is_vertical(natural):
natural = natural.isel(height=0, drop=True)
natural = natural.transpose(
"stacked_point", "sector", "gen_height", "gen_roughness"
)
# loop over each member that has neighbors
for k_member, v_member in gen:
# Calculate the weighting from the natural neighbors
wgts, nn_pts = natn.nn_weights(
gwc_pt.west_east,
gwc_pt.south_north,
grid_points[k_member],
delauny_tri,
v_member,
tri_info,
)
# Collect neighbors for member, and transpose to match fortran's order
gwc_neigh = gwc_pt.isel(point=nn_pts).transpose(
"point",
"sector",
"gen_height",
"gen_roughness",
)
wgts = wgts.astype(np.float32)
A_loc, k_loc, f_loc = _GWC_INTERP(
gwc_neigh.A, gwc_neigh.k, gwc_neigh.wdfreq, wgts
)
natural.A[k_member, ...] = A_loc
natural.k[k_member, ...] = k_loc
natural.wdfreq[k_member, ...] = f_loc
# Set structure to match that of out_locs
res = _2D_to_out_locs(natural, out_locs)
else:
raise ValueError(f"Unknown method ({method})")
return update_history(res.transpose(*(REQ_DIMS_GWC), ...))