Source code for pywasp.io.vector_map

"""
Contains class and class methods to manipulate elevation and roughness vector files

VectorMaps are used to represent elevation contours and roughness change lines.
Pywasp can read elevation and roughness data from either classic WAsP mapfiles
or the newer gml file formats, but many other file formats are also supported
through GDAL's OGR tool, however this requires you to set the field ids
appropriately when reading the files.

VectorMaps are used by pywaps Topography module to calculate speedups, turning,
and the mesoroughness, for either generalization or downscaling.

Mapfiles are used to represent elevation contours and roughness change lines.
They can then be used by the Topography module to calculate speedups, turning,
and the mesoroughness, for either generalization or downscaling.
"""

from inspect import cleandoc
import numpy as np
import pyproj
from shapely.geometry import LineString
from shapely import linestrings
import geopandas as gpd
import warnings

from windkit.spatial import BBox
from windkit.topography._vectormap_helpers import (
    _get_map_type,
    _explode_gdf,
    _landcover_to_roughness,
)
from windkit.topography._vectormap_helpers import (
    _LR_COLS,
    VECTORMAP_ELEV_COL,
    _MAP_TYPE_CODES,
    MapTypes,
    VECTORMAP_ROUL_COL,
    VECTORMAP_ROUR_COL,
    VECTORMAP_IDL_COL,
    VECTORMAP_IDR_COL,
    VECTORMAP_LMASKL_COL,
    VECTORMAP_LMASKR_COL,
    _is_single_polygons,
)
from windkit.topography.map_conversions import snap_to_layer, _preprocess_polygon_gdf
from ..core import Rvea0287, Rvea0290  #
from .._errors import _handle_fort_errors


