Skip to content

spherical voronoi

Spherical voronoi utils.

This module contains spherical voronoi implementation based on SphericalVoronoi function from scipy library.

generate_voronoi_regions(
    seeds,
    max_meters_between_points=10000,
    num_of_multiprocessing_workers=-1,
    multiprocessing_activation_threshold=None,
)

Generate Thessien polygons for a given list of seeds.

Function generates Thessien polygons on a sphere using SphericalVoronoi algorithm from scipy library.

PARAMETER DESCRIPTION
seeds

Seeds used for generating regions. If list, the points are expected to be in WGS84 coordinates (lat, lon). Otherwise, a GeoDataFrame will be transformed into WGS84.

TYPE: Union[GeoDataFrame, List[Point]]

max_meters_between_points

Maximal distance in meters between two points in the resulting polygon. Higher number results lower resolution of a polygon. Defaults to 10_000 (10 kilometers).

TYPE: int DEFAULT: 10000

num_of_multiprocessing_workers

Number of workers used for multiprocessing. Defaults to -1 which results in a total number of available cpu threads. 0 and 1 values disable multiprocessing. Similar to n_jobs parameter from scikit-learn library.

TYPE: int DEFAULT: -1

multiprocessing_activation_threshold

Number of seeds required to start processing on multiple processes. Activating multiprocessing for a small amount of points might not be feasible. Defaults to 100.

TYPE: int DEFAULT: None

RETURNS DESCRIPTION
list[MultiPolygon]

List[MultiPolygon]: List of MultiPolygons cut into up to 4 polygons based on quadrants of a sphere.

RAISES DESCRIPTION
ValueError

If less than 4 seeds are provided.

ValueError

If any seed is duplicated.

ValueError

If any seed is outside WGS84 coordinates domain.

