Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 111 additions & 48 deletions xrspatial/bump.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,28 @@ class cupy(object):

from xrspatial.utils import ArrayTypeFunctionMapping, _validate_scalar, ngjit

# Upper bound on bump count to prevent accidental OOM from the default
# w*h//10 heuristic. 16 bytes per bump (int32 loc pair + float64 height),
# so 10M bumps ~ 160 MB.
_MAX_DEFAULT_COUNT = 10_000_000


def _available_memory_bytes():
"""Best-effort estimate of available memory in bytes."""
try:
with open('/proc/meminfo', 'r') as f:
for line in f:
if line.startswith('MemAvailable:'):
return int(line.split()[1]) * 1024
except (OSError, ValueError, IndexError):
pass
try:
import psutil
return psutil.virtual_memory().available
except (ImportError, AttributeError):
pass
return 2 * 1024 ** 3


@ngjit
def _finish_bump(width, height, locs, heights, spread):
Expand Down Expand Up @@ -45,58 +67,88 @@ def _bump_cupy(data, width, height, locs, heights, spread):
return cupy.asarray(_finish_bump(width, height, locs, heights, spread))


def _bump_dask_numpy(data, width, height, locs, heights, spread):
def _chunk_bump(block, block_info=None):
info = block_info[0]
y_start, y_end = info['array-location'][0]
x_start, x_end = info['array-location'][1]
chunk_h, chunk_w = block.shape

mask = ((locs[:, 0] >= x_start) & (locs[:, 0] < x_end) &
(locs[:, 1] >= y_start) & (locs[:, 1] < y_end))

if not np.any(mask):
return np.zeros((chunk_h, chunk_w))
def _partition_bumps(data, locs, heights, spread):
"""Split bumps into per-chunk subsets so closures stay small.

local_locs = locs[mask]
local_locs[:, 0] -= x_start
local_locs[:, 1] -= y_start
Returns a dict mapping ``(yi, xi)`` chunk indices to
``(local_locs, local_heights)`` pairs (or *None* when a chunk has
no bumps). Coordinates in *local_locs* are relative to the chunk
origin. Bumps whose spread overlaps a chunk boundary are assigned
to the chunk that contains their centre pixel.
"""
y_offsets = np.concatenate([[0], np.cumsum(data.chunks[0])])
x_offsets = np.concatenate([[0], np.cumsum(data.chunks[1])])
ny = len(data.chunks[0])
nx = len(data.chunks[1])

partitions = {}
for yi in range(ny):
y0, y1 = int(y_offsets[yi]), int(y_offsets[yi + 1])
for xi in range(nx):
x0, x1 = int(x_offsets[xi]), int(x_offsets[xi + 1])
mask = ((locs[:, 0] >= x0) & (locs[:, 0] < x1) &
(locs[:, 1] >= y0) & (locs[:, 1] < y1))
if np.any(mask):
local_locs = locs[mask].copy()
local_locs[:, 0] -= x0
local_locs[:, 1] -= y0
partitions[(yi, xi)] = (local_locs, heights[mask].copy())
else:
partitions[(yi, xi)] = None
return partitions

return _finish_bump(chunk_w, chunk_h, local_locs, heights[mask], spread)

return da.map_blocks(
_chunk_bump, data,
dtype=np.float64,
meta=np.array((), dtype=np.float64),
)
def _bump_dask_numpy(data, width, height, locs, heights, spread):
    """Build a lazy dask array of bumps, one delayed task per chunk.

    Each chunk only receives the bumps whose centres fall inside it (see
    ``_partition_bumps``), so the serialized task graph stays small.
    Chunks with no bumps become lazy zero blocks.
    """
    import dask

    per_chunk = _partition_bumps(data, locs, heights, spread)
    block_rows = []
    for yi, chunk_h in enumerate(data.chunks[0]):
        current_row = []
        for xi, chunk_w in enumerate(data.chunks[1]):
            h, w = int(chunk_h), int(chunk_w)
            entry = per_chunk[(yi, xi)]
            if entry is None:
                block = da.zeros((h, w), chunks=(h, w), dtype=np.float64)
            else:
                chunk_locs, chunk_heights = entry
                lazy = dask.delayed(_finish_bump)(
                    w, h, chunk_locs, chunk_heights, spread)
                block = da.from_delayed(lazy, shape=(h, w),
                                        dtype=np.float64)
            current_row.append(block)
        block_rows.append(current_row)
    return da.block(block_rows)


def _bump_dask_cupy(data, width, height, locs, heights, spread):
    """Build a lazy dask array of bumps with cupy-backed chunks.

    Mirrors ``_bump_dask_numpy`` but converts each finished chunk to a
    cupy array so results live on the device.
    """
    import dask

    def _finish_bump_cupy(cw, ch, p_locs, p_heights, spread):
        # Compute on host via the numba kernel, then move to device.
        return cupy.asarray(_finish_bump(cw, ch, p_locs, p_heights, spread))

    partitions = _partition_bumps(data, locs, heights, spread)
    rows = []
    for yi, ch in enumerate(data.chunks[0]):
        row = []
        for xi, cw in enumerate(data.chunks[1]):
            ch_int, cw_int = int(ch), int(cw)
            part = partitions[(yi, xi)]
            if part is None:
                # NOTE(review): da.zeros produces host (numpy-backed)
                # blocks, while delayed blocks are cupy-backed — confirm
                # da.block tolerates the mix, or pass a cupy meta here.
                row.append(da.zeros((ch_int, cw_int),
                                    chunks=(ch_int, cw_int),
                                    dtype=np.float64))
            else:
                p_locs, p_heights = part
                delayed = dask.delayed(_finish_bump_cupy)(
                    cw_int, ch_int, p_locs, p_heights, spread)
                row.append(da.from_delayed(delayed,
                                           shape=(ch_int, cw_int),
                                           dtype=np.float64))
        rows.append(row)
    return da.block(rows)


