Source code for pywasp.wasp.topography

# pylint:extension-pkg-whitelist:pywasp.core.Rvea0287
"""
Calculate speedups and other map properties using WAsP tools.

This module contains the TopgraphyMap class which contains both roughness
and elevation maps. It provides the interfaces to the WAsP fortran routines
that calculate terrain effects. These terrain effects are required at the
input and output locations.

The flow of this model is as follows:

.. graphviz:: topography.dot

Methods to create elevation, roughness roses or site effects all require
locations to be one of the spatial windkit structures.
"""

import inspect
import logging
import warnings
import zipfile
import json
import tempfile
from pathlib import Path

import geopandas as gpd
import numpy as np
import xarray as xr

import windkit as wk
from windkit.metadata import (
    update_var_attrs,
    _ROU_ROSE_ATTRS,
    _ELEV_ROSE_ATTRS,
    _TOPO_EFFECTS_ATTRS,
    _TOPO_CFD_EFFECTS_ATTRS,
    update_history,
)
from windkit._vectormap_helpers import explode_gdf, _is_lc, VECTORMAP_ID_COLS
from windkit.sector import create_sector_coords
from windkit.spatial import (
    create_dataset,
    get_crs,
    spatial_stack,
    spatial_unstack,
    count_spatial_points,
    to_stacked_point,
    BBox,
)

from . import Config
from .._cfg import _config as usr_config
from ..core import Rvea0287
from ..io.vectormap import WaspVectorMap, waspformat_to_vectormap
from ..io.rastermap import WaspRasterMap, rastermap_to_waspformat
from .._errors import _handle_fort_errors

_FORT_GET_INDICES = Rvea0287.calc_points.get_indices
_FORT_GPD_ROU_ROSE_PTS = Rvea0287.calc_points.gpd_rou_rose_points
_FORT_ELEV_ROSE_PTS_VECT_GPD = Rvea0287.calc_points.gpd_elev_rose_points
_FORT_ELEV_ROSE_PTS_GRID = Rvea0287.calc_points.elev_rose_points_grid
_FORT_ROSE_SITE_FACTORS = Rvea0287.calc_points.rose_site_factors_points
_FORT_GPD_SITE_FACTORS = Rvea0287.calc_points.gpd_site_factors_points
_FORT_RAST_GPD_SITE_FACTORS = Rvea0287.calc_points.rastermap_gpd_site_factors_points
_FORT_EMPTY_BZ = Rvea0287.bz_grid_create_mod.create_empty_bz_grid_slack
_FORT_CFD_SITE_FACTORS = Rvea0287.calc_points_cfd.cfd_site_factors_points


logger = logging.getLogger(__name__)