Source code in srai/regionalizers/_spherical_voronoi.py
def generate_voronoi_regions(
    seeds: Union[gpd.GeoDataFrame, list[Point]],
    max_meters_between_points: int = 10_000,
    num_of_multiprocessing_workers: int = -1,
    multiprocessing_activation_threshold: Optional[int] = None,
) -> list[MultiPolygon]:
    """
    Generate Thessien polygons for a given list of seeds.

    Function generates Thessien polygons on a sphere using
    SphericalVoronoi algorithm from scipy library.

    Args:
        seeds (Union[gpd.GeoDataFrame, List[Point]]): Seeds used for generating regions.
            If list, the points are expected to be in WGS84 coordinates (lat, lon).
            Otherwise, a GeoDataFrame will be transformed into WGS84.
        max_meters_between_points (int, optional): Maximal distance in meters between two points
            in the resulting polygon. Higher number results lower resolution of a polygon.
            Defaults to 10_000 (10 kilometers).
        num_of_multiprocessing_workers (int, optional): Number of workers used for multiprocessing.
            Defaults to -1 which results in a total number of available cpu threads.
            `0` and `1` values disable multiprocessing.
            Similar to `n_jobs` parameter from `scikit-learn` library.
        multiprocessing_activation_threshold (int, optional): Number of seeds required to start
            processing on multiple processes. Activating multiprocessing for a small
            amount of points might not be feasible. Defaults to 100.

    Returns:
        List[MultiPolygon]: List of MultiPolygons cut into up to 4 polygons based
            on quadrants of a sphere.

    Raises:
        ValueError: If less than 4 seeds are provided.
        ValueError: If any seed is duplicated.
        ValueError: If any seed is outside WGS84 coordinates domain.
    """
    if isinstance(seeds, gpd.GeoDataFrame):
        seeds, region_ids = _parse_geodataframe_seeds(seeds)
    else:
        region_ids = list(range(len(seeds)))

    if len(seeds) < 4:
        raise ValueError("Minimum 4 seeds are required.")

    duplicated_seeds_ids = _get_duplicated_seeds_ids(seeds, region_ids)
    if duplicated_seeds_ids:
        raise ValueError(f"Duplicate seeds present: {duplicated_seeds_ids}.")

    if not _check_if_in_bounds(seeds):
        raise ValueError("Seeds outside Earth WGS84 bounding box.")

    num_of_multiprocessing_workers = _parse_num_of_multiprocessing_workers(
        num_of_multiprocessing_workers
    )
    multiprocessing_activation_threshold = _parse_multiprocessing_activation_threshold(
        multiprocessing_activation_threshold
    )

    unit_sphere_ellipsoid = Ellipsoid(
        semimajor_axis=1, semiminor_axis=1, name="Unit Sphere", model="Unit"
    )
    mapped_points = [_map_to_geocentric(pt.x, pt.y, unit_sphere_ellipsoid) for pt in seeds]
    sphere_points = np.array([[pt[0], pt[1], pt[2]] for pt in mapped_points])

    radius = 1
    center = np.array([0, 0, 0])
    sv = SphericalVoronoi(sphere_points, radius, center, threshold=SCIPY_THRESHOLD)
    sv.sort_vertices_of_regions()

    total_regions = len(sv.regions)
    region_ids = list(range(total_regions))

    activate_multiprocessing = (
        num_of_multiprocessing_workers > 1 and total_regions >= multiprocessing_activation_threshold
    )

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=RuntimeWarning)

        # generate all spherical polygons

        generate_spherical_polygons_parts_func = partial(
            _generate_spherical_polygons_parts,
            sv=sv,
        )

        if activate_multiprocessing:
            spherical_polygons_parts = [  # noqa: FURB179
                polygon_part_tuple
                for polygon_part_tuples in process_map(
                    generate_spherical_polygons_parts_func,
                    region_ids,
                    desc="Generating spherical polygons",
                    max_workers=num_of_multiprocessing_workers,
                    chunksize=ceil(total_regions / (4 * num_of_multiprocessing_workers)),
                )
                for polygon_part_tuple in polygon_part_tuples
            ]
        else:
            spherical_polygons_parts = [
                polygon_part_tuple
                for region_id in tqdm(region_ids, desc="Generating spherical polygons")
                for polygon_part_tuple in generate_spherical_polygons_parts_func(
                    region_id=region_id
                )
            ]

        # identify all edges

        hashed_vertices: dict[VertexHash, npt.NDArray[np.float32]] = {}
        hashed_edges: set[EdgeHash] = set()

        regions_parts: dict[int, list[tuple[int, list[EdgeHash]]]] = {}

        for (
            region_id,
            sphere_part_id,
            spherical_polygon_points,
        ) in spherical_polygons_parts:
            if region_id not in regions_parts:
                regions_parts[region_id] = []

            n = len(spherical_polygon_points)
            polygon_vertices_hashes = []
            polygon_edges = []

            # Hash all vertices
            for i in range(n):
                start: npt.NDArray[np.float32] = spherical_polygon_points[i]
                start_hash = hashlib.sha256(start.data).digest()
                hashed_vertices[start_hash] = start
                polygon_vertices_hashes.append(start_hash)

            # Map all edges
            for i in range(n):
                start_hash = polygon_vertices_hashes[i]
                end_hash = polygon_vertices_hashes[(i + 1) % n]

                if start_hash == end_hash:
                    continue

                polygon_edges.append((start_hash, end_hash))

                if (start_hash, end_hash) not in hashed_edges and (
                    end_hash,
                    start_hash,
                ) not in hashed_edges:
                    hashed_edges.add(
                        (
                            start_hash,
                            end_hash,
                        )
                    )

            regions_parts[region_id].append((sphere_part_id, polygon_edges))

        # interpolate unique ones

        interpolated_edges: dict[EdgeHash, list[tuple[float, float]]]

        interpolate_polygon_edge_func = partial(
            _interpolate_polygon_edge,
            hashed_vertices=hashed_vertices,
            ell=unit_sphere_ellipsoid,
            max_meters_between_points=max_meters_between_points,
        )

        if activate_multiprocessing:
            interpolated_edges = {
                hashed_edge: interpolated_edge
                for hashed_edge, interpolated_edge in zip(
                    hashed_edges,
                    process_map(
                        interpolate_polygon_edge_func,
                        hashed_edges,
                        desc="Interpolating edges",
                        max_workers=num_of_multiprocessing_workers,
                        chunksize=ceil(len(hashed_edges) / (4 * num_of_multiprocessing_workers)),
                    ),
                )
            }
        else:
            interpolated_edges = {
                hashed_edge: interpolate_polygon_edge_func(hashed_edge)
                for hashed_edge in tqdm(hashed_edges, desc="Interpolating edges")
            }

        # use interpolated edges to map spherical polygons into regions

        generated_regions: list[MultiPolygon] = []
        _generate_sphere_parts()

        for region_id in tqdm(region_ids, desc="Generating polygons"):
            multi_polygon_parts: list[Polygon] = []

            for sphere_part_id, region_polygon_edges in regions_parts[region_id]:
                polygon_points: list[tuple[float, float]] = []

                for edge_start, edge_end in region_polygon_edges:
                    if (edge_start, edge_end) in interpolated_edges:
                        interpolated_edge = interpolated_edges[(edge_start, edge_end)]
                    else:
                        interpolated_edge = interpolated_edges[(edge_end, edge_start)][::-1]

                    interpolated_edge = _fix_edge(
                        interpolated_edge,
                        SPHERE_PARTS_BOUNDING_BOXES[sphere_part_id].bounds,
                        prev_lon=polygon_points[-1][0] if polygon_points else None,
                        prev_lat=polygon_points[-1][1] if polygon_points else None,
                    )

                    polygon_points.extend(interpolated_edge)

                polygon = Polygon(polygon_points)
                polygon = make_valid(polygon)
                polygon = polygon.intersection(SPHERE_PARTS_BOUNDING_BOXES[sphere_part_id])
                if isinstance(polygon, GeometryCollection):
                    for geometry in polygon.geoms:
                        if isinstance(geometry, (Polygon, MultiPolygon)):
                            polygon = geometry
                            break
                    else:
                        raise RuntimeError(
                            f"Intersection with a quadrant did not produce any Polygon. ({polygon})"
                        )

                if isinstance(polygon, Polygon):
                    multi_polygon_parts.append(polygon)
                elif isinstance(polygon, MultiPolygon):
                    multi_polygon_parts.extend(polygon.geoms)
                elif isinstance(polygon, (LineString, MultiLineString, Point)):
                    pass
                else:
                    raise RuntimeError(str(type(polygon)))

            multi_polygon = MultiPolygon(multi_polygon_parts)
            generated_regions.append(multi_polygon)

    return generated_regions

