Skip to content

H3Regionalizer

Bases: Regionalizer

H3 Regionalizer.

H3 Regionalizer allows the given geometries to be divided into H3 cells - hexagons with pentagons as a very rare exception

Source code in srai/regionalizers/h3_regionalizer.py
class H3Regionalizer(Regionalizer):
    """
    H3 Regionalizer.

    H3 Regionalizer allows the given geometries to be divided
    into H3 cells - hexagons with pentagons as a very rare exception
    """

    def __init__(self, resolution: int, buffer: bool = True) -> None:
        """
        Init H3Regionalizer.

        Args:
            resolution (int): Resolution of the cells. See [1] for a full comparison.
            buffer (bool, optional): Whether to fully cover the geometries with
                H3 Cells (visible on the borders). Defaults to True.

        Raises:
            ValueError: If resolution is not between 0 and 15.

        References:
            1. https://h3geo.org/docs/core-library/restable/
        """
        if not (0 <= resolution <= 15):
            raise ValueError(f"Resolution {resolution} is not between 0 and 15.")

        self.resolution = resolution
        self.buffer = buffer

    def transform(self, gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
        """
        Regionalize a given GeoDataFrame.

        Transforms given geometries into H3 cells of given resolution
        and optionally applies buffering.

        Args:
            gdf (gpd.GeoDataFrame): (Multi)Polygons to be regionalized.

        Returns:
            gpd.GeoDataFrame: H3 cells.

        Raises:
            ValueError: If provided GeoDataFrame has no crs defined.
        """
        gdf_wgs84 = gdf.to_crs(crs=WGS84_CRS)

        gdf_exploded = self._explode_multipolygons(gdf_wgs84)
        gdf_buffered = self._buffer(gdf_exploded) if self.buffer else gdf_exploded

        h3_indexes = (
            seq(gdf_buffered[GEOMETRY_COLUMN])
            .map(self._polygon_shapely_to_h3)
            .flat_map(lambda polygon: h3.polygon_to_cells(polygon, self.resolution))
            .distinct()
            .to_list()
        )

        gdf_h3 = self._gdf_from_h3_indexes(h3_indexes)

        # there may be too many cells because of too big buffer
        if self.buffer:
            gdf_h3_clipped = gdf_h3.sjoin(gdf_exploded[[GEOMETRY_COLUMN]]).drop(
                columns="index_right"
            )
            gdf_h3_clipped = gdf_h3_clipped[~gdf_h3_clipped.index.duplicated(keep="first")]
        else:
            gdf_h3_clipped = gdf_h3

        gdf_h3_clipped.index.name = REGIONS_INDEX

        return gdf_h3_clipped.to_crs(gdf.crs)

    def _polygon_shapely_to_h3(self, polygon: geometry.Polygon) -> h3.Polygon:
        """
        Convert Shapely Polygon to H3 Polygon.

        Args:
            polygon (geometry.Polygon): Shapely polygon to be converted.

        Returns:
            h3.Polygon: Converted polygon.
        """
        exterior = [coord[::-1] for coord in list(polygon.exterior.coords)]
        interiors = [
            [coord[::-1] for coord in list(interior.coords)] for interior in polygon.interiors
        ]
        return h3.Polygon(exterior, *interiors)

    def _gdf_from_h3_indexes(self, h3_indexes: List[str]) -> gpd.GeoDataFrame:
        """
        Convert H3 Indexes to GeoDataFrame with geometries.

        Args:
            h3_indexes (List[str]): H3 Indexes.

        Returns:
            gpd.GeoDataFrame: H3 cells.
        """
        return gpd.GeoDataFrame(
            None,
            index=h3_indexes,
            geometry=[self._h3_index_to_shapely_polygon(h3_index) for h3_index in h3_indexes],
            crs=WGS84_CRS,
        )

    def _h3_index_to_shapely_polygon(self, h3_index: str) -> geometry.Polygon:
        """
        Convert H3 Index to Shapely polygon.

        Args:
            h3_index (str): H3 Index to be converted.

        Returns:
            geometry.Polygon: Converted polygon.
        """
        return self._polygon_h3_to_shapely(h3.cells_to_polygons([h3_index])[0])

    def _polygon_h3_to_shapely(self, polygon: h3.Polygon) -> geometry.Polygon:
        """
        Convert H3 Polygon to Shapely Polygon.

        Args:
            polygon (h3.Polygon): H3 Polygon to be converted.

        Returns:
            geometry.Polygon: Converted polygon.
        """
        return geometry.Polygon(
            shell=[coord[::-1] for coord in polygon.outer],
            holes=[[coord[::-1] for coord in hole] for hole in polygon.holes],
        )

    def _buffer(self, gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
        """
        Buffer geometries to generate H3 cells that cover them entirely.

        Filling the geometries using h3.polygon_to_cells() with H3 cells does
        not cover fully the geometries around the borders, because some H3 cells' centers fall out
        of the geometries. To overcome that a buffering is needed to incorporate these cells back.
        Buffering is done in meters and is liberal, which means that performing
        h3.polygon_to_cells() on new geometries results in some H3 cells not intersecting with the
        original geometries. Spatial join with the original geometries is needed later to solve
        this issue.

        Notes:
            According to [1] the ratio between the biggest and smallest hexagons at a given
            resolution is at maximum ~2. From that we can deduce, that the maximum edge length
            ratio becomes t_max / t_min = sqrt(2). As we would like to buffer at least 1 edge length
            regardless of the size, we need to multiply by at least sqrt(2).

        Args:
            gdf (gpd.GeoDataFrame): Geometries.

        Returns:
            gpd.GeoDataFrame: Geometries buffered around the edges.

        References:
            1. https://h3geo.org/docs/core-library/restable/#hexagon-min-and-max-areas
        """
        buffer_distance_meters = 2 * h3.average_hexagon_edge_length(self.resolution, unit="m")
        buffered_geometries = gdf.geometry.apply(
            lambda polygon: buffer_geometry(polygon, buffer_distance_meters)
        )

        return gpd.GeoDataFrame(
            geometry=buffered_geometries,
            index=gdf.index,
        )

__init__

__init__(resolution: int, buffer: bool = True) -> None

Init H3Regionalizer.

PARAMETER DESCRIPTION
resolution

Resolution of the cells. See [1] for a full comparison.

TYPE: int

buffer

Whether to fully cover the geometries with H3 Cells (visible on the borders). Defaults to True.

TYPE: bool DEFAULT: True

RAISES DESCRIPTION
ValueError

If resolution is not between 0 and 15.

References
  1. https://h3geo.org/docs/core-library/restable/
Source code in srai/regionalizers/h3_regionalizer.py
def __init__(self, resolution: int, buffer: bool = True) -> None:
    """
    Init H3Regionalizer.

    Args:
        resolution (int): Resolution of the cells. See [1] for a full comparison.
        buffer (bool, optional): Whether to fully cover the geometries with
            H3 Cells (visible on the borders). Defaults to True.

    Raises:
        ValueError: If resolution is not between 0 and 15.

    References:
        1. https://h3geo.org/docs/core-library/restable/
    """
    if not (0 <= resolution <= 15):
        raise ValueError(f"Resolution {resolution} is not between 0 and 15.")

    self.resolution = resolution
    self.buffer = buffer

transform

transform(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame

Regionalize a given GeoDataFrame.

Transforms given geometries into H3 cells of given resolution and optionally applies buffering.

PARAMETER DESCRIPTION
gdf

(Multi)Polygons to be regionalized.

TYPE: gpd.GeoDataFrame

RETURNS DESCRIPTION
gpd.GeoDataFrame

gpd.GeoDataFrame: H3 cells.

RAISES DESCRIPTION
ValueError

If provided GeoDataFrame has no crs defined.

Source code in srai/regionalizers/h3_regionalizer.py
def transform(self, gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Regionalize a given GeoDataFrame.

    Transforms given geometries into H3 cells of given resolution
    and optionally applies buffering.

    Args:
        gdf (gpd.GeoDataFrame): (Multi)Polygons to be regionalized.

    Returns:
        gpd.GeoDataFrame: H3 cells.

    Raises:
        ValueError: If provided GeoDataFrame has no crs defined.
    """
    gdf_wgs84 = gdf.to_crs(crs=WGS84_CRS)

    gdf_exploded = self._explode_multipolygons(gdf_wgs84)
    gdf_buffered = self._buffer(gdf_exploded) if self.buffer else gdf_exploded

    h3_indexes = (
        seq(gdf_buffered[GEOMETRY_COLUMN])
        .map(self._polygon_shapely_to_h3)
        .flat_map(lambda polygon: h3.polygon_to_cells(polygon, self.resolution))
        .distinct()
        .to_list()
    )

    gdf_h3 = self._gdf_from_h3_indexes(h3_indexes)

    # there may be too many cells because of too big buffer
    if self.buffer:
        gdf_h3_clipped = gdf_h3.sjoin(gdf_exploded[[GEOMETRY_COLUMN]]).drop(
            columns="index_right"
        )
        gdf_h3_clipped = gdf_h3_clipped[~gdf_h3_clipped.index.duplicated(keep="first")]
    else:
        gdf_h3_clipped = gdf_h3

    gdf_h3_clipped.index.name = REGIONS_INDEX

    return gdf_h3_clipped.to_crs(gdf.crs)