diff --git a/CHANGELOG.md b/CHANGELOG.md index 49efad5..96c0b90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +- Sort converter output by Hilbert distance against the CRS's total bounds instead of by raw WKB byte order. + ## [v0.2.15] - 2026-02-16 - Add a `get_default_collection` to the Registry diff --git a/tests/test_convert.py b/tests/test_convert.py index c57c002..fe31b82 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -1,6 +1,8 @@ import sys from pathlib import Path +import geopandas as gpd +import numpy as np import pytest from loguru import logger @@ -8,6 +10,7 @@ from vecorel_cli.convert import ConvertData from vecorel_cli.registry import Registry from vecorel_cli.validate import ValidateData +from vecorel_cli.vecorel.hilbert import crs_total_bounds, hilbert_distances_from_bounds @pytest.fixture(autouse=True) @@ -41,6 +44,46 @@ def test_converter(tmp_folder, choice): assert validation_result.errors == [] +@pytest.mark.parametrize("choice", ["example"]) +def test_converter_hilbert_curve_order(tmp_folder, choice): + """The converter must emit rows in Hilbert-curve order against the CRS's + total bounds. This is what makes per-partition outputs of the same dataset + mergeable without re-sorting: they all share a CRS-derived Hilbert grid.""" + dest = tmp_folder / "converted.parquet" + ConvertData(choice).convert(dest, cache=(test_path / choice)) + + gdf = gpd.read_parquet(dest) + assert len(gdf) > 1, "Need at least 2 rows to verify ordering." + + total_bounds = crs_total_bounds(gdf.crs) + bounds = gdf.geometry.bounds.to_numpy(dtype=np.float64, copy=False) + keys = hilbert_distances_from_bounds(bounds, total_bounds) + + diffs = np.diff(keys) + if np.any(diffs < 0): + bad = int(np.argmax(diffs < 0)) + raise AssertionError( + "Output rows are not in non-decreasing Hilbert order against the " + f"CRS total bounds ({total_bounds}).\n" + f" First out-of-order pair at index {bad}->{bad + 1}: " + f"key {int(keys[bad])} > {int(keys[bad + 1])}." + ) + + +def test_crs_total_bounds_geographic(): + # Geographic CRS without a more restrictive area_of_use falls back to world. + assert crs_total_bounds("EPSG:4326") == (-180.0, -90.0, 180.0, 90.0) + + +def test_crs_total_bounds_etrs89(): + # EPSG:4258 is geographic with a European area_of_use; the returned bounds + # must reflect that AoU rather than world bounds (otherwise Hilbert keys + # for European data would all be quantised into one tiny cell). + bounds = crs_total_bounds("EPSG:4258") + assert bounds[0] > -180.0 and bounds[2] < 180.0, bounds + assert bounds[1] > -90.0 and bounds[3] < 90.0, bounds + + def test_not_existing_converter(tmp_folder): with pytest.raises(Exception, match="Converter 'not_existing' not found"): converter = ConvertData("not_existing") diff --git a/vecorel_cli/const.py b/vecorel_cli/const.py index a6fde98..08ddeea 100644 --- a/vecorel_cli/const.py +++ b/vecorel_cli/const.py @@ -4,3 +4,9 @@ GEOPARQUET_VERSIONS = ["1.0.0", "1.1.0"] GEOPARQUET_DEFAULT_VERSION = "1.1.0" + +# Default level used when sorting features along a Hilbert curve. A level of N +# splits each axis of the reference bounds into 2**N cells; 16 gives a +# 65,536 x 65,536 grid, fine enough for world-scale references at sub-km +# resolution near the equator. +HILBERT_DEFAULT_LEVEL = 16 diff --git a/vecorel_cli/conversion/base.py b/vecorel_cli/conversion/base.py index b847f64..2274484 100644 --- a/vecorel_cli/conversion/base.py +++ b/vecorel_cli/conversion/base.py @@ -26,6 +26,7 @@ from ..cli.util import display_pandas_unrestricted from ..encoding.geoparquet import GeoParquet from ..vecorel.collection import Collection +from ..vecorel.hilbert import hilbert_sort_geodataframe from ..vecorel.schemas import Schemas from ..vecorel.typing import Sources from ..vecorel.util import get_fs, name_from_uri, stream_file @@ -408,7 +409,12 @@ def convert( self.info("Removing Z geometry dimension") gdf.geometry = gdf.geometry.force_2d() - gdf.sort_values("geometry", inplace=True, ignore_index=True) + # Sort by Hilbert distance against the CRS's total bounds. This gives + # row groups good spatial locality, and — crucially — produces the same + # ordering for any independently-converted partition of the same dataset + # (they share a CRS, therefore a Hilbert reference grid), so per-file + # outputs can be merged later without re-sorting. + gdf = hilbert_sort_geodataframe(gdf) # 8. Remove all columns that are not listed drop_columns = list(set(gdf.columns) - set(actual_columns.values())) diff --git a/vecorel_cli/vecorel/hilbert.py b/vecorel_cli/vecorel/hilbert.py new file mode 100644 index 0000000..06697c0 --- /dev/null +++ b/vecorel_cli/vecorel/hilbert.py @@ -0,0 +1,111 @@ +"""Hilbert curve sorting against CRS-stable reference bounds. + +Sorting by Hilbert distance gives row groups good spatial locality, which +improves bbox-pushdown query performance and downstream compression. We +deliberately sort against the *CRS's* declared extent rather than the +dataframe's own bbox: that way, two independently produced files (for +example two provincial parts that will be merged later) end up using the +same Hilbert reference grid, and their orderings are comparable. +""" + +from __future__ import annotations + +import numpy as np + +from ..const import HILBERT_DEFAULT_LEVEL + +# Geographic world bounds, used as the fallback when a CRS exposes no +# `area_of_use` (e.g. some custom or aggregated CRSs). +_GEOGRAPHIC_WORLD_BOUNDS = (-180.0, -90.0, 180.0, 90.0) + + +def crs_total_bounds(crs) -> tuple[float, float, float, float]: + """Return a stable, deterministic ``(xmin, ymin, xmax, ymax)`` covering the + CRS's valid extent expressed in the CRS's own coordinate units. + + For geographic CRSs this returns the CRS's declared ``area_of_use`` in + degrees. For projected CRSs the geographic ``area_of_use`` is projected + onto the CRS axes (sampled along a 25x25 grid to capture curvature). + + The result is independent of any particular dataset: it depends only on + the CRS. Two independently-converted parts of the same dataset therefore + share the same Hilbert reference grid and their orderings can be merged + without recomputation. + """ + from pyproj import CRS, Transformer + + if crs is None: + return _GEOGRAPHIC_WORLD_BOUNDS + + c = CRS.from_user_input(crs) + aou = c.area_of_use + + if c.is_geographic: + if aou is None: + return _GEOGRAPHIC_WORLD_BOUNDS + return (float(aou.west), float(aou.south), float(aou.east), float(aou.north)) + + if aou is None: + raise ValueError( + f"Cannot derive total bounds for projected CRS {crs!r}: no area_of_use defined" + ) + + transformer = Transformer.from_crs("EPSG:4326", c, always_xy=True) + n = 25 + xs = np.linspace(aou.west, aou.east, n) + ys = np.linspace(aou.south, aou.north, n) + grid_x, grid_y = np.meshgrid(xs, ys) + px, py = transformer.transform(grid_x.ravel(), grid_y.ravel()) + px = np.asarray(px, dtype=np.float64) + py = np.asarray(py, dtype=np.float64) + mask = np.isfinite(px) & np.isfinite(py) + if not mask.any(): + raise ValueError(f"Could not project area_of_use into CRS {crs!r}") + return ( + float(px[mask].min()), + float(py[mask].min()), + float(px[mask].max()), + float(py[mask].max()), + ) + + +def hilbert_distances_from_bounds( + bounds: np.ndarray, + total_bounds: tuple[float, float, float, float], + level: int = HILBERT_DEFAULT_LEVEL, +) -> np.ndarray: + """Compute per-row Hilbert distances from feature bbox rows + CRS bounds. + + ``bounds`` must be an ``(N, 4)`` float64 array of ``[xmin, ymin, xmax, ymax]`` + per feature. Returns a ``(N,)`` uint64 array of Hilbert keys; sorting by + this key gives the Hilbert-curve ordering at the given resolution. + """ + from geopandas.tools.hilbert_curve import _continuous_to_discrete_coords, _encode + + x, y = _continuous_to_discrete_coords(bounds, level, list(total_bounds)) + return _encode(level, x, y).astype(np.uint64, copy=False) + + +def hilbert_sort_geodataframe(gdf, level: int = HILBERT_DEFAULT_LEVEL): + """Return a copy of ``gdf`` sorted by Hilbert distance against the CRS's + total bounds. The original index is reset. + + If ``gdf.crs`` is None or has no area_of_use, falls back to geographic + world bounds; in that case the resulting order is still stable but Hilbert + keys may not be comparable to other independently-produced outputs. + """ + if gdf.empty: + return gdf.reset_index(drop=True) + + total = crs_total_bounds(gdf.crs) + bounds_2d: np.ndarray = gdf.geometry.bounds.to_numpy(dtype=np.float64, copy=False) + keys = hilbert_distances_from_bounds(bounds_2d, total, level=level) + order = np.argsort(keys, kind="stable") + return gdf.iloc[order].reset_index(drop=True) + + +__all__ = [ + "crs_total_bounds", + "hilbert_distances_from_bounds", + "hilbert_sort_geodataframe", +]