ecef2geodetic_vectorized(x, y, z, ell, deg=True)

Modified function for mapping ecdf to geodetic values from ellipsoid.

This function is a modified copy of ecdf2geodetic function from pymap3d library and is redistributed under BSD-2 license.

PARAMETER DESCRIPTION
x

X coordinates.

TYPE: NDArray[float32]

y

Y coordinates.

TYPE: NDArray[float32]

z

Z coordinates.

TYPE: NDArray[float32]

ell

Ellipsoid object.

TYPE: Ellipsoid

deg

Flag whether to return values in degrees. Defaults to True.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
NDArray[float32]

npt.NDArray[np.float32]: Parsed latitudes and longitudes

Source code in srai/regionalizers/_spherical_voronoi.py
def ecef2geodetic_vectorized(
    x: npt.NDArray[np.float32],
    y: npt.NDArray[np.float32],
    z: npt.NDArray[np.float32],
    ell: Ellipsoid,
    deg: bool = True,
) -> npt.NDArray[np.float32]:
    """
    Modified function for mapping ecdf to geodetic values from ellipsoid.

    This function is a modified copy of `ecdf2geodetic` function from `pymap3d` library
    and is redistributed under BSD-2 license.

    Args:
        x (npt.NDArray[np.float32]): X coordinates.
        y (npt.NDArray[np.float32]): Y coordinates.
        z (npt.NDArray[np.float32]): Z coordinates.
        ell (Ellipsoid): Ellipsoid object.
        deg (bool, optional): Flag whether to return values in degrees. Defaults to True.

    Returns:
        npt.NDArray[np.float32]: Parsed latitudes and longitudes
    """
    with suppress(NameError):
        x = np.asarray(x)
        y = np.asarray(y)
        z = np.asarray(z)

    r = np.sqrt(x**2 + y**2 + z**2)

    E = np.sqrt(ell.semimajor_axis**2 - ell.semiminor_axis**2)

    # eqn. 4a
    u = np.sqrt(0.5 * (r**2 - E**2) + 0.5 * np.hypot(r**2 - E**2, 2 * E * z))

    Q = np.hypot(x, y)

    huE = np.hypot(u, E)

    # eqn. 4b
    try:
        with warnings.catch_warnings(record=False):
            warnings.simplefilter("error")
            Beta = np.arctan(huE / u * z / np.hypot(x, y))
    except (ArithmeticError, RuntimeWarning):
        is_zero_dimensions = len(x.shape) == 0

        if is_zero_dimensions:
            if np.isclose(z, 0):
                Beta = 0
            elif z > 0:
                Beta = np.pi / 2
            else:
                Beta = -np.pi / 2
        else:
            _beta_arr = []

            for _x, _y, _z, _u, _huE in zip(x, y, z, u, huE):
                try:
                    with warnings.catch_warnings(record=False):
                        warnings.simplefilter("error")
                        _beta = np.arctan(_huE / _u * _z / np.hypot(_x, _y))
                except (ArithmeticError, RuntimeWarning):
                    if np.isclose(_z, 0):
                        _beta = 0
                    elif _z > 0:
                        _beta = np.pi / 2
                    else:
                        _beta = -np.pi / 2
                _beta_arr.append(_beta)

            Beta = np.asarray(_beta_arr)

    # eqn. 13
    dBeta = ((ell.semiminor_axis * u - ell.semimajor_axis * huE + E**2) * np.sin(Beta)) / (
        ell.semimajor_axis * huE * 1 / np.cos(Beta) - E**2 * np.cos(Beta)
    )

    Beta += dBeta

    # eqn. 4c
    lat = np.arctan(ell.semimajor_axis / ell.semiminor_axis * np.tan(Beta))

    with suppress(NameError):
        # patch latitude for float32 precision loss
        lim_pi2 = np.pi / 2 - np.finfo(dBeta.dtype).eps
        lat = np.where(Beta >= lim_pi2, np.pi / 2, lat)
        lat = np.where(Beta <= -lim_pi2, -np.pi / 2, lat)

    lon = np.arctan2(y, x)

    # eqn. 7
    cosBeta = np.cos(Beta)
    with suppress(NameError):
        # patch altitude for float32 precision loss
        cosBeta = np.where(Beta >= lim_pi2, 0, cosBeta)
        cosBeta = np.where(Beta <= -lim_pi2, 0, cosBeta)

    alt = np.hypot(z - ell.semiminor_axis * np.sin(Beta), Q - ell.semimajor_axis * cosBeta)

    # inside ellipsoid?
    inside = (
        x**2 / ell.semimajor_axis**2 + y**2 / ell.semimajor_axis**2 + z**2 / ell.semiminor_axis**2
        < 1
    )

    try:
        if inside.any():
            # avoid all false assignment bug
            alt[inside] = -alt[inside]
    except (TypeError, AttributeError):
        if inside:
            alt = -alt

    if deg:
        lat = np.degrees(lat)
        lon = np.degrees(lon)

    return lat, lon, alt