"""Tools for converting a line map to polygon map
The LineMap class provides comparison, conversion, and plotting
methods.
The lines2poly function provides a simple functional conversion
"""
# In this module a node is defined as an endpoint of a line. It results that
# there are two nodes for each lines and different nodes may be at the same
# position: in that case the nodes are adjacent.
import numpy as np
import pandas as pd
from shapely.geometry import LinearRing, LineString, Polygon
from shapely.geometry.polygon import orient as shapely_orient
import windkit.map_conversion.poly2lines as poly2lines
from windkit.geospatial_imports import HAS_GEOPANDAS, requires_geopandas
from windkit.map_conversion.helper_functions import (
_dict_to_df,
_get_AL,
_get_angle,
_sort_counterclockwise_points,
)
from windkit.plot._helpers import HAS_MATPLOTLIB, requires_matplotlib
from windkit.plot.color import Color, _get_valid_color
if HAS_GEOPANDAS:
import geopandas as gpd
from geopandas.testing import assert_geodataframe_equal
if HAS_MATPLOTLIB:
import matplotlib.pyplot as plt
def _check_line_gdf_format(gdf):
required = ["geometry", "id_left", "id_right"]
for column_name in required:
if column_name not in gdf:
raise Exception(f"Missing required column: {column_name}")
if not (np.array(gdf.geom_type) == "LineString").all():
raise Exception("Geometry column should contain LineStrings only")
def _is_valid_buffer(buffer):
if isinstance(buffer, float) or isinstance(buffer, int):
return buffer >= 0
else:
return (np.array(buffer) >= 0).all()
class LineMap:
"""
Class to convert a line geodataframe to a polygon geodataframe.
The path argument has the priority over the line_gdf argument.
Parameters
----------
lines: GeoDataFrame or string or path
A geodataframe including the columns geometry filled with LineString
and id filled with integers or float, or path to such a geodataframe.
"""
def __init__(self, lines):
requires_geopandas()
if isinstance(lines, gpd.GeoDataFrame):
line_gdf = lines
else:
line_gdf = gpd.read_file(lines)
_check_line_gdf_format(line_gdf)
self._line_gdf = line_gdf.reset_index()
self._original_n = len(self._line_gdf)
self._visited = None
self._nodes = None
self._AL_nodes = None
self._tight_bbox = None
self._xleft, self._ydown, self._xright, self._yup = None, None, None, None
self._middle = None
self._poly_gdf = None
self._buffer = None
def __string__(self):
"""Return the object representation."""
return f"Line map.\nGeodataframe: {self._line_gdf}"
@property
def line_gdf(self):
"""Return the original line geodataframe."""
lines = self._line_gdf.iloc[: self._original_n]
lines.set_index(lines.loc[:, "index"].astype(int), inplace=True)
return lines.drop(columns="index")
def assert_lines_equal(self, map2, **kwargs):
"""
Evaluate lines dataframes equality with another map.
Sort the points in each geometry and the geometries in the dataframes.
Parameters
----------
map2: ConvertibleMap
The map that has the line gdf to compare to.
**kwargs:
Will be passed to assert_geodataframe_equal
"""
df1 = sort_line_gdf(self.line_gdf)
df2 = sort_line_gdf(map2.line_gdf)
try:
assert_geodataframe_equal(df1, df2, check_dtype=False, **kwargs)
return True
except AssertionError:
return False
def plot(
self,
plot_endpoints=False,
landcover_table=None,
color_lines=False,
cmap=None,
norm=None,
ignore_collisions=True,
**kwargs,
):
"""
Plot the original line GeoDataFrame.
Parameters
----------
plot_endpoints : bool, optional. Default: False
Whether or not to plot the endpoints of the lines.
landcover_table: windkit LandCoverTable or dict or None. Default: None
Map ids to roughness values. If None, use ids for coloring lines.
color_lines : bool or string: "right" or "left", optional. Default: False
Whether to use the left or right id to color the lines.
The lines are colored according to "z0" if available else "id".
If False, the lines will be uniform in color.
cmap : matplotlib.colors.Colormap, optional. Default: None
Colormap to color the lines. If cmap and norm are set to None and
color_lines is True, use a default colormap.
norm : matplotlib.colors.BoundaryNorm, optional. Default: None
norm used when coloring the lines. If cmap and norm are set to None
and color_lines is True, use a default norm.
ignore_collisions : bool, optional. Default: True
If ignore_collisions is False, cmap is None and color_lines is True,
(i.e. default color is used), the function will raise an error if two
different rougness values are mapped to the same colors.
"""
if isinstance(color_lines, str):
side = color_lines
color_lines = True
else:
side = "left"
try:
line_gdf = self._get_line_gdf_with_z0(
side=side, landcover_table=landcover_table
)
except (AttributeError, KeyError):
line_gdf = self.line_gdf
if color_lines:
column = "z0" if "z0" in line_gdf.columns else f"id_{side}"
else:
column = None
if cmap is None and norm is None and (column == "z0"):
try:
color = Color._from_lc(landcover_table)
except Exception as e:
color = _get_valid_color(line_gdf, ignore_collisions)
cmap, norm = color.cmap, color.norm
ax = line_gdf.plot(aspect=1, cmap=cmap, column=column, **kwargs)
if plot_endpoints:
self._endpoints_plot(ax=ax)
def _get_line_gdf_with_z0(self, landcover_table=None, side="left"):
"""Return a copy of the line dataframe with roughness values.
This is only possible if we have a landcover table.
"""
line_gdf = self.line_gdf
if isinstance(landcover_table, dict):
landcover_table = _dict_to_df(landcover_table)
if landcover_table is not None:
id_ = line_gdf.loc[:, f"id_{side}"]
line_gdf["z0"] = np.array(landcover_table.loc[id_, "z0"])
else:
raise AttributeError(
"Cannot add roughness values without a landcover table."
)
return line_gdf
def _endpoints_plot(self, **kwargs):
requires_matplotlib()
if self._nodes is None:
self._nodes = self._get_nodes()
coords = np.array([np.array(c) for c in self._nodes[: 2 * self._original_n]])
if "ax" in kwargs:
kwargs["ax"].plot(coords[:, 0], coords[:, 1], "r.")
else:
plt.plot(coords[:, 0], coords[:, 1], "r.")
def to_poly_map(self, bbox=None, buffer=0, **kwargs):
"""Compute and return the polygon map.
Parameters
----------
bbox: 4-uple, optional. Default: None
x_left, y_down, x_right, y_up. If None, the bbox will fit the data.
buffer: float or 4-uple, optional. Default: 0
If lines of the gdf touches the border continuously, the border will
be moved by the value of the buffer. The buffer has to be non negative.
null buffer may cause invalid polygons if a line touches a border
continuously.
**kwargs: rtol and atol can be used to assess that two points are at the same
place with np.isclose.
"""
if not _is_valid_buffer(buffer):
raise ValueError("Invalid value for buffer argument.")
self._buffer = buffer
self._nodes = self._get_nodes()
self._AL_nodes = _get_AL(self._nodes, **kwargs)
bbox = self._get_buffered_borders(bbox_user=bbox)
self._xleft, self._ydown, self._xright, self._yup = bbox
self._middle = ((self._xleft + self._xright) / 2, (self._yup + self._ydown) / 2)
self._make_all_polygons(**kwargs)
# reset the dataframe for next time / for getting the correct lines
self._line_gdf = self._line_gdf.iloc[: self._original_n]
return poly2lines.PolygonMap(self._poly_gdf)
def _make_all_polygons(self, **kwargs):
"""Create/update the attribute poly_gdf."""
requires_geopandas()
self._link_borders()
# nodes have to be updated after linking.
self._nodes = self._get_nodes()
self._AL_nodes = _get_AL(self._nodes, **kwargs)
self._visited = np.zeros(2 * len(self._line_gdf), bool)
polygons = []
polygons_id = []
polygons_nodes = []
holes = []
holes_id = []
holes_nodes = []
for i in range(2 * self._original_n):
if not self._visited[i]:
polygon_nodes = self._get_polygon(i)
points = self._nodes2polygon_points(polygon_nodes)
id_, type_ = self._get_id_and_type(polygon_nodes)
if type_ == "shell":
polygons.append(Polygon(points))
polygons_id.append(id_)
polygons_nodes.append(polygon_nodes)
elif type_ == "hole":
holes.append(Polygon(points))
holes_id.append(id_)
holes_nodes.append(polygon_nodes)
polygons, polygons_id = self._cut_holes(
polygons, holes, holes_id, polygons_id, polygons_nodes, holes_nodes
)
self._poly_gdf = gpd.GeoDataFrame(
{"geometry": polygons, "id": polygons_id}, crs=self.line_gdf.crs
)
def _cut_holes(
self, polygons, holes, holes_id, polygons_id, polygons_nodes, holes_nodes
):
"""Return a list of polygon without intersections."""
areas = np.array([Polygon(h).area for h in holes])
sort_holes = np.argsort(-areas)
holes = np.array(holes, dtype=object)[sort_holes]
holes_id = np.array(holes_id)[sort_holes]
holes_nodes = np.array(holes_nodes, dtype=object)[sort_holes]
covered = np.zeros(len(holes), dtype=bool)
for i, poly_int in enumerate(holes):
for j, poly_ext in enumerate(polygons):
if poly_ext.contains(poly_int) and not poly_int.covers(poly_ext):
if holes_id[i] != polygons_id[j]:
self._matching_error(
[holes_id[i], polygons_id[j]],
np.r_[holes_nodes[i], polygons_nodes[j]],
)
h = [p.coords for p in poly_ext.interiors] + [
poly_int.exterior.coords
]
polygons[j] = shapely_orient(
Polygon(poly_ext.exterior.coords, holes=h)
)
covered[i] = True
if not covered.all():
holes_id = holes_id[~covered]
if len(np.unique(holes_id)) != 1:
self._matching_error(
np.unique(holes_id),
np.concatenate(holes_nodes[~covered]),
)
block = Polygon(
shell=[
(self._xleft, self._yup),
(self._xleft, self._ydown),
(self._xright, self._ydown),
(self._xright, self._yup),
],
holes=[h.exterior.coords for h in holes[~covered]],
)
block = shapely_orient(block)
polygons.append(block)
polygons_id.append(holes_id[0])
return polygons, polygons_id
def _matching_error(self, common_ids, nodes=None):
common_ids = np.array(common_ids)
error_message = (
f"\nMore than one id found: {', '.join((common_ids).astype(str))}"
)
if nodes is not None:
lines = np.array(nodes) // 2
error_message += f" for line(s) {', '.join((lines).astype(str))}."
raise ValueError(error_message)
def _get_buffered_borders(self, bbox_user=None):
self.total_bounds
expected_sign = np.array([-1, -1, 1, 1])
if bbox_user is not None:
buffer = bbox_user - self._tight_bbox
where_bbox_user = buffer * expected_sign > 0
for i in range(4):
if buffer[i] * expected_sign[i] < 0:
raise ValueError(f"bbox[{i}] inside map.")
else:
buffer = expected_sign * self._buffer
where_bbox_user = [0, 0, 0, 0]
bbox = self._add_buffers(buffer, where_bbox_user)
return bbox
@property
def total_bounds(self):
"""
Return the coordinates of the borders (left, down, right, top).
Return the ordinate of the bottom and the top and the absissa of the
right and left borders. Not that the extreme points can be in the
middle of a line.
"""
if self._tight_bbox is None:
points = []
cum_len = [0]
for i in range(self._original_n):
points += self._node2line_points(2 * i, get_full_line=1)
cum_len.append(len(points))
x = np.array([p[0] for p in points])
y = np.array([p[1] for p in points])
self._tight_bbox = np.array([min(x), min(y), max(x), max(y)])
return self._tight_bbox
def _add_buffers(self, buffer, where_bbox_user):
border_end_points = self._get_node_points(self._get_borders_indices())
x = np.array([p[0] for p in border_end_points])
y = np.array([p[1] for p in border_end_points])
for i in [0, 2]:
if (
(x == self._tight_bbox[i])
& ~(y == self._tight_bbox[1])
& ~(y == self._tight_bbox[3])
).any():
buffer[i] = 0
if where_bbox_user[i]:
raise ValueError(
f"Cannot extend the map on {f'left' if i == 0 else 'right'} border because open lines stop."
)
for i in [1, 3]:
if (
(y == self._tight_bbox[i])
& ~(x == self._tight_bbox[0])
& ~(x == self._tight_bbox[2])
).any():
buffer[i] = 0
if where_bbox_user[i]:
raise ValueError(
f"Cannot extend the map on {'bottom' if i == 1 else 'top'} border because open lines stop."
)
return self._tight_bbox + buffer
def _get_nodes(self):
"""Get all the lines endpoints or nodes."""
nodes = np.zeros(2 * len(self._line_gdf), dtype=tuple)
for i, line in enumerate(self._line_gdf.loc[:, "geometry"]):
nodes[2 * i] = line.coords[0]
nodes[2 * i + 1] = line.coords[-1]
return nodes
def _get_node_points(self, nodes):
"""Return the coordinates of the nodes."""
points = []
for node in nodes:
line = self._line_gdf.loc[node // 2, "geometry"]
if node % 2 == 0:
point = (line.coords[0][0], line.coords[0][1])
else:
point = (line.coords[-1][0], line.coords[-1][1])
points.append(point)
return points
def _get_borders_indices(self):
"""Return the index of the nodes on the edge."""
AL_count = [len(self._AL_nodes[i]) for i in range(len(self._AL_nodes))]
borders = np.where(np.array(AL_count) == 0)[0]
return borders
def _get_sorted_borders_with_corners(self):
"""
Return the border nodes indices sorted.
Returns
-------
points: list of tuples
"""
requires_geopandas()
corners = [
(self._xleft, self._ydown),
(self._xleft, self._yup),
(self._xright, self._ydown),
(self._xright, self._yup),
]
borders = self._get_borders_indices()
points = np.unique(self._get_node_points(borders) + corners, axis=0)
points = [(p[0], p[1]) for p in points]
ref = (self._xleft, self._yup)
sorted_indices = _sort_counterclockwise_points(self._middle, ref, points)
return np.array(points)[sorted_indices]
def _link_borders(self):
sorted_borders = self._get_sorted_borders_with_corners()
border_lines = []
for i in range(len(sorted_borders)):
points = [sorted_borders[i], sorted_borders[i - 1]]
border_lines.append(LineString(points))
id_ = [-1] * len(border_lines)
self._line_gdf = pd.concat(
[
self._line_gdf,
gpd.GeoDataFrame(
{"geometry": border_lines, "id_left": id_, "id_right": id_},
crs=self.line_gdf.crs,
),
],
ignore_index=True,
)
def _get_polygon(self, next_node):
"""
Return a polygon starting with line1 and point node i.
To get a polygon starting with a node, always take the line with the
smallest angle and continue until the polygon is closed
Parameters
----------
next_node: integer
index of the node in the list of nodes.
Returns
-------
poly : list of int
nodes forming the polygon starting with next_node.
"""
first_node = next_node
polygon_nodes = []
k = 0
while (next_node != first_node) or (k == 0):
polygon_nodes.append(next_node)
next_node = self._get_next_node(next_node)
k += 1
if k > len(self._line_gdf):
raise ValueError(
"Lines cannot create closed polygon, please update linemap to close areas."
)
self._visited[next_node] = True
return polygon_nodes
def _get_next_node(self, i_node):
"""
Return the next line of a given polygon.
Parameters
----------
i_node: int
index of the node considered.
Returns
-------
next_node: int
index of the node to continue the polygon with.
"""
AL_node = self._AL_nodes[i_node]
angles = np.zeros(len(AL_node))
for j, j_node in enumerate(AL_node):
a, b, c = self._get_angle_points(i_node, j_node)
angles[j] = _get_angle(a, b, c)
j = np.argmin(angles)
next_node = AL_node[j] // 2 * 2 + 1 - AL_node[j] % 2
return next_node
def _get_angle_points(self, i, j):
"""
Return the 3 points a, b, c that form an angle bac given two lines ab, ac.
Parameters
----------
i, j: int
the node i is part of a line (line1) and j part of a line (line2)
Returns
-------
a, b1, b2: tuple
points so that a belongs to line 1 and line2, b1 to line1 and b2 to
line2
"""
line1 = self._line_gdf.loc[i // 2, "geometry"]
line2 = self._line_gdf.loc[j // 2, "geometry"]
if i % 2 == 0:
a = line1.coords[0]
b1 = line1.coords[1]
else:
a = line1.coords[-1]
b1 = line1.coords[-2]
if j % 2 == 0:
b2 = line2.coords[1]
else:
b2 = line2.coords[-2]
return a, b1, b2
def _nodes2polygon_points(self, polygon_nodes):
"""
Return the points of the corresponding polygon.
points: list of tuples
Can be used to build a polygon object
"""
points = []
for node in polygon_nodes:
points += self._node2line_points(node)
return points
def _node2line_points(self, i, get_full_line=0):
"""
Return all the points of the line expect one.
The points are returned as tuple. They may be returned in order or in
inverse order depending on the node used: in order if the node is the
start of the line, revered if it is the last. The other node is not
returned as it belongs to the next line, or to the first if the line is
the last of the polygon.
"""
line = self._line_gdf.loc[i // 2, "geometry"]
if i % 2 == 1:
points = [
(line.coords[j][0], line.coords[j][1])
for j in range(len(line.coords) - 1 + get_full_line)
]
else:
points = [
(line.coords[-j][0], line.coords[-j][1])
for j in range(1 - get_full_line, len(line.coords))
]
return points
def _get_id_and_type(self, nodes):
"""
Return the id of a polygon.
Parameters
----------
nodes : list of int
nodes belonging to a polygon
"""
nodes = np.array(nodes)
points = self._nodes2polygon_points(nodes)
is_ccw = LinearRing(points).is_ccw
nodes = nodes[nodes < 2 * self._original_n]
common_id = np.zeros(len(nodes), dtype=int)
for i, node in enumerate(nodes):
side = "right" if node % 2 else "left"
common_id[i] = self._line_gdf.loc[node // 2, f"id_{side}"]
common_id = np.unique(common_id)
if len(common_id) != 1:
self._matching_error(common_id, nodes=nodes)
type_ = self._polygon_type(common_id[0], nodes, is_ccw)
return common_id[0], type_
def _polygon_type(self, id_, nodes, is_ccw):
if is_ccw:
type_ = "hole"
else:
type_ = "shell"
return type_
[docs]
def lines2poly(line_gdf, bbox=None, buffer=0):
"""Convert a geodataframe of lines to a geodataframe of polygons.
Parameters
----------
line_gdf: GeoDataFrame
A geodataframe including the columns geometry filled with LineString
and id filled with integers or float.
bbox: 4-uple, optional. Default: None
x_left, y_down, x_right, y_up
buffer: float, optional. Default: 0
If lines of the gdf touches the border continuously, this border will
be moved by the value of the buffer. The buffer has to be non negative.
null buffer may cause invalid polygons if a line touches a border
continuously.
"""
line_map = LineMap(line_gdf)
poly_map = line_map.to_poly_map(bbox=bbox, buffer=buffer)
return poly_map._poly_gdf
def sort_line_gdf(gdf):
"""Return the dataframe "sorted".
Sorted depending on the length of the line, which is unique in the example
maps. The order itself does not matter as long as we can get a unique order.
"""
requires_geopandas()
gdf.reset_index(inplace=True, drop=True)
lines = np.zeros(len(gdf), dtype=object)
id_left = np.zeros(len(gdf))
id_right = np.zeros(len(gdf))
for i in range(len(gdf)):
lines[i], id_left[i], id_right[i] = _line_sort(gdf.loc[i])
gdf = gpd.GeoDataFrame(
{
"geometry": [LineString(line) for line in lines],
"id_left": id_left,
"id_right": id_right,
},
)
line_length = [line.length for line in gdf.loc[:, "geometry"]]
if len(np.unique(line_length)) != len(line_length):
raise Exception("Current comparison criteria cannot separate some lines.")
sort_ = np.argsort(line_length)
return gdf.loc[sort_].reset_index(drop=True)
def _line_sort(df_row):
"""Ensure a given line is always given in the same direction.
Parameters
----------
df_row: pandas series
row of a map line geodataframe.
"""
id_left = df_row.loc["id_left"]
id_right = df_row.loc["id_right"]
points = np.array(df_row.loc["geometry"].coords)
sort = np.lexsort((points[:, 0], points[:, 1]))
if sort[0] > sort[-1]:
points = points[::-1]
id_left, id_right = id_right, id_left
if (points[0] == points[-1]).all():
points, was_ccw = poly2lines._poly_sort(points, line=True)
if not was_ccw:
id_left, id_right = id_right, id_left
return np.array(points), id_left, id_right