# (c) 2022 DTU Wind Energy
"""Weibull wind climate module
When measuring over a long period, the frequency of occurrence of wind speeds usually follows a
`Weibull distribution <https://en.wikipedia.org/wiki/Weibull_distribution>`_. It is therefore common
practice in the wind energy industry to use the Weibull *A* and *k*
parameters to denote the wind resource at a certain location.
Because there can be large differences in the wind climate when the wind is
coming from different wind directions, the Weibull distributions are usually specified
per sector.
A valid Weibull wind climate therefore has a dimension ``sector`` and the variables
``A``, ``k`` and ``wdfreq``. It must also have a valid spatial structure. This module
contains functions that create and operate on Weibull wind climates.
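
A minimal sketch of the expected structure (hypothetical values; the spatial
structure is omitted for brevity)::

    import numpy as np
    import xarray as xr
    from windkit.xarray_structures.sector import create_sector_coords

    n_sec = 12
    wwc = xr.Dataset(
        {
            "A": (("sector",), np.full(n_sec, 7.5)),
            "k": (("sector",), np.full(n_sec, 2.0)),
            "wdfreq": (("sector",), np.full(n_sec, 1.0 / n_sec)),
        },
    ).assign_coords(create_sector_coords(n_sec).coords)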
"""
__all__ = [
"validate_wwc",
"is_wwc",
"create_wwc",
"read_wwc",
"read_mfwwc",
"wwc_to_file",
"wwc_to_bwc",
"weibull_combined",
]
import io
import logging
import re
from pathlib import Path
import numpy as np
import pandas as pd
import xarray as xr
from scipy.special import gamma
from windkit.xarray_structures._validate import (
_create_is_obj_function,
_create_validation_wrapper_factory,
_create_validator,
)
from windkit.xarray_structures.empty import (
_copy_chunks,
_define_std_arrays,
_empty_unstack,
)
from windkit.xarray_structures.metadata import (
_BWC_ATTRS,
_WEIB_ATTRS,
_update_history,
_update_var_attrs,
)
from windkit.xarray_structures.sector import create_sector_coords
from windkit.xarray_structures.wsbin import create_wsbin_coords
from windkit.spatial import (
_raster,
count_spatial_points,
is_cuboid,
set_crs,
to_raster,
)
from windkit.utils import _infer_file_format
from windkit.weibull import fit_weibull_k_sumlogm, weibull_cdf, weibull_moment
SUPPORTED_WWC_FILE_FORMATS_READ = ["rsf", "wrg", "grd", "nc"]
SUPPORTED_WWC_FILE_FORMATS_WRITE = ["rsf", "wrg"]
VAR_WEIBULL_A = "A"
VAR_WEIBULL_k = "k"
VAR_WDFREQ = "wdfreq"
DIM_SECTOR = "sector"
WRG_HEADER_PATTERN = re.compile(
r"\s*(\d+)\s+(\d+)\s+(-?\d+(?:\.\d+)?)\s+(-?\d+(?:\.\d+)?)\s+(\d+(?:\.\d+)?)\s*",
re.IGNORECASE,
)
DATA_VAR_DICT_WWC = {"A": ["sector"], "k": ["sector"], "wdfreq": ["sector"]}
REQ_DIMS_WWC = ["sector"]
REQ_COORDS_WWC = [
"sector",
"sector_ceil",
"sector_floor",
]
def _validate_values_range(wwc):
"""Helper function to validate data variables are within range"""
response_list = []
if "A" in wwc.data_vars and any(
xr.where(wwc.A.sum(dim="sector") <= 0.0, True, False).values.flatten()
):
response_list.append("Sum of A values not positive.")
if "k" in wwc.data_vars and any(
xr.where(wwc.k == 0.0, True, False).values.flatten()
):
response_list.append("At least one of the k values is zero.")
if "wdfreq" in wwc.data_vars and "sector" in wwc.wdfreq.dims:
sum_s = wwc["wdfreq"].sum(dim="sector")
if not np.allclose(sum_s, 1.0):
response_list.append("Wind direction frequency must sum to 1")
return response_list
validate_wwc = _create_validator(
variables=DATA_VAR_DICT_WWC,
dims=REQ_DIMS_WWC,
coords=REQ_COORDS_WWC,
extra_checks=[_validate_values_range],
)
_validate_wwc_wrapper_factory = _create_validation_wrapper_factory(validate_wwc)
is_wwc = _create_is_obj_function(validate_wwc)
def create_wwc(output_locs, n_sectors=12, not_empty=True, seed=9876538, **kwargs):
"""Create empty weibull wind climate dataset.
If not_empty=True,the data variables are filled with meaninful random numbers, e.g.
the values from A are generated from a uniform function between 5
and 10 and the values for k from a uniform function between 1.5 and 2.5.
Parameters
----------
output_locs : xarray.Dataset
Output geospatial information
n_sectors : int
Number of sectors, defaults to 12.
not_empty : bool
If true, the empty dataset is filled with random
meaningful data. Defaults to True.
seed : int
Seed for the random data, defaults to 9876538.
kwargs : dict
Additional arguments.
Returns
-------
ds : xarray.Dataset
Weibull wind climate dataset either empty or filled with
random numbers.
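    Examples
    --------
    A minimal sketch, assuming ``output_locs`` is any dataset with a valid
    windkit spatial structure::

        wwc = create_wwc(output_locs, n_sectors=12)
        assert is_wwc(wwc)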
"""
da_dict, unstack_attrs, is_scalar = _define_std_arrays(output_locs, n_sectors)
ds = xr.Dataset(
{"A": da_dict["da_4d"], "k": da_dict["da_4d"], "wdfreq": da_dict["da_4d"]},
attrs=unstack_attrs,
)
n_pt = len(ds["point"])
if not_empty:
rng = np.random.default_rng(seed)
k = rng.uniform(1.5, 2.5, [n_sectors, n_pt])
A = rng.uniform(5, 10, [n_sectors, n_pt])
ds["A"] = xr.DataArray(A, ds["A"].coords, ds["A"].dims)
ds["k"] = xr.DataArray(k, ds["k"].coords, ds["k"].dims)
ds["wdfreq"] = xr.DataArray(
rng.dirichlet(np.ones(n_sectors), n_pt).T,
ds["wdfreq"].coords,
ds["wdfreq"].dims,
)
ustack_ds = _empty_unstack(ds, is_scalar)
ds = _update_var_attrs(_copy_chunks(output_locs, ustack_ds), _WEIB_ATTRS)
return _update_history(ds)
def _has_wrg_header(infile, parse_header=False):
"""Check if a resource file has a WRG-style header
and optionally parse the params.
Parameters
----------
infile : str, pathlib.Path, io.StringIO
Input file to check
    parse_header : bool, optional
        If True, attempt to parse the header params, by default False.
        Parsing is currently not implemented; only the boolean is returned.
    Returns
    -------
    bool:
        Whether the file has a wrg header
"""
if isinstance(infile, io.StringIO):
fobj = infile
else:
fobj = open(infile)
line = fobj.readline().strip()
if not isinstance(infile, io.StringIO):
fobj.close()
match = WRG_HEADER_PATTERN.match(line)
return bool(match)
def _infer_resource_file_nsec(infile, skip_first=True):
"""Infer the number of sectors in resource file by reading
column 70-72 and converting it to an integer.
Parameters
----------
    infile : str, pathlib.Path, io.StringIO
        Resource file to infer sectors from.
    skip_first : bool
        If True, skip the first line (a WRG-style header) before reading.
        Defaults to True.
Returns
-------
int
Number of sectors
"""
if isinstance(infile, io.StringIO):
fobj = infile
else:
fobj = open(infile)
    # Skip the header line if present
    if skip_first:
        fobj.readline()
    # Read the first data line
line = fobj.readline()
if not isinstance(infile, io.StringIO):
fobj.close()
return int(line[69:72]) # column 70-72 using python indexing
def _read_resource_file(
resource_file, crs, n_sectors=12, to_cuboid=False, use_production=False, **kwargs
):
"""Reads .wrg or .rsf file into a weibull wind climate dataset.
Parameters
----------
resource_file : str, pathlib.Path, io.StringIO
Path to resource file
crs : int, dict, str or CRS
Value to create CRS object or an existing CRS object
    n_sectors : int
        Number of sectors in file. Defaults to 12. Note that the value is
        re-inferred from the file itself before parsing.
    to_cuboid : bool
        If True, the dataset will be converted to the cuboid spatial
        structure (dimensions south_north, west_east, height).
    use_production : bool
        If True, the column with power in the file is interpreted as power production,
        i.e. stored as 'gross_aep' in the dataset. If False, it is stored as 'power_density'.
        If the values in the file are power production, they are originally stored in Wh/y,
        but they are saved with units GWh/y in the dataset. Defaults to False.
Returns
-------
wwc: xarray.Dataset
Weibull wind climate dataset.
"""
if crs is None:
raise ValueError("crs must be specified")
has_wrg_header = _has_wrg_header(resource_file)
n_sectors = _infer_resource_file_nsec(resource_file, skip_first=has_wrg_header)
df = pd.read_fwf(
resource_file,
widths=tuple([10, 10, 10, 8, 5, 5, 6, 15, 3] + [4, 4, 5] * n_sectors),
header=None,
skiprows=int(has_wrg_header),
)
power_col = "gross_aep" if use_production else "power_density"
header = [
"name",
"west_east",
"south_north",
"site_elev",
"height",
"A_combined",
"k_combined",
power_col,
"n_sectors",
]
for i in range(1, n_sectors + 1):
header += f"f_{i} A_{i} k_{i}".split()
df.columns = header
can_be_raster = _raster._can_be_raster(df["west_east"], df["south_north"])
df = df.set_index(["name"])
wwc = df.to_xarray()
wwc = wwc.assign_coords(point=(("name",), np.arange(len(df.index))))
wwc = wwc.swap_dims({"name": "point"})
wwc = wwc.drop_vars("point")
wwc = wwc.assign_coords(
west_east=(("point",), wwc.west_east.values),
south_north=(("point",), wwc.south_north.values),
height=(("point",), wwc.height.values),
)
knames = [f"k_{sec}" for sec in range(1, n_sectors + 1)]
Anames = [f"A_{sec}" for sec in range(1, n_sectors + 1)]
fnames = [f"f_{sec}" for sec in range(1, n_sectors + 1)]
wwc["k"] = xr.concat([wwc[n] for n in knames], dim="sector")
wwc["A"] = xr.concat([wwc[n] for n in Anames], dim="sector")
wwc["wdfreq"] = xr.concat([wwc[n] for n in fnames], dim="sector")
wwc["site_elev"] = wwc["site_elev"].astype(np.float64)
wwc["k"] = wwc["k"] / 100.0
wwc["A"] = wwc["A"] / 10.0
wwc["wdfreq"] = wwc["wdfreq"] / wwc["wdfreq"].sum(dim="sector", skipna=False)
wwc = wwc.drop_vars(
["n_sectors"]
+ [f"f_{sec}" for sec in range(1, n_sectors + 1)]
+ [f"A_{sec}" for sec in range(1, n_sectors + 1)]
+ [f"k_{sec}" for sec in range(1, n_sectors + 1)]
)
if use_production:
wwc["gross_aep"] = wwc["gross_aep"] / 1e9 # Wh/y -> GWh/y
wwc = set_crs(wwc, crs)
wdcenters = create_sector_coords(n_sectors)
wwc = wwc.assign_coords(**wdcenters.coords)
n_spatial = count_spatial_points(wwc)
if (not can_be_raster) and to_cuboid and (n_spatial > 1):
logging.warning(
"_read_resource_file: Data cannot be converted to raster, returning point."
)
    if to_cuboid and (can_be_raster or n_spatial == 1):
wwc = to_raster(wwc, ignore_raster_check=True)
if "elevation" in wwc.data_vars:
wwc["elevation"] = wwc["elevation"].isel(height=0)
wwc = _update_var_attrs(wwc, _WEIB_ATTRS)
return _update_history(wwc)
def _read_rsffile(
rsffile, crs=None, *, to_cuboid=False, use_production=False, **kwargs
):
"""Reads .rsf file into a weibull wind climate dataset.
Parameters
----------
rsffile : str, pathlib.Path, io.StringIO
Path to .rsf file
crs : int, dict, str or CRS
Value to create CRS object or an existing CRS object
    to_cuboid : bool
        If True, the dataset will be converted to the cuboid spatial
        structure (dimensions south_north, west_east, height).
    use_production : bool
        If True, the column with power in the file is interpreted as power production,
        i.e. stored as 'gross_aep' in the dataset. If False, it is stored as 'power_density'.
        If the values in the file are power production, they are originally stored in Wh/y,
        but they are saved with units GWh/y in the dataset. Defaults to False.
Returns
-------
wwc: xarray.Dataset
Weibull wind climate dataset.
"""
return _read_resource_file(
rsffile, crs=crs, to_cuboid=to_cuboid, use_production=use_production, **kwargs
)
def _read_wrgfile(wrgfile, crs=None, *, to_cuboid=True, use_production=False, **kwargs):
"""Reads .wrg file into a weibull wind climate dataset.
Parameters
----------
wrgfile : str, pathlib.Path, io.StringIO
Path to .wrg file
crs : int, dict, str or CRS
Value to create CRS object or an existing CRS object
    to_cuboid : bool
        If True, the dataset will be converted to the cuboid spatial
        structure (dimensions south_north, west_east, height).
    use_production : bool
        If True, the column with power in the file is interpreted as power production,
        i.e. stored as 'gross_aep' in the dataset. If False, it is stored as 'power_density'.
        If the values in the file are power production, they are originally stored in Wh/y,
        but they are saved with units GWh/y in the dataset. Defaults to False.
Returns
-------
wwc: xarray.Dataset
Weibull wind climate dataset.
"""
return _read_resource_file(
wrgfile, crs=crs, to_cuboid=to_cuboid, use_production=use_production, **kwargs
)
def _read_pwcfile(pwcfile, spatial_dataset):
"""Reads .pwc predicted wind climate file from WAsP in XML format.
The .pwc file does not include spatial information, so it must
be passed as a parameter.
Parameters
----------
    pwcfile : str, pathlib.Path, io.StringIO
        Path to .pwc file
    spatial_dataset : xarray.Dataset
        xarray dataset with the spatial info
    Returns
    -------
    wwc : xarray.Dataset
        Weibull wind climate dataset.
    """
ds = pd.read_xml(
pwcfile, names=["index", "sector", "sector_width", "wdfreq", "A", "k"]
)
ds = ds.set_index("sector")[["A", "k", "wdfreq"]].to_xarray()
ds = ds.assign_coords(create_sector_coords(ds.sector.size).coords)
ds = ds.expand_dims(spatial_dataset.sizes)
ds = ds.assign_coords(spatial_dataset.coords)
wwc = _update_var_attrs(ds, _WEIB_ATTRS)
return _update_history(wwc)
def _can_be_int(x):
    """Check if x can be represented exactly as an int."""
    try:
        # int(x) truncates floats, so also compare against the float value
        return int(x) == float(x)
    except (ValueError, TypeError):
        return False
def _convert_to_int_or_fixed_decimals(x, decimals=1):
if _can_be_int(x):
return int(x)
else:
return np.round(x, decimals=decimals)
def _wrg_header(wwc):
"""Write WWC grid dimensions to WRG header with the format:
nx ny xmin ymin cell_size
Parameters
----------
wwc : xarray.Dataset
Weibull wind climate xarray dataset.
Returns
-------
str
WRG header.
"""
nx, ny = _raster._shape(wwc)
xmin = _convert_to_int_or_fixed_decimals(wwc.west_east.values.min(), decimals=1)
ymin = _convert_to_int_or_fixed_decimals(wwc.south_north.values.min(), decimals=1)
size = _convert_to_int_or_fixed_decimals(_raster._spacing(wwc), decimals=1)
return f" {nx:<13} {ny:<13} {xmin:>9} {ymin:>13} {size:>8}"
def _get_rsf_formatters(df, n_sectors=12, use_production=False, formatters=None):
"""Returns a list of functions to format the data in the RSF/WRG file.
Parameters
----------
df : pandas.DataFrame
Dataframe with the data to be formatted.
var_power : str
Name of the variable to be used as power/aep.
n_sectors : int
Number of sectors.
use_production : bool
If true, the gross_aep variable will be used instead of power_density.
formatters : dict
Dictionary with the variable names as keys and the functions to format
the data as values.
Returns
-------
list
List of functions to format the data in the RSF/WRG file.
"""
var_power = "gross_aep" if use_production else "power_density"
fixed_columns = [
"name",
"west_east",
"south_north",
"elevation",
"height",
"A_combined",
"k_combined",
var_power,
"n_sectors",
]
_formatters_default = {
"name": lambda x: f"{x:<10}", # 10
"west_east": lambda x: f"{x:>9.1f}", # 10
"south_north": lambda x: f"{x:>9.1f}", # 10
"elevation": lambda x: f"{x:>7.0f}", # 8
"height": lambda x: f"{x:>4.1f}", # 5
"A_combined": lambda x: f"{x:>4.2f}", # 5
"k_combined": lambda x: f"{x:>5.3f}", # 6
"power_density": lambda x: f"{x:>14.4f}", # 15
"gross_aep": lambda x: f"{x:>14.0f}", # 15
"n_sectors": lambda x: f"{x:>2}", # 3
"A": lambda x: f"{x:>3.0f}", # 4
"k": lambda x: f"{x:>4.0f}", # 4
"wdfreq": lambda x: f"{x:>3.0f}", # 5
}
if formatters is None:
formatters = _formatters_default
else:
formatters = {**_formatters_default, **formatters}
#
# INDIVIDUAL CHECKS AND FORMATTER ADJUSTMENTS
#
def _check_max(df, var, maxval):
vmax = np.abs(df[var]).max()
if vmax > maxval:
raise ValueError(
f"The {var} of one or more points is larger than {maxval}. "
f"Please check the {var}."
)
# name: 10(10)
if df["name"].str.len().max() > 10:
raise ValueError(
"The name of one or more points is longer than 10 characters. "
"Please shorten the names."
)
# west_east: 10(9)
_check_max(df, "west_east", 99999999)
if np.abs(df["west_east"]).max() > 999999:
formatters["west_east"] = lambda x: f"{x:>9.0f}"
# south_north: 10(9)
_check_max(df, "south_north", 99999999)
if np.abs(df["south_north"]).max() > 999999:
formatters["south_north"] = lambda x: f"{x:>9.0f}"
# elevation: 8(7)
_check_max(df, "site_elev", 999999)
if np.abs(df["site_elev"]).max() > 9999:
formatters["elevation"] = lambda x: f"{x:>7.0f}"
# height: 5(4)
_check_max(df, "height", 9999)
# When heights are > 100m, the height needs to be written as an integer
if df["height"].max() > 100.0:
formatters["height"] = lambda x: f"{x:>4.0f}"
    # A_combined: 5(4)
    # Check thresholds in descending order so wider values drop decimals
    _check_max(df, "A_combined", 9999)
    if df["A_combined"].max() >= 100.0:
        formatters["A_combined"] = lambda x: f"{x:>4.0f}"
    elif df["A_combined"].max() >= 10.0:
        formatters["A_combined"] = lambda x: f"{x:>4.1f}"
    # k_combined: 6(5)
    _check_max(df, "k_combined", 99999)
    if df["k_combined"].max() >= 1000.0:
        formatters["k_combined"] = lambda x: f"{x:>5.0f}"
    elif df["k_combined"].max() >= 100.0:
        formatters["k_combined"] = lambda x: f"{x:>5.1f}"
    elif df["k_combined"].max() >= 10.0:
        formatters["k_combined"] = lambda x: f"{x:>5.2f}"
# power_density / gross_aep: 15(14)
_check_max(df, var_power, 99999999999999)
# When all values are missing for power/aep, i.e. valued as -9999,
# the value needs to be written as an integer
if all(df[var_power] == -9999):
formatters[var_power] = lambda x: f"{x:>14.0f}"
    if df[var_power].max() >= 100000000000:
        formatters[var_power] = lambda x: f"{x:>14.0f}"
    elif df[var_power].max() >= 10000000000:
        formatters[var_power] = lambda x: f"{x:>14.1f}"
# n_sectors: 3(2)
_check_max(df, "n_sectors", 99)
for isec in range(n_sectors):
# A: 4(3)
_check_max(df, str(("A", isec)), 999)
# k: 4(3)
_check_max(df, str(("k", isec)), 999)
# wdfreq: 5(3)
_check_max(df, str(("wdfreq", isec)), 9999)
formatters_list = []
for col in fixed_columns:
formatters_list.append(formatters[col])
for isec in range(n_sectors):
formatters_list.append(formatters["wdfreq"])
formatters_list.append(formatters["A"])
formatters_list.append(formatters["k"])
return formatters_list
def _to_resource_file(
wwc,
/,
rsffile,
wrg_header=False,
use_production=False,
formatters=None,
**kwargs,
):
"""Write weibull wind climate dataset to a resource file (.rsf or .wrg).
Parameters
----------
wwc : xarray.Dataset
Weibull wind climate xarray dataset.
rsffile : str
Path to resource file
wrg_header : bool
If True, the WRG header will be added to the file.
Requires a cuboid dataset.
use_production : bool
If true, the "gross_aep" variable will be used instead of "power_density".
formatters : dict
Dictionary with the variable names as keys and the functions to format
the data as values.
For each variable the expected widths in the formatter are:
name: 10
west_east: 9
south_north: 9
elevation: 7
height: 4
A_combined: 4
k_combined: 5
power_density: 14
gross_aep: 14
n_sectors: 2
A: 3
k: 4
wdfreq: 3
A space is added after each variable. Meaning the effective
width of the string representation of the variable is the
width in the formatter plus one (except for the first variable)
A formatter can look like:
formatters = {
"west_east": lambda x: f"{x:<9.2f}",
}
In this case the width of the string representation of the
west_east variable will be 10 (9+1). And the number of decimals
        will be 2 instead of the default 1. If an int representation
        is desired, "{x:<9.0f}" can be used.
"""
# Check if wwc has unsupported dimensions
approved_dims = [
"sector",
"point",
"stacked_point",
"south_north",
"west_east",
"height",
]
if any([dim not in approved_dims for dim in wwc.dims]):
raise ValueError(
f"Unsupported dimensions in wwc: {wwc.dims}. "
f"Supported dimensions: {approved_dims}"
)
if use_production and "gross_aep" not in wwc.data_vars:
raise ValueError(
"The gross_aep variable is required to write a resource file with "
"use_production=True."
)
if use_production:
var_power = "gross_aep"
else:
var_power = "power_density"
wwc_cp = wwc.copy()
if wrg_header:
if not is_cuboid(wwc):
raise ValueError("WWC must be a 'cuboid' to add WRG header!")
header = _wrg_header(wwc)
    # NOTE: it might be cleaner to raise a KeyError, or to always require the
    # parameters returned by `downscale()`; for now, missing variables are
    # computed on the fly.
if "A_combined" not in wwc_cp.data_vars:
wwc_cp[["A_combined", "k_combined"]] = weibull_combined(wwc_cp)
if "wspd" not in wwc_cp.data_vars:
wwc_cp["wspd"] = _mean_wind_speed(wwc_cp)
if "power_density" not in wwc_cp.data_vars:
try:
air_dens = wwc_cp["air_density"]
except KeyError:
air_dens = 1.225
wwc_cp["power_density"] = _mean_power_density(
wwc_cp, air_density=air_dens, bysector=False
)
# remove unneeded vars
wwc_cp = wwc_cp.drop_vars(
["sector", "sector_ceil", "sector_floor", "crs"], errors="ignore"
)
n_sectors = wwc_cp.sizes["sector"]
# round values in the order that fits best with WAsP values
wwc_cp["A"] = wwc_cp["A"].round(decimals=1) * 10.0
wwc_cp["k"] = wwc_cp["k"].round(decimals=2) * 100.0
wwc_cp["wdfreq"] = wwc_cp["wdfreq"].round(decimals=3) * 1000.0
wwc_cp["site_elev"] = wwc_cp["site_elev"].astype(np.int16)
wwc_cp["n_sectors"] = xr.full_like(wwc_cp["site_elev"], n_sectors, dtype=np.int16)
if use_production:
wwc_cp["gross_aep"] = wwc_cp["gross_aep"] * 1e9 # GWh/y -> Wh/y
# select variables that do not depend on wind direction sector
vars = [
"name",
"west_east",
"south_north",
"site_elev",
"height",
"A_combined",
"k_combined",
var_power,
"n_sectors",
]
df = wwc_cp[
["site_elev", "A_combined", "k_combined", var_power, "n_sectors"]
].to_dataframe()
if "name" not in df.columns:
df["name"] = "GridPoint" # insert text column at first position
# select variables that depend on wind direction sector
sec_vars = [
str((var, sec)) for sec in range(n_sectors) for var in ["wdfreq", "A", "k"]
]
df_sec = wwc_cp[["wdfreq", "A", "k"]].to_dataframe()
df_sec = df_sec.pivot_table(
index=df.index.names, columns="sector", values=["wdfreq", "A", "k"]
)
# merge all-sector and sectorwise values
df_total = (
pd.concat([df, df_sec], axis=1)
.reset_index()
.drop(["point", "stacked_point"], axis=1, errors="ignore")
) # concat combined and sectorwise values
# transform all column names to string to make it compatible with pandas 1.4
df_total.columns = [str(x) for x in df_total.columns]
df_total = df_total[vars + sec_vars] # select vars in correct order
formatters_list = _get_rsf_formatters(
df_total,
n_sectors=n_sectors,
use_production=use_production,
formatters=formatters,
)
str_list = df_total.to_string(
header=False,
index=False,
index_names=False,
formatters=formatters_list,
)
    def _is_char_21_blank(s):
        """Check that character 21 is blank."""
        return s[20] == " "
    def _remove_initial_blank_space(s):
        """Remove the initial blank space from each line of the string to be written.
        This is required for the .rsf/.wrg format to get the correct widths written to file.
        """
        sout = ""
        for line in s.split("\n"):
            if line.startswith(" "):
                sout += line[1:] + "\n"
            else:
                sout += line + "\n"
        return sout
def _check_row_widths(s, n_sectors=12):
"""Check that the row widths are correct for the number of sectors."""
width_expected = 72 + n_sectors * 13
for line in s.split("\n")[:-1]:
width = len(line)
if width != width_expected:
                raise ValueError(
                    f".rsf/.wrg row width is {width}. Expected {width_expected} "
                    f"for {n_sectors} sectors! Are the formatters correct?"
                )
def _check_ncols(s, n_sectors=12):
"""Check that the number of columns is correct read back in for the number of sectors."""
ncols_expected = 9 + n_sectors * 3
df = pd.read_fwf(
io.StringIO(s),
widths=tuple([10, 10, 10, 8, 5, 5, 6, 15, 3] + [4, 4, 5] * n_sectors),
header=None,
)
ncols = df.shape[1]
if ncols != ncols_expected:
raise ValueError(
f".rsf/.wrg has {ncols} columns. Expected {ncols_expected} for {n_sectors} sectors!"
)
# check that character 21 is blank and remove initial blank space if not
# This problem is related to the pandas version (or a dependency of pandas)
if not _is_char_21_blank(str_list):
str_list = _remove_initial_blank_space(str_list)
_check_row_widths(str_list, n_sectors=n_sectors)
_check_ncols(str_list, n_sectors=n_sectors)
with open(rsffile, "w", newline="\r\n") as text_file:
if wrg_header:
text_file.write(header + "\n")
text_file.write(str_list)
def _to_rsffile(
wwc, rsffile, wrg_header=False, use_production=False, formatters=None, **kwargs
):
"""Write weibull wind climate dataset to .rsf file.
Parameters
----------
wwc : xarray.Dataset
Weibull wind climate xarray dataset.
rsffile: str
Path to .rsf file
"""
return _to_resource_file(
wwc,
rsffile,
wrg_header=wrg_header,
use_production=False,
formatters=formatters,
**kwargs,
)
def _to_wrgfile(wwc, wrgfile, use_production=False, formatters=None):
"""Write weibull wind climate dataset to .wrg file.
Parameters
----------
wwc : xarray.Dataset
Weibull wind climate xarray dataset.
wrgfile: str
Path to .wrg file
"""
return _to_resource_file(
wwc, wrgfile, wrg_header=True, use_production=False, formatters=formatters
)
def _read_grdfiles(grdfiles, *, regex_pattern=None, regex_var_order=None):
"""Reads a .grd file into a weibull wind climate dataset.
Parameters
----------
grdfiles: str or list
path of .grd file or list of .grd files.
regex_pattern: re str
Filename regex pattern to extract height, sector, and variable name.
Defaults to None.
regex_var_order: list or tuple
Order of 'height', 'sector', and 'var' in regex_pattern. Defaults to None.
Returns
-------
wwc: xarray.Dataset
Weibull wind climate dataset.
"""
def _rename_var(var):
"""
Function to rename WAsP variable names to short hand name
"""
_rename = {
"Flow inclination": "flow_inclination",
"Mean speed": "wspd",
"Meso roughness": "z0meso",
"Obstacles speed": "obstacle_speedups",
"Orographic speed": "orographic_speedups",
"Orographic turn": "orographic_turnings",
"Power density": "power_density",
"RIX": "rix",
"Roughness changes": "nrch",
"Roughness speed": "roughness_speedups",
"Sector frequency": "wdfreq",
"Turbulence intensity": "turbulence_intensity",
"Weibull-A": "A",
"Weibull-k": "k",
"Elevation": "site_elev",
}
return _rename[var]
def _read_grd_data(filename):
def _parse_line_floats(f):
return [float(i) for i in f.readline().strip().split()]
def _parse_line_ints(f):
return [int(i) for i in f.readline().strip().split()]
with open(filename, "rb") as f:
_ = f.readline().strip().decode() # file_id
nx, ny = _parse_line_ints(f)
xl, xu = _parse_line_floats(f)
yl, yu = _parse_line_floats(f)
zl, zu = _parse_line_floats(f)
values = np.genfromtxt(f)
xarr = np.linspace(xl, xu, nx)
yarr = np.linspace(yl, yu, ny)
# note that the indexing of WAsP grd file is 'xy' type, i.e.,
# values.shape == (xarr.shape[0], yarr.shape[0])
# we need to transpose values to match the 'ij' indexing
values = values.T
return xarr, yarr, values
def _parse_grdfile(grdfile, regex_pattern=None, regex_var_order=None):
match = re.findall(regex_pattern, grdfile.name)[0]
meta = {k: v for k, v in zip(regex_var_order, match)}
meta["var"] = _rename_var(meta["var"])
xarr, yarr, values = _read_grd_data(grdfile)
dims = ["west_east", "south_north", "height"]
coords = {
"height": [float(meta["height"])],
"x": (("west_east",), xarr),
"y": (("south_north",), yarr),
"west_east": (("west_east",), xarr),
"south_north": (("south_north",), yarr),
}
values = values[..., np.newaxis]
if not meta["sector"].lower() == "all":
dims += ["sector"]
coords["sector"] = [int(meta["sector"])]
values = values[..., np.newaxis]
else:
if meta["var"] != "site_elev":
meta["var"] = meta["var"] + "_combined"
da = xr.DataArray(values, dims=dims, coords=coords, name=meta["var"])
if da.name == "site_elev":
da = da.isel(height=0, drop=True)
return da
if not isinstance(grdfiles, list):
grdfiles = [grdfiles]
grdfiles = list(Path(f) for f in grdfiles)
if regex_pattern is None:
regex_pattern = r"Sector (\w+|\d+) \s+ Height (\d+)m \s+ ([a-zA-Z0-9- ]+)"
if regex_var_order is None:
regex_var_order = ("sector", "height", "var")
wwc = xr.merge(
[
_parse_grdfile(
grdfile, regex_pattern=regex_pattern, regex_var_order=regex_var_order
)
for grdfile in grdfiles
]
)
ds = _update_var_attrs(wwc, _WEIB_ATTRS)
return _update_history(ds)
def read_wwc(filename, *, file_format="infer", **kwargs):
"""Read a weibull wind climate dataset from a file.
Parameters
----------
filename : str or pathlib.Path
Path to the file to read the dataset from.
file_format : str
File format of the file. If "infer", the file format is inferred from the file extension.
Supported file formats are "rsf", "wrg", "grd", and "nc". Defaults to "infer".
**kwargs
Additional keyword arguments passed to the read function
Returns
-------
xarray.Dataset
Weibull wind climate dataset.
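    Examples
    --------
    A minimal sketch (hypothetical file name and CRS; the .rsf/.wrg readers
    require a ``crs``, which is passed through as a keyword argument)::

        wwc = read_wwc("site.wrg", crs=32632)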
"""
if file_format == "infer":
file_format = _infer_file_format(filename)
if file_format == "rsf":
ds = _read_rsffile(filename, **kwargs)
elif file_format == "wrg":
ds = _read_wrgfile(filename, **kwargs)
elif file_format == "grd":
ds = _read_grdfiles(filename, **kwargs)
elif file_format == "nc":
ds = xr.open_dataset(filename, **kwargs)
else:
raise ValueError(f"Unsupported file format: {file_format}")
validate_wwc(ds, run_extra_checks=False)
ds = _update_var_attrs(ds, _WEIB_ATTRS)
return _update_history(ds)
def read_mfwwc(files, file_format="infer", **kwargs):
"""Read multiple weibull wind climate datasets from files.
Parameters
----------
files : list of str or pathlib.Path
Paths to the files to read the datasets from.
    file_format : str
        File format of the files. If "infer", the file format is inferred from the
        file extension. Currently only the "grd" format is supported. Defaults to "infer".
**kwargs
Additional keyword arguments passed to the read function
Returns
-------
xarray.Dataset
Weibull wind climate dataset.
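    Examples
    --------
    A minimal sketch (``grd_files`` is a hypothetical list of .grd files whose
    names match the default regex pattern)::

        wwc = read_mfwwc(grd_files, file_format="grd")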
"""
if file_format == "infer":
file_format = _infer_file_format(files[0])
if file_format == "grd":
ds = _read_grdfiles(files, **kwargs)
else:
raise ValueError(f"Unsupported file format: {file_format}")
validate_wwc(ds, run_extra_checks=False)
ds = _update_var_attrs(ds, _WEIB_ATTRS)
return _update_history(ds)
@_validate_wwc_wrapper_factory(run_extra_checks=False)
def wwc_to_file(wwc, filename, *, file_format="infer", **kwargs):
"""Write a weibull wind climate dataset to a file.
Parameters
----------
wwc : xarray.Dataset
Weibull wind climate dataset.
filename : str or pathlib.Path
Path to the file to write the dataset to.
file_format : str
File format of the file. If "infer", the file format is inferred from the file extension.
Supported file formats are "rsf", "wrg", and "nc". Defaults to "infer".
**kwargs
Additional keyword arguments passed to the write function
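    Examples
    --------
    A minimal sketch (hypothetical path; ``wwc`` is a valid Weibull wind
    climate dataset)::

        wwc_to_file(wwc, "site.rsf")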
"""
if file_format == "infer":
file_format = _infer_file_format(filename)
if file_format == "rsf":
_to_rsffile(wwc, filename, **kwargs)
elif file_format == "wrg":
_to_wrgfile(wwc, filename, **kwargs)
else:
raise ValueError(f"Unsupported file format: {file_format}")
@_validate_wwc_wrapper_factory(run_extra_checks=False)
def weibull_combined(wwc, atol=0.000001):
"""Return the all sector A & k.
This is known as the combined weibull A and k in the
WAsP GUI. For more information, see here:
https://www.wasp.dk/support/frequently-asked-questions/wasp-faq/emergent-and-combined-all-sector-weibull-distributions
Using the combined weibull A and k are calculated
using first and third moment conservation rules.
Parameters
----------
wwc: xarray.Dataset
Weibull Wind Climate dataset.
Returns
-------
tuple of xr.DataArray
All sector A & k DataArrays
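    Notes
    -----
    The n-th raw moment of a Weibull distribution is ``A**n * gamma(1 + n/k)``.
    Conserving the frequency-weighted first and third moments M1 and M3, the
    combined k solves::

        log(M3) / 3 - log(M1) == log(gamma(1 + 3/k)) / 3 - log(gamma(1 + 1/k))

    (the A parameter cancels on the right-hand side), after which
    ``A = M1 / gamma(1 + 1/k)``.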
"""
sum1 = (wwc["wdfreq"] * weibull_moment(wwc["A"], wwc["k"], 1)).sum(
dim="sector", skipna=False
)
sum3 = (wwc["wdfreq"] * weibull_moment(wwc["A"], wwc["k"], 3)).sum(
dim="sector", skipna=False
)
sum1 = sum1 / wwc["wdfreq"].sum(dim="sector", skipna=False)
sum3 = sum3 / wwc["wdfreq"].sum(dim="sector", skipna=False)
sum_logm = np.log(sum3) / 3.0 - np.log(sum1)
k_combined = fit_weibull_k_sumlogm(
sum_logm, order_m_first=1, order_m_higher=3, atol=atol
)
A_combined = sum1 / gamma(1.0 + 1.0 / k_combined)
return A_combined, k_combined
def _mean_ws_moment(wwc, moment=1, *, bysector=False):
"""Calculate the mean wind speed from a weibull wind climate dataset.
Parameters
----------
wwc: xarray.Dataset
Weibull Wind Climate dataset.
    moment : int
        Order of the moment to calculate. Defaults to 1.
bysector: bool
Return results by sector or as an all-sector value. Defaults to False.
Returns
-------
    xarray.DataArray
        DataArray with the requested wind speed moment.
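    Notes
    -----
    The n-th raw moment of a Weibull distribution is ``A**n * gamma(1 + n/k)``;
    the all-sector value is the ``wdfreq``-weighted sum of the sectorwise moments.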
"""
moment = weibull_moment(wwc[VAR_WEIBULL_A], wwc[VAR_WEIBULL_k], moment)
if not bysector:
moment = (moment * wwc[VAR_WDFREQ]).sum(dim=DIM_SECTOR, skipna=False)
return moment
def _mean_wind_speed(wwc, *, bysector=False):
"""Calculate the mean wind speed from a weibull wind climate dataset.
Parameters
----------
wwc: xarray.Dataset
Weibull Wind Climate dataset.
bysector: bool
Return results by sector or as an all-sector value. Defaults to False.
Returns
-------
xarray.DataArray
DataArray with the mean wind speed.
"""
return _mean_ws_moment(wwc, moment=1, bysector=bysector)
def _mean_power_density(wwc, *, air_density=1.225, bysector=False):
"""Calculate the power density
Parameters
----------
wwc: xarray.Dataset
Weibull wind climate dataset.
air_density : float
Air density.
    bysector : bool
        Return sectorwise power density if True. Defaults to False.
    Returns
    -------
    pd : xarray.DataArray
        Data array with power density.
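    Notes
    -----
    Computed as ``0.5 * air_density * E[u**3]``, where the third moment
    ``E[u**3]`` comes from the sectorwise Weibull distributions
    (see ``_mean_ws_moment``).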
"""
return 0.5 * air_density * _mean_ws_moment(wwc, moment=3, bysector=bysector)
@_validate_wwc_wrapper_factory(run_extra_checks=False)
def wwc_to_bwc(wwc, ws_bins):
"""Creates object from directional A's and k's.
Parameters
----------
wwc: xarray.Dataset
Weibull wind climate xr.Dataset object
ws_bins: np.array
Wind speed bin edges
Returns
-------
bwc : xarray.Dataset
binned wind climate from a Weibull distribution.
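    Examples
    --------
    A minimal sketch (``wwc`` is a valid Weibull wind climate dataset; here
    with hypothetical 1 m/s bin edges from 0 to 30 m/s)::

        import numpy as np
        bwc = wwc_to_bwc(wwc, np.linspace(0.0, 30.0, 31))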
"""
wwc = wwc.copy() # make a copy to avoid modifications leaving scope
ws_bins = xr.DataArray(ws_bins, dims=("wsbin",))
cdfs = weibull_cdf(wwc.A, wwc.k, ws_bins)
ws_freq = cdfs.isel(wsbin=slice(1, None)) - cdfs.isel(wsbin=slice(None, -1))
ws_freq = ws_freq / ws_freq.sum(dim="wsbin")
ws_freq = ws_freq.fillna(0.0)
bwc = wwc[
[v for v in wwc.data_vars if v not in ["A", "k"]]
] # pass through other variables
bwc["wsfreq"] = ws_freq
wscenters = create_wsbin_coords(ws_bins)
bwc = bwc.assign_coords(
{
**wscenters.coords,
}
)
bwc = _update_var_attrs(bwc, _BWC_ATTRS)
return _update_history(bwc)
def _ws_cdf(wwc, **kwargs):
"""Calculates wind speed cumulative distribution function from weibull wind climate dataset."""
raise NotImplementedError("This function is not yet implemented.")
def _ws_freq_gt_mean(wwc, **kwargs):
"""Calculates wind speed frequency greater than mean from weibull wind climate dataset."""
raise NotImplementedError("This function is not yet implemented.")