[docs] def polygons_to_lines( gdf, lctable=None, map_type="roughness", return_lctable=False, external_roughness=None, check_errors=True, snap=True, ): """Converts a geopandas.GeoDataFrame vectormap object to a xarray.DataArray rastermap object. .. warning:: This function is experimental and its signature may change. Parameters ---------- gdf : geopandas.GeoDataFrame Vectormap with only polygons and at minimum columns 'z0' or 'id' (only allowed when a landcover table ``lct`` is specified). Can also contain a column 'd' that specifies the displacement height and a column 'desc', which describes the landcover in the corresponding polygon. lctable : LandCoverTable Landcover table specifying the landcover classes and their mappings to roughness and displacement height. Required for 'landcover' maps that only have an 'id' column. The default is None, which means that there is a 'z0' column in the dataframe that specifies the roughness length. map_type : {"roughness","landcover"} Whether the output is a geopandas dataframe with roughness (z0) or id change lines, default "roughness". return_lctable: bool Whether to return the landcover table, default False external_roughness: float, optional Default None, meaning that segments from polygons that border a unspecified area are removed. If a float is specified, the roughness of these unspecified areas are assigned this value. check_errors: check for the following errors in the map 1) No polygons are allowed to overlap 2) If external_roughness is None: The exterior boundary (convex hull) given by the extent of the polygons must not contain holes. 3) If external_roughness is a float: Throw a warning if there is holes in the polygons you have specified. snap: bool If True, insert extra vertices if a vertex is present in one polygon but not in the one that is touching it Returns ------- gdf : geopandas.GeoDataFrame Vectormap with LineString and columns <var>_left and <var>_right Notes ----- if map_type="landcover" and return_lctable is True also returns: lct: wk.LandCoverTable landcover table containing the mapping of columns id_left,id_right to roughness length ``z0``, displacement height ``d`` and a description ``desc`` """ if external_roughness is None: doclipping = True else: doclipping = False gdf, lctable, unique_z0d, _ = _preprocess_polygon_gdf( gdf, lctable=lctable, external_roughness=external_roughness, check_errors=check_errors, ) if snap: gdf = snap_to_layer(gdf) coord = gdf.get_coordinates() vertices, nverts = Rvea0287.poly.process_lines( gdf.id.values, coord.index, coord.x, coord.y, max(lctable.keys()), doclipping ) gdf_out = gpd.GeoDataFrame( vertices[:nverts], columns=["x1", "y1", "x2", "y2", "id_left", "id_right"], geometry=[ LineString(v[0:4].reshape(2, 2)) for i, v in enumerate(vertices[:nverts]) ], ) gdf_out = gdf_out.set_geometry("geometry").set_crs(gdf.crs) gdf_out = gdf_out.dissolve(by=["id_left", "id_right"]) gdf_out = gdf_out.line_merge() gdf_out = gdf_out.reset_index(name="geometry").explode() gdf_out["geometry"] = gdf_out["geometry"].make_valid() gdf_out = _explode_gdf(gdf_out).reset_index().drop(columns="index") if map_type == "roughness": if (unique_z0d.reset_index()["d"] > 0).any(): raise ValueError( "You map contains displacement heights. This information will be lost when map_type='roughness', use map_type='landcover' to keep this information" ) if return_lctable: raise ValueError( "Cannot return a landcover table when using map_type='roughness', use map_type='landcover' instead if you want to obtain the landcover table." ) return _landcover_to_roughness(gdf_out, lctable) else: if return_lctable: return (gdf_out, lctable) else: return gdf_out
[docs] def vector_to_raster(gdf, res, bounds=None, map_type=None, return_lctable=False): """ Converts a geopandas.GeoDataFrame vector map object to a xarray.DataArray raster map object. .. warning:: This function is experimental and its signature may change. Parameters ---------- gdf : geopandas.GeoDataFrame vector map to be converted res : float Resolution of x and y dimensions (dx, dy) bounds : tuple, windkit.spatial.BBox, optional Bounding box of the output raster map. In the format: (xmin, ymin, xmax, ymax). By default (None), the entire geodataframe bounding box is used. map_type : {"roughness","landcover"} or None Only used when the gdf is of polygon type, controls whether the output is a raster with roughness (z0) or landcover id, default "roughness". return_lctable: bool Only used when the gdf is of polygon type, whether to return the landcover table, default False Returns ------- xarray.DataArray Rastermap representation of the map """ gdf = _explode_gdf(gdf) lct = None if _is_single_polygons(gdf): if return_lctable: if map_type is None: map_type = "landcover" gdf, lct = polygons_to_lines( gdf, map_type=map_type, return_lctable=return_lctable ) else: if map_type is None: map_type = "roughness" gdf = polygons_to_lines( gdf, map_type=map_type, return_lctable=return_lctable ) else: if return_lctable is not None or map_type is not None: warnings.warn( "return_lctable / map_type arguments are not used when using change lines." ) wvm = _vector_to_wasp_format(gdf, lctable=lct) wrm = wvm.to_wasp_raster_map(res, bounds=bounds) if return_lctable: return (wrm._to_raster_map(), lct) else: return wrm._to_raster_map()
class _WaspVectorMap(object): """ Vector representation of the map. Notes ----- This is the only type of mapfile that the Topography module works with for the moment. Parameters ---------- header : str Single line description of the mapfile, will typically be the proj.4 string that describes the projection of the map. gx : array_like The first three entries of this array are the left roughness length, right roughness length, and elevation, followed by the x-cooridinate values for each point. (1D) gy : array_like The first three entries of this array are the number of points for a specific line, the line type, and a null value represented as 0.0. (1D) n_lines : int Number of lines in the map n_pts : int Total number of points in the map epsg : int EPSG number for identifying SRS map_type : int ID representing the type of map from MapType bbox : tuple Bounding box holding the global extent of the object. This has the following values (minx, miny, maxx, maxy). """ _map_type_codes = { "elevation": 0, "roughness": 1, "landcover": 6, "landmask": 15, } def __init__( self, header, gx, gy, n_lines, n_pts, crs, map_type, bbox, lctable=None ): # Always store gx and gy as a list if isinstance(gx, np.ndarray): gx = gx.tolist() if isinstance(gy, np.ndarray): gy = gy.tolist() if not isinstance(bbox, BBox): raise ValueError("bbox value should be a windkit.BBox object") self.header = header self.gx = gx self.gy = gy self.n_lines = n_lines self.n_pts = n_pts self.crs = crs if crs is not None: self.crs = pyproj.CRS.from_user_input(crs) self.map_type = map_type self.bbox = bbox self.lctable = lctable def __repr__(self): # pragma: no cover repr_list = [ repr(self.header), repr(self.gx), repr(self.gy), repr(self.n_lines), repr(self.n_pts), repr(self.crs.to_string()), repr(self.map_type), repr(self.bbox), ] return "WaspVectorMap(" + ", ".join(repr_list) + ")" def __str__(self): # pragma: no cover out = """ {0.header} Contains {0.n_lines} contour lines It has the extent of {0.bbox} """.format(self) return out @classmethod def from_geodf(cls, gdf, lctable=None): """ Initialize a _WaspVectorMap from a geodataframe .. warning:: This function is experimental and its signature may change. Parameters ---------- gdf: geopandas.GeoDataFrame Vectormap as a geodataframe map_type: str Either "elevation" or "roughness" to split combo map """ args, kwargs = _gdf_to_wvm(gdf, lctable=None) return cls(*args, **kwargs) def _to_geodf(self): """ Convert _WaspVectorMap to GeoDataFrame object. """ return _wvm_to_gdf(self.gx, self.gy, self.n_lines, self.crs) def to_wasp_raster_map(self, res, bounds=None): """ Converts a _WaspVectorMap object to a WaspRasterMap object. .. warning:: This function is experimental and its signature may change. Parameters ---------- res : float Resolution of x and y dimensions (dx, dy) bounds : tuple, windkit.spatial.BBox, optional Bounding box of the output raster map. In the format: (xmin, ymin, xmax, ymax). By default (None), the entire geodataframe bounding box is used. Returns ------- WaspRasterMap RasterMap class instance """ from .raster_map import _WaspRasterMap # Here to prevent circular imports if isinstance(bounds, BBox): bounds = bounds.bounds() # if no bounding box is specified, just plot the whole map based on bbox from vector map if bounds is None: bounds = self.bbox.bounds() xmin, ymin, xmax, ymax = bounds nx = int((xmax - xmin) / res) ny = int((ymax - ymin) / res) if self.map_type == "elevation": convert = Rvea0290.elegridder_mod.elevationmaptogrid elif self.map_type in ["roughness", "landcover", "landmask"]: convert = Rvea0290.rghgridder_mod.roughnessmaptogrid else: err = f"Don't know how to convert WaspVectorMap of map_type {self.map_type} to WaspRasterMap" raise ValueError(err) values, _ = convert(self.gx, self.gy, self.n_lines, xmin, ymin, nx, ny, res) return _WaspRasterMap( values, xmin, ymin, nx, ny, res, self.crs, self.map_type, lctable=self.lctable, ) def _vector_to_wasp_format(gdf, lctable=None): """ Creates a _WaspVectorMap object from a geodataframe. .. warning:: This function is experimental and its signature may change. Parameters ---------- gdf : geopandas.GeoDataFrame Vectormap as a geodataframe Returns ------- _WaspVectorMap : _WaspVectorMap """ if isinstance(gdf, _WaspVectorMap): return gdf args, kwargs = _gdf_to_wvm(gdf, lctable=lctable) return _WaspVectorMap(*args, **kwargs) def _gdf_to_wvm(gdf, lctable=None): """ Create arguments and keyword-agument to create a _WaspVectorMap from a geodataframe Parameters ---------- gdf : geopandas.GeoDataFrame Vectormap as a geodataframe elev_col : str name of elevation column in geodataframe z0_cols : bool names of z0_left and z0_right in geodataframe Returns ------- args : tuple Arguments needed for _WaspVectorMap __init__ kwargs : dict Key-word aguments for _WaspVectorMap __init__ """ if not isinstance(gdf, gpd.GeoDataFrame): raise ValueError( cleandoc( """This is not a GeoDataFrame. Perhaps you are passing in a combination of a GeoDataFrame and a landcover table?""" ) ) # Wasp only works with single part geometries gdf = _explode_gdf(gdf) # Get the map_type based con column names, and throw errors / warnings based on type map_type = _get_map_type(gdf) # If we have non-elevation maps we have left-right columns, get their names if map_type in ["roughness", "landcover", "landmask"]: lr_cols = _LR_COLS[map_type] else: lr_cols = 2 * (VECTORMAP_ELEV_COL,) coords = gdf.get_coordinates() gx, gy, _, n_lines, ierror = Rvea0287.waspvectormap.from_gpd( coords.index, coords.x.values, coords.y.values, gdf[lr_cols[0]].values, gdf[lr_cols[1]].values, _MAP_TYPE_CODES[map_type], ) _handle_fort_errors(ierror) n_pts = coords.index.size crs = gdf.crs bbox = BBox.from_cornerpts(*gdf.total_bounds, crs) args = ("PyWAsP map", gx, gy, n_lines, n_pts, crs, map_type, bbox) kwargs = {"lctable": lctable if map_type == "landcover" else None} return args, kwargs def _wvm_to_gdf(gx, gy, n_lines, crs=None): """ Create geopandas.GeoDataFrame from _WaspVectorMap arrays gx, gy, and n_lines (the number of features). Parameters ---------- gx, gy : array_like _WaspVectorMap arrays n_lines : int Number of features crs : str, int, tuple, dict, pyproj.CRS Coordinate Reference System. Returns ------- geopandas.GeoDataFrame Vector map in geopandas format. """ ( index, x, y, z0_left, z0_right, elev, map_type, ierror, ) = Rvea0287.waspvectormap.to_gpd(gx, gy, len(gx) - n_lines * 3, n_lines) _handle_fort_errors(ierror) geom = linestrings(x, y, indices=index) if all(map_type == MapTypes.elevation): data = {VECTORMAP_ELEV_COL: elev} elif all(map_type == MapTypes.roughness): data = {VECTORMAP_ROUL_COL: z0_left, VECTORMAP_ROUR_COL: z0_right} elif all(map_type == MapTypes.landcover): data = { VECTORMAP_IDL_COL: z0_left.astype(int), VECTORMAP_IDR_COL: z0_right.astype(int), } elif all(map_type == MapTypes.landmask): data = {VECTORMAP_LMASKL_COL: z0_left, VECTORMAP_LMASKR_COL: z0_right} else: raise ValueError("More than 1 map type found!") gdf = gpd.GeoDataFrame(data, geometry=geom, crs=crs) return gdf def _wasp_format_to_vector(wvm): """ Convert _WaspVectorMap to geopandas.GeoDataFrame .. warning:: This function is experimental and its signature may change. Parameters ---------- wvm : _WaspVectorMap VectorMap to be converted to GeoDataFrame Returns ------- geopandas.GeoDataFrame Resulting geopandas.GeoDataFrame """ return _wvm_to_gdf(wvm.gx, wvm.gy, wvm.n_lines, crs=wvm.crs)