From f9745b1ee213e0e8351a7bad79c15680ea79ead5 Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Wed, 15 Apr 2026 10:35:39 -0700 Subject: [PATCH] Fix bump OOM: int32 coords, default-count cap, per-chunk dask partitioning (#1206) - Change locs dtype from uint16 to int32 to fix silent coordinate wrap-around for rasters with dimensions > 65535 - Cap default count (w*h//10) at 10M bumps to prevent unbounded eager allocation on large rasters - Add memory guard that raises MemoryError when bump arrays would exceed 50% of available RAM - Replace closure-based dask paths with per-chunk partitioning via dask.delayed + da.block, so each task only serializes its own bump subset instead of the entire locs/heights arrays --- xrspatial/bump.py | 159 ++++++++++++++++++++++++----------- xrspatial/tests/test_bump.py | 81 ++++++++++++++++++ 2 files changed, 192 insertions(+), 48 deletions(-) diff --git a/xrspatial/bump.py b/xrspatial/bump.py index e5a4ef78..cdcea695 100644 --- a/xrspatial/bump.py +++ b/xrspatial/bump.py @@ -17,6 +17,28 @@ class cupy(object): from xrspatial.utils import ArrayTypeFunctionMapping, _validate_scalar, ngjit +# Upper bound on bump count to prevent accidental OOM from the default +# w*h//10 heuristic. 16 bytes per bump (int32 loc pair + float64 height), +# so 10M bumps ~ 160 MB. +_MAX_DEFAULT_COUNT = 10_000_000 + + +def _available_memory_bytes(): + """Best-effort estimate of available memory in bytes.""" + try: + with open('/proc/meminfo', 'r') as f: + for line in f: + if line.startswith('MemAvailable:'): + return int(line.split()[1]) * 1024 + except (OSError, ValueError, IndexError): + pass + try: + import psutil + return psutil.virtual_memory().available + except (ImportError, AttributeError): + pass + return 2 * 1024 ** 3 + @ngjit def _finish_bump(width, height, locs, heights, spread): @@ -45,58 +67,88 @@ def _bump_cupy(data, width, height, locs, heights, spread): return cupy.asarray(_finish_bump(width, height, locs, heights, spread)) -def _bump_dask_numpy(data, width, height, locs, heights, spread): - def _chunk_bump(block, block_info=None): - info = block_info[0] - y_start, y_end = info['array-location'][0] - x_start, x_end = info['array-location'][1] - chunk_h, chunk_w = block.shape - - mask = ((locs[:, 0] >= x_start) & (locs[:, 0] < x_end) & - (locs[:, 1] >= y_start) & (locs[:, 1] < y_end)) - - if not np.any(mask): - return np.zeros((chunk_h, chunk_w)) +def _partition_bumps(data, locs, heights, spread): + """Split bumps into per-chunk subsets so closures stay small. - local_locs = locs[mask] - local_locs[:, 0] -= x_start - local_locs[:, 1] -= y_start + Returns a dict mapping ``(yi, xi)`` chunk indices to + ``(local_locs, local_heights)`` pairs (or *None* when a chunk has + no bumps). Coordinates in *local_locs* are relative to the chunk + origin. Bumps whose spread overlaps a chunk boundary are assigned + to the chunk that contains their centre pixel. + """ + y_offsets = np.concatenate([[0], np.cumsum(data.chunks[0])]) + x_offsets = np.concatenate([[0], np.cumsum(data.chunks[1])]) + ny = len(data.chunks[0]) + nx = len(data.chunks[1]) + + partitions = {} + for yi in range(ny): + y0, y1 = int(y_offsets[yi]), int(y_offsets[yi + 1]) + for xi in range(nx): + x0, x1 = int(x_offsets[xi]), int(x_offsets[xi + 1]) + mask = ((locs[:, 0] >= x0) & (locs[:, 0] < x1) & + (locs[:, 1] >= y0) & (locs[:, 1] < y1)) + if np.any(mask): + local_locs = locs[mask].copy() + local_locs[:, 0] -= x0 + local_locs[:, 1] -= y0 + partitions[(yi, xi)] = (local_locs, heights[mask].copy()) + else: + partitions[(yi, xi)] = None + return partitions - return _finish_bump(chunk_w, chunk_h, local_locs, heights[mask], spread) - return da.map_blocks( - _chunk_bump, data, - dtype=np.float64, - meta=np.array((), dtype=np.float64), - ) +def _bump_dask_numpy(data, width, height, locs, heights, spread): + import dask + + partitions = _partition_bumps(data, locs, heights, spread) + rows = [] + for yi, ch in enumerate(data.chunks[0]): + row = [] + for xi, cw in enumerate(data.chunks[1]): + ch_int, cw_int = int(ch), int(cw) + part = partitions[(yi, xi)] + if part is None: + row.append(da.zeros((ch_int, cw_int), + chunks=(ch_int, cw_int), + dtype=np.float64)) + else: + p_locs, p_heights = part + delayed = dask.delayed(_finish_bump)( + cw_int, ch_int, p_locs, p_heights, spread) + row.append(da.from_delayed(delayed, + shape=(ch_int, cw_int), + dtype=np.float64)) + rows.append(row) + return da.block(rows) def _bump_dask_cupy(data, width, height, locs, heights, spread): - def _chunk_bump(block, block_info=None): - info = block_info[0] - y_start, y_end = info['array-location'][0] - x_start, x_end = info['array-location'][1] - chunk_h, chunk_w = block.shape - - mask = ((locs[:, 0] >= x_start) & (locs[:, 0] < x_end) & - (locs[:, 1] >= y_start) & (locs[:, 1] < y_end)) - - if not np.any(mask): - return cupy.zeros((chunk_h, chunk_w)) - - local_locs = locs[mask] - local_locs[:, 0] -= x_start - local_locs[:, 1] -= y_start - - return cupy.asarray( - _finish_bump(chunk_w, chunk_h, local_locs, heights[mask], spread) - ) - - return da.map_blocks( - _chunk_bump, data, - dtype=np.float64, - meta=cupy.array((), dtype=np.float64), - ) + import dask + + def _finish_bump_cupy(cw, ch, p_locs, p_heights, spread): + return cupy.asarray(_finish_bump(cw, ch, p_locs, p_heights, spread)) + + partitions = _partition_bumps(data, locs, heights, spread) + rows = [] + for yi, ch in enumerate(data.chunks[0]): + row = [] + for xi, cw in enumerate(data.chunks[1]): + ch_int, cw_int = int(ch), int(cw) + part = partitions[(yi, xi)] + if part is None: + row.append(da.zeros((ch_int, cw_int), + chunks=(ch_int, cw_int), + dtype=np.float64)) + else: + p_locs, p_heights = part + delayed = dask.delayed(_finish_bump_cupy)( + cw_int, ch_int, p_locs, p_heights, spread) + row.append(da.from_delayed(delayed, + shape=(ch_int, cw_int), + dtype=np.float64)) + rows.append(row) + return da.block(rows) def bump(width: int = None, @@ -286,13 +338,24 @@ def heights(locations, src, src_range, height = 20): liny = range(h) if count is None: - count = w * h // 10 + count = min(w * h // 10, _MAX_DEFAULT_COUNT) + + # 16 bytes per bump: 2 x int32 coords + 1 x float64 height + required_bytes = count * 16 + available = _available_memory_bytes() + if required_bytes > 0.5 * available: + raise MemoryError( + f"bump() with count={count:,} requires ~{required_bytes / 1e9:.1f} GB " + f"for location/height arrays, but only " + f"{available / 1e9:.1f} GB is available. " + f"Pass a smaller count explicitly." + ) if height_func is None: height_func = lambda bumps: np.ones(len(bumps)) # noqa # create 2d array of random x, y for bump locations - locs = np.empty((count, 2), dtype=np.uint16) + locs = np.empty((count, 2), dtype=np.int32) locs[:, 0] = np.random.choice(linx, count) locs[:, 1] = np.random.choice(liny, count) diff --git a/xrspatial/tests/test_bump.py b/xrspatial/tests/test_bump.py index 82c433dd..74f37741 100644 --- a/xrspatial/tests/test_bump.py +++ b/xrspatial/tests/test_bump.py @@ -179,3 +179,84 @@ def test_bump_spread_reaches_both_sides_1102(): # Pixels well outside spread should be 0 assert out[5, 9] == 0, "well outside spread should be 0" assert out[0, 0] == 0, "corner should be 0" + + +# --- Issue #1206 regression tests --- + +def test_bump_locs_use_int32_not_uint16(): + """Coordinates > 65535 must not wrap around (#1206).""" + from xrspatial.bump import _finish_bump + + # Place a bump at x=70000. With uint16 this wraps to 4464. + locs = np.array([[70000, 50]], dtype=np.int32) + heights = np.array([10.0]) + out = _finish_bump(80000, 100, locs, heights, spread=0) + + assert out[50, 70000] > 0, "bump should land at the actual coordinate" + assert out[50, 4464] == 0, "uint16 wrap-around location should be empty" + + +def test_bump_default_count_capped(): + """Default count should not exceed _MAX_DEFAULT_COUNT (#1206).""" + from xrspatial.bump import _MAX_DEFAULT_COUNT + + # 20000 x 20000 → w*h//10 = 40M, should be capped to 10M + result = bump(width=20000, height=20000, spread=0) + assert result.shape == (20000, 20000) + # The number of non-zero pixels can't exceed count (no spread means + # only center pixels get values), so verify the cap took effect + nonzero = np.count_nonzero(result.values) + assert nonzero <= _MAX_DEFAULT_COUNT + + +def test_bump_memory_guard_raises(): + """Memory guard should reject huge explicit count (#1206).""" + import pytest + + # Request more bumps than memory can hold (50 billion) + with pytest.raises(MemoryError, match="bump.*count"): + bump(width=100, height=100, count=50_000_000_000) + + +@dask_array_available +def test_bump_dask_closure_size(): + """Dask task graph should not serialize full locs into every chunk (#1206). + + With per-chunk partitioning, the total serialized data in the graph + should be proportional to the total bump count, not count * n_chunks. + """ + import pickle + + agg = xr.DataArray( + da.zeros((100, 100), chunks=(25, 25), dtype=np.float64), + dims=['y', 'x'], + ) + np.random.seed(42) + result = bump(agg=agg, count=200, spread=1) + graph_bytes = len(pickle.dumps(result.data.__dask_graph__())) + + # With the old closure approach, 200 bumps * 16 chunks * 16 bytes/bump + # = ~51 kB minimum from duplicated arrays. With partitioning, the total + # is ~200 * 16 = 3.2 kB plus overhead. Allow generous headroom but + # ensure it's well below the duplication threshold. + max_expected = 200 * 16 * 4 # 4x overhead for pickle framing + assert graph_bytes < max_expected, ( + f"graph size {graph_bytes} bytes suggests locs are duplicated per chunk" + ) + + +@dask_array_available +def test_bump_dask_partitioned_matches_numpy(): + """Partitioned dask path should produce the same result as numpy + when all bumps fit in a single chunk (#1206).""" + agg_np = xr.DataArray(np.zeros((30, 40)), dims=['y', 'x']) + agg_dask = agg_np.copy() + agg_dask.data = da.from_array(agg_dask.data, chunks=(30, 40)) + + np.random.seed(123) + result_np = bump(agg=agg_np, count=50, spread=2) + + np.random.seed(123) + result_dask = bump(agg=agg_dask, count=50, spread=2) + + np.testing.assert_array_equal(result_np.values, result_dask.values)