Source code for pywasp.io.rastermap

# pylint:disable=I1101
"""
Contains class and class methods to manipulate surface raster files

The data is read using rasterio, so all raster formats are supported, but only
GRD files are able for output. Data is stored in the WAsP memory format.

If manipulating data for outside of WAsP, it is recommended that you use
rasterio or another raster library, and then only read in the data to this
class for use with the pywasp functionality.

Public Class:
    `RasterMap`
        Stores a raster map of either roughness or elevation in WAsP memory format
"""

import inspect
import logging
import warnings
from inspect import cleandoc

import numpy as np
import xarray as xr
import pyproj

import windkit as wk
import windkit
from windkit._vectormap_helpers import _MAP_TYPE_CODES
from windkit.spatial import BBox, get_crs, add_crs
from windkit.raster_map import _roughness_to_landcover

import pywasp
from pywasp._errors import PywaspError
from ..core import Rvea0290  #

logger = logging.getLogger(__name__)


[docs] def rastermap_to_vectormap(da, dz=None, gsize=None, lctable=None, as_geopandas=True): """ Convert raster map to vector map .. warning:: This function is experimental and its signature may change. Parameters ---------- da : xarray.DataArray Raster map to be converted, should have a name of "elevation", "roughness", or "landcover". This is set by default when using "read_raster_map" dz : float, optional Spacing of elevation in z-direction, only for elevation maps, by default 10 m gsize : int, optional Precalculation grid size can increase on error, by default 1000000 lctable : windkit.LandCoverTable Landcover table, required if converting landcover rastermap as_geopandas : bool Convert to Geopandas based vectormap or keep as WaspVectorMap Returns ------- WaspVectorMap or geopandas.GeoDataFrame Vector representation of the input rastermap. """ # Initialize fields if gsize is None: gsize = 1000000 # Get the maptype map_type = da.name # Set a sensible dz if none is provided. if map_type == "elevation": if dz is None: vmin, vmax = da.values.min(), da.values.max() span = vmax - vmin if span > 100.0: dz = 10.0 else: dz = span / 10 dz = max(1.0, dz) if (da.max() - da.min() - dz) < dz: logger.warning( inspect.cleandoc( """ Specified contouring distance is smaller than elevation differences within map. We assume a flat map with the mean of the pixels specified. If this is not desired, reduce the contour spacing using the `dz` argument. """ ) ) bbox = windkit.spatial.BBox.from_ds(da) elev_gpd = windkit.vector_map.create_flat_vector_map( bbox.crs, bbox=bbox, map_type="elevation", elev=float(da.mean()) ) if as_geopandas: return elev_gpd else: return pywasp.io.vectormap.vectormap_to_waspformat(elev_gpd) if map_type == "landcover" and lctable is None: raise ValueError("Must supply lctable for landcover raster.") wrm = rastermap_to_waspformat(da, lctable) vectormap = wrm.to_wasp_vectormap(dz, gsize) if as_geopandas: return pywasp.io.vectormap.waspformat_to_vectormap(vectormap, lctable) else: warnings.warn( "Using WaspVectorMap is deprecated. Future version of pywasp will only support geopandas for vectormaps!", FutureWarning, ) return vectormap
# Wasp format class
[docs] class WaspRasterMap: """ Stores a roughness or elevation map in a raster format usable by WAsP routines. Parameters ---------- values : `ndarray` Raster values of either elevation or roughness type x0, y0 : float Lower Left x and y values (Cell Center value) nx, ny : int Number of cells in x and y dimension res : float Resolution of x and y dimensions (dx, dy) crs : int, str, tuple, dict, pyproj.crs.CRS Coordinate Reference System map_type : str Type of map. List of options: - 'elevation' - 'roughness' - 'landcover' lctable : LandCoverTable LandCoverTable class (Default: None) """
[docs] def __init__(self, values, minx, miny, nx, ny, res, crs, map_type, lctable=None): self.values = values self.minx = minx self.miny = miny self.nx = nx self.ny = ny self.res = res self.map_type = map_type self.lctable = lctable crs = pyproj.crs.CRS(crs) self.crs = crs self.bbox = BBox.from_cornerpts( minx, miny, minx + (nx - 1) * res, miny + (ny - 1) * res, crs )
@property def _map_type_code(self): """returns the integer code for the maptype""" return _MAP_TYPE_CODES[self.map_type] def __repr__(self): return cleandoc( """ WAsP Raster Map Map type: {0.map_type} (code: {0._map_type_code}) minx, miny: {0.minx}, {0.miny} nx, ny: {0.nx}, {0.ny} res: {0.res} """.format( self ) ) def __str__(self): # pragma:no cover return self.__repr__()
[docs] @classmethod def from_rastermap(cls, da, lctable=None): """Create WaspRasterMap from raster object""" return rastermap_to_waspformat(da, lctable)
[docs] def to_landcover(self): """Convert roughness map to landcover map""" if self.map_type != "roughness": raise ValueError( """Can only convert a roughness map to a landcover map. This map is of type {0}""".format( self.map_type ) ) lc_map, lct = _roughness_to_landcover(self.values) return WaspRasterMap( lc_map, self.minx, self.miny, self.nx, self.ny, self.res, self.crs, "landcover", lctable=lct, )
def _landcover_to_roughness(self, lctable=None, variable="z0"): """Converts landcover map to roughness Parameters ---------- lctable: LandCoverTable Lookup table mapping LandCoverID to various flow parameters Default set to None (no lookup table to be used) variable : str String identifying the flow parameter to be extracted from lctable Default set to 'z0', i.e. roughness Returns ------- RasterMap Roughness raster map Raises ------ ValueError If input map is not of type landcover ValueError landcover map that doesn't have a LandCoverTable Notes ----- If lctable is provided it will overwrite any stored lookup table. Roughness map is made by mapping the landcover classes to roughness lengths via the IDs. """ if self.map_type != "landcover": raise ValueError("Input map not of type 'landcover'.") if self.lctable is None and lctable is None: raise ValueError( "Input map does not have a LandCoverTable, please supply one" ) if lctable is None and self.lctable is not None: lctable = self.lctable landcover_converter = lambda cell: lctable[cell][variable] vfunc = np.vectorize(landcover_converter) rou_raster = vfunc(self.values) return self.__class__( rou_raster, self.minx, self.miny, self.nx, self.ny, self.res, crs=self.crs, map_type="roughness", ) def _to_rastermap(self): """Convert to a xarray.DataArray based rastermap""" values = self.values.reshape((self.nx, self.ny), order="F").transpose() x = np.arange(self.nx) * self.res + self.minx y = np.arange(self.ny) * self.res + self.miny coords = { "south_north": (("south_north",), y), "west_east": (("west_east",), x), } dims = ("south_north", "west_east") da = xr.DataArray(values, coords=coords, dims=dims, name=self.map_type) da = add_crs(da, self.crs) return da
[docs] def to_wasp_vectormap(self, dz=10.0, gsize=1000000): """ Convert raster map to vector map .. warning:: This function is experimental and its signature may change. Parameters ---------- dz : float, optional Spacing of elevation in z-direction, only for elevation maps, by default 10 m gsize : int, optional Precalculation grid size can increase on error, by default 1000000 """ from .vectormap import WaspVectorMap n_pts = 0 if self.map_type == "elevation": # Check for dz being positive if dz <= 0.0: raise PywaspError( "WaspRastermap.to_wasp_vectormap(): spacing elevation in z-direction must be positive." ) gx, gy, n_lines, n_pts, ierr = Rvea0290.elemapper_mod.elevationgridtomap( self.values, self.nx, self.ny, self.minx, self.miny, self.res, self.res, dz, gsize, ) if ierr > 0: err = "Precalculation mapsize gsize(%d) is too small" % gsize raise ValueError(err) gx = gx[0:n_pts] gy = gy[0:n_pts] elif self.map_type in ["roughness", "landcover", "landmask"]: gsize = Rvea0290.rghmapper_mod.getmapsize( self.values, self.nx, self.ny, self.minx, self.miny, self.res, self.res ) gx, gy, n_lines = Rvea0290.rghmapper_mod.roughnessgridtomap( self.values, self.nx, self.ny, self.minx, self.miny, self.res, self.res, gsize, _MAP_TYPE_CODES[self.map_type], ) n_pts = gsize - (n_lines * 3) else: err = f"map_type {self.map_type} not recognized, must be roughness, elevation, or landcover" raise ValueError(err) xmax = self.minx + self.nx * self.res ymax = self.miny + self.ny * self.res bbox = windkit.spatial.BBox.from_cornerpts( self.minx, self.miny, xmax, ymax, self.crs ) # There needs to be header file in the VectorMap, which is hardcoded here return WaspVectorMap( "From RasterMap", gx, gy, n_lines, n_pts, self.crs, self.map_type, bbox, lctable=self.lctable, )
[docs] def rastermap_to_waspformat(da, lctable=None): """ Convert raster to WaspRasterMap expected for WAsP fortran routines. .. warning:: This function is experimental and its signature may change. Parameters ---------- da : xarray.DataArray Raster to write. lctable: LandCoverTable LandCoverTable class with id's, roughnesses, displacements and a description Returns ------- WaspRasterMap Rastermap object to be used by WRF routines """ if isinstance(da, WaspRasterMap): return da args = _rastermap_to_waspformat(da, lctable) return WaspRasterMap(*args)
# Convert to wasp format def _rastermap_to_waspformat(da, lctable): """ Convert raster to input arguments for WaspRasterMap expected for WAsP fortran routines. Parameters ---------- da : xarray.DataArray Raster to write. lctable: LandCoverTable LandCoverTable class with id's, roughnesses, displacements and a description Returns ------- args: tuple Arguments to create a WaspRasterMap """ if da.name not in _MAP_TYPE_CODES.keys(): raise ValueError(f"Unknown maptype {da.name}.") if da.name == "landcover" and lctable is None: raise ValueError("lctable required for landcover map.") x, y = da.west_east.values, da.south_north.values nx, ny = len(x), len(y) xmin, ymin = np.min(x), np.min(y) dx, dy = x[1] - x[0], y[1] - y[0] # y[0] is top of domain so flip upside down if dy < 0: # needs_test values = da.values[::-1, :].flatten() else: values = da.values.flatten() crs = get_crs(da) return (values, xmin, ymin, nx, ny, dx, crs, da.name, lctable)