[docs] class TopographyMap(object): """Class for topography maps This is a helper class that allows the calculation of roughness and elevation roses or site effects at specific points. Parameters ---------- elev_map : gpd.GeoDataFrame or xarray.DataArray Elevation map rou_map : gpd.GeoDataFrame Roughness map Notes ----- Multi-part gpd.GeoDataFrame are always converted to single-part GeoDataFrames. """
[docs] def __init__(self, elev_map, rou_map, lctable=None): if isinstance(elev_map, WaspVectorMap): warnings.warn( "Using WaspVectorMap is deprecated. Future version of pywasp will only support geopandas for vectormaps!", FutureWarning, ) if elev_map.n_lines == 0: raise ValueError("Elevation map is empty!") elif elev_map.gx[0] != 0: # TODO: this test fails to scans further in the file if there might # be lines with a value for roughness, it only scans the first line raise ValueError( "Elevation map has a value for roughness, " + "check that you are passing roughness map" + " then elevation map to TopographyMap." ) elev_map = waspformat_to_vectormap(elev_map) elif isinstance(elev_map, gpd.GeoDataFrame): elev_map = explode_gdf(elev_map) elif isinstance(elev_map, xr.DataArray) or isinstance(elev_map, WaspRasterMap): elev_map = rastermap_to_waspformat(elev_map) if isinstance(rou_map, WaspVectorMap): warnings.warn( "Using WaspVectorMap is deprecated. Future version of pywasp will only support geopandas for vectormaps!", FutureWarning, ) if rou_map.n_lines == 0: # empty roughness maps are fine raise ValueError("Roughness map is empty!") elif rou_map.gx[2] != 0: # TODO: this test fails to scans further in the fine if there might # be lines with a value for elevation, it only scans the first line raise ValueError( "Roughness map has a value for elevation, " + "check that you are passing roughness map" + " then elevation map to TopographyMap." ) rou_map = waspformat_to_vectormap(rou_map, lctable=lctable) elif isinstance(rou_map, gpd.GeoDataFrame): rou_map = explode_gdf(rou_map) else: raise ValueError("Roughness map must be a geopandas.GeoDataFrame") if _is_lc(rou_map): nr_classes = len(lctable) if nr_classes > 100: raise ValueError( f"The maximum number of landcover classes allowed in a landcover map is 100, you have {nr_classes}" ) # make sure all id's in the map are represented in the lctable for col in VECTORMAP_ID_COLS: for i in rou_map[col].unique(): if i not in lctable: ids = ",".join(rou_map.index[rou_map[col] == i].astype(str)) ids_table = ",".join([str(j) for j in lctable.keys()]) raise KeyError( f"Landcover line(s) {ids} ({col}: {i}) in your map doesn't have a corresponding entry in the landcover table keys ({ids_table})." ) self.rou_map = rou_map self.elev_map = elev_map self.lctable = lctable
def __repr__(self): return "Roughness map\n{0.rou_map}\nElevation map\n{0.elev_map}".format(self) def __str__(self): return self.__repr__()
[docs] def save(self, filename): """ Save a TopographyMap to a zip file. In the zipfile, roughness/landcover maps are saved as a GeoDataFrame, elevation maps are saved as a GeoDataFrame or a netcdf file depending on the type of map. The landcover table is saved as a json file. Parameters ---------- filename : str, Path Name of the zip file to save to """ with tempfile.TemporaryDirectory() as tmpdir: tmpdir = Path(tmpdir) with zipfile.ZipFile(filename, "w") as myzip: if isinstance(self.elev_map, gpd.GeoDataFrame): wk.vector_map_to_file( self.elev_map, tmpdir / "elev_map.gpkg", driver="GPKG" ) myzip.write(tmpdir / "elev_map.gpkg", "elev_map.gpkg") elif isinstance(self.elev_map, WaspRasterMap): elev_map = self.elev_map._to_rastermap() myzip.writestr("elev_map.nc", elev_map.to_netcdf()) else: raise ValueError( "Elevation map must be a geopandas.GeoDataFrame or xarray.DataArray object" ) if isinstance(self.rou_map, gpd.GeoDataFrame): wk.vector_map_to_file( self.rou_map, tmpdir / "rou_map.gpkg", lctable=self.lctable, driver="GPKG", ) myzip.write(tmpdir / "rou_map.gpkg", "rou_map.gpkg") else: raise ValueError( "Roughness map must be a geopandas.GeoDataFrame object" ) if self.lctable is not None: myzip.writestr("lctable.json", json.dumps(self.lctable))
[docs] @classmethod def load(cls, filename): """ Load a TopographyMap from a zip file. In the zipfile, roughness/landcover maps are saved as a GeoDataFrame, elevation maps are saved as a GeoDataFrame or a netcdf file depending on the type of map. The landcover table is saved as a json file. Parameters ---------- filename : str, Path Name of the zip file to load from Returns ------- TopographyMap """ with zipfile.ZipFile(filename, "r") as myzip: namelist = myzip.namelist() if "elev_map.gpkg" in namelist: elev_map = wk.read_vector_map(myzip.open("elev_map.gpkg")) elif "elev_map.nc" in namelist: elev_map = xr.open_dataarray(myzip.open("elev_map.nc")) else: raise ValueError("Elevation map not found in zip file") if "rou_map.gpkg" in namelist: rou_map = wk.read_vector_map(myzip.open("rou_map.gpkg")) else: raise ValueError("Roughness map not found in zip file") if "lctable.json" in namelist: data = json.load(myzip.open("lctable.json")) data = {int(k): v for k, v in data.items()} # convert keys to int lctable = wk.LandCoverTable(data) else: lctable = None return cls(elev_map, rou_map, lctable=lctable)
[docs] def get_elev_rose(self, output_locs, nsecs=12, conf=None): """ Create elevation rose for a set of output locations from a given map .. warning:: This function is experimental and its signature may change. Parameters ---------- output_locs : xarray.Dataset xarray dataset with location information according to the PyWAsP standard (south_north, west_east, height). nsecs: int Number of sectors conf : :any:`Config` Configuration information from WAsP Returns ------- _ElevRose Elevation rose at requested locations as a stacked_point structure on the map CRS. Notes ----- This method allows both vector and raster maps. It tries first the standard way using sq_polint (vector maps), if that fails it tries to use grid-polint This calls sq_polint to get the RIX parameters, which are then used in sqmasage to calculate the expansion coefficients for the BZ model at a given point on the map. This routine does not treat displacements heights, for that use `get_site_effects` """ if conf is None: conf = Config() mp = self.elev_map nsecs = _nsecs_or_sector_dim(output_locs, nsecs) # Roses are 2D parameters, and are converted to point objects on the map crs work_locs = spatial_stack(output_locs, mp.crs, remove_height=True) work_locs = to_stacked_point(work_locs) if not _locs_finite(work_locs): raise ValueError( "NaN or Inf values found in west_east or south_north coordinates." ) if not _map_finite(mp): raise ValueError( "NaN or Inf values found in vector map, please check your data." ) # Different functions depending on the type of map if isinstance(mp, gpd.GeoDataFrame): xmin, ymin, xmax, ymax = mp.total_bounds coords = mp.get_coordinates() results = _FORT_ELEV_ROSE_PTS_VECT_GPD( coords.index, coords.x.values, coords.y.values, mp["elev"].values, work_locs.west_east, work_locs.south_north, nsecs, conf.terrain.param, ) elif isinstance(mp, WaspRasterMap): xmin, ymin, xmax, ymax = mp.bbox.bounds() results = _FORT_ELEV_ROSE_PTS_GRID( mp.values, xmin, ymin, xmax, ymax, mp.nx, mp.ny, work_locs.west_east, work_locs.south_north, nsecs, conf.terrain.param, ) # Build dataset result_names = ( "site_elev", "rix", "dirrix", "flow_sep_height", "nray", "grid", "c", "s", "w", "hr1", "hi1", "hi2", "r", ) dim_names = ( [("stacked_point")] * 2 + [("sector", "stacked_point")] * 2 + [[]] + [("radials_x_radial_dist", "stacked_point")] + [("radials", "stacked_point")] * 3 + [("radial_dist", "sector", "stacked_point")] * 3 + [("stacked_point")] ) elev_rose = create_dataset( work_locs.coords["west_east"], work_locs.coords["south_north"], 0, get_crs(work_locs), "stacked_point", ).drop_dims("height") for name, dimname, rslt in zip(result_names, dim_names, results): elev_rose[name] = (dimname, rslt) elev_rose = elev_rose.assign_coords( { **create_sector_coords(nsecs).coords, } ) elev_rose = update_var_attrs(elev_rose, _ELEV_ROSE_ATTRS) return elev_rose.transpose( "radial_dist", "radials_x_radial_dist", "radials", "sector", "stacked_point" )
[docs] def get_rou_rose(self, output_locs, nsecs=None, conf=None, elev_rose=None): """Create a roughness rose xarray object for all points .. warning:: This function is experimental and its signature may change. Parameters ---------- output_locs : xarray.Dataset xarray dataset with location information according to the PyWAsP standard (south_north, west_east, height). nsecs: int Number of sectors, default use sector dimension from output_locs conf : :any:`Config` Configuration information from WAsP, default use default configuration elev_rose: xarray.Dataset The displacement height due to forest is added to the elevation rose. If None, a dummy elev_rose is created and passed into the fortran but not used. Returns ------- tuple (xarray.Dataset,xarray.Dataset or None) A tuple where the first element is a dataset with the roughness roses at all points, and the second element is a dataset with the updated elevation rose. The second element is only returned if an elev_rose is defined, otherwise the second element is None. Notes ----- This calls the internal rou_rose function. """ def _create_bz_like(work_locs, conf): """Create a dummy elevation rose for work_locs points Parameters ---------- work_locs : xarray.Dataset xarray dataset with location information according to the PyWAsP standard (south_north, west_east, height). conf : :any:`Config` Configuration information from WAsP, default use default configuration Returns ------- xr.Dataset Roughness roses at all points. If an elev_rose is defined it will also return the updated elevation rose. """ elev_rose = create_dataset( work_locs.coords["west_east"], work_locs.coords["south_north"], 0, get_crs(work_locs), "stacked_point", ).drop_dims("height") r, c, s, w, bzgrid, nrays = _FORT_EMPTY_BZ(conf.terrain.param) elev_rose["r"] = xr.broadcast(xr.DataArray(r), elev_rose.stacked_point)[0] elev_rose["c"] = xr.broadcast( xr.DataArray(c, dims="radials"), elev_rose.stacked_point )[0] elev_rose["s"] = xr.broadcast( xr.DataArray(s, dims="radials"), elev_rose.stacked_point )[0] elev_rose["w"] = xr.broadcast( xr.DataArray(w, dims="radials"), elev_rose.stacked_point )[0] elev_rose["grid"] = xr.broadcast( xr.DataArray(bzgrid, dims="radials_x_radial_dist"), elev_rose.stacked_point, )[0] elev_rose["nray"] = nrays return elev_rose if conf is None: conf = Config() mp = self.rou_map mp, lct, col_names = _standardize_roughness_map(mp, self.lctable, conf) xmin, ymin, xmax, ymax = mp.total_bounds nsecs = _nsecs_or_sector_dim(output_locs, nsecs) # Roses are 2D parameters, but we convert them to point objects on the map crs work_locs = spatial_stack(output_locs, mp.crs, remove_height=True) work_locs = to_stacked_point(work_locs) if elev_rose is None: elev_rose_present = False elev_rose = _create_bz_like(work_locs, conf) else: elev_rose_present = True if not _locs_finite(work_locs): raise ValueError( "NaN or Inf values found in west_east or south_north coordinates." ) if not _map_finite(mp): raise ValueError( "NaN or Inf values found in vector map, please check your data." ) if not _lct_finite(lct): raise ValueError( "NaN or Inf values found in landcover table, please check your data." ) nr = _get_nr_of_radial_bins(conf) coords = mp.get_coordinates() ( hr1, hi1, hi2, z0, slf, dist, displ, flow_sep_height, nrch, ) = _FORT_GPD_ROU_ROSE_PTS( coords.index, coords.x.values, coords.y.values, mp[col_names[0]].values, mp[col_names[1]].values, xmin, ymin, xmax, ymax, work_locs.west_east, work_locs.south_north, nsecs, nr, lct, elev_rose.nray, elev_rose.grid, elev_rose.c, elev_rose.s, elev_rose.w, elev_rose.r, conf.terrain.param, ) results = (z0, slf, dist, displ, flow_sep_height, nrch) result_names = ("z0", "slf", "dist", "displ", "flow_sep_height", "nrch") dim_names = ( [("max_rou_changes1", "sector", "stacked_point")] * 2 + [("max_rou_changes", "sector", "stacked_point")] + [("sector", "stacked_point")] + [("sector", "stacked_point")] * 2 ) rou_rose = create_dataset( work_locs.coords["west_east"], work_locs.coords["south_north"], 0, get_crs(work_locs), "stacked_point", ).drop_dims("height") for name, dimname, rslt in zip(result_names, dim_names, results): rou_rose[name] = (dimname, rslt) rou_rose = rou_rose.assign_coords( { **create_sector_coords(nsecs).coords, } ) rou_rose = update_var_attrs(rou_rose, _ROU_ROSE_ATTRS) res = rou_rose.transpose( "max_rou_changes", "max_rou_changes1", "sector", "stacked_point" ) if not elev_rose_present: # return only roughness rose if no elevation rose was passed in return (update_history(res), None) else: # update elevation rose with new results after adding displacement heights # and return both rou_rose and elev_rose results = hr1, hi1, hi2 result_names = ("hr1", "hi1", "hi2") dim_names = [("radial_dist", "sector", "stacked_point")] * 3 for name, dimname, rslt in zip(result_names, dim_names, results): elev_rose[name] = (dimname, rslt) return (update_history(res), elev_rose)
[docs] def get_site_effects_rose(self, elev_rose, rou_rose, hgts, conf=None): """ Calculate speed ups, and reweight the significant land fraction .. warning:: This function is experimental and its signature may change. Parameters ---------- elev_rose, rou_rose: xarray.Dataset PyWAsP Datasets containing roses of elevation and roughness at the points hgts: list or 1d array Heights above ground level to calculate values conf : :any:`Config` Configuration information from WAsP Returns ------- xarray.Dataset Dataset containg orographic and roughness speedups, turnings and other topographic parameters at location Notes ----- This calls htrnsf_nt from the WAsP core to calculate the speedups at a given height above ground level using the elevation and roughness roses. """ ### Add point check here usr_config.check_point_count( count_spatial_points(elev_rose.assign_coords(height=hgts)) ) if conf is None: conf = Config() rr = rou_rose.transpose( "max_rou_changes", "max_rou_changes1", "sector", "stacked_point" ) er = elev_rose.transpose( "radial_dist", "radials", "radials_x_radial_dist", "sector", "stacked_point" ) results = _FORT_ROSE_SITE_FACTORS( hgts, rr.z0, rr.slf, rr.nrch, rr.dist, rr.displ, er.flow_sep_height, er.hr1, er.hi1, er.hi2, er.r, conf.terrain.param, ) # Handle errors from fortran ierror = results[-1] _handle_fort_errors(ierror) # Build output dataset result_names = ( "z0meso", "slfmeso", "user_def_speedups", "orographic_speedups", "obstacle_speedups", "roughness_speedups", "user_def_turnings", "orographic_turnings", "obstacle_turnings", "roughness_turnings", ) dim_names = [("sector", "stacked_point")] * 2 + [ ("sector", "height", "stacked_point") ] * 8 site_facs = xr.merge( [ elev_rose[["site_elev", "rix", "dirrix", "flow_sep_height"]], rou_rose[["displ"]], ], combine_attrs="override", ) for name, dimname, rslt in zip(result_names, dim_names, results): site_facs[name] = (dimname, rslt) site_facs = site_facs.assign_coords( { "height": (("height"), hgts), } ) site_facs = update_var_attrs(site_facs, _TOPO_EFFECTS_ATTRS) return site_facs.transpose("sector", "height", "stacked_point")
[docs] def get_site_effects(self, output_locs, conf=None, nsecs=None): """Calculate speedups at all points provided .. warning:: This function is experimental and its signature may change. Parameters ---------- output_locs : xarray.Dataset xarray dataset with location information according to the windkit spatial structures (south_north, west_east, height). If the sector coordinates are also provided it will use those for generating the site effects. conf : :any:`Config` Configuration information from WAsP nsecs : optional int Number of sectors used for creating the roses Returns ------- xarray.Dataset """ # Check number of output points usr_config.check_point_count(count_spatial_points(output_locs)) if conf is None: conf = Config() # always work on point datasets logger.debug("Preparing output_locs for fort") work_locs = output_locs.copy() work_locs = spatial_stack(output_locs, self.elev_map.crs).load() # create array to remember input ordering so that we can sort back to original logger.debug("Sorting output locations for Fortran processing") work_locs["sortorder"] = (("point"), np.arange(work_locs.sizes["point"])) # lexical sort on x,y coordinates such that these are always in order work_locs = work_locs.sortby(["west_east", "south_north"]) if "sector" not in work_locs.dims or nsecs is not None: if nsecs is None: raise ValueError( inspect.cleandoc( """You have to provide the number of sectors you want your terrain analysis for using the nsecs argument""" ) ) sec = create_sector_coords(nsecs) work_locs = work_locs[["point", "sortorder"]].expand_dims( {"sector": sec}, -1 ) work_locs = work_locs.assign_coords(sec.coords) work_locs["sortorder"] = work_locs["sortorder"].sel(sector=0) else: work_locs = work_locs[["sector", "sortorder"]] nsecs = work_locs.sizes["sector"] # the fortran routines first compute a roughness rose # and BZ grid for each unique x,y point and subsequently # runs over all x,y,z points. We must sort get indices to # map points(x,y,z) -> points(x,y) logger.debug("Getting indices from Fortran") if not _locs_finite(work_locs): raise ValueError( "NaN or Inf values found in west_east or south_north coordinates." ) xi = _FORT_GET_INDICES(work_locs.west_east, work_locs.south_north) xu, yu = np.unique( np.vstack([work_locs.west_east.values, work_locs.south_north.values]), axis=1, ) zu = work_locs.height mp = self.rou_map mp, lct, col_names = _standardize_roughness_map(mp, self.lctable, conf) xmin, ymin, xmax, ymax = mp.total_bounds coords = mp.get_coordinates() if not _map_finite(mp): raise ValueError( "NaN or Inf values found in roughness vector map, please check your data." ) if not _lct_finite(lct): raise ValueError( "NaN or Inf values found in landcover table, please check your data." ) nr = _get_nr_of_radial_bins(conf) if isinstance(self.elev_map, gpd.GeoDataFrame): coords_elev = self.elev_map.get_coordinates() logger.debug("Calculating site factors from vector map") ( z0meso, slfmeso, displ, flow_sep_height, s1, s2, s3, s4, d1, d2, d3, d4, site_elev, rix, dirrix, fort_err, ) = _FORT_GPD_SITE_FACTORS( coords_elev.index, coords_elev.x.values, coords_elev.y.values, self.elev_map["elev"].values, coords.index, coords.x.values, coords.y.values, mp[col_names[0]].values, mp[col_names[1]].values, xmin, ymin, xmax, ymax, xu, yu, zu, xi, nr, lct, nsecs, conf.terrain.param, ) elif isinstance(self.elev_map, WaspRasterMap): logger.debug("Calculating site factors from raster map") xmin_elev, ymin_elev, xmax_elev, ymax_elev = self.elev_map.bbox.bounds() ( z0meso, slfmeso, displ, flow_sep_height, s1, s2, s3, s4, d1, d2, d3, d4, site_elev, rix, dirrix, fort_err, ) = _FORT_RAST_GPD_SITE_FACTORS( self.elev_map.values, xmin_elev, ymin_elev, xmax_elev, ymax_elev, self.elev_map.nx, self.elev_map.ny, coords.index, coords.x.values, coords.y.values, mp[col_names[0]].values, mp[col_names[1]].values, xmin, ymin, xmax, ymax, xu, yu, zu, xi, nr, lct, nsecs, conf.terrain.param, ) _handle_fort_errors(fort_err) logger.debug("Formatting Result") result_names = ( "z0meso", "slfmeso", "displ", "flow_sep_height", "user_def_speedups", "orographic_speedups", "obstacle_speedups", "roughness_speedups", "user_def_turnings", "orographic_turnings", "obstacle_turnings", "roughness_turnings", "site_elev", "rix", "dirrix", ) results_2d = [ "z0meso", "slfmeso", "displ", "flow_sep_height", "site_elev", "rix", "dirrix", ] dim_names = ( [("sector", "point")] * 3 + [("sector", "point")] * 9 + [ ("point"), ("point"), ("sector", "point"), ] ) for name, dimname, da in zip( result_names, dim_names, ( z0meso, slfmeso, displ, flow_sep_height, s1, s2, s3, s4, d1, d2, d3, d4, site_elev, rix, dirrix, ), ): work_locs[name] = (dimname, da, _TOPO_EFFECTS_ATTRS[name]) work_locs[name].attrs["_pwio_data_is_2d"] = name in results_2d # put back in original order work_locs = work_locs.sortby(["sortorder"]).drop_vars( ["sortorder", "point"], errors="i" ) work_locs = spatial_unstack(work_locs) # Add global attributes work_locs.attrs["title"] = "WAsP site effects" if "output" in work_locs.variables: work_locs = work_locs.drop_vars("output") work_locs = update_var_attrs(work_locs, _TOPO_EFFECTS_ATTRS) return update_history(work_locs)
def _is_cfd(site_effects): """Determines if xarray Dataset with site_effects is from CFD site_effects: xarray.Dataset Dataset with obtained from reading a .cfdres file using the windkit function read_cfdres Returns ------- boolean: True if the dataset contains variables cfd_speedups and cfd_turnings, which indicate the dataset was obtained from CFD """ has_speedups = "cfd_speedups" in site_effects.data_vars has_turnings = "cfd_turnings" in site_effects.data_vars is_cfd = has_speedups and has_turnings return is_cfd def _add_site_effects_vars(site_effects): """Adds variables to site_effects dataset site_effects: xarray.Dataset CFD dataset with variables `cfd_speedups`, `cfd_turnings` and `z0meso` Returns ------- site_effects: xarray.Dataset Dataset with the usual structure for site effects from linear WAsP, but the orographic speedups replaced by the CFD speedups Notes ----- The `cfd_speedups` and `cfd_turnings` contain both roughness and orographic effects. The variables are the required ones for the fortran core. """ site_effects["slfmeso"] = xr.full_like(site_effects["z0meso"], 1.0) site_effects["displ"] = xr.full_like(site_effects["z0meso"], 0.0) site_effects["flow_sep_height"] = xr.full_like(site_effects["z0meso"], 0.0) site_effects["orographic_speedups"] = site_effects["cfd_speedups"] site_effects["orographic_turnings"] = site_effects["cfd_turnings"] site_effects["roughness_speedups"] = xr.full_like(site_effects["z0meso"], 1.0) site_effects["roughness_turnings"] = xr.full_like(site_effects["z0meso"], 0.0) site_effects["obstacle_speedups"] = xr.full_like(site_effects["z0meso"], 1.0) site_effects["obstacle_turnings"] = xr.full_like(site_effects["z0meso"], 0.0) site_effects["user_def_speedups"] = xr.full_like(site_effects["z0meso"], 1.0) site_effects["user_def_turnings"] = xr.full_like(site_effects["z0meso"], 0.0) return site_effects def _prepare_site_effects_vars(site_effects): """ Prepares the site effects variables for further processing. Parameters ---------- site_effects : xarray.Dataset The site effects data to be prepared. Returns ------- site_effects : xarray.Dataset The prepared site effects data. Notes ----- The function first checks if the site effects data is from a CFD simulation. If it is, it adds necessary variables to the data. Then, it checks if the 'flow_sep_height' variable is in the data. If it's not, it adds it with a default value of 0.0. """ site_effects = site_effects.copy() if _is_cfd(site_effects): site_effects = _add_site_effects_vars(site_effects) if "flow_sep_height" not in site_effects.data_vars: site_effects["flow_sep_height"] = xr.full_like( site_effects["orographic_speedups"], 0.0 ) return site_effects
[docs] def get_site_effects_cfd(cfd_files, output_locs, conf=None, nsecs=None): """Calculate speedups at all points provided .. warning:: This function is experimental and its signature may change. Parameters ---------- cfdres : xarray.Dataset or list of xarray.Dataset xarray dataset with CFD volume information according to the PyWAsP standard (south_north, west_east, height). If the sector coordinates are also provided it will use those for generating the site effects. If a list of xarray.Datasets it will loop over the CFD volumes using the results from whatever file comes first in the list. output_locs : xarray.Dataset xarray dataset with location information according to the PyWAsP standard (south_north, west_east, height). If the sector coordinates are also provided it will use those for generating the site effects. conf : :any:`Config` Configuration information from WAsP nsecs : optional int Number of sectors used for creating the roses Returns ------- xarray.Dataset """ if isinstance(cfd_files, xr.Dataset): ds_comb = _get_site_effects_cfd_tile( cfd_files, output_locs, conf=conf, nsecs=nsecs ) elif isinstance(cfd_files, list): list_ds = [] for f in cfd_files: list_ds.append( _get_site_effects_cfd_tile(f, output_locs, conf=conf, nsecs=nsecs) ) ds_comb = list_ds[0] for ds in list_ds: ds_comb = ds_comb.combine_first(ds) else: raise ValueError( f"cfd_files must be a xr.Dataset or a list of xr.Dataset, got {type(cfd_files)}" ) return ds_comb
def _get_site_effects_cfd_tile(cfdres, output_locs, conf=None, nsecs=None): """Calculate speedups at all points provided Parameters ---------- cfdres : xarray.Dataset or list of xarray.Dataset xarray dataset with CFD volume information according to the PyWAsP standard (south_north, west_east, height). If the sector coordinates are also provided it will use those for generating the site effects. If a list of xarray.Datasets it will loop over the CFD volumes using the results from whatever file comes first in the list. output_locs : xarray.Dataset xarray dataset with location information according to the PyWAsP standard (south_north, west_east, height). If the sector coordinates are also provided it will use those for generating the site effects. conf : :any:`Config` Configuration information from WAsP nsecs : optional int Number of sectors used for creating the CFD site effects (maximum 36) Returns ------- xarray.Dataset """ if isinstance(cfdres, xr.Dataset): ds_comb = _get_site_effects_cfd_tile( cfdres, output_locs, conf=conf, nsecs=nsecs ) elif isinstance(cfdres, list): list_ds = [] for f in cfdres: list_ds.append( _get_site_effects_cfd_tile(f, output_locs, conf=conf, nsecs=nsecs) ) ds_comb = list_ds[0] for ds in list_ds: ds_comb = ds_comb.combine_first(ds) else: raise ValueError( f"`cfdres` must be a xr.Dataset or a list of xr.Dataset, got {type(cfd_files)}" ) return ds_comb def _get_site_effects_cfd_tile(cfdres, output_locs, conf=None, nsecs=None): """Calculate speedups at all points provided Parameters ---------- cfdres : xarray.Dataset xarray dataset with CFD volume information according to the PyWAsP standard (south_north, west_east, height). If the sector coordinates are also provided it will use those for generating the site effects. output_locs : xarray.Dataset xarray dataset with location information according to the PyWAsP standard (south_north, west_east, height). If the sector coordinates are also provided it will use those for generating the site effects. conf : :any:`Config` Configuration information from WAsP nsecs : optional int Number of sectors used for creating the CFD site effects (maximum 36) Returns ------- xarray.Dataset """ # Check number of output points usr_config.check_point_count(count_spatial_points(output_locs)) if conf is None: conf = Config() # always work on point datasets logger.debug("Preparing output_locs for fort") work_locs = output_locs.copy() work_locs = spatial_stack(output_locs, get_crs(cfdres)).load() if "sector" not in work_locs.dims or nsecs is not None: if nsecs is None: raise ValueError( inspect.cleandoc( """You have to provide the number of sectors you want your terrain analysis for using the nsecs argument""" ) ) sec = create_sector_coords(nsecs) work_locs = work_locs[["point"]].expand_dims({"sector": sec}, -1) work_locs = work_locs.assign_coords(sec.coords) else: work_locs = work_locs[["sector", "point"]] nsecs = work_locs.sizes["sector"] if not _locs_finite(work_locs): raise ValueError( "NaN or Inf values found in west_east or south_north coordinates." ) x, y, z = work_locs.west_east.values, work_locs.south_north.values, work_locs.height xmin, ymin, xmax, ymax = BBox.from_ds(cfdres).bounds() # make sure dataset is in correct order for fortran # start in lower left corner for processing cfdres = cfdres.transpose("west_east", "south_north", "sector", "height").sortby( ["south_north", "west_east"] ) logger.debug("Calculating site factors from vector map") ( z0meso, cfd_speedups, cfd_turnings, site_elev, # height of terrain (m) cfd_flow_inclination, # vertical tilt cfd_turbulence_intensity, # turbulence intensity (-) fort_err, ) = _FORT_CFD_SITE_FACTORS( xmin, xmax, ymin, ymax, cfdres.speedups, # cfd volume speedup effects compared to log inlet profile characterized by the mesoscale roughness length cfdres.turnings, # cfd volume directional turning effects compared to log inlet profile characterized by the mesoscale roughness length cfdres.height, # cfd volume heights cfdres.z0meso, # cfd volume roughness length in each sector cfdres.elevation, # cfd volume height of terrain (m) cfdres.turbulence_intensity, # cfd volume turbulence intensity (-) cfdres.flow_inclination, # cfd volume vertical tilt x, y, z, nsecs, conf.terrain.param, ) _handle_fort_errors(fort_err) logger.debug("Formatting Result") result_names = ( "z0meso", "cfd_speedups", "cfd_turnings", "cfd_turbulence_intensity", "cfd_flow_inclination", "site_elev", ) results_2d = ["z0meso", "site_elev"] dim_names = [("sector", "point")] * 5 + [ ("point"), ] for name, dimname, da in zip( result_names, dim_names, ( z0meso, cfd_speedups, cfd_turnings, cfd_turbulence_intensity, cfd_flow_inclination, site_elev, ), ): work_locs[name] = (dimname, da, _TOPO_CFD_EFFECTS_ATTRS[name]) work_locs[name].attrs["_pwio_data_is_2d"] = name in results_2d # put back in original order work_locs = work_locs.drop_vars(["point", "output"], errors="i") work_locs = spatial_unstack(work_locs) # Add global attributes work_locs.attrs["title"] = "WAsP CFD site effects" work_locs = update_var_attrs(work_locs, _TOPO_CFD_EFFECTS_ATTRS) return update_history(work_locs) def _nsecs_or_sector_dim(ds, nsecs): """Returns nsecs or number of sectors with appropriate error handling""" if "sector" in ds.dims: nsecs_locs = ds.sizes["sector"] else: nsecs_locs = None if nsecs is None: if nsecs_locs is None: raise ValueError( "Cannot automatically determine number of sectors, please add a value to nsecs." ) else: nsecs = nsecs_locs else: # have nsecs if nsecs_locs is not None and nsecs != nsecs_locs: raise ValueError( f"nsecs argument ({nsecs}) doesn't match number of sectors on dataset ({nsecs_locs})" ) return nsecs def _get_nr_of_radial_bins(conf): """ Calculate the number of radial bins based on the given configuration. This function is used for setting up the number of radial bins in the spidergrid which is created from landcover vector lines. Parameters ---------- conf : :any:`Config` Configuration information from WAsP. This uses the following parameters to calculate the number of radial bins: - 84: first spider web distance - 31: decaylength for roughness area size (default 10 km) - 83: factor to calculate outer radius (default 3, since at 3*10 km most of the roughness changes are not felt anymore) - 81: expansion factor in percent (default 5%), so each each spider web distance is 5% bigger than the previous Returns ------- nr : float The number of radial bins, calculated based on the given configuration. """ gr1 = conf.terrain[84] # first spider web distance rnm = conf.terrain[31] * conf.terrain[83] # outer radius haf = 1.0 + conf.terrain[81] * 0.01 # expansion factor a la BZ nr = ( np.floor(np.log(rnm / gr1 * (haf - 1.0) + 1.0) / np.log(haf)) + 2 ) # number of radial bins return nr def _standardize_roughness_map(rgh_map, lctable, conf): """Format the roughness map for use in Fortran routines Convert from landcover to roughness as necessary or the other direction, and check the rest of the roughness map requirements. Returns ------- tuple gpd.GeoDataFrame and matrix representation of LandCoverTable and col_names used in the GeoDataFrame """ terrain_analysis = conf.terrain[82] # decide which terrain analysis we use if isinstance(rgh_map, gpd.GeoDataFrame): map_type = wk._vectormap_helpers._get_map_type(rgh_map) if terrain_analysis == 0: if map_type == "landcover": rgh_map = wk._vectormap_helpers._landcover_to_z0(rgh_map, lctable) # create dummy landcover table landcover_table = np.array([np.zeros(3)], dtype="f", order="F") else: if lctable is None: logging.debug( inspect.cleandoc( """No landcover table was specified with this rastermap so the values in the Raster are interpreted as roughness lengths: the vector lines are converted to landcover id's and a correspoding LandCoverTable with these id's""" ) ) rgh_map, lctable = wk.vector_map._z0_to_landcover(rgh_map) landcover_table = wk.LandCoverTable(lctable)._to_matrix() else: raise ValueError( "You need to convert your roughness map from raster to vector." ) map_type = wk._vectormap_helpers._get_map_type(rgh_map) col_names = wk._vectormap_helpers._LR_COLS[map_type] return rgh_map, landcover_table, col_names def _locs_finite(work_locs): """Check if locations contains only finite values Parameters ---------- work_locs : xarray.Dataset GeoDataFrame with roughness, landcover or elevation data Returns ------- boolean True if all values are finite """ bool_locs_finite = ( np.isfinite(work_locs.west_east.values).all() and np.isfinite(work_locs.south_north.values).all() ) return bool_locs_finite def _map_finite(mp): """Check if map file has only finite values Parameters ---------- mp : gpd.GeoDataFrame or WaspRasterMap GeoDataFrame with roughness, landcover or elevation data Returns ------- boolean True if all values are finite """ if isinstance(mp, gpd.GeoDataFrame): bool_map_finite = any(~np.isfinite(mp.get_coordinates())) and any( ~np.isfinite(mp[[r for r in mp.columns if r != "geometry"]]) ) elif isinstance(mp, WaspRasterMap): bool_map_finite = np.isfinite(mp.values).all() else: raise ValueError( """This is not valid WaspRasterMap or gpd.GeoDataFrame""" ) return bool_map_finite def _lct_finite(lct): """Check if landcovertable has only finite values Parameters ---------- lct : ndarray 2D array (z0, d) with `float` type. Returns ------- boolean True if all values are finite """ bool_lct_finite = np.isfinite(lct).all() return bool_lct_finite