From 01951136991de877e3af90b2b09c8d91f7272c37 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Wed, 1 Jul 2026 09:07:12 -0500 Subject: [PATCH] feat(waterdata): add chunk_granularity to control OGC chunk fan-out MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The OGC getters chunk a multi-value request only as far as the server's ~8 KB URL limit forces — the fewest sub-requests. But because every sub-request paginates, splitting a large result further is usually quota-neutral, so that conservative default can be needlessly coarse: ten states pulled as one under-limit request page just as many times as ten per-state requests would. Add `waterdata.chunk_granularity(level)`, a context manager that lets a caller who knows their pull is large opt into a finer split — trading the same pages for more, smaller sub-requests (smoother progress, more even concurrency, a smaller unit of retry/resume). The level is "low", "medium", or "high" (typed as `GranularityLevel`, a Literal, so a type checker rejects anything else; an invalid string raises ValueError at the `with`). Each level caps how many sub-chunks a multi-value argument is split into, derived from the default fan-out concurrency (`API_USGS_CONCURRENT`): high = the full width, medium a quarter, low a sixteenth (32 / 8 / 2 by default). Capping the aggressive end at the concurrency width bounds the blast radius so an accidental "high" on a huge list can't explode into thousands of sub-requests. There is no "off" level — not entering the block is off. It is a scoped `with` block, not an env var, because the library can't tell in advance whether a query is large (a short-window query might fit one page, where extra chunks only burn quota). Implementation: a soft `ChunkPlan._refine` pass runs after the hard byte pass; it only ever splits further, so the url_limit invariant holds and it never raises. The resolved per-axis cap is read from a contextvar (Ambient) set by the context manager at plan-construction time. Exported (with the `GranularityLevel` type) from `dataretrieval.waterdata` and the top-level `dataretrieval` package. Co-Authored-By: Claude Opus 4.8 (1M context) --- NEWS.md | 2 + dataretrieval/__init__.py | 8 + dataretrieval/ogc/chunking.py | 147 ++++++++++++++++- dataretrieval/ogc/planning.py | 97 +++++++++-- dataretrieval/waterdata/__init__.py | 3 + docs/source/userguide/errors.rst | 35 ++++ tests/utils_test.py | 18 ++ tests/waterdata_chunking_test.py | 248 ++++++++++++++++++++++++++++ 8 files changed, 542 insertions(+), 16 deletions(-) diff --git a/NEWS.md b/NEWS.md index a86a0314..c2d4b259 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,5 @@ +**07/01/2026:** Added `waterdata.chunk_granularity(...)` — a context manager to control how finely the OGC `waterdata` getters split multi-value requests. By default the getters chunk only as much as the server's ~8 KB URL limit forces (the fewest sub-requests). But because every sub-request paginates, splitting a large result further is usually quota-neutral, so a caller who *knows* their pull is large can opt into a finer split for smoother progress, more even concurrency, and a smaller unit of retry/resume: `with waterdata.chunk_granularity("high"): df, md = waterdata.get_daily(monitoring_location_id=many_sites)`. The level is one of `"low"`, `"medium"`, or `"high"` — typed as `waterdata.GranularityLevel` (a `typing.Literal`), so a type checker rejects any other value and an invalid string raises `ValueError` at the `with`. Each level caps how many sub-chunks a multi-value argument is split into — `"low"` / `"medium"` / `"high"` cap at 2 / 8 / 32. That ceiling is a granularity constant, deliberately independent of the fan-out concurrency (`API_USGS_CONCURRENT`): how finely a query splits is orthogonal to how many sub-requests run at once. Capping the aggressive end at 32 bounds the blast radius — an accidental `"high"` on a huge list can't explode into thousands of sub-requests. There is no "off" level: not entering the block *is* off. It is deliberately a scoped `with` block rather than an automatic behavior or an environment variable, since the library can't tell in advance whether a query is large — a short-window query might fit in a single page, where extra chunks would only burn quota. Also exported at the top level as `dataretrieval.chunk_granularity`. + **06/23/2026:** **Breaking change (1.2.0):** the minimum supported Python is now **3.10** (`requires-python = ">=3.10"`). 3.9 support was already effectively broken — the `waterdata` module's dependencies (`anyio`, the test stack) require 3.10+, and the `waterdata` test modules already skipped on <3.10. `anyio` is now declared as a direct dependency (it is imported directly by `waterdata`), and the CI/ruff/mypy targets move to 3.10. Also fully removed the deprecated `variable_info` metadata property: the `NWIS_Metadata` override only warned and returned `None` (it relied on the defunct `get_pmcodes`), and the `BaseMetadata` abstract is gone too since nothing implemented it — accessing `.variable_info` now raises `AttributeError`. `site_info` is unaffected. **06/23/2026:** **Breaking change (1.2.0):** removed the `nadp` module and the deprecated `samples` module ahead of the 1.2.0 release. `nadp` was deprecated on 05/01/2026 — NADP is not a USGS data source, so retrieve NADP data directly from https://nadp.slh.wisc.edu/. The `samples.get_usgs_samples` shim (a deprecated forward to the modern getter) is gone; use `waterdata.get_samples()` instead. `import dataretrieval.nadp` / `import dataretrieval.samples` now raise `ModuleNotFoundError`. diff --git a/dataretrieval/__init__.py b/dataretrieval/__init__.py index c9df1c45..ec79a724 100644 --- a/dataretrieval/__init__.py +++ b/dataretrieval/__init__.py @@ -44,6 +44,11 @@ URLTooLong, ) +# Chunk-granularity control (a context manager) and its level type. Defined with +# the chunker in ``dataretrieval.ogc.chunking``; surfaced here for a stable +# public path ``from dataretrieval import chunk_granularity``. +from dataretrieval.ogc.chunking import GranularityLevel, chunk_granularity + # Resumable chunk-interruption exceptions. They are defined in # ``dataretrieval.ogc.interruptions`` rather than ``dataretrieval.exceptions`` # because they carry pandas/httpx state and a resumable ``ChunkedCall`` handle, @@ -93,5 +98,8 @@ "ChunkInterrupted", "QuotaExhausted", "ServiceInterrupted", + # chunk-granularity control (defined in ogc.chunking) + "chunk_granularity", + "GranularityLevel", "__version__", ] diff --git a/dataretrieval/ogc/chunking.py b/dataretrieval/ogc/chunking.py index 00d2766c..5bbcec56 100644 --- a/dataretrieval/ogc/chunking.py +++ b/dataretrieval/ogc/chunking.py @@ -9,6 +9,12 @@ cartesian product of chunks. Requests that already fit get a trivial single-step plan — ``ChunkedCall`` has one code path either way. +Granularity: the planner is conservative by default — it splits only as far as +the byte limit forces. A caller who knows their result is large can opt into a +finer split via the ``chunk_granularity`` context manager +(``"low"`` / ``"medium"`` / ``"high"``); the resolved cap drives +:meth:`ChunkPlan._refine`. See ``chunk_granularity`` for the why and the when. + This module owns the *execution* half — the event loop and bounded concurrency that drive a plan to completion (``ChunkedCall``) plus the public ``multi_value_chunked`` decorator. The neighboring concerns live in @@ -69,8 +75,9 @@ import functools import os from collections.abc import Callable, Iterator +from contextlib import contextmanager from contextvars import copy_context -from typing import Any, cast +from typing import Any, Literal, cast, get_args import httpx import pandas as pd @@ -172,6 +179,130 @@ def get_active_client() -> httpx.AsyncClient | None: return _chunked_client.get() +# Chunk-granularity dial: opt-in to fan a query out *more finely* than the byte +# limit alone requires. Scoped to a ``with chunk_granularity(...):`` block (a +# ContextVar), deliberately NOT an env var (see :func:`chunk_granularity` for +# why). The ambient holds the resolved cap on sub-chunks per axis; ``0`` (the +# default, outside any block) means "chunk only as much as the byte limit needs". +_granularity: Ambient[int] = Ambient("ogc_chunk_granularity", 0) + +#: The three accepted granularity levels, as a typing ``Literal`` so a type +#: checker rejects any other value at the call site. +GranularityLevel = Literal["low", "medium", "high"] + +#: Valid levels derived from the type, so ``GranularityLevel`` stays the single +#: source of truth for what ``_resolve_granularity`` accepts (mirrors the +#: ``get_args``-based ``_VALID_ON_TIE`` / ``_VALID_FILE_TYPES`` in sibling +#: modules). +_VALID_LEVELS: tuple[GranularityLevel, ...] = get_args(GranularityLevel) + +# Granularity's own ceiling on sub-chunks per axis — deliberately NOT tied to the +# concurrency default: fan-out *volume* (how many sub-requests a query becomes) +# is orthogonal to how many run at once (``API_USGS_CONCURRENT``). 32 is a sane +# single-axis ceiling that also bounds the blast radius of an accidental +# ``"high"`` on a very long list; the milder levels are a quarter and a +# sixteenth of it (the three are spaced 4x apart). +_GRANULARITY_MAX_CHUNKS = 32 +_GRANULARITY_LEVELS: dict[str, int] = { + "low": _GRANULARITY_MAX_CHUNKS // 16, # 2 + "medium": _GRANULARITY_MAX_CHUNKS // 4, # 8 + "high": _GRANULARITY_MAX_CHUNKS, # 32 +} + + +def _resolve_granularity(level: GranularityLevel) -> int: + """ + Map a granularity level name to its per-axis sub-chunk cap. + + Parameters + ---------- + level : {"low", "medium", "high"} + The user-supplied level. + + Returns + ------- + int + The maximum sub-chunks per multi-value axis for that level + (``2`` / ``8`` / ``32``). + + Raises + ------ + ValueError + If ``level`` is anything other than ``"low"``, ``"medium"``, or + ``"high"`` — raised eagerly (at the ``with`` statement) so a typo fails + loudly rather than silently doing nothing. + """ + if level not in _VALID_LEVELS: + raise ValueError( + f"chunk_granularity level must be one of {_VALID_LEVELS}; got {level!r}." + ) + return _GRANULARITY_LEVELS[level] + + +@contextmanager +def chunk_granularity(level: GranularityLevel) -> Iterator[None]: + """ + Scope how finely the OGC getters chunk multi-value requests. + + By default the Water Data / NGWMN getters chunk a request only as much as + the server's ~8 KB URL-byte limit forces — the fewest sub-requests that + fit. That is the safe default, but it can be *needlessly* conservative: + because every sub-request paginates, splitting a large result further is + usually quota-neutral (ten states pulled as one under-limit request page + just as many times as ten per-state requests would). This context manager + lets a caller who *knows* their pull is large ask for that finer split — + trading the same pages for more, smaller sub-requests, which gives smoother + progress, more even concurrency, and a smaller unit of retry/resume. + + Because the library can't tell in advance whether a query is large (ten + states over a short window might fit in a single page, where extra chunks + would only burn quota), this is a *deliberate* per-call knob rather than an + automatic behavior or a process-wide environment variable — scoping it to a + ``with`` block keeps an aggressive setting from leaking into unrelated calls + and accidentally spending quota. Outside any block the getters use the + conservative default; there is no "off" level because *not* entering the + block is off. Only the OGC getters (Water Data, NGWMN) read this; wrapping a + legacy NWIS call in the block is a harmless no-op. + + Parameters + ---------- + level : {"low", "medium", "high"} + How aggressively to chunk within the block. Each level caps how many + sub-chunks a single multi-value argument is split into — ``2`` / ``8`` / + ``32`` for ``"low"`` / ``"medium"`` / ``"high"`` — or one value per + sub-request once the argument has fewer values than the cap. The ceiling + is fixed (it is *not* tied to ``API_USGS_CONCURRENT``: how finely a query + splits is orthogonal to how many sub-requests run at once); capping + ``"high"`` at ``32`` keeps an accidental aggressive level on a very long + list from exploding into thousands of sub-requests. (With several + multi-value arguments the per-argument counts still multiply.) + + Yields + ------ + None + + Raises + ------ + ValueError + If ``level`` isn't ``"low"``, ``"medium"``, or ``"high"`` — raised on + ``with`` entry, before any request is issued. + + Examples + -------- + >>> from dataretrieval import waterdata + >>> with waterdata.chunk_granularity("high"): + ... df, md = waterdata.get_daily( + ... monitoring_location_id=many_sites, parameter_code="00060" + ... ) # doctest: +SKIP + + See Also + -------- + ChunkPlan._refine : the planning-side effect of the level. + """ + with _granularity(_resolve_granularity(level)): + yield + + class ChunkedCall: """ Stateful handle for a chunked call. @@ -591,8 +722,9 @@ def multi_value_chunked( ``async def fetch(args) -> (df, response)``, and drives it to completion via :meth:`ChunkedCall.resume`. The plan splits multi-value list params and the cql-text filter so each sub-request URL fits the - byte limit; an already-fitting request is a one-step plan. See the - module docstring for the concurrency model. + byte limit; an already-fitting request is a one-step plan, unless an + active :func:`chunk_granularity` block asks the plan to fan out more + finely. See the module docstring for the concurrency model. Parameters ---------- @@ -636,7 +768,14 @@ def wrapper( finalize: _Finalize = _passthrough_result, ) -> tuple[pd.DataFrame, Any]: limit = _OGC_URL_BYTE_LIMIT if url_limit is None else url_limit - plan = ChunkPlan(args, build_request, limit) + # Read the granularity dial from the ambient set by + # ``chunk_granularity`` (0 = off outside any such block; otherwise the + # per-axis sub-chunk cap). It only affects *planning*, done here up + # front, so a later resume — which re-issues the already-planned + # sub-requests — needs no snapshot. + plan = ChunkPlan( + args, build_request, limit, max_chunks_per_axis=_granularity.get() + ) retry_policy = RetryPolicy.from_env() # The concurrency cap is resolved inside ``resume()`` from # ``API_USGS_CONCURRENT``; ``1`` is a sequential gather, diff --git a/dataretrieval/ogc/planning.py b/dataretrieval/ogc/planning.py index 68c337ae..700a0e85 100644 --- a/dataretrieval/ogc/planning.py +++ b/dataretrieval/ogc/planning.py @@ -288,6 +288,21 @@ def _extract_axes(args: dict[str, Any]) -> list[_Axis]: return axes +def _split_at(chunks: list[list[str]], idx: int) -> None: + """Replace ``chunks[idx]`` in place with its two contiguous halves. + + The single primitive both planning passes use to fan an axis out. It + preserves the partition invariants every consumer relies on: *coverage* + (each atom survives, exactly once) and *contiguous, deterministic order* + (resume and :meth:`ChunkPlan.iter_sub_args` depend on it). Kept in one + place so those invariants can't drift between :meth:`ChunkPlan._plan` + (byte-driven) and :meth:`ChunkPlan._refine` (granularity-driven). + """ + chunk = chunks[idx] + mid = len(chunk) // 2 + chunks[idx : idx + 1] = [chunk[:mid], chunk[mid:]] + + class ChunkPlan: """ Strategy for issuing one user-level request as a sequence of @@ -312,7 +327,17 @@ class ChunkPlan: Factory that turns a kwargs dict into a sized httpx request, e.g. ``_construct_api_requests``. url_limit : int - Byte budget for the request (URL + body). + Byte budget for the request (URL + body) — a hard ceiling every + sub-request must fit. + max_chunks_per_axis : int, optional + Soft cap on sub-chunks per multi-value axis (default ``0`` = off). + ``0`` chunks only as much as ``url_limit`` requires — the most + conservative plan, fewest sub-requests. A positive cap fans each axis + out into up to ``min(len(atoms), max_chunks_per_axis)`` pieces (never + fewer than the byte budget already forces), so a large multi-page pull + is issued as more, smaller sub-requests. Set from the resolved + :func:`~dataretrieval.ogc.chunking.chunk_granularity` level; see + :meth:`_refine`. Attributes ---------- @@ -344,6 +369,7 @@ def __init__( args: dict[str, Any], build_request: Callable[..., httpx.Request], url_limit: int, + max_chunks_per_axis: int = 0, ) -> None: self.args = args self.axes: list[_Axis] = [] @@ -352,10 +378,10 @@ def __init__( axes = _extract_axes(args) if not axes: - # No chunkable axis: nothing to split. If the single request fits, - # run it verbatim (the common passthrough). ``_safe_request_bytes`` - # treats an un-constructable URL (httpx.InvalidURL, > 64 KB) as over - # budget. + # No chunkable axis: nothing to split, and ``granularity`` has + # nothing to act on either. If the single request fits, run it + # verbatim (the common passthrough). ``_safe_request_bytes`` treats + # an un-constructable URL (httpx.InvalidURL, > 64 KB) as over budget. if _safe_request_bytes(build_request, args, url_limit) <= url_limit: return # Over budget. A filter the chunker doesn't manage — cql-json — is @@ -388,14 +414,29 @@ def __init__( except httpx.InvalidURL: initial_request = None + fits = False if initial_request is not None: self.canonical_url = str(initial_request.url) - if _request_bytes(initial_request) <= url_limit: - return + fits = _request_bytes(initial_request) <= url_limit + + # A request that already fits and hasn't opted into finer chunking is + # the common passthrough: leave ``axes``/``chunks`` empty so + # ``total == 1`` and ``iter_sub_args`` yields the original args + # verbatim. Only when ``max_chunks_per_axis`` asks for extra fan-out do + # we set the axes up to be refined below. + if fits and max_chunks_per_axis <= 0: + return self.axes = axes self.chunks = {axis.arg_key: [list(axis.atoms)] for axis in axes} - self._plan(build_request, url_limit) + if not fits: + # Hard pass: greedy-halve until every worst-case sub-request fits + # the byte budget (may raise ``Unchunkable``). + self._plan(build_request, url_limit) + # Soft pass: optionally split further than the byte budget requires. + # Purely additive — never re-raises, and the byte budget stays + # satisfied; a no-op at ``max_chunks_per_axis <= 0``. + self._refine(max_chunks_per_axis) if self.canonical_url is None: # Original URL was un-constructable (httpx.InvalidURL); fall @@ -447,10 +488,42 @@ def _plan( f"sub-request). Reduce input sizes, shorten or simplify " f"the filter, or split the call manually." ) - axis_chunks = self.chunks[biggest_axis.arg_key] - chunk = axis_chunks[biggest_idx] - mid = len(chunk) // 2 - axis_chunks[biggest_idx : biggest_idx + 1] = [chunk[:mid], chunk[mid:]] + _split_at(self.chunks[biggest_axis.arg_key], biggest_idx) + + def _refine(self, max_chunks_per_axis: int) -> None: + """ + Fan each axis out more finely than the byte budget alone requires — + the granularity dial (see + :func:`~dataretrieval.ogc.chunking.chunk_granularity` for why a caller + would want this). + + Each axis is split until it holds at least + ``min(len(atoms), max_chunks_per_axis)`` chunks — up to the cap, or one + atom per chunk for a shorter axis. Purely additive — only ever *splits* + existing chunks, so the byte pass's work and the ``url_limit`` invariant + are both preserved (an axis the byte pass already split past the cap is + left alone), and it never raises. A no-op at ``max_chunks_per_axis <= 0``. + + Parameters + ---------- + max_chunks_per_axis : int + Soft cap on sub-chunks per axis (``0`` = off), the resolved + granularity level. A shorter axis simply saturates at one atom per + chunk. + """ + if max_chunks_per_axis <= 0: + return + for axis in self.axes: + chunks = self.chunks[axis.arg_key] + target = min(len(axis.atoms), max_chunks_per_axis) + # ``target <= len(atoms)`` guarantees a splittable chunk each pass, so + # this reaches exactly ``target`` and terminates. Split the chunk with + # the most *atoms* (``_plan`` splits by *bytes*): here we even out + # cardinality for smooth fan-out, not URL size. Ties take the lowest + # index, keeping the split deterministic. + while len(chunks) < target: + idx, _ = max(enumerate(chunks), key=lambda kv: len(kv[1])) + _split_at(chunks, idx) def _worst_case_args(self) -> dict[str, Any]: """ diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py index 99b6e178..d70a3d74 100644 --- a/dataretrieval/waterdata/__init__.py +++ b/dataretrieval/waterdata/__init__.py @@ -9,6 +9,7 @@ from __future__ import annotations +from dataretrieval.ogc.chunking import GranularityLevel, chunk_granularity from dataretrieval.ogc.filters import FILTER_LANG # Public API exports @@ -50,6 +51,8 @@ "PROFILE_LOOKUP", "SERVICES", "WATERDATA_SERVICES", + "GranularityLevel", + "chunk_granularity", "get_channel", "get_codes", "get_combined_metadata", diff --git a/docs/source/userguide/errors.rst b/docs/source/userguide/errors.rst index e2dc3ef1..10b1a2c7 100644 --- a/docs/source/userguide/errors.rst +++ b/docs/source/userguide/errors.rst @@ -96,6 +96,41 @@ condition clears -- only the unfinished sub-requests are re-issued. except ChunkInterrupted as again: exc = again +Chunk a large request more finely +================================= + +By default the getters split an over-large request only as much as the +server's ~8 KB URL limit forces -- the fewest sub-requests. Because each +sub-request paginates, splitting a large result further is usually +quota-neutral (ten states pulled as one under-limit request page just as many +times as ten per-state requests would), so if you *know* your pull is large +you can ask for a finer split with ``chunk_granularity`` -- trading the same +pages for more, smaller sub-requests, which gives smoother progress, more even +concurrency, and a smaller unit of retry/resume. It is a scoped ``with`` +block, so an aggressive setting can't leak into unrelated calls and +accidentally spend quota: + +.. code-block:: python + + from dataretrieval import waterdata + + with waterdata.chunk_granularity("high"): + df, md = waterdata.get_daily( + monitoring_location_id=many_sites, parameter_code="00060" + ) + +The level is one of ``"low"``, ``"medium"``, or ``"high"`` (an invalid value +raises ``ValueError`` at the ``with``). Each caps how many sub-chunks a +multi-value argument is split into -- ``2`` / ``8`` / ``32`` for +``"low"`` / ``"medium"`` / ``"high"``. That ceiling is fixed, deliberately +independent of the fan-out concurrency (``API_USGS_CONCURRENT``): how finely a +query splits is orthogonal to how many sub-requests run at once. Capping the +aggressive end at 32 is a guardrail -- an accidental ``"high"`` on a very long +list can't explode into thousands of sub-requests. There is no "off" level: +simply don't enter the block unless you already expect a large, multi-page +result -- on a query that would have fit in a single page, extra chunks only +burn quota. + The full taxonomy ================= diff --git a/tests/utils_test.py b/tests/utils_test.py index b3201419..8ca4630d 100644 --- a/tests/utils_test.py +++ b/tests/utils_test.py @@ -208,6 +208,24 @@ def test_chunk_interruptions_exported_at_top_level(self): dataretrieval.ChunkInterrupted, dataretrieval.DataRetrievalError ) + def test_chunk_granularity_exported_at_top_level_and_waterdata(self): + """The ``chunk_granularity`` context manager and its ``GranularityLevel`` + type are reachable both from the top level (``from dataretrieval import + chunk_granularity``) and from the user-facing ``dataretrieval.waterdata`` + namespace, and both resolve to the single objects defined in + ``dataretrieval.ogc.chunking``.""" + import dataretrieval + from dataretrieval import waterdata + from dataretrieval.ogc import chunking + + assert dataretrieval.chunk_granularity is chunking.chunk_granularity + assert waterdata.chunk_granularity is chunking.chunk_granularity + assert dataretrieval.GranularityLevel is chunking.GranularityLevel + assert waterdata.GranularityLevel is chunking.GranularityLevel + for name in ("chunk_granularity", "GranularityLevel"): + assert name in dataretrieval.__all__ + assert name in waterdata.__all__ + class Test_BaseMetadata: """Tests of BaseMetadata""" diff --git a/tests/waterdata_chunking_test.py b/tests/waterdata_chunking_test.py index 5a946946..6e0a3ec1 100644 --- a/tests/waterdata_chunking_test.py +++ b/tests/waterdata_chunking_test.py @@ -40,8 +40,13 @@ from dataretrieval.ogc import chunking as _chunking from dataretrieval.ogc import retry as _retry_mod from dataretrieval.ogc.chunking import ( + _GRANULARITY_LEVELS, + _GRANULARITY_MAX_CHUNKS, ChunkedCall, _chunked_client, + _granularity, + _resolve_granularity, + chunk_granularity, get_active_client, multi_value_chunked, ) @@ -2105,3 +2110,246 @@ async def fetch(args): assert "finalized" in df.columns assert md[0] == "METADATA" assert calls["finalize"] >= 1 + + +# --------------------------------------------------------------------------- +# Chunk granularity: the opt-in dial (``"low"`` / ``"medium"`` / ``"high"``) to +# fan a query out MORE finely than the byte limit alone requires +# (``ChunkPlan._refine`` + the ``chunk_granularity`` context manager). +# ``_fake_build``'s base is 200 bytes, so a handful of short atoms sits far +# under ``url_limit=8000`` — the byte pass passes it through untouched, and any +# splitting below is the granularity cap alone. ``ChunkPlan`` takes the resolved +# integer cap (``max_chunks_per_axis``) directly; ``chunk_granularity`` / +# ``_resolve_granularity`` map the level names onto it. +# --------------------------------------------------------------------------- + + +def test_zero_cap_preserves_passthrough(): + """``max_chunks_per_axis=0`` (the default) must not perturb the existing + plan: a multi-value request that fits the byte limit is still the trivial + passthrough (no axes, ``total == 1``), byte-for-byte the pre-feature + behavior.""" + args = {"monitoring_location_id": ["A", "B", "C", "D"]} + plan = ChunkPlan(args, _fake_build, url_limit=8000, max_chunks_per_axis=0) + assert plan.axes == [] + assert plan.total == 1 + assert list(plan.iter_sub_args()) == [args] + + +@pytest.mark.parametrize( + ("max_chunks_per_axis", "expected_pieces"), + [(0, 1), (2, 2), (8, 8), (16, 10), (32, 10)], +) +def test_cap_ramps_then_saturates(max_chunks_per_axis, expected_pieces): + """A single 10-atom axis that fits the byte limit splits into + ``min(10, cap)`` pieces: 1 (off), 2, 8, then saturating at 10 (one atom per + chunk) once the cap overshoots the atom count. Monotonic and bounded, and + whenever it splits the partition is a cover — every atom exactly once. (The + cap-0 passthrough has no axis to cover; see the passthrough test.)""" + atoms = [f"S{i:02d}" for i in range(10)] + plan = ChunkPlan( + {"monitoring_location_id": atoms}, + _fake_build, + url_limit=8000, + max_chunks_per_axis=max_chunks_per_axis, + ) + assert plan.total == expected_pieces + if max_chunks_per_axis: + flattened = [ + a for chunk in plan.chunks["monitoring_location_id"] for a in chunk + ] + assert sorted(flattened) == sorted(atoms) + + +def test_cap_bounds_fan_out_for_a_long_axis(): + """The cap is a quota guardrail: at the ``"high"`` cap a 100-atom axis + fans into ``cap`` pieces — NOT 100 singletons — so an accidental + ``chunk_granularity("high")`` on a huge list can't detonate into hundreds + of sub-requests. Every atom is still covered exactly once.""" + high = _GRANULARITY_LEVELS["high"] + atoms = [f"X{i:03d}" for i in range(100)] + plan = ChunkPlan( + {"monitoring_location_id": atoms}, + _fake_build, + url_limit=8000, + max_chunks_per_axis=high, + ) + assert plan.total == high + flattened = [a for chunk in plan.chunks["monitoring_location_id"] for a in chunk] + assert sorted(flattened) == sorted(atoms) + + +def test_cap_below_byte_split_does_not_reduce_fan_out(): + """The cap is purely additive — it can only split further, never coarsen. + A request the byte budget already fans into K>2 chunks is untouched by a + cap of 2 (below K), so the byte-driven plan is preserved.""" + # Heavy axis of four 30-char atoms; a limit tight enough that the byte pass + # must drive every atom into its own sub-request (4 pieces > the cap of 2). + args = {"monitoring_location_id": ["X" * 30, "Y" * 30, "Z" * 30, "W" * 30]} + baseline = ChunkPlan(args, _fake_build, url_limit=250, max_chunks_per_axis=0) + assert baseline.total > 2 # byte pass alone already fanned out past 2 + refined = ChunkPlan(args, _fake_build, url_limit=250, max_chunks_per_axis=2) + # cap 2 < baseline pieces → refine is a no-op here. + assert refined.total == baseline.total + + +def test_cap_never_exceeds_the_byte_budget(): + """Refining on top of an over-budget request keeps the hard invariant: + every sub-request still fits ``url_limit`` (splitting only ever shrinks + a chunk), and the fan-out is at least what the byte pass required.""" + args = {"monitoring_location_id": ["X" * 30, "Y" * 30, "Z" * 30, "W" * 30]} + limit = 310 + byte_only = ChunkPlan(args, _fake_build, url_limit=limit, max_chunks_per_axis=0) + plan = ChunkPlan(args, _fake_build, url_limit=limit, max_chunks_per_axis=32) + assert plan.total >= byte_only.total + for sub in plan.iter_sub_args(): + assert _safe_request_bytes(_fake_build, sub, limit) <= limit + + +def test_cap_refines_the_filter_axis(): + """The dial treats the cql-text ``filter`` axis like any other: an + under-budget filter of N top-level OR-clauses is split along that axis + into ``min(N, cap)`` pieces.""" + clauses = [f"p='{i}'" for i in range(8)] + args = {"filter": " OR ".join(clauses)} + plan = ChunkPlan(args, _fake_build, url_limit=8000, max_chunks_per_axis=4) + assert len(plan.chunks["filter"]) == 4 # min(8, 4) + assert plan.total == 4 + + +def test_cap_multiplies_across_axes(): + """With more than one multi-value axis the per-axis caps multiply — + documented behavior the caller opts into. Two 6-atom axes at a cap of 4 + yield a 4x4 cartesian product.""" + args = { + "monitoring_location_id": [f"L{i}" for i in range(6)], + "parameter_code": [f"{i:05d}" for i in range(6)], + } + plan = ChunkPlan(args, _fake_build, url_limit=8000, max_chunks_per_axis=4) + assert len(plan.chunks["monitoring_location_id"]) == 4 + assert len(plan.chunks["parameter_code"]) == 4 + assert plan.total == 16 + + +def test_cap_does_not_mask_unchunkable(): + """A request with nothing to split that still busts the byte limit must + raise ``Unchunkable`` regardless of the cap — the soft pass has no axis to + act on and must not swallow the hard failure.""" + args = {"monitoring_location_id": "one-huge-scalar"} + with pytest.raises(Unchunkable): + ChunkPlan(args, _fake_build, url_limit=10, max_chunks_per_axis=32) + + +@pytest.mark.parametrize("level", ["low", "medium", "high"]) +def test_resolve_granularity_maps_each_level_to_its_cap(level): + """Each level name resolves to its per-axis sub-chunk cap from the table + (a positive int).""" + assert _resolve_granularity(level) == _GRANULARITY_LEVELS[level] + assert _resolve_granularity(level) >= 1 + + +def test_granularity_levels_ordered_and_spaced(): + """The three caps hold the properties callers rely on: strictly increasing, + ``"high"`` saturating the declared ceiling, ``"low"`` still a real split + (>= 2), and each level spaced 4x from the next. The ceiling is a granularity + constant, deliberately independent of the concurrency default.""" + low = _GRANULARITY_LEVELS["low"] + medium = _GRANULARITY_LEVELS["medium"] + high = _GRANULARITY_LEVELS["high"] + assert low < medium < high + assert high == _GRANULARITY_MAX_CHUNKS + assert low >= 2 + assert medium == high // 4 + assert low == medium // 4 + + +@pytest.mark.parametrize( + "bad", + [ + "off", # a dead keyword form + "LOW", # wrong case — exact match only + " low ", # stray whitespace + 5, # the old integer levels are gone + None, # None not accepted + ["low"], # unhashable → the ``not in`` check still rejects it cleanly + ], +) +def test_resolve_granularity_rejects_everything_but_the_three_levels(bad): + """Only the three exact level strings are accepted; every other value — a + representative of each rejected shape (dead keyword, wrong case, whitespace, + old int, ``None``, unhashable) — raises ``ValueError`` so a typo fails + loudly.""" + with pytest.raises(ValueError, match="chunk_granularity level must be"): + _resolve_granularity(bad) + + +def test_chunk_granularity_scopes_and_restores_the_ambient(): + """The context manager resolves the level to its cap, publishes it on the + ambient for the block, and restores the previous value on exit — including + proper nesting.""" + assert _granularity.get() == 0 + with chunk_granularity("high"): + assert _granularity.get() == _GRANULARITY_LEVELS["high"] + with chunk_granularity("low"): + assert _granularity.get() == _GRANULARITY_LEVELS["low"] + assert _granularity.get() == _GRANULARITY_LEVELS["high"] # outer restored + assert _granularity.get() == 0 # default (off) outside any block + + +def test_chunk_granularity_validates_on_entry(): + """An invalid level raises at ``with`` entry — before any request is + issued — and leaves the ambient untouched.""" + with pytest.raises(ValueError, match="chunk_granularity level must be"): + with chunk_granularity("aggressive"): + pass + assert _granularity.get() == 0 + + +def test_chunk_granularity_high_drives_end_to_end_fan_out(): + """End-to-end: the same fitting request passes through as a single call by + default, but fans into several sub-requests inside a + ``chunk_granularity("high")`` block — and the combined result still + recovers every atom exactly once.""" + sites = [f"S{i:02d}" for i in range(8)] + + calls: list[tuple[str, ...]] = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=8000) + async def fetch(args): + chunk = tuple(args["monitoring_location_id"]) + calls.append(chunk) + return pd.DataFrame({"site": list(chunk)}), _ok_response() + + # Default: comfortably under the byte limit → one passthrough call. + df_plain, _ = fetch({"monitoring_location_id": sites}) + assert len(calls) == 1 + assert sorted(df_plain["site"]) == sorted(sites) + + calls.clear() + with chunk_granularity("high"): + df_fine, _ = fetch({"monitoring_location_id": sites}) + # 8 atoms at the "high" cap (>= 8) → 8 singleton sub-requests. + assert len(calls) == 8 + assert all(len(chunk) == 1 for chunk in calls) + # Union across chunks recovers the original set, once each. + assert sorted(a for chunk in calls for a in chunk) == sorted(sites) + assert sorted(df_fine["site"]) == sorted(sites) + + +def test_chunk_granularity_low_is_a_gentle_split(): + """``"low"`` is the mildest opt-in: an under-limit request fans into just + the ``"low"`` cap's worth of pieces, not singletons.""" + low = _GRANULARITY_LEVELS["low"] + sites = [f"S{i:02d}" for i in range(8)] + calls: list[int] = [] + + @multi_value_chunked(build_request=_fake_build, url_limit=8000) + async def fetch(args): + calls.append(len(args["monitoring_location_id"])) + return pd.DataFrame(), _ok_response() + + with chunk_granularity("low"): + fetch({"monitoring_location_id": sites}) + # low cap → that many sub-requests, together covering all 8 sites. + assert len(calls) == low + assert sum(calls) == 8