# (c) 2022 DTU Wind Energy
"""
Create empty datasets for various WindKit structures.
The datasets have the correct shape, dimensions, coordinates and data variables and
can also be filled with meaningful random data.
"""
import numpy as np
import pandas as pd
import scipy.stats
import xarray as xr
import windkit as wk
from .metadata import (
_BWC_ATTRS,
_HIS_ATTRS,
_MAP_TYPE_ATTRS,
_MET_ATTRS,
_TOPO_EFFECTS_ATTRS,
_TS_ATTRS,
_WEIB_ATTRS,
ALL_VARS_META,
update_history,
update_var_attrs,
)
from .sector import create_sector_coords, create_ws_bin_coords
from .spatial import is_point, spatial_stack, spatial_unstack
_metvars_3d_nosec = [
"wspd", # same as A
"power_density", # 100 - 600
"air_density", # 1.225, 0.2 stddev
"wspd_emergent", # same as A
"power_density_emergent", # same as power density
"A_combined", # same as A
"k_combined", # same as k
]
_metvars_4d = ["wspd_sector", "power_density_sector"] # same as wspd, power_density
# metadata for the generalized wind climate
_GEN_COORDS_META = {
"gen_height": ALL_VARS_META["gen_height"],
"gen_roughness": ALL_VARS_META["gen_roughness"],
"sector": ALL_VARS_META["sector"],
}
__all__ = [
"empty_wasp_site_factors",
"empty_bwc",
"empty_wwc",
"empty_gwc",
"empty_met_fields",
"empty_z0meso",
"empty_pwc",
"empty_wv_count",
"empty_raster_map",
]
def _empty_unstack(ds, is_scalar):
"""
    Allow the user to retain a scalar structure when using spatial_unstack.
"""
if is_scalar:
return ds.isel(point=0)
else:
return spatial_unstack(ds)
def _define_std_arrays(output_locs, nsec=12):
"""Return standard 2D, 3D, and 4D arrays in point format"""
output_is_point = is_point(output_locs)
out_std = spatial_stack(output_locs).drop_vars(output_locs.data_vars)
# check if it is a scalar
is_scalar = not output_is_point and out_std.sizes["point"] == 1
# Setup sector
sector_coords = create_sector_coords(nsec).coords
dims = ("sector", "point")
out_sec_std = out_std.assign_coords(sector_coords)
values = np.full((nsec, out_std.sizes["point"]), np.nan, np.float32)
out_das = {}
# x, y
out_das["da_2d"] = xr.DataArray(
        values[0],
out_std.coords,
dims[1:],
attrs={"_pwio_data_is_2d": True},
)
# sector, x, y
out_das["da_3d_nohgt"] = xr.DataArray(
values, out_sec_std.coords, dims, attrs={"_pwio_data_is_2d": True}
)
# height, x, y
out_das["da_3d_nosec"] = xr.DataArray(
        values[0],
out_std.coords,
dims[1:],
attrs={"_pwio_data_is_2d": False},
)
# Sector, height, x, y
out_das["da_4d"] = xr.DataArray(values, out_sec_std.coords, dims, attrs={})
return out_das, out_std.attrs, is_scalar
def _copy_chunks(in_ds, out_ds):
"""copy chunks from in_ds to out_ds"""
    # If the input is not chunked it will have an empty chunks dict, so we build a
    # custom chunk_map from the chunked dimensions of the original data.
chunk_map = {}
for i in in_ds.chunks:
chunk_map[i] = in_ds.chunks[i][0]
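    # e.g. for in_ds.chunks == Frozen({"point": (100, 100, 50)}) the loop above
    # yields chunk_map == {"point": 100}, i.e. the first chunk size per dimension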
    # An empty dict is falsy, so unchunked input is returned unchanged
if chunk_map:
return out_ds.chunk(chunk_map)
else:
return out_ds
def empty_wasp_site_factors(
output_locs,
nsec=12,
not_empty=True,
site_factors=["z0meso", "displ"],
seed=9876538,
**kwargs,
):
"""Create empty site-factors dataset.
Parameters
----------
output_locs : xarray.Dataset
Output geospatial information
nsec : int
Number of sectors. Defaults to 12.
not_empty : bool
If true, the empty dataset is filled with random
meaningful data.
    site_factors : list of str, or str
List of variables to include in the output, or a string "all" that
includes all variables. Defaults to ["z0meso", "displ"]
seed : int
Seed for the random data, defaults to 9876538.
kwargs : dict
Additional arguments.
Returns
-------
ds : xarray.Dataset
Empty site factors dataset.
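
    Examples
    --------
    A minimal sketch, assuming ``pts`` is a WindKit point dataset created
    beforehand (its construction is not shown here):

    >>> ds = empty_wasp_site_factors(pts, nsec=12, site_factors="all")
    >>> {"z0meso", "site_elev", "orographic_speedups"} <= set(ds.data_vars)
    True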
"""
da_dict, unstack_attrs, is_scalar = _define_std_arrays(output_locs, nsec)
_site_factors_vars_2d = ["site_elev", "rix"]
_site_factors_vars_3d_nohgt = ["z0meso", "slfmeso", "displ", "dirrix"]
_site_factors_vars_4d = [
"user_def_speedups",
"orographic_speedups",
"obstacle_speedups",
"roughness_speedups",
"user_def_turnings",
"orographic_turnings",
"obstacle_turnings",
"roughness_turnings",
]
if site_factors == "all":
site_factors = (
_site_factors_vars_2d + _site_factors_vars_3d_nohgt + _site_factors_vars_4d
)
random_param_dict = {
"z0meso": (0, 1.2),
"slfmeso": (0, 1),
"displ": (0, 10),
"user_def_speedups": 1,
"orographic_speedups": (0.6, 1.5),
"obstacle_speedups": 1,
"roughness_speedups": (0.6, 1.5),
"user_def_turnings": 0,
"orographic_turnings": (-20, 20),
"obstacle_turnings": 0,
"roughness_turnings": 0,
"dirrix": (0, 0.5),
"site_elev": (0, 100),
"rix": (0, 0.5),
}
out_vars = {}
for var in site_factors:
if var in _site_factors_vars_2d:
out_vars[var] = da_dict["da_2d"]
elif var in _site_factors_vars_3d_nohgt:
out_vars[var] = da_dict["da_3d_nohgt"]
elif var in _site_factors_vars_4d:
out_vars[var] = da_dict["da_4d"]
else:
raise ValueError(f"Unknown {var}, cannot add to result")
ds = xr.Dataset(
out_vars,
attrs=unstack_attrs,
)
if not_empty:
rng = np.random.default_rng(seed)
for val in site_factors:
rand_param = random_param_dict[val]
            if isinstance(rand_param, tuple):
                ds[val].values = rng.uniform(*rand_param, ds[val].shape)
            else:
                # scalar entries are constants (e.g. speedups of 1, turnings of 0)
                ds[val].values = np.full(ds[val].shape, float(rand_param))
ustack_ds = _empty_unstack(ds, is_scalar)
ds = update_var_attrs(_copy_chunks(output_locs, ustack_ds), _TOPO_EFFECTS_ATTRS)
return update_history(ds)
def empty_tswc(output_locs, date_range=None, not_empty=True, seed=9876538):
"""
Create empty time series wind climate dataset.
    If not_empty=True, the data variables are filled with meaningful random numbers.
Parameters
----------
    output_locs : xarray.Dataset
        Output geospatial information.
    date_range : pandas.DatetimeIndex or None
        Time range as a pandas DatetimeIndex. If None, a default 10-minute
        range from 2001-01-01 to 2010-01-01 (left-inclusive) is created.
        Defaults to None.
not_empty : bool
If true, the empty dataset is filled with random
meaningful data. Defaults to True.
seed : int
Seed for the random data, defaults to 9876538.
Returns
-------
ds : xarray.Dataset
Time Series wind climate dataset either empty or filled with
random numbers.
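
    Examples
    --------
    A sketch, assuming ``pts`` is a WindKit point dataset created beforehand
    (construction not shown):

    >>> times = pd.date_range("2015-01-01", periods=144, freq="10min")
    >>> ts = empty_tswc(pts, date_range=times)
    >>> ts.sizes["time"]
    144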
"""
da_dict, unstack_attrs, is_scalar = _define_std_arrays(output_locs)
if date_range is None:
time_values = pd.date_range(
"2001-01-01", "2010-01-01", freq="10min", inclusive="left"
)
    elif not isinstance(date_range, pd.DatetimeIndex):
raise TypeError("date_range must be of type pandas.DatetimeIndex")
else:
time_values = date_range
ds = xr.Dataset(
{
"wind_speed": da_dict["da_3d_nosec"],
"wind_direction": da_dict["da_3d_nosec"],
},
attrs=unstack_attrs,
)
ds["wind_speed"] = ds["wind_speed"].expand_dims({"time": time_values})
ds["wind_direction"] = ds["wind_direction"].expand_dims({"time": time_values})
n_pt = len(ds["point"])
n_timesteps = len(ds["time"])
if not_empty:
rng = np.random.default_rng(seed)
k = 2.0
A = 8.0
ws = rng.weibull(k, size=(n_timesteps, n_pt)) * A
wd = rng.uniform(0, 360, (n_timesteps, n_pt))
ds["wind_speed"].data = ws
ds["wind_direction"].data = wd
ustack_ds = _empty_unstack(ds, is_scalar)
ds = update_var_attrs(_copy_chunks(output_locs, ustack_ds), _TS_ATTRS)
return update_history(ds)
def empty_bwc(output_locs, nsec=12, nbins=30, not_empty=True, seed=9876538):
"""
Create empty binned wind climate dataset.
    If not_empty=True, the data variables are filled with meaningful random numbers,
e.g. the sum of wdfreq is 1.
Parameters
----------
    output_locs : xarray.Dataset
Output geospatial information.
nsec : int
Number of sectors, defaults to 12.
    nbins : int
Number of histogram bins, defaults to 30.
not_empty : bool
If true, the empty dataset is filled with random
meaningful data.
seed : int
Seed for the random data, defaults to 9876538.
Returns
-------
ds : xarray.Dataset
Binned wind climate dataset either empty or filled with
random numbers.
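
    Examples
    --------
    A sketch, assuming ``pts`` is a WindKit point dataset created beforehand.
    With the default ``not_empty=True`` the histogram is normalized, so the
    frequencies sum to one over the wsbin dimension:

    >>> bwc = empty_bwc(pts, nsec=12, nbins=30)
    >>> float(bwc["wsfreq"].sum("wsbin").max().round(6))
    1.0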
"""
da_dict, unstack_attrs, is_scalar = _define_std_arrays(output_locs, nsec)
ds = xr.Dataset(
{"wdfreq": da_dict["da_4d"], "wsfreq": da_dict["da_4d"]}, attrs=unstack_attrs
)
wsbin_coords = create_ws_bin_coords(bin_width=1.0, nws=nbins)
ds["wsfreq"] = ds["wsfreq"].expand_dims({"wsbin": wsbin_coords.values})
ds = ds.assign_coords({**wsbin_coords.coords})
n_pt = len(ds["point"])
if not_empty:
wsbin_n = np.linspace(1, nbins, nbins)
rng = np.random.default_rng(seed)
wsbin_full = wsbin_n.repeat(nsec * n_pt).reshape((nbins, nsec, n_pt))
k = rng.uniform(1.5, 2.5, [nsec, n_pt])
A = rng.uniform(5, 10, [nsec, n_pt])
wsbin_freq_not1 = scipy.stats.weibull_min.pdf(wsbin_full, k, scale=A)
wsbin_freq = wsbin_freq_not1 / wsbin_freq_not1.sum(0)
ds["wsfreq"] = xr.DataArray(wsbin_freq, ds["wsfreq"].coords, ds["wsfreq"].sizes)
ds["wdfreq"] = xr.DataArray(
rng.dirichlet(np.ones(nsec), n_pt).T,
ds["wdfreq"].coords,
ds["wdfreq"].sizes,
)
ustack_ds = _empty_unstack(ds, is_scalar)
ds = update_var_attrs(_copy_chunks(output_locs, ustack_ds), _BWC_ATTRS)
return update_history(ds)
def empty_wwc(output_locs, nsec=12, not_empty=True, seed=9876538, **kwargs):
"""Create empty weibull wind climate dataset.
    If not_empty=True, the data variables are filled with meaningful random numbers:
    the values of A are drawn from a uniform distribution between 5 and 10 and the
    values of k from a uniform distribution between 1.5 and 2.5.
Parameters
----------
output_locs : xarray.Dataset
Output geospatial information
nsec : int
Number of sectors, defaults to 12.
not_empty : bool
If true, the empty dataset is filled with random
meaningful data. Defaults to True.
seed : int
Seed for the random data, defaults to 9876538.
kwargs : dict
Additional arguments.
Returns
-------
ds : xarray.Dataset
Weibull wind climate dataset either empty or filled with
random numbers.
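
    Examples
    --------
    A sketch, assuming ``pts`` is a WindKit point dataset created beforehand.
    Sector-wise frequencies come from a Dirichlet draw, so they sum to one:

    >>> wwc = empty_wwc(pts, nsec=12)
    >>> float(wwc["wdfreq"].sum("sector").max().round(6))
    1.0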
"""
da_dict, unstack_attrs, is_scalar = _define_std_arrays(output_locs, nsec)
ds = xr.Dataset(
{"A": da_dict["da_4d"], "k": da_dict["da_4d"], "wdfreq": da_dict["da_4d"]},
attrs=unstack_attrs,
)
n_pt = len(ds["point"])
if not_empty:
rng = np.random.default_rng(seed)
k = rng.uniform(1.5, 2.5, [nsec, n_pt])
A = rng.uniform(5, 10, [nsec, n_pt])
ds["A"] = xr.DataArray(A, ds["A"].coords, ds["A"].dims)
ds["k"] = xr.DataArray(k, ds["k"].coords, ds["k"].dims)
ds["wdfreq"] = xr.DataArray(
rng.dirichlet(np.ones(nsec), n_pt).T,
ds["wdfreq"].coords,
ds["wdfreq"].dims,
)
ustack_ds = _empty_unstack(ds, is_scalar)
ds = update_var_attrs(_copy_chunks(output_locs, ustack_ds), _WEIB_ATTRS)
return update_history(ds)
def empty_gwc(
output_locs,
nsec=12,
not_empty=True,
seed=9876538,
gen_heights=(10.0, 50.0, 100.0),
gen_roughnesses=(0.0, 0.03, 0.1, 0.4, 1.5),
wdfreq_constant=False,
**kwargs,
):
"""Create empty generalized wind climate dataset.
    If not_empty=True, the data variables are filled with meaningful random numbers:
    the values of A are drawn from a uniform distribution between 5 and 10 and the
    values of k from a uniform distribution between 1.5 and 2.5.
Parameters
----------
output_locs : xarray.Dataset
Output geospatial information.
nsec : int
Number of sectors, defaults to 12.
not_empty : bool
If true, the empty dataset is filled with random
meaningful data. Defaults to True.
seed : int
Seed for the random data, defaults to 9876538.
    gen_heights : tuple of float
        Generalized heights to use as coordinates.
        Defaults to (10.0, 50.0, 100.0).
    gen_roughnesses : tuple of float
        Generalized roughness lengths to use as coordinates.
        Defaults to (0.0, 0.03, 0.1, 0.4, 1.5).
    wdfreq_constant : bool
        If True, the values of wdfreq do not change along the dimension
        gen_height. This is used when writing .lib files. Defaults to False.
kwargs : dict
Additional arguments.
Returns
-------
ds : xarray.Dataset
Generalized wind climate dataset either empty or filled with
random numbers.
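
    Examples
    --------
    A sketch, assuming ``pts`` is a WindKit point dataset created beforehand:

    >>> gwc = empty_gwc(
    ...     pts, gen_heights=(10.0, 50.0), gen_roughnesses=(0.0, 0.03, 0.4)
    ... )
    >>> gwc["A"].sizes["gen_height"], gwc["A"].sizes["gen_roughness"]
    (2, 3)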
"""
da_dict, unstack_attrs, is_scalar = _define_std_arrays(output_locs, nsec)
ds = xr.Dataset(
{"A": da_dict["da_4d"], "k": da_dict["da_4d"], "wdfreq": da_dict["da_4d"]},
attrs=unstack_attrs,
)
gen_rou_coords = np.array(gen_roughnesses, dtype=float)
gen_h_coords = np.array(gen_heights, dtype=float)
n_gen_rou = len(gen_rou_coords)
n_gen_h = len(gen_h_coords)
ds = ds.expand_dims(
{
"gen_roughness": gen_rou_coords,
}
)
ds["A"] = ds["A"].expand_dims({"gen_height": gen_h_coords})
ds["k"] = ds["k"].expand_dims({"gen_height": gen_h_coords})
ds["wdfreq"] = ds["wdfreq"].expand_dims({"gen_height": gen_h_coords})
n_pt = len(ds["point"])
if not_empty:
rng = np.random.default_rng(seed)
k = rng.uniform(1.5, 2.5, [n_gen_h, n_gen_rou, nsec, n_pt])
A = rng.uniform(5, 10, [n_gen_h, n_gen_rou, nsec, n_pt])
ds["A"] = xr.DataArray(A, ds["A"].coords, ds["A"].dims)
ds["k"] = xr.DataArray(k, ds["k"].coords, ds["k"].dims)
ds["wdfreq"] = xr.DataArray(
rng.dirichlet(np.ones(nsec), (n_gen_h, n_gen_rou, n_pt)),
dims=("gen_height", "gen_roughness", "point", "sector"),
)
ds["gen_roughness"].attrs = {**_GEN_COORDS_META["gen_roughness"]}
ds["gen_height"].attrs = {**_GEN_COORDS_META["gen_height"]}
ds["sector"].attrs = {**_GEN_COORDS_META["sector"]}
ustack_ds = _empty_unstack(ds, is_scalar)
if wdfreq_constant:
da_wdfreq_constant = ustack_ds.wdfreq.isel(gen_height=0).expand_dims(
dim={"gen_height": ustack_ds.gen_height.values}
)
ustack_ds["wdfreq"] = da_wdfreq_constant
ds = update_var_attrs(_copy_chunks(output_locs, ustack_ds), _WEIB_ATTRS)
return update_history(ds)
def empty_met_fields(
output_locs,
nsec=12,
not_empty=True,
seed=9876538,
met_fields=["wspd", "power_density"],
**kwargs,
):
"""Create empty dataset filled with met_fields
Parameters
----------
output_locs : xarray.Dataset
Output geospatial information
    nsec : int
        Number of sectors, defaults to 12.
    not_empty : bool
        If true, the empty dataset is filled with random
        meaningful data. Defaults to True.
    seed : int
        Seed for the random data, defaults to 9876538.
    met_fields : list of str, or str
        List of variables to include in the output, or the string "all" to
        include all variables. Defaults to ["wspd", "power_density"].
    kwargs : dict
        Additional arguments.
Returns
-------
    ds : xarray.Dataset
        Empty met fields dataset.
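
    Examples
    --------
    A sketch, assuming ``pts`` is a WindKit point dataset created beforehand:

    >>> met = empty_met_fields(pts, met_fields=["wspd", "air_density"])
    >>> sorted(met.data_vars)
    ['air_density', 'wspd']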
"""
da_dict, unstack_attrs, is_scalar = _define_std_arrays(output_locs, nsec)
random_param_dict = {
"wspd": (5, 10),
"power_density": (100, 600),
"air_density": ("gaussian", 1.225, 0.2),
"wspd_emergent": (5, 10),
"power_density_emergent": (100, 600),
"A_combined": (5, 10),
"k_combined": (5, 10),
"wspd_sector": (5, 10),
"power_density_sector": (100, 600),
}
if met_fields == "all":
met_fields = _metvars_4d + _metvars_3d_nosec
out_vars = {}
for var in met_fields:
if var in _metvars_4d:
out_vars[var] = da_dict["da_4d"]
elif var in _metvars_3d_nosec:
out_vars[var] = da_dict["da_3d_nosec"]
else:
raise ValueError(f"Unknown met_field {var}, cannot add to result")
ds = xr.Dataset(
out_vars,
attrs=unstack_attrs,
)
if not_empty:
rng = np.random.default_rng(seed)
for val in met_fields:
rand_param = random_param_dict[val]
if len(rand_param) == 2:
ds[val].values = rng.uniform(*rand_param, ds[val].shape)
else:
ds[val].values = rng.normal(*rand_param[1:], ds[val].shape)
ustack_ds = _empty_unstack(ds, is_scalar)
ds = update_var_attrs(_copy_chunks(output_locs, ustack_ds), _MET_ATTRS)
return update_history(ds)
def empty_z0meso(output_locs, nsec=12, **kwargs):
"""Empty site_factors with only z0meso and slfmeso.
Parameters
----------
    output_locs : xarray.Dataset
Output geospatial information.
nsec : int
Number of sectors, defaults to 12.
kwargs : dict
Additional arguments.
Returns
-------
ds : xarray.Dataset
Empty dataset.
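
    Examples
    --------
    A sketch, assuming ``pts`` is a WindKit point dataset created beforehand:

    >>> rou = empty_z0meso(pts)
    >>> sorted(rou.data_vars)
    ['slfmeso', 'z0meso']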
"""
empty_z0 = empty_wasp_site_factors(output_locs, nsec)[["z0meso", "slfmeso"]]
return update_history(empty_z0)
def empty_pwc(
output_locs,
nsec=12,
not_empty=True,
seed=9876538,
met_fields=["A_combined", "k_combined", "power_density"],
site_factors=["site_elev"],
include_name=True,
**kwargs,
):
"""Empty predicted wind climate with optional variables.
Parameters
----------
    output_locs : xarray.Dataset
Output geospatial information
nsec : int
Number of sectors, defaults to 12.
not_empty : bool
If true, the empty dataset is filled with random
meaningful data. Defaults to True.
seed : int
Seed for the random data, defaults to 9876538.
    met_fields : list of str or None
        List of met-field variables to include in the output. If None, none
        are included. Defaults to ["A_combined", "k_combined", "power_density"].
    site_factors : list of str or None
        List of site-factor variables to include in the output. If None, none
        are included. Defaults to ["site_elev"].
include_name : bool
If true, include a "name" coordinate, which is a string, associated to the dimension
point, as it is commonly seen in rsf and wrg files. Defaults to True.
kwargs : dict
Additional arguments.
Returns
-------
ds : xarray.Dataset
empty predicted wind climate dataset.
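
    Examples
    --------
    A sketch, assuming ``pts`` is a WindKit point dataset created beforehand.
    The result merges Weibull, site-factor, and met-field variables:

    >>> pwc = empty_pwc(
    ...     pts, met_fields=["power_density"], site_factors=["site_elev"]
    ... )
    >>> {"A", "k", "wdfreq", "site_elev", "power_density"} <= set(pwc.data_vars)
    True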
"""
_, _, is_scalar = _define_std_arrays(output_locs, nsec)
output_locs = spatial_stack(output_locs) # needed to handle scalar case
if include_name:
output_locs = output_locs.assign_coords(
name=("point", ["GridPoint"] * output_locs.sizes["point"])
)
    ds_list = [empty_wwc(output_locs, nsec, not_empty=not_empty, seed=seed)]
    # increment the seed so each sub-dataset gets different random values
    seed += 1
if site_factors:
ds_list.append(
empty_wasp_site_factors(
output_locs,
nsec,
site_factors=site_factors,
seed=seed,
not_empty=not_empty,
)
)
    # increment the seed again for the met fields
seed += 1
if met_fields is not None:
ds_list.append(
empty_met_fields(
output_locs,
nsec,
not_empty=not_empty,
met_fields=met_fields,
seed=seed,
)
)
pwc = xr.merge(ds_list, combine_attrs="override")
ustack_ds = _empty_unstack(pwc, is_scalar)
return update_history(ustack_ds)
def empty_wv_count(output_locs, nsec=12, nbins=30, not_empty=True, seed=9876538):
"""
Create empty wind vector count dataset.
    If not_empty=True, the data variables are filled with meaningful random numbers.
Parameters
----------
    output_locs : xarray.Dataset
        Output geospatial information.
    nsec : int
        Number of sectors, defaults to 12.
    nbins : int
        Number of histogram bins, defaults to 30.
not_empty : bool
If true, the empty dataset is filled with random
meaningful data, defaults to True.
seed : int
Seed for the random data, defaults to 9876538.
Returns
-------
ds : xarray.Dataset
Wind vector count dataset either empty or filled with
random numbers.
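
    Examples
    --------
    A sketch, assuming ``pts`` is a WindKit point dataset created beforehand.
    Counts are non-negative integers rounded from a scaled Weibull density:

    >>> wv = empty_wv_count(pts, nsec=12, nbins=30)
    >>> bool((wv["wv_count"] >= 0).all())
    True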
"""
da_dict, unstack_attrs, is_scalar = _define_std_arrays(output_locs, nsec)
ds = xr.Dataset({"wv_count": da_dict["da_4d"]}, attrs=unstack_attrs)
wsbin_coords = create_ws_bin_coords(bin_width=1.0, nws=nbins)
ds["wv_count"] = ds["wv_count"].expand_dims({"wsbin": wsbin_coords.values})
ds = ds.assign_coords({**wsbin_coords.coords})
n_pt = len(ds["point"])
if not_empty:
rng = np.random.default_rng(seed)
k = rng.uniform(1.5, 2.5, [nsec, n_pt])
A = rng.uniform(5, 10, [nsec, n_pt])
wsbin_n = np.linspace(1, nbins, nbins)
wsbin_full = wsbin_n.repeat(nsec * n_pt).reshape((nbins, nsec, n_pt))
scaling_fact = rng.uniform(500, 1000, [nsec, n_pt])
wv_count_array = np.rint(
np.multiply(
scipy.stats.weibull_min.pdf(wsbin_full, k, scale=A),
scaling_fact[np.newaxis, :, :],
)
)
ds["wv_count"].values = wv_count_array
ustack_ds = _empty_unstack(ds, is_scalar)
ds = update_var_attrs(_copy_chunks(output_locs, ustack_ds), _HIS_ATTRS)
return update_history(ds)
def empty_raster_map(output_locs, resolution, map_type="elevation"):
"""
    Create an empty raster map data array. The values in the raster map form a 2D
    Gaussian, and the boundaries are defined by output_locs.
Parameters
----------
output_locs : xarray.Dataset
A windkit spatial object that defines the bounding box to create a raster map.
resolution : float
Resolution of the raster map.
map_type : str
Raster map type. Available options are "elevation", "roughness", or "landcover",
defaults to "elevation.
Returns
-------
raster_da : xarray.DataArray
windkit raster map data array.
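
    Examples
    --------
    A sketch, assuming ``pts`` is a WindKit spatial dataset whose bounding
    box spans the area of interest:

    >>> elev = empty_raster_map(pts, resolution=100.0, map_type="elevation")
    >>> elev.name
    'elevation'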
"""
# build the raster_structure
bb = wk.spatial.BBox.from_ds(output_locs)
raster_ds = bb.to_grid(resolution, 10.0) # this height is not used
raster_da = raster_ds["output"].isel(height=0, drop=True).rename(map_type)
# fill with a 2D gauss
    def _get_gauss(sigma, mu, x):
        return (1 / (sigma * np.sqrt(2 * np.pi))) * np.exp(
            -0.5 * np.square((x - mu) / sigma)
        )

    sigma = 20
    mu = 0
    val = np.asarray([_get_gauss(sigma, mu, x) for x in range(-50, 50 + 1)])
Amp = 25000
    # the outer product of the 1D Gaussian with itself builds the 2D bump
    for i, g in enumerate(val):
        raster_da.values[i] = Amp * g * val + 1
raster_da.values = np.round(raster_da.values)
raster_da = update_var_attrs(raster_da, _MAP_TYPE_ATTRS)
return update_history(raster_da)