def bump(width: int = None,
Expand Down Expand Up @@ -286,13 +338,24 @@ def heights(locations, src, src_range, height = 20):
liny = range(h)

if count is None:
count = w * h // 10
count = min(w * h // 10, _MAX_DEFAULT_COUNT)

# 16 bytes per bump: 2 x int32 coords + 1 x float64 height
required_bytes = count * 16
available = _available_memory_bytes()
if required_bytes > 0.5 * available:
raise MemoryError(
f"bump() with count={count:,} requires ~{required_bytes / 1e9:.1f} GB "
f"for location/height arrays, but only "
f"{available / 1e9:.1f} GB is available. "
f"Pass a smaller count explicitly."
)

if height_func is None:
height_func = lambda bumps: np.ones(len(bumps)) # noqa

# create 2d array of random x, y for bump locations
locs = np.empty((count, 2), dtype=np.uint16)
locs = np.empty((count, 2), dtype=np.int32)
locs[:, 0] = np.random.choice(linx, count)
locs[:, 1] = np.random.choice(liny, count)

Expand Down
81 changes: 81 additions & 0 deletions xrspatial/tests/test_bump.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,84 @@ def test_bump_spread_reaches_both_sides_1102():
# Pixels well outside spread should be 0
assert out[5, 9] == 0, "well outside spread should be 0"
assert out[0, 0] == 0, "corner should be 0"


# --- Issue #1206 regression tests ---

def test_bump_locs_use_int32_not_uint16():
    """Coordinates > 65535 must not wrap around (#1206)."""
    from xrspatial.bump import _finish_bump

    # x=70000 exceeds the uint16 range; 70000 % 65536 == 4464, which is
    # where a wrapped coordinate would land.
    x, y = 70000, 50
    locs = np.array([[x, y]], dtype=np.int32)
    heights = np.array([10.0])
    out = _finish_bump(80000, 100, locs, heights, spread=0)

    assert out[y, x] > 0, "bump should land at the actual coordinate"
    assert out[y, 4464] == 0, "uint16 wrap-around location should be empty"


def test_bump_default_count_capped():
    """Default count should not exceed _MAX_DEFAULT_COUNT (#1206)."""
    from xrspatial.bump import _MAX_DEFAULT_COUNT

    # NOTE(review): this allocates a 20000x20000 float64 output (~3.2 GB)
    # and scans it with count_nonzero — heavy for CI; consider gating it
    # behind a slow/memory marker.
    # 20000 x 20000 → w*h//10 = 40M, should be capped to 10M
    result = bump(width=20000, height=20000, spread=0)
    assert result.shape == (20000, 20000)
    # The number of non-zero pixels can't exceed count (no spread means
    # only center pixels get values), so verify the cap took effect
    nonzero = np.count_nonzero(result.values)
    assert nonzero <= _MAX_DEFAULT_COUNT


def test_bump_memory_guard_raises():
    """Memory guard should reject huge explicit count (#1206)."""
    import pytest

    # 50 billion bumps at 16 bytes each is far beyond any real machine.
    huge_count = 50_000_000_000
    with pytest.raises(MemoryError, match="bump.*count"):
        bump(width=100, height=100, count=huge_count)


@dask_array_available
def test_bump_dask_closure_size():
    """Dask task graph should not serialize full locs into every chunk (#1206).

    With per-chunk partitioning, the total serialized data in the graph
    should be proportional to the total bump count, not count * n_chunks.
    """
    import pickle

    zeros = da.zeros((100, 100), chunks=(25, 25), dtype=np.float64)
    agg = xr.DataArray(zeros, dims=['y', 'x'])
    np.random.seed(42)
    result = bump(agg=agg, count=200, spread=1)

    graph = result.data.__dask_graph__()
    graph_bytes = len(pickle.dumps(graph))

    # With the old closure approach, 200 bumps * 16 chunks * 16 bytes/bump
    # = ~51 kB minimum from duplicated arrays. With partitioning, the total
    # is ~200 * 16 = 3.2 kB plus overhead. Allow generous headroom but
    # ensure it's well below the duplication threshold.
    max_expected = 200 * 16 * 4  # 4x overhead for pickle framing
    assert graph_bytes < max_expected, (
        f"graph size {graph_bytes} bytes suggests locs are duplicated per chunk"
    )


@dask_array_available
def test_bump_dask_partitioned_matches_numpy():
    """Partitioned dask path should produce the same result as numpy
    when all bumps fit in a single chunk (#1206)."""
    base = np.zeros((30, 40))
    agg_np = xr.DataArray(base, dims=['y', 'x'])
    agg_dask = agg_np.copy()
    agg_dask.data = da.from_array(agg_dask.data, chunks=(30, 40))

    # Same seed on both paths -> identical random bump placement.
    np.random.seed(123)
    result_np = bump(agg=agg_np, count=50, spread=2)

    np.random.seed(123)
    result_dask = bump(agg=agg_dask, count=50, spread=2)

    np.testing.assert_array_equal(result_np.values, result_dask.values)